Browse Source

first commit

placidenduwayo 1 year ago
parent
commit
482d96734d

+ 1
- 1
pom.xml View File

15
     <name>akka-stream-file-processing-api</name>
15
     <name>akka-stream-file-processing-api</name>
16
     <description>akka-stream-file-processing-api</description>
16
     <description>akka-stream-file-processing-api</description>
17
     <properties>
17
     <properties>
18
-        <java.version>8</java.version>
18
+        <java.version>11</java.version>
19
         <scala.version>2.12.10</scala.version>
19
         <scala.version>2.12.10</scala.version>
20
     </properties>
20
     </properties>
21
     <dependencies>
21
     <dependencies>

+ 52
- 14
src/main/java/fr/natan/akkastreamfileprocessingapi/controller/MovieController.java View File

11
 import org.springframework.web.bind.annotation.RequestMapping;
11
 import org.springframework.web.bind.annotation.RequestMapping;
12
 import org.springframework.web.bind.annotation.RequestMethod;
12
 import org.springframework.web.bind.annotation.RequestMethod;
13
 import org.springframework.web.bind.annotation.RestController;
13
 import org.springframework.web.bind.annotation.RestController;
14
+import play.api.libs.json.JsValue;
14
 import play.api.libs.json.Json;
15
 import play.api.libs.json.Json;
15
 import scala.collection.IndexedSeq;
16
 import scala.collection.IndexedSeq;
16
 import scala.concurrent.Future;
17
 import scala.concurrent.Future;
17
 
18
 
19
+import java.util.ArrayList;
20
+import java.util.List;
21
+import java.util.Random;
18
 import java.util.concurrent.CompletableFuture;
22
 import java.util.concurrent.CompletableFuture;
19
 import java.util.concurrent.ExecutionException;
23
 import java.util.concurrent.ExecutionException;
20
 
24
 
26
     private final AkkaStreamFileProcessing akkaStreamFilesProcessing;
30
     private final AkkaStreamFileProcessing akkaStreamFilesProcessing;
27
     private final CompletableFutureResult completableFutureResult = new CompletableFutureResult();
31
     private final CompletableFutureResult completableFutureResult = new CompletableFutureResult();
28
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
32
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
33
+    private static final int BORN_INF = 1000, BORN_SUP = 5000;
34
+
29
     public MovieController(AkkaStreamFileProcessing akkaStreamFilesProcessing) {
35
     public MovieController(AkkaStreamFileProcessing akkaStreamFilesProcessing) {
30
         this.akkaStreamFilesProcessing = akkaStreamFilesProcessing;
36
         this.akkaStreamFilesProcessing = akkaStreamFilesProcessing;
31
     }
37
     }
34
     private ResponseEntity<String> getPersonByID(@PathVariable(name = "personID") String personID) throws ExecutionException, InterruptedException {
40
     private ResponseEntity<String> getPersonByID(@PathVariable(name = "personID") String personID) throws ExecutionException, InterruptedException {
35
         Future<Models.Person> futurePerson = akkaStreamFilesProcessing.getPersonById(personID);
41
         Future<Models.Person> futurePerson = akkaStreamFilesProcessing.getPersonById(personID);
36
         CompletableFuture<Models.Person> completableFuture = completableFutureResult.buildcompletableFuture(futurePerson);
42
         CompletableFuture<Models.Person> completableFuture = completableFutureResult.buildcompletableFuture(futurePerson);
37
-        while (!completableFuture.isDone()){
38
-           logger.info("IS PROCESSING...");
39
-            Thread.sleep(5000);
43
+        while (!completableFuture.isDone()) {
44
+            logger.info("IS PROCESSING...");
45
+            Thread.sleep(new Random().nextInt(BORN_SUP-BORN_INF)+BORN_INF);
40
         }
46
         }
41
         Models.Person person = completableFuture.get();
47
         Models.Person person = completableFuture.get();
42
         return new ResponseEntity<>(Json.toJson(person, person.personFormat()).toString(), HttpStatus.OK);
48
         return new ResponseEntity<>(Json.toJson(person, person.personFormat()).toString(), HttpStatus.OK);
48
 
54
 
49
         Future<Models.Person> personFuture = akkaStreamFilesProcessing.getPersonByName(primaryName);
55
         Future<Models.Person> personFuture = akkaStreamFilesProcessing.getPersonByName(primaryName);
50
         CompletableFuture<Models.Person> personCompletableFuture = completableFutureResult.buildcompletableFuture(personFuture);
56
         CompletableFuture<Models.Person> personCompletableFuture = completableFutureResult.buildcompletableFuture(personFuture);
51
-        while (!personCompletableFuture.isDone()){
57
+        while (!personCompletableFuture.isDone()) {
52
             logger.info("IS PROCESSING...");
58
             logger.info("IS PROCESSING...");
53
-            Thread.sleep(5000);
59
+            Thread.sleep(new Random().nextInt(BORN_SUP-BORN_INF)+BORN_INF);
54
         }
60
         }
55
         Models.Person person = personCompletableFuture.get();
61
         Models.Person person = personCompletableFuture.get();
56
         return new ResponseEntity<>(Json.toJson(person, person.personFormat()).toString(), HttpStatus.OK);
62
         return new ResponseEntity<>(Json.toJson(person, person.personFormat()).toString(), HttpStatus.OK);
57
     }
63
     }
