package fr.natan.akkastreamfileprocessingapi.service

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.{Done, NotUsed}
import com.typesafe.scalalogging.slf4j.Logger
import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.titleBasics
import fr.natan.akkastreamfileprocessingapi.models.Movie
import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAndValidateSource, buildMovieFlow, buildSink, filterByMoviePrimaryTitleFlow}
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

/**
 * Spring component that runs Akka Streams pipelines over the `titleBasics` input.
 *
 * Each call builds a source with `buildAndValidateSource`, pushes it through a
 * flow producing [[Movie]] values, and drains it into the sink returned by
 * `buildSink`. The outcome of every run (success with elapsed seconds, or the
 * failure cause) is written to `logger`.
 */
@Component
class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {

  implicit val actorSystem: ActorSystem = ActorSystem("AkkaStreamActor")
  implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))

  /**
   * Runs the `titleBasics` source through the flow built by
   * `buildMovieFlow(movieID = "tconst")` and returns every emitted movie.
   *
   * The sink from `buildSink` stays attached, so whatever it does per element
   * still happens; the elements are additionally collected with `Sink.seq`.
   * The call blocks until the stream terminates because the declared return
   * type is a plain `List`. If the stream fails, the failure is logged and then
   * rethrown to the caller by `Await.result`.
   *
   * @return all movies emitted by the stream, in emission order
   */
  override def getAllMovies(): List[Movie] = {
    val source: Source[String, NotUsed] = buildAndValidateSource(inputFile = titleBasics)
    val sink: Sink[Movie, Future[Done]] = buildSink(logger = logger)
    val movieFlow: Flow[String, Movie, NotUsed] = buildMovieFlow(movieID = "tconst")

    val startingTime: Long = System.currentTimeMillis()

    // Keep both materialized values: the sink's completion signal (for logging)
    // and the collected elements (for the return value).
    val (completion, collected) = source
      .via(flow = movieFlow)
      .alsoToMat(sink)(Keep.right)
      .toMat(Sink.seq[Movie])(Keep.both)
      .run()

    completion.andThen(logCompletion(startingTime) { (value, elapsedTime) =>
      s"$value: successfully processing file, elapsed time $titleBasics: $elapsedTime sec"
    })

    // The interface is synchronous, so blocking is unavoidable here. No finite
    // timeout is imposed because any cap would be an arbitrary guess at how
    // long the input takes to process.
    Await.result(collected, Duration.Inf).toList
  }

  /**
   * Runs the `titleBasics` source through the flow built by
   * `filterByMoviePrimaryTitleFlow` for the given title and drains it into the
   * sink from `buildSink`.
   *
   * The stream runs asynchronously; this method returns as soon as it has been
   * started and only logs the outcome once the stream terminates.
   *
   * @param moviePrimaryTitle title forwarded to `filterByMoviePrimaryTitleFlow`
   */
  override def getMoviesByTitle(moviePrimaryTitle: String): Unit = {
    val source: Source[String, NotUsed] = buildAndValidateSource(inputFile = titleBasics)
    val sink: Sink[Movie, Future[Done]] = buildSink(logger = logger)
    val filterByMovieTitleFlow: Flow[String, Movie, NotUsed] =
      filterByMoviePrimaryTitleFlow(moviePrimaryTitle = moviePrimaryTitle)

    val startTime: Long = System.currentTimeMillis()

    source
      .via(flow = filterByMovieTitleFlow)
      .runWith(sink = sink)
      .andThen(logCompletion(startTime) { (value, elapsedTime) =>
        s"$value : successfully processing file, elapsed time $elapsedTime sec"
      })
  }

  /**
   * Builds the callback that logs how a stream run ended.
   *
   * @param startTime      wall-clock milliseconds at which the run was started
   * @param successMessage renders the info-level message from the completion
   *                       value and the elapsed time in whole seconds
   * @return a handler defined for both `Success` and every `Failure`
   */
  private def logCompletion(startTime: Long)(
      successMessage: (Done, Long) => String
  ): PartialFunction[Try[Done], Unit] = {
    case Success(value) =>
      val elapsedTime: Long = (System.currentTimeMillis() - startTime) / 1000
      logger.info(successMessage(value, elapsedTime))
    // Match every Throwable: a `Failure(e: Error)` pattern would only catch
    // java.lang.Error and let ordinary exceptions go unreported.
    case Failure(error) =>
      logger.error(s"$error", error)
  }
}