Browse Source

first commit

placidenduwayo 1 year ago
parent
commit
8722bd3bfb

+ 8
- 8
src/main/java/fr/natan/akkastreamfileprocessingapi/controller/MovieController.java View File

36
         return new ResponseEntity<>("is running", HttpStatus.OK);
36
         return new ResponseEntity<>("is running", HttpStatus.OK);
37
     }
37
     }
38
 
38
 
39
-    @RequestMapping(value = "/movies", method = RequestMethod.GET)
40
-    private ResponseEntity<String> getAllMovies() {
39
+    @RequestMapping(value = "/tvseries", method = RequestMethod.GET)
40
+    private ResponseEntity<String> getAllvSeries() {
41
         akkaStreamFilesProcessing.getAllTvSeries();
41
         akkaStreamFilesProcessing.getAllTvSeries();
42
         return new ResponseEntity<>("is running", HttpStatus.OK);
42
         return new ResponseEntity<>("is running", HttpStatus.OK);
43
     }
43
     }
44
 
44
 
45
-    @RequestMapping(value = "/movies/{movieTitle}", method = RequestMethod.GET)
46
-    private ResponseEntity<String> getMovieByTitle(@PathVariable(name ="movieTitle" ) String movieTitle){
47
-        akkaStreamFilesProcessing.getByTvSeriePrimaryTitle(movieTitle);
48
-        return new ResponseEntity<>("is running", HttpStatus.OK);
45
+    @RequestMapping(value = "/tvseries/{tvseriePrimaryTitle}", method = RequestMethod.GET)
46
+    private ResponseEntity<String> getTvserieByPrimaryTitle(@PathVariable(name ="tvseriePrimaryTitle" ) String tvseriePrimaryTitle){
47
+        akkaStreamFilesProcessing.getByTvSeriePrimaryTitle(tvseriePrimaryTitle);
48
+        return new ResponseEntity<>("OK", HttpStatus.OK);
49
     }
49
     }
50
 
50
 
51
-    @RequestMapping(value = "/movies/title/{tvSerieTitle}", method = RequestMethod.GET)
51
+    @RequestMapping(value = "/tvseries/title/{tvSerieTitle}", method = RequestMethod.GET)
52
     private ResponseEntity<String> getListOfPersonsIDByTvSerieID(@PathVariable(name = "tvSerieTitle") String tvSerieTitle){
52
     private ResponseEntity<String> getListOfPersonsIDByTvSerieID(@PathVariable(name = "tvSerieTitle") String tvSerieTitle){
53
-        akkaStreamFilesProcessing.generalize(tvSerieTitle);
53
+      akkaStreamFilesProcessing.getPersonsTeamForTvSerie(tvSerieTitle);
54
        return new ResponseEntity<>("is running", HttpStatus.OK);
54
        return new ResponseEntity<>("is running", HttpStatus.OK);
55
     }
55
     }
56
 }
56
 }

+ 2
- 18
src/main/scala/fr/natan/akkastreamfileprocessingapi/models/Models.scala View File

16
         ", birth-year:" + birthYear +
16
         ", birth-year:" + birthYear +
17
         ", dearth year:" + deathYear +
17
         ", dearth year:" + deathYear +
18
         ", primary profession:" + primaryProfession +
18
         ", primary profession:" + primaryProfession +
19
-        ", known for titles:" + knownForTitles +
19
+      ", known-for-titles"+knownForTitles+
20
         "]"
20
         "]"
21
     }
21
     }
22
-
23
-    def getNconst: String={
24
-      nconst
25
-    }
26
-    def getPrimaryName: String = {
27
-      primaryName
28
-    }
29
   }
22
   }
30
 
23
 
31
   final case class TvSeries(
24
   final case class TvSeries(
33
                              titleType: String,
26
                              titleType: String,
34
                              primaryTitle: String,
27
                              primaryTitle: String,
35
                              originalTitle: String,
28
                              originalTitle: String,
36
-                             isAdult: String,
37
                              startYear: String,
29
                              startYear: String,
38
                              endYear: String,
30
                              endYear: String,
39
                              runtimeMinutes: String,
31
                              runtimeMinutes: String,
45
         ", title-type:" + titleType +
37
         ", title-type:" + titleType +
46
         ", primary-title:" + primaryTitle + "" +
38
         ", primary-title:" + primaryTitle + "" +
47
         ", riginal-title:" + originalTitle +
39
         ", riginal-title:" + originalTitle +
48
-        ", is adult (no=0, yes=1):" + isAdult +
49
         ", start year:" + startYear +
40
         ", start year:" + startYear +
50
         ", end year:" + endYear +
41
         ", end year:" + endYear +
51
         ", runtime minutes:" + runtimeMinutes +
42
         ", runtime minutes:" + runtimeMinutes +
52
-        ", genre:" + genres + "]"
53
-    }
54
-
55
-   def getTconst: String= {
56
-     tconst
57
-   }
58
-    def getPrimaryTitle: String={
59
-      primaryTitle
43
+        ", genres:" + genres + "]"
60
     }
44
     }
