Skip to content

Commit 2ff16af

Browse files
authored
chore: handle panics in reduce (#196)
Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
1 parent b229f81 commit 2ff16af

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

pkg/reducer/service.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
5858
// read messages from the stream and write the messages to corresponding channels
5959
// if the channel is not created, create the channel and invoke the reduceFn
6060
for {
61+
select {
62+
case errFromTask := <-taskManager.ErrorChannel():
63+
fs.shutdownCh <- struct{}{}
64+
return errFromTask
65+
default:
66+
}
6167
d, recvErr := stream.Recv()
6268
// if EOF, close all the channels
6369
if recvErr == io.EOF {

pkg/reducer/task_manager.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,18 @@ import (
88
"strings"
99

1010
v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1"
11+
"github.com/numaproj/numaflow-go/pkg/shared"
12+
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
13+
"google.golang.org/grpc/codes"
14+
"google.golang.org/grpc/status"
1115
)
1216

17+
var errReduceHandlerPanic = fmt.Errorf("UDF_EXECUTION_ERROR(%s)", shared.ContainerType)
18+
1319
// reduceTask represents a task for performing a reduceStream operation.
1420
type reduceTask struct {
1521
keys []string
1622
window *v1.Window
17-
reducer Reducer
1823
inputCh chan Datum
1924
doneCh chan struct{}
2025
}
@@ -57,6 +62,7 @@ type reduceTaskManager struct {
5762
tasks map[string]*reduceTask
5863
responseCh chan *v1.ReduceResponse
5964
shutdownCh chan<- struct{}
65+
errorCh chan error
6066
}
6167

6268
func newReduceTaskManager(reducerCreatorHandle ReducerCreator, shutdownCh chan<- struct{}) *reduceTaskManager {
@@ -65,6 +71,7 @@ func newReduceTaskManager(reducerCreatorHandle ReducerCreator, shutdownCh chan<-
6571
tasks: make(map[string]*reduceTask),
6672
responseCh: make(chan *v1.ReduceResponse),
6773
shutdownCh: shutdownCh,
74+
errorCh: make(chan error, 1),
6875
}
6976
}
7077

@@ -92,7 +99,10 @@ func (rtm *reduceTaskManager) CreateTask(ctx context.Context, request *v1.Reduce
9299
defer func() {
93100
if r := recover(); r != nil {
94101
log.Printf("panic inside reduce handler: %v %v", r, string(debug.Stack()))
95-
rtm.shutdownCh <- struct{}{}
102+
st, _ := status.Newf(codes.Internal, "%s: %v", errReduceHandlerPanic, r).WithDetails(&epb.DebugInfo{
103+
Detail: string(debug.Stack()),
104+
})
105+
rtm.errorCh <- st.Err()
96106
}
97107
}()
98108
// invoke the reduce function
@@ -135,6 +145,11 @@ func (rtm *reduceTaskManager) OutputChannel() <-chan *v1.ReduceResponse {
135145
return rtm.responseCh
136146
}
137147

148+
// ErrorChannel returns the receive-only channel on which errors from reduce tasks are delivered.
149+
func (rtm *reduceTaskManager) ErrorChannel() <-chan error {
150+
return rtm.errorCh
151+
}
152+
138153
// WaitAll waits for all the reduce tasks to complete.
139154
func (rtm *reduceTaskManager) WaitAll() {
140155
var eofResponse *v1.ReduceResponse

0 commit comments

Comments
 (0)