|  | @@ -1,6 +1,7 @@
 | 
	
		
			
			| 1 | 1 |  package fr.natan.akkastreamfileprocessingapi.service
 | 
	
		
			
			| 2 | 2 |  
 | 
	
		
			
			| 3 | 3 |  import akka.actor.ActorSystem
 | 
	
		
			
			|  | 4 | +import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
 | 
	
		
			
			| 4 | 5 |  import akka.stream.javadsl.Framing
 | 
	
		
			
			| 5 | 6 |  import akka.stream.scaladsl.{Compression, FileIO, Flow, Sink, Source}
 | 
	
		
			
			| 6 | 7 |  import akka.util.ByteString
 | 
	
	
		
			
			|  | @@ -19,97 +20,106 @@ object AkkaStreamComponents {
 | 
	
		
			
			| 19 | 20 |  
 | 
	
		
			
			| 20 | 21 |    implicit val actor: ActorSystem = ActorSystem("AkkaStreamActor")
 | 
	
		
			
			| 21 | 22 |  
 | 
	
		
			
			| 22 |  | -  private def convertToTvSerie(array: Array[String]): TvSeries = {
 | 
	
		
			
			|  | 23 | +  private def convertToTvSerie(map: Map[String, String]): TvSeries = {
 | 
	
		
			
			| 23 | 24 |      val tvSerie: TvSeries = TvSeries(
 | 
	
		
			
			| 24 |  | -      array(0),
 | 
	
		
			
			| 25 |  | -      array(1),
 | 
	
		
			
			| 26 |  | -      array(2),
 | 
	
		
			
			| 27 |  | -      array(3),
 | 
	
		
			
			| 28 |  | -      array(4),
 | 
	
		
			
			| 29 |  | -      array(5),
 | 
	
		
			
			| 30 |  | -      array(6),
 | 
	
		
			
			| 31 |  | -      array(7),
 | 
	
		
			
			| 32 |  | -      array(8))
 | 
	
		
			
			|  | 25 | +      map("tconst"),
 | 
	
		
			
			|  | 26 | +      map("titleType"),
 | 
	
		
			
			|  | 27 | +      map("primaryTitle"),
 | 
	
		
			
			|  | 28 | +      map("originalTitle"),
 | 
	
		
			
			|  | 29 | +      map("isAdult"),
 | 
	
		
			
			|  | 30 | +      map("startYear"),
 | 
	
		
			
			|  | 31 | +      map("endYear"),
 | 
	
		
			
			|  | 32 | +      map("runtimeMinutes"),
 | 
	
		
			
			|  | 33 | +      map("genres")
 | 
	
		
			
			|  | 34 | +    )
 | 
	
		
			
			| 33 | 35 |  
 | 
	
		
			
			| 34 | 36 |      tvSerie
 | 
	
		
			
			| 35 | 37 |    }
 | 
	
		
			
			| 36 | 38 |  
 | 
	
		
			
			| 37 |  | -  private def convertToPerson(array: Array[String]): Person ={
 | 
	
		
			
			|  | 39 | +  private def convertToPerson(map: Map[String, String]): Person ={
 | 
	
		
			
			| 38 | 40 |      Person(
 | 
	
		
			
			| 39 |  | -      array(0),
 | 
	
		
			
			| 40 |  | -      array(1),
 | 
	
		
			
			| 41 |  | -      array(2),
 | 
	
		
			
			| 42 |  | -      array(3),
 | 
	
		
			
			| 43 |  | -      array(4).split(",").toList,
 | 
	
		
			
			| 44 |  | -      array(5).split(",").toList,
 | 
	
		
			
			|  | 41 | +      map("nconst"),
 | 
	
		
			
			|  | 42 | +      map("primaryName"),
 | 
	
		
			
			|  | 43 | +      map("birthYear"),
 | 
	
		
			
			|  | 44 | +      map("deathYear"),
 | 
	
		
			
			|  | 45 | +      map("primaryProfession").split(",").toList,
 | 
	
		
			
			|  | 46 | +      map("knownForTitles").split(",").toList,
 | 
	
		
			
			| 45 | 47 |      )
 | 
	
		
			
			| 46 | 48 |    }
 | 
	
		
			
			| 47 | 49 |  
 | 
	
		
			
			| 48 | 50 |    //flows
 | 
	
		
			
			| 49 | 51 |  
 | 
	
		
			
			| 50 |  | -  def buildTvSerieFlow(movieID: String): Flow[String, TvSeries, NotUsed] = {
 | 
	
		
			
			|  | 52 | +  def buildTvSerieFlow(): Flow[Map[String, String], TvSeries, NotUsed] = {
 | 
	
		
			
			| 51 | 53 |  
 | 
	
		
			
			| 52 |  | -    val tvFlow: Flow[String, TvSeries, NotUsed] =
 | 
	
		
			
			| 53 |  | -      Flow[String].filter(rows => !rows.contains(movieID))
 | 
	
		
			
			|  | 54 | +    val tvFlow: Flow[Map[String, String], TvSeries, NotUsed] =
 | 
	
		
			
			|  | 55 | +      Flow[Map[String, String]]
 | 
	
		
			
			| 54 | 56 |          .map(row => {
 | 
	
		
			
			| 55 |  | -          val movie: Array[String] = row.split(separator)
 | 
	
		
			
			| 56 |  | -          convertToTvSerie(movie)
 | 
	
		
			
			|  | 57 | +          convertToTvSerie(row)
 | 
	
		
			
			| 57 | 58 |          })
 | 
	
		
			
			| 58 | 59 |      tvFlow
 | 
	
		
			
			| 59 | 60 |    }
 | 
	
		
			
			| 60 | 61 |  
 | 
	
		
			
			| 61 |  | -  def buildPersonFlow(personID: String): Flow[String, Person, NotUsed]={
 | 
	
		
			
			| 62 |  | -    val personFlow: Flow[String, Person, NotUsed] =
 | 
	
		
			
			| 63 |  | -      Flow[String]
 | 
	
		
			
			| 64 |  | -        .filterNot((rows: String)=>{
 | 
	
		
			
			| 65 |  | -          rows.contains(personID)
 | 
	
		
			
			| 66 |  | -        })
 | 
	
		
			
			| 67 |  | -        .map((row: String)=>{
 | 
	
		
			
			| 68 |  | -          convertToPerson(row.split(separator))
 | 
	
		
			
			|  | 62 | +  def buildPersonFlow(): Flow[Map[String, String], Person, NotUsed] = {
 | 
	
		
			
			|  | 63 | +    val personFlow: Flow[Map[String, String], Person, NotUsed] =
 | 
	
		
			
			|  | 64 | +      Flow[Map[String, String]]
 | 
	
		
			
			|  | 65 | +        .map((rowMap: Map[String, String]) => {
 | 
	
		
			
			|  | 66 | +          convertToPerson(rowMap)
 | 
	
		
			
			| 69 | 67 |          })
 | 
	
		
			
			| 70 | 68 |  
 | 
	
		
			
			| 71 | 69 |      personFlow
 | 
	
		
			
			| 72 | 70 |    }
 | 
	
		
			
			| 73 |  | -  def buildfilterByMoviePrimaryTitleFlow(moviePrimaryTitle: String): Flow[String, TvSeries, NotUsed] = {
 | 
	
		
			
			| 74 |  | -    val filterFlow: Flow[String, TvSeries, NotUsed] =
 | 
	
		
			
			| 75 |  | -      Flow[String]
 | 
	
		
			
			| 76 |  | -        .filter((rows: String) => {
 | 
	
		
			
			| 77 |  | -          rows.contains(moviePrimaryTitle)
 | 
	
		
			
			|  | 71 | +
 | 
	
		
			
			|  | 72 | +  def filterByMoviePrimaryTitleFlow(moviePrimaryTitle: String): Flow[Map[String, String], TvSeries, NotUsed] = {
 | 
	
		
			
			|  | 73 | +    val filterFlow: Flow[Map[String, String], TvSeries, NotUsed] = Flow[Map[String, String]]
 | 
	
		
			
			|  | 74 | +        .filter((rows: Map[String, String]) => {
 | 
	
		
			
			|  | 75 | +          rows.getOrElse("primaryTitle","")==moviePrimaryTitle
 | 
	
		
			
			| 78 | 76 |          })
 | 
	
		
			
			| 79 |  | -        .map(row => {
 | 
	
		
			
			| 80 |  | -          val movie: Array[String] = row.split("\t")
 | 
	
		
			
			| 81 |  | -          convertToTvSerie(movie)
 | 
	
		
			
			|  | 77 | +        .map(rowMap => {
 | 
	
		
			
			|  | 78 | +          convertToTvSerie(map = rowMap)
 | 
	
		
			
			| 82 | 79 |          })
 | 
	
		
			
			| 83 | 80 |  
 | 
	
		
			
			| 84 | 81 |      filterFlow
 | 
	
		
			
			| 85 | 82 |    }
 | 
	
		
			
			|  | 83 | +
 | 
	
		
			
			|  | 84 | +  def filterByPersonID(nconst: String): Flow[Map[String, String], Person, NotUsed]={
 | 
	
		
			
			|  | 85 | +    val personFilter: Flow[Map[String, String], Person, NotUsed]=
 | 
	
		
			
			|  | 86 | +      Flow[Map[String, String]]
 | 
	
		
			
			|  | 87 | +        .filter((rowMap:Map[String, String])=>{
 | 
	
		
			
			|  | 88 | +          rowMap.getOrElse("nconst","")==nconst
 | 
	
		
			
			|  | 89 | +        })
 | 
	
		
			
			|  | 90 | +        .map(rowMap=>{
 | 
	
		
			
			|  | 91 | +          convertToPerson(map = rowMap)
 | 
	
		
			
			|  | 92 | +        })
 | 
	
		
			
			|  | 93 | +
 | 
	
		
			
			|  | 94 | +    personFilter
 | 
	
		
			
			|  | 95 | +  }
 | 
	
		
			
			|  | 96 | +
 | 
	
		
			
			| 86 | 97 |    //source
 | 
	
		
			
			| 87 |  | -  def buildSource(inputFile: File): Source[String, NotUsed] = {
 | 
	
		
			
			|  | 98 | +  def buildSource(inputFile: File): Source[Map[String, String], NotUsed] = {
 | 
	
		
			
			| 88 | 99 |  
 | 
	
		
			
			| 89 |  | -    var source: Source[String, NotUsed] = null
 | 
	
		
			
			|  | 100 | +    var datasource: Source[Map[String, String], NotUsed] = null
 | 
	
		
			
			| 90 | 101 |  
 | 
	
		
			
			| 91 | 102 |      if (!fileExists(inputFile.getPath)) {
 | 
	
		
			
			| 92 | 103 |        return null
 | 
	
		
			
			| 93 | 104 |      }
 | 
	
		
			
			| 94 |  | -    source = Source
 | 
	
		
			
			|  | 105 | +    datasource = Source
 | 
	
		
			
			| 95 | 106 |        .single(inputFile)
 | 
	
		
			
			| 96 |  | -      .flatMapConcat(
 | 
	
		
			
			| 97 |  | -        (file: File) =>
 | 
	
		
			
			| 98 |  | -          FileIO.fromPath(Paths.get(inputFile.getPath)
 | 
	
		
			
			| 99 |  | -          )
 | 
	
		
			
			|  | 107 | +      .flatMapConcat((filename: File) => {
 | 
	
		
			
			|  | 108 | +        FileIO.fromPath(
 | 
	
		
			
			|  | 109 | +          Paths.get(filename.getPath)
 | 
	
		
			
			|  | 110 | +        )
 | 
	
		
			
			|  | 111 | +      }
 | 
	
		
			
			| 100 | 112 |        )
 | 
	
		
			
			| 101 | 113 |        .via(Compression.gunzip())
 | 
	
		
			
			| 102 |  | -      .via(
 | 
	
		
			
			| 103 |  | -        Framing.delimiter(ByteString("\n"), 4096)
 | 
	
		
			
			| 104 |  | -          .map(byteString => byteString.utf8String)
 | 
	
		
			
			| 105 |  | -      )
 | 
	
		
			
			|  | 114 | +      .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote))
 | 
	
		
			
			|  | 115 | +      .via(CsvToMap.toMapAsStrings())
 | 
	
		
			
			| 106 | 116 |  
 | 
	
		
			
			| 107 |  | -    source
 | 
	
		
			
			|  | 117 | +    datasource
 | 
	
		
			
			| 108 | 118 |    }
 | 
	
		
			
			| 109 | 119 |  
 | 
	
		
			
			| 110 |  | -  def buildAndValidateSource(inputFile: File): Source[String, NotUsed] = {
 | 
	
		
			
			|  | 120 | +  def buildAndValidateSource(inputFile: File): Source[Map[String, String], NotUsed] = {
 | 
	
		
			
			| 111 | 121 |  
 | 
	
		
			
			| 112 |  | -    val source: Source[String, NotUsed] = buildSource(inputFile = inputFile)
 | 
	
		
			
			|  | 122 | +    val source: Source[Map[String, String], NotUsed] = buildSource(inputFile = inputFile)
 | 
	
		
			
			| 113 | 123 |      if (source == null) {
 | 
	
		
			
			| 114 | 124 |        throw new FileNotFoundException(filename = inputFile.getPath)
 | 
	
		
			
			| 115 | 125 |      }
 | 
	
	
		
			
			|  | @@ -126,12 +136,18 @@ object AkkaStreamComponents {
 | 
	
		
			
			| 126 | 136 |      tvSeriesSink
 | 
	
		
			
			| 127 | 137 |    }
 | 
	
		
			
			| 128 | 138 |  
 | 
	
		
			
			| 129 |  | -  def buildPersonSink(logger: Logger): Sink[Person, Future[Done]] ={
 | 
	
		
			
			| 130 |  | -    val personSink :Sink[Person, Future[Done]] = Sink
 | 
	
		
			
			| 131 |  | -      .foreach[Person]((person: Person)=>{
 | 
	
		
			
			|  | 139 | +  def buildPersonsSink(logger: Logger): Sink[Person,Future[Done]] = {
 | 
	
		
			
			|  | 140 | +    val listPersonsSink: Sink[Person, Future[Done]]=
 | 
	
		
			
			|  | 141 | +      Sink.foreach[Person]((person: Person)=>{
 | 
	
		
			
			| 132 | 142 |          logger.info(s"${person.toString}")
 | 
	
		
			
			| 133 | 143 |        })
 | 
	
		
			
			| 134 | 144 |  
 | 
	
		
			
			| 135 |  | -    personSink
 | 
	
		
			
			|  | 145 | +    listPersonsSink
 | 
	
		
			
			|  | 146 | +  }
 | 
	
		
			
			|  | 147 | +
 | 
	
		
			
			|  | 148 | +  def buildPersonSink(logger: Logger): Sink[Person, Future[Done]] = {
 | 
	
		
			
			|  | 149 | +    Sink.foreach[Person](
 | 
	
		
			
			|  | 150 | +      (person: Person) => logger.info(s"${person.toString}")
 | 
	
		
			
			|  | 151 | +    )
 | 
	
		
			
			| 136 | 152 |    }
 | 
	
		
			
			| 137 | 153 |  }
 |