|  | @@ -8,8 +8,9 @@ import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, t
 | 
	
		
			
			| 8 | 8 |  import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSeries}
 | 
	
		
			
			| 9 | 9 |  import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.buildPersonModel
 | 
	
		
			
			| 10 | 10 |  import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{
 | 
	
		
			
			| 11 |  | -  buildAllPersonsSink, buildAllTvSeriesSink, buildAndValidateSource, buildPersonFlow,
 | 
	
		
			
			| 12 |  | -  buildSource, buildTvSerieFlow, filterByPersonIdFlow, filterByPersonNameFlow, filterByTvSeriePrimaryTitleFlow}
 | 
	
		
			
			|  | 11 | +  buildAllPersonsSink, buildAllTvSeriesSink, buildAndValidateSource, buildPersonFlow, buildSource, buildTvSerieFlow,
 | 
	
		
			
			|  | 12 | +  filterByPersonIdFlow, filterByPersonNameFlow, filterByTvSeriePrimaryTitleFlow
 | 
	
		
			
			|  | 13 | +}
 | 
	
		
			
			| 13 | 14 |  import org.slf4j.LoggerFactory
 | 
	
		
			
			| 14 | 15 |  import org.springframework.stereotype.Component
 | 
	
		
			
			| 15 | 16 |  
 | 
	
	
		
			
			|  | @@ -25,23 +26,6 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
 | 
	
		
			
			| 25 | 26 |    implicit val actorSystem: ActorSystem = ActorSystem("AkkaStreamActor")
 | 
	
		
			
			| 26 | 27 |    implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
 | 
	
		
			
			| 27 | 28 |  
 | 
	
		
			
			| 28 |  | -  override def getAllPersons(): Unit = {
 | 
	
		
			
			| 29 |  | -    val personSource: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
 | 
	
		
			
			| 30 |  | -    //graph
 | 
	
		
			
			| 31 |  | -    val startTime: Long = System.currentTimeMillis()
 | 
	
		
			
			| 32 |  | -    val result: Future[Done] = personSource
 | 
	
		
			
			| 33 |  | -      .via(flow = buildPersonFlow())
 | 
	
		
			
			| 34 |  | -      .runWith(sink = buildAllPersonsSink(logger = logger))
 | 
	
		
			
			| 35 |  | -
 | 
	
		
			
			| 36 |  | -    result.onComplete {
 | 
	
		
			
			| 37 |  | -      case Failure(exception) => logger.error(s"$exception")
 | 
	
		
			
			| 38 |  | -      case Success(value) =>
 | 
	
		
			
			| 39 |  | -        logger.info(s"$value")
 | 
	
		
			
			| 40 |  | -        val time: Long = (System.currentTimeMillis() - startTime) / 100
 | 
	
		
			
			| 41 |  | -        logger.info(s"elapsed time: $time")
 | 
	
		
			
			| 42 |  | -    }
 | 
	
		
			
			| 43 |  | -  }
 | 
	
		
			
			| 44 |  | -
 | 
	
		
			
			| 45 | 29 |    override def getPersonById(nconst: String): Future[Person] = {
 | 
	
		
			
			| 46 | 30 |      val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
 | 
	
		
			
			| 47 | 31 |      val res = source
 | 
	
	
		
			
			|  | @@ -62,44 +46,35 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
 | 
	
		
			
			| 62 | 46 |        .via(flow = filterByPersonNameFlow(primaryName = primaryName))
 | 
	
		
			
			| 63 | 47 |        .runWith(Sink.collection)
 | 
	
		
			
			| 64 | 48 |  
 | 
	
		
			
			| 65 |  | -    persons
 | 
	
		
			
			| 66 |  | -  }
 | 
	
		
			
			| 67 |  | -
 | 
	
		
			
			| 68 |  | -  override def getAllTvSeries(): Unit = {
 | 
	
		
			
			| 69 |  | -
 | 
	
		
			
			| 70 |  | -    val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
 | 
	
		
			
			| 71 |  | -    val sink: Sink[TvSeries, Future[Done]] = buildAllTvSeriesSink(logger = logger)
 | 
	
		
			
			| 72 |  | -
 | 
	
		
			
			| 73 |  | -    val startingTime: Long = System.currentTimeMillis()
 | 
	
		
			
			| 74 |  | -
 | 
	
		
			
			| 75 |  | -    //graph sink->flow->sink
 | 
	
		
			
			| 76 |  | -    source
 | 
	
		
			
			| 77 |  | -      .via(flow = buildTvSerieFlow())
 | 
	
		
			
			| 78 |  | -      .runWith(sink = sink)
 | 
	
		
			
			| 79 |  | -      .andThen {
 | 
	
		
			
			| 80 |  | -        case Success(value) =>
 | 
	
		
			
			| 81 |  | -          val elapsedTime: Long = (System.currentTimeMillis() - startingTime) / 1000
 | 
	
		
			
			| 82 |  | -          logger.info(s"$value: successfully processing file, elapsed time $titleBasics: $elapsedTime sec")
 | 
	
		
			
			| 83 |  | -        case Failure(error: Error) => logger.error(s"$error")
 | 
	
		
			
			| 84 |  | -      }
 | 
	
		
			
			|  | 49 | +    persons.onComplete({
 | 
	
		
			
			|  | 50 | +      case Failure(exception) => logger.error(s"$exception")
 | 
	
		
			
			|  | 51 | +      case Success(value: IndexedSeq[Person]) =>
 | 
	
		
			
			|  | 52 | +        value.foreach((person: Person)=>logger.info(s"$person"))
 | 
	
		
			
			|  | 53 | +        logger.info("SUCCESS")
 | 
	
		
			
			|  | 54 | +    })
 | 
	
		
			
			| 85 | 55 |  
 | 
	
		
			
			|  | 56 | +    persons
 | 
	
		
			
			| 86 | 57 |    }
 | 
	
		
			
			| 87 | 58 |  
 | 
	
		
			
			| 88 |  | -  override def getByTvSeriePrimaryTitle(tvSeriePrimaryTitle: String):Future[IndexedSeq[TvSeries]] = {
 | 
	
		
			
			|  | 59 | +  override def getTvSerieByPrimaryTitle(tvSeriePrimaryTitle: String):Future[IndexedSeq[TvSeries]] = {
 | 
	
		
			
			| 89 | 60 |  
 | 
	
		
			
			| 90 | 61 |      val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
 | 
	
		
			
			| 91 | 62 |  
 | 
	
		
			
			| 92 | 63 |      val filterByTvSerieTitle: Flow[Map[String, String], TvSeries, NotUsed] =
 | 
	
		
			
			| 93 | 64 |        filterByTvSeriePrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle)
 | 
	
		
			
			| 94 | 65 |  
 | 
	
		
			
			| 95 |  | -    val tvSrie :Future[IndexedSeq[TvSeries]] = tvSeriesSource
 | 
	
		
			
			|  | 66 | +    val tvSries :Future[IndexedSeq[TvSeries]] = tvSeriesSource
 | 
	
		
			
			| 96 | 67 |        .via(flow = filterByTvSerieTitle)
 | 
	
		
			
			| 97 | 68 |        .runWith(Sink.collection)
 | 
	
		
			
			| 98 | 69 |  
 | 
	
		
			
			| 99 |  | -      tvSrie.andThen {
 | 
	
		
			
			|  | 70 | +      tvSries.onComplete({
 | 
	
		
			
			| 100 | 71 |        case Failure(exception) => logger.info(s"$exception")
 | 
	
		
			
			| 101 |  | -      case Success(value) => logger.info(s"$value")
 | 
	
		
			
			| 102 |  | -    }
 | 
	
		
			
			|  | 72 | +      case Success(value: IndexedSeq[TvSeries]) =>
 | 
	
		
			
			|  | 73 | +        value.foreach((tvSrie: TvSeries)=>logger.info(s"$tvSrie"))
 | 
	
		
			
			|  | 74 | +        logger.info("SUCCESS")
 | 
	
		
			
			|  | 75 | +      })
 | 
	
		
			
			|  | 76 | +
 | 
	
		
			
			|  | 77 | +    tvSries
 | 
	
		
			
			| 103 | 78 |    }
 | 
	
		
			
			| 104 | 79 |  
 | 
	
		
			
			| 105 | 80 |    private def getTvSerieIdByPrimaryTitle(primaryTitle: String): Future[Option[String]] = {
 | 
	
	
		
			
			|  | @@ -115,10 +90,9 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
 | 
	
		
			
			| 115 | 90 |      res
 | 
	
		
			
			| 116 | 91 |    }
 | 
	
		
			
			| 117 | 92 |  
 | 
	
		
			
			| 118 |  | -  private def getListOfPersonsIDByTvSerieID(tvSerieID: Future[Option[String]]): Future[List[Option[String]]]={
 | 
	
		
			
			|  | 93 | +  private def getListOfPersonsIDByTvSerieID(tvSerieID: Future[Option[String]]): Future[IndexedSeq[Option[String]]]={
 | 
	
		
			
			| 119 | 94 |      val source: Source[Map[String, String], _] = buildSource(inputFile = titlePrincipalsBasics)
 | 
	
		
			
			| 120 |  | -
 | 
	
		
			
			| 121 |  | -    val res: Future[List[Option[String]]] = source
 | 
	
		
			
			|  | 95 | +    val res: Future[IndexedSeq[Option[String]]] = source
 | 
	
		
			
			| 122 | 96 |        .filter((rowMaps: Map[String, String])=>{
 | 
	
		
			
			| 123 | 97 |          rowMaps.getOrElse(key = "tconst", default = "")==tvSerieID.value.get.get.get
 | 
	
		
			
			| 124 | 98 |        })
 | 
	
	
		
			
			|  | @@ -126,13 +100,14 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
 | 
	
		
			
			| 126 | 100 |          rowMap.get(key = "nconst")
 | 
	
		
			
			| 127 | 101 |        })
 | 
	
		
			
			| 128 | 102 |        .runWith(Sink.collection)
 | 
	
		
			
			|  | 103 | +
 | 
	
		
			
			| 129 | 104 |      res
 | 
	
		
			
			| 130 | 105 |    }
 | 
	
		
			
			| 131 | 106 |  
 | 
	
		
			
			| 132 |  | -  private def getListOfPersonsForTvSerie(listPersonsIDs: Future[List[Option[String]]]): Future[List[Person]] = {
 | 
	
		
			
			|  | 107 | +  private def getListOfPersonsForTvSerie(listPersonsIDs: Future[IndexedSeq[Option[String]]]): Future[IndexedSeq[Person]] = {
 | 
	
		
			
			| 133 | 108 |      val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
 | 
	
		
			
			| 134 | 109 |  
 | 
	
		
			
			| 135 |  | -    val res : Future[List[Person]]=
 | 
	
		
			
			|  | 110 | +    val res : Future[IndexedSeq[Person]]=
 | 
	
		
			
			| 136 | 111 |        source
 | 
	
		
			
			| 137 | 112 |          .filter((rowMaps: Map[String, String])=>{
 | 
	
		
			
			| 138 | 113 |            listPersonsIDs.value.get.get.contains(rowMaps.get(key = "nconst"))
 | 
	
	
		
			
			|  | @@ -144,10 +119,10 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
 | 
	
		
			
			| 144 | 119 |  
 | 
	
		
			
			| 145 | 120 |      res
 | 
	
		
			
			| 146 | 121 |    }
 | 
	
		
			
			| 147 |  | -  override def getPersonsTeamForTvSerie(tvSeriePrimaryTitle: String):Future[List[Person]]={
 | 
	
		
			
			|  | 122 | +  override def getPersonsTeamForTvSerie(tvSeriePrimaryTitle: String):Future[IndexedSeq[Person]]={
 | 
	
		
			
			| 148 | 123 |  
 | 
	
		
			
			| 149 | 124 |      logger.info("STEP 1/3")
 | 
	
		
			
			| 150 |  | -    val tvSerieID = getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
 | 
	
		
			
			|  | 125 | +    val tvSerieID : Future[Option[String]]= getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
 | 
	
		
			
			| 151 | 126 |      tvSerieID.andThen({
 | 
	
		
			
			| 152 | 127 |        case Failure(exception) => logger.error(s"!!!!!!!!!!!!!!!!!!!!1$exception")
 | 
	
		
			
			| 153 | 128 |        case Success(value: Option[String]) =>
 | 
	
	
		
			
			|  | @@ -155,16 +130,16 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
 | 
	
		
			
			| 155 | 130 |          logger.info("END STEP 1/3")
 | 
	
		
			
			| 156 | 131 |      })
 | 
	
		
			
			| 157 | 132 |      logger.info("STEP 2/3")
 | 
	
		
			
			| 158 |  | -    val listPersonIDs: Future[List[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieID = tvSerieID)
 | 
	
		
			
			|  | 133 | +    val listPersonIDs: Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(tvSerieID = tvSerieID)
 | 
	
		
			
			| 159 | 134 |      listPersonIDs.andThen({
 | 
	
		
			
			| 160 | 135 |        case Failure(exception) => logger.error(s"!!!!!!!!!!!!!!!!!!!!!2$exception")
 | 
	
		
			
			| 161 |  | -      case Success(value: List[Option[String]]) =>
 | 
	
		
			
			|  | 136 | +      case Success(value) =>
 | 
	
		
			
			| 162 | 137 |          value.foreach((personID:Option[String])=>logger.info(s"Person ID:$personID"))
 | 
	
		
			
			| 163 | 138 |          logger.info("END STEP 2/3")
 | 
	
		
			
			| 164 | 139 |      })
 | 
	
		
			
			| 165 | 140 |  
 | 
	
		
			
			| 166 | 141 |      logger.info("STEP 3/3")
 | 
	
		
			
			| 167 |  | -    val personsTeam: Future[List[Person]] = getListOfPersonsForTvSerie(listPersonsIDs = listPersonIDs)
 | 
	
		
			
			|  | 142 | +    val personsTeam: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(listPersonsIDs = listPersonIDs)
 | 
	
		
			
			| 168 | 143 |      personsTeam.andThen({
 | 
	
		
			
			| 169 | 144 |        case Failure(exception) => logger.error(s"!!!!!!!!!!!!!!!!!!!!!!!3$exception")
 | 
	
		
			
			| 170 | 145 |        case Success(value) =>
 | 
	
	
		
			
			|  | @@ -175,6 +150,41 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
 | 
	
		
			
			| 175 | 150 |  
 | 
	
		
			
			| 176 | 151 |     null
 | 
	
		
			
			| 177 | 152 |    }
 | 
	
		
			
			|  | 153 | +
 | 
	
		
			
			|  | 154 | +  override def getAllPersons(): Unit = {
 | 
	
		
			
			|  | 155 | +    val personSource: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
 | 
	
		
			
			|  | 156 | +    //graph
 | 
	
		
			
			|  | 157 | +    val startTime: Long = System.currentTimeMillis()
 | 
	
		
			
			|  | 158 | +    val result: Future[Done] = personSource
 | 
	
		
			
			|  | 159 | +      .via(flow = buildPersonFlow())
 | 
	
		
			
			|  | 160 | +      .runWith(sink = buildAllPersonsSink(logger = logger))
 | 
	
		
			
			|  | 161 | +
 | 
	
		
			
			|  | 162 | +    result.onComplete {
 | 
	
		
			
			|  | 163 | +      case Failure(exception) => logger.error(s"$exception")
 | 
	
		
			
			|  | 164 | +      case Success(value) =>
 | 
	
		
			
			|  | 165 | +        logger.info(s"$value")
 | 
	
		
			
			|  | 166 | +        val time: Long = (System.currentTimeMillis() - startTime) / 100
 | 
	
		
			
			|  | 167 | +        logger.info(s"elapsed time: $time")
 | 
	
		
			
			|  | 168 | +    }
 | 
	
		
			
			|  | 169 | +  }
 | 
	
		
			
			|  | 170 | +
 | 
	
		
			
			|  | 171 | +  override def getAllTvSeries(): Unit = {
 | 
	
		
			
			|  | 172 | +    val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
 | 
	
		
			
			|  | 173 | +    val sink: Sink[TvSeries, Future[Done]] = buildAllTvSeriesSink(logger = logger)
 | 
	
		
			
			|  | 174 | +
 | 
	
		
			
			|  | 175 | +    val startingTime: Long = System.currentTimeMillis()
 | 
	
		
			
			|  | 176 | +    //graph sink->flow->sink
 | 
	
		
			
			|  | 177 | +    source
 | 
	
		
			
			|  | 178 | +      .via(flow = buildTvSerieFlow())
 | 
	
		
			
			|  | 179 | +      .runWith(sink = sink)
 | 
	
		
			
			|  | 180 | +      .andThen {
 | 
	
		
			
			|  | 181 | +        case Success(value) =>
 | 
	
		
			
			|  | 182 | +          val elapsedTime: Long = (System.currentTimeMillis() - startingTime) / 1000
 | 
	
		
			
			|  | 183 | +          logger.info(s"$value: successfully processing file, elapsed time $titleBasics: $elapsedTime sec")
 | 
	
		
			
			|  | 184 | +        case Failure(error: Error) => logger.error(s"$error")
 | 
	
		
			
			|  | 185 | +      }
 | 
	
		
			
			|  | 186 | +  }
 | 
	
		
			
			|  | 187 | +
 | 
	
		
			
			| 178 | 188 |  }
 | 
	
		
			
			| 179 | 189 |  
 | 
	
		
			
			| 180 | 190 |  
 |