|
@@ -44,8 +44,6 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
44
|
44
|
|
45
|
45
|
override def getPersonById(nconst: String): Future[Person] = {
|
46
|
46
|
val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
47
|
|
-
|
48
|
|
-
|
49
|
47
|
val res = source
|
50
|
48
|
.via(flow = filterByPersonIdFlow(nconst = nconst))
|
51
|
49
|
.runWith(Sink.head)
|
|
@@ -102,7 +100,6 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
102
|
100
|
case Failure(exception) => logger.info(s"$exception")
|
103
|
101
|
case Success(value) => logger.info(s"$value")
|
104
|
102
|
}
|
105
|
|
-
|
106
|
103
|
}
|
107
|
104
|
|
108
|
105
|
private def getTvSerieIdByPrimaryTitle(primaryTitle: String): Future[Option[String]] = {
|
|
@@ -112,19 +109,16 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
112
|
109
|
.filter((rowMap: Map[String, String]) => {
|
113
|
110
|
rowMap.getOrElse(key = "primaryTitle", "") == primaryTitle
|
114
|
111
|
})
|
115
|
|
- .map((rowMap: Map[String, String]) => {
|
116
|
|
- rowMap.get("tconst")
|
117
|
|
- })
|
|
112
|
+ .map((rowMap: Map[String, String]) => rowMap.get("tconst"))
|
118
|
113
|
.runWith(Sink.head)
|
119
|
114
|
|
120
|
115
|
res
|
121
|
|
-
|
122
|
116
|
}
|
123
|
117
|
|
124
|
|
- private def getListOfPersonsIDByTvSerieID(tvSerieID: Future[Option[String]]): Future[IndexedSeq[Option[String]]]={
|
|
118
|
+ private def getListOfPersonsIDByTvSerieID(tvSerieID: Future[Option[String]]): Future[List[Option[String]]]={
|
125
|
119
|
val source: Source[Map[String, String], _] = buildSource(inputFile = titlePrincipalsBasics)
|
126
|
120
|
|
127
|
|
- val res: Future[IndexedSeq[Option[String]]] = source
|
|
121
|
+ val res: Future[List[Option[String]]] = source
|
128
|
122
|
.filter((rowMaps: Map[String, String])=>{
|
129
|
123
|
rowMaps.getOrElse(key = "tconst", default = "")==tvSerieID.value.get.get.get
|
130
|
124
|
})
|
|
@@ -132,15 +126,13 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
132
|
126
|
rowMap.get(key = "nconst")
|
133
|
127
|
})
|
134
|
128
|
.runWith(Sink.collection)
|
135
|
|
-
|
136
|
129
|
res
|
137
|
|
-
|
138
|
130
|
}
|
139
|
131
|
|
140
|
|
- private def getListOfPersonsForTvSerie(listPersonsIDs: Future[IndexedSeq[Option[String]]]): Future[IndexedSeq[Person]] = {
|
|
132
|
+ private def getListOfPersonsForTvSerie(listPersonsIDs: Future[List[Option[String]]]): Future[List[Person]] = {
|
141
|
133
|
val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
142
|
134
|
|
143
|
|
- val res : Future[IndexedSeq[Person]]=
|
|
135
|
+ val res : Future[List[Person]]=
|
144
|
136
|
source
|
145
|
137
|
.filter((rowMaps: Map[String, String])=>{
|
146
|
138
|
listPersonsIDs.value.get.get.contains(rowMaps.get(key = "nconst"))
|
|
@@ -152,37 +144,37 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
152
|
144
|
|
153
|
145
|
res
|
154
|
146
|
}
|
155
|
|
- override def getPersonsTeamForTvSerie(tvSeriePrimaryTitle: String):Unit={
|
|
147
|
+ override def getPersonsTeamForTvSerie(tvSeriePrimaryTitle: String):Future[List[Person]]={
|
156
|
148
|
|
157
|
149
|
logger.info("STEP 1/3")
|
158
|
150
|
val tvSerieID = getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
|
159
|
|
- tvSerieID.onComplete({
|
160
|
|
- case Failure(exception) => logger.error(s"$exception")
|
|
151
|
+ tvSerieID.andThen({
|
|
152
|
+ case Failure(exception) => logger.error(s"!!!!!!!!!!!!!!!!!!!!1$exception")
|
161
|
153
|
case Success(value: Option[String]) =>
|
162
|
154
|
logger.info(s"TvSerie ID: $value")
|
163
|
155
|
logger.info("END STEP 1/3")
|
164
|
156
|
})
|
165
|
157
|
logger.info("STEP 2/3")
|
166
|
|
- val listPersonIDs = getListOfPersonsIDByTvSerieID(tvSerieID = tvSerieID)
|
167
|
|
- listPersonIDs.onComplete({
|
168
|
|
- case Failure(exception) => logger.error(s"$exception")
|
169
|
|
- case Success(value) =>
|
170
|
|
- value.toList.foreach((personID: Option[String])=>logger.info(s"Person ID:$personID"))
|
|
158
|
+ val listPersonIDs: Future[List[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieID = tvSerieID)
|
|
159
|
+ listPersonIDs.andThen({
|
|
160
|
+ case Failure(exception) => logger.error(s"!!!!!!!!!!!!!!!!!!!!!2$exception")
|
|
161
|
+ case Success(value: List[Option[String]]) =>
|
|
162
|
+ value.foreach((personID:Option[String])=>logger.info(s"Person ID:$personID"))
|
171
|
163
|
logger.info("END STEP 2/3")
|
172
|
164
|
})
|
173
|
165
|
|
174
|
166
|
logger.info("STEP 3/3")
|
175
|
|
- val personsTeam= getListOfPersonsForTvSerie(listPersonsIDs = listPersonIDs)
|
176
|
|
- personsTeam.onComplete({
|
177
|
|
- case Failure(exception) => logger.error(s"$exception")
|
|
167
|
+ val personsTeam: Future[List[Person]] = getListOfPersonsForTvSerie(listPersonsIDs = listPersonIDs)
|
|
168
|
+ personsTeam.andThen({
|
|
169
|
+ case Failure(exception) => logger.error(s"!!!!!!!!!!!!!!!!!!!!!!!3$exception")
|
178
|
170
|
case Success(value) =>
|
179
|
|
- logger.info(s"${value.toList}")
|
180
|
|
- value.toList.foreach((person:Person)=>logger.info(s"${person.toString}"))
|
|
171
|
+ logger.info(s"$value")
|
|
172
|
+ value.foreach((person:Person)=>logger.info(s"${person.toString}"))
|
181
|
173
|
logger.info("END STEP 3/3")
|
182
|
174
|
})
|
183
|
175
|
|
|
176
|
+ null
|
184
|
177
|
}
|
185
|
|
-
|
186
|
178
|
}
|
187
|
179
|
|
188
|
180
|
|