| 
				
			 | 
			
			
				@@ -1,15 +1,24 @@ 
			 | 
		
	
		
			
			| 
				1
			 | 
			
				1
			 | 
			
			
				 package fr.natan.akkastreamfileprocessingapi.service 
			 | 
		
	
		
			
			| 
				2
			 | 
			
				2
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				3
			 | 
			
				3
			 | 
			
			
				 import akka.actor.ActorSystem 
			 | 
		
	
		
			
			| 
				4
			 | 
			
				
			 | 
			
			
				-import akka.stream.ActorAttributes.supervisionStrategy 
			 | 
		
	
		
			
			| 
				5
			 | 
			
				
			 | 
			
			
				-import akka.stream.Supervision.resumingDecider 
			 | 
		
	
		
			
			| 
				6
			 | 
			
				4
			 | 
			
			
				 import akka.stream.scaladsl.{Flow, Sink, Source} 
			 | 
		
	
		
			
			| 
				7
			 | 
			
				5
			 | 
			
			
				 import akka.{Done, NotUsed} 
			 | 
		
	
		
			
			| 
				8
			 | 
			
				6
			 | 
			
			
				 import com.typesafe.scalalogging.slf4j.Logger 
			 | 
		
	
		
			
			| 
				9
			 | 
			
				7
			 | 
			
			
				 import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics} 
			 | 
		
	
		
			
			| 
				10
			 | 
			
				8
			 | 
			
			
				 import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSeries} 
			 | 
		
	
		
			
			| 
				11
			 | 
			
				
			 | 
			
			
				-import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.buildTvSerieModel 
			 | 
		
	
		
			
			| 
				12
			 | 
			
				
			 | 
			
			
				-import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAllPersonsSink, buildAndValidateSource, buildPersonFlow, buildPersonSink, buildSource, buildTvSerieFlow, buildTvSeriesSink, filterByPersonIdFlow, filterByPersonNameFlow, filterByTvSeriePrimaryTitleFlow} 
			 | 
		
	
		
			
			| 
				
			 | 
			
				9
			 | 
			
			
				+import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.{buildPersonModel, buildTvSerieModel} 
			 | 
		
	
		
			
			| 
				
			 | 
			
				10
			 | 
			
			
				+import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				11
			 | 
			
			
				+  buildAllPersonsSink, 
			 | 
		
	
		
			
			| 
				
			 | 
			
				12
			 | 
			
			
				+  buildAndValidateSource, 
			 | 
		
	
		
			
			| 
				
			 | 
			
				13
			 | 
			
			
				+  buildPersonFlow, 
			 | 
		
	
		
			
			| 
				
			 | 
			
				14
			 | 
			
			
				+  buildPersonSink, 
			 | 
		
	
		
			
			| 
				
			 | 
			
				15
			 | 
			
			
				+  buildSource, 
			 | 
		
	
		
			
			| 
				
			 | 
			
				16
			 | 
			
			
				+  buildTvSerieFlow, 
			 | 
		
	
		
			
			| 
				
			 | 
			
				17
			 | 
			
			
				+  buildTvSeriesSink, 
			 | 
		
	
		
			
			| 
				
			 | 
			
				18
			 | 
			
			
				+  filterByPersonIdFlow, 
			 | 
		
	
		
			
			| 
				
			 | 
			
				19
			 | 
			
			
				+  filterByPersonNameFlow, 
			 | 
		
	
		
			
			| 
				
			 | 
			
				20
			 | 
			
			
				+  filterByTvSeriePrimaryTitleFlow 
			 | 
		
	
		
			
			| 
				
			 | 
			
				21
			 | 
			
			
				+} 
			 | 
		
	
		
			
			| 
				13
			 | 
			
				22
			 | 
			
			
				 import org.slf4j.LoggerFactory 
			 | 
		
	
		
			
			| 
				14
			 | 
			
				23
			 | 
			
			
				 import org.springframework.stereotype.Component 
			 | 
		
	
		
			
			| 
				15
			 | 
			
				24
			 | 
			
			
				  
			 | 
		
	
	
		
			
			| 
				
			 | 
			
			
				@@ -18,7 +27,6 @@ import scala.concurrent.duration.DurationInt 
			 | 
		
	
		
			
			| 
				18
			 | 
			
				27
			 | 
			
			
				 import scala.concurrent.{Await, Future} 
			 | 
		
	
		
			
			| 
				19
			 | 
			
				28
			 | 
			
			
				 import scala.language.postfixOps 
			 | 
		
	
		
			
			| 
				20
			 | 
			
				29
			 | 
			
			
				 import scala.util.{Failure, Success} 
			 | 
		
	
		
			
			| 
				21
			 | 
			
				
			 | 
			
			
				-import scala.collection.immutable.Iterable 
			 | 
		
	
		
			
			| 
				22
			 | 
			
				30
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				23
			 | 
			
				31
			 | 
			
			
				 @Component 
			 | 
		
	
		
			
			| 
				24
			 | 
			
				32
			 | 
			
			
				 class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing { 
			 | 
		
	
	
		
			
			| 
				
			 | 
			
			
				@@ -139,9 +147,8 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing { 
			 | 
		
	
		
			
			| 
				139
			 | 
			
				147
			 | 
			
			
				     tconst 
			 | 
		
	
		
			
			| 
				140
			 | 
			
				148
			 | 
			
			
				   } 
			 | 
		
	
		
			
			| 
				141
			 | 
			
				149
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				142
			 | 
			
				
			 | 
			
			
				-  def getListOfPersonsIDByTvSerieID(tconst: String): List[Option[String]]={ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				150
			 | 
			
			
				+  private def getListOfPersonsIDByTvSerieID(tconst: String): List[Option[String]]={ 
			 | 
		
	
		
			
			| 
				143
			 | 
			
				151
			 | 
			
			
				     val source: Source[Map[String, String], _] = buildSource(inputFile = titlePrincipalsBasics) 
			 | 
		
	
		
			
			| 
				144
			 | 
			
				
			 | 
			
			
				-    var tvSeries: Iterable[TvSeries]=null 
			 | 
		
	
		
			
			| 
				145
			 | 
			
				152
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				146
			 | 
			
				153
			 | 
			
			
				     val startTime : Long = System.currentTimeMillis() 
			 | 
		
	
		
			
			| 
				147
			 | 
			
				154
			 | 
			
			
				     val res: Future[IndexedSeq[Option[String]]] = source 
			 | 
		
	
	
		
			
			| 
				
			 | 
			
			
				@@ -151,7 +158,6 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing { 
			 | 
		
	
		
			
			| 
				151
			 | 
			
				158
			 | 
			
			
				       .map((rowMap: Map[String, String])=>{ 
			 | 
		
	
		
			
			| 
				152
			 | 
			
				159
			 | 
			
			
				         rowMap.get(key = "nconst") 
			 | 
		
	
		
			
			| 
				153
			 | 
			
				160
			 | 
			
			
				       }) 
			 | 
		
	
		
			
			| 
				154
			 | 
			
				
			 | 
			
			
				-      .withAttributes(supervisionStrategy(resumingDecider)) 
			 | 
		
	
		
			
			| 
				155
			 | 
			
				161
			 | 
			
			
				       .runWith(Sink.collection) 
			 | 
		
	
		
			
			| 
				156
			 | 
			
				162
			 | 
			
			
				       .andThen { 
			 | 
		
	
		
			
			| 
				157
			 | 
			
				163
			 | 
			
			
				         case Success(value) => 
			 | 
		
	
	
		
			
			| 
				
			 | 
			
			
				@@ -162,13 +168,48 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing { 
			 | 
		
	
		
			
			| 
				162
			 | 
			
				168
			 | 
			
			
				         case Failure(error: Error) => logger.error(s"$error") 
			 | 
		
	
		
			
			| 
				163
			 | 
			
				169
			 | 
			
			
				       } 
			 | 
		
	
		
			
			| 
				164
			 | 
			
				170
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				165
			 | 
			
				
			 | 
			
			
				-    Await.result(res,5 minutes) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				171
			 | 
			
			
				+    Await.result(res, 3 minutes) 
			 | 
		
	
		
			
			| 
				166
			 | 
			
				172
			 | 
			
			
				     val listPersonsID: List[Option[String]] = res.value.get.get.toList 
			 | 
		
	
		
			
			| 
				167
			 | 
			
				173
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				168
			 | 
			
				174
			 | 
			
			
				     listPersonsID 
			 | 
		
	
		
			
			| 
				169
			 | 
			
				175
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				170
			 | 
			
				176
			 | 
			
			
				   } 
			 | 
		
	
		
			
			| 
				171
			 | 
			
				177
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				
			 | 
			
				178
			 | 
			
			
				+  private def getListOfPersonsForTvSerie(nconstList: List[Option[String]]): List[Person] = { 
			 | 
		
	
		
			
			| 
				
			 | 
			
				179
			 | 
			
			
				+    val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				180
			 | 
			
			
				+ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				181
			 | 
			
			
				+    val startTime: Long = System.currentTimeMillis() 
			 | 
		
	
		
			
			| 
				
			 | 
			
				182
			 | 
			
			
				+ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				183
			 | 
			
			
				+    val res : Future[IndexedSeq[Person]]= source 
			 | 
		
	
		
			
			| 
				
			 | 
			
				184
			 | 
			
			
				+      .filter((rowMap: Map[String, String])=>{ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				185
			 | 
			
			
				+        nconstList.contains(rowMap.getOrElse(key = "nconst",default = "")) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				186
			 | 
			
			
				+      }) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				187
			 | 
			
			
				+      .map((rowMap: Map[String, String])=>{ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				188
			 | 
			
			
				+        val person: Person = buildPersonModel(rowMap) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				189
			 | 
			
			
				+        person 
			 | 
		
	
		
			
			| 
				
			 | 
			
				190
			 | 
			
			
				+      }) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				191
			 | 
			
			
				+      .runWith(Sink.collection) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				192
			 | 
			
			
				+      .andThen { 
			 | 
		
	
		
			
			| 
				
			 | 
			
				193
			 | 
			
			
				+        case Failure(exception) => logger.error(s"$exception") 
			 | 
		
	
		
			
			| 
				
			 | 
			
				194
			 | 
			
			
				+        case Success(value) => 
			 | 
		
	
		
			
			| 
				
			 | 
			
				195
			 | 
			
			
				+          val elapsedTime: Long = (System.currentTimeMillis()-startTime)/1000 
			 | 
		
	
		
			
			| 
				
			 | 
			
				196
			 | 
			
			
				+          logger.info(s"$value") 
			 | 
		
	
		
			
			| 
				
			 | 
			
				197
			 | 
			
			
				+          logger.info(s"END: Successfully, elapsed time: $elapsedTime") 
			 | 
		
	
		
			
			| 
				
			 | 
			
				198
			 | 
			
			
				+      } 
			 | 
		
	
		
			
			| 
				
			 | 
			
				199
			 | 
			
			
				+ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				200
			 | 
			
			
				+    Await.result(res, 4 minutes) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				201
			 | 
			
			
				+    val listPersons: List[Person] = res.value.get.get.toList 
			 | 
		
	
		
			
			| 
				
			 | 
			
				202
			 | 
			
			
				+ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				203
			 | 
			
			
				+    listPersons 
			 | 
		
	
		
			
			| 
				
			 | 
			
				204
			 | 
			
			
				+ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				205
			 | 
			
			
				+  } 
			 | 
		
	
		
			
			| 
				
			 | 
			
				206
			 | 
			
			
				+ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				207
			 | 
			
			
				+  override def generalize(tvSeriePrimaryTitle: String): Unit={ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				208
			 | 
			
			
				+ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				209
			 | 
			
			
				+    val tvSerieID: String = getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				210
			 | 
			
			
				+    val listPersonIDs: List[Option[String]] = getListOfPersonsIDByTvSerieID(tconst = tvSerieID) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				211
			 | 
			
			
				+    getListOfPersonsForTvSerie(nconstList = listPersonIDs) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				212
			 | 
			
			
				+  } 
			 | 
		
	
		
			
			| 
				172
			 | 
			
				213
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				173
			 | 
			
				214
			 | 
			
			
				 } 
			 | 
		
	
		
			
			| 
				174
			 | 
			
				215
			 | 
			
			
				  
			 |