placidenduwayo 2 роки тому
джерело
коміт
1c8c64627a

+ 13
- 14
src/main/java/fr/natan/akkastreamfileprocessingapi/controller/MovieController.java Переглянути файл

@@ -17,18 +17,6 @@ public class MovieController {
17 17
         this.akkaStreamFilesProcessing = akkaStreamFilesProcessing;
18 18
     }
19 19
 
20
-    @RequestMapping(value = "/movies", method = RequestMethod.GET)
21
-    private ResponseEntity<String> getAllMovies() {
22
-        akkaStreamFilesProcessing.getAllMovies();
23
-        return new ResponseEntity<>("is running", HttpStatus.OK);
24
-    }
25
-
26
-    @RequestMapping(value = "/movies/{movieTitle}", method = RequestMethod.GET)
27
-    private ResponseEntity<String> getMovieByTitle(@PathVariable(name ="movieTitle" ) String movieTitle){
28
-        akkaStreamFilesProcessing.getMoviesByTitle(movieTitle);
29
-        return new ResponseEntity<>("is running", HttpStatus.OK);
30
-    }
31
-
32 20
     @RequestMapping(value = "/persons", method = RequestMethod.GET)
33 21
     private ResponseEntity<String> getallPersons(){
34 22
         akkaStreamFilesProcessing.getAllPersons();
@@ -37,9 +25,8 @@ public class MovieController {
37 25
 
38 26
     @RequestMapping(value = "/persons/id/{personID}", method = RequestMethod.GET)
39 27
     private ResponseEntity<String> getPersonByID(@PathVariable(name = "personID") String nconst){
40
-       akkaStreamFilesProcessing.getPersonById(nconst);
41 28
 
42
-       return new ResponseEntity<>("is running", HttpStatus.OK);
29
+       return new ResponseEntity<>(akkaStreamFilesProcessing.getPersonById(nconst).toString(), HttpStatus.OK);
43 30
 
44 31
     }
45 32
 
@@ -48,4 +35,16 @@ public class MovieController {
48 35
         akkaStreamFilesProcessing.getPersonByName(primaryName);
49 36
         return new ResponseEntity<>("is running", HttpStatus.OK);
50 37
     }
38
+
39
+    @RequestMapping(value = "/movies", method = RequestMethod.GET)
40
+    private ResponseEntity<String> getAllMovies() {
41
+        akkaStreamFilesProcessing.getAllTvSeries();
42
+        return new ResponseEntity<>("is running", HttpStatus.OK);
43
+    }
44
+
45
+    @RequestMapping(value = "/movies/{movieTitle}", method = RequestMethod.GET)
46
+    private ResponseEntity<String> getMovieByTitle(@PathVariable(name ="movieTitle" ) String movieTitle){
47
+        akkaStreamFilesProcessing.getByTvSeriePrimaryTitle(movieTitle);
48
+        return new ResponseEntity<>("is running", HttpStatus.OK);
49
+    }
51 50
 }

+ 15
- 0
src/main/scala/fr/natan/akkastreamfileprocessingapi/models/Models.scala Переглянути файл

@@ -19,6 +19,13 @@ object Models {
19 19
         ", known for titles:" + knownForTitles +
20 20
         "]"
21 21
     }
22
+
23
+    def getNconst: String={
24
+      nconst
25
+    }
26
+    def getPrimaryName: String = {
27
+      primaryName
28
+    }
22 29
   }
23 30
 
24 31
   final case class TvSeries(
@@ -44,5 +51,13 @@ object Models {
44 51
         ", runtime minutes:" + runtimeMinutes +
45 52
         ", genre:" + genres + "]"
46 53
     }
54
+
55
+   def getTconst: String= {
56
+     tconst
57
+   }
58
+    def getPrimaryTitle: String={
59
+      primaryTitle
60
+    }
47 61
   }
62
+
48 63
 }

+ 30
- 30
src/main/scala/fr/natan/akkastreamfileprocessingapi/service/AkkaStreamComponents.scala Переглянути файл

