package fr.natan.akkastreamfileprocessingapi.service

import akka.Done
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.scalalogging.slf4j.Logger
import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics}
import fr.natan.akkastreamfileprocessingapi.models.ModelsAndJsonMap.{Person, TvSerie}
import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{
  actorSystem, buildAllPersonsSink, buildAllTvSeriesSink, buildAndValidateSource,
  buildPersonFlow, buildTvSerieFlow, filterPersonByIdFlow, filterPersonByNameFlow,
  filterTvSerieByIdFlow, filterTvSerieByPrimaryTitleFlow}
import fr.natan.akkastreamfileprocessingapi.service.UtilitiesClass.{getListOfPersonsForTvSerie, getListOfPersonsIDByTvSerieID, getTvSerieIDFuture, getTvSerieIdByPrimaryTitle}
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.language.postfixOps
import scala.util.{Failure, Success}

/**
 * Future-based implementation of [[AkkaStreamFileProcessingFuture]].
 *
 * Every method builds a source from one of the data files (`nameBasics`, `titleBasics`,
 * `titlePrincipalsBasics`), runs it through a flow from `AkkaStreamComponents` and returns
 * the materialized future. Outcomes are logged as a side effect; the returned future always
 * carries the original result or failure, logging never alters it.
 */
//noinspection SpellCheckingInspection
@Component
class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {

  implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))

  /** Whole seconds elapsed since `startMillis` (a `System.currentTimeMillis()` reading). */
  private def elapsedSeconds(startMillis: Long): Long =
    (System.currentTimeMillis() - startMillis) / 1000

  /**
   * Steps 2 and 3 of the "persons of a tv serie" lookup, shared by the title-based and the
   * ID-based entry points: first the person IDs attached to the serie are read from
   * `titlePrincipalsBasics`, then the matching persons are read from `nameBasics`.
   *
   * @param tvSerieIdFuture future holding the (optional) tv serie ID produced by step 1
   * @return the persons returned by `getListOfPersonsForTvSerie`; failed if either step fails
   */
  private def loadPersonsOfTvSerie(tvSerieIdFuture: Future[Option[String]]): Future[IndexedSeq[Person]] = {
    logger.info("STEP 2/3 ...")
    val start2: Long = System.currentTimeMillis()
    val personIDsFuture: Future[IndexedSeq[Option[String]]] = getListOfPersonsIDByTvSerieID(
      tvSerieIdFuture = tvSerieIdFuture,
      inputFile = titlePrincipalsBasics
    )

    personIDsFuture
      .andThen {
        case Failure(exception) =>
          logger.error("STEP 2/3 failed", exception)
        case Success(personIDs) =>
          personIDs.foreach((personID: Option[String]) => logger.info(s"Person ID:$personID"))
          logger.info(s"STEP 2/3 END, elapsed time:${elapsedSeconds(start2)} sec")
      }
      .flatMap { _ =>
        // Step 3 is only started once step 2 has completed successfully.
        logger.info("STEP 3/3 ...")
        val start3: Long = System.currentTimeMillis()
        val personsFuture: Future[IndexedSeq[Person]] = getListOfPersonsForTvSerie(
          listPersonsIDsFuture = personIDsFuture,
          inputFile = nameBasics
        )

        personsFuture.andThen {
          case Failure(exception) =>
            logger.error("STEP 3/3 failed", exception)
          case Success(persons) =>
            persons.foreach((person: Person) => logger.info(s"${person.toString}"))
            logger.info(s"STEP 3/3 END, elapsed time:${elapsedSeconds(start3)} sec")
        }
      }
  }

  /**
   * Streams `nameBasics` through `filterPersonByIdFlow` and completes with the first
   * element it emits.
   *
   * @param personID identifier handed to `filterPersonByIdFlow`
   * @return the first matching person; because `Sink.head` is used, the future fails when
   *         the stream completes without emitting an element
   */
  override def getPersonByIdFuture(personID: String): Future[Person] = {
    val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = nameBasics)

    val personFuture: Future[Person] = source
      .via(flow = filterPersonByIdFlow(personID = personID))
      .runWith(Sink.head[Person])

    // Pass the exception as the cause: interpolating printStackTrace() only logs "()".
    personFuture.andThen {
      case Failure(exception) => logger.error(s"Lookup of person with ID '$personID' failed", exception)
    }
  }

  /**
   * Streams `nameBasics` through `filterPersonByNameFlow` and completes with the first
   * element it emits.
   *
   * @param primaryName name handed to `filterPersonByNameFlow`
   * @return the first matching person; because `Sink.head` is used, the future fails when
   *         the stream completes without emitting an element
   */
  override def getPersonByNameFuture(primaryName: String): Future[Person] = {
    val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = nameBasics)

    val personFuture: Future[Person] = source
      .via(flow = filterPersonByNameFlow(primaryName = primaryName))
      .runWith(Sink.head[Person])

    personFuture.andThen {
      case Failure(exception) => logger.error(s"Lookup of person named '$primaryName' failed", exception)
    }
  }

  /**
   * Streams `titleBasics` through `filterTvSerieByIdFlow` and completes with the first
   * element it emits; the result and the elapsed time are logged.
   *
   * @param tvSerieID identifier handed to `filterTvSerieByIdFlow`
   * @return the first matching tv serie; because `Sink.head` is used, the future fails when
   *         the stream completes without emitting an element
   */
  override def getTvSerieByIdFuture(tvSerieID: String): Future[TvSerie] = {
    val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
    val start: Long = System.currentTimeMillis()

    val tvSerieFuture: Future[TvSerie] = source
      .via(flow = filterTvSerieByIdFlow(tvSerieID = tvSerieID))
      .runWith(Sink.head[TvSerie])

    tvSerieFuture.onComplete {
      case Failure(exception) =>
        logger.error(s"Lookup of tv serie with ID '$tvSerieID' failed", exception)
      case Success(value) =>
        logger.info(s"$value")
        logger.info(s"SUCCESS, elapsed time:${elapsedSeconds(start)} sec")
    }

    tvSerieFuture
  }

  /**
   * Streams `titleBasics` through `filterTvSerieByPrimaryTitleFlow` and collects every
   * emitted element; each result and the elapsed time are logged.
   *
   * @param tvSeriePrimaryTitle title handed to `filterTvSerieByPrimaryTitleFlow`
   * @return all tv series emitted by the flow, in stream order
   */
  override def getTvSeriesByPrimaryTitleFuture(tvSeriePrimaryTitle: String): Future[IndexedSeq[TvSerie]] = {
    val start: Long = System.currentTimeMillis()
    val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)

    // The explicit type on the val drives the collection type inferred for Sink.collection.
    val tvSeriesFuture: Future[IndexedSeq[TvSerie]] = tvSeriesSource
      .via(flow = filterTvSerieByPrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle))
      .runWith(Sink.collection)

    tvSeriesFuture.onComplete {
      case Failure(exception) =>
        logger.error(s"Lookup of tv series titled '$tvSeriePrimaryTitle' failed", exception)
      case Success(tvSeries) =>
        tvSeries.foreach((tvSerie: TvSerie) => logger.info(s"$tvSerie"))
        logger.info(s"SUCCESS, elapsed time:${elapsedSeconds(start)} sec")
    }

    tvSeriesFuture
  }

  /**
   * Three chained steps: resolve the tv serie ID from its primary title, read the IDs of
   * the persons attached to that serie, then read those persons.
   *
   * @param tvSeriePrimaryTitle title handed to `getTvSerieIdByPrimaryTitle`
   * @return the persons found for the serie; failed if any of the three steps fails
   */
  override def getPersonsForTvSerieByTvSerieTitleFuture(tvSeriePrimaryTitle: String): Future[IndexedSeq[Person]] = {
    logger.info("STEP 1/3 ...")
    val start1: Long = System.currentTimeMillis()
    val tvSerieIDFuture: Future[Option[String]] = getTvSerieIdByPrimaryTitle(
      primaryTitle = tvSeriePrimaryTitle,
      inputFile = titleBasics
    )

    tvSerieIDFuture
      .andThen {
        case Failure(exception) =>
          logger.error(s"STEP 1/3 failed for tv serie titled '$tvSeriePrimaryTitle'", exception)
        case Success(value) =>
          logger.info(s"TvSerie ID: $value")
          logger.info(s"STEP 1/3 END, elapsed time:${elapsedSeconds(start1)} sec")
      }
      .flatMap(_ => loadPersonsOfTvSerie(tvSerieIDFuture))
  }

  /**
   * Three chained steps: look the tv serie ID up through `getTvSerieIDFuture`, read the IDs
   * of the persons attached to that serie, then read those persons.
   *
   * @param tvSerieId identifier handed to `getTvSerieIDFuture`
   * @return the persons found for the serie; failed if any of the three steps fails
   */
  override def getPersonsForTvSerieByTvSerieIDFuture(tvSerieId: String): Future[IndexedSeq[Person]] = {
    val start1: Long = System.currentTimeMillis()
    logger.info("STEP 1/3 ...")
    val tvSerieIdFuture: Future[Option[String]] = getTvSerieIDFuture(
      tvSerieID = tvSerieId,
      logger = logger,
      inputFile = titleBasics
    )

    tvSerieIdFuture
      .andThen {
        case Failure(exception) =>
          logger.error(s"STEP 1/3 failed for tv serie with ID '$tvSerieId'", exception)
        case Success(value) =>
          logger.info(s"$value")
          logger.info(s"STEP 1/3 END, elapsed time:${elapsedSeconds(start1)} sec")
      }
      .flatMap(_ => loadPersonsOfTvSerie(tvSerieIdFuture))
  }

  /**
   * Streams the whole `nameBasics` file through `buildPersonFlow` into the sink built by
   * `buildAllPersonsSink`.
   *
   * @return a future completed with `Done` once the stream has finished
   */
  override def getAllPersonsFuture: Future[Done] = {
    val personSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = nameBasics)
    val startTime: Long = System.currentTimeMillis()

    // graph: source -> flow -> sink
    val result: Future[Done] = personSource
      .via(flow = buildPersonFlow())
      .runWith(sink = buildAllPersonsSink(logger = logger))

    result.onComplete {
      case Failure(exception) =>
        logger.error("Processing of all persons failed", exception)
      case Success(value) =>
        logger.info(s"$value")
        // Milliseconds to seconds, as in every other method (the divisor used to be 100).
        val time: Long = elapsedSeconds(startTime)
        logger.info(s"elapsed time: $time sec")
    }

    result
  }

  /**
   * Streams the whole `titleBasics` file through `buildTvSerieFlow` into the sink built by
   * `buildAllTvSeriesSink`.
   *
   * @return a future completed with `Done` once the stream has finished
   */
  override def getAllTvSeriesFuture: Future[Done] = {
    val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
    val sink: Sink[TvSerie, Future[Done]] = buildAllTvSeriesSink(logger = logger)
    val startingTime: Long = System.currentTimeMillis()

    // graph: source -> flow -> sink
    source
      .via(flow = buildTvSerieFlow())
      .runWith(sink = sink)
      .andThen {
        case Success(value) =>
          val elapsedTime: Long = elapsedSeconds(startingTime)
          logger.info(s"$value: successfully processing file, elapsed time $titleBasics: $elapsedTime sec")
        // Match every Throwable: narrowing to `Error` skipped ordinary exceptions.
        case Failure(error) =>
          logger.error("Processing of all tv series failed", error)
      }
  }
}