|  | @@ -7,11 +7,9 @@ import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, t
 | 
	
		
			
			| 7 | 7 |  import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSerie}
 | 
	
		
			
			| 8 | 8 |  import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{
 | 
	
		
			
			| 9 | 9 |    actorSystem, buildAllPersonsSink, buildAllTvSeriesSink, buildAndValidateSource, buildPersonFlow, buildSource,
 | 
	
		
			
			| 10 |  | -  buildTvSerieFlow, filterPersonByIdFlow, filterPersonByNameFlow, filterTvSerieByIdFlow, filterTvSerieByPrimaryTitleFlow
 | 
	
		
			
			| 11 |  | -}
 | 
	
		
			
			|  | 10 | +  buildTvSerieFlow, filterPersonByIdFlow, filterPersonByNameFlow, filterTvSerieByIdFlow, filterTvSerieByPrimaryTitleFlow}
 | 
	
		
			
			| 12 | 11 |  import fr.natan.akkastreamfileprocessingapi.service.UtilitiesClass.{
 | 
	
		
			
			| 13 |  | -  getListOfPersonsForTvSerie, getListOfPersonsIDByTvSerieID, getTvSerieIDFuture, getTvSerieIdByPrimaryTitle
 | 
	
		
			
			| 14 |  | -}
 | 
	
		
			
			|  | 12 | +  getListOfPersonsForTvSerie, getListOfPersonsIDByTvSerieID, getTvSerieIDFuture, getTvSerieIdByPrimaryTitle}
 | 
	
		
			
			| 15 | 13 |  import org.slf4j.LoggerFactory
 | 
	
		
			
			| 16 | 14 |  import org.springframework.stereotype.Component
 | 
	
		
			
			| 17 | 15 |  
 | 
	
	
		
			
			|  | @@ -32,11 +30,11 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
 | 
	
		
			
			| 32 | 30 |      val start: Long = System.currentTimeMillis()
 | 
	
		
			
			| 33 | 31 |      val personFuture: Future[Option[Person]] = source
 | 
	
		
			
			| 34 | 32 |        .via(flow = filterPersonByIdFlow(personID = personID))
 | 
	
		
			
			| 35 |  | -      .runWith(Sink.headOption)
 | 
	
		
			
			|  | 33 | +      .runWith(Sink.headOption[Person])
 | 
	
		
			
			| 36 | 34 |  
 | 
	
		
			
			| 37 | 35 |      personFuture.andThen({
 | 
	
		
			
			| 38 | 36 |        case Failure(exception) => logger.info(s"$exception")
 | 
	
		
			
			| 39 |  | -      case Success(value:Option[Person]) =>
 | 
	
		
			
			|  | 37 | +      case Success(value: Option[Person]) =>
 | 
	
		
			
			| 40 | 38 |          logger.info(s"$value")
 | 
	
		
			
			| 41 | 39 |          logger.info(s"SUCCESS, elapsed time:${(System.currentTimeMillis() - start) / 1000} sec")
 | 
	
		
			
			| 42 | 40 |      })
 | 
	
	
		
			
			|  | @@ -44,13 +42,13 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
 | 
	
		
			
			| 44 | 42 |      personFuture
 | 
	
		
			
			| 45 | 43 |    }
 | 
	
		
			
			| 46 | 44 |  
 | 
	
		
			
			| 47 |  | -  override def getPersonByNameFuture(primaryName: String):  Future[Option[Person]]= {
 | 
	
		
			
			|  | 45 | +  override def getPersonByNameFuture(primaryName: String): Future[Option[Person]] = {
 | 
	
		
			
			| 48 | 46 |      val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
 | 
	
		
			
			| 49 | 47 |  
 | 
	
		
			
			| 50 | 48 |      val start: Long = System.currentTimeMillis()
 | 
	
		
			
			| 51 |  | -    val personFuture:  Future[Option[Person]] = source
 | 
	
		
			
			|  | 49 | +    val personFuture: Future[Option[Person]] = source
 | 
	
		
			
			| 52 | 50 |        .via(flow = filterPersonByNameFlow(primaryName = primaryName))
 | 
	
		
			
			| 53 |  | -      .runWith(Sink.headOption)
 | 
	
		
			
			|  | 51 | +      .runWith(Sink.headOption[Person])
 | 
	
		
			
			| 54 | 52 |  
 | 
	
		
			
			| 55 | 53 |      personFuture
 | 
	
		
			
			| 56 | 54 |    }
 | 
	
	
		
			
			|  | @@ -59,9 +57,9 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
 | 
	
		
			
			| 59 | 57 |      val source: Source[Map[String, String], _] = buildSource(inputFile = titleBasics)
 | 
	
		
			
			| 60 | 58 |  
 | 
	
		
			
			| 61 | 59 |      val start: Long = System.currentTimeMillis()
 | 
	
		
			
			| 62 |  | -    val tvSerieFuture: Future[Option[TvSerie]]= source
 | 
	
		
			
			|  | 60 | +    val tvSerieFuture: Future[Option[TvSerie]] = source
 | 
	
		
			
			| 63 | 61 |        .via(flow = filterTvSerieByIdFlow(tvSerieID = tvSerieID))
 | 
	
		
			
			| 64 |  | -      .runWith(Sink.headOption)
 | 
	
		
			
			|  | 62 | +      .runWith(Sink.headOption[TvSerie])
 | 
	
		
			
			| 65 | 63 |  
 | 
	
		
			
			| 66 | 64 |      tvSerieFuture.onComplete({
 | 
	
		
			
			| 67 | 65 |        case Failure(exception) => logger.info(s"$exception")
 | 
	
	
		
			
			|  | @@ -71,6 +69,7 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
 | 
	
		
			
			| 71 | 69 |      })
 | 
	
		
			
			| 72 | 70 |      tvSerieFuture
 | 
	
		
			
			| 73 | 71 |    }
 | 
	
		
			
			|  | 72 | +
 | 
	
		
			
			| 74 | 73 |    override def getTvSeriesByPrimaryTitleFuture(tvSeriePrimaryTitle: String): Future[IndexedSeq[TvSerie]] = {
 | 
	
		
			
			| 75 | 74 |  
 | 
	
		
			
			| 76 | 75 |      val start: Long = System.currentTimeMillis()
 | 
	
	
		
			
			|  | @@ -102,11 +101,12 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
 | 
	
		
			
			| 102 | 101 |          case Success(value: Option[String]) =>
 | 
	
		
			
			| 103 | 102 |            logger.info(s"TvSerie ID: $value")
 | 
	
		
			
			| 104 | 103 |            logger.info(s"STEP 1/3 END, elapsed time:${(System.currentTimeMillis() - start1) / 1000} sec")
 | 
	
		
			
			|  | 104 | +
 | 
	
		
			
			| 105 | 105 |        })
 | 
	
		
			
			| 106 | 106 |          .flatMap({
 | 
	
		
			
			| 107 | 107 |            _ =>
 | 
	
		
			
			| 108 | 108 |              logger.info("STEP 2/3 START")
 | 
	
		
			
			| 109 |  | -            val  start2: Long = System.currentTimeMillis()
 | 
	
		
			
			|  | 109 | +            val start2: Long = System.currentTimeMillis()
 | 
	
		
			
			| 110 | 110 |              val listPersonIDsFuture: Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(
 | 
	
		
			
			| 111 | 111 |                tvSerieIdFuture = tvSerieIDFuture)
 | 
	
		
			
			| 112 | 112 |              listPersonIDsFuture.andThen({
 | 
	
	
		
			
			|  | @@ -140,15 +140,16 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
 | 
	
		
			
			| 140 | 140 |      val tvSerieIdFuture: Future[Option[String]] = getTvSerieIDFuture(tvSerieId = tvSerieId)
 | 
	
		
			
			| 141 | 141 |      tvSerieIdFuture.andThen({
 | 
	
		
			
			| 142 | 142 |        case Failure(exception) => logger.error(s"${exception.printStackTrace()}")
 | 
	
		
			
			| 143 |  | -      case Success(value:Option[String]) =>
 | 
	
		
			
			|  | 143 | +      case Success(value: Option[String]) =>
 | 
	
		
			
			| 144 | 144 |          logger.info(s"$value")
 | 
	
		
			
			| 145 | 145 |          logger.info(s"STEP 1/3 END, elapsed time:${(System.currentTimeMillis() - start1) / 1000} sec")
 | 
	
		
			
			| 146 | 146 |  
 | 
	
		
			
			| 147 | 147 |      })
 | 
	
		
			
			| 148 | 148 |        .flatMap({
 | 
	
		
			
			| 149 |  | -        _ => logger.info("STEP 2/3")
 | 
	
		
			
			|  | 149 | +        _ =>
 | 
	
		
			
			|  | 150 | +          logger.info("STEP 2/3")
 | 
	
		
			
			| 150 | 151 |            val start2: Long = System.currentTimeMillis()
 | 
	
		
			
			| 151 |  | -          val  listOfPersonsIDsFuture : Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(
 | 
	
		
			
			|  | 152 | +          val listOfPersonsIDsFuture: Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(
 | 
	
		
			
			| 152 | 153 |              tvSerieIdFuture = tvSerieIdFuture)
 | 
	
		
			
			| 153 | 154 |            listOfPersonsIDsFuture.andThen({
 | 
	
		
			
			| 154 | 155 |              case Failure(exception) => logger.error(s"${exception.printStackTrace()}")
 | 
	
	
		
			
			|  | @@ -157,7 +158,8 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
 | 
	
		
			
			| 157 | 158 |                logger.info(s"STEP 2/3 END, elapsed time:${(System.currentTimeMillis() - start2) / 1000} sec")
 | 
	
		
			
			| 158 | 159 |            })
 | 
	
		
			
			| 159 | 160 |              .flatMap({
 | 
	
		
			
			| 160 |  | -              _ => logger.info("STEP 3/3")
 | 
	
		
			
			|  | 161 | +              _ =>
 | 
	
		
			
			|  | 162 | +                logger.info("STEP 3/3")
 | 
	
		
			
			| 161 | 163 |                  val start3: Long = System.currentTimeMillis()
 | 
	
		
			
			| 162 | 164 |                  val personsTeamFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(
 | 
	
		
			
			| 163 | 165 |                    listPersonsIDsFuture = listOfPersonsIDsFuture)
 | 
	
	
		
			
			|  | @@ -171,7 +173,8 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
 | 
	
		
			
			| 171 | 173 |        })
 | 
	
		
			
			| 172 | 174 |  
 | 
	
		
			
			| 173 | 175 |    }
 | 
	
		
			
			| 174 |  | -  override def getAllPersonsFuture(): Future[Done] = {
 | 
	
		
			
			|  | 176 | +
 | 
	
		
			
			|  | 177 | +  override def getAllPersonsFuture: Future[Done] = {
 | 
	
		
			
			| 175 | 178 |      val personSource: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
 | 
	
		
			
			| 176 | 179 |      //graph
 | 
	
		
			
			| 177 | 180 |      val startTime: Long = System.currentTimeMillis()
 | 
	
	
		
			
			|  | @@ -190,7 +193,7 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {
 | 
	
		
			
			| 190 | 193 |      result
 | 
	
		
			
			| 191 | 194 |    }
 | 
	
		
			
			| 192 | 195 |  
 | 
	
		
			
			| 193 |  | -  override def getAllTvSeriesFuture(): Future[Done] = {
 | 
	
		
			
			|  | 196 | +  override def getAllTvSeriesFuture: Future[Done] = {
 | 
	
		
			
			| 194 | 197 |      val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
 | 
	
		
			
			| 195 | 198 |      val sink: Sink[TvSerie, Future[Done]] = buildAllTvSeriesSink(logger = logger)
 | 
	
		
			
			| 196 | 199 |  
 |