It connects to the server with your API URL and your credentials (API key), and you get notified through either:
-
A callback you provide
-
An RxJava 2 Flowable
You can choose to receive snapshots only, or one snapshot followed by patches. You don't need to know that SSE is used; you just use your data.
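To make the "one snapshot, then patches" mode more concrete, here is a minimal sketch of what applying a patch to a snapshot means. It is not the SDK's implementation (the dependency list names zjsonpatch for that); it only handles the JSON Patch `replace` operation on plain Java lists and maps. The class `SnapshotPatchSketch`, its methods, and the `title`/`price` sample data are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the SDK.
final class SnapshotPatchSketch {

    /** Applies one JSON Patch "replace" operation, e.g. path "/1/price". */
    @SuppressWarnings("unchecked")
    static void replace(Object snapshot, String path, Object value) {
        // Simplified JSON Pointer: must start with "/", no "~0"/"~1" escapes
        String[] tokens = path.substring(1).split("/");
        Object node = snapshot;
        // Walk down to the parent of the target element
        for (int i = 0; i < tokens.length - 1; i++) {
            node = (node instanceof List)
                    ? ((List<Object>) node).get(Integer.parseInt(tokens[i]))
                    : ((Map<String, Object>) node).get(tokens[i]);
        }
        String last = tokens[tokens.length - 1];
        if (node instanceof List) {
            ((List<Object>) node).set(Integer.parseInt(last), value);
        } else {
            ((Map<String, Object>) node).put(last, value);
        }
    }

    /** Builds a small made-up snapshot: a list of two price entries. */
    static List<Object> sampleSnapshot() {
        List<Object> snapshot = new ArrayList<>();
        snapshot.add(entry("Value 0", 10));
        snapshot.add(entry("Value 1", 20));
        return snapshot;
    }

    private static Map<String, Object> entry(String title, int price) {
        Map<String, Object> e = new LinkedHashMap<>();
        e.put("title", title);
        e.put("price", price);
        return e;
    }

    public static void main(String[] args) {
        List<Object> snapshot = sampleSnapshot(); // first event: the full snapshot
        replace(snapshot, "/1/price", 25);        // later event: a small patch
        System.out.println(snapshot);
        // prints [{title=Value 0, price=10}, {title=Value 1, price=25}]
    }
}
```

The point of the patch mode is visible here: after the first snapshot, only the changed value travels over the wire, and the client keeps its local copy up to date.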
-
Create a free account on Axway AMPLIFY Streams (https://portal.streamdata.io/#/register) to get an App token.
-
Clone the project, edit Main.java, and replace [YOUR TOKEN HERE] with your App token.
-
Make sure you have Java 8+ installed
-
Make sure you have Maven 3.0+ installed
-
Build the project with Maven:
`mvn clean install`
-
Run the sample from a terminal:
`java -jar target/streamdataio-java-sdk-1.0.jar`
-
You should see data and patches pushed to your application and displayed in your terminal.
-
You can build the jar on your own and use the provided classes to access this API. The provided demo streams a fictitious financial market API: http://stockmarket.streamdata.io/v2/prices
Feel free to test it with any REST/JSON API of your choice. You can add authorization headers and query params.
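As a sketch of the query-params part, the helper below builds a target API URL with properly encoded parameters using only the JDK. It assumes that query parameters are simply part of the API URL you hand to the client, which this README does not state explicitly. `ApiUrlSketch`, `withQueryParams`, and the `symbol` and `limit` parameters are hypothetical names.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the SDK.
final class ApiUrlSketch {

    /** Appends URL-encoded query parameters to a base URL, in map order. */
    static String withQueryParams(String baseUrl, Map<String, String> params) {
        StringBuilder url = new StringBuilder(baseUrl);
        // Start with '?' unless the base URL already carries a query string
        char separator = baseUrl.contains("?") ? '&' : '?';
        for (Map.Entry<String, String> e : params.entrySet()) {
            url.append(separator)
               .append(encode(e.getKey()))
               .append('=')
               .append(encode(e.getValue()));
            separator = '&';
        }
        return url.toString();
    }

    private static String encode(String s) {
        try {
            // Form-style encoding: spaces become '+'
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new IllegalStateException("UTF-8 is always supported", ex);
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("symbol", "A B"); // made-up parameter names and values
        params.put("limit", "10");
        System.out.println(
            withQueryParams("http://stockmarket.streamdata.io/v2/prices", params));
    }
}
```

The resulting string could then be used wherever the examples in this README use `apiURL`.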
Maven
After building this project:

    <dependency>
        <groupId>io.streamdata</groupId>
        <artifactId>java-sdk</artifactId>
        <version>1.0</version>
    </dependency>
Gradle

    dependencies {
        compile("io.streamdata:java-sdk:1.0")
    }
Notable transitive dependencies:
-
Glassfish Jersey as EventSource client (https://jersey.java.net/documentation/latest/sse.html)
-
RxJava 2 (https://github.com/ReactiveX/RxJava)
-
Jackson for JSON processing (http://wiki.fasterxml.com/JacksonHome)
-
zjsonpatch (https://github.com/flipkart-incubator/zjsonpatch) as a Java JSON Patch implementation
-
Slf4j with a default configuration
This API is based on callbacks and is very similar to our Node.js API.
    String apiURL = "http://api.mycompany.com/prices";
    String appKey = "YOUR OWN APP KEY";                                               // (1)

    EventSourceClient client = StreamdataClient.createClient(apiURL, appKey);
    client.addHeader("X-MYAPI-POLLER", "Polled By SD.io")                             // (2)
          .useJsonPatch(true)                                                         // (3)
          .onSnapshot(this::processData)                                              // (4)
          .onPatch(patch -> this.processPatch(patch, client.getCurrentSnaphot()))     // (5)
          .onOpen(() -> logger.info("And we are... live!"))                           // (6)
          .onClose(() -> logger.info("And we are... dead!"))                          // (7)
          .onError(err -> logger.error("An error occurred {}", err))                  // (8)
          .onException(ex -> logger.error("Unexpected error", ex))                    // (9)
          .open();                                                                    // (10)
-
(1) This is the key you received when you created your app in the web portal.
-
(2) Headers to add when polling your API (typically an auth header). [Optional]
-
(3) Lets streamdata send snapshots only (set `false` to do so) or a snapshot followed by patches (the default behaviour if you don't call this method). [Optional]
-
(4) A callback that consumes the snapshot.
-
(5) A callback that consumes the patch (you can still access the snapshot). [Optional]
-
(6) Called when the connection is successfully opened. [Optional]
-
(7) Called when the connection is actually closed. [Optional]
-
(8) Called when the stream returns an error. By default the error is logged. It is usually recommended to close the connection with `client.close()`. [Optional]
-
(9) Called when an error occurs, mainly when opening the event source. By default the error is logged, but it is recommended to close the connection with `client.close()`. [Optional]
-
(10) Opens the connection to the server and starts streaming.
This API relies on the previous one but exposes an RxJava Flowable interface to handle data coming from the server. Data is wrapped in a simple Event class that lets you know whether the data is an error, a patch, or a snapshot.
    String apiURL = "http://api.mycompany.com/prices";
    String appKey = "YOUR OWN APP KEY";                                      // (1)

    StreamdataClient.createRxJavaClient(apiURL, appKey)
        .addHeader("X-MYAPI-POLLER", "Polled By SD.io")                      // (2)
        .useJsonPatch(true)                                                  // (3)
        .toFlowable()                                                        // (4)
        // here you can add operators to manipulate the flow
        .subscribe(event -> {                                                // (5)
            if (event.isSnapshot()) {
                this.processData(event.getSnapshot());
            } else if (event.isPatch()) {
                this.processData(event.getPatch(), event.getSnapshot());
            } else if (event.isError()) {
                throw new RuntimeException(event.getError().toString());     // (6)
            }
        }, err -> logger.error(err.getMessage(), err));
-
(1) Same as above.
-
(2) Same as above.
-
(3) Same as above.
-
(4) Once configured, you can start manipulating your data or use specific schedulers.
-
(5) An example that uses no Rx operators beforehand, to show the methods available on the event.
-
(6) This will stop the Flowable and disconnect the event source.
Errors are not simple strings; they are JSON.
Below is an example of an error, so you can see the detailed information it carries.
{
"status":2005,
"cause":"An error occurred while streaming http://stockmarket.streamdata.io/priceshttp://stockmarket.streamdata.io/prices. : HTTP/1.1 404 ",
"message":"HTTP error. The Http response cannot be processed.",
"timestamp":1512566770744,
"sessionId":"62fd67bc-d090-4333-a783-d94b366f55f4"
}
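Because the error is structured, individual fields can be pulled out and acted upon. The sketch below extracts `status`, `message`, and `timestamp` from a flat error object like the one above, using only the JDK. It is deliberately naive: the regex approach only suits flat objects without escaped quotes, and in a real application Jackson (already a transitive dependency) would be the better tool. `StreamErrorSketch` is a hypothetical class, and reading `timestamp` as epoch milliseconds is an assumption based on the sample value.

```java
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the SDK.
final class StreamErrorSketch {
    final int status;
    final String message;
    final Instant timestamp;

    private StreamErrorSketch(int status, String message, Instant timestamp) {
        this.status = status;
        this.message = message;
        this.timestamp = timestamp;
    }

    /** Parses the three fields from a flat JSON error object. */
    static StreamErrorSketch parse(String json) {
        int status = Integer.parseInt(find(json, "status", "(\\d+)"));
        String message = find(json, "message", "\"([^\"]*)\"");
        // Assumption: the timestamp is expressed in milliseconds since the epoch
        long millis = Long.parseLong(find(json, "timestamp", "(\\d+)"));
        return new StreamErrorSketch(status, message, Instant.ofEpochMilli(millis));
    }

    /** Returns the first captured value for "field": value, or throws if absent. */
    private static String find(String json, String field, String valuePattern) {
        Matcher m = Pattern.compile("\"" + field + "\"\\s*:\\s*" + valuePattern)
                           .matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("Missing field: " + field);
        }
        return m.group(1);
    }

    public static void main(String[] args) {
        String json = "{\"status\":2005,"
                + "\"message\":\"HTTP error. The Http response cannot be processed.\","
                + "\"timestamp\":1512566770744}";
        StreamErrorSketch error = parse(json);
        System.out.println(error.status + " at " + error.timestamp + ": " + error.message);
    }
}
```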
Please read Contributing.md for details on our code of conduct, and the process for submitting pull requests to us.