Hi, thanks for your great work. I may need some help.
I tried to use deadline control in an operator. I followed the full_pipeline example: I implemented the setup function for SquareOperator and, inside it, called ctx.add_deadline to set a deadline for the operator. However, this has no effect: the deadline handler is never invoked.
I read the source code and found that the cause is in the arm_deadlines function. Take src/node/operator_executors/one_in_one_out_executor.rs as an example: arm_deadlines only creates a DeadlineEvent when deadline.get_constrained_read_stream_ids is not empty. However, the deadline's constrained read streams (TimestampDeadline.read_stream_ids) are always empty. The only way to insert a read stream ID is to call on_read_stream on the TimestampDeadline, but on_read_stream is never invoked when full_pipeline runs.
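To illustrate what I think is happening, here is a simplified model of that behavior. It is not the library's actual code: ToyDeadline, StreamId, and this version of arm_deadlines are hypothetical names I made up, and timestamps are reduced to plain integers. It only shows that a deadline with no registered read streams produces no events.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for a stream identifier (not the library's type).
type StreamId = u32;

/// Toy model of a deadline that records which read streams constrain it.
#[derive(Default)]
struct ToyDeadline {
    read_stream_ids: HashSet<StreamId>,
}

impl ToyDeadline {
    /// Starts with an empty set of constrained read streams.
    fn new() -> Self {
        Self::default()
    }

    /// Builder-style registration, modeled on the shape of `on_read_stream`.
    fn on_read_stream(mut self, id: StreamId) -> Self {
        self.read_stream_ids.insert(id);
        self
    }
}

/// Produces one (stream, timestamp) event per constrained read stream.
/// With an empty set, nothing is armed and the handler can never fire.
fn arm_deadlines(deadline: &ToyDeadline, timestamp: u64) -> Vec<(StreamId, u64)> {
    let mut events: Vec<(StreamId, u64)> = deadline
        .read_stream_ids
        .iter()
        .map(|id| (*id, timestamp))
        .collect();
    // Sort so the result does not depend on HashSet iteration order.
    events.sort();
    events
}
```

In this model, arm_deadlines(&ToyDeadline::new(), 1) returns an empty vector, while arm_deadlines(&ToyDeadline::new().on_read_stream(7), 1) returns a single event for stream 7.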
I worked around the problem (deadline control not working) by changing the ctx.add_deadline call in full_pipeline.rs as follows:
    ctx.add_deadline(
        TimestampDeadline::new(
            move |_s: &(), _t: &Timestamp| -> Duration { Duration::new(2, 0) },
            |_s: &(), _t: &Timestamp| {
                tracing::info!("SquareOperator @ {:?}: Missed deadline.", _t);
            },
        )
        .on_read_stream(ctx.get_read_stream_ids()[0]),
    );
To make this compile, I had to make get_read_stream_ids pub.
I don't know whether this is the right approach. Could you please give an example of how to use add_deadline correctly? I appreciate your help. Thanks!