Переглянути джерело

last async working but not parsing all file

Antoine Dorian 1 рік тому
джерело
коміт
894e908f1f
1 змінених файлів з 49 додано та 35 видалено
  1. 49
    35
      src/main/scala/com/mediahub/FileReader.scala

+ 49
- 35
src/main/scala/com/mediahub/FileReader.scala Переглянути файл

@@ -2,6 +2,8 @@ package com.mediahub
2 2
 
3 3
 import akka.NotUsed
4 4
 import akka.actor.ActorSystem
5
+import akka.stream.ActorAttributes.supervisionStrategy
6
+import akka.stream.Supervision.resumingDecider
5 7
 import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
6 8
 import akka.stream.scaladsl.{Compression, FileIO, Flow, Sink, Source}
7 9
 import akka.util.ByteString
@@ -24,9 +26,9 @@ class FileReader {
24 26
   val titlePrincipalsResource: File = new File("src/main/resources/title.principals.tsv.gz")
25 27
   val nameBasicsResource: File = new File("src/main/resources/name.basics.tsv.gz")
26 28
 
27
-  def fileSource(file: File) = {
28
-    FileIO
29
-      .fromPath(Paths.get(file.getPath), 1 * 1024 * 1024)
29
+  def fileSource(file: File): Source[ByteString, NotUsed] = {
30
+    Source.single(file)
31
+      .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024 * 2))
30 32
       .via(Compression.gunzip())
31 33
   }
32 34
 
@@ -37,70 +39,82 @@ class FileReader {
37 39
   }
38 40
 
39 41
   //working search not optimized
40
-  val titleId: String = getIdOfTitle(titleBasicsResource, "Carmencita")
42
+  val titleId: String = getIdOfTitleAsync(titleBasicsResource, "Carmencita")
41 43
   println("value ", titleId)
42 44
 
43
-  val personsIdList: List[String] = getIdOfPersons(titlePrincipalsResource, titleId)
45
+  val personsIdList: List[String] = getIdOfPersonsAsync(titlePrincipalsResource, titleId).toList.flatten
44 46
   println("value ", personsIdList)
45 47
 
46
-  val personsList: List[String] = getPersons(nameBasicsResource, personsIdList)
48
+  val personsList = getPersons(nameBasicsResource, personsIdList)
47 49
   println("value ", personsList)
48 50
 
49 51
 
50
-  //working do not change for now
51
-  def getIdOfTitle(file: File, titleName: String): String = {
52
-    val result: Future[Option[String]] = Source.single(file)
53
-      .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024).reduce((a, b) => a ++ b))
54
-      .via(Compression.gunzip())
55
-      .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote))
56
-      .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
57
-      .filter(row => row.getOrElse("primaryTitle", "") == titleName)
58
-      .map(a => a.get("tconst"))
52
+  def getIdOfTitleAsync(file: File, titleName: String): String = {
53
+    val result = fileSource(file)
54
+      .mapAsync(50) {
55
+        res =>
56
+          Source.single(res)
57
+            .via(lineParser)
58
+            .filter(row => row.getOrElse("primaryTitle", "") == titleName)
59
+            .map(a => a.get("tconst"))
60
+            .withAttributes(supervisionStrategy(resumingDecider))
61
+            .runWith(Sink.head)
62
+      }
63
+      .withAttributes(supervisionStrategy(resumingDecider))
59 64
       .runWith(Sink.head)
60 65
 
61 66
     Await.result(result, 5 minutes)
62 67
     result.value.get.get.get
63 68
   }
64 69
 
65
-  def getIdOfTitleAsync(file: File, titleName: String): immutable.Iterable[Option[String]] = {
66
-    val result = fileSource(file)
67
-      .mapAsync(10) {
68
-        result =>
69
-          Source.single(result)
70
+  def getIdOfPersonsAsync(file: File, titleId: String): immutable.Iterable[Option[String]] = {
71
+    val te = fileSource(file)
72
+      .mapAsync(50) {
73
+        res =>
74
+          Source.single(res)
70 75
             .via(lineParser)
71
-            .filter(row => row.getOrElse("primaryTitle", "") == titleName)
72
-            .map(a => a.get("tconst"))
76
+            .filter(row => row.getOrElse("tconst", "") == titleId)
77
+            .map(a => a.get("nconst"))
78
+            .withAttributes(supervisionStrategy(resumingDecider))
73 79
             .runWith(Sink.collection)
74 80
       }
81
+      .withAttributes(supervisionStrategy(resumingDecider))
75 82
       .runWith(Sink.head)
76
-    Await.result(result, 5 minutes)
83
+
84
+    Await.result(te, 5 minutes)
77 85
   }
78 86
 
79
-  def getIdOfPersons(file: File, titleId: String): List[String] = {
87
+  def getPersons(file: File, personsIdList: List[String]): List[String] = {
80 88
     val result: Future[immutable.Iterable[Option[String]]] = Source.single(file)
81 89
       .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
82 90
       .via(Compression.gunzip())
83 91
       .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote))
84 92
       .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
85
-      .filter(row => row.getOrElse("tconst", "") == titleId)
86
-      .map(a => a.get("nconst"))
93
+      .filter(row => personsIdList.contains(row.getOrElse("nconst", "")))
94
+      .map(a => a.get("primaryName"))
87 95
       .runWith(Sink.collection)
88 96
 
89 97
     Await.result(result, 5 minutes)
90 98
     result.value.get.get.toList.flatten
91 99
   }
92 100
 
93
-  def getPersons(file: File, personsIdList: List[String]): List[String] = {
94
-    val result: Future[immutable.Iterable[Option[String]]] = Source.single(file)
101
+ /* def getPersonsAsync(file: File, personsIdList: List[String]) = {
102
+    val test = Source.single(file)
95 103
       .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
96 104
       .via(Compression.gunzip())
97
-      .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote))
98
-      .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
99
-      .filter(row => personsIdList.contains(row.getOrElse("nconst", "")))
100
-      .map(a => a.get("primaryName"))
105
+      .mapAsync(2) {
106
+        res =>
107
+          Source.single(res)
108
+            .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote))
109
+            .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
110
+            .filter(row => row.getOrElse("nconst", "") == personsIdList.head)
111
+            .map(a => a.get("primaryName"))
112
+            .runWith(Sink.collection)
113
+      }
101 114
       .runWith(Sink.collection)
102 115
 
103
-    Await.result(result, 5 minutes)
104
-    result.value.get.get.toList.flatten
116
+    Await.result(test, 5 minutes)
117
+    test.value.get.get.toList.flatten
105 118
   }
106
-}
119
+  */
120
+}

Powered by TurnKey Linux.