Skip to content

Implementation of the first layer of the SSE Implementation for the Generator to consume #7535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 63 additions & 11 deletions generators/java/generator-utils/src/main/resources/Stream.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import org.jetbrains.annotations.NotNull;

import java.io.Reader;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.io.Reader;
import java.util.Objects;
import java.util.Scanner;

/**
Expand All @@ -22,6 +25,21 @@ public final class Stream<T> implements Iterable<T> {
*/
private final Scanner scanner;

private Options options;

private StreamType streamType;

/**
* The type of stream to be interpreted.
*/
public enum StreamType {
SSE, NDJSON
}

private String DEFAULT_VALUE_PREFIX = "data: ";
private String DEFAULT_VALUE_TERMINATOR = "[DONE]";
private String DEFAULT_VALUE_DELIMITER = "\n";

/**
* Constructs a new {@code Stream} with the specified value type, reader, and delimiter.
*
Expand All @@ -34,6 +52,36 @@ public Stream(Class<T> valueType, Reader reader, String delimiter) {
this.valueType = valueType;
}

/**
* Constructs a new {@code Stream} with the specified value type, reader, and delimiter.
*
* @param valueType The class of the objects in the stream.
* @param reader The reader that provides the streamed data.
* @param options The options that determine how the stream is to be interpreted and contains prefix, delimiter, and terminator values.
* @param streamType The type of stream to be interpreted. The value here should come from {@link StreamType}
* @see StreamType
* @see Options
*/
public Stream(Class<T> valueType, Reader reader, Options options, StreamType streamType) {
this.streamType = Objects.requireNonNullElse(streamType, StreamType.NDJSON);
this.options = Objects.requireNonNullElse(options, new Options(DEFAULT_VALUE_PREFIX, DEFAULT_VALUE_DELIMITER, DEFAULT_VALUE_TERMINATOR));
this.scanner = new Scanner(reader).useDelimiter(this.options.delimiter);
this.valueType = valueType;

}

public static class Options {
public @NotNull String prefix;
public @NotNull String delimiter;
public @NotNull String terminator;

public Options(@NotNull String prefix, @NotNull String delimiter, @NotNull String terminator) {
this.prefix = prefix;
this.delimiter = delimiter;
this.terminator = terminator;
}
}

/**
* Returns an iterator over the elements in this stream that blocks during iteration when the next object is
* not yet available.
Expand Down Expand Up @@ -65,16 +113,20 @@ public boolean hasNext() {
*/
@Override
public T next() {
if (!scanner.hasNext()) {
throw new NoSuchElementException();
} else {
try {
T parsedResponse =
ObjectMappers.JSON_MAPPER.readValue(scanner.next().trim(), valueType);
return parsedResponse;
} catch (Exception e) {
throw new RuntimeException(e);
if (!scanner.hasNext()) throw new NoSuchElementException();

try {
String line = scanner.next();
if (streamType == StreamType.SSE) {
while (!line.startsWith(options.prefix)) line = scanner.next();
if (line.equals(options.terminator)) throw new NoSuchElementException();
line = line.substring(options.prefix.length());
} else {
line = line.trim();
}
return ObjectMappers.JSON_MAPPER.readValue(line, valueType);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand All @@ -89,4 +141,4 @@ public void remove() {
}
};
}
}
}
Loading