|
@@ -4,10 +4,23 @@ import akka.Done
|
4
|
4
|
import akka.actor.ActorSystem
|
5
|
5
|
import akka.stream.scaladsl.{Sink, Source}
|
6
|
6
|
import com.typesafe.scalalogging.slf4j.Logger
|
7
|
|
-import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics}
|
|
7
|
+import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{
|
|
8
|
+ nameBasics, titleBasics, titlePrincipalsBasics
|
|
9
|
+}
|
8
|
10
|
import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSerie}
|
9
|
11
|
import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.buildPersonModel
|
10
|
|
-import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAllPersonsSink, buildAllTvSeriesSink, buildAndValidateSource, buildPersonFlow, buildSource, buildTvSerieFlow, filterPersonByIdFlow, filterPersonByNameFlow, filterTvSerieByIdFlow, filterTvSerieByPrimaryTitleFlow}
|
|
12
|
+import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{
|
|
13
|
+ buildAllPersonsSink,
|
|
14
|
+ buildAllTvSeriesSink,
|
|
15
|
+ buildAndValidateSource,
|
|
16
|
+ buildPersonFlow,
|
|
17
|
+ buildSource,
|
|
18
|
+ buildTvSerieFlow,
|
|
19
|
+ filterPersonByIdFlow,
|
|
20
|
+ filterPersonByNameFlow,
|
|
21
|
+ filterTvSerieByIdFlow,
|
|
22
|
+ filterTvSerieByPrimaryTitleFlow
|
|
23
|
+}
|
11
|
24
|
import org.slf4j.LoggerFactory
|
12
|
25
|
import org.springframework.stereotype.Component
|
13
|
26
|
|
|
@@ -18,87 +31,87 @@ import scala.util.{Failure, Success}
|
18
|
31
|
|
19
|
32
|
//noinspection SpellCheckingInspection
|
20
|
33
|
@Component
|
21
|
|
-class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
|
34
|
+class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
22
|
35
|
|
23
|
36
|
implicit val actorSystem: ActorSystem = ActorSystem("AkkaStreamActor")
|
24
|
37
|
implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
|
25
|
38
|
|
26
|
|
- override def getPersonById(personID: String): Future[Person] = {
|
|
39
|
+ override def getPersonByIdFuture(personID: String): Future[Person] = {
|
27
|
40
|
val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
28
|
|
- val res: Future[Person] = source
|
|
41
|
+ val personFuture: Future[Person] = source
|
29
|
42
|
.via(flow = filterPersonByIdFlow(personID = personID))
|
30
|
43
|
.runWith(Sink.head)
|
31
|
44
|
|
32
|
|
- res.andThen({
|
|
45
|
+ personFuture.andThen({
|
33
|
46
|
case Failure(exception) => logger.info(s"$exception")
|
34
|
47
|
case Success(value:Person) => logger.info(s"$value")
|
35
|
48
|
})
|
36
|
49
|
|
37
|
|
- res
|
|
50
|
+ personFuture
|
38
|
51
|
}
|
39
|
52
|
|
40
|
|
- override def getPersonByName(primaryName: String): Future[Person] = {
|
|
53
|
+ override def getPersonByNameFuture(primaryName: String): Future[Person] = {
|
41
|
54
|
val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
42
|
|
- val persons: Future[Person] = source
|
|
55
|
+ val personFuture: Future[Person] = source
|
43
|
56
|
.via(flow = filterPersonByNameFlow(primaryName = primaryName))
|
44
|
57
|
.runWith(Sink.head)
|
45
|
58
|
|
46
|
|
- persons.onComplete({
|
|
59
|
+ personFuture.onComplete({
|
47
|
60
|
case Failure(exception) => logger.error(s"$exception")
|
48
|
61
|
case Success(value: Person) => logger.info(s"$value")
|
49
|
62
|
logger.info("SUCCESS")
|
50
|
63
|
})
|
51
|
64
|
|
52
|
|
- persons
|
|
65
|
+ personFuture
|
53
|
66
|
}
|
54
|
67
|
|
55
|
|
- override def getTvSerieById(tvSerieID: String): Future[TvSerie] = {
|
|
68
|
+ override def getTvSerieByIdFuture(tvSerieID: String): Future[TvSerie] = {
|
56
|
69
|
val source: Source[Map[String, String], _] = buildSource(inputFile = titleBasics)
|
57
|
|
- val res: Future[TvSerie] = source
|
|
70
|
+ val tvSerieFuture: Future[TvSerie] = source
|
58
|
71
|
.via(flow = filterTvSerieByIdFlow(tvSerieID = tvSerieID))
|
59
|
72
|
.runWith(Sink.head)
|
60
|
73
|
|
61
|
|
- res.onComplete({
|
|
74
|
+ tvSerieFuture.onComplete({
|
62
|
75
|
case Failure(exception) => logger.info(s"$exception")
|
63
|
76
|
case Success(value: TvSerie) => logger.info(s"$value")
|
64
|
77
|
})
|
65
|
|
- res
|
|
78
|
+ tvSerieFuture
|
66
|
79
|
}
|
67
|
|
- override def getTvSerieByPrimaryTitle(tvSeriePrimaryTitle: String): Future[IndexedSeq[TvSerie]] = {
|
|
80
|
+ override def getTvSeriesByPrimaryTitleFuture(tvSeriePrimaryTitle: String): Future[IndexedSeq[TvSerie]] = {
|
68
|
81
|
|
69
|
82
|
val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
|
70
|
83
|
|
71
|
|
- val tvSries: Future[IndexedSeq[TvSerie]] = tvSeriesSource
|
|
84
|
+ val tvSriesFuture: Future[IndexedSeq[TvSerie]] = tvSeriesSource
|
72
|
85
|
.via(flow = filterTvSerieByPrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle))
|
73
|
86
|
.runWith(Sink.collection)
|
74
|
87
|
|
75
|
|
- tvSries.onComplete({
|
|
88
|
+ tvSriesFuture.onComplete({
|
76
|
89
|
case Failure(exception) => logger.info(s"$exception")
|
77
|
90
|
case Success(value: IndexedSeq[TvSerie]) =>
|
78
|
91
|
value.foreach((tvSrie: TvSerie) => logger.info(s"$tvSrie"))
|
79
|
92
|
logger.info("SUCCESS")
|
80
|
93
|
})
|
81
|
94
|
|
82
|
|
- tvSries
|
|
95
|
+ tvSriesFuture
|
83
|
96
|
}
|
84
|
97
|
|
85
|
98
|
|
86
|
99
|
private def getTvSerieIdByPrimaryTitle(primaryTitle: String): Future[Option[String]] = {
|
87
|
100
|
val source: Source[Map[String, String], _] = buildSource(inputFile = titleBasics)
|
88
|
101
|
|
89
|
|
- val res: Future[Option[String]] = source
|
|
102
|
+ val tvSerieIdFuture: Future[Option[String]] = source
|
90
|
103
|
.filter((rowMap: Map[String, String]) => {
|
91
|
|
- rowMap.getOrElse(key = "primaryTitle", "") == primaryTitle
|
|
104
|
+ rowMap.getOrElse(key = "primaryTitle", default = "") == primaryTitle
|
92
|
105
|
})
|
93
|
106
|
.map((rowMap: Map[String, String]) => rowMap.get("tconst"))
|
94
|
107
|
.runWith(Sink.head)
|
95
|
108
|
|
96
|
|
- res
|
|
109
|
+ tvSerieIdFuture
|
97
|
110
|
}
|
98
|
111
|
|
99
|
112
|
private def getListOfPersonsIDByTvSerieID(tvSerieIdFuture: Future[Option[String]]): Future[IndexedSeq[Option[String]]] = {
|
100
|
113
|
val source: Source[Map[String, String], _] = buildSource(inputFile = titlePrincipalsBasics)
|
101
|
|
- val res: Future[IndexedSeq[Option[String]]] = source
|
|
114
|
+ val personsIDsFuture: Future[IndexedSeq[Option[String]]] = source
|
102
|
115
|
.filter((rowMaps: Map[String, String]) => {
|
103
|
116
|
rowMaps.getOrElse(key = "tconst", default = "") == tvSerieIdFuture.value.get.get.get
|
104
|
117
|
})
|
|
@@ -107,14 +120,13 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
107
|
120
|
})
|
108
|
121
|
.runWith(Sink.collection)
|
109
|
122
|
|
110
|
|
- res
|
|
123
|
+ personsIDsFuture
|
111
|
124
|
}
|
112
|
125
|
|
113
|
|
- private def getListOfPersonsForTvSerie(listPersonsIDsFuture: Future[IndexedSeq[Option[String]]]):
|
114
|
|
- Future[IndexedSeq[Person]] = {
|
|
126
|
+ private def getListOfPersonsForTvSerie(listPersonsIDsFuture: Future[IndexedSeq[Option[String]]]): Future[IndexedSeq[Person]] = {
|
115
|
127
|
|
116
|
128
|
val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
117
|
|
- val res: Future[IndexedSeq[Person]] = source
|
|
129
|
+ val personsFuture: Future[IndexedSeq[Person]] = source
|
118
|
130
|
.filter((rowMaps: Map[String, String]) => {
|
119
|
131
|
listPersonsIDsFuture.value.get.get.contains(rowMaps.get(key = "nconst"))
|
120
|
132
|
})
|
|
@@ -123,10 +135,10 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
123
|
135
|
})
|
124
|
136
|
.runWith(Sink.collection)
|
125
|
137
|
|
126
|
|
- res
|
|
138
|
+ personsFuture
|
127
|
139
|
}
|
128
|
140
|
|
129
|
|
- override def getTeamOfPersonsForTvSerie(tvSeriePrimaryTitle: String): Future[IndexedSeq[Person]] = {
|
|
141
|
+ override def getPersonsForTvSerieByTvSerieTitleFuture(tvSeriePrimaryTitle: String): Future[IndexedSeq[Person]] = {
|
130
|
142
|
|
131
|
143
|
//futures chaining
|
132
|
144
|
logger.info("STEP 1/3 START")
|
|
@@ -150,7 +162,7 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
150
|
162
|
logger.info("STEP 2/3 END")
|
151
|
163
|
})
|
152
|
164
|
.flatMap({
|
153
|
|
- future =>
|
|
165
|
+ _ =>
|
154
|
166
|
logger.info("STEP 3/3 START")
|
155
|
167
|
val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(listPersonsIDsFuture = listPersonIDsFuture)
|
156
|
168
|
personsTeamFuture.andThen({
|
|
@@ -165,7 +177,51 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
165
|
177
|
finalFuture
|
166
|
178
|
}
|
167
|
179
|
|
168
|
|
- override def getAllPersons(): Unit = {
|
|
180
|
+ private def getTvSerieIDFuture(tvSerieId: String): Future[Option[String]] ={
|
|
181
|
+ val source: Source[Map[String, String], _] = buildSource(inputFile = titlePrincipalsBasics)
|
|
182
|
+
|
|
183
|
+ val tvSerieIdFuture : Future[Option[String]]= source.
|
|
184
|
+ filter((rowMap: Map[String, String]) => rowMap.getOrElse(key = "tconst", default = "") == tvSerieId)
|
|
185
|
+ .map((rowMap: Map[String, String])=>rowMap.get(key = "tconst"))
|
|
186
|
+ .runWith(Sink.head)
|
|
187
|
+
|
|
188
|
+ tvSerieIdFuture
|
|
189
|
+ }
|
|
190
|
+
|
|
191
|
+ override def getPersonsForTvSerieByTvSerieIdFuture(tvSerieId: String): Future[IndexedSeq[Person]] = {
|
|
192
|
+
|
|
193
|
+ val tvSerieIdFuture: Future[Option[String]] = getTvSerieIDFuture(tvSerieId = tvSerieId)
|
|
194
|
+ logger.info("STEP 1/3")
|
|
195
|
+ tvSerieIdFuture.andThen({
|
|
196
|
+ case Failure(exception) => logger.error(s"${exception.printStackTrace()}")
|
|
197
|
+ case Success(value:Option[String]) =>
|
|
198
|
+ logger.info(s"$value")
|
|
199
|
+ logger.info("STEP 1/3")
|
|
200
|
+
|
|
201
|
+ })
|
|
202
|
+ .flatMap({
|
|
203
|
+ _ => logger.info("STEP 2/3")
|
|
204
|
+ val listOfPersonsIDsFuture : Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieIdFuture = tvSerieIdFuture)
|
|
205
|
+ listOfPersonsIDsFuture.andThen({
|
|
206
|
+ case Failure(exception) => logger.error(s"${exception.printStackTrace()}")
|
|
207
|
+ case Success(value) =>
|
|
208
|
+ value.foreach((personID: Option[String]) => logger.info(s"Person ID:$personID"))
|
|
209
|
+ logger.info("STEP 2/3 END")
|
|
210
|
+ })
|
|
211
|
+ .flatMap({
|
|
212
|
+ _ => logger.info("STEP 3/3")
|
|
213
|
+ val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(listPersonsIDsFuture = listOfPersonsIDsFuture)
|
|
214
|
+ personsTeamFuture.andThen({
|
|
215
|
+ case Failure(exception) => logger.error(s"$exception")
|
|
216
|
+ case Success(value: IndexedSeq[Person]) =>
|
|
217
|
+ value.foreach((person: Person) => logger.info(s"${person.toString}"))
|
|
218
|
+ logger.info("STEP 3/3 END")
|
|
219
|
+ })
|
|
220
|
+ })
|
|
221
|
+ })
|
|
222
|
+
|
|
223
|
+ }
|
|
224
|
+ override def getAllPersonsFuture(): Future[Done] = {
|
169
|
225
|
val personSource: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
170
|
226
|
//graph
|
171
|
227
|
val startTime: Long = System.currentTimeMillis()
|
|
@@ -180,15 +236,17 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
180
|
236
|
val time: Long = (System.currentTimeMillis() - startTime) / 100
|
181
|
237
|
logger.info(s"elapsed time: $time")
|
182
|
238
|
}
|
|
239
|
+
|
|
240
|
+ result
|
183
|
241
|
}
|
184
|
242
|
|
185
|
|
- override def getAllTvSeries(): Unit = {
|
|
243
|
+ override def getAllTvSeriesFuture(): Future[Done] = {
|
186
|
244
|
val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
|
187
|
245
|
val sink: Sink[TvSerie, Future[Done]] = buildAllTvSeriesSink(logger = logger)
|
188
|
246
|
|
189
|
247
|
val startingTime: Long = System.currentTimeMillis()
|
190
|
248
|
//graph sink->flow->sink
|
191
|
|
- source
|
|
249
|
+ val results: Future[Done] = source
|
192
|
250
|
.via(flow = buildTvSerieFlow())
|
193
|
251
|
.runWith(sink = sink)
|
194
|
252
|
.andThen {
|
|
@@ -197,7 +255,10 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
197
|
255
|
logger.info(s"$value: successfully processing file, elapsed time $titleBasics: $elapsedTime sec")
|
198
|
256
|
case Failure(error: Error) => logger.error(s"$error")
|
199
|
257
|
}
|
|
258
|
+
|
|
259
|
+ results
|
200
|
260
|
}
|
|
261
|
+
|
201
|
262
|
}
|
202
|
263
|
|
203
|
264
|
|