Browse Source

read async all files and filter testing

Antoine DORIAN 1 year ago
parent
commit
986d88a9e8
1 changed files with 28 additions and 9 deletions
  1. 28
    9
      src/main/scala/com/mediahub/FileReader.scala

+ 28
- 9
src/main/scala/com/mediahub/FileReader.scala View File

11
 import akka.util.ByteString
11
 import akka.util.ByteString
12
 
12
 
13
 import java.nio.file.Paths
13
 import java.nio.file.Paths
14
+import java.time.Duration
14
 import java.util.stream.Collectors
15
 import java.util.stream.Collectors
16
+import scala.collection.immutable
15
 import scala.concurrent.ExecutionContext.Implicits.global
17
 import scala.concurrent.ExecutionContext.Implicits.global
16
-import scala.concurrent.{ExecutionContext, Future}
18
+import scala.concurrent.duration.DurationInt
19
+import scala.concurrent.{Await, ExecutionContext, Future}
20
+import scala.language.postfixOps
21
+import scala.util.{Failure, Success}
17
 
22
 
18
 
23
 
19
 @Component
24
 @Component
24
   val list: List[File] = getListOfFiles
29
   val list: List[File] = getListOfFiles
25
   val source: Source[File, NotUsed] = Source(list)
30
   val source: Source[File, NotUsed] = Source(list)
26
 
31
 
27
-  val result = source
32
+  val result: Future[immutable.Iterable[Option[String]]] = source
28
     .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
33
     .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
29
     .via(CsvParsing.lineScanner(CsvParsing.Tab))
34
     .via(CsvParsing.lineScanner(CsvParsing.Tab))
30
     .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
35
     .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
31
     .filter(row => row.getOrElse("primaryTitle", "") == "Autour d'une cabine")
36
     .filter(row => row.getOrElse("primaryTitle", "") == "Autour d'une cabine")
32
     .map(a => a.get("tconst"))
37
     .map(a => a.get("tconst"))
33
-    .toMat(Sink.foreach(println))(Keep.both)
34
-   // .runForeach(h => println("source 1 ", h))
38
+    .runWith(Sink.collection)
35
 
39
 
36
-  //.runWith(Sink.seq)
40
+  result.onComplete {
41
+    case Failure(exception) => println(s"failure: $exception")
42
+
43
+    case Success(value) => {
44
+      println(s"sucess: $value")
45
+    }
46
+
47
+  }
48
+  Await.result(result,  5 hours)
37
 
49
 
50
+  val re: List[Option[String]] = result.value.get.get.toList
51
+
52
+  println("value ", re.iterator.next().get)
53
+
54
+  //.runWith(Sink.seq)
55
+/*
38
   source
56
   source
39
     .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
57
     .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
40
     .via(CsvParsing.lineScanner(CsvParsing.Tab))
58
     .via(CsvParsing.lineScanner(CsvParsing.Tab))
41
     .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
59
     .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
42
-    .runForeach(h => println("source 2 ", h))
43
-   /* .filter(row => row.getOrElse("tconst", "name") == (result.value, "Antoine"))
44
-    .runForeach(println)*/
45
-
60
+    .runWith(Sink.collection)
61
+    .foreach(a => println("result", a))
62
+*/
46
 
63
 
64
+  /* .filter(row => row.getOrElse("tconst", "name") == (result.value, "Antoine"))
65
+   .runForeach(println)*/
47
 
66
 
48
 
67
 
49
   // readAsync()
68
   // readAsync()

Powered by TurnKey Linux.