No Description

AkkaStreamComponents.scala 4.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package fr.natan.akkastreamfileprocessingapi.service
  2. import akka.actor.ActorSystem
  3. import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
  4. import akka.stream.scaladsl.{Compression, FileIO, Flow, Sink, Source}
  5. import akka.{Done, NotUsed}
  6. import com.typesafe.scalalogging.slf4j.Logger
  7. import fr.natan.akkastreamfileprocessingapi.businessexceptions.FileNotFoundException
  8. import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSeries}
  9. import fr.natan.akkastreamfileprocessingapi.valitator.Validators.fileExists
  10. import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.{buildPersonModel, buildTvSerieModel}
  11. import java.io.File
  12. import java.nio.file.Paths
  13. import scala.concurrent.Future
  14. //noinspection SpellCheckingInspection
  /**
   * Factory methods for the Akka Streams pieces used to process gzipped,
   * tab-separated files: a row source, row-to-model flows and logging sinks.
   *
   * Rows travel through the stream as `Map[String, String]` values produced by
   * Alpakka's `CsvToMap.toMapAsStrings()`.
   */
  object AkkaStreamComponents {

    // NOTE(review): not referenced by the builders below; presumably picked up
    // implicitly by callers that run the streams — confirm before changing it.
    implicit val actor: ActorSystem = ActorSystem("AkkaStreamActor")

    //flows building

    /**
     * Keeps only the rows whose `column` value equals `expected`.
     * A row that lacks the column is compared as if it held the empty string.
     */
    private def rowsMatching(
        column: String,
        expected: String
    ): Flow[Map[String, String], Map[String, String], NotUsed] =
      Flow[Map[String, String]].filter(row => row.getOrElse(column, "") == expected)

    /** Converts every incoming row into a [[Person]] via `buildPersonModel`. */
    def buildPersonFlow(): Flow[Map[String, String], Person, NotUsed] =
      Flow[Map[String, String]].map(row => buildPersonModel(row))

    /**
     * Keeps the rows whose `primaryTitle` column equals the given title and
     * converts them into [[TvSeries]] via `buildTvSerieModel`.
     *
     * @param tvSeriePrimaryTitle exact title to match
     */
    def filterByTvSeriePrimaryTitleFlow(
        tvSeriePrimaryTitle: String
    ): Flow[Map[String, String], TvSeries, NotUsed] =
      rowsMatching("primaryTitle", tvSeriePrimaryTitle)
        .map(row => buildTvSerieModel(tvSerieMap = row))

    /**
     * Keeps the rows whose `nconst` column equals the given identifier and
     * converts them into [[Person]] via `buildPersonModel`.
     *
     * @param nconst exact identifier to match
     */
    def filterByPersonIdFlow(nconst: String): Flow[Map[String, String], Person, NotUsed] =
      rowsMatching("nconst", nconst)
        .map(row => buildPersonModel(personMap = row))

    /**
     * Keeps the rows whose `primaryName` column equals the given name and
     * converts them into [[Person]] via `buildPersonModel`.
     *
     * @param primaryName exact name to match
     */
    def filterByPersonNameFlow(primaryName: String): Flow[Map[String, String], Person, NotUsed] =
      rowsMatching("primaryName", primaryName)
        .map(row => buildPersonModel(personMap = row))

    /** Converts every incoming row into a [[TvSeries]] via `buildTvSerieModel`. */
    def buildTvSerieFlow(): Flow[Map[String, String], TvSeries, NotUsed] =
      Flow[Map[String, String]].map(row => buildTvSerieModel(tvSerieMap = row))

    //source building

    /**
     * Builds the row source for `inputFile`, or `None` when `fileExists`
     * rejects the file's path.
     *
     * The stream reads the file, gunzips it, scans it as tab-delimited CSV
     * (double quote as both quote and escape character) and turns each record
     * into a `Map[String, String]`.
     */
    private def rowSourceFor(inputFile: File): Option[Source[Map[String, String], NotUsed]] =
      if (fileExists(inputFile.getPath)) {
        // Wrapping the file in Source.single and flattening discards FileIO's
        // own materialized value, so the resulting source materializes NotUsed.
        val rows: Source[Map[String, String], NotUsed] =
          Source
            .single(inputFile)
            .flatMapConcat((file: File) => FileIO.fromPath(Paths.get(file.getPath)))
            .via(Compression.gunzip())
            .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote))
            .via(CsvToMap.toMapAsStrings())
        Some(rows)
      } else {
        None
      }

    /**
     * Builds the row source for a gzipped, tab-separated file.
     *
     * @param inputFile file to read
     * @return the row source, or `null` when `fileExists` rejects the file's
     *         path. The `null` result is kept for existing callers; use
     *         [[buildAndValidateSource]] to get an exception instead.
     */
    def buildSource(inputFile: File): Source[Map[String, String], NotUsed] =
      rowSourceFor(inputFile).orNull

    /**
     * Builds the row source for a gzipped, tab-separated file, failing fast
     * when the file is rejected by `fileExists`.
     *
     * @param inputFile file to read
     * @return the row source
     * @throws FileNotFoundException (the project's business exception) carrying
     *                               the file path when no source could be built
     */
    def buildAndValidateSource(inputFile: File): Source[Map[String, String], _] =
      rowSourceFor(inputFile) match {
        case Some(rows) => rows
        case None       => throw new FileNotFoundException(filename = inputFile.getPath)
      }

    //sinks building

    /** Sink that logs the `toString` of every element at info level. */
    private def loggingSink[T](logger: Logger): Sink[T, Future[Done]] =
      Sink.foreach[T](element => logger.info(s"${element.toString}"))

    /**
     * Sink that logs every [[TvSeries]] it receives at info level.
     *
     * @param logger logger that receives one message per element
     */
    def buildAllTvSeriesSink(logger: Logger): Sink[TvSeries, Future[Done]] =
      loggingSink[TvSeries](logger)

    /**
     * Sink that logs every [[Person]] it receives at info level.
     *
     * @param logger logger that receives one message per element
     */
    def buildAllPersonsSink(logger: Logger): Sink[Person, Future[Done]] =
      loggingSink[Person](logger)
  }

Powered by TurnKey Linux.