|
@@ -4,9 +4,7 @@ import akka.Done
|
4
|
4
|
import akka.actor.ActorSystem
|
5
|
5
|
import akka.stream.scaladsl.{Sink, Source}
|
6
|
6
|
import com.typesafe.scalalogging.slf4j.Logger
|
7
|
|
-import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{
|
8
|
|
- nameBasics, titleBasics, titlePrincipalsBasics
|
9
|
|
-}
|
|
7
|
+import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics}
|
10
|
8
|
import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSerie}
|
11
|
9
|
import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.buildPersonModel
|
12
|
10
|
import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{
|
|
@@ -38,13 +36,17 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
38
|
36
|
|
39
|
37
|
override def getPersonByIdFuture(personID: String): Future[Person] = {
|
40
|
38
|
val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
|
39
|
+
|
|
40
|
+ val start: Long = System.currentTimeMillis()
|
41
|
41
|
val personFuture: Future[Person] = source
|
42
|
42
|
.via(flow = filterPersonByIdFlow(personID = personID))
|
43
|
43
|
.runWith(Sink.head)
|
44
|
44
|
|
45
|
45
|
personFuture.andThen({
|
46
|
46
|
case Failure(exception) => logger.info(s"$exception")
|
47
|
|
- case Success(value:Person) => logger.info(s"$value")
|
|
47
|
+ case Success(value:Person) =>
|
|
48
|
+ logger.info(s"$value")
|
|
49
|
+ logger.info(s"SUCCESS, elapsed time:${(System.currentTimeMillis() - start) / 1000} sec")
|
48
|
50
|
})
|
49
|
51
|
|
50
|
52
|
personFuture
|
|
@@ -52,6 +54,8 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
52
|
54
|
|
53
|
55
|
override def getPersonByNameFuture(primaryName: String): Future[Person] = {
|
54
|
56
|
val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
|
57
|
+
|
|
58
|
+ val start: Long = System.currentTimeMillis()
|
55
|
59
|
val personFuture: Future[Person] = source
|
56
|
60
|
.via(flow = filterPersonByNameFlow(primaryName = primaryName))
|
57
|
61
|
.runWith(Sink.head)
|
|
@@ -59,7 +63,7 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
59
|
63
|
personFuture.onComplete({
|
60
|
64
|
case Failure(exception) => logger.error(s"$exception")
|
61
|
65
|
case Success(value: Person) => logger.info(s"$value")
|
62
|
|
- logger.info("SUCCESS")
|
|
66
|
+ logger.info(s"SUCCESS, elapsed time:${(System.currentTimeMillis() - start) / 1000} sec")
|
63
|
67
|
})
|
64
|
68
|
|
65
|
69
|
personFuture
|
|
@@ -67,20 +71,24 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
67
|
71
|
|
68
|
72
|
override def getTvSerieByIdFuture(tvSerieID: String): Future[TvSerie] = {
|
69
|
73
|
val source: Source[Map[String, String], _] = buildSource(inputFile = titleBasics)
|
|
74
|
+
|
|
75
|
+ val start: Long = System.currentTimeMillis()
|
70
|
76
|
val tvSerieFuture: Future[TvSerie] = source
|
71
|
77
|
.via(flow = filterTvSerieByIdFlow(tvSerieID = tvSerieID))
|
72
|
78
|
.runWith(Sink.head)
|
73
|
79
|
|
74
|
80
|
tvSerieFuture.onComplete({
|
75
|
81
|
case Failure(exception) => logger.info(s"$exception")
|
76
|
|
- case Success(value: TvSerie) => logger.info(s"$value")
|
|
82
|
+ case Success(value: TvSerie) =>
|
|
83
|
+ logger.info(s"$value")
|
|
84
|
+ logger.info(s"SUCCESS, elapsed time:${(System.currentTimeMillis() - start) / 1000} sec")
|
77
|
85
|
})
|
78
|
86
|
tvSerieFuture
|
79
|
87
|
}
|
80
|
88
|
override def getTvSeriesByPrimaryTitleFuture(tvSeriePrimaryTitle: String): Future[IndexedSeq[TvSerie]] = {
|
81
|
89
|
|
|
90
|
+ val start: Long = System.currentTimeMillis()
|
82
|
91
|
val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
|
83
|
|
-
|
84
|
92
|
val tvSriesFuture: Future[IndexedSeq[TvSerie]] = tvSeriesSource
|
85
|
93
|
.via(flow = filterTvSerieByPrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle))
|
86
|
94
|
.runWith(Sink.collection)
|
|
@@ -89,7 +97,7 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
89
|
97
|
case Failure(exception) => logger.info(s"$exception")
|
90
|
98
|
case Success(value: IndexedSeq[TvSerie]) =>
|
91
|
99
|
value.foreach((tvSrie: TvSerie) => logger.info(s"$tvSrie"))
|
92
|
|
- logger.info("SUCCESS")
|
|
100
|
+ logger.info(s"SUCCESS, elapsed time:${(System.currentTimeMillis() - start) / 1000} sec")
|
93
|
101
|
})
|
94
|
102
|
|
95
|
103
|
tvSriesFuture
|
|
@@ -98,7 +106,6 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
98
|
106
|
|
99
|
107
|
private def getTvSerieIdByPrimaryTitle(primaryTitle: String): Future[Option[String]] = {
|
100
|
108
|
val source: Source[Map[String, String], _] = buildSource(inputFile = titleBasics)
|
101
|
|
-
|
102
|
109
|
val tvSerieIdFuture: Future[Option[String]] = source
|
103
|
110
|
.filter((rowMap: Map[String, String]) => {
|
104
|
111
|
rowMap.getOrElse(key = "primaryTitle", default = "") == primaryTitle
|
|
@@ -142,6 +149,7 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
142
|
149
|
|
143
|
150
|
//futures chaining
|
144
|
151
|
logger.info("STEP 1/3 START")
|
|
152
|
+ val start1: Long = System.currentTimeMillis()
|
145
|
153
|
val tvSerieIDFuture: Future[Option[String]] = getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
|
146
|
154
|
|
147
|
155
|
val finalFuture: Future[IndexedSeq[Person]] =
|
|
@@ -149,27 +157,29 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
149
|
157
|
case Failure(exception) => logger.error(s"$exception")
|
150
|
158
|
case Success(value: Option[String]) =>
|
151
|
159
|
logger.info(s"TvSerie ID: $value")
|
152
|
|
- logger.info("STEP 1/3 END")
|
|
160
|
+ logger.info(s"STEP 1/3 END, elapsed time:${(System.currentTimeMillis() - start1) / 1000} sec")
|
153
|
161
|
})
|
154
|
162
|
.flatMap({
|
155
|
163
|
_ =>
|
156
|
164
|
logger.info("STEP 2/3 START")
|
|
165
|
+ val start2: Long = System.currentTimeMillis()
|
157
|
166
|
val listPersonIDsFuture: Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieIdFuture = tvSerieIDFuture)
|
158
|
167
|
listPersonIDsFuture.andThen({
|
159
|
168
|
case Failure(exception) => logger.error(s"$exception")
|
160
|
169
|
case Success(value) =>
|
161
|
170
|
value.foreach((personID: Option[String]) => logger.info(s"Person ID:$personID"))
|
162
|
|
- logger.info("STEP 2/3 END")
|
|
171
|
+ logger.info(s"STEP 2/3 END, elapsed time:${(System.currentTimeMillis() - start2) / 1000} sec")
|
163
|
172
|
})
|
164
|
173
|
.flatMap({
|
165
|
174
|
_ =>
|
166
|
175
|
logger.info("STEP 3/3 START")
|
|
176
|
+ val start3: Long = System.currentTimeMillis()
|
167
|
177
|
val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(listPersonsIDsFuture = listPersonIDsFuture)
|
168
|
178
|
personsTeamFuture.andThen({
|
169
|
179
|
case Failure(exception) => logger.error(s"$exception")
|
170
|
180
|
case Success(value: IndexedSeq[Person]) =>
|
171
|
181
|
value.foreach((person: Person) => logger.info(s"${person.toString}"))
|
172
|
|
- logger.info("STEP 3/3 END")
|
|
182
|
+ logger.info(s"STEP 3/3 END, elapsed time:${(System.currentTimeMillis() - start3) / 1000} sec")
|
173
|
183
|
})
|
174
|
184
|
})
|
175
|
185
|
})
|
|
@@ -190,32 +200,35 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
|
190
|
200
|
|
191
|
201
|
override def getPersonsForTvSerieByTvSerieIdFuture(tvSerieId: String): Future[IndexedSeq[Person]] = {
|
192
|
202
|
|
193
|
|
- val tvSerieIdFuture: Future[Option[String]] = getTvSerieIDFuture(tvSerieId = tvSerieId)
|
|
203
|
+ val start1: Long = System.currentTimeMillis()
|
194
|
204
|
logger.info("STEP 1/3")
|
|
205
|
+ val tvSerieIdFuture: Future[Option[String]] = getTvSerieIDFuture(tvSerieId = tvSerieId)
|
195
|
206
|
tvSerieIdFuture.andThen({
|
196
|
207
|
case Failure(exception) => logger.error(s"${exception.printStackTrace()}")
|
197
|
208
|
case Success(value:Option[String]) =>
|
198
|
209
|
logger.info(s"$value")
|
199
|
|
- logger.info("STEP 1/3")
|
|
210
|
+ logger.info(s"STEP 1/3 END, elapsed time:${(System.currentTimeMillis() - start1) / 1000} sec")
|
200
|
211
|
|
201
|
212
|
})
|
202
|
213
|
.flatMap({
|
203
|
214
|
_ => logger.info("STEP 2/3")
|
|
215
|
+ val start2: Long = System.currentTimeMillis()
|
204
|
216
|
val listOfPersonsIDsFuture : Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieIdFuture = tvSerieIdFuture)
|
205
|
217
|
listOfPersonsIDsFuture.andThen({
|
206
|
218
|
case Failure(exception) => logger.error(s"${exception.printStackTrace()}")
|
207
|
219
|
case Success(value) =>
|
208
|
220
|
value.foreach((personID: Option[String]) => logger.info(s"Person ID:$personID"))
|
209
|
|
- logger.info("STEP 2/3 END")
|
|
221
|
+ logger.info(s"STEP 2/3 END, elapsed time:${(System.currentTimeMillis() - start2) / 1000} sec")
|
210
|
222
|
})
|
211
|
223
|
.flatMap({
|
212
|
224
|
_ => logger.info("STEP 3/3")
|
|
225
|
+ val start3: Long = System.currentTimeMillis()
|
213
|
226
|
val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(listPersonsIDsFuture = listOfPersonsIDsFuture)
|
214
|
227
|
personsTeamFuture.andThen({
|
215
|
228
|
case Failure(exception) => logger.error(s"$exception")
|
216
|
229
|
case Success(value: IndexedSeq[Person]) =>
|
217
|
230
|
value.foreach((person: Person) => logger.info(s"${person.toString}"))
|
218
|
|
- logger.info("STEP 3/3 END")
|
|
231
|
+ logger.info(s"STEP 3/3 END, elapsed time:${(System.currentTimeMillis() - start3) / 1000} sec")
|
219
|
232
|
})
|
220
|
233
|
})
|
221
|
234
|
})
|