Browse Source

first commit

placidenduwayo 1 year ago
parent
commit
38196a3a9b

+ 4
- 3
src/main/java/fr/natan/akkastreamfileprocessingapi/controller/TvSeriesController.java View File

13
 import org.springframework.web.bind.annotation.RestController;
13
 import org.springframework.web.bind.annotation.RestController;
14
 import play.api.libs.json.JsValue;
14
 import play.api.libs.json.JsValue;
15
 import play.api.libs.json.Json;
15
 import play.api.libs.json.Json;
16
+import scala.Option;
16
 import scala.collection.IndexedSeq;
17
 import scala.collection.IndexedSeq;
17
 import scala.concurrent.Future;
18
 import scala.concurrent.Future;
18
 
19
 
35
 
36
 
36
     @RequestMapping(value = "/persons/id/{personID}", method = RequestMethod.GET)
37
     @RequestMapping(value = "/persons/id/{personID}", method = RequestMethod.GET)
37
     private String getPersonByID(@PathVariable(name = "personID") String personID) throws ExecutionException, InterruptedException {
38
     private String getPersonByID(@PathVariable(name = "personID") String personID) throws ExecutionException, InterruptedException {
38
-        Future<ModelsAndJsonMap.Person> futurePerson = akkaStreamFilesProcessing.getPersonByIdFuture(personID);
39
+        Future<Option<ModelsAndJsonMap.Person>> futurePerson = akkaStreamFilesProcessing.getPersonByIdFuture(personID);
39
         CompletableFuture<ModelsAndJsonMap.Person> completableFuture = CompletableFutureBuilder
40
         CompletableFuture<ModelsAndJsonMap.Person> completableFuture = CompletableFutureBuilder
40
                 .buildcompletableFuture1(futurePerson);
41
                 .buildcompletableFuture1(futurePerson);
41
         ModelsAndJsonMap.Person person = completableFuture.get();
42
         ModelsAndJsonMap.Person person = completableFuture.get();
48
     private String getPersonByName(@PathVariable(name = "primaryName") String primaryName)
49
     private String getPersonByName(@PathVariable(name = "primaryName") String primaryName)
49
             throws ExecutionException, InterruptedException {
50
             throws ExecutionException, InterruptedException {
50
 
51
 
51
-        Future<ModelsAndJsonMap.Person> personFuture = akkaStreamFilesProcessing.getPersonByNameFuture(primaryName);
52
+        Future<Option<ModelsAndJsonMap.Person>> personFuture = akkaStreamFilesProcessing.getPersonByNameFuture(primaryName);
52
         CompletableFuture<ModelsAndJsonMap.Person> personCompletableFuture = CompletableFutureBuilder
53
         CompletableFuture<ModelsAndJsonMap.Person> personCompletableFuture = CompletableFutureBuilder
53
                 .buildcompletableFuture1(personFuture);
54
                 .buildcompletableFuture1(personFuture);
54
         ModelsAndJsonMap.Person person = personCompletableFuture.get();
55
         ModelsAndJsonMap.Person person = personCompletableFuture.get();
59
     @RequestMapping(value = "/tvseries/id/{tvSerieId}", method = RequestMethod.GET)
60
     @RequestMapping(value = "/tvseries/id/{tvSerieId}", method = RequestMethod.GET)
60
     private String getTvSerieByID(@PathVariable(name = "tvSerieId") String tvSerieID)
61
     private String getTvSerieByID(@PathVariable(name = "tvSerieId") String tvSerieID)
61
             throws ExecutionException, InterruptedException {
62
             throws ExecutionException, InterruptedException {
62
-        Future<ModelsAndJsonMap.TvSerie> tvSerieFuture = akkaStreamFilesProcessing.getTvSerieByIdFuture(tvSerieID);
63
+        Future<Option<ModelsAndJsonMap.TvSerie>> tvSerieFuture = akkaStreamFilesProcessing.getTvSerieByIdFuture(tvSerieID);
63
         CompletableFuture<ModelsAndJsonMap.TvSerie> tvSerieCompletableFuture = CompletableFutureBuilder
64
         CompletableFuture<ModelsAndJsonMap.TvSerie> tvSerieCompletableFuture = CompletableFutureBuilder
64
                 .buildcompletableFuture1(tvSerieFuture);
65
                 .buildcompletableFuture1(tvSerieFuture);
65
 
66
 

+ 3
- 2
src/main/java/fr/natan/akkastreamfileprocessingapi/futurecompleteness/CompletableFutureBuilder.java View File

1
 package fr.natan.akkastreamfileprocessingapi.futurecompleteness;
1
 package fr.natan.akkastreamfileprocessingapi.futurecompleteness;
2
 
2
 
3
+import scala.Option;
3
 import scala.collection.IndexedSeq;
4
 import scala.collection.IndexedSeq;
4
 import scala.concurrent.Future;
5
 import scala.concurrent.Future;
5
 
6
 
12
 public class CompletableFutureBuilder<T> {
13
 public class CompletableFutureBuilder<T> {
13
 
14
 
14
    private static final Logger logger = Logger.getLogger("log");
15
    private static final Logger logger = Logger.getLogger("log");
15
-    public static<T> CompletableFuture<T> buildcompletableFuture1(Future<T> futureT) {
16
+    public static<T> CompletableFuture<T> buildcompletableFuture1(Future<Option<T>> futureT) {
16
         CompletableFuture<T> completableFuture = new CompletableFuture<>();
17
         CompletableFuture<T> completableFuture = new CompletableFuture<>();
17
 
18
 
18
 
19
 
24
                     throw new RuntimeException(e);
25
                     throw new RuntimeException(e);
25
                 }
26
                 }
26
             }
27
             }
27
-            T task = futureT.value().get().get();
28
+            T task = futureT.value().get().get().get();
28
             completableFuture.complete(task);
29
             completableFuture.complete(task);
29
         });
30
         });
30
 
31
 

+ 3
- 3
src/main/scala/fr/natan/akkastreamfileprocessingapi/service/AkkaStreamFileProcessingFuture.scala View File

6
 import scala.concurrent.Future
6
 import scala.concurrent.Future
7
 
7
 
8
 trait AkkaStreamFileProcessingFuture {
8
 trait AkkaStreamFileProcessingFuture {
9
-  def getPersonByIdFuture(personID: String): Future[Person]
10
-  def getPersonByNameFuture(primaryName: String):  Future[Person]
9
+  def getPersonByIdFuture(personID: String): Future[Option[Person]]
10
+  def getPersonByNameFuture(primaryName: String):  Future[Option[Person]]
11
 
11
 
12
-  def getTvSerieByIdFuture(tvSerieID: String): Future[TvSerie]
12
+  def getTvSerieByIdFuture(tvSerieID: String): Future[Option[TvSerie]]
13
   def getTvSeriesByPrimaryTitleFuture(tvSerieTitle: String):Future[IndexedSeq[TvSerie]]
13
   def getTvSeriesByPrimaryTitleFuture(tvSerieTitle: String):Future[IndexedSeq[TvSerie]]
14
 
14
 
15
   def getPersonsForTvSerieByTvSerieTitleFuture(tvSeriePrimaryTitle: String): Future[IndexedSeq[Person]]
15
   def getPersonsForTvSerieByTvSerieTitleFuture(tvSeriePrimaryTitle: String): Future[IndexedSeq[Person]]

+ 10
- 10
src/main/scala/fr/natan/akkastreamfileprocessingapi/service/AkkaStreamFileProcessingImpl.scala View File

24
 
24
 
25
   implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
25
   implicit val logger: Logger = Logger(LoggerFactory.getLogger(this.getClass))
26
 
26
 
27
-  override def getPersonByIdFuture(personID: String): Future[Person] = {
27
+  override def getPersonByIdFuture(personID: String): Future[Option[Person]] = {
28
     val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = nameBasics)
28
     val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = nameBasics)
29
 
29
 
30
     val start: Long = System.currentTimeMillis()
30
     val start: Long = System.currentTimeMillis()
31
-    val personFuture: Future[Person] = source
31
+    val personFuture: Future[Option[Person]] = source
32
       .via(flow = filterPersonByIdFlow(personID = personID))
32
       .via(flow = filterPersonByIdFlow(personID = personID))
33
-      .runWith(Sink.head[Person])
33
+      .runWith(Sink.headOption[Person])
34
 
34
 
35
     personFuture.andThen({
35
     personFuture.andThen({
36
       case Failure(exception) => logger.error(s"!${exception.printStackTrace()}")
36
       case Failure(exception) => logger.error(s"!${exception.printStackTrace()}")
39
     personFuture
39
     personFuture
40
   }
40
   }
41
 
41
 
42
-  override def getPersonByNameFuture(primaryName: String): Future[Person] = {
42
+  override def getPersonByNameFuture(primaryName: String): Future[Option[Person]] = {
43
     val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = nameBasics)
43
     val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = nameBasics)
44
 
44
 
45
     val start: Long = System.currentTimeMillis()
45
     val start: Long = System.currentTimeMillis()
46
-    val personFuture: Future[Person] = source
46
+    val personFuture: Future[Option[Person]] = source
47
       .via(flow = filterPersonByNameFlow(primaryName = primaryName))
47
       .via(flow = filterPersonByNameFlow(primaryName = primaryName))
48
-      .runWith(Sink.head[Person])
48
+      .runWith(Sink.headOption[Person])
49
 
49
 
50
     personFuture
50
     personFuture
51
   }
51
   }
52
 
52
 
53
-  override def getTvSerieByIdFuture(tvSerieID: String): Future[TvSerie] = {
53
+  override def getTvSerieByIdFuture(tvSerieID: String): Future[Option[TvSerie]] = {
54
     val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
54
     val source: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
55
 
55
 
56
     val start: Long = System.currentTimeMillis()
56
     val start: Long = System.currentTimeMillis()
57
-    val tvSerieFuture: Future[TvSerie] = source
57
+    val tvSerieFuture: Future[Option[TvSerie]] = source
58
       .via(flow = filterTvSerieByIdFlow(tvSerieID = tvSerieID))
58
       .via(flow = filterTvSerieByIdFlow(tvSerieID = tvSerieID))
59
-      .runWith(Sink.head[TvSerie])
59
+      .runWith(Sink.headOption[TvSerie])
60
 
60
 
61
     tvSerieFuture.onComplete({
61
     tvSerieFuture.onComplete({
62
       case Failure(exception) => logger.info(s"$exception")
62
       case Failure(exception) => logger.info(s"$exception")
63
-      case Success(value: TvSerie) =>
63
+      case Success(value: Option[TvSerie]) =>
64
         logger.info(s"$value")
64
         logger.info(s"$value")
65
         logger.info(s"SUCCESS, elapsed time:${(System.currentTimeMillis() - start) / 1000} sec")
65
         logger.info(s"SUCCESS, elapsed time:${(System.currentTimeMillis() - start) / 1000} sec")
66
     })
66
     })

Powered by TurnKey Linux.