package fr.natan.akkastreamfileprocessingapi.service
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.typesafe.scalalogging.slf4j.Logger
import fr.natan.akkastreamfileprocessingapi.models.ModelsAndJsonMap.Person
import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.buildPersonModel
import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{actorSystem, buildAndValidateSource}
import java.io.File
import scala.concurrent.Future
/**
 * Akka Streams helpers that look up a TV series identifier and the persons attached to it.
 *
 * Every method obtains its elements from `buildAndValidateSource(inputFile)`; each element is a
 * `Map[String, String]` (a "row map") that is queried by the keys `primaryTitle`, `tconst`
 * and `nconst`.
 */
object UtilitiesClass {

  /**
   * Looks up the `tconst` value of the first row map whose `primaryTitle` entry equals
   * `primaryTitle` (a missing entry is compared as the empty string).
   *
   * @param primaryTitle title to search for, compared with `==`
   * @param inputFile    file handed to `buildAndValidateSource`
   * @return the `tconst` entry of the first matching row map (`None` when that row map has no
   *         such key). When no row map matches, the stream ends without an element and
   *         `Sink.head` fails the future with a `NoSuchElementException`.
   */
  def getTvSerieIdByPrimaryTitle(primaryTitle: String, inputFile: File): Future[Option[String]] = {
    val rows: Source[Map[String, String], Any] = buildAndValidateSource(inputFile = inputFile)
    val tvSerieIdFuture: Future[Option[String]] = rows
      .filter(rowMap => rowMap.getOrElse("primaryTitle", "") == primaryTitle)
      .map(rowMap => rowMap.get("tconst"))
      .runWith(Sink.head)
    tvSerieIdFuture
  }

  /**
   * Collects the `nconst` entry of every row map whose `tconst` entry equals the series
   * identifier carried by `tvSerieIdFuture`.
   *
   * The future is fed into the stream with `Source.future`, so the row maps are only filtered
   * once the identifier is actually available. Reading `Future.value` directly would throw
   * whenever the future has not completed yet.
   *
   * @param tvSerieIdFuture future series identifier
   * @param inputFile       file handed to `buildAndValidateSource`
   * @return the `nconst` entries of all matching row maps, in stream order. The result is empty
   *         when the identifier is `None`; the returned future fails with the same exception
   *         when `tvSerieIdFuture` fails.
   */
  def getListOfPersonsIDByTvSerieID(tvSerieIdFuture: Future[Option[String]], inputFile: File): Future[IndexedSeq[Option[String]]] = {
    // Built at call time, exactly as before, so anything raised while building the source
    // still surfaces to the caller immediately.
    val rows: Source[Map[String, String], Any] = buildAndValidateSource(inputFile = inputFile)
    val personsIDsFuture: Future[IndexedSeq[Option[String]]] = Source
      .future(tvSerieIdFuture)
      // Some(id) emits one element; None emits nothing, which yields an empty result.
      .mapConcat(maybeTvSerieId => maybeTvSerieId.toList)
      .flatMapConcat { tvSerieId =>
        rows
          .filter(rowMap => rowMap.getOrElse("tconst", "") == tvSerieId)
          .map(rowMap => rowMap.get("nconst"))
      }
      .runWith(Sink.collection)
    personsIDsFuture
  }

  /**
   * Builds a `Person` (via `buildPersonModel`) for every row map whose `nconst` entry is one of
   * the identifiers carried by `listPersonsIDsFuture`.
   *
   * As in [[getListOfPersonsIDByTvSerieID]], the future is consumed through `Source.future`
   * instead of `Future.value`, so the call is safe while the identifiers are still being
   * computed.
   *
   * @param listPersonsIDsFuture future person identifiers
   * @param inputFile            file handed to `buildAndValidateSource`
   * @return the persons built from all matching row maps, in stream order; the returned future
   *         fails with the same exception when `listPersonsIDsFuture` fails
   */
  def getListOfPersonsForTvSerie(listPersonsIDsFuture: Future[IndexedSeq[Option[String]]], inputFile: File): Future[IndexedSeq[Person]] = {
    val rows: Source[Map[String, String], Any] = buildAndValidateSource(inputFile = inputFile)
    val personsFuture: Future[IndexedSeq[Person]] = Source
      .future(listPersonsIDsFuture)
      .flatMapConcat { personIds =>
        // Converted once so each row map costs a hash lookup rather than a linear scan.
        val wantedIds: Set[Option[String]] = personIds.toSet
        rows
          .filter(rowMap => wantedIds.contains(rowMap.get("nconst")))
          .map(rowMap => buildPersonModel(rowMap))
      }
      .runWith(Sink.collection)
    personsFuture
  }

  /**
   * Flow that keeps only the row maps whose `tconst` entry equals `tvSerieID` (a missing entry
   * is compared as the empty string) and emits their `tconst` entry.
   */
  private def getTvSerieIdFlow(tvSerieID: String): Flow[Map[String, String], Option[String], NotUsed] = {
    val tvSerieIDFlow = Flow[Map[String, String]]
      .filter(rowMap => rowMap.getOrElse("tconst", "") == tvSerieID)
      .map(rowMap => rowMap.get("tconst"))
    tvSerieIDFlow
  }

  /**
   * Looks up `tvSerieID` among the `tconst` entries of the row maps.
   *
   * @param tvSerieID identifier to search for, compared with `==`
   * @param logger    currently unused; kept so existing callers keep compiling
   * @param inputFile file handed to `buildAndValidateSource`
   * @return the `tconst` entry of the first matching row map. When no row map matches, the
   *         stream ends without an element and `Sink.head` fails the future with a
   *         `NoSuchElementException`.
   */
  def getTvSerieIDFuture(tvSerieID: String, logger: Logger, inputFile: File): Future[Option[String]] = {
    val rows: Source[Map[String, String], Any] = buildAndValidateSource(inputFile = inputFile)
    val tvSerieIdFuture: Future[Option[String]] = rows
      // Same filter and projection as before, now shared through the private flow.
      .via(getTvSerieIdFlow(tvSerieID))
      .runWith(Sink.head)
    tvSerieIdFuture
  }
}
 |