Skip to content

Commit 1e260ee

Browse files
authored
refactor: using length-prefixed chunking for Backwards invocations (#342)
- Adjusted the header byte manipulation in chunking functions to correctly use the first four bytes for data length. - Modified the HTTP request streaming function to include the length-prefixed option for improved data handling.
1 parent ac64417 commit 1e260ee

File tree

4 files changed

+10
-5
lines changed

4 files changed

+10
-5
lines changed

internal/core/dify_invocation/real/http_request.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,15 @@ func StreamResponse[T any](i *RealBackwardsInvocation, method string, path strin
5757
}),
5858
http_requests.HttpWriteTimeout(i.writeTimeout),
5959
http_requests.HttpReadTimeout(i.readTimeout),
60+
http_requests.HttpUsingLengthPrefixed(true),
6061
)
6162

62-
response, err := http_requests.RequestAndParseStream[BaseBackwardsInvocationResponse[T]](i.client, i.difyPath(path), method, options...)
63+
response, err := http_requests.RequestAndParseStream[BaseBackwardsInvocationResponse[T]](
64+
i.client,
65+
i.difyPath(path),
66+
method,
67+
options...,
68+
)
6369
if err != nil {
6470
return nil, err
6571
}

internal/utils/http_requests/http_warpper.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string
9595
for _, option := range options {
9696
if option.Type == HttpOptionTypeReadTimeout {
9797
readTimeout = option.Value.(int64)
98-
break
9998
} else if option.Type == HttpOptionTypeRaiseErrorWhenStreamDataNotMatch {
10099
raiseErrorWhenStreamDataNotMatch = option.Value.(bool)
101100
} else if option.Type == HttpOptionTypeUsingLengthPrefixed {

internal/utils/parser/chunking.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func LengthPrefixedChunking(
8383
}
8484

8585
// decoding data length
86-
dataLength := binary.LittleEndian.Uint32(header[4:8])
86+
dataLength := binary.LittleEndian.Uint32(header[:4])
8787
if dataLength > maxChunkSize {
8888
return fmt.Errorf("data length is too long: %d", dataLength)
8989
}

internal/utils/parser/chunking_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func TestLengthPrefixedChunking(t *testing.T) {
137137
// First 4 bytes are already used for data length placeholder
138138
// Write data length in bytes 4-7 (little endian)
139139
dataLen := uint32(len(chunk))
140-
binary.LittleEndian.PutUint32(header[4:8], dataLen)
140+
binary.LittleEndian.PutUint32(header[:4], dataLen)
141141
// Remaining 6 bytes are reserved (already zero)
142142

143143
buf.Write(header)
@@ -247,7 +247,7 @@ func TestLengthPrefixedChunking_DataTooLarge(t *testing.T) {
247247

248248
// Create header with large data size
249249
header := make([]byte, 10)
250-
binary.LittleEndian.PutUint32(header[4:8], largeDataSize)
250+
binary.LittleEndian.PutUint32(header[:4], largeDataSize)
251251

252252
buf.Write(header)
253253

0 commit comments

Comments
 (0)