|
@@ -1,10 +1,12 @@
|
1
|
1
|
package fr.natan.akkastreamfileprocessingapi.service
|
2
|
2
|
|
3
|
3
|
import akka.actor.ActorSystem
|
|
4
|
+import akka.stream.ActorAttributes.supervisionStrategy
|
|
5
|
+import akka.stream.Supervision.resumingDecider
|
4
|
6
|
import akka.stream.scaladsl.{Flow, Sink, Source}
|
5
|
7
|
import akka.{Done, NotUsed}
|
6
|
8
|
import com.typesafe.scalalogging.slf4j.Logger
|
7
|
|
-import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics}
|
|
9
|
+import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics}
|
8
|
10
|
import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSeries}
|
9
|
11
|
import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.buildTvSerieModel
|
10
|
12
|
import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAllPersonsSink, buildAndValidateSource, buildPersonFlow, buildPersonSink, buildSource, buildTvSerieFlow, buildTvSeriesSink, filterByPersonIdFlow, filterByPersonNameFlow, filterByTvSeriePrimaryTitleFlow}
|
|
@@ -14,7 +16,9 @@ import org.springframework.stereotype.Component
|
14
|
16
|
import scala.concurrent.ExecutionContext.Implicits.global
|
15
|
17
|
import scala.concurrent.duration.DurationInt
|
16
|
18
|
import scala.concurrent.{Await, Future}
|
|
19
|
+import scala.language.postfixOps
|
17
|
20
|
import scala.util.{Failure, Success}
|
|
21
|
+import scala.collection.immutable.Iterable
|
18
|
22
|
|
19
|
23
|
@Component
|
20
|
24
|
class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
|
@@ -49,7 +53,8 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
49
|
53
|
.andThen {
|
50
|
54
|
case Success(value) =>
|
51
|
55
|
val elapsedTime: Long = (System.currentTimeMillis()-startTime)/1000
|
52
|
|
- logger.info(s"$value: Successfully processed, elapsed time: $elapsedTime")
|
|
56
|
+ logger.info(s"$value")
|
|
57
|
+ logger.info(s"Successfully processed, elapsed time: $elapsedTime")
|
53
|
58
|
case Failure(exception) => logger.error(s"$exception: Fail")
|
54
|
59
|
}
|
55
|
60
|
|
|
@@ -111,27 +116,59 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
111
|
116
|
}
|
112
|
117
|
|
113
|
118
|
getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
|
|
119
|
+
|
114
|
120
|
}
|
115
|
121
|
|
116
|
|
- private def getTvSerieIdByPrimaryTitle(primaryTitle: String): String ={
|
117
|
|
- val source: Source[Map[String, String],_]= buildSource(inputFile = titleBasics)
|
|
122
|
+ private def getTvSerieIdByPrimaryTitle(primaryTitle: String): String = {
|
|
123
|
+ val source: Source[Map[String, String], _] = buildSource(inputFile = titleBasics)
|
118
|
124
|
|
119
|
|
- var tvSerie: TvSeries =null
|
120
|
|
- val res : Future[Option[String]]= source
|
121
|
|
- .filter((rowMap: Map[String, String]) =>{
|
122
|
|
- rowMap.getOrElse(key = "primaryTitle","")==primaryTitle
|
|
125
|
+ var tvSerie: TvSeries = null
|
|
126
|
+ val res: Future[Option[String]] = source
|
|
127
|
+ .filter((rowMap: Map[String, String]) => {
|
|
128
|
+ rowMap.getOrElse(key = "primaryTitle", "") == primaryTitle
|
123
|
129
|
})
|
124
|
|
- .map((rowMap: Map[String, String])=>{
|
|
130
|
+ .map((rowMap: Map[String, String]) => {
|
125
|
131
|
tvSerie = buildTvSerieModel(rowMap)
|
126
|
132
|
rowMap.get("tconst")
|
127
|
133
|
})
|
128
|
134
|
.runWith(Sink.head)
|
129
|
135
|
|
130
|
|
- Await.result(res,1 minutes)
|
131
|
|
- val tconst:String = res.value.get.get.get
|
|
136
|
+ Await.result(res, 1 minutes)
|
|
137
|
+ val tconst: String = res.value.get.get.get
|
|
138
|
+ logger.info(s"TvSerie ID: $tconst, $tvSerie")
|
132
|
139
|
tconst
|
133
|
140
|
}
|
134
|
141
|
|
|
142
|
+ def getListOfPersonsIDByTvSerieID(tconst: String): List[Option[String]]={
|
|
143
|
+ val source: Source[Map[String, String], _] = buildSource(inputFile = titlePrincipalsBasics)
|
|
144
|
+ var tvSeries: Iterable[TvSeries]=null
|
|
145
|
+
|
|
146
|
+ val startTime : Long = System.currentTimeMillis()
|
|
147
|
+ val res: Future[IndexedSeq[Option[String]]] = source
|
|
148
|
+ .filter((rowMaps: Map[String, String])=>{
|
|
149
|
+ rowMaps.getOrElse(key = "tconst", default = "")==tconst
|
|
150
|
+ })
|
|
151
|
+ .map((rowMap: Map[String, String])=>{
|
|
152
|
+ rowMap.get(key = "nconst")
|
|
153
|
+ })
|
|
154
|
+ .withAttributes(supervisionStrategy(resumingDecider))
|
|
155
|
+ .runWith(Sink.collection)
|
|
156
|
+ .andThen {
|
|
157
|
+ case Success(value) =>
|
|
158
|
+ logger.info(s"$value")
|
|
159
|
+ val elapsedTime : Long = (System.currentTimeMillis() - startTime)/1000
|
|
160
|
+ logger.info(s"success, elapsed time: $elapsedTime")
|
|
161
|
+
|
|
162
|
+ case Failure(error: Error) => logger.error(s"$error")
|
|
163
|
+ }
|
|
164
|
+
|
|
165
|
+ Await.result(res,5 minutes)
|
|
166
|
+ val listPersonsID: List[Option[String]] = res.value.get.get.toList
|
|
167
|
+
|
|
168
|
+ listPersonsID
|
|
169
|
+
|
|
170
|
+ }
|
|
171
|
+
|
135
|
172
|
|
136
|
173
|
}
|
137
|
174
|
|