58
 
64
 
59
     @RequestMapping(value = "/tvseries/id/{tvSerieId}", method = RequestMethod.GET)
65
     @RequestMapping(value = "/tvseries/id/{tvSerieId}", method = RequestMethod.GET)
60
-    private ResponseEntity<String> getTvSerieByID(@PathVariable(name = "tvSerieId") String tvSerieID) throws ExecutionException, InterruptedException {
66
+    private ResponseEntity<String> getTvSerieByID(@PathVariable(name = "tvSerieId") String tvSerieID)
67
+            throws ExecutionException, InterruptedException {
61
         Future<Models.TvSerie> tvSerieFuture = akkaStreamFilesProcessing.getTvSerieById(tvSerieID);
68
         Future<Models.TvSerie> tvSerieFuture = akkaStreamFilesProcessing.getTvSerieById(tvSerieID);
62
         CompletableFuture<Models.TvSerie> tvSerieCompletableFuture = completableFutureResult.buildcompletableFuture(tvSerieFuture);
69
         CompletableFuture<Models.TvSerie> tvSerieCompletableFuture = completableFutureResult.buildcompletableFuture(tvSerieFuture);
63
-        while (!tvSerieCompletableFuture.isDone()){
70
+        while (!tvSerieCompletableFuture.isDone()) {
64
             logger.info("IS PROCESSING...");
71
             logger.info("IS PROCESSING...");
65
-            Thread.sleep(5000);
72
+            Thread.sleep(new Random().nextInt(BORN_SUP-BORN_INF)+BORN_INF);
66
         }
73
         }
67
         Models.TvSerie tvSerie = tvSerieCompletableFuture.get();
74
         Models.TvSerie tvSerie = tvSerieCompletableFuture.get();
68
         return new ResponseEntity<>(Json.toJson(tvSerie, tvSerie.tvSerieFormat()).toString(), HttpStatus.OK);
75
         return new ResponseEntity<>(Json.toJson(tvSerie, tvSerie.tvSerieFormat()).toString(), HttpStatus.OK);
69
     }
76
     }
70
 
77
 
71
     @RequestMapping(value = "/tvseries/title/{tvseriePrimaryTitle}", method = RequestMethod.GET)
78
     @RequestMapping(value = "/tvseries/title/{tvseriePrimaryTitle}", method = RequestMethod.GET)
72
-    private ResponseEntity<IndexedSeq<Models.TvSerie>> getTvserieByPrimaryTitle(
73
-            @PathVariable(name = "tvseriePrimaryTitle") String tvseriePrimaryTitle) {
79
+    private List<String> getTvserieByPrimaryTitle(
80
+            @PathVariable(name = "tvseriePrimaryTitle") String tvseriePrimaryTitle) throws InterruptedException, ExecutionException {
74
 
81
 
75
         Future<IndexedSeq<Models.TvSerie>> listFuture = akkaStreamFilesProcessing.getTvSerieByPrimaryTitle(tvseriePrimaryTitle);
82
         Future<IndexedSeq<Models.TvSerie>> listFuture = akkaStreamFilesProcessing.getTvSerieByPrimaryTitle(tvseriePrimaryTitle);
76
-        IndexedSeq<Models.TvSerie> indexedSeq = listFuture.value().get().get();
83
+        CompletableFuture<IndexedSeq<Models.TvSerie>> completableFuture =
84
+                completableFutureResult.buildcompletableFuture_(listFuture);
85
+        while (!completableFuture.isDone()) {
86
+            logger.info("IS PROCESSING...");
87
+            Thread.sleep(new Random().nextInt(BORN_SUP-BORN_INF)+BORN_INF);
88
+        }
89
+        IndexedSeq<Models.TvSerie> tvSerieList = completableFuture.get();
90
+        List<String> tvSeries = new ArrayList<>();
91
+        tvSerieList.foreach(tvSerie -> {
92
+            JsValue tvSerieJs = Json.toJson(tvSerie, tvSerie.tvSerieFormat());
93
+            tvSeries.add(tvSerieJs.toString());
94
+            return null;
95
+        });
77
 
96
 
78
-        return new ResponseEntity<>(indexedSeq, HttpStatus.OK);
97
+        return tvSeries;
79
     }
98
     }
80
 
99
 
100
+    ;
101
+
81
     @RequestMapping(value = "/persons/tvseries/title/{tvSerieTitle}", method = RequestMethod.GET)
102
     @RequestMapping(value = "/persons/tvseries/title/{tvSerieTitle}", method = RequestMethod.GET)
