|
@@ -3,13 +3,11 @@ package fr.natan.akkastreamfileprocessingapi.service
|
3
|
3
|
import akka.Done
|
4
|
4
|
import akka.stream.scaladsl.{Sink, Source}
|
5
|
5
|
import com.typesafe.scalalogging.slf4j.Logger
|
|
6
|
+import fr.natan.akkastreamfileprocessingapi.businessexceptions.PersonNotFoundException
|
6
|
7
|
import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics}
|
7
|
8
|
import fr.natan.akkastreamfileprocessingapi.models.ModelsAndJsonMap.{Person, TvSerie}
|
8
|
|
-import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{
|
9
|
|
- actorSystem, buildAllPersonsSink, buildAllTvSeriesSink, buildAndValidateSource, buildPersonFlow, buildTvSerieFlow,
|
10
|
|
- filterPersonByIdFlow, filterPersonByNameFlow, filterTvSerieByIdFlow, filterTvSerieByPrimaryTitleFlow}
|
11
|
|
-import fr.natan.akkastreamfileprocessingapi.service.UtilitiesClass.{getListOfPersonsForTvSerie, getListOfPersonsIDByTvSerieID,
|
12
|
|
- getTvSerieIDFuture, getTvSerieIdByPrimaryTitle}
|
|
9
|
+import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{actorSystem, buildAllPersonsSink, buildAllTvSeriesSink, buildAndValidateSource, buildPersonFlow, buildTvSerieFlow, filterPersonByIdFlow, filterPersonByNameFlow, filterTvSerieByIdFlow, filterTvSerieByPrimaryTitleFlow}
|
|
10
|
+import fr.natan.akkastreamfileprocessingapi.service.UtilitiesClass.{getListOfPersonsForTvSerie, getListOfPersonsIDByTvSerieID, getTvSerieIDFuture, getTvSerieIdByPrimaryTitle}
|
13
|
11
|
import org.slf4j.LoggerFactory
|
14
|
12
|
import org.springframework.stereotype.Component
|
15
|
13
|
|
|
@@ -32,8 +30,9 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
32
|
30
|
.via(flow = filterPersonByIdFlow(personID = personID))
|
33
|
31
|
.runWith(Sink.headOption[Person])
|
34
|
32
|
|
35
|
|
- personFuture.andThen({
|
36
|
|
- case Failure(exception) => logger.error(s"!${exception.printStackTrace()}")
|
|
33
|
+ personFuture.onComplete({
|
|
34
|
+ case Failure(exception) => throw new PersonNotFoundException()
|
|
35
|
+ case Success(value: Option[Person]) => logger.info(s"${value.get}")
|
37
|
36
|
})
|
38
|
37
|
|
39
|
38
|
personFuture
|
|
@@ -47,6 +46,11 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
47
|
46
|
.via(flow = filterPersonByNameFlow(primaryName = primaryName))
|
48
|
47
|
.runWith(Sink.headOption[Person])
|
49
|
48
|
|
|
49
|
+ personFuture.onComplete({
|
|
50
|
+ case Failure(exception) => throw new PersonNotFoundException()
|
|
51
|
+ case Success(value:Option[Person]) =>logger.info(s"${value.get}")
|
|
52
|
+ })
|
|
53
|
+
|
50
|
54
|
personFuture
|
51
|
55
|
}
|
52
|
56
|
|
|
@@ -60,9 +64,7 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
60
|
64
|
|
61
|
65
|
tvSerieFuture.onComplete({
|
62
|
66
|
case Failure(exception) => logger.info(s"$exception")
|
63
|
|
- case Success(value: Option[TvSerie]) =>
|
64
|
|
- logger.info(s"$value")
|
65
|
|
- logger.info(s"SUCCESS, elapsed time:${(System.currentTimeMillis() - start) / 1000} sec")
|
|
67
|
+ case Success(value: Option[TvSerie]) => logger.info(s"${value.get}")
|
66
|
68
|
})
|
67
|
69
|
tvSerieFuture
|
68
|
70
|
}
|