|
@@ -1,13 +1,12 @@
|
1
|
1
|
package fr.natan.akkastreamfileprocessingapi.service
|
2
|
2
|
|
|
3
|
+import akka.NotUsed
|
3
|
4
|
import akka.actor.ActorSystem
|
4
|
5
|
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
|
5
|
6
|
import akka.stream.scaladsl.{Compression, FileIO, Flow, Sink, Source}
|
6
|
|
-import akka.{Done, NotUsed}
|
7
|
|
-import com.typesafe.scalalogging.slf4j.Logger
|
8
|
7
|
import fr.natan.akkastreamfileprocessingapi.businessexceptions.FileNotFoundException
|
9
|
|
-import fr.natan.akkastreamfileprocessingapi.models.ModelsAndJsonMap.{Person, TvSerie}
|
10
|
|
-import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.{buildPersonModel, buildTvSerieModel}
|
|
8
|
+import fr.natan.akkastreamfileprocessingapi.models.ModelsAndJsonMap.{Episode, Person, TvSerie}
|
|
9
|
+import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.{buildEpisodeModel, buildPersonModel, buildTvSerieModel}
|
11
|
10
|
import fr.natan.akkastreamfileprocessingapi.valitator.Validators.fileExists
|
12
|
11
|
|
13
|
12
|
import java.io.File
|
|
@@ -20,7 +19,7 @@ object AkkaStreamComponents {
|
20
|
19
|
implicit val actorSystem: ActorSystem = ActorSystem("AkkaStreamActor")
|
21
|
20
|
|
22
|
21
|
//flows building
|
23
|
|
- def buildPersonFlow(): Flow[Map[String, String], Person, NotUsed] = {
|
|
22
|
+ def buildPersonFlow (): Flow[Map[String, String], Person, NotUsed] = {
|
24
|
23
|
val personFlow: Flow[Map[String, String], Person, NotUsed] =
|
25
|
24
|
Flow[Map[String, String]]
|
26
|
25
|
.map((rowMap: Map[String, String]) => {
|
|
@@ -30,26 +29,26 @@ object AkkaStreamComponents {
|
30
|
29
|
personFlow
|
31
|
30
|
}
|
32
|
31
|
|
33
|
|
- def filterPersonByIdFlow(personID: String): Flow[Map[String, String], Person, NotUsed]={
|
34
|
|
- val personFlowFilter: Flow[Map[String, String], Person, NotUsed]=
|
|
32
|
+ def filterPersonByIdFlow(personID: String): Flow[Map[String, String], Person, NotUsed] = {
|
|
33
|
+ val personFlowFilter: Flow[Map[String, String], Person, NotUsed] =
|
35
|
34
|
Flow[Map[String, String]]
|
36
|
|
- .filter((rowMap:Map[String, String])=>{
|
37
|
|
- rowMap.getOrElse(key="nconst",default="")==personID
|
|
35
|
+ .filter((rowMap: Map[String, String]) => {
|
|
36
|
+ rowMap.getOrElse(key = "nconst", default = "") == personID
|
38
|
37
|
})
|
39
|
|
- .map(rowMap=>{
|
|
38
|
+ .map(rowMap => {
|
40
|
39
|
buildPersonModel(personMap = rowMap)
|
41
|
40
|
})
|
42
|
41
|
|
43
|
42
|
personFlowFilter
|
44
|
43
|
}
|
45
|
44
|
|
46
|
|
- def filterPersonByNameFlow(primaryName: String): Flow[Map[String, String], Person, NotUsed] ={
|
|
45
|
+ def filterPersonByNameFlow(primaryName: String): Flow[Map[String, String], Person, NotUsed] = {
|
47
|
46
|
val personFlowFilter: Flow[Map[String, String], Person, NotUsed] =
|
48
|
47
|
Flow[Map[String, String]]
|
49
|
|
- .filter((rowMap: Map[String, String]) =>{
|
50
|
|
- rowMap.getOrElse(key = "primaryName", default = "")==primaryName
|
|
48
|
+ .filter((rowMap: Map[String, String]) => {
|
|
49
|
+ rowMap.getOrElse(key = "primaryName", default = "") == primaryName
|
51
|
50
|
})
|
52
|
|
- .map((rowMap: Map[String, String])=>{
|
|
51
|
+ .map((rowMap: Map[String, String]) => {
|
53
|
52
|
buildPersonModel(personMap = rowMap)
|
54
|
53
|
})
|
55
|
54
|
|
|
@@ -66,16 +65,28 @@ object AkkaStreamComponents {
|
66
|
65
|
tvSerieFlow
|
67
|
66
|
}
|
68
|
67
|
|
69
|
|
- def filterTvSerieByIdFlow(tvSerieID: String): Flow[Map[String, String], TvSerie, NotUsed] = {
|
70
|
|
- val tvSerieFlowFilter : Flow[Map[String, String], TvSerie, NotUsed] =
|
|
68
|
+ def buildEpisodeFlow (): Flow[Map[String, String], Episode, NotUsed] ={
|
|
69
|
+ val episodeFlow: Flow[Map[String, String], Episode, NotUsed] =
|
71
|
70
|
Flow[Map[String, String]]
|
72
|
|
- .filter((rowMap: Map[String, String])=>rowMap.getOrElse(key = "tconst", default = "")==tvSerieID)
|
73
|
71
|
.map((rowMap: Map[String, String])=>{
|
|
72
|
+ buildEpisodeModel(episodeMap = rowMap)
|
|
73
|
+ })
|
|
74
|
+
|
|
75
|
+ episodeFlow
|
|
76
|
+ }
|
|
77
|
+
|
|
78
|
+ //filters flow
|
|
79
|
+ def filterTvSerieByIdFlow(tvSerieID: String): Flow[Map[String, String], TvSerie, NotUsed] = {
|
|
80
|
+ val tvSerieFlowFilter: Flow[Map[String, String], TvSerie, NotUsed] =
|
|
81
|
+ Flow[Map[String, String]]
|
|
82
|
+ .filter((rowMap: Map[String, String]) => rowMap.getOrElse(key = "tconst", default = "") == tvSerieID)
|
|
83
|
+ .map((rowMap: Map[String, String]) => {
|
74
|
84
|
buildTvSerieModel(tvSerieMap = rowMap)
|
75
|
85
|
})
|
76
|
86
|
|
77
|
87
|
tvSerieFlowFilter
|
78
|
88
|
}
|
|
89
|
+
|
79
|
90
|
def filterTvSerieByPrimaryTitleFlow(tvSeriePrimaryTitle: String): Flow[Map[String, String], TvSerie, NotUsed] = {
|
80
|
91
|
val filterFlowFilter: Flow[Map[String, String], TvSerie, NotUsed] = Flow[Map[String, String]]
|
81
|
92
|
.filter((rows: Map[String, String]) => {
|
|
@@ -122,15 +133,21 @@ object AkkaStreamComponents {
|
122
|
133
|
}
|
123
|
134
|
|
124
|
135
|
//sinks building
|
|
136
|
+ def buildMaxPersonsSink(): Sink[Person, Future[IndexedSeq[Person]]] = {
|
|
137
|
+ val listPersonsSink: Sink[Person, Future[IndexedSeq[Person]]] =
|
|
138
|
+ Sink.collection[Person, IndexedSeq[Person]]
|
|
139
|
+ listPersonsSink
|
|
140
|
+ }
|
125
|
141
|
|
126
|
|
- def buildAllTvSeriesSink(logger: Logger): Sink[TvSerie, Future[Done]] = {
|
127
|
|
- val tvSeriesSink: Sink[TvSerie, Future[Done]] = Sink
|
128
|
|
- .foreach((tvSerie: TvSerie)=>logger.info(s"${tvSerie.toString}"))
|
|
142
|
+ def buildMaxTvSeriesSink(): Sink[TvSerie, Future[IndexedSeq[TvSerie]]] = {
|
|
143
|
+ val tvSeriesSink: Sink[TvSerie, Future[IndexedSeq[TvSerie]]] = Sink
|
|
144
|
+ .collection[TvSerie, IndexedSeq[TvSerie]]
|
129
|
145
|
tvSeriesSink
|
130
|
146
|
}
|
131
|
|
- def buildAllPersonsSink(logger: Logger): Sink[Person,Future[Done]] = {
|
132
|
|
- val listPersonsSink: Sink[Person, Future[Done]]=
|
133
|
|
- Sink.foreach((person: Person)=>logger.info(s"${person.toString}"))
|
134
|
|
- listPersonsSink
|
|
147
|
+
|
|
148
|
+ def buildMaxEpisodesSink(): Sink[Episode, Future[IndexedSeq[Episode]]] = {
|
|
149
|
+ val episodeSink: Sink[Episode, Future[IndexedSeq[Episode]]] = Sink
|
|
150
|
+ .collection[Episode, IndexedSeq[Episode]]
|
|
151
|
+ episodeSink
|
135
|
152
|
}
|
136
|
153
|
}
|