|
@@ -2,15 +2,13 @@ package fr.natan.akkastreamfileprocessingapi.service
|
2
|
2
|
|
3
|
3
|
import akka.actor.ActorSystem
|
4
|
4
|
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
|
5
|
|
-import akka.stream.javadsl.Framing
|
6
|
5
|
import akka.stream.scaladsl.{Compression, FileIO, Flow, Sink, Source}
|
7
|
|
-import akka.util.ByteString
|
8
|
6
|
import akka.{Done, NotUsed}
|
9
|
7
|
import com.typesafe.scalalogging.slf4j.Logger
|
10
|
8
|
import fr.natan.akkastreamfileprocessingapi.businessexceptions.FileNotFoundException
|
11
|
|
-import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.separator
|
12
|
|
-import fr.natan.akkastreamfileprocessingapi.models.{Person, TvSeries}
|
|
9
|
+import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSeries}
|
13
|
10
|
import fr.natan.akkastreamfileprocessingapi.valitator.Validators.fileExists
|
|
11
|
+import fr.natan.akkastreamfileprocessingapi.models.ModelsBuilder.{buildPersonModel, buildTvSerieModel}
|
14
|
12
|
|
15
|
13
|
import java.io.File
|
16
|
14
|
import java.nio.file.Paths
|
|
@@ -20,81 +18,71 @@ object AkkaStreamComponents {
|
20
|
18
|
|
21
|
19
|
implicit val actor: ActorSystem = ActorSystem("AkkaStreamActor")
|
22
|
20
|
|
23
|
|
- private def convertToTvSerie(map: Map[String, String]): TvSeries = {
|
24
|
|
- val tvSerie: TvSeries = TvSeries(
|
25
|
|
- map("tconst"),
|
26
|
|
- map("titleType"),
|
27
|
|
- map("primaryTitle"),
|
28
|
|
- map("originalTitle"),
|
29
|
|
- map("isAdult"),
|
30
|
|
- map("startYear"),
|
31
|
|
- map("endYear"),
|
32
|
|
- map("runtimeMinutes"),
|
33
|
|
- map("genres")
|
34
|
|
- )
|
35
|
|
-
|
36
|
|
- tvSerie
|
37
|
|
- }
|
38
|
|
-
|
39
|
|
- private def convertToPerson(map: Map[String, String]): Person ={
|
40
|
|
- Person(
|
41
|
|
- map("nconst"),
|
42
|
|
- map("primaryName"),
|
43
|
|
- map("birthYear"),
|
44
|
|
- map("deathYear"),
|
45
|
|
- map("primaryProfession").split(",").toList,
|
46
|
|
- map("knownForTitles").split(",").toList,
|
47
|
|
- )
|
48
|
|
- }
|
49
|
|
-
|
50
|
|
- //flows
|
|
21
|
+ //flows building
|
51
|
22
|
|
|
23
|
+ //flow1
|
52
|
24
|
def buildTvSerieFlow(): Flow[Map[String, String], TvSeries, NotUsed] = {
|
53
|
25
|
|
54
|
26
|
val tvFlow: Flow[Map[String, String], TvSeries, NotUsed] =
|
55
|
27
|
Flow[Map[String, String]]
|
56
|
28
|
.map(row => {
|
57
|
|
- convertToTvSerie(row)
|
|
29
|
+ buildTvSerieModel(row)
|
58
|
30
|
})
|
59
|
31
|
tvFlow
|
60
|
32
|
}
|
61
|
33
|
|
|
34
|
+ //flow2
|
62
|
35
|
def buildPersonFlow(): Flow[Map[String, String], Person, NotUsed] = {
|
63
|
36
|
val personFlow: Flow[Map[String, String], Person, NotUsed] =
|
64
|
37
|
Flow[Map[String, String]]
|
65
|
38
|
.map((rowMap: Map[String, String]) => {
|
66
|
|
- convertToPerson(rowMap)
|
|
39
|
+ buildPersonModel(rowMap)
|
67
|
40
|
})
|
68
|
41
|
|
69
|
42
|
personFlow
|
70
|
43
|
}
|
71
|
44
|
|
|
45
|
+ //flow3
|
72
|
46
|
def filterByMoviePrimaryTitleFlow(moviePrimaryTitle: String): Flow[Map[String, String], TvSeries, NotUsed] = {
|
73
|
47
|
val filterFlow: Flow[Map[String, String], TvSeries, NotUsed] = Flow[Map[String, String]]
|
74
|
48
|
.filter((rows: Map[String, String]) => {
|
75
|
49
|
rows.getOrElse("primaryTitle","")==moviePrimaryTitle
|
76
|
50
|
})
|
77
|
51
|
.map(rowMap => {
|
78
|
|
- convertToTvSerie(map = rowMap)
|
|
52
|
+ buildTvSerieModel(map = rowMap)
|
79
|
53
|
})
|
80
|
54
|
|
81
|
55
|
filterFlow
|
82
|
56
|
}
|
83
|
57
|
|
84
|
|
- def filterByPersonID(nconst: String): Flow[Map[String, String], Person, NotUsed]={
|
|
58
|
+ //flow4
|
|
59
|
+ def filterByPersonIdFlow(nconst: String): Flow[Map[String, String], Person, NotUsed]={
|
85
|
60
|
val personFilter: Flow[Map[String, String], Person, NotUsed]=
|
86
|
61
|
Flow[Map[String, String]]
|
87
|
62
|
.filter((rowMap:Map[String, String])=>{
|
88
|
63
|
rowMap.getOrElse("nconst","")==nconst
|
89
|
64
|
})
|
90
|
65
|
.map(rowMap=>{
|
91
|
|
- convertToPerson(map = rowMap)
|
|
66
|
+ buildPersonModel(map = rowMap)
|
|
67
|
+ })
|
|
68
|
+
|
|
69
|
+ personFilter
|
|
70
|
+ }
|
|
71
|
+
|
|
72
|
+ def filterByPersonNameFlow(personName: String): Flow[Map[String, String], Person, NotUsed] ={
|
|
73
|
+ val personFilter: Flow[Map[String, String], Person, NotUsed] =
|
|
74
|
+ Flow[Map[String, String]]
|
|
75
|
+ .filter((rowMap: Map[String, String]) =>{
|
|
76
|
+ rowMap.getOrElse("primaryName","")==personName
|
|
77
|
+ })
|
|
78
|
+ .map((rowMap: Map[String, String])=>{
|
|
79
|
+ buildPersonModel(map = rowMap)
|
92
|
80
|
})
|
93
|
81
|
|
94
|
82
|
personFilter
|
95
|
83
|
}
|
96
|
84
|
|
97
|
|
- //source
|
|
85
|
+ //source building
|
98
|
86
|
def buildSource(inputFile: File): Source[Map[String, String], NotUsed] = {
|
99
|
87
|
|
100
|
88
|
var datasource: Source[Map[String, String], NotUsed] = null
|
|
@@ -126,7 +114,9 @@ object AkkaStreamComponents {
|
126
|
114
|
source
|
127
|
115
|
}
|
128
|
116
|
|
129
|
|
- //sink
|
|
117
|
+ //sinks building
|
|
118
|
+
|
|
119
|
+ //sink1
|
130
|
120
|
def buildTvSeriesSink(logger: Logger): Sink[TvSeries, Future[Done]] = {
|
131
|
121
|
val tvSeriesSink : Sink[TvSeries, Future[Done]]= Sink.foreach[TvSeries](
|
132
|
122
|
(movie: TvSeries) => {
|
|
@@ -136,15 +126,14 @@ object AkkaStreamComponents {
|
136
|
126
|
tvSeriesSink
|
137
|
127
|
}
|
138
|
128
|
|
139
|
|
- def buildPersonsSink(logger: Logger): Sink[Person,Future[Done]] = {
|
140
|
|
- val listPersonsSink: Sink[Person, Future[Done]]=
|
141
|
|
- Sink.foreach[Person]((person: Person)=>{
|
142
|
|
- logger.info(s"${person.toString}")
|
143
|
|
- })
|
144
|
|
-
|
|
129
|
+ //sink2
|
|
130
|
+ def buildAllPersonsSink(logger: Logger): Sink[Person,Future[Seq[Person]]] = {
|
|
131
|
+ val listPersonsSink: Sink[Person, Future[Seq[Person]]]=
|
|
132
|
+ Sink.seq
|
145
|
133
|
listPersonsSink
|
146
|
134
|
}
|
147
|
135
|
|
|
136
|
+ //sink3
|
148
|
137
|
def buildPersonSink(logger: Logger): Sink[Person, Future[Done]] = {
|
149
|
138
|
Sink.foreach[Person](
|
150
|
139
|
(person: Person) => logger.info(s"${person.toString}")
|