Skip to content

Commit de2ebd0

Browse files
authored
fix(ttstream): add server-side information in ttstream errBizHandlerReturnCancel exception (#1921)
1 parent f732553 commit de2ebd0

File tree

3 files changed

+87
-1
lines changed

3 files changed

+87
-1
lines changed

pkg/remote/trans/ttstream/exception.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ var (
4343
errConnectionClosedCancel = newException("canceled by connection closed", kerrors.ErrStreamingCanceled, 12013)
4444
)
4545

46+
var errServerSideBizHandlerReturnCancel = errBizHandlerReturnCancel.newBuilder().withSide(serverSide)
47+
4648
func newStreamRecvTimeoutException(cfg streaming.TimeoutConfig) *Exception {
4749
return newException(fmt.Sprintf("stream Recv timeout, timeout config=%+v", cfg), kerrors.ErrStreamingTimeout, 12014).withSide(clientSide)
4850
}

pkg/remote/trans/ttstream/stream_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (s *serverStream) SendMsg(ctx context.Context, res any) error {
121121
// CloseSend by serverStream will be called after server handler returned
122122
// after CloseSend stream cannot be access again
123123
func (s *serverStream) CloseSend(exception error) error {
124-
s.close(errBizHandlerReturnCancel)
124+
s.close(errServerSideBizHandlerReturnCancel)
125125
if s.wheader != nil {
126126
if err := s.sendHeader(); err != nil {
127127
return err

pkg/remote/trans/ttstream/transport_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1622,3 +1622,87 @@ func testSendFailedBidiStreaming(t *testing.T) {
16221622

16231623
wg.Wait()
16241624
}
1625+
1626+
func Test_handlerReturnCauseCascadedCancel(t *testing.T) {
1627+
cliStA, srvStA := initTestStreams(t, context.Background(), "BidiStreaming", "ClientA", "ServerA")
1628+
cliStB, srvStB := initTestStreams(t, srvStA.ctx, "BidiStreaming", "ServerA-AsClient", "ServerB")
1629+
1630+
var wg sync.WaitGroup
1631+
wg.Add(3)
1632+
1633+
go func() {
1634+
defer wg.Done()
1635+
req := new(testRequest)
1636+
req.A = 1
1637+
req.B = "hello"
1638+
sErr := cliStA.SendMsg(cliStA.ctx, req)
1639+
test.Assert(t, sErr == nil, sErr)
1640+
}()
1641+
1642+
go func() {
1643+
defer wg.Done()
1644+
req := new(testRequest)
1645+
rErr := srvStA.RecvMsg(srvStA.ctx, req)
1646+
test.Assert(t, rErr == nil, rErr)
1647+
test.Assert(t, req.A == 1)
1648+
test.Assert(t, req.B == "hello")
1649+
1650+
downReq := new(testRequest)
1651+
downReq.A = 2
1652+
downReq.B = "world"
1653+
sErr := cliStB.SendMsg(srvStA.ctx, downReq)
1654+
test.Assert(t, sErr == nil, sErr)
1655+
1656+
// mock handler returning
1657+
time.Sleep(50 * time.Millisecond)
1658+
err := srvStA.CloseSend(nil)
1659+
test.Assert(t, err == nil, err)
1660+
1661+
resp := new(testResponse)
1662+
resp.A = 3
1663+
sErr = srvStA.SendMsg(srvStA.ctx, resp)
1664+
test.Assert(t, sErr != nil, sErr)
1665+
test.Assert(t, errors.Is(sErr, errBizHandlerReturnCancel), sErr)
1666+
sEx := sErr.(*Exception)
1667+
test.Assert(t, sEx.side == serverSide, sEx)
1668+
t.Logf("server-side stream A Send err: %v", sErr)
1669+
1670+
newReq := new(testRequest)
1671+
rErr = srvStA.RecvMsg(srvStA.ctx, newReq)
1672+
test.Assert(t, rErr != nil, rErr)
1673+
test.Assert(t, errors.Is(rErr, errBizHandlerReturnCancel), rErr)
1674+
rEx := rErr.(*Exception)
1675+
test.Assert(t, rEx.side == serverSide, rEx)
1676+
t.Logf("server-side stream A Recv err: %v", rErr)
1677+
}()
1678+
1679+
go func() {
1680+
defer wg.Done()
1681+
req := new(testRequest)
1682+
rErr := srvStB.RecvMsg(srvStB.ctx, req)
1683+
test.Assert(t, rErr == nil, rErr)
1684+
test.Assert(t, req.A == 2)
1685+
test.Assert(t, req.B == "world")
1686+
1687+
time.Sleep(100 * time.Millisecond)
1688+
1689+
downReq := new(testRequest)
1690+
downReq.A = 4
1691+
sErr := cliStB.SendMsg(srvStA.ctx, downReq)
1692+
test.Assert(t, sErr != nil, sErr)
1693+
test.Assert(t, errors.Is(sErr, errBizHandlerReturnCancel), sErr)
1694+
sEx := sErr.(*Exception)
1695+
test.Assert(t, sEx.side == clientSide, sEx)
1696+
t.Logf("client-side stream B Send err: %v", sErr)
1697+
1698+
resp := new(testResponse)
1699+
rErr = cliStB.RecvMsg(srvStA.ctx, resp)
1700+
test.Assert(t, rErr != nil, rErr)
1701+
test.Assert(t, errors.Is(rErr, errBizHandlerReturnCancel), rErr)
1702+
rEx := rErr.(*Exception)
1703+
test.Assert(t, rEx.side == clientSide, rEx)
1704+
t.Logf("client-side stream B Recv err: %v", rErr)
1705+
}()
1706+
1707+
wg.Wait()
1708+
}

0 commit comments

Comments
 (0)