Skip to content

Commit c1f0b8b

Browse files
committed
review() : Fix some typo and refactor for publication
- Change method name - Update README - Change demo Main to use all callbacks
1 parent e04c68e commit c1f0b8b

File tree

7 files changed

+118
-40
lines changed

7 files changed

+118
-40
lines changed

README.adoc

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,37 @@
22
== Streamdata.io Java SDK
33

44

5-
=== What does the API do ?
5+
=== What does the API do?
66

77
It remove the plumbing between you and streamdata.io once you have an API ready to use, you can use the code above.
88

99
It connects to the server with some your API URL and your credentials (API key) and you get notified using either
1010

1111
* Some callback you provide
12-
* Trough an RxJava2 `Flowable`
12+
* Through an RxJava2 `Flowable`
1313

1414
You can choose to receive snapshots only or one snapshot and then patches you don't even know SSE is used, you just use your data.
1515

16-
=== Prerequisite
16+
=== Step by step setup to run demo
17+
18+
* Create an free account on streamdata.io https://portal.streamdata.io/#/register to get an App token.
19+
* Clone project, edit Main.java and replace [YOUR TOKEN HERE] with your App token.
20+
* Make sure you have Java 8+ installed
21+
* Make sure you have maven 3.0+ installed
22+
* Build project with maven:
23+
24+
`mvn clean install`
25+
26+
* Run sample from a terminal:
27+
28+
`java -jar target/streamdataio-sdk-java-1.0.jar`
29+
30+
* You should see data and patches pushed in your application and displayed on your terminal.
31+
* You can build the jar on you own and use provided classes to use this API
32+
you can use the provided demo example API which simulates updating stocks prices from a financial market: 'http://stockmarket.streamdata.io/v2/prices'
33+
34+
Feel free to test it with any REST/Json API of your choice.
1735

18-
* You have to create an account on streamdata.io and create an app using your API or one of demo sample API provided by streamdata.io
19-
* You must use Java8+ to use this API as to takes advantage
20-
of lambda expression and some specific features of java8.
2136

2237
=== How to integrate the API into your project
2338

