package fr.natan.akkastreamfileprocessingapi.service

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.scalalogging.slf4j.Logger
import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics}
import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSerie}
import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.buildPersonModel
import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAllPersonsSink, buildAllTvSeriesSink, buildAndValidateSource, buildPersonFlow, buildSource, buildTvSerieFlow, filterPersonByIdFlow, filterPersonByNameFlow, filterTvSerieByIdFlow, filterTvSerieByPrimaryTitleFlow}
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success}

/**
 * Akka Streams based implementation of [[AkkaStreamFileProcessing]].
 *
 * Every operation builds a `Source` of rows (`Map[String, String]`) from one of the
 * `Datasource` inputs, runs it through a flow/filter and materializes the result with a sink.
 * Outcomes are logged through [[logger]].
 */
//noinspection SpellCheckingInspection
@Component
class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {

  // NOTE(review): this actor system is never terminated in the code visible here —
  // confirm that shutdown is handled elsewhere (e.g. by the container lifecycle).
  implicit val actorSystem: ActorSystem = ActorSystem("AkkaStreamActor")
  implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))

  /**
   * Runs the `nameBasics` source through `filterPersonByIdFlow` and returns the first
   * element it emits.
   *
   * @param personID identifier handed to `filterPersonByIdFlow`
   * @return the first emitted person; the future fails when the stream emits nothing
   *         (`Sink.head`) or the stream itself fails
   */
  override def getPersonById(personID: String): Future[Person] = {
    val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)

    val person: Future[Person] = source
      .via(flow = filterPersonByIdFlow(personID = personID))
      .runWith(Sink.head)

    person.onComplete {
      case Failure(exception) => logger.error(s"$exception")
      case Success(value) => logger.info(s"$value")
    }

    person
  }

  /**
   * Runs the `nameBasics` source through `filterPersonByNameFlow` and returns the first
   * element it emits.
   *
   * @param primaryName name handed to `filterPersonByNameFlow`
   * @return the first emitted person; the future fails when the stream emits nothing
   *         (`Sink.head`) or the stream itself fails
   */
  override def getPersonByName(primaryName: String): Future[Person] = {
    val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)

    val person: Future[Person] = source
      .via(flow = filterPersonByNameFlow(primaryName = primaryName))
      .runWith(Sink.head)

    person.onComplete {
      case Failure(exception) => logger.error(s"$exception")
      case Success(value) =>
        logger.info(s"$value")
        logger.info("SUCCESS")
    }

    person
  }

  /**
   * Runs the `titleBasics` source through `filterTvSerieByIdFlow` and returns the first
   * element it emits.
   *
   * @param tvSerieID identifier handed to `filterTvSerieByIdFlow`
   * @return the first emitted TV serie; the future fails when the stream emits nothing
   *         (`Sink.head`) or the stream itself fails
   */
  override def getTvSerieById(tvSerieID: String): Future[TvSerie] = {
    val source: Source[Map[String, String], _] = buildSource(inputFile = titleBasics)

    val tvSerie: Future[TvSerie] = source
      .via(flow = filterTvSerieByIdFlow(tvSerieID = tvSerieID))
      .runWith(Sink.head)

    tvSerie.onComplete {
      case Failure(exception) => logger.error(s"$exception")
      case Success(value) => logger.info(s"$value")
    }

    tvSerie
  }

  /**
   * Collects every element emitted by `filterTvSerieByPrimaryTitleFlow` over the validated
   * `titleBasics` source.
   *
   * The interface returns a plain `List`, so this method blocks the calling thread for up
   * to two minutes while the stream runs.
   *
   * @param tvSeriePrimaryTitle title handed to `filterTvSerieByPrimaryTitleFlow`
   * @return all emitted TV series
   * @throws java.util.concurrent.TimeoutException if the stream does not complete within
   *                                               two minutes
   */
  override def getTvSerieByPrimaryTitle(tvSeriePrimaryTitle: String): List[TvSerie] = {
    val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)

    val tvSeries: Future[List[TvSerie]] = tvSeriesSource
      .via(flow = filterTvSerieByPrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle))
      .runWith(Sink.collection)

    tvSeries.onComplete {
      case Failure(exception) => logger.error(s"$exception")
      case Success(value) =>
        value.foreach(tvSerie => logger.info(s"$tvSerie"))
        logger.info("SUCCESS")
    }

    // Await.result already yields the value (or rethrows the failure); no need to
    // unwrap the future a second time afterwards.
    Await.result(tvSeries, 2.minutes)
  }

  /**
   * Finds the `tconst` value of the first `titleBasics` row whose `primaryTitle` equals the
   * given title.
   *
   * @param primaryTitle title to look for
   * @return the row's `tconst` (if the row has one); the future fails when no row matches
   */
  private def getTvSerieIdByPrimaryTitle(primaryTitle: String): Future[Option[String]] = {
    val source: Source[Map[String, String], _] = buildSource(inputFile = titleBasics)

    val tvSerieID: Future[Option[String]] = source
      .filter(rowMap => rowMap.getOrElse("primaryTitle", "") == primaryTitle)
      .map(rowMap => rowMap.get("tconst"))
      .runWith(Sink.head)

    tvSerieID
  }

  /**
   * Collects the `nconst` value of every `titlePrincipalsBasics` row whose `tconst` equals
   * the TV serie id carried by the given future.
   *
   * The id is obtained by chaining on the future instead of reading `Future.value`, so the
   * method is safe to call before the future has completed.
   *
   * @param tvSerieIdFuture future holding the TV serie id
   * @return the `nconst` values of the matching rows; fails with `NoSuchElementException`
   *         when the future holds no id
   */
  private def getListOfPersonsIDByTvSerieID(tvSerieIdFuture: Future[Option[String]]): Future[IndexedSeq[Option[String]]] =
    tvSerieIdFuture.flatMap {
      case None =>
        Future.failed[IndexedSeq[Option[String]]](
          new NoSuchElementException("no tconst available for the requested TvSerie"))

      case Some(tvSerieID) =>
        val source: Source[Map[String, String], _] = buildSource(inputFile = titlePrincipalsBasics)

        val personIDs: Future[IndexedSeq[Option[String]]] = source
          .filter(rowMap => rowMap.getOrElse("tconst", "") == tvSerieID)
          .map(rowMap => rowMap.get("nconst"))
          .runWith(Sink.collection)

        personIDs
    }

  /**
   * Builds a [[Person]] for every `nameBasics` row whose `nconst` is among the given ids.
   *
   * Ids are gathered into a `Set` once, so each row costs a single lookup; rows without an
   * `nconst` value never match.
   *
   * @param listPersonsIDsFuture future holding the person ids to keep
   * @return the persons built from the matching rows
   */
  private def getListOfPersonsForTvSerie(listPersonsIDsFuture: Future[IndexedSeq[Option[String]]]): Future[IndexedSeq[Person]] =
    listPersonsIDsFuture.flatMap { personIDs =>
      val wantedIDs: Set[String] = personIDs.flatten.toSet
      val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)

      val persons: Future[IndexedSeq[Person]] = source
        .filter(rowMap => rowMap.get("nconst").exists(wantedIDs.contains))
        .map(rowMap => buildPersonModel(rowMap))
        .runWith(Sink.collection)

      persons
    }

  /**
   * Resolves the persons attached to a TV serie in three chained steps:
   * title -> TV serie id -> person ids -> persons. Each step logs its start, its result
   * and its end; a step only starts when the previous one succeeded.
   *
   * @param tvSeriePrimaryTitle primary title of the TV serie
   * @return the persons found for the TV serie; fails if any of the three steps fails
   */
  override def getTeamOfPersonsForTvSerie(tvSeriePrimaryTitle: String): Future[IndexedSeq[Person]] = {
    // futures chaining
    logger.info("STEP 1/3 START")
    val tvSerieIDFuture: Future[Option[String]] =
      getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)

    tvSerieIDFuture
      .andThen {
        case Failure(exception) => logger.error(s"$exception")
        case Success(value) =>
          logger.info(s"TvSerie ID: $value")
          logger.info("STEP 1/3 END")
      }
      .flatMap { _ =>
        logger.info("STEP 2/3 START")
        val listPersonIDsFuture: Future[IndexedSeq[Option[String]]] =
          getListOfPersonsIDByTvSerieID(tvSerieIdFuture = tvSerieIDFuture)

        listPersonIDsFuture
          .andThen {
            case Failure(exception) => logger.error(s"$exception")
            case Success(value) =>
              value.foreach(personID => logger.info(s"Person ID:$personID"))
              logger.info("STEP 2/3 END")
          }
          .flatMap { _ =>
            logger.info("STEP 3/3 START")
            getListOfPersonsForTvSerie(listPersonsIDsFuture = listPersonIDsFuture)
              .andThen {
                case Failure(exception) => logger.error(s"$exception")
                case Success(value) =>
                  value.foreach(person => logger.info(s"${person.toString}"))
                  logger.info("STEP 3/3 END")
              }
          }
      }
  }

  /**
   * Streams the `nameBasics` source through `buildPersonFlow` into `buildAllPersonsSink`
   * and logs the outcome together with the elapsed time in seconds. Returns immediately;
   * the stream runs asynchronously.
   */
  override def getAllPersons(): Unit = {
    val personSource: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)

    // graph source->flow->sink
    val startTime: Long = System.currentTimeMillis()
    val result: Future[Done] = personSource
      .via(flow = buildPersonFlow())
      .runWith(sink = buildAllPersonsSink(logger = logger))

    result.onComplete {
      case Failure(exception) => logger.error(s"$exception")
      case Success(value) =>
        logger.info(s"$value")
        // milliseconds -> seconds
        val time: Long = (System.currentTimeMillis() - startTime) / 1000
        logger.info(s"elapsed time: $time sec")
    }
  }

  /**
   * Streams the validated `titleBasics` source through `buildTvSerieFlow` into
   * `buildAllTvSeriesSink` and logs the outcome together with the elapsed time in seconds.
   * Returns immediately; the stream runs asynchronously.
   */
  override def getAllTvSeries(): Unit = {
    val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
    val sink: Sink[TvSerie, Future[Done]] = buildAllTvSeriesSink(logger = logger)

    val startingTime: Long = System.currentTimeMillis()

    // graph source->flow->sink
    source
      .via(flow = buildTvSerieFlow())
      .runWith(sink = sink)
      .andThen {
        case Success(value) =>
          val elapsedTime: Long = (System.currentTimeMillis() - startingTime) / 1000
          logger.info(s"$value: successfully processing file, elapsed time $titleBasics: $elapsedTime sec")
        // Match every Throwable: restricting the pattern to java.lang.Error would let
        // ordinary exceptions pass through unlogged.
        case Failure(exception) => logger.error(s"$exception")
      }
  }
}