Преглед изворни кода

working finder service for people depending on title name

Antoine DORIAN пре 1 година
родитељ
комит
7a0d1bf83f
3 измењених фајлова са 53 додато и 90 уклоњено
  1. 0
    17
      src/main/resources/test.tsv
  2. 0
    2
      src/main/resources/test2.tsv
  3. 53
    71
      src/main/scala/com/mediahub/FileReader.scala

+ 0
- 17
src/main/resources/test.tsv Прегледај датотеку

@@ -1,17 +0,0 @@
1
-tconst	titleType	primaryTitle	originalTitle	isAdult	startYear	endYear	runtimeMinutes	genres
2
-tt0000001	short	Carmencita	Carmencita	0	1894	\N	1	Documentary,Short
3
-tt0000002	short	Le clown et ses chiens	Le clown et ses chiens	0	1892	\N	5	Animation,Short
4
-tt0000003	short	Pauvre Pierrot	Pauvre Pierrot	0	1892	\N	4	Animation,Comedy,Romance
5
-tt0000004	short	Un bon bock	Un bon bock	0	1892	\N	12	Animation,Short
6
-tt0000005	short	Blacksmith Scene	Blacksmith Scene	0	1893	\N	1	Comedy,Short
7
-tt0000006	short	Chinese Opium Den	Chinese Opium Den	0	1894	\N	1	Short
8
-tt0000007	short	Corbett and Courtney Before the Kinetograph	Corbett and Courtney Before the Kinetograph	0	1894	\N	1	Short,Sport
9
-tt0000008	short	Edison Kinetoscopic Record of a Sneeze	Edison Kinetoscopic Record of a Sneeze	0	1894	\N	1	Documentary,Short
10
-tt0000009	movie	Miss Jerry	Miss Jerry	0	1894	\N	45	Romance
11
-tt0000010	short	Leaving the Factory	La sortie de l'usine Lumi?¿re ?á Lyon	0	1895	\N	1	Documentary,Short
12
-tt0000011	short	Akrobatisches Potpourri	Akrobatisches Potpourri	0	1895	\N	1	Documentary,Short
13
-tt0000012	short	The Arrival of a Train	L'arriv?®e d'un train ?á La Ciotat	0	1896	\N	1	Documentary,Short
14
-tt0000013	short	The Photographical Congress Arrives in Lyon	Le d?®barquement du congr?¿s de photographie ?á Lyon	0	1895	\N	1	Documentary,Short
15
-tt0000014	short	The Waterer Watered	L'arroseur arros?®	0	1895	\N	1	Comedy,Short
16
-tt0000015	short	Autour d'une cabine	Autour d'une cabine	0	1894	\N	2	Animation,Short
17
-tt0000016	short	Boat Leaving the Port	Barque sortant du port	0	1895	\N	1	Documentary,Short

+ 0
- 2
src/main/resources/test2.tsv Прегледај датотеку

@@ -1,2 +0,0 @@
1
-tconst	titleType	name	originalTitle	isAdult	startYear	endYear	runtimeMinutes	ge
2
-tt0000015	short	Antoine	Autour d'une cabine	0	1900	\N	2	Animation,Short

+ 53
- 71
src/main/scala/com/mediahub/FileReader.scala Прегледај датотеку

@@ -1,24 +1,19 @@
1 1
 package com.mediahub
2 2
 
3
-import akka.{Done, NotUsed}
4
-import org.springframework.stereotype.Component
5
-
6
-import java.io.File
3
+import akka.NotUsed
7 4
 import akka.actor.ActorSystem
8 5
 import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
9 6
 import akka.stream.javadsl.FileIO
10
-import akka.stream.scaladsl.{Compression, Flow, Framing, Keep, Sink, Source}
7
+import akka.stream.scaladsl.{Compression, Framing, Sink, Source}
11 8
 import akka.util.ByteString
9
+import org.springframework.stereotype.Component
12 10
 
11
+import java.io.File
13 12
 import java.nio.file.Paths
14
-import java.time.Duration
15
-import java.util.stream.Collectors
16 13
 import scala.collection.immutable
17
-import scala.concurrent.ExecutionContext.Implicits.global
18 14
 import scala.concurrent.duration.DurationInt
19 15
 import scala.concurrent.{Await, ExecutionContext, Future}
20 16
 import scala.language.postfixOps
21
-import scala.util.{Failure, Success}
22 17
 
23 18
 
24 19
 @Component
