Skip to content

Commit e2c3ce4

Browse files
committed
feat: support ttstream with protobuf payload
1 parent 6e03ff3 commit e2c3ce4

20 files changed

+1297
-113
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
run: |
2020
cd ..
2121
rm -rf kitex-tests
22-
git clone --depth=1 https://github.com/cloudwego/kitex-tests.git
22+
git clone -b test/ttstream_pb --depth=1 https://github.com/DMwangnima/kitex-tests.git
2323
cd kitex-tests
2424
KITEX_TOOL_USE_PROTOC=0 ./run.sh ${{github.workspace}}
2525
cd ${{github.workspace}}

internal/codec/protobuf_struct.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright 2026 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package codec
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
24+
"github.com/bytedance/gopkg/lang/mcache"
25+
"github.com/cloudwego/fastpb"
26+
"google.golang.org/protobuf/proto"
27+
28+
"github.com/cloudwego/kitex/internal/utils/safemcache"
29+
"github.com/cloudwego/kitex/pkg/remote/codec/protobuf"
30+
"github.com/cloudwego/kitex/pkg/rpcinfo"
31+
)
32+
33+
var (
34+
errEncodePbGenericEmptyMethod = errors.New("empty methodName in generic pb Encode")
35+
errDecodePbGenericEmptyMethod = errors.New("empty methodName in generic pb Decode")
36+
)
37+
38+
// gogoproto generate
39+
type marshaler interface {
40+
MarshalTo(data []byte) (n int, err error)
41+
Size() int
42+
}
43+
44+
type protobufV2MsgCodec interface {
45+
XXX_Unmarshal(b []byte) error
46+
XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
47+
}
48+
49+
type EncodeResult struct {
50+
Payload []byte // encoded byte slice
51+
PreAllocate bool // the struct encoded can pre-allocate memory
52+
PreAllocateSize int // pre-allocate size for the struct encoded
53+
}
54+
55+
const GRPCDataFrameHeaderLen = 5
56+
57+
func GRPCEncodeProtobufStruct(ctx context.Context, ri rpcinfo.RPCInfo, msg any, isCompress bool) (EncodeResult, error) {
58+
prefixLen := GRPCDataFrameHeaderLen
59+
if isCompress {
60+
prefixLen = 0
61+
}
62+
return encodeProtobufStruct(ctx, ri, msg, safemcache.Malloc, safemcache.Free, prefixLen)
63+
}
64+
65+
func TTStreamEncodeProtobufStruct(ctx context.Context, ri rpcinfo.RPCInfo, msg any) (EncodeResult, error) {
66+
return encodeProtobufStruct(ctx, ri, msg, mcacheMalloc, mcache.Free, 0)
67+
}
68+
69+
func mcacheMalloc(size int) []byte {
70+
return mcache.Malloc(size)
71+
}
72+
73+
func encodeProtobufStruct(ctx context.Context, ri rpcinfo.RPCInfo, msg any,
74+
mallocFunc func(int) []byte, freeFunc func([]byte), prefixLen int,
75+
) (res EncodeResult, err error) {
76+
var payload []byte
77+
switch t := msg.(type) {
78+
// Deprecated: fastpb is no longer used
79+
case fastpb.Writer:
80+
res.PreAllocate = true
81+
res.PreAllocateSize = t.Size()
82+
payload = mallocFunc(res.PreAllocateSize + prefixLen)
83+
t.FastWrite(payload[prefixLen:])
84+
case marshaler:
85+
size := t.Size()
86+
payload = mallocFunc(size + prefixLen)
87+
if _, err = t.MarshalTo(payload[prefixLen:]); err != nil {
88+
freeFunc(payload)
89+
return res, err
90+
}
91+
res.PreAllocate = true
92+
res.PreAllocateSize = size
93+
case protobufV2MsgCodec:
94+
payload, err = t.XXX_Marshal(nil, true)
95+
case proto.Message:
96+
payload, err = proto.Marshal(t)
97+
case protobuf.ProtobufMsgCodec:
98+
payload, err = t.Marshal(nil)
99+
case protobuf.MessageWriterWithContext:
100+
payload, err = encodeProtobufGeneric(ctx, ri, t)
101+
default:
102+
err = fmt.Errorf("invalid payload %T in EncodeProtobufStruct", t)
103+
}
104+
105+
if err != nil {
106+
return res, err
107+
}
108+
res.Payload = payload
109+
return res, nil
110+
}
111+
112+
func encodeProtobufGeneric(ctx context.Context, ri rpcinfo.RPCInfo, w protobuf.MessageWriterWithContext) ([]byte, error) {
113+
methodName := ri.Invocation().MethodName()
114+
if methodName == "" {
115+
return nil, errEncodePbGenericEmptyMethod
116+
}
117+
actualMsg, err := w.WritePb(ctx, methodName)
118+
if err != nil {
119+
return nil, err
120+
}
121+
payload, ok := actualMsg.([]byte)
122+
if !ok {
123+
return nil, fmt.Errorf("encodePbGeneric failed, got %T", actualMsg)
124+
}
125+
return payload, nil
126+
}
127+
128+
func DecodeProtobufStruct(ctx context.Context, ri rpcinfo.RPCInfo, payload []byte, msg any) (err error) {
129+
// Deprecated: fastpb is no longer used
130+
if t, ok := msg.(fastpb.Reader); ok {
131+
if len(payload) == 0 {
132+
// if all fields of a struct is default value, data will be nil
133+
// In the implementation of fastpb, if data is nil, then fastpb will skip creating this struct, as a result user will get a nil pointer which is not expected.
134+
// So, when data is nil, use default protobuf unmarshal method to decode the struct.
135+
// todo: fix fastpb
136+
} else {
137+
_, err = fastpb.ReadMessage(payload, fastpb.SkipTypeCheck, t)
138+
return err
139+
}
140+
}
141+
switch t := msg.(type) {
142+
case protobufV2MsgCodec:
143+
return t.XXX_Unmarshal(payload)
144+
case proto.Message:
145+
return proto.Unmarshal(payload, t)
146+
case protobuf.ProtobufMsgCodec:
147+
return t.Unmarshal(payload)
148+
case protobuf.MessageReaderWithMethodWithContext:
149+
methodName := ri.Invocation().MethodName()
150+
if methodName == "" {
151+
return errDecodePbGenericEmptyMethod
152+
}
153+
return t.ReadPb(ctx, methodName, payload)
154+
default:
155+
return fmt.Errorf("invalid payload %T in DecodeProtobufStruct", t)
156+
}
157+
}

0 commit comments

Comments
 (0)