61
   }
45
   }
62
 
46
 

+ 16
- 17
src/main/scala/fr/natan/akkastreamfileprocessingapi/models/ModelsBuilder.scala View File

2
 import Models.{Person, TvSeries}
2
 import Models.{Person, TvSeries}
3
 object ModelsBuilder {
3
 object ModelsBuilder {
4
 
4
 
5
-  def buildPersonModel(map: Map[String, String]): Person = {
5
+  def buildPersonModel(personMap: Map[String, String]): Person = {
6
     Person(
6
     Person(
7
-      map("nconst"),
8
-      map("primaryName"),
9
-      map("birthYear"),
10
-      map("deathYear"),
11
-      map("primaryProfession").split(",").toList,
12
-      map("knownForTitles").split(",").toList,
7
+      personMap("nconst"),
8
+      personMap("primaryName"),
9
+      personMap("birthYear"),
10
+      personMap("deathYear"),
11
+      personMap("primaryProfession").split(",").toList,
12
+      personMap("knownForTitles").split(",").toList
13
     )
13
     )
14
   }
14
   }
15
 
15
 
16
-  def buildTvSerieModel(map: Map[String, String]): TvSeries = {
16
+  def buildTvSerieModel(tvSerieMap: Map[String, String]): TvSeries = {
17
     val tvSerie: TvSeries = TvSeries(
17
     val tvSerie: TvSeries = TvSeries(
18
-      map("tconst"),
19
-      map("titleType"),
20
-      map("primaryTitle"),
21
-      map("originalTitle"),
22
-      map("isAdult"),
23
-      map("startYear"),
24
-      map("endYear"),
25
-      map("runtimeMinutes"),
26
-      map("genres")
18
+      tvSerieMap("tconst"),
19
+      tvSerieMap("titleType"),
20
+      tvSerieMap("primaryTitle"),
21
+      tvSerieMap("originalTitle"),
22
+      tvSerieMap("startYear"),
23
+      tvSerieMap("endYear"),
24
+      tvSerieMap("runtimeMinutes"),
25
+      tvSerieMap("genres")
27
     )
26
     )
28
     tvSerie
27
     tvSerie
29
   }
28
   }

+ 13
- 11
src/main/scala/fr/natan/akkastreamfileprocessingapi/service/AkkaStreamComponents.scala View File

48
   def filterByTvSeriePrimaryTitleFlow(tvSeriePrimaryTitle: String): Flow[Map[String, String], TvSeries, NotUsed] = {
48
   def filterByTvSeriePrimaryTitleFlow(tvSeriePrimaryTitle: String): Flow[Map[String, String], TvSeries, NotUsed] = {
49
     val filterFlow: Flow[Map[String, String], TvSeries, NotUsed] = Flow[Map[String, String]]
49
     val filterFlow: Flow[Map[String, String], TvSeries, NotUsed] = Flow[Map[String, String]]
50
         .filter((rows: Map[String, String]) => {
50
         .filter((rows: Map[String, String]) => {
51
-          rows.getOrElse(key = "primaryName", default = "")==tvSeriePrimaryTitle
51
+          rows.getOrElse(key = "primaryTitle", default = "")==tvSeriePrimaryTitle
52
         })
52
         })
53
         .map(rowMap => {
53
         .map(rowMap => {
54
-          val tvSerie: TvSeries = buildTvSerieModel(map = rowMap)
54
+          val tvSerie: TvSeries = buildTvSerieModel(tvSerieMap = rowMap)
55
           tvSerie
55
           tvSerie
56
         })
56
         })
57
 
57
 
66
           rowMap.getOrElse(key="nconst",default="")==nconst
66
           rowMap.getOrElse(key="nconst",default="")==nconst
67
         })
67
         })
