Browse Source

first commit

placidenduwayo 1 year ago
parent
commit
c6e7731752

+ 1
- 1
pom.xml View File

15
     <name>akka-stream-file-processing-api</name>
15
     <name>akka-stream-file-processing-api</name>
16
     <description>akka-stream-file-processing-api</description>
16
     <description>akka-stream-file-processing-api</description>
17
     <properties>
17
     <properties>
18
-        <java.version>11</java.version>
18
+        <java.version>8</java.version>
19
         <scala.version>2.12.10</scala.version>
19
         <scala.version>2.12.10</scala.version>
20
     </properties>
20
     </properties>
21
     <dependencies>
21
     <dependencies>

+ 31
- 20
src/main/java/fr/natan/akkastreamfileprocessingapi/controller/MovieController.java View File

3
 import fr.natan.akkastreamfileprocessingapi.futurecompleteness.CompletableFutureResult;
3
 import fr.natan.akkastreamfileprocessingapi.futurecompleteness.CompletableFutureResult;
4
 import fr.natan.akkastreamfileprocessingapi.models.Models;
4
 import fr.natan.akkastreamfileprocessingapi.models.Models;
5
 import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamFileProcessing;
5
 import fr.natan.akkastreamfileprocessingapi.service.AkkaStreamFileProcessing;
6
+import org.slf4j.Logger;
7
+import org.slf4j.LoggerFactory;
6
 import org.springframework.http.HttpStatus;
8
 import org.springframework.http.HttpStatus;
7
 import org.springframework.http.ResponseEntity;
9
 import org.springframework.http.ResponseEntity;
8
 import org.springframework.web.bind.annotation.PathVariable;
10
 import org.springframework.web.bind.annotation.PathVariable;
9
 import org.springframework.web.bind.annotation.RequestMapping;
11
 import org.springframework.web.bind.annotation.RequestMapping;
10
 import org.springframework.web.bind.annotation.RequestMethod;
12
 import org.springframework.web.bind.annotation.RequestMethod;
11
 import org.springframework.web.bind.annotation.RestController;
13
 import org.springframework.web.bind.annotation.RestController;
12
-import play.api.libs.json.JsValue;
13
 import play.api.libs.json.Json;
14
 import play.api.libs.json.Json;
14
 import scala.collection.IndexedSeq;
15
 import scala.collection.IndexedSeq;
15
-import scala.collection.immutable.List;
16
 import scala.concurrent.Future;
16
 import scala.concurrent.Future;
17
 
17
 
18
 import java.util.concurrent.CompletableFuture;
18
 import java.util.concurrent.CompletableFuture;