82
-    private Future<IndexedSeq<Models.Person>> getPersonsForTvSerie(@PathVariable(name = "tvSerieTitle") String tvSerieTitle) {
83
-        return akkaStreamFilesProcessing.getTeamOfPersonsForTvSerie(tvSerieTitle);
103
+    private List<String> getPersonsForTvSerie(@PathVariable(name = "tvSerieTitle") String tvSerieTitle)
104
+            throws InterruptedException, ExecutionException {
105
+
106
+        Future<IndexedSeq<Models.Person>> futurePersonSeq = akkaStreamFilesProcessing.getTeamOfPersonsForTvSerie(tvSerieTitle);
107
+        CompletableFuture<IndexedSeq<Models.Person>> completableFuture = completableFutureResult.buildcompletableFuture_(futurePersonSeq);
108
+        while (!completableFuture.isDone()){
109
+            logger.info("IS PROCESSING...");
110
+            Thread.sleep(new Random().nextInt(BORN_SUP-BORN_INF)+BORN_INF);
111
+        }
112
+
113
+        IndexedSeq<Models.Person> personIndexedSeq = completableFuture.get();
114
+        List<String> persons = new ArrayList<>();
115
+        personIndexedSeq.foreach(person -> {
116
+            JsValue personJs = Json.toJson(person, person.personFormat());
117
+            persons.add(personJs.toString());
118
+            return null;
119
+        });
120
+
121
+        return persons;
84
     }
122
     }
85
 
123
 
86
     @RequestMapping(value = "/persons", method = RequestMethod.GET)
124
     @RequestMapping(value = "/persons", method = RequestMethod.GET)

+ 18
- 3
src/main/java/fr/natan/akkastreamfileprocessingapi/futurecompleteness/CompletableFutureResult.java View File

1
 package fr.natan.akkastreamfileprocessingapi.futurecompleteness;
1
 package fr.natan.akkastreamfileprocessingapi.futurecompleteness;
2
 
2
 
3
 
3
 
4
+import scala.collection.IndexedSeq;
5
+import scala.collection.immutable.List;
4
 import scala.concurrent.Future;
6
 import scala.concurrent.Future;
5
-import scala.concurrent.duration.Duration;
6
 
7
 
7
 import java.util.concurrent.CompletableFuture;
8
 import java.util.concurrent.CompletableFuture;
8
 import java.util.concurrent.Executors;
9
 import java.util.concurrent.Executors;
13
 
14
 
14
     public CompletableFuture<T> buildcompletableFuture(Future<T> futureT) {
15
     public CompletableFuture<T> buildcompletableFuture(Future<T> futureT) {
15
         CompletableFuture<T> completableFuture = new CompletableFuture<>();
16
         CompletableFuture<T> completableFuture = new CompletableFuture<>();
16
-        T tt = (T) Executors.newSingleThreadExecutor().submit(() -> {
17
+       Executors.newSingleThreadExecutor().submit(() -> {
17
            while(!futureT.isCompleted()){
18
            while(!futureT.isCompleted()){
18
-               Thread.sleep(500);
19
+               Thread.sleep(100);
19
            }
20
            }
20
             completableFuture.complete(futureT.value().get().get());
21
             completableFuture.complete(futureT.value().get().get());
21
             return null;
22
             return null;
24
         return completableFuture;
25
         return completableFuture;
25
     }
26
     }
26
 
27
 
28
+
29
+    public CompletableFuture<IndexedSeq<T>> buildcompletableFuture_(Future<IndexedSeq<T>> futureListT) {
30
+        CompletableFuture<IndexedSeq<T>> completableFuture = new CompletableFuture<>();
31
+        Executors.newSingleThreadExecutor().submit(() -> {
32
+            while(!futureListT.isCompleted()){
33
+                Thread.sleep(100);
34
+            }
35
+            completableFuture.complete(futureListT.value().get().get());
36
+            return null;
37
+        });
38
+
39
+        return completableFuture;
40
+    }
41
+
27
 }
42
 }

+ 2
- 0
src/main/scala/fr/natan/akkastreamfileprocessingapi/models/Models.scala View File

4
 
4
 
5
 object Models {
5
 object Models {
6
 
6
 
7
+  //noinspection TypeAnnotation
7
   final case class Person(
8
   final case class Person(
8
                            personID: String,
9
                            personID: String,
9
                            primaryName: String,
10
                            primaryName: String,
25
     }
26
     }
26
   }
27
   }
27
 
28
 
29
+  //noinspection TypeAnnotation
28
   final case class TvSerie(
30
   final case class TvSerie(
29
                             tvSerieID: String,
31
                             tvSerieID: String,
30
                             titleType: String,
32
                             titleType: String,

+ 1
- 0
src/main/scala/fr/natan/akkastreamfileprocessingapi/service/AkkaStreamFileProcessingImpl.scala View File

11
 import org.slf4j.LoggerFactory
11
 import org.slf4j.LoggerFactory
12
 import org.springframework.stereotype.Component
12
 import org.springframework.stereotype.Component
13
 
13
 
14
+import scala.collection.mutable.ListBuffer
14
 import scala.concurrent.ExecutionContext.Implicits.global
15
 import scala.concurrent.ExecutionContext.Implicits.global
15
 import scala.concurrent.Future
16
 import scala.concurrent.Future
16
 import scala.language.postfixOps
17
 import scala.language.postfixOps

Powered by TurnKey Linux.