68
         .map(rowMap=>{
68
         .map(rowMap=>{
69
-          buildPersonModel(map = rowMap)
69
+          buildPersonModel(personMap = rowMap)
70
         })
70
         })
71
 
71
 
72
     personFilter
72
     personFilter
80
           rowMap.getOrElse(key = "primaryName", default = "")==personName
80
           rowMap.getOrElse(key = "primaryName", default = "")==personName
81
         })
81
         })
82
         .map((rowMap: Map[String, String])=>{
82
         .map((rowMap: Map[String, String])=>{
83
-          buildPersonModel(map = rowMap)
83
+          buildPersonModel(personMap = rowMap)
84
         })
84
         })
85
 
85
 
86
     personFilter
86
     personFilter
103
       }
103
       }
104
       )
104
       )
105
       .via(Compression.gunzip())
105
       .via(Compression.gunzip())
106
-      .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote))
106
+      .via(CsvParsing.lineScanner(CsvParsing.Tab))
107
       .via(CsvToMap.toMapAsStrings())
107
       .via(CsvToMap.toMapAsStrings())
108
 
108
 
109
     datasource
109
     datasource
121
   //sinks building
121
   //sinks building
122
   //sink1
122
   //sink1
123
   def buildTvSeriesSink(logger: Logger): Sink[TvSeries, Future[Done]] = {
123
   def buildTvSeriesSink(logger: Logger): Sink[TvSeries, Future[Done]] = {
124
-    val tvSeriesSink : Sink[TvSeries, Future[Done]]= Sink.foreach[TvSeries](
125
-      (tvSerie: TvSeries) => {
126
-        logger.info(s"${tvSerie.toString}")
127
-      }
128
-    )
124
+    val tvSeriesSink : Sink[TvSeries, Future[Done]]= Sink
125
+      .foreach((tvSerie: TvSeries)=> logger.info(s"${tvSerie.toString}"))
129
     tvSeriesSink
126
     tvSeriesSink
130
   }
127
   }
131
 
128
 
129
+  def buildAllTvSeriesSink(logger: Logger): Sink[TvSeries, Future[Done]] = {
130
+    val tvSeriesSink: Sink[TvSeries, Future[Done]] = Sink
131
+      .foreach((tvSerie: TvSeries)=>logger.info(s"${tvSerie.toString}"))
132
+    tvSeriesSink
133
+  }
132
   //sink2
134
   //sink2
133
   def buildAllPersonsSink(logger: Logger): Sink[Person,Future[Done]] = {
135
   def buildAllPersonsSink(logger: Logger): Sink[Person,Future[Done]] = {
134
     val listPersonsSink: Sink[Person, Future[Done]]=
136
     val listPersonsSink: Sink[Person, Future[Done]]=
135
-      Sink.foreach[Person]((person: Person)=>logger.info(s"${person.toString}"))
137
+      Sink.foreach((person: Person)=>logger.info(s"${person.toString}"))
136
     listPersonsSink
138
     listPersonsSink
137
   }
139
   }
138
 
140
 

+ 7
- 4
src/main/scala/fr/natan/akkastreamfileprocessingapi/service/AkkaStreamFileProcessing.scala View File

1
 package fr.natan.akkastreamfileprocessingapi.service
1
 package fr.natan.akkastreamfileprocessingapi.service
2
 
2
 
3
-import fr.natan.akkastreamfileprocessingapi.models.Models.Person
3
+import akka.stream.scaladsl.Source
4
+import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSeries}
5
+
6
+import scala.concurrent.Future
4
 
7
 
5
 trait AkkaStreamFileProcessing {
8
 trait AkkaStreamFileProcessing {
6
   def getAllPersons(): Unit
9
   def getAllPersons(): Unit
7
 
10
 
8
-  def getPersonById(nconst: String): Person
11
+  def getPersonById(nconst: String): Future[Person]
9
   def getPersonByName(primaryName: String): Unit
12
   def getPersonByName(primaryName: String): Unit
10
 
13
 
11
   def getAllTvSeries(): Unit
14
   def getAllTvSeries(): Unit
12
 
15
 
13
-  def getByTvSeriePrimaryTitle(tvSerieTitle: String): Unit
16
+  def getByTvSeriePrimaryTitle(tvSerieTitle: String):Future[Seq[TvSeries]]
14
 
17
 
15
-  def generalize(tvSeriePrimaryTitle: String): Unit
18
+  def getPersonsTeamForTvSerie(tvSeriePrimaryTitle: String)
16
 
19
 
17
 
20
 
18
 }
21
 }