24
 public class MovieController {
24
 public class MovieController {
25
 
25
 
26
     private final AkkaStreamFileProcessing akkaStreamFilesProcessing;
26
     private final AkkaStreamFileProcessing akkaStreamFilesProcessing;
27
-    private final CompletableFutureResult completableFutureResult;
27
+    private final CompletableFutureResult completableFutureResult = new CompletableFutureResult();
28
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
28
     public MovieController(AkkaStreamFileProcessing akkaStreamFilesProcessing) {
29
     public MovieController(AkkaStreamFileProcessing akkaStreamFilesProcessing) {
29
         this.akkaStreamFilesProcessing = akkaStreamFilesProcessing;
30
         this.akkaStreamFilesProcessing = akkaStreamFilesProcessing;
30
-        completableFutureResult = new CompletableFutureResult();
31
     }
31
     }
32
 
32
 
33
     @RequestMapping(value = "/persons/id/{personID}", method = RequestMethod.GET)
33
     @RequestMapping(value = "/persons/id/{personID}", method = RequestMethod.GET)
34
     private ResponseEntity<String> getPersonByID(@PathVariable(name = "personID") String personID) throws ExecutionException, InterruptedException {
34
     private ResponseEntity<String> getPersonByID(@PathVariable(name = "personID") String personID) throws ExecutionException, InterruptedException {
35
-        Future<Models.Person> personFuture = akkaStreamFilesProcessing.getPersonById(personID);
36
-        CompletableFuture<Models.Person> personCompletableFuture = this.completableFutureResult.buildcompletableFuture(personFuture);
37
-        Models.Person person = personCompletableFuture.get();
38
-        JsValue personJs = Json.toJson(person, person.personFormat());
39
-        return new ResponseEntity<>(personJs.toString(), HttpStatus.OK);
35
+        Future<Models.Person> futurePerson = akkaStreamFilesProcessing.getPersonById(personID);
36
+        CompletableFuture<Models.Person> completableFuture = completableFutureResult.buildcompletableFuture(futurePerson);
37
+        while (!completableFuture.isDone()){
38
+           logger.info("IS PROCESSING...");
39
+            Thread.sleep(5000);
40
+        }
41
+        Models.Person person = completableFuture.get();
42
+        return new ResponseEntity<>(Json.toJson(person, person.personFormat()).toString(), HttpStatus.OK);
40
     }
43
     }
41
 
44
 
42
     @RequestMapping(value = "/persons/name/{primaryName}", method = RequestMethod.GET)
45
     @RequestMapping(value = "/persons/name/{primaryName}", method = RequestMethod.GET)
44
             throws ExecutionException, InterruptedException {
47
             throws ExecutionException, InterruptedException {
45
 
48
 
46
         Future<Models.Person> personFuture = akkaStreamFilesProcessing.getPersonByName(primaryName);
49
         Future<Models.Person> personFuture = akkaStreamFilesProcessing.getPersonByName(primaryName);
47
-        CompletableFuture completableFutureResult = this.completableFutureResult.buildcompletableFuture(personFuture);
48
-       Models.Person person = (Models.Person) completableFutureResult.get();
49
-       JsValue personJs = Json.toJson(person, person.personFormat());
50
-        return new ResponseEntity<>(personJs.toString(), HttpStatus.OK);
50
+        CompletableFuture<Models.Person> personCompletableFuture = completableFutureResult.buildcompletableFuture(personFuture);
51
+        while (!personCompletableFuture.isDone()){
52
+            logger.info("IS PROCESSING...");
53
+            Thread.sleep(5000);
54
+        }
55
+        Models.Person person = personCompletableFuture.get();
56
+        return new ResponseEntity<>(Json.toJson(person, person.personFormat()).toString(), HttpStatus.OK);
51
     }
57
     }
52
 
58
 
53
     @RequestMapping(value = "/tvseries/id/{tvSerieId}", method = RequestMethod.GET)
59
     @RequestMapping(value = "/tvseries/id/{tvSerieId}", method = RequestMethod.GET)
54
     private ResponseEntity<String> getTvSerieByID(@PathVariable(name = "tvSerieId") String tvSerieID) throws ExecutionException, InterruptedException {
60
     private ResponseEntity<String> getTvSerieByID(@PathVariable(name = "tvSerieId") String tvSerieID) throws ExecutionException, InterruptedException {
55
         Future<Models.TvSerie> tvSerieFuture = akkaStreamFilesProcessing.getTvSerieById(tvSerieID);
61
         Future<Models.TvSerie> tvSerieFuture = akkaStreamFilesProcessing.getTvSerieById(tvSerieID);
56
-        CompletableFuture<Models.TvSerie> tvSerieCompletableFuture = this.completableFutureResult.buildcompletableFuture(tvSerieFuture);
62
+        CompletableFuture<Models.TvSerie> tvSerieCompletableFuture = completableFutureResult.buildcompletableFuture(tvSerieFuture);
63
+        while (!tvSerieCompletableFuture.isDone()){
64
+            logger.info("IS PROCESSING...");
65
+            Thread.sleep(5000);
66
+        }
57
         Models.TvSerie tvSerie = tvSerieCompletableFuture.get();
67
         Models.TvSerie tvSerie = tvSerieCompletableFuture.get();
58
-        JsValue tvSerieJs = Json.toJson(tvSerie, tvSerie.tvSerieFormat());
59
-
60
-        return new ResponseEntity<>(tvSerieJs.toString(), HttpStatus.OK);
68
+        return new ResponseEntity<>(Json.toJson(tvSerie, tvSerie.tvSerieFormat()).toString(), HttpStatus.OK);
61
     }
69
     }
62
 
70
 
63
     @RequestMapping(value = "/tvseries/title/{tvseriePrimaryTitle}", method = RequestMethod.GET)
71
     @RequestMapping(value = "/tvseries/title/{tvseriePrimaryTitle}", method = RequestMethod.GET)
64
-    private ResponseEntity<Future<List<Models.TvSerie>>> getTvserieByPrimaryTitle(@PathVariable(name = "tvseriePrimaryTitle") String tvseriePrimaryTitle) {
65
-       Future<List<Models.TvSerie>> listFuture = akkaStreamFilesProcessing.getTvSerieByPrimaryTitle(tvseriePrimaryTitle);
72
+    private ResponseEntity<IndexedSeq<Models.TvSerie>> getTvserieByPrimaryTitle(
73
+            @PathVariable(name = "tvseriePrimaryTitle") String tvseriePrimaryTitle) {
74
+
75
+        Future<IndexedSeq<Models.TvSerie>> listFuture = akkaStreamFilesProcessing.getTvSerieByPrimaryTitle(tvseriePrimaryTitle);
76
+        IndexedSeq<Models.TvSerie> indexedSeq = listFuture.value().get().get();
66
 
77
 
67
-       return new ResponseEntity<>(listFuture, HttpStatus.OK);
78
+        return new ResponseEntity<>(indexedSeq, HttpStatus.OK);
68
     }
79
     }
69
 
80
 
70
     @RequestMapping(value = "/persons/tvseries/title/{tvSerieTitle}", method = RequestMethod.GET)
81
     @RequestMapping(value = "/persons/tvseries/title/{tvSerieTitle}", method = RequestMethod.GET)

+ 10
- 5
src/main/java/fr/natan/akkastreamfileprocessingapi/futurecompleteness/CompletableFutureResult.java View File

1
 package fr.natan.akkastreamfileprocessingapi.futurecompleteness;
1
 package fr.natan.akkastreamfileprocessingapi.futurecompleteness;
2
 
2
 
3
+
3
 import scala.concurrent.Future;
4
 import scala.concurrent.Future;
5
+import scala.concurrent.duration.Duration;
4
 
6
 
5
-import java.util.Random;
6
 import java.util.concurrent.CompletableFuture;
7
 import java.util.concurrent.CompletableFuture;
7
 import java.util.concurrent.Executors;
8
 import java.util.concurrent.Executors;
8
 
9
 
9
 
10
 
11
+
10
 public class CompletableFutureResult<T> {
12
 public class CompletableFutureResult<T> {
11
 
13
 
12
-    public CompletableFuture<T> buildcompletableFuture(Future<T> future){
14
+    public CompletableFuture<T> buildcompletableFuture(Future<T> futureT) {
13
         CompletableFuture<T> completableFuture = new CompletableFuture<>();
15
         CompletableFuture<T> completableFuture = new CompletableFuture<>();
14
-        Executors.newSingleThreadExecutor().submit(()->{
15
-            Thread.sleep(new Random().nextInt(500-200)+200);
16
-            return completableFuture.complete(future.value().get().get());
16
+        T tt = (T) Executors.newSingleThreadExecutor().submit(() -> {
17
+           while(!futureT.isCompleted()){
18
+               Thread.sleep(500);
19
+           }
20
+            completableFuture.complete(futureT.value().get().get());
21
+            return null;
17
         });
22
         });
18
 
23
 
19
         return completableFuture;
24
         return completableFuture;

+ 1
- 1
src/main/scala/fr/natan/akkastreamfileprocessingapi/service/AkkaStreamFileProcessing.scala View File

10
   def getPersonByName(primaryName: String): Future[Person]
10
   def getPersonByName(primaryName: String): Future[Person]
11
 
11
 
12
   def getTvSerieById(tvSerieID: String): Future[TvSerie]
12
   def getTvSerieById(tvSerieID: String): Future[TvSerie]
13
-  def getTvSerieByPrimaryTitle(tvSerieTitle: String):Future[List[TvSerie]]
13
+  def getTvSerieByPrimaryTitle(tvSerieTitle: String):Future[IndexedSeq[TvSerie]]
14
 
14
 
15
   def getTeamOfPersonsForTvSerie(tvSeriePrimaryTitle: String): Future[IndexedSeq[Person]]
15
   def getTeamOfPersonsForTvSerie(tvSeriePrimaryTitle: String): Future[IndexedSeq[Person]]
16
   def getAllTvSeries(): Unit
16
   def getAllTvSeries(): Unit

+ 7
- 8
src/main/scala/fr/natan/akkastreamfileprocessingapi/service/AkkaStreamFileProcessingImpl.scala View File

1
 package fr.natan.akkastreamfileprocessingapi.service
1
 package fr.natan.akkastreamfileprocessingapi.service
2
 
2
 
3
+import akka.Done
3
 import akka.actor.ActorSystem
4
 import akka.actor.ActorSystem
4
-import akka.stream.scaladsl.{Flow, Sink, Source}
5
-import akka.{Done, NotUsed}
5
+import akka.stream.scaladsl.{Sink, Source}
6
 import com.typesafe.scalalogging.slf4j.Logger
6
 import com.typesafe.scalalogging.slf4j.Logger
7
 import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics}
7
 import fr.natan.akkastreamfileprocessingapi.datasource.Datasource.{nameBasics, titleBasics, titlePrincipalsBasics}
8
 import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSerie}
8
 import fr.natan.akkastreamfileprocessingapi.models.Models.{Person, TvSerie}
12
 import org.springframework.stereotype.Component
12
 import org.springframework.stereotype.Component
13
 
13
 
14
 import scala.concurrent.ExecutionContext.Implicits.global
14
 import scala.concurrent.ExecutionContext.Implicits.global
15
-import scala.concurrent.duration.DurationInt
16
-import scala.concurrent.{Await, Future}
15
+import scala.concurrent.Future
17
 import scala.language.postfixOps
16
 import scala.language.postfixOps
18
 import scala.util.{Failure, Success}
17
 import scala.util.{Failure, Success}
19
 
18
 
30
       .via(flow = filterPersonByIdFlow(personID = personID))
29
       .via(flow = filterPersonByIdFlow(personID = personID))
31
       .runWith(Sink.head)
30
       .runWith(Sink.head)
32
 
31
 
33
-    res.onComplete({
32
+    res.andThen({
34
       case Failure(exception) => logger.info(s"$exception")
33
       case Failure(exception) => logger.info(s"$exception")
35
       case Success(value:Person) => logger.info(s"$value")
34
       case Success(value:Person) => logger.info(s"$value")
36
     })
35
     })
65
     })
64
     })
