Skip to content

Commit 819d320

Browse files
authored
fix: thrift_streaming deprecated (#191)
Signed-off-by: rogerogers <[email protected]>
1 parent 0073e84 commit 819d320

File tree

9 files changed

+135
-570
lines changed

9 files changed

+135
-570
lines changed

thrift_streaming/client/demo_client.go

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import (
2323
"time"
2424

2525
"github.com/cloudwego/kitex/client"
26-
"github.com/cloudwego/kitex/client/streamclient"
2726
"github.com/cloudwego/kitex/pkg/endpoint"
27+
"github.com/cloudwego/kitex/pkg/endpoint/cep"
2828
"github.com/cloudwego/kitex/pkg/klog"
2929
"github.com/cloudwego/kitex/pkg/streaming"
3030
"github.com/cloudwego/kitex/pkg/transmeta"
@@ -36,12 +36,12 @@ import (
3636
)
3737

3838
var (
39-
streamClient = testservice.MustNewStreamClient(
39+
streamClient = testservice.MustNewClient(
4040
"server_name_for_discovery",
41-
streamclient.WithHostPorts("127.0.0.1:8888"),
41+
client.WithHostPorts("127.0.0.1:8888"),
4242

4343
// client middleware
44-
streamclient.WithMiddleware(func(e endpoint.Endpoint) endpoint.Endpoint {
44+
client.WithMiddleware(func(e endpoint.Endpoint) endpoint.Endpoint {
4545
return func(ctx context.Context, req, resp interface{}) (err error) {
4646
method, _ := kitexutil.GetMethod(ctx)
4747
klog.Infof("[%s] streamclient middleware, req = %v", method, req)
@@ -52,25 +52,27 @@ var (
5252
}),
5353

5454
// send middleware
55-
streamclient.WithSendMiddleware(func(next endpoint.SendEndpoint) endpoint.SendEndpoint {
56-
return func(stream streaming.Stream, req interface{}) (err error) {
57-
method, _ := kitexutil.GetMethod(stream.Context())
58-
err = next(stream, req)
59-
klog.Infof("[%s] streamclient send middleware, err = %v, req = %v", method, err, req)
60-
return err
61-
}
62-
}),
63-
64-
// recv middleware
65-
// NOTE: message (response from server) will NOT be available until `next` returns
66-
streamclient.WithRecvMiddleware(func(next endpoint.RecvEndpoint) endpoint.RecvEndpoint {
67-
return func(stream streaming.Stream, resp interface{}) (err error) {
68-
method, _ := kitexutil.GetMethod(stream.Context())
69-
err = next(stream, resp)
70-
klog.Infof("[%s] streamclient recv middleware, err = %v, resp = %v", method, err, resp)
71-
return err
72-
}
73-
}),
55+
client.WithStreamOptions(
56+
client.WithStreamSendMiddleware(func(next cep.StreamSendEndpoint) cep.StreamSendEndpoint {
57+
return func(ctx context.Context, stream streaming.ClientStream, req interface{}) (err error) {
58+
method, _ := kitexutil.GetMethod(stream.Context())
59+
err = next(ctx, stream, req)
60+
klog.Infof("[%s] streamclient send middleware, err = %v, req = %v", method, err, req)
61+
return err
62+
}
63+
}),
64+
65+
// recv middleware
66+
// NOTE: message (response from server) will NOT be available until `next` returns
67+
client.WithStreamRecvMiddleware(func(next cep.StreamRecvEndpoint) cep.StreamRecvEndpoint {
68+
return func(ctx context.Context, stream streaming.ClientStream, resp interface{}) (err error) {
69+
method, _ := kitexutil.GetMethod(stream.Context())
70+
err = next(ctx, stream, resp)
71+
klog.Infof("[%s] streamclient recv middleware, err = %v, resp = %v", method, err, resp)
72+
return err
73+
}
74+
}),
75+
),
7476
)
7577

7678
pingPongClient = testservice.MustNewClient(
@@ -105,7 +107,7 @@ func echoPingPong(cli testservice.Client) {
105107
klog.Infof("echoPingPong: rsp = %v", rsp)
106108
}
107109

108-
func echoUnary(cli testservice.StreamClient) {
110+
func echoUnary(cli testservice.Client) {
109111
ctx := context.Background()
110112
req := &test.Request{Message: "hello"}
111113
rsp, err := cli.EchoUnary(ctx, req)
@@ -116,7 +118,7 @@ func echoUnary(cli testservice.StreamClient) {
116118
klog.Infof("echoPingPong: rsp = %v", rsp)
117119
}
118120

119-
func echo(cli testservice.StreamClient) {
121+
func echo(cli testservice.Client) {
120122
ctx := context.Background()
121123
stream, err := cli.Echo(ctx)
122124
if err != nil {
@@ -128,10 +130,10 @@ func echo(cli testservice.StreamClient) {
128130
// Send
129131
go func() {
130132
defer wg.Done()
131-
defer stream.Close() // Tell the server there'll be no more message from client
133+
defer stream.CloseSend(stream.Context()) // Tell the server there'll be no more message from client
132134
for i := 0; i < 3; i++ {
133135
req := &test.Request{Message: "client, " + strconv.Itoa(i)}
134-
if err = stream.Send(req); err != nil {
136+
if err = stream.Send(ctx, req); err != nil {
135137
klog.Warnf("echo.send: failed, err = " + err.Error())
136138
break
137139
}
@@ -143,7 +145,7 @@ func echo(cli testservice.StreamClient) {
143145
go func() {
144146
defer wg.Done()
145147
for {
146-
resp, err := stream.Recv()
148+
resp, err := stream.Recv(ctx)
147149
// make sure you receive and io.EOF or other non-nil error
148150
// otherwise RPCFinish event will not be recorded
149151
if err == io.EOF {
@@ -159,37 +161,38 @@ func echo(cli testservice.StreamClient) {
159161
wg.Wait()
160162
}
161163

162-
func echoClient(cli testservice.StreamClient) {
163-
stream, err := cli.EchoClient(context.Background())
164+
func echoClient(cli testservice.Client) {
165+
ctx := context.Background()
166+
stream, err := cli.EchoClient(ctx)
164167
if err != nil {
165168
panic("failed to call Echo: " + err.Error())
166169
}
167170
for i := 0; i < 3; i++ {
168171
req := &test.Request{Message: "hello, " + strconv.Itoa(i)}
169-
err := stream.Send(req)
172+
err := stream.Send(stream.Context(), req)
170173
if err != nil {
171174
panic("failed to send Echo: " + err.Error())
172175
}
173176
klog.Infof("sent: %+v", req)
174177
}
175178

176179
// Recv
177-
resp, err := stream.CloseAndRecv()
180+
resp, err := stream.CloseAndRecv(ctx)
178181
if err != nil {
179182
klog.Warnf("failed to recv Echo: " + err.Error())
180183
} else {
181184
klog.Infof("recv: %+v", resp)
182185
}
183186
}
184187

185-
func echoServer(cli testservice.StreamClient) {
188+
func echoServer(cli testservice.Client) {
186189
req := &test.Request{Message: "hello"}
187190
stream, err := cli.EchoServer(context.Background(), req)
188191
if err != nil {
189192
panic("failed to call Echo: " + err.Error())
190193
}
191194
for {
192-
resp, err := stream.Recv()
195+
resp, err := stream.Recv(stream.Context())
193196
// make sure you receive and io.EOF or other non-nil error
194197
// otherwise RPCFinish event will not be recorded
195198
if err == io.EOF {

thrift_streaming/generate.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,4 @@ echo "kitex: using `which $kitex` $kitex_version"
3131
echo -e "\nMake sure you're using kitex >= v0.9.0 && thriftgo >= v0.3.6\n"
3232

3333
set -x
34-
$kitex $verbose $module $service $idl
34+
$kitex -streamx $verbose $module $service $idl

thrift_streaming/handler.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
type TestServiceImpl struct{}
3232

3333
// Echo is bidirectional streaming
34-
func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) {
34+
func (s *TestServiceImpl) Echo(ctx context.Context, stream echo.TestService_EchoServer) (err error) {
3535
// no need to call `stream.Close()` manually
3636
wg := sync.WaitGroup{}
3737
wg.Add(2)
@@ -44,7 +44,7 @@ func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) {
4444
wg.Done()
4545
}()
4646
for {
47-
msg, recvErr := stream.Recv()
47+
msg, recvErr := stream.Recv(ctx)
4848
// make sure you receive and io.EOF or other non-nil error
4949
// otherwise RPCFinish event will not be recorded
5050
if recvErr == io.EOF {
@@ -67,7 +67,7 @@ func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) {
6767
}()
6868
for i := 0; i < 3; i++ {
6969
msg := &echo.Response{Message: "server, " + strconv.Itoa(i)}
70-
if sendErr := stream.Send(msg); sendErr != nil {
70+
if sendErr := stream.Send(ctx, msg); sendErr != nil {
7171
err = sendErr
7272
return
7373
}
@@ -79,23 +79,23 @@ func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) {
7979
}
8080

8181
// EchoClient is client streaming
82-
func (s *TestServiceImpl) EchoClient(stream echo.TestService_EchoClientServer) (err error) {
82+
func (s *TestServiceImpl) EchoClient(ctx context.Context, stream echo.TestService_EchoClientServer) (err error) {
8383
for i := 0; i < 3; i++ {
84-
msg, err := stream.Recv()
84+
msg, err := stream.Recv(ctx)
8585
if err != nil {
8686
return err
8787
}
8888
klog.Infof("EchoClient: recv message = %s", msg)
8989
}
90-
return stream.SendAndClose(&echo.Response{Message: "echoClient"})
90+
return stream.SendAndClose(ctx, &echo.Response{Message: "echoClient"})
9191
}
9292

9393
// EchoServer is server streaming
94-
func (s *TestServiceImpl) EchoServer(req *echo.Request, stream echo.TestService_EchoServerServer) (err error) {
94+
func (s *TestServiceImpl) EchoServer(ctx context.Context, req *echo.Request, stream echo.TestService_EchoServerServer) (err error) {
9595
klog.Infof("EchoServer called, req = %+v", req)
9696
for i := 0; i < 3; i++ {
9797
msg := &echo.Response{Message: "server, " + strconv.Itoa(i)}
98-
if err = stream.Send(msg); err != nil {
98+
if err = stream.Send(ctx, msg); err != nil {
9999
return
100100
}
101101
klog.Infof("EchoServer: sent message = %s", msg)

0 commit comments

Comments
 (0)