+ 36
- 52
src/main/scala/fr/natan/akkastreamfileprocessingapi/service/AkkaStreamFileProcessingImpl.scala View File

7
 import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics}
7
 import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics}
8
 import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSeries}
8
 import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSeries}
9
 import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.{buildPersonModel, buildTvSerieModel}
9
 import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.{buildPersonModel, buildTvSerieModel}
10
-import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{
11
-  buildAllPersonsSink,
12
-  buildAndValidateSource,
13
-  buildPersonFlow,
14
-  buildPersonSink,
15
-  buildSource,
16
-  buildTvSerieFlow,
17
-  buildTvSeriesSink,
18
-  filterByPersonIdFlow,
19
-  filterByPersonNameFlow,
20
-  filterByTvSeriePrimaryTitleFlow
21
-}
10
+import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAllPersonsSink, buildAllTvSeriesSink, buildAndValidateSource, buildPersonFlow, buildPersonSink, buildSource, buildTvSerieFlow, buildTvSeriesSink, filterByPersonIdFlow, filterByPersonNameFlow, filterByTvSeriePrimaryTitleFlow}
22
 import org.slf4j.LoggerFactory
11
 import org.slf4j.LoggerFactory
23
 import org.springframework.stereotype.Component
12
 import org.springframework.stereotype.Component
24
 
13
 
34
   implicit val actorSystem: ActorSystem = ActorSystem("AkkaStreamActor")
23
   implicit val actorSystem: ActorSystem = ActorSystem("AkkaStreamActor")
35
   implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
24
   implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
36
 
25
 
37
-  override def getAllPersons(): Unit = {
26
+  override def getAllPersons() = {
38
     val personSource: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
27
     val personSource: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
39
-    val personSink: Sink[Person, Future[Done]] = buildAllPersonsSink(logger = logger)
40
-
41
     //graph
28
     //graph
42
     val startTime: Long = System.currentTimeMillis()
29
     val startTime: Long = System.currentTimeMillis()
43
     val result: Future[Done] = personSource
30
     val result: Future[Done] = personSource
44
       .via(flow = buildPersonFlow())
31
       .via(flow = buildPersonFlow())
45
-      .runWith(sink = personSink)
46
-      .andThen {
47
-        case Success(value) =>
48
-          val elapsedTime: Long = (System.currentTimeMillis() - startTime) / 1000
49
-          logger.info(s"$value: Successfully processed, elapsed time: $elapsedTime")
50
-        case Failure(exception) => logger.error(s"$exception: Fail")
51
-      }
32
+      .runWith(sink = buildAllPersonsSink(logger = logger))
33
+
34
+    result.onComplete {
35
+      case Failure(exception) => logger.error(s"$exception")
36
+      case Success(value) =>
37
+        logger.info(s"$value")
38
+        val time: Long =(System.currentTimeMillis()-startTime)/100
39
+        logger.info(s"elapsed time: $time")
40
+    }
52
   }
41
   }
53
 
42
 
54
-  override def getPersonById(nconst: String): Person = {
43
+  override def getPersonById(nconst: String): Future[Person] = {
55
     val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
44
     val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
56
 
45
 
57
     val startTime: Long = System.currentTimeMillis()
46
     val startTime: Long = System.currentTimeMillis()
58
     val res = source
47
     val res = source
59
       .via(flow = filterByPersonIdFlow(nconst = nconst))
48
       .via(flow = filterByPersonIdFlow(nconst = nconst))
60
       .runWith(sink = buildPersonSink())
49
       .runWith(sink = buildPersonSink())
61
-     .andThen {
62
-        case Success(value) =>
63
-          val elapsedTime: Long = (System.currentTimeMillis()-startTime)/1000
64
-          logger.info(s"$value")
65
-          logger.info(s"Successfully processed, elapsed time: $elapsedTime")
66
-        case Failure(exception) => logger.error(s"$exception: Fail")
67
-      }
68
 
50
 
69
-    Await.result(res,1 minutes)
70
-    val person: Person = res.value.get.get
71
-    person
51
+    res.andThen {
52
+      case Failure(exception) => logger.info(s"error!!!!!!!!!!!!!$exception")
53
+      case Success(value) => logger.info(s"$value")
54
+    }
55
+
56
+    res
72
   }
57
   }