66
     res
65
     res
67
   }
66
   }
68
-  override def getTvSerieByPrimaryTitle(tvSeriePrimaryTitle: String): Future[List[TvSerie]] = {
67
+  override def getTvSerieByPrimaryTitle(tvSeriePrimaryTitle: String): Future[IndexedSeq[TvSerie]] = {
69
 
68
 
70
     val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
69
     val tvSeriesSource: Source[Map[String, String], _] = buildAndValidateSource(inputFile = titleBasics)
71
 
70
 
72
-    val tvSries: Future[List[TvSerie]] = tvSeriesSource
71
+    val tvSries: Future[IndexedSeq[TvSerie]] = tvSeriesSource
73
       .via(flow = filterTvSerieByPrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle))
72
       .via(flow = filterTvSerieByPrimaryTitleFlow(tvSeriePrimaryTitle = tvSeriePrimaryTitle))
74
       .runWith(Sink.collection)
73
       .runWith(Sink.collection)
75
 
74
 
76
     tvSries.onComplete({
75
     tvSries.onComplete({
77
       case Failure(exception) => logger.info(s"$exception")
76
       case Failure(exception) => logger.info(s"$exception")
78
-      case Success(value: List[TvSerie]) =>
77
+      case Success(value: IndexedSeq[TvSerie]) =>
79
         value.foreach((tvSrie: TvSerie) => logger.info(s"$tvSrie"))
78
         value.foreach((tvSrie: TvSerie) => logger.info(s"$tvSrie"))
80
         logger.info("SUCCESS")
79
         logger.info("SUCCESS")
81
     })
80
     })

Powered by TurnKey Linux.