package fr.natan.akkastreamfileprocessingapi.service import akka.actor.ActorSystem import akka.stream.javadsl.Framing import akka.stream.scaladsl.{Compression, FileIO, Flow, Sink, Source} import akka.util.ByteString import akka.{Done, NotUsed} import com.typesafe.scalalogging.slf4j.Logger import fr.natan.akkastreamfileprocessingapi.businessexceptions.FileNotFoundException import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.separator import fr.natan.akkastreamfileprocessingapi.models.{Person, TvSeries} import fr.natan.akkastreamfileprocessingapi.valitator.Validators.fileExists import java.io.File import java.nio.file.Paths import scala.concurrent.Future object AkkaStreamComponents { implicit val actor: ActorSystem = ActorSystem("AkkaStreamActor") private def convertToTvSerie(array: Array[String]): TvSeries = { val tvSerie: TvSeries = TvSeries( array(0), array(1), array(2), array(3), array(4), array(5), array(6), array(7), array(8)) tvSerie } private def convertToPerson(array: Array[String]): Person ={ Person( array(0), array(1), array(2), array(3), array(4).split(",").toList, array(5).split(",").toList, ) } //flows def buildTvSerieFlow(movieID: String): Flow[String, TvSeries, NotUsed] = { val tvFlow: Flow[String, TvSeries, NotUsed] = Flow[String].filter(rows => !rows.contains(movieID)) .map(row => { val movie: Array[String] = row.split(separator) convertToTvSerie(movie) }) tvFlow } def buildPersonFlow(personID: String): Flow[String, Person, NotUsed]={ val personFlow: Flow[String, Person, NotUsed] = Flow[String] .filterNot((rows: String)=>{ rows.contains(personID) }) .map((row: String)=>{ convertToPerson(row.split(separator)) }) personFlow } def buildfilterByMoviePrimaryTitleFlow(moviePrimaryTitle: String): Flow[String, TvSeries, NotUsed] = { val filterFlow: Flow[String, TvSeries, NotUsed] = Flow[String] .filter((rows: String) => { rows.contains(moviePrimaryTitle) }) .map(row => { val movie: Array[String] = row.split("\t") convertToTvSerie(movie) }) filterFlow } //source def buildSource(inputFile: File): Source[String, NotUsed] = { var source: Source[String, NotUsed] = null if (!fileExists(inputFile.getPath)) { return null } source = Source .single(inputFile) .flatMapConcat( (file: File) => FileIO.fromPath(Paths.get(inputFile.getPath) ) ) .via(Compression.gunzip()) .via( Framing.delimiter(ByteString("\n"), 4096) .map(byteString => byteString.utf8String) ) source } def buildAndValidateSource(inputFile: File): Source[String, NotUsed] = { val source: Source[String, NotUsed] = buildSource(inputFile = inputFile) if (source == null) { throw new FileNotFoundException(filename = inputFile.getPath) } source } //sink def buildTvSeriesSink(logger: Logger): Sink[TvSeries, Future[Done]] = { val tvSeriesSink : Sink[TvSeries, Future[Done]]= Sink.foreach[TvSeries]( (movie: TvSeries) => { logger.info(s"${movie.toString}") } ) tvSeriesSink } def buildPersonSink(logger: Logger): Sink[Person, Future[Done]] ={ val personSink :Sink[Person, Future[Done]] = Sink .foreach[Person]((person: Person)=>{ logger.info(s"${person.toString}") }) personSink } }