Skip to content

Commit ef15948

Browse files
committed
refactor(*) : Convert error into JsonNode
- Convert error message into JsonNode - Change signatures to handle error as a JsonNode - Update demo code
1 parent 5216158 commit ef15948

File tree

4 files changed

+18
-10
lines changed

4 files changed

+18
-10
lines changed

src/main/java/io/streamdata/demo/Main.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public static void main(String... args) throws URISyntaxException, InterruptedEx
6969
} else if (event.isPatch()) {
7070
logger.info("RX PATCH {} SNAPSHOT UPDATED {}", event.getPatch(), event.getSnapshot());
7171
} else if (event.isError()) {
72-
throw new RuntimeException(event.getError());
72+
throw new RuntimeException(event.getError().toString());
7373
}
7474
}, err -> logger.error(err.getMessage(), err));
7575

src/main/java/io/streamdata/sdk/EventSourceClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public interface EventSourceClient {
7171
* @param callback the callback
7272
* @return this client instance for nice fluent api call
7373
*/
74-
EventSourceClient onError(Consumer<String> callback);
74+
EventSourceClient onError(Consumer<JsonNode> callback);
7575

7676
/**
7777
* Sets a callback that is called when a exception is raised :

src/main/java/io/streamdata/sdk/RxJavaEventSourceClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ class Event {
5959
private EventType type;
6060
private JsonNode snapshot;
6161
private JsonNode patch;
62-
private String error;
62+
private JsonNode error;
6363

64-
private Event(EventType type, JsonNode snapshot, JsonNode patch, String error) {
64+
private Event(EventType type, JsonNode snapshot, JsonNode patch, JsonNode error) {
6565
this.type = type;
6666
this.snapshot = snapshot;
6767
this.patch = patch;
@@ -99,7 +99,7 @@ public static Event forPatch(JsonNode snapshot, JsonNode patch) {
9999
* @param error the json node
100100
* @return an Event object
101101
*/
102-
public static Event forError(String error) {
102+
public static Event forError(JsonNode error) {
103103
return new Event(EventType.ERROR, null, null, error);
104104
}
105105

@@ -142,7 +142,7 @@ public boolean isPatch() {
142142
return type == EventType.PATCH;
143143
}
144144

145-
public String getError() {
145+
public JsonNode getError() {
146146
return error;
147147
}
148148

src/main/java/io/streamdata/sdk/impl/EventSourceClientImpl.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class EventSourceClientImpl implements EventSourceClient {
3939
private Runnable onCloseCallback;
4040
private Consumer<JsonNode> onDataCallback;
4141
private Consumer<JsonNode> onPatchCallback;
42-
private Consumer<String> onErrorCallback = err -> LOGGER.error("A streamdata error has been sent from SSE : {}", err);
42+
private Consumer<JsonNode> onErrorCallback = err -> LOGGER.error("A streamdata error has been sent from SSE : {}", err);
4343
private Consumer<Throwable> onFailureCallback = t -> LOGGER.error("An error occured while processing event", t);
4444

4545
// jackson objectMapper to parse Json content
@@ -123,7 +123,7 @@ public EventSourceClient onPatch(Consumer<JsonNode> callback) {
123123
}
124124

125125
@Override
126-
public EventSourceClient onError(Consumer<String> callback) {
126+
public EventSourceClient onError(Consumer<JsonNode> callback) {
127127
this.onErrorCallback = callback;
128128
return this;
129129
}
@@ -224,8 +224,16 @@ public void onEvent(InboundEvent inboundEvent) {
224224
break;
225225

226226
case "error":
227-
LOGGER.debug("Receiving error {} ", eventData);
228-
onErrorCallback.accept(eventData);
227+
try {
228+
LOGGER.debug("Receiving error {} ", eventData);
229+
230+
// read the error
231+
JsonNode lastPatch = jsonObjectMapper.readTree(eventData);
232+
233+
onErrorCallback.accept(lastPatch);
234+
} catch (IOException e) {
235+
onFailureCallback.accept(e);
236+
}
229237
break;
230238

231239
default:

0 commit comments

Comments
 (0)