|
@@ -1,13 +1,12 @@
|
1
|
1
|
package fr.natan.akkastreamfileprocessingapi.service
|
2
|
2
|
|
3
|
3
|
import akka.Done
|
4
|
|
-import akka.actor.ActorSystem
|
5
|
4
|
import akka.stream.scaladsl.{Sink, Source}
|
6
|
5
|
import com.typesafe.scalalogging.slf4j.Logger
|
7
|
|
-import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics}
|
|
6
|
+import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics}
|
8
|
7
|
import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSerie}
|
9
|
|
-import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.buildPersonModel
|
10
|
8
|
import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{
|
|
9
|
+ actorSystem,
|
11
|
10
|
buildAllPersonsSink,
|
12
|
11
|
buildAllTvSeriesSink,
|
13
|
12
|
buildAndValidateSource,
|
|
@@ -19,6 +18,12 @@ import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{
|
19
|
18
|
filterTvSerieByIdFlow,
|
20
|
19
|
filterTvSerieByPrimaryTitleFlow
|
21
|
20
|
}
|
|
21
|
+import fr.natan.akkastreamfileprocessingapi.service.UtilitiesClass.{
|
|
22
|
+ getListOfPersonsForTvSerie,
|
|
23
|
+ getListOfPersonsIDByTvSerieID,
|
|
24
|
+ getTvSerieIDFuture,
|
|
25
|
+ getTvSerieIdByPrimaryTitle
|
|
26
|
+}
|
22
|
27
|
import org.slf4j.LoggerFactory
|
23
|
28
|
import org.springframework.stereotype.Component
|
24
|
29
|
|
|
@@ -31,7 +36,6 @@ import scala.util.{Failure, Success}
|
31
|
36
|
@Component
|
32
|
37
|
class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
33
|
38
|
|
34
|
|
- implicit val actorSystem: ActorSystem = ActorSystem("AkkaStreamActor")
|
35
|
39
|
implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
|
36
|
40
|
|
37
|
41
|
override def getPersonByIdFuture(personID: String): Future[Person] = {
|
|
@@ -103,48 +107,6 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
103
|
107
|
tvSriesFuture
|
104
|
108
|
}
|
105
|
109
|
|
106
|
|
-
|
107
|
|
- private def getTvSerieIdByPrimaryTitle(primaryTitle: String): Future[Option[String]] = {
|
108
|
|
- val source: Source[Map[String, String], _] = buildSource(inputFile = titleBasics)
|
109
|
|
- val tvSerieIdFuture: Future[Option[String]] = source
|
110
|
|
- .filter((rowMap: Map[String, String]) => {
|
111
|
|
- rowMap.getOrElse(key = "primaryTitle", default = "") == primaryTitle
|
112
|
|
- })
|
113
|
|
- .map((rowMap: Map[String, String]) => rowMap.get("tconst"))
|
114
|
|
- .runWith(Sink.head)
|
115
|
|
-
|
116
|
|
- tvSerieIdFuture
|
117
|
|
- }
|
118
|
|
-
|
119
|
|
- private def getListOfPersonsIDByTvSerieID(tvSerieIdFuture: Future[Option[String]]): Future[IndexedSeq[Option[String]]] = {
|
120
|
|
- val source: Source[Map[String, String], _] = buildSource(inputFile = titlePrincipalsBasics)
|
121
|
|
- val personsIDsFuture: Future[IndexedSeq[Option[String]]] = source
|
122
|
|
- .filter((rowMaps: Map[String, String]) => {
|
123
|
|
- rowMaps.getOrElse(key = "tconst", default = "") == tvSerieIdFuture.value.get.get.get
|
124
|
|
- })
|
125
|
|
- .map((rowMap: Map[String, String]) => {
|
126
|
|
- rowMap.get(key = "nconst")
|
127
|
|
- })
|
128
|
|
- .runWith(Sink.collection)
|
129
|
|
-
|
130
|
|
- personsIDsFuture
|
131
|
|
- }
|
132
|
|
-
|
133
|
|
- private def getListOfPersonsForTvSerie(listPersonsIDsFuture: Future[IndexedSeq[Option[String]]]): Future[IndexedSeq[Person]] = {
|
134
|
|
-
|
135
|
|
- val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
136
|
|
- val personsFuture: Future[IndexedSeq[Person]] = source
|
137
|
|
- .filter((rowMaps: Map[String, String]) => {
|
138
|
|
- listPersonsIDsFuture.value.get.get.contains(rowMaps.get(key = "nconst"))
|
139
|
|
- })
|
140
|
|
- .map((rowMap: Map[String, String]) => {
|
141
|
|
- buildPersonModel(rowMap)
|
142
|
|
- })
|
143
|
|
- .runWith(Sink.collection)
|
144
|
|
-
|
145
|
|
- personsFuture
|
146
|
|
- }
|
147
|
|
-
|
148
|
110
|
override def getPersonsForTvSerieByTvSerieTitleFuture(tvSeriePrimaryTitle: String): Future[IndexedSeq[Person]] = {
|
149
|
111
|
|
150
|
112
|
//futures chaining
|
|
@@ -163,7 +125,8 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
163
|
125
|
_ =>
|
164
|
126
|
logger.info("STEP 2/3 START")
|
165
|
127
|
val start2: Long = System.currentTimeMillis()
|
166
|
|
- val listPersonIDsFuture: Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieIdFuture = tvSerieIDFuture)
|
|
128
|
+ val listPersonIDsFuture: Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(
|
|
129
|
+ tvSerieIdFuture = tvSerieIDFuture)
|
167
|
130
|
listPersonIDsFuture.andThen({
|
168
|
131
|
case Failure(exception) => logger.error(s"$exception")
|
169
|
132
|
case Success(value) =>
|
|
@@ -174,7 +137,8 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
174
|
137
|
_ =>
|
175
|
138
|
logger.info("STEP 3/3 START")
|
176
|
139
|
val start3: Long = System.currentTimeMillis()
|
177
|
|
- val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(listPersonsIDsFuture = listPersonIDsFuture)
|
|
140
|
+ val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(
|
|
141
|
+ listPersonsIDsFuture = listPersonIDsFuture)
|
178
|
142
|
personsTeamFuture.andThen({
|
179
|
143
|
case Failure(exception) => logger.error(s"$exception")
|
180
|
144
|
case Success(value: IndexedSeq[Person]) =>
|
|
@@ -187,18 +151,7 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
187
|
151
|
finalFuture
|
188
|
152
|
}
|
189
|
153
|
|
190
|
|
- private def getTvSerieIDFuture(tvSerieId: String): Future[Option[String]] ={
|
191
|
|
- val source: Source[Map[String, String], _] = buildSource(inputFile = titlePrincipalsBasics)
|
192
|
|
-
|
193
|
|
- val tvSerieIdFuture : Future[Option[String]]= source.
|
194
|
|
- filter((rowMap: Map[String, String]) => rowMap.getOrElse(key = "tconst", default = "") == tvSerieId)
|
195
|
|
- .map((rowMap: Map[String, String])=>rowMap.get(key = "tconst"))
|
196
|
|
- .runWith(Sink.head)
|
197
|
|
-
|
198
|
|
- tvSerieIdFuture
|
199
|
|
- }
|
200
|
|
-
|
201
|
|
- override def getPersonsForTvSerieByTvSerieIdFuture(tvSerieId: String): Future[IndexedSeq[Person]] = {
|
|
154
|
+ override def getPersonsForTvSerieByTvSerieIDFuture(tvSerieId: String): Future[IndexedSeq[Person]] = {
|
202
|
155
|
|
203
|
156
|
val start1: Long = System.currentTimeMillis()
|
204
|
157
|
logger.info("STEP 1/3")
|
|
@@ -213,17 +166,19 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
213
|
166
|
.flatMap({
|
214
|
167
|
_ => logger.info("STEP 2/3")
|
215
|
168
|
val start2: Long = System.currentTimeMillis()
|
216
|
|
- val listOfPersonsIDsFuture : Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieIdFuture = tvSerieIdFuture)
|
|
169
|
+ val listOfPersonsIDsFuture : Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(
|
|
170
|
+ tvSerieIdFuture = tvSerieIdFuture)
|
217
|
171
|
listOfPersonsIDsFuture.andThen({
|
218
|
172
|
case Failure(exception) => logger.error(s"${exception.printStackTrace()}")
|
219
|
|
- case Success(value) =>
|
|
173
|
+ case Success(value: IndexedSeq[Option[String]]) =>
|
220
|
174
|
value.foreach((personID: Option[String]) => logger.info(s"Person ID:$personID"))
|
221
|
175
|
logger.info(s"STEP 2/3 END, elapsed time:${(System.currentTimeMillis() - start2) / 1000} sec")
|
222
|
176
|
})
|
223
|
177
|
.flatMap({
|
224
|
178
|
_ => logger.info("STEP 3/3")
|
225
|
179
|
val start3: Long = System.currentTimeMillis()
|
226
|
|
- val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(listPersonsIDsFuture = listOfPersonsIDsFuture)
|
|
180
|
+ val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(
|
|
181
|
+ listPersonsIDsFuture = listOfPersonsIDsFuture)
|
227
|
182
|
personsTeamFuture.andThen({
|
228
|
183
|
case Failure(exception) => logger.error(s"$exception")
|
229
|
184
|
case Success(value: IndexedSeq[Person]) =>
|