|
@@ -26,10 +26,10 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
26
|
26
|
implicit val actorSystem: ActorSystem = ActorSystem("AkkaStreamActor")
|
27
|
27
|
implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
|
28
|
28
|
|
29
|
|
- override def getPersonById(nconst: String): Future[Person] = {
|
|
29
|
+ override def getPersonById(personID: String): Future[Person] = {
|
30
|
30
|
val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
31
|
|
- val res = source
|
32
|
|
- .via(flow = filterByPersonIdFlow(nconst = nconst))
|
|
31
|
+ val res: Future[Person] = source
|
|
32
|
+ .via(flow = filterByPersonIdFlow(nconst = personID))
|
33
|
33
|
.runWith(Sink.head)
|
34
|
34
|
|
35
|
35
|
res.andThen {
|
|
@@ -90,11 +90,11 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
90
|
90
|
res
|
91
|
91
|
}
|
92
|
92
|
|
93
|
|
- private def getListOfPersonsIDByTvSerieID(tvSerieID: Future[Option[String]]): Future[IndexedSeq[Option[String]]] = {
|
|
93
|
+ private def getListOfPersonsIDByTvSerieID(tvSerieIdFuture: Future[Option[String]]): Future[IndexedSeq[Option[String]]] = {
|
94
|
94
|
val source: Source[Map[String, String], _] = buildSource(inputFile = titlePrincipalsBasics)
|
95
|
95
|
val res: Future[IndexedSeq[Option[String]]] = source
|
96
|
96
|
.filter((rowMaps: Map[String, String]) => {
|
97
|
|
- rowMaps.getOrElse(key = "tconst", default = "") == tvSerieID.value.get.get.get
|
|
97
|
+ rowMaps.getOrElse(key = "tconst", default = "") == tvSerieIdFuture.value.get.get.get
|
98
|
98
|
})
|
99
|
99
|
.map((rowMap: Map[String, String]) => {
|
100
|
100
|
rowMap.get(key = "nconst")
|
|
@@ -104,18 +104,18 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
104
|
104
|
res
|
105
|
105
|
}
|
106
|
106
|
|
107
|
|
- private def getListOfPersonsForTvSerie(listPersonsIDs: Future[IndexedSeq[Option[String]]]): Future[IndexedSeq[Person]] = {
|
108
|
|
- val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
|
107
|
+ private def getListOfPersonsForTvSerie(listPersonsIDsFuture: Future[IndexedSeq[Option[String]]]):
|
|
108
|
+ Future[IndexedSeq[Person]] = {
|
109
|
109
|
|
110
|
|
- val res: Future[IndexedSeq[Person]] =
|
111
|
|
- source
|
112
|
|
- .filter((rowMaps: Map[String, String]) => {
|
113
|
|
- listPersonsIDs.value.get.get.contains(rowMaps.get(key = "nconst"))
|
114
|
|
- })
|
115
|
|
- .map((rowMap: Map[String, String]) => {
|
116
|
|
- buildPersonModel(rowMap)
|
117
|
|
- })
|
118
|
|
- .runWith(Sink.collection)
|
|
110
|
+ val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
|
111
|
+ val res: Future[IndexedSeq[Person]] = source
|
|
112
|
+ .filter((rowMaps: Map[String, String]) => {
|
|
113
|
+ listPersonsIDsFuture.value.get.get.contains(rowMaps.get(key = "nconst"))
|
|
114
|
+ })
|
|
115
|
+ .map((rowMap: Map[String, String]) => {
|
|
116
|
+ buildPersonModel(rowMap)
|
|
117
|
+ })
|
|
118
|
+ .runWith(Sink.collection)
|
119
|
119
|
|
120
|
120
|
res
|
121
|
121
|
}
|
|
@@ -126,39 +126,39 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
126
|
126
|
logger.info("STEP 1/3 START")
|
127
|
127
|
val tvSerieIDFuture: Future[Option[String]] = getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
|
128
|
128
|
|
129
|
|
- val finalResult: Future[IndexedSeq[Person]] = tvSerieIDFuture.andThen({
|
130
|
|
- case Failure(exception) => logger.error(s"$exception")
|
131
|
|
- case Success(value: Option[String]) =>
|
132
|
|
- logger.info(s"TvSerie ID: $value")
|
133
|
|
- logger.info("STEP 1/3 END")
|
134
|
|
- })
|
135
|
|
- .flatMap({
|
136
|
|
- _ =>
|
137
|
|
- logger.info("STEP 2/3 START")
|
138
|
|
- val listPersonIDsFuture: Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieID = tvSerieIDFuture)
|
139
|
|
- listPersonIDsFuture.andThen({
|
140
|
|
- case Failure(exception) => logger.error(s"$exception")
|
141
|
|
- case Success(value) =>
|
142
|
|
- value.foreach((personID: Option[String]) => logger.info(s"Person ID:$personID"))
|
143
|
|
- logger.info("STEP 2/3 END")
|
144
|
|
- })
|
145
|
|
- .flatMap({
|
146
|
|
- future =>
|
147
|
|
- logger.info("STEP 3/3 START")
|
148
|
|
- val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(listPersonsIDs = listPersonIDsFuture)
|
149
|
|
- personsTeamFuture.andThen({
|
150
|
|
- case Failure(exception) => logger.error(s"$exception")
|
151
|
|
- case Success(value: IndexedSeq[Person]) =>
|
152
|
|
- value.foreach((person: Person) => logger.info(s"${person.toString}"))
|
153
|
|
- logger.info("STEP 3/3 END")
|
154
|
|
- })
|
155
|
|
- })
|
|
129
|
+ val finalFuture: Future[IndexedSeq[Person]] =
|
|
130
|
+ tvSerieIDFuture.andThen({
|
|
131
|
+ case Failure(exception) => logger.error(s"$exception")
|
|
132
|
+ case Success(value: Option[String]) =>
|
|
133
|
+ logger.info(s"TvSerie ID: $value")
|
|
134
|
+ logger.info("STEP 1/3 END")
|
156
|
135
|
})
|
|
136
|
+ .flatMap({
|
|
137
|
+ _ =>
|
|
138
|
+ logger.info("STEP 2/3 START")
|
|
139
|
+ val listPersonIDsFuture: Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieIdFuture = tvSerieIDFuture)
|
|
140
|
+ listPersonIDsFuture.andThen({
|
|
141
|
+ case Failure(exception) => logger.error(s"$exception")
|
|
142
|
+ case Success(value) =>
|
|
143
|
+ value.foreach((personID: Option[String]) => logger.info(s"Person ID:$personID"))
|
|
144
|
+ logger.info("STEP 2/3 END")
|
|
145
|
+ })
|
|
146
|
+ .flatMap({
|
|
147
|
+ future =>
|
|
148
|
+ logger.info("STEP 3/3 START")
|
|
149
|
+ val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(listPersonsIDsFuture = listPersonIDsFuture)
|
|
150
|
+ personsTeamFuture.andThen({
|
|
151
|
+ case Failure(exception) => logger.error(s"$exception")
|
|
152
|
+ case Success(value: IndexedSeq[Person]) =>
|
|
153
|
+ value.foreach((person: Person) => logger.info(s"${person.toString}"))
|
|
154
|
+ logger.info("STEP 3/3 END")
|
|
155
|
+ })
|
|
156
|
+ })
|
|
157
|
+ })
|
157
|
158
|
|
158
|
|
- finalResult
|
|
159
|
+ finalFuture
|
159
|
160
|
}
|
160
|
161
|
|
161
|
|
-
|
162
|
162
|
override def getAllPersons(): Unit = {
|
163
|
163
|
val personSource: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
164
|
164
|
//graph
|