@@ -21,17 +21,6 @@ object AkkaStreamComponents {
21 21
   //flows building
22 22
 
23 23
   //flow1
24
-  def buildTvSerieFlow(): Flow[Map[String, String], TvSeries, NotUsed] = {
25
-
26
-    val tvFlow: Flow[Map[String, String], TvSeries, NotUsed] =
27
-      Flow[Map[String, String]]
28
-        .map(row => {
29
-          buildTvSerieModel(row)
30
-        })
31
-    tvFlow
32
-  }
33
-
34
-  //flow2
35 24
   def buildPersonFlow(): Flow[Map[String, String], Person, NotUsed] = {
36 25
     val personFlow: Flow[Map[String, String], Person, NotUsed] =
37 26
       Flow[Map[String, String]]
@@ -42,14 +31,27 @@ object AkkaStreamComponents {
42 31
     personFlow
43 32
   }
44 33
 
34
+  //flow2
35
+
36
+  def buildTvSerieFlow(): Flow[Map[String, String], TvSeries, NotUsed] = {
37
+
38
+    val tvFlow: Flow[Map[String, String], TvSeries, NotUsed] =
39
+      Flow[Map[String, String]]
40
+        .map(row => {
41
+          buildTvSerieModel(row)
42
+        })
43
+    tvFlow
44
+  }
45
+
45 46
   //flow3
46
-  def filterByMoviePrimaryTitleFlow(moviePrimaryTitle: String): Flow[Map[String, String], TvSeries, NotUsed] = {
47
+  def filterByTvSeriePrimaryTitleFlow(tvSeriePrimaryTitle: String): Flow[Map[String, String], TvSeries, NotUsed] = {
47 48
     val filterFlow: Flow[Map[String, String], TvSeries, NotUsed] = Flow[Map[String, String]]
48 49
         .filter((rows: Map[String, String]) => {
49
-          rows.getOrElse("primaryTitle","")==moviePrimaryTitle
50
+          rows.getOrElse(key = "primaryName", default = "")==tvSeriePrimaryTitle
50 51
         })
51 52
         .map(rowMap => {
52
-          buildTvSerieModel(map = rowMap)
53
+          val tvSerie: TvSeries = buildTvSerieModel(map = rowMap)
54
+          tvSerie
53 55
         })
54 56
 
55 57
     filterFlow
@@ -60,7 +62,7 @@ object AkkaStreamComponents {
60 62
     val personFilter: Flow[Map[String, String], Person, NotUsed]=
61 63
       Flow[Map[String, String]]
62 64
         .filter((rowMap:Map[String, String])=>{
63
-          rowMap.getOrElse("nconst","")==nconst
65
+          rowMap.getOrElse(key="nconst",default="")==nconst
64 66
         })
65 67
         .map(rowMap=>{
66 68
           buildPersonModel(map = rowMap)
@@ -69,11 +71,12 @@ object AkkaStreamComponents {
69 71
     personFilter
70 72
   }
71 73
 
74
+  //flow5
72 75
   def filterByPersonNameFlow(personName: String): Flow[Map[String, String], Person, NotUsed] ={
73 76
     val personFilter: Flow[Map[String, String], Person, NotUsed] =
74 77
       Flow[Map[String, String]]
75 78
         .filter((rowMap: Map[String, String]) =>{
76
-          rowMap.getOrElse("primaryName","")==personName
79
+          rowMap.getOrElse(key = "primaryName", default = "")==personName
77 80
         })
78 81
         .map((rowMap: Map[String, String])=>{
79 82
           buildPersonModel(map = rowMap)
@@ -83,10 +86,10 @@ object AkkaStreamComponents {
83 86
   }
84 87
 
85 88
   //source building
86
-  def buildSource(inputFile: File): Source[Map[String, String], NotUsed] = {
87 89
 
88
-    var datasource: Source[Map[String, String], NotUsed] = null
90
+  def buildSource(inputFile: File): Source[Map[String, String], _] = {
89 91
 
92
+    var datasource: Source[Map[String, String], NotUsed] = null
90 93
     if (!fileExists(inputFile.getPath)) {
91 94
       return null
92 95
     }
@@ -105,9 +108,9 @@ object AkkaStreamComponents {
105 108
     datasource
106 109
   }
107 110
 
108
-  def buildAndValidateSource(inputFile: File): Source[Map[String, String], NotUsed] = {
111
+  def buildAndValidateSource(inputFile: File): Source[Map[String, String], _] = {
109 112
 
110
-    val source: Source[Map[String, String], NotUsed] = buildSource(inputFile = inputFile)
113
+    val source: Source[Map[String, String], _] = buildSource(inputFile = inputFile)
111 114
     if (source == null) {
112 115
       throw new FileNotFoundException(filename = inputFile.getPath)
113 116
     }
@@ -115,28 +118,25 @@ object AkkaStreamComponents {
115 118
   }
116 119
 
117 120
   //sinks building
118
-
119 121
   //sink1
120 122
   def buildTvSeriesSink(logger: Logger): Sink[TvSeries, Future[Done]] = {
121 123
     val tvSeriesSink : Sink[TvSeries, Future[Done]]= Sink.foreach[TvSeries](
122
-      (movie: TvSeries) => {
123
-        logger.info(s"${movie.toString}")
124
+      (tvSerie: TvSeries) => {
125
+        logger.info(s"${tvSerie.toString}")
124 126
       }
125 127
     )
126 128
     tvSeriesSink
127 129
   }
128 130
 
129 131
   //sink2
130
-  def buildAllPersonsSink(logger: Logger): Sink[Person,Future[Seq[Person]]] = {
131
-    val listPersonsSink: Sink[Person, Future[Seq[Person]]]=
132
-      Sink.seq
132
+  def buildAllPersonsSink(logger: Logger): Sink[Person,Future[Done]] = {
133
+    val listPersonsSink: Sink[Person, Future[Done]]=
134
+      Sink.foreach[Person]((person: Person)=>logger.info(s"${person.toString}"))
133 135
     listPersonsSink
134 136
   }
135 137
 
136 138
   //sink3
137
-  def buildPersonSink(logger: Logger): Sink[Person, Future[Done]] = {
138
-    Sink.foreach[Person](
139
-      (person: Person) => logger.info(s"${person.toString}")
140
-    )
139
+  def buildPersonSink(): Sink[Person, Future[Person]] = {
140
+    Sink.head
141 141
   }
142 142
 }

+ 7
- 5
src/main/scala/fr/natan/akkastreamfileprocessingapi/service/AkkaStreamFileProcessing.scala Переглянути файл

@@ -1,12 +1,14 @@
1 1
 package fr.natan.akkastreamfileprocessingapi.service
2 2
 
3
+import fr.natan.akkastreamfileprocessingapi.models.Models.Person
4
+
3 5
 trait AkkaStreamFileProcessing {
6
+  def getAllPersons(): Unit
4 7
 
5
-  def getAllMovies()
6
-  def getMoviesByTitle (movieTitle: String)
8
+  def getPersonById(nconst: String): Person
9
+  def getPersonByName(primaryName: String): Unit
7 10
 
8
-  def getAllPersons()
11
+  def getAllTvSeries(): Unit
9 12
 
10
-  def getPersonById(nconst: String)
11
-  def getPersonByName(primaryName: String)
13
+  def getByTvSeriePrimaryTitle(tvSerieTitle: String): Unit
12 14
 }

+ 77
- 48
src/main/scala/fr/natan/akkastreamfileprocessingapi/service/AkkaStreamFileProcessingImpl.scala Переглянути файл

@@ -6,12 +6,14 @@ import akka.{Done, NotUsed}
6 6
 import com.typesafe.scalalogging.slf4j.Logger
7 7
 import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics}
8 8
 import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSeries}
9
-import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAndValidateSource, buildPersonFlow, buildPersonSink, buildSource, buildTvSerieFlow, buildTvSeriesSink, filterByMoviePrimaryTitleFlow, filterByPersonIdFlow, filterByPersonNameFlow}
9
+import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.buildTvSerieModel
10
+import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamComponents.{buildAllPersonsSink, buildAndValidateSource, buildPersonFlow, buildPersonSink, buildSource, buildTvSerieFlow, buildTvSeriesSink, filterByPersonIdFlow, filterByPersonNameFlow, filterByTvSeriePrimaryTitleFlow}
10 11
 import org.slf4j.LoggerFactory
11 12
 import org.springframework.stereotype.Component
12 13
 
13 14
 import scala.concurrent.ExecutionContext.Implicits.global
14
-import scala.concurrent.Future
15
+import scala.concurrent.duration.DurationInt
16
+import scala.concurrent.{Await, Future}
15 17
 import scala.util.{Failure, Success}
16 18
 
17 19
 @Component
@@ -20,9 +22,58 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
20 22
   implicit val actorSystem: ActorSystem = ActorSystem("AkkaStreamActor")
21 23
   implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
22 24
 
23
-  override def getAllMovies(): Unit = {
25
+  override def getAllPersons(): Unit = {
26
+    val personSource: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
27
+    val personSink: Sink[Person, Future[Done]] = buildAllPersonsSink(logger = logger)
28
+
29
+    //graph
30
+    val startTime: Long = System.currentTimeMillis()
31
+    val result: Future[Done] = personSource
32
+      .via(flow = buildPersonFlow())
33
+      .runWith(sink = personSink)
34
+      .andThen {
35
+        case Success(value) =>
36
+          val elapsedTime: Long = (System.currentTimeMillis() - startTime) / 1000
37
+          logger.info(s"$value: Successfully processed, elapsed time: $elapsedTime")
38
+        case Failure(exception) => logger.error(s"$exception: Fail")
39
+      }
40
+  }
41
+
42
+  override def getPersonById(nconst: String): Person = {
43
+    val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
44
+
45
+    val startTime: Long = System.currentTimeMillis()
46
+    val res = source
47
+      .via(flow = filterByPersonIdFlow(nconst = nconst))
48
+      .runWith(sink = buildPersonSink())
49
+     .andThen {
50
+        case Success(value) =>
51
+          val elapsedTime: Long = (System.currentTimeMillis()-startTime)/1000
52
+          logger.info(s"$value: Successfully processed, elapsed time: $elapsedTime")
53
+        case Failure(exception) => logger.error(s"$exception: Fail")
54
+      }
55
+
56
+    Await.result(res,1 minutes)
57
+    val person: Person = res.value.get.get
58
+    person
59
+  }
60
+
61
+  override def getPersonByName(primaryName: String): Unit = {
62
+    val source: Source[Map[String, String], _] = buildSource(inputFile = nameBasics)
63
+    source
64
+      .via(
65
+        flow = filterByPersonNameFlow(personName = primaryName)
66
+      )
67
+      .runWith(sink = buildPersonSink())
68
+      .andThen {
69
+        case Failure(exception) => logger.info(s"$exception")
70
+        case Success(value) => logger.info(s"$value: Successufully processing")
71
+      }
72
+  }
24 73
 
25
-    val source: Source[Map[String, String], NotUsed] = buildAndValidateSource(inputFile = titleBasics)
74
+  override def getAllTvSeries(): Unit = {
75
+
76
+    val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
26 77
     val sink: Sink[TvSeries, Future[Done]] = buildTvSeriesSink(logger = logger)
27 78
 
28 79
     val startingTime: Long = System.currentTimeMillis()
@@ -40,13 +91,13 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
40 91
 
41 92
   }
42 93
 
43
-  override def getMoviesByTitle(moviePrimaryTitle: String): Unit = {
94
+  override def getByTvSeriePrimaryTitle(tvSeriePrimaryTitle: String): Unit = {
44 95
 
45
-    val tvSeriesSource: Source[Map[String, String], NotUsed] = buildAndValidateSource(inputFile = titleBasics)
96
+    val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
46 97
     val tvSeriesSink: Sink[TvSeries, Future[Done]] = buildTvSeriesSink(logger = logger)
47 98
 
48 99
     val filterByMovieTitleFlow: Flow[Map[String, String], TvSeries, NotUsed] =
49
-      filterByMoviePrimaryTitleFlow(moviePrimaryTitle = moviePrimaryTitle)
100
+      filterByTvSeriePrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle)
50 101
 
51 102
     val startTime: Long = System.currentTimeMillis()
52 103
     val listTvSeries: Future[Done] = tvSeriesSource
@@ -58,52 +109,30 @@ class AkkaStreamFileProcessingImpl extends AkkaStreamFileProcessing {
58 109
           logger.info(s"$value: successfully processing file, elapsed time: $elapsedTime sec")
59 110
         case Failure(error: Error) => logger.error(s"$error")
60 111
       }
61
-  }
62 112
 
63
-  override def getAllPersons(): Unit = {
64
-    val personSource: Source[Map[String, String], NotUsed] = buildSource(inputFile = nameBasics)
65
-    val personSink: Sink[Person, Future[Done]] = buildPersonSink(logger = logger)
66
-
67
-    //graph
68
-    val startTime: Long = System.currentTimeMillis()
69
-    personSource
70
-      .via(flow = buildPersonFlow())
71
-      .runWith(sink = personSink)
72
-      .andThen {
73
-        case Success(value) =>
74
-          val elapsedTime: Long = (System.currentTimeMillis() - startTime) / 1000
75
-          logger.info(s"$value: successfully processing, elapsed time: $elapsedTime sec")
76
-        case Failure(error: Error) => logger.error(s"$error")
77
-      }
113
+    getTvSerieIdByPrimaryTitle(primaryTitle = tvSeriePrimaryTitle)
78 114
   }
79 115
 
80
-  override def getPersonById(nconst: String): Unit = {
81
-    val source: Source[Map[String, String], NotUsed] = buildSource(inputFile = nameBasics)
82
-
83
-    val startTime: Long = System.currentTimeMillis()
84
-    source
85
-      .via(flow = filterByPersonIdFlow(nconst = nconst))
86
-      .runWith(sink = buildPersonSink(logger = logger))
87
-      .andThen {
88
-        case Success(value) =>
89
-          val elapsedTime: Long = (System.currentTimeMillis()-startTime)/1000
90
-          logger.info(s"$value: Successfully processed, elapsed time: $elapsedTime")
91
-        case Failure(exception) => logger.error(s"$exception: Fail")
92
-      }
116
+  private def getTvSerieIdByPrimaryTitle(primaryTitle: String): String ={
117
+    val source: Source[Map[String, String],_]= buildSource(inputFile = titleBasics)
118
+
119
+    var tvSerie: TvSeries =null
120
+    val res : Future[Option[String]]= source
121
+      .filter((rowMap: Map[String, String]) =>{
122
+        rowMap.getOrElse(key = "primaryTitle","")==primaryTitle
123
+      })
124
+      .map((rowMap: Map[String, String])=>{
125
+        tvSerie = buildTvSerieModel(rowMap)
126
+        rowMap.get("tconst")
127
+      })
128
+      .runWith(Sink.head)
129
+
130
+   Await.result(res,1 minutes)
131
+    val tconst:String = res.value.get.get.get
132
+    tconst
93 133
   }
94 134
 
95
-  override def getPersonByName(primaryName: String): Unit = {
96
-    val source: Source[Map[String, String], NotUsed] = buildSource(inputFile = nameBasics)
97
-    source
98
-      .via(
99
-        flow = filterByPersonNameFlow(personName = primaryName)
100
-      )
101
-      .runWith(sink = buildPersonSink(logger = logger))
102
-      .andThen {
103
-        case Failure(exception) => logger.info(s"$exception")
104
-        case Success(value) => logger.info(s"$value: Successufully processing")
105
-      }
106
-  }
135
+
107 136
 }
108 137
 
109 138
 

Powered by TurnKey Linux.