|
@@ -7,7 +7,7 @@ import akka.util.ByteString
|
7
|
7
|
import akka.{Done, NotUsed}
|
8
|
8
|
import com.typesafe.scalalogging.slf4j.Logger
|
9
|
9
|
import fr.natan.akkastreamfileprocessingapi.businessexceptions.FileNotFoundException
|
10
|
|
-import fr.natan.akkastreamfileprocessingapi.models.Movie
|
|
10
|
+import fr.natan.akkastreamfileprocessingapi.models.TvSeries
|
11
|
11
|
import fr.natan.akkastreamfileprocessingapi.valitator.Validators.fileExists
|
12
|
12
|
|
13
|
13
|
import java.io.File
|
|
@@ -18,16 +18,16 @@ object AkkaStreamComponents {
|
18
|
18
|
|
19
|
19
|
implicit val actor: ActorSystem = ActorSystem("AkkaStreamActor")
|
20
|
20
|
|
21
|
|
- private def convertToClassMovie(array: Array[String]): Movie = {
|
22
|
|
- val movie: Movie = Movie(array(0), array(1), array(2), array(3), array(4).toShort, array(5).toInt, array(6), array(7), array(8))
|
|
21
|
+ private def convertToClassMovie(array: Array[String]): TvSeries = {
|
|
22
|
+ val movie: TvSeries = TvSeries(array(0), array(1), array(2), array(3), array(4), array(5), array(6), array(7), array(8))
|
23
|
23
|
movie
|
24
|
24
|
}
|
25
|
25
|
|
26
|
26
|
//flows
|
27
|
27
|
|
28
|
|
- def buildMovieFlow(movieID: String): Flow[String, Movie, NotUsed] = {
|
|
28
|
+ def buildMovieFlow(movieID: String): Flow[String, TvSeries, NotUsed] = {
|
29
|
29
|
|
30
|
|
- val flow: Flow[String, Movie, NotUsed] =
|
|
30
|
+ val flow: Flow[String, TvSeries, NotUsed] =
|
31
|
31
|
Flow[String].filter(rows => !rows.contains(movieID))
|
32
|
32
|
.map(row => {
|
33
|
33
|
val movie: Array[String] = row.split("\t")
|
|
@@ -37,8 +37,8 @@ object AkkaStreamComponents {
|
37
|
37
|
|
38
|
38
|
}
|
39
|
39
|
|
40
|
|
- def filterByMoviePrimaryTitleFlow(moviePrimaryTitle: String): Flow[String, Movie, NotUsed] = {
|
41
|
|
- val filterFlow: Flow[String, Movie, NotUsed] =
|
|
40
|
+ def filterByMoviePrimaryTitleFlow(moviePrimaryTitle: String): Flow[String, TvSeries, NotUsed] = {
|
|
41
|
+ val filterFlow: Flow[String, TvSeries, NotUsed] =
|
42
|
42
|
Flow[String]
|
43
|
43
|
.filter((rows: String) => {
|
44
|
44
|
rows.contains(moviePrimaryTitle)
|
|
@@ -50,7 +50,6 @@ object AkkaStreamComponents {
|
50
|
50
|
|
51
|
51
|
filterFlow
|
52
|
52
|
}
|
53
|
|
-
|
54
|
53
|
//source
|
55
|
54
|
def buildSource(inputFile: File): Source[String, NotUsed] = {
|
56
|
55
|
|
|
@@ -69,7 +68,7 @@ object AkkaStreamComponents {
|
69
|
68
|
.via(Compression.gunzip())
|
70
|
69
|
.via(
|
71
|
70
|
Framing.delimiter(ByteString("\n"), 4096)
|
72
|
|
- .map(byteString => byteString.utf8String)
|
|
71
|
+ .map(byteString => byteString.utf8String)
|
73
|
72
|
)
|
74
|
73
|
|
75
|
74
|
source
|
|
@@ -86,9 +85,9 @@ object AkkaStreamComponents {
|
86
|
85
|
}
|
87
|
86
|
|
88
|
87
|
//sink
|
89
|
|
- def buildSink(logger: Logger): Sink[Movie, Future[Done]] = {
|
90
|
|
- val sink = Sink.foreach[Movie](
|
91
|
|
- (movie: Movie) => logger.info(s"${movie.toString}")
|
|
88
|
+ def buildSink(logger: Logger): Sink[TvSeries, Future[Done]] = {
|
|
89
|
+ val sink = Sink.foreach[TvSeries](
|
|
90
|
+ (movie: TvSeries) => logger.info(s"${movie.toString}")
|
92
|
91
|
)
|
93
|
92
|
sink
|
94
|
93
|
}
|