@@ -26,73 +21,60 @@ class FileReader {
26 21
 
27 22
   implicit val system: ActorSystem = ActorSystem()
28 23
 
29
-  val list: List[File] = getListOfFiles
30
-  val source: Source[File, NotUsed] = Source(list)
24
+  val titleBasicsResource: File = new File("src/main/resources/title.basics.tsv.gz")
25
+  val titlePrincipalsResource: File = new File("src/main/resources/title.principals.tsv.gz")
26
+  val nameBasicsResource: File = new File("src/main/resources/name.basics.tsv.gz")
27
+
28
+  val titleId: String = getIdOfTitle(titleBasicsResource, "Carmencita")
29
+  println("value ", titleId)
31 30
 
32
-  val result: Future[immutable.Iterable[Option[String]]] = source
33
-    .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
34
-    .via(CsvParsing.lineScanner(CsvParsing.Tab))
35
-    .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
36
-    .filter(row => row.getOrElse("primaryTitle", "") == "Autour d'une cabine")
37
-    .map(a => a.get("tconst"))
38
-    .runWith(Sink.collection)
31
+  val personsIdList: List[String]  = getIdOfPersons(titlePrincipalsResource, titleId)
32
+  println("value ", personsIdList)
39 33
 
40
-  result.onComplete {
41
-    case Failure(exception) => println(s"failure: $exception")
34
+  val personsList: List[String] = getPersons(nameBasicsResource, personsIdList)
35
+  println("value ", personsList)
42 36
 
43
-    case Success(value) => {
44
-      println(s"sucess: $value")
45
-    }
46 37
 
38
+  def getIdOfTitle(file: File, titleName: String): String = {
39
+    val result: Future[Option[String]] = Source.single(file)
40
+      .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
41
+      .via(Compression.gunzip())
42
+      .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote))
43
+      .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
44
+      .filter(row => row.getOrElse("primaryTitle", "") == titleName)
45
+      .map(a => a.get("tconst"))
46
+      .runWith(Sink.head)
47
+
48
+    Await.result(result, 5 minutes)
49
+    result.value.get.get.get
47 50
   }
48
-  Await.result(result,  5 hours)
49
-
50
-  val re: List[Option[String]] = result.value.get.get.toList
51
-
52
-  println("value ", re.iterator.next().get)
53
-
54
-  //.runWith(Sink.seq)
55
-/*
56
-  source
57
-    .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
58
-    .via(CsvParsing.lineScanner(CsvParsing.Tab))
59
-    .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
60
-    .runWith(Sink.collection)
61
-    .foreach(a => println("result", a))
62
-*/
63
-
64
-  /* .filter(row => row.getOrElse("tconst", "name") == (result.value, "Antoine"))
65
-   .runForeach(println)*/
66
-
67
-
68
-  // readAsync()
69
-
70
-
71
-  /* def readAsync(): Source = {
72
-     Source(list)
73
-       .mapAsync(4) { value =>
74
-         implicit val ec: ExecutionContext = ActorSystem().dispatcher
75
-         Future {
76
-           source
77
-             .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
78
-             .via(CsvParsing.lineScanner(CsvParsing.Tab))
79
-             .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
80
-             .filter(row => row.getOrElse("primaryTitle", "") == "Autour d'une cabine")
81
-             .runForeach(println)
82
-           println(s"Process in Thread:${Thread.currentThread().getName}")
83
-           value
84
-         }
85
-       }
86
-       .runWith(Sink.foreach(value => println(s"Item emitted:$value in Thread:${Thread.currentThread().getName}")))
87
-   }*/
88
-
89
-  def getListOfFiles: List[File] = {
90
-    val d = new File("src/main/resources")
91
-    if (d.exists && d.isDirectory) {
92
-      d.listFiles.filter(_.isFile).toList
93
-    } else {
94
-      List[File]()
95
-    }
51
+
52
+
53
+  def getIdOfPersons(file: File, titleId: String): List[String] = {
54
+    val result: Future[immutable.Iterable[Option[String]]] = Source.single(file)
55
+      .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
56
+      .via(Compression.gunzip())
57
+      .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote))
58
+      .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
59
+      .filter(row => row.getOrElse("tconst", "") == titleId)
60
+      .map(a => a.get("nconst"))
61
+      .runWith(Sink.collection)
62
+
63
+    Await.result(result, 5 minutes)
64
+    result.value.get.get.toList.flatten
96 65
   }
97 66
 
67
+  def getPersons(file: File, personsIdList: List[String]): List[String] = {
68
+    val result: Future[immutable.Iterable[Option[String]]] = Source.single(file)
69
+      .flatMapConcat(f => FileIO.fromPath(Paths.get(f.getPath), 1 * 1024 * 1024))
70
+      .via(Compression.gunzip())
71
+      .via(CsvParsing.lineScanner(CsvParsing.Tab, CsvParsing.DoubleQuote, CsvParsing.DoubleQuote))
72
+      .via(CsvToMap.toMapAsStringsCombineAll(headerPlaceholder = Option.empty))
73
+      .filter(row => personsIdList.contains(row.getOrElse("nconst", "")))
74
+      .map(a => a.get("primaryName"))
75
+      .runWith(Sink.collection)
76
+
77
+    Await.result(result, 5 minutes)
78
+    result.value.get.get.toList.flatten
79
+  }
98 80
 }

Powered by TurnKey Linux.