Browse Source

add a mapasync variation

Antoine Dorian 1 year ago
parent
commit
25a6daa5cb
1 changed files with 31 additions and 5 deletions
  1. 31
    5
      src/main/scala/com/mediahub/FileReader.scala

+ 31
- 5
src/main/scala/com/mediahub/FileReader.scala View File

3
 import akka.NotUsed
3
 import akka.NotUsed
4
 import akka.actor.ActorSystem
4
 import akka.actor.ActorSystem
5
 import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
5
 import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
6
-import akka.stream.javadsl.FileIO
7
-import akka.stream.scaladsl.{Compression, Framing, Sink, Source}
6
+import akka.stream.scaladsl.{Compression, FileIO, Flow, Sink, Source}
8
 import akka.util.ByteString
7
 import akka.util.ByteString
9
 import org.springframework.stereotype.Component
8
 import org.springframework.stereotype.Component
10
 
9
 
12
 import java.nio.file.Paths
11
 import java.nio.file.Paths
13
 import scala.collection.immutable
12
 import scala.collection.immutable
14
 import scala.concurrent.duration.DurationInt
13
 import scala.concurrent.duration.DurationInt
15
-import scala.concurrent.{Await, ExecutionContext, Future}
14
+import scala.concurrent.{Await, Future}
16
 import scala.language.postfixOps
15
 import scala.language.postfixOps
17
 
16
 
18
 
17
 
25
   val titlePrincipalsResource: File = new File("src/main/resources/title.principals.tsv.gz")
24
   val titlePrincipalsResource: File = new File("src/main/resources/title.principals.tsv.gz")
26
   val nameBasicsResource: File = new File("src/main/resources/name.basics.tsv.gz")
25
   val nameBasicsResource: File = new File("src/main/resources/name.basics.tsv.gz")
27
 
26
 
27
+  def fileSource(file: File) = {
28
+    FileIO
29
+      .fromPath(Paths.get(file.getPath), 1 * 1024 * 1024)
30
+      .via(Compression.gunzip())
31
+  }
32
+
33
+  val lineParser: Flow[ByteString, Map[String, String], NotUsed] = {
34
+    CsvParsing
35
+      .lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote)
36
+      .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
37
+  }
38
+
39
+  //working search not optimized
28
   val titleId: String = getIdOfTitle(titleBasicsResource, "Carmencita")
40
   val titleId: String = getIdOfTitle(titleBasicsResource, "Carmencita")
29
   println("value ", titleId)
41
   println("value ", titleId)
30
 
42
 
31
-  val personsIdList: List[String]  = getIdOfPersons(titlePrincipalsResource, titleId)
43
+  val personsIdList: List[String] = getIdOfPersons(titlePrincipalsResource, titleId)
32
   println("value ", personsIdList)
44
   println("value ", personsIdList)
33
 
45
 
34
   val personsList: List[String] = getPersons(nameBasicsResource, personsIdList)
46
   val personsList: List[String] = getPersons(nameBasicsResource, personsIdList)
35
   println("value ", personsList)
47
   println("value ", personsList)
36
 
48
 
37
 
49
 
50
+  //working do not change for now
38
   def getIdOfTitle(file: File, titleName: String): String = {
51
   def getIdOfTitle(file: File, titleName: String): String = {
39
     val result: Future[Option[String]] = Source.single(file)
52
     val result: Future[Option[String]] = Source.single(file)
40
-      .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
53
+      .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024).reduce((a, b) => a ++ b))
41
       .via(Compression.gunzip())
54
       .via(Compression.gunzip())
42
       .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote))
55
       .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote))
43
       .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
56
       .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
49
     result.value.get.get.get
62
     result.value.get.get.get
50
   }
63
   }
51
 
64
 
65
+  def getIdOfTitleAsync(file: File, titleName: String): immutable.Iterable[Option[String]] = {
66
+    val result = fileSource(file)
67
+      .mapAsync(10) {
68
+        result =>
69
+          Source.single(result)
70
+            .via(lineParser)
71
+            .filter(row => row.getOrElse("primaryTitle", "") == titleName)
72
+            .map(a => a.get("tconst"))
73
+            .runWith(Sink.collection)
74
+      }
75
+      .runWith(Sink.head)
76
+    Await.result(result, 5 minutes)
77
+  }
52
 
78
 
53
   def getIdOfPersons(file: File, titleId: String): List[String] = {
79
   def getIdOfPersons(file: File, titleId: String): List[String] = {
54
     val result: Future[immutable.Iterable[Option[String]]] = Source.single(file)
80
     val result: Future[immutable.Iterable[Option[String]]] = Source.single(file)

Powered by TurnKey Linux.