73
 
58
 
74
   override def getPersonByName(primaryName: String): Unit = {
59
   override def getPersonByName(primaryName: String): Unit = {
87
   override def getAllTvSeries(): Unit = {
72
   override def getAllTvSeries(): Unit = {
88
 
73
 
89
     val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
74
     val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
90
-    val sink: Sink[TvSeries, Future[Done]] = buildTvSeriesSink(logger = logger)
75
+    val sink: Sink[TvSeries, Future[Done]] = buildAllTvSeriesSink(logger = logger)
91
 
76
 
92
     val startingTime: Long = System.currentTimeMillis()
77
     val startingTime: Long = System.currentTimeMillis()
93
 
78
 
104
 
89
 
105
   }
90
   }
106
 
91
 
107
-  override def getByTvSeriePrimaryTitle(tvSeriePrimaryTitle: String): Unit = {
92
+  override def getByTvSeriePrimaryTitle(tvSeriePrimaryTitle: String):Future[IndexedSeq[TvSeries]] = {
108
 
93
 
109
     val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
94
     val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
110
-    val tvSeriesSink: Sink[TvSeries, Future[Done]] = buildTvSeriesSink(logger = logger)
111
 
95
 
112
-    val filterByMovieTitleFlow: Flow[Map[String, String], TvSeries, NotUsed] =
96
+    val filterByTvSerieTitle: Flow[Map[String, String], TvSeries, NotUsed] =
113
       filterByTvSeriePrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle)
97
       filterByTvSeriePrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle)
114
 
98
 
115
-    val startTime: Long = System.currentTimeMillis()
116
-    val listTvSeries: Future[Done] = tvSeriesSource
117
-      .via(flow = filterByMovieTitleFlow)
118
-      .runWith(sink = tvSeriesSink)
119
-      .andThen {
120
-        case Success(value) =>
121
-          val elapsedTime: Long = (System.currentTimeMillis() - startTime) / 1000
122
-          logger.info(s"$value: successfully processing file, elapsed time: $elapsedTime sec")
123
-        case Failure(error: Error) => logger.error(s"$error")
124
-      }
99
+    val tvSrie :Future[IndexedSeq[TvSeries]] = tvSeriesSource
100
+      .via(flow = filterByTvSerieTitle)
101
+      .runWith(Sink.collection)
125
 
102
 
126
-    getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
103
+      tvSrie.andThen {
104
+      case Failure(exception) => logger.info(s"$exception")
105
+      case Success(value) => logger.info(s"$value")
106
+    }
127
 
107
 
128
   }
108
   }
129
 
109
 
193
         case Failure(exception) => logger.error(s"$exception")
173
         case Failure(exception) => logger.error(s"$exception")
194
         case Success(value) =>
174
         case Success(value) =>
195
           val elapsedTime: Long = (System.currentTimeMillis()-startTime)/1000
175
           val elapsedTime: Long = (System.currentTimeMillis()-startTime)/1000
196
-          logger.info(s"$value")
197
           logger.info(s"END: Successfully, elapsed time: $elapsedTime")
176
           logger.info(s"END: Successfully, elapsed time: $elapsedTime")
198
       }
177
       }
199
 
178
 
203
     listPersons
182
     listPersons
204
 
183
 
205
   }
184
   }
206
-
207
-  override def generalize(tvSeriePrimaryTitle: String): Unit={
185
+  override def getPersonsTeamForTvSerie(tvSeriePrimaryTitle: String):Unit={
208
 
186
 
209
     val tvSerieID: String = getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
187
     val tvSerieID: String = getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
210
     val listPersonIDs: List[Option[String]] = getListOfPersonsIDByTvSerieID(tconst = tvSerieID)
188
     val listPersonIDs: List[Option[String]] = getListOfPersonsIDByTvSerieID(tconst = tvSerieID)
211
-    getListOfPersonsForTvSerie(nconstList = listPersonIDs)
189
+    val personsTeam: List[Person] = getListOfPersonsForTvSerie(nconstList = listPersonIDs)
190
+
191
+    logger.info(s"Team size:${personsTeam.size}")
192
+    personsTeam.foreach((person: Person)=>{
193
+      logger.info(s"${person.toString}")
194
+    })
195
+
212
   }
196
   }
213
 
197
 
214
 }
198
 }

Powered by TurnKey Linux.