@@ -42,9 +57,11 @@ dependencies {
4257

4358
Notable transitive dependencies :
4459

45-
* Glassfish Jersey as EventSource client
60+
* Glassfish Jersey as EventSource client (https://jersey.java.net/documentation/latest/sse.html)
61+
* RxJava 2 (https://github.com/ReactiveX/RxJava)
62+
* Jackson for Json processing (http://wiki.fasterxml.com/JacksonHome).
63+
* zjsonpatch (https://github.com/flipkart-incubator/zjsonpatch) as a Java Json-Patch implementation
4664
* Slf4j with a default configuration
47-
* RxJava 2
4865

4966
=== Callback based API
5067

@@ -58,7 +75,7 @@ String appKey = "YOUR OWN APP KEY";
5875
5976
EventSourceClient client = StreamdataClient.createClient(apiURL, appKey);
6077
client.addHeader("X-MYAPI-POLLER", "Polled By SD.io") # <2>
61-
.incrementalCache(true) # <3>
78+
.useJsonPatch(true) # <3>
6279
.onSnapshot(data -> this::processData) # <4>
6380
.onPatch(patch -> this.processPatch(patch, client.getCurrentSnaphot())) # <5>
6481
.onOpen(() -> logger.info("And we are... live!")) # <6>
@@ -71,12 +88,12 @@ client.addHeader("X-MYAPI-POLLER", "Polled By SD.io")
7188
<1> This the key you when you created your app in the web portal.
7289
<2> Headers to be added when polling your API (Typically some auth header) [_Optional_ ]
7390
<3> This allows you let streamdata send snapshots only (set `false` to do so) or snapshot then patch (default behaviour if you don't call this method) [_Optional_ ]
74-
<4> A callback that will consume the snaphost
91+
<4> A callback that will consume the snapshot
7592
<5> A callback that will consume the patch (you can access the snaphost anyways) [_Optional_ ]
7693
<6> Call when successfully open the connection [_Optional_ ]
7794
<7> Call the connection is actually closed [_Optional_ ]
78-
<6> Call when the stream return an error, by default the error is logged usually it is recommended to close the connection : ```client.close()``` [_Optional_ ]
79-
<7> Call when an error occurs, mainly when opening the event source, by default the error is recommended to close the connection : ```client.close()``` [_Optional_ ]
95+
<6> Call when the stream return an error, by default the error is logged usually it is recommended to close the connection: ```client.close()``` [_Optional_ ]
96+
<7> Call when an error occurs, mainly when opening the event source, by default the error is recommended to close the connection: ```client.close()``` [_Optional_ ]
8097
<10> Open the connection with the server
8198

8299

@@ -95,7 +112,7 @@ String appKey = "YOUR OWN APP KEY";
95112
96113
StreamdataClient.createRxJavaClient(apiURL, appKey)
97114
.addHeader("X-MYAPI-POLLER", "Polled By SD.io") # <2>
98-
.incrementalCache(true) # <3>
115+
.useJsonPatch(true) # <3>
99116
.toFlowable() # <4>
100117
// here you can add operator to manipulate the flow
101118
.subscribe(event -> { # <5>
@@ -118,9 +135,9 @@ StreamdataClient.createRxJavaClient(apiURL, appKey)
118135

119136
== Errors
120137

121-
Errors not a simple string. It is JSON !
138+
Errors not a simple string. It is JSON!
122139

123-
Above, an example of an error so you can get more detailed informations
140+
Above, an example of an error so you can get more detailed informations.
124141

125142
[JSON]
126143
```

pom.xml

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
<groupId>io.streamdata</groupId>
66
<artifactId>streamdataio-java-sdk</artifactId>
7-
<version>0.0.1-SNAPSHOT</version>
7+
<version>1.0</version>
88
<packaging>jar</packaging>
99

1010
<name>Streamdata.io Java SDK</name>
@@ -73,6 +73,64 @@
7373
<showWarnings>true</showWarnings>
7474
</configuration>
7575
</plugin>
76+
<plugin>
77+
<groupId>org.apache.maven.plugins</groupId>
78+
<artifactId>maven-assembly-plugin</artifactId>
79+
<version>2.6</version>
80+
<configuration>
81+
<archive>
82+
<manifest>
83+
<mainClass>io.streamdata.demo.Main</mainClass>
84+
</manifest>
85+
</archive>
86+
<descriptorRefs>
87+
<descriptorRef>jar-with-dependencies</descriptorRef>
88+
</descriptorRefs>
89+
</configuration>
90+
<executions>
91+
<execution>
92+
<id>make-assembly</id>
93+
<phase>package</phase>
94+
<goals>
95+
<goal>single</goal>
96+
</goals>
97+
</execution>
98+
</executions>
99+
</plugin>
100+
<plugin>
101+
<groupId>org.apache.maven.plugins</groupId>
102+
<artifactId>maven-dependency-plugin</artifactId>
103+
<version>2.10</version>
104+
<executions>
105+
<execution>
106+
<id>copy-dependencies</id>
107+
<phase>prepare-package</phase>
108+
<goals>
109+
<goal>copy-dependencies</goal>
110+
</goals>
111+
<configuration>
112+
<outputDirectory>${project.build.directory}/lib</outputDirectory>
113+
<overWriteReleases>false</overWriteReleases>
114+
<overWriteSnapshots>false</overWriteSnapshots>
115+
<overWriteIfNewer>true</overWriteIfNewer>
116+
</configuration>
117+
</execution>
118+
</executions>
119+
</plugin>
120+
<plugin>
121+
<groupId>org.apache.maven.plugins</groupId>
122+
<artifactId>maven-jar-plugin</artifactId>
123+
<version>2.6</version>
124+
<configuration>
125+
<archive>
126+
<manifest>
127+
<addClasspath>true</addClasspath>
128+
<classpathPrefix>lib/</classpathPrefix>
129+
<mainClass>io.streamdata.demo.Main</mainClass>
130+
</manifest>
131+
</archive>
132+
</configuration>
133+
</plugin>
76134
</plugins>
77135
</build>
78136

src/main/java/io/streamdata/demo/Main.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class Main {
1313

1414
public static void main(String... args) throws URISyntaxException, InterruptedException {
1515

16-
final String appKey = "ODZjZDQ5MDYtYzZkYS00NTQwLWI0ZDctMGZlYzU2N2JlYmY3";
16+
final String appKey = "[YOUR TOKEN HERE]";
1717
final String apiURL = "http://stockmarket.streamdata.io/prices";
1818

1919
Logger logger = LoggerFactory.getLogger(Main.class);
@@ -27,8 +27,10 @@ public static void main(String... args) throws URISyntaxException, InterruptedEx
2727
eventSource
2828
.addHeader("X-MYAPI-HEADER", "Polled-By-SD.io")
2929
.addHeader("X-MYAPI-HEADER2", "SomeStuffs")
30-
.onSnapshot(data -> logger.info("INITIAL SNAPSHOT {}", data))
30+
.onSnapshot(data -> logger.info("RECEIVING SNAPSHOT {}", data))
3131
.onPatch(patch -> logger.info("PATCH {} SNAPSHOT UPDATED {}", patch, eventSource.getCurrentSnapshot()))
32+
.onError(error -> logger.error("Error occured : {}", error.toString()))
33+
.onException(ex -> logger.error("Error occured on connection", ex))
3234
.onOpen(() -> logger.info("And we are... live!"))
3335
.onClose(() -> logger.info("Bye now!"))
3436
.open();
@@ -44,8 +46,8 @@ public static void main(String... args) throws URISyntaxException, InterruptedEx
4446
{
4547

4648
EventSourceClient eventSource = StreamdataClient.createClient(apiURL, appKey)
47-
.incrementalCache(false)
48-
.onSnapshot(data -> logger.info("INITIAL SNAPSHOT {}", data))
49+
.useJsonPatch(false)
50+
.onSnapshot(data -> logger.info("RECEIVING SNAPSHOT {}", data))
4951
.onOpen(() -> logger.info("And we are... live!"))
5052
.open();
5153

@@ -61,7 +63,7 @@ public static void main(String... args) throws URISyntaxException, InterruptedEx
6163
RxJavaEventSourceClient rxJavaEventSourceClient = StreamdataClient.createRxJavaClient(apiURL, appKey);
6264
Disposable disposable =
6365
rxJavaEventSourceClient.addHeader("X-MYAPI-HEADER", "Polled By SD.io")
64-
.incrementalCache(true) // same behavior as default
66+
.useJsonPatch(true) // same behavior as default
6567
.toFlowable()
6668
.subscribe(event -> {
6769
if (event.isSnapshot()) {

src/main/java/io/streamdata/sdk/EventSourceClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ public interface EventSourceClient {
2727
* <p>If set to false a snapshot will be sent every time, no patch is sent. This means that {@link RxJavaEventSourceClient.Event#getPatch()} will return null <b>Use this only for low frequency polling</b></p>
2828
* <p>Behind the scene it adds the header <code>text/event-stream</code> for patches or <code>application/json</code> for non-incremental cache</p>
2929
*
30-
* @param enableIncrementalCache a boolean to allow incremental cache (default : true)
30+
* @param useJsonPatch a boolean to allow incremental cache (default : true)
3131
*/
32-
EventSourceClient incrementalCache(boolean enableIncrementalCache);
32+
EventSourceClient useJsonPatch(boolean useJsonPatch);
3333

3434
/**
3535
* Sets a optionnal callback to be called after the event source has been successfully started.
@@ -96,7 +96,7 @@ public interface EventSourceClient {
9696

9797
/**
9898
* Opens the connections with streamdata proxy that will poll data for you.
99-
* {@link #onSnapshot(Consumer)} must have called before and {@link #onPatch(Consumer)} is incremental cache is on (default is yes unless you call {@link #incrementalCache(boolean)} with false.
99+
* {@link #onSnapshot(Consumer)} must have called before and {@link #onPatch(Consumer)} is incremental cache is on (default is yes unless you call {@link #useJsonPatch(boolean)} with false.
100100
*
101101
* @return a future to get hints on the thread status
102102
*/

src/main/java/io/streamdata/sdk/RxJavaEventSourceClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ public interface RxJavaEventSourceClient {
4040
* <p>If set to false a snapshot will be sent every time, no patch is sent. This means that {@link Event#getPatch()} will return null <b>Use this only for low frequency polling</b></p>
4141
* <p>Behind the scene it adds the header <code>text/event-stream</code> for patches or <code>application/json</code> for non-incremental cache</p>
4242
*
43-
* @param enableIncrementalCache a boolean to allow incremental cache (default : true)
43+
* @param useJsonPatch a boolean to allow incremental cache (default : true)
4444
*/
45-
RxJavaEventSourceClient incrementalCache(boolean enableIncrementalCache);
45+
RxJavaEventSourceClient useJsonPatch(boolean useJsonPatch);
4646

4747

4848
/**
@@ -121,7 +121,7 @@ public JsonNode getSnapshot() {
121121
* Gets the patch if any. There are two cases where <b>the patch can be null</b>
122122
* <ul>
123123
* <li>{@link #isSnapshot()} return true</li>
124-
* <li>{@link EventSourceClient#incrementalCache(boolean)} or {@link RxJavaEventSourceClient#incrementalCache(boolean)} has been called with null</li>
124+
* <li>{@link EventSourceClient#useJsonPatch(boolean)} or {@link RxJavaEventSourceClient#useJsonPatch(boolean)} has been called with null</li>
125125
* </ul>
126126
*
127127
* @return the as a JsonNode

src/main/java/io/streamdata/sdk/impl/EventSourceClientImpl.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class EventSourceClientImpl implements EventSourceClient {
4040
private Consumer<JsonNode> onDataCallback;
4141
private Consumer<JsonNode> onPatchCallback;
4242
private Consumer<JsonNode> onErrorCallback = err -> LOGGER.error("A streamdata error has been sent from SSE : {}", err);
43-
private Consumer<Throwable> onFailureCallback = t -> LOGGER.error("An error occured while processing event", t);
43+
private Consumer<Throwable> onExceptionCallback = t -> LOGGER.error("An error occured while processing event", t);
4444

4545
// jackson objectMapper to parse Json content
4646
private final ObjectMapper jsonObjectMapper = new ObjectMapper();
@@ -93,7 +93,7 @@ public EventSourceClient addHeader(String name, String value) {
9393
}
9494

9595
@Override
96-
public EventSourceClient incrementalCache(boolean enableIncrementalCache) {
96+
public EventSourceClient useJsonPatch(boolean enableIncrementalCache) {
9797
this.incrementalCache = enableIncrementalCache;
9898
return this;
9999
}
@@ -130,7 +130,7 @@ public EventSourceClient onError(Consumer<JsonNode> callback) {
130130

131131
@Override
132132
public EventSourceClient onException(Consumer<Throwable> callback) {
133-
this.onFailureCallback = callback;
133+
this.onExceptionCallback = callback;
134134
return this;
135135
}
136136

@@ -166,8 +166,9 @@ public EventSourceClient open() {
166166
.register(SseFeature.class)
167167
.register((Feature) context -> {
168168
context.register((ClientRequestFilter) requestContext -> {
169-
requestContext.getHeaders().get("Accept").clear();
170-
requestContext.getHeaders().get("Accept").add(this.incrementalCache ? SseFeature.SERVER_SENT_EVENTS : MediaType.APPLICATION_JSON);
169+
if (!this.incrementalCache) {
170+
requestContext.getHeaders().get("Accept").add(MediaType.APPLICATION_JSON);
171+
}
171172
});
172173
return true;
173174
})
@@ -199,7 +200,7 @@ public void onEvent(InboundEvent inboundEvent) {
199200
onDataCallback.accept(data);
200201
} catch (IOException e) {
201202
// notify consumer
202-
onFailureCallback.accept(e);
203+
onExceptionCallback.accept(e);
203204
}
204205
break;
205206

@@ -219,7 +220,7 @@ public void onEvent(InboundEvent inboundEvent) {
219220
onPatchCallback.accept(lastPatch);
220221

221222
} catch (IOException e) {
222-
onFailureCallback.accept(e);
223+
onExceptionCallback.accept(e);
223224
}
224225
break;
225226

@@ -228,11 +229,11 @@ public void onEvent(InboundEvent inboundEvent) {
228229
LOGGER.debug("Receiving error {} ", eventData);
229230

230231
// read the error
231-
JsonNode lastPatch = jsonObjectMapper.readTree(eventData);
232+
JsonNode error = jsonObjectMapper.readTree(eventData);
232233

233-
onErrorCallback.accept(lastPatch);
234+
onErrorCallback.accept(error);
234235
} catch (IOException e) {
235-
onFailureCallback.accept(e);
236+
onExceptionCallback.accept(e);
236237
}
237238
break;
238239

@@ -248,7 +249,7 @@ public void onEvent(InboundEvent inboundEvent) {
248249
if (this.onOpenCallback != null)
249250
this.onOpenCallback.run();
250251
} catch (Exception e) {
251-
onFailureCallback.accept(e);
252+
onExceptionCallback.accept(e);
252253
this.close();
253254
System.exit(1);
254255
}

src/main/java/io/streamdata/sdk/impl/RxJavaEventSourceClientImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ public RxJavaEventSourceClient addHeader(String name, String value) {
2626
}
2727

2828
@Override
29-
public RxJavaEventSourceClient incrementalCache(boolean enableIncrementalCache) {
30-
this.eventSourceClient.incrementalCache(enableIncrementalCache);
29+
public RxJavaEventSourceClient useJsonPatch(boolean enableIncrementalCache) {
30+
this.eventSourceClient.useJsonPatch(enableIncrementalCache);
3131
return this;
3232
}
3333

0 commit comments

Comments
 (0)