|
@@ -6,7 +6,7 @@ import akka.{Done, NotUsed}
|
6
|
6
|
import com.typesafe.scalalogging.slf4j.Logger
|
7
|
7
|
import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.titleBasics
|
8
|
8
|
import fr.natan.akkastreamfileprocessingapi.models.TvSeries
|
9
|
|
-import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAndValidateSource, buildMovieFlow, buildSink, filterByMoviePrimaryTitleFlow}
|
|
9
|
+import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAndValidateSource, buildMovieFlow, buildSink, buildfilterByMoviePrimaryTitleFlow}
|
10
|
10
|
import org.slf4j.LoggerFactory
|
11
|
11
|
import org.springframework.stereotype.Component
|
12
|
12
|
|
|
@@ -20,7 +20,7 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
20
|
20
|
implicit val actorSystem: ActorSystem = ActorSystem("AkkaStreamActor")
|
21
|
21
|
implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
|
22
|
22
|
|
23
|
|
- override def getAllMovies() = {
|
|
23
|
+ override def getAllMovies(): Unit = {
|
24
|
24
|
|
25
|
25
|
val source: Source[String, NotUsed] = buildAndValidateSource(inputFile = titleBasics)
|
26
|
26
|
val sink: Sink[TvSeries, Future[Done]] = buildSink(logger = logger)
|
|
@@ -28,6 +28,7 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
28
|
28
|
|
29
|
29
|
val startingTime: Long = System.currentTimeMillis()
|
30
|
30
|
|
|
31
|
+ //graph sink->flow->sink
|
31
|
32
|
source
|
32
|
33
|
.via(flow = buildMovieFlow(movieID = "tconst"))
|
33
|
34
|
.runWith(sink = sink)
|
|
@@ -46,17 +47,18 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
46
|
47
|
val sink: Sink[TvSeries, Future[Done]] = buildSink(logger = logger)
|
47
|
48
|
|
48
|
49
|
val filterByMovieTitleFlow: Flow[String, TvSeries, NotUsed] =
|
49
|
|
- filterByMoviePrimaryTitleFlow(moviePrimaryTitle = moviePrimaryTitle)
|
|
50
|
+ buildfilterByMoviePrimaryTitleFlow(moviePrimaryTitle = moviePrimaryTitle)
|
50
|
51
|
|
51
|
52
|
val startTime: Long = System.currentTimeMillis()
|
52
|
|
- source.via(flow = filterByMovieTitleFlow).runWith(sink = sink).andThen {
|
|
53
|
+ val listTvSeries: Future[Done]= source.via(flow = filterByMovieTitleFlow)
|
|
54
|
+ .runWith(sink = sink)
|
|
55
|
+ .andThen {
|
53
|
56
|
case Success(value) =>
|
54
|
57
|
val elapsedTime: Long = (System.currentTimeMillis() - startTime) / 1000
|
55
|
|
- logger.info(s"$value : successfully processing file, elapsed time $elapsedTime sec")
|
56
|
|
-
|
57
|
|
- case Failure(err: Error) => logger.error(s"$err")
|
|
58
|
+ logger.info(s"$value: successfully processing file, elapsed time $titleBasics: $elapsedTime sec")
|
|
59
|
+ case Failure(error: Error) => logger.error(s"$error")
|
58
|
60
|
}
|
59
|
|
-
|
60
|
61
|
}
|
61
|
|
-
|
62
|
62
|
}
|
|
63
|
+
|
|
64
|
+
|