| 
				
			 | 
			
			
				@@ -4,9 +4,9 @@ import akka.actor.ActorSystem 
			 | 
		
	
		
			
			| 
				4
			 | 
			
				4
			 | 
			
			
				 import akka.stream.scaladsl.{Flow, Sink, Source} 
			 | 
		
	
		
			
			| 
				5
			 | 
			
				5
			 | 
			
			
				 import akka.{Done, NotUsed} 
			 | 
		
	
		
			
			| 
				6
			 | 
			
				6
			 | 
			
			
				 import com.typesafe.scalalogging.slf4j.Logger 
			 | 
		
	
		
			
			| 
				7
			 | 
			
				
			 | 
			
			
				-import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.titleBasics 
			 | 
		
	
		
			
			| 
				8
			 | 
			
				
			 | 
			
			
				-import fr.natan.akkastreamfileprocessingapi.models.TvSeries 
			 | 
		
	
		
			
			| 
				9
			 | 
			
				
			 | 
			
			
				-import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAndValidateSource, buildTvSerieFlow, buildTvSeriesSink, buildfilterByMoviePrimaryTitleFlow} 
			 | 
		
	
		
			
			| 
				
			 | 
			
				7
			 | 
			
			
				+import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics} 
			 | 
		
	
		
			
			| 
				
			 | 
			
				8
			 | 
			
			
				+import fr.natan.akkastreamfileprocessingapi.models.{Person, TvSeries} 
			 | 
		
	
		
			
			| 
				
			 | 
			
				9
			 | 
			
			
				+import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAndValidateSource, buildPersonFlow, buildPersonSink, buildSource, buildTvSerieFlow, buildTvSeriesSink, buildfilterByMoviePrimaryTitleFlow} 
			 | 
		
	
		
			
			| 
				10
			 | 
			
				10
			 | 
			
			
				 import org.slf4j.LoggerFactory 
			 | 
		
	
		
			
			| 
				11
			 | 
			
				11
			 | 
			
			
				 import org.springframework.stereotype.Component 
			 | 
		
	
		
			
			| 
				12
			 | 
			
				12
			 | 
			
			
				  
			 | 
		
	
	
		
			
			| 
				
			 | 
			
			
				@@ -43,16 +43,17 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing { 
			 | 
		
	
		
			
			| 
				43
			 | 
			
				43
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				44
			 | 
			
				44
			 | 
			
			
				   override def getMoviesByTitle(moviePrimaryTitle: String): Unit = { 
			 | 
		
	
		
			
			| 
				45
			 | 
			
				45
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				46
			 | 
			
				
			 | 
			
			
				-    val source: Source[String, NotUsed] = buildAndValidateSource(inputFile = titleBasics) 
			 | 
		
	
		
			
			| 
				47
			 | 
			
				
			 | 
			
			
				-    val sink: Sink[TvSeries, Future[Done]] = buildTvSeriesSink(logger = logger) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				46
			 | 
			
			
				+    val tvSeriesSource: Source[String, NotUsed] = buildAndValidateSource(inputFile = titleBasics) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				47
			 | 
			
			
				+    val tvSeriesSink: Sink[TvSeries, Future[Done]] = buildTvSeriesSink(logger = logger) 
			 | 
		
	
		
			
			| 
				48
			 | 
			
				48
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				49
			 | 
			
				49
			 | 
			
			
				     val filterByMovieTitleFlow: Flow[String, TvSeries, NotUsed] = 
			 | 
		
	
		
			
			| 
				50
			 | 
			
				50
			 | 
			
			
				       buildfilterByMoviePrimaryTitleFlow(moviePrimaryTitle = moviePrimaryTitle) 
			 | 
		
	
		
			
			| 
				51
			 | 
			
				51
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				52
			 | 
			
				52
			 | 
			
			
				     val startTime: Long = System.currentTimeMillis() 
			 | 
		
	
		
			
			| 
				53
			 | 
			
				
			 | 
			
			
				-    val listTvSeries: Future[Done]= source.via(flow = filterByMovieTitleFlow) 
			 | 
		
	
		
			
			| 
				54
			 | 
			
				
			 | 
			
			
				-      .runWith(sink = sink) 
			 | 
		
	
		
			
			| 
				55
			 | 
			
				
			 | 
			
			
				-    .andThen { 
			 | 
		
	
		
			
			| 
				
			 | 
			
				53
			 | 
			
			
				+    val listTvSeries: Future[Done]= tvSeriesSource 
			 | 
		
	
		
			
			| 
				
			 | 
			
				54
			 | 
			
			
				+      .via(flow = filterByMovieTitleFlow) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				55
			 | 
			
			
				+      .runWith(sink = tvSeriesSink) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				56
			 | 
			
			
				+      .andThen { 
			 | 
		
	
		
			
			| 
				56
			 | 
			
				57
			 | 
			
			
				       case Success(value) => 
			 | 
		
	
		
			
			| 
				57
			 | 
			
				58
			 | 
			
			
				         val elapsedTime: Long = (System.currentTimeMillis() - startTime) / 1000 
			 | 
		
	
		
			
			| 
				58
			 | 
			
				59
			 | 
			
			
				         logger.info(s"$value: successfully processing file, elapsed time $titleBasics: $elapsedTime sec") 
			 | 
		
	
	
		
			
			| 
				
			 | 
			
			
				@@ -61,7 +62,18 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing { 
			 | 
		
	
		
			
			| 
				61
			 | 
			
				62
			 | 
			
			
				   } 
			 | 
		
	
		
			
			| 
				62
			 | 
			
				63
			 | 
			
			
				  
			 | 
		
	
		
			
			| 
				63
			 | 
			
				64
			 | 
			
			
				   override def getAllPersons(): Unit = { 
			 | 
		
	
		
			
			| 
				64
			 | 
			
				
			 | 
			
			
				-    ??? 
			 | 
		
	
		
			
			| 
				
			 | 
			
				65
			 | 
			
			
				+    val personSource: Source[String, NotUsed]= buildSource(inputFile = nameBasics) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				66
			 | 
			
			
				+    val personSink: Sink[Person, Future[Done]] = buildPersonSink(logger = logger) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				67
			 | 
			
			
				+ 
			 | 
		
	
		
			
			| 
				
			 | 
			
				68
			 | 
			
			
				+    //graph 
			 | 
		
	
		
			
			| 
				
			 | 
			
				69
			 | 
			
			
				+    personSource 
			 | 
		
	
		
			
			| 
				
			 | 
			
				70
			 | 
			
			
				+      .via(flow = buildPersonFlow(personID = "nconst")) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				71
			 | 
			
			
				+      .runWith(sink = personSink) 
			 | 
		
	
		
			
			| 
				
			 | 
			
				72
			 | 
			
			
				+      .andThen { 
			 | 
		
	
		
			
			| 
				
			 | 
			
				73
			 | 
			
			
				+        case Success(value) => 
			 | 
		
	
		
			
			| 
				
			 | 
			
				74
			 | 
			
			
				+          logger.info(s"$value: successfully processing file") 
			 | 
		
	
		
			
			| 
				
			 | 
			
				75
			 | 
			
			
				+        case Failure(error: Error) => logger.error(s"$error") 
			 | 
		
	
		
			
			| 
				
			 | 
			
				76
			 | 
			
			
				+      } 
			 | 
		
	
		
			
			| 
				65
			 | 
			
				77
			 | 
			
			
				   } 
			 | 
		
	
		
			
			| 
				66
			 | 
			
				78
			 | 
			
			
				 } 
			 | 
		
	
		
			
			| 
				67
			 | 
			
				79
			 | 
			
			
				  
			 |