No Description

AkkaStreamFileProcessingImpl.scala 9.1KB


  1. package fr.natan.akkastreamfileprocessingapi.service
  2. import akka.Done
  3. import akka.stream.scaladsl.{Sink, Source}
  4. import com.typesafe.scalalogging.slf4j.Logger
  5. import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics}
  6. import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSerie}
  7. import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{
  8. actorSystem, buildAllPersonsSink, buildAllTvSeriesSink, buildAndValidateSource, buildPersonFlow, buildSource,
  9. buildTvSerieFlow, filterPersonByIdFlow, filterPersonByNameFlow, filterTvSerieByIdFlow, filterTvSerieByPrimaryTitleFlow
  10. }
  11. import fr.natan.akkastreamfileprocessingapi.service.UtilitiesClass.{
  12. getListOfPersonsForTvSerie, getListOfPersonsIDByTvSerieID, getTvSerieIDFuture, getTvSerieIdByPrimaryTitle
  13. }
  14. import org.slf4j.LoggerFactory
  15. import org.springframework.stereotype.Component
  16. import scala.concurrent.ExecutionContext.Implicits.global
  17. import scala.concurrent.Future
  18. import scala.language.postfixOps
  19. import scala.util.{Failure, Success}
  20. //noinspection SpellCheckingInspection
  @Component
  class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessingFuture {

    implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))

    /** Whole seconds elapsed since `start`, where `start` is a `System.currentTimeMillis()` reading. */
    private def elapsedSeconds(start: Long): Long =
      (System.currentTimeMillis() - start) / 1000

    /** Logs a failed stage at error level, attaching the throwable so the logger renders its stack trace. */
    private def logFailure(exception: Throwable): Unit =
      logger.error(s"$exception", exception)

    /**
     * Runs the `nameBasics` source through `filterPersonByIdFlow` and returns the first
     * element it emits, if any. The outcome and the elapsed time are logged on completion.
     *
     * @param personID identifier handed to `filterPersonByIdFlow`
     * @return the first matching person, or `None` when the stream emits nothing
     */
    override def getPersonByIdFuture(personID: String): Future[Option[Person]] = {
      val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
      val start: Long = System.currentTimeMillis()
      val personFuture: Future[Option[Person]] = source
        .via(flow = filterPersonByIdFlow(personID = personID))
        .runWith(Sink.headOption)

      personFuture.onComplete {
        case Failure(exception) => logFailure(exception)
        case Success(value) =>
          logger.info(s"$value")
          logger.info(s"SUCCESS, elapsed time:${elapsedSeconds(start)} sec")
      }
      personFuture
    }

    /**
     * Runs the `nameBasics` source through `filterPersonByNameFlow` and returns the first
     * element it emits, if any. Logs the outcome the same way as [[getPersonByIdFuture]].
     *
     * @param primaryName name handed to `filterPersonByNameFlow`
     * @return the first matching person, or `None` when the stream emits nothing
     */
    override def getPersonByNameFuture(primaryName: String): Future[Option[Person]] = {
      val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
      val start: Long = System.currentTimeMillis()
      val personFuture: Future[Option[Person]] = source
        .via(flow = filterPersonByNameFlow(primaryName = primaryName))
        .runWith(Sink.headOption)

      personFuture.onComplete {
        case Failure(exception) => logFailure(exception)
        case Success(value) =>
          logger.info(s"$value")
          logger.info(s"SUCCESS, elapsed time:${elapsedSeconds(start)} sec")
      }
      personFuture
    }

    /**
     * Runs the `titleBasics` source through `filterTvSerieByIdFlow` and returns the first
     * element it emits, if any. The outcome and the elapsed time are logged on completion.
     *
     * @param tvSerieID identifier handed to `filterTvSerieByIdFlow`
     * @return the first matching TV serie, or `None` when the stream emits nothing
     */
    override def getTvSerieByIdFuture(tvSerieID: String): Future[Option[TvSerie]] = {
      val source: Source[Map[String, String], _] = buildSource(inputFile = titleBasics)
      val start: Long = System.currentTimeMillis()
      val tvSerieFuture: Future[Option[TvSerie]] = source
        .via(flow = filterTvSerieByIdFlow(tvSerieID = tvSerieID))
        .runWith(Sink.headOption)

      tvSerieFuture.onComplete {
        case Failure(exception) => logFailure(exception)
        case Success(value) =>
          logger.info(s"$value")
          logger.info(s"SUCCESS, elapsed time:${elapsedSeconds(start)} sec")
      }
      tvSerieFuture
    }

    /**
     * Runs the validated `titleBasics` source through `filterTvSerieByPrimaryTitleFlow` and
     * collects every emitted element. Each result and the elapsed time are logged on completion.
     *
     * @param tvSeriePrimaryTitle title handed to `filterTvSerieByPrimaryTitleFlow`
     * @return all TV series emitted by the flow
     */
    override def getTvSeriesByPrimaryTitleFuture(tvSeriePrimaryTitle: String): Future[IndexedSeq[TvSerie]] = {
      val start: Long = System.currentTimeMillis()
      val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
      val tvSeriesFuture: Future[IndexedSeq[TvSerie]] = tvSeriesSource
        .via(flow = filterTvSerieByPrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle))
        .runWith(Sink.collection)

      tvSeriesFuture.onComplete {
        case Failure(exception) => logFailure(exception)
        case Success(tvSeries) =>
          tvSeries.foreach((tvSerie: TvSerie) => logger.info(s"$tvSerie"))
          logger.info(s"SUCCESS, elapsed time:${elapsedSeconds(start)} sec")
      }
      tvSeriesFuture
    }

    /**
     * Steps 2 and 3 of the "persons for a TV serie" pipeline, shared by the title-based and
     * the ID-based entry points.
     *
     * Step 2 starts only once `tvSerieIdFuture` has completed successfully and asks
     * `getListOfPersonsIDByTvSerieID` for the person IDs; step 3 starts once those IDs are
     * available and asks `getListOfPersonsForTvSerie` for the persons. Each step logs its
     * result and its own elapsed time; a failing step is logged and stops the chain.
     *
     * @param tvSerieIdFuture   future holding the (optional) TV serie ID produced by step 1
     * @param step2StartMessage message logged when step 2 begins
     * @param step3StartMessage message logged when step 3 begins
     * @return the persons returned by `getListOfPersonsForTvSerie`
     */
    private def personsForTvSerieId(
        tvSerieIdFuture: Future[Option[String]],
        step2StartMessage: String,
        step3StartMessage: String
    ): Future[IndexedSeq[Person]] =
      tvSerieIdFuture.flatMap { _ =>
        logger.info(step2StartMessage)
        val start2: Long = System.currentTimeMillis()
        val personIdsFuture: Future[IndexedSeq[Option[String]]] =
          getListOfPersonsIDByTvSerieID(tvSerieIdFuture = tvSerieIdFuture)

        personIdsFuture
          .andThen {
            case Failure(exception) => logFailure(exception)
            case Success(personIds) =>
              personIds.foreach((personID: Option[String]) => logger.info(s"Person ID:$personID"))
              logger.info(s"STEP 2/3 END, elapsed time:${elapsedSeconds(start2)} sec")
          }
          .flatMap { _ =>
            logger.info(step3StartMessage)
            val start3: Long = System.currentTimeMillis()
            getListOfPersonsForTvSerie(listPersonsIDsFuture = personIdsFuture)
              .andThen {
                case Failure(exception) => logFailure(exception)
                case Success(persons) =>
                  persons.foreach((person: Person) => logger.info(s"${person.toString}"))
                  logger.info(s"STEP 3/3 END, elapsed time:${elapsedSeconds(start3)} sec")
              }
          }
      }

    /**
     * Three chained steps: look up the TV serie ID via `getTvSerieIdByPrimaryTitle`, then the
     * person IDs for that serie, then the persons themselves. Every step logs its timing.
     *
     * @param tvSeriePrimaryTitle title handed to `getTvSerieIdByPrimaryTitle`
     * @return the persons returned by the final step
     */
    override def getPersonsForTvSerieByTvSerieTitleFuture(
        tvSeriePrimaryTitle: String
    ): Future[IndexedSeq[Person]] = {
      logger.info("STEP 1/3 START")
      val start1: Long = System.currentTimeMillis()
      val tvSerieIDFuture: Future[Option[String]] =
        getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)

      val loggedIdFuture: Future[Option[String]] = tvSerieIDFuture.andThen {
        case Failure(exception) => logFailure(exception)
        case Success(value) =>
          logger.info(s"TvSerie ID: $value")
          logger.info(s"STEP 1/3 END, elapsed time:${elapsedSeconds(start1)} sec")
      }

      personsForTvSerieId(
        tvSerieIdFuture = loggedIdFuture,
        step2StartMessage = "STEP 2/3 START",
        step3StartMessage = "STEP 3/3 START"
      )
    }

    /**
     * Same three-step chain as the title-based variant, but step 1 obtains the ID through
     * `getTvSerieIDFuture`.
     *
     * @param tvSerieId identifier handed to `getTvSerieIDFuture`
     * @return the persons returned by the final step
     */
    override def getPersonsForTvSerieByTvSerieIDFuture(tvSerieId: String): Future[IndexedSeq[Person]] = {
      val start1: Long = System.currentTimeMillis()
      logger.info("STEP 1/3")
      val tvSerieIdFuture: Future[Option[String]] = getTvSerieIDFuture(tvSerieId = tvSerieId)

      val loggedIdFuture: Future[Option[String]] = tvSerieIdFuture.andThen {
        // printStackTrace() returns Unit, so interpolating it logged "()"; log the throwable instead.
        case Failure(exception) => logFailure(exception)
        case Success(value) =>
          logger.info(s"$value")
          logger.info(s"STEP 1/3 END, elapsed time:${elapsedSeconds(start1)} sec")
      }

      personsForTvSerieId(
        tvSerieIdFuture = loggedIdFuture,
        step2StartMessage = "STEP 2/3",
        step3StartMessage = "STEP 3/3"
      )
    }

    /**
     * Streams the `nameBasics` source through `buildPersonFlow` into the sink built by
     * `buildAllPersonsSink`, logging the completion value and the elapsed time.
     *
     * @return a future completed when the stream terminates
     */
    override def getAllPersonsFuture(): Future[Done] = {
      val personSource: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
      val startTime: Long = System.currentTimeMillis()
      val result: Future[Done] = personSource
        .via(flow = buildPersonFlow())
        .runWith(sink = buildAllPersonsSink(logger = logger))

      result.onComplete {
        case Failure(exception) => logFailure(exception)
        case Success(value) =>
          logger.info(s"$value")
          // Milliseconds to seconds is a division by 1000 (the previous code divided by 100).
          logger.info(s"elapsed time: ${elapsedSeconds(startTime)} sec")
      }
      result
    }

    /**
     * Streams the validated `titleBasics` source through `buildTvSerieFlow` into the sink
     * built by `buildAllTvSeriesSink`, logging the completion value and the elapsed time.
     *
     * @return a future completed when the stream terminates
     */
    override def getAllTvSeriesFuture(): Future[Done] = {
      val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
      val sink: Sink[TvSerie, Future[Done]] = buildAllTvSeriesSink(logger = logger)
      val startingTime: Long = System.currentTimeMillis()

      source
        .via(flow = buildTvSerieFlow())
        .runWith(sink = sink)
        .andThen {
          case Success(value) =>
            val elapsedTime: Long = elapsedSeconds(startingTime)
            logger.info(s"$value: successfully processing file, elapsed time $titleBasics: $elapsedTime sec")
          // Match every Throwable: the old `Failure(error: Error)` skipped ordinary exceptions.
          case Failure(exception) => logFailure(exception)
        }
    }
  }

Powered by TurnKey Linux.