|
@@ -49,30 +49,30 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
49
|
49
|
persons.onComplete({
|
50
|
50
|
case Failure(exception) => logger.error(s"$exception")
|
51
|
51
|
case Success(value: IndexedSeq[Person]) =>
|
52
|
|
- value.foreach((person: Person)=>logger.info(s"$person"))
|
|
52
|
+ value.foreach((person: Person) => logger.info(s"$person"))
|
53
|
53
|
logger.info("SUCCESS")
|
54
|
54
|
})
|
55
|
55
|
|
56
|
56
|
persons
|
57
|
57
|
}
|
58
|
58
|
|
59
|
|
- override def getTvSerieByPrimaryTitle(tvSeriePrimaryTitle: String):Future[IndexedSeq[TvSeries]] = {
|
|
59
|
+ override def getTvSerieByPrimaryTitle(tvSeriePrimaryTitle: String): Future[IndexedSeq[TvSeries]] = {
|
60
|
60
|
|
61
|
61
|
val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
|
62
|
62
|
|
63
|
63
|
val filterByTvSerieTitle: Flow[Map[String, String], TvSeries, NotUsed] =
|
64
|
64
|
filterByTvSeriePrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle)
|
65
|
65
|
|
66
|
|
- val tvSries :Future[IndexedSeq[TvSeries]] = tvSeriesSource
|
|
66
|
+ val tvSries: Future[IndexedSeq[TvSeries]] = tvSeriesSource
|
67
|
67
|
.via(flow = filterByTvSerieTitle)
|
68
|
68
|
.runWith(Sink.collection)
|
69
|
69
|
|
70
|
|
- tvSries.onComplete({
|
|
70
|
+ tvSries.onComplete({
|
71
|
71
|
case Failure(exception) => logger.info(s"$exception")
|
72
|
72
|
case Success(value: IndexedSeq[TvSeries]) =>
|
73
|
|
- value.foreach((tvSrie: TvSeries)=>logger.info(s"$tvSrie"))
|
|
73
|
+ value.foreach((tvSrie: TvSeries) => logger.info(s"$tvSrie"))
|
74
|
74
|
logger.info("SUCCESS")
|
75
|
|
- })
|
|
75
|
+ })
|
76
|
76
|
|
77
|
77
|
tvSries
|
78
|
78
|
}
|
|
@@ -90,13 +90,13 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
90
|
90
|
res
|
91
|
91
|
}
|
92
|
92
|
|
93
|
|
- private def getListOfPersonsIDByTvSerieID(tvSerieID: Future[Option[String]]): Future[IndexedSeq[Option[String]]]={
|
|
93
|
+ private def getListOfPersonsIDByTvSerieID(tvSerieID: Future[Option[String]]): Future[IndexedSeq[Option[String]]] = {
|
94
|
94
|
val source: Source[Map[String, String], _] = buildSource(inputFile = titlePrincipalsBasics)
|
95
|
95
|
val res: Future[IndexedSeq[Option[String]]] = source
|
96
|
|
- .filter((rowMaps: Map[String, String])=>{
|
97
|
|
- rowMaps.getOrElse(key = "tconst", default = "")==tvSerieID.value.get.get.get
|
|
96
|
+ .filter((rowMaps: Map[String, String]) => {
|
|
97
|
+ rowMaps.getOrElse(key = "tconst", default = "") == tvSerieID.value.get.get.get
|
98
|
98
|
})
|
99
|
|
- .map((rowMap: Map[String, String])=>{
|
|
99
|
+ .map((rowMap: Map[String, String]) => {
|
100
|
100
|
rowMap.get(key = "nconst")
|
101
|
101
|
})
|
102
|
102
|
.runWith(Sink.collection)
|
|
@@ -107,53 +107,58 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
|
107
|
107
|
private def getListOfPersonsForTvSerie(listPersonsIDs: Future[IndexedSeq[Option[String]]]): Future[IndexedSeq[Person]] = {
|
108
|
108
|
val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
109
|
109
|
|
110
|
|
- val res : Future[IndexedSeq[Person]]=
|
|
110
|
+ val res: Future[IndexedSeq[Person]] =
|
111
|
111
|
source
|
112
|
|
- .filter((rowMaps: Map[String, String])=>{
|
|
112
|
+ .filter((rowMaps: Map[String, String]) => {
|
113
|
113
|
listPersonsIDs.value.get.get.contains(rowMaps.get(key = "nconst"))
|
114
|
114
|
})
|
115
|
|
- .map((rowMap: Map[String, String])=>{
|
116
|
|
- buildPersonModel(rowMap)
|
117
|
|
- })
|
118
|
|
- .runWith(Sink.collection)
|
|
115
|
+ .map((rowMap: Map[String, String]) => {
|
|
116
|
+ buildPersonModel(rowMap)
|
|
117
|
+ })
|
|
118
|
+ .runWith(Sink.collection)
|
119
|
119
|
|
120
|
120
|
res
|
121
|
121
|
}
|
122
|
|
- override def getPersonsTeamForTvSerie(tvSeriePrimaryTitle: String):Future[IndexedSeq[Person]]={
|
123
|
122
|
|
124
|
|
- logger.info("STEP 1/3")
|
125
|
|
- val tvSerieID : Future[Option[String]]= getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
|
126
|
|
- tvSerieID.andThen({
|
127
|
|
- case Failure(exception) => logger.error(s"!!!!!!!!!!!!!!!!!!!!1$exception")
|
|
123
|
+ override def getTeamOfPersonsForTvSerie(tvSeriePrimaryTitle: String): Future[IndexedSeq[Person]] = {
|
|
124
|
+
|
|
125
|
+ //futures chaining
|
|
126
|
+ logger.info("STEP 1/3 START")
|
|
127
|
+ val tvSerieIDFuture: Future[Option[String]] = getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
|
|
128
|
+
|
|
129
|
+ val finalResult: Future[IndexedSeq[Person]] = tvSerieIDFuture.andThen({
|
|
130
|
+ case Failure(exception) => logger.error(s"$exception")
|
128
|
131
|
case Success(value: Option[String]) =>
|
129
|
132
|
logger.info(s"TvSerie ID: $value")
|
130
|
|
- logger.info("END STEP 1/3")
|
131
|
|
- }).flatMap({
|
132
|
|
- future =>
|
133
|
|
- logger.info("STEP 2/3")
|
134
|
|
- val listPersonIDs: Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieID = tvSerieID)
|
135
|
|
- listPersonIDs.andThen({
|
136
|
|
- case Failure(exception) => logger.error(s"!!!!!!!!!!!!!!!!!!!!!2$exception")
|
137
|
|
- case Success(value) =>
|
138
|
|
- value.foreach((personID: Option[String]) => logger.info(s"Person ID:$personID"))
|
139
|
|
- logger.info("END STEP 2/3")
|
140
|
|
- }).flatMap({
|
141
|
|
- future =>
|
142
|
|
- logger.info("STEP 3/3")
|
143
|
|
- val personsTeam: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(listPersonsIDs = listPersonIDs)
|
144
|
|
- personsTeam.andThen({
|
145
|
|
- case Failure(exception) => logger.error(s"!!!!!!!!!!!!!!!!!!!!!!!3$exception")
|
146
|
|
- case Success(value) =>
|
147
|
|
- logger.info(s"$value")
|
148
|
|
- value.foreach((person: Person) => logger.info(s"${person.toString}"))
|
149
|
|
- logger.info("END STEP 3/3")
|
150
|
|
- })
|
151
|
|
- })
|
|
133
|
+ logger.info("STEP 1/3 END")
|
152
|
134
|
})
|
|
135
|
+ .flatMap({
|
|
136
|
+ _ =>
|
|
137
|
+ logger.info("STEP 2/3 START")
|
|
138
|
+ val listPersonIDsFuture: Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieID = tvSerieIDFuture)
|
|
139
|
+ listPersonIDsFuture.andThen({
|
|
140
|
+ case Failure(exception) => logger.error(s"$exception")
|
|
141
|
+ case Success(value) =>
|
|
142
|
+ value.foreach((personID: Option[String]) => logger.info(s"Person ID:$personID"))
|
|
143
|
+ logger.info("STEP 2/3 END")
|
|
144
|
+ })
|
|
145
|
+ .flatMap({
|
|
146
|
+ future =>
|
|
147
|
+ logger.info("STEP 3/3 START")
|
|
148
|
+ val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(listPersonsIDs = listPersonIDsFuture)
|
|
149
|
+ personsTeamFuture.andThen({
|
|
150
|
+ case Failure(exception) => logger.error(s"$exception")
|
|
151
|
+ case Success(value: IndexedSeq[Person]) =>
|
|
152
|
+ value.foreach((person: Person) => logger.info(s"${person.toString}"))
|
|
153
|
+ logger.info("STEP 3/3 END")
|
|
154
|
+ })
|
|
155
|
+ })
|
|
156
|
+ })
|
153
|
157
|
|
154
|
|
- null
|
|
158
|
+ finalResult
|
155
|
159
|
}
|
156
|
160
|
|
|
161
|
+
|
157
|
162
|
override def getAllPersons(): Unit = {
|
158
|
163
|
val personSource: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
|
159
|
164
|
//graph
|