[SPARK-52333][SS][PYTHON] Squeeze the protocol of retrieving timers for transformWithState in PySpark #51036
Conversation
A high-level question:
You mentioned this optimization only helps with a "not-to-be-huge number of timers". Is that because the change reduces the number of calls from N to N - 1, so for a small N the reduction is significant (e.g., going from 2 calls to 1 is a 50% cut), but for a large N the relative benefit becomes minimal?
IIUC, another benefit of this PR is that it gets rid of the intermediate pandas DataFrame layer and directly uses …
LGTM
```python
timestamp = batch_df.at[i, "timestamp"].item()
result_list.append((tuple(deserialized_key), timestamp))
yield result_list
if status == 0:
```
Correct me if I am wrong: we are now sending expiry timers not in an Arrow batch but as a list of Rows - is this because it improves performance by avoiding Arrow?
Two things:
- We see the benefit of inlining the data into the proto message, saving one round-trip.
- Arrow is a columnar format, which is known to be efficient when there is a lot of data. Using an Arrow RecordBatch for a small number of records is not a good fit (though it is sometimes needed). It "might be" a bit different once there are enough records, especially since a pickled Python Row appears to carry its "schema" as JSON, which is not needed at all with an Arrow RecordBatch. I haven't tested with a large number of records.
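To make the trade-off concrete, here is a rough micro-benchmark sketch (mine, not from the PR; absolute numbers will vary by environment and the actual serde path in Spark may differ) comparing the serialized size of pickled Rows against an Arrow IPC stream for the same data. It illustrates Arrow's fixed per-batch schema/metadata cost versus pickle's per-row overhead:

```python
import pickle

import pyarrow as pa
from pyspark.sql import Row


def pickled_rows_size(n):
    # Each pickled Row carries its field names along with the values,
    # so the serialization overhead grows with every row.
    return sum(
        len(pickle.dumps(Row(key=str(i), timestamp=i))) for i in range(n)
    )


def arrow_batch_size(n):
    # Arrow pays the schema/metadata cost once per batch, then stores
    # the values in compact columnar buffers.
    batch = pa.record_batch(
        [
            pa.array([str(i) for i in range(n)]),
            pa.array(list(range(n)), type=pa.int64()),
        ],
        names=["key", "timestamp"],
    )
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return len(sink.getvalue())


for n in (1, 10, 1000):
    print(n, pickled_rows_size(n), arrow_batch_size(n))
```

For a handful of timers the fixed Arrow stream overhead can dominate, while at thousands of records the columnar layout should win - which matches the hedge above about not having tested large counts.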
I might be missing something, but the benchmark results listed in the description target timer-related operations, and the code change is also built around timers:
Why do we see improvements in "set implicit key" as well?
Unfortunately I don't know for certain - one suspicion is that the network layer might be overloaded by lots of small messages, so reducing the number of round-trips may help overall.
It is actually not proven - the Scala side has no way to pass a Row that Python would not need to deserialize, and vice versa; both sides need serde either way, and the same is no doubt true for Arrow RecordBatch. So it really comes down to which approach has more serde overhead for a given number of records and columns. Unfortunately I don't have a definitive answer for this.
CI failed at the linter check - I'll merge once I fix it and CI is back to green.
CI is green. Thanks! Merging to master.
What changes were proposed in this pull request?
This PR proposes to squeeze the protocol for retrieving timers for transformWithState in PySpark, which helps considerably when dealing with a modest (not huge) number of timers.
Here is the change: instead of returning timer data through a separate Arrow batch, the data is inlined into the proto response message, removing one round-trip per request (see the sketch below).
This is the same mechanism we applied for ListState & MapState. We saw a performance improvement in that prior case, and this change also proved helpful in our internal benchmark.
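As a rough illustration of the round-trip reduction (a sketch under assumed names: `client`, `send_list_timers_request`, `read_arrow_batch`, `timer_rows`, and the `key` column are hypothetical, not the actual PySpark internals):

```python
import pickle  # stand-in for the actual row serde, which may differ


def list_timers_before(client):
    """Old flow: one round-trip for the status, then a second
    round-trip to read the timer data as a separate Arrow batch."""
    status = client.send_list_timers_request()
    result = []
    if status == 0:
        batch_df = client.read_arrow_batch()  # the extra round-trip
        for i in range(len(batch_df)):
            result.append(
                (batch_df.at[i, "key"], batch_df.at[i, "timestamp"].item())
            )
    return result


def list_timers_after(client):
    """New flow: the serialized timer rows are inlined in the proto
    response, so a single round-trip serves the whole request."""
    response = client.send_list_timers_request()
    if response.status == 0:
        return [pickle.loads(raw) for raw in response.timer_rows]
    return []
```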
Why are the changes needed?
To further optimize some timer operations.
We benchmarked the change by listing 100 timers (PR used for benchmarking: #50952) and saw overall performance improvements.
Worth noting: the improvement is not limited to the LIST operation; other operations improve as well. It's not entirely clear why, but it reinforces that reducing round-trips is the right direction.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UT.
Was this patch authored or co-authored using generative AI tooling?
No.