
[SPARK-52333][SS][PYTHON] Squeeze the protocol of retrieving timers for transformWithState in PySpark #51036


Closed

Conversation

HeartSaVioR
Contributor

What changes were proposed in this pull request?

This PR proposes to squeeze the protocol of retrieving timers for transformWithState in PySpark, which helps significantly when dealing with a small-to-moderate number of timers.

Here are the changes:

  • StatefulProcessorHandleImpl.listTimers() and StatefulProcessorHandleImpl.getExpiredTimers() no longer require an additional request to learn that there is no further data to read.
    • We inline the data into the proto message, which makes it easy to determine whether the iterator has been fully consumed (see the sketch below).
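
Below is a minimal illustrative sketch of the pattern, written against hypothetical types rather than the actual proto definitions used by the PySpark state server: each response inlines a page of timer data plus a flag indicating whether another fetch is needed, so the client learns about exhaustion without issuing one extra request.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterator, List, Tuple


@dataclass
class TimerPage:
    # Hypothetical stand-in for the proto response carrying inlined timers.
    timers: List[Tuple[tuple, int]] = field(default_factory=list)
    require_next_fetch: bool = False


def iter_timers(fetch_page: Callable[[], TimerPage]) -> Iterator[Tuple[tuple, int]]:
    """Yield timers page by page; stop as soon as a page says nothing follows."""
    while True:
        page = fetch_page()
        yield from page.timers
        if not page.require_next_fetch:
            return  # exhaustion is known from the same response, no extra round-trip
```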

This change uses the same mechanism we applied for ListState & MapState. We saw a performance improvement in that earlier case, and this change also proved helpful in our internal benchmark.

Why are the changes needed?

To further optimize some timer operations.

We benchmarked the change by listing 100 timers (PR for benchmarking: #50952), and we saw overall performance improvements.

Before the fix


 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
78.250		141.583		184.375		635.792		962743.500
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
65.375		126.125		162.792		565.833		60809.333
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
68.500		130.000		170.292		610.083		156733.125
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
486.833		714.961		998.625		2695.417		167039.959

 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
77.916		139.000		182.375		671.792		521809.958
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
65.000		124.333		160.875		596.667		30860.208
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
67.125		127.916		170.250		740.051		64404.416
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
482.041		710.333		1050.333		2685.500		76762.583

 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
78.208		139.959		181.459		722.459		713788.250
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
65.209		125.125		159.625		636.666		27963.167
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
67.417		129.000		168.875		764.602		12991.667
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
479.000		709.584		1045.543		2776.541		92247.542

After the fix

 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
31.250		47.250		75.875		150.000		551557.750
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
26.958		39.208		65.208		122.667		78609.292
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
23.500		41.125		64.542		125.958		52641.042
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
93.125		118.542		156.500		284.625		19910.000

 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
30.875		44.083		70.417		128.875		628912.209
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
26.917		36.416		61.292		109.917		164584.666
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
23.333		38.375		59.542		113.839		114350.250
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
94.125		115.208		148.917		246.292		36924.292

 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
31.375		58.375		93.041		243.750		719545.583
 ==================== REGISTER latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
26.959		50.167		81.833		194.375		67609.583
 ==================== DELETE latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
24.208		50.834		83.000		211.018		20611.959
 ==================== LIST latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
95.291		132.375		183.875		427.584		36971.792

Worth noting that this does not only impact the LIST operation; it impacts the other operations as well. It's not clear why that happens, but reducing round-trips is evidently the right direction.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UT.

Was this patch authored or co-authored using generative AI tooling?

No.

@HeartSaVioR
Contributor Author

@bogao007 @jingz-db Please take a look, thanks!

Contributor

@huanliwang-db huanliwang-db left a comment


a high-level question:
You mentioned this optimization only helps when there are "not-to-be-huge number of timers". Is that because the change reduces the number of calls from N to N - 1, so for a small N (e.g., from 2 to 1), the reduction is significant (50%)—but when N is large, the relative benefit becomes minimal?

@jingz-db
Contributor

a high-level question: You mentioned this optimization only helps when there are "not-to-be-huge number of timers". Is that because the change reduces the number of calls from N to N - 1, so for a small N (e.g., from 2 to 1), the reduction is significant (50%)—but when N is large, the relative benefit becomes minimal?

IIUC, another benefit of this PR is that it gets rid of the intermediate PandasDataFrame layer and uses Row directly. This also saves serialization/deserialization time.

Contributor

@bogao007 bogao007 left a comment


LGTM

timestamp = batch_df.at[i, "timestamp"].item()
result_list.append((tuple(deserialized_key), timestamp))
yield result_list
if status == 0:
Contributor


Correct me if I am wrong: we are now not sending expiry timers in an Arrow batch, but in a list of Rows. Is this because it improves performance by avoiding Arrow?

Contributor Author


Two things:

  1. We see the benefit of inlining the data into the proto message to save one round-trip.
  2. Arrow is a columnar format, which is known to be efficient when there are many records. It's not a good fit (though sometimes necessary) to use an Arrow RecordBatch for a small number of records. It "might be" a bit different when there are enough records, especially since a pickled Python Row appears to contain the "schema" as "json", which is not needed at all with an Arrow RecordBatch (see the sketch below). I haven't tested with a large number of records.
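
A hedged sketch of that last point (it assumes pyspark is installed and uses hypothetical field names; it is not code from this PR): a pickled Python Row carries its field names inside every record, whereas an Arrow RecordBatch stores the schema once per batch.

```python
import pickle
from pyspark.sql import Row

# Hypothetical timer-like record; the field names are for illustration only.
row = Row(key="user-1", expiryTimestampMs=1700000000000)
payload = pickle.dumps(row)

# The field names travel inside every pickled record.
print(b"expiryTimestampMs" in payload)  # expected: True
print(len(payload))                     # per-record size includes the names
```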

@jingz-db
Contributor

I might be missing something, but I think the benchmark results listed in the description measure timer-related operations, and your code change is also built around timers:

We benchmarked the change by listing 100 timers (PR for benchmarking: https://github.com/apache/spark/pull/50952), and we saw overall performance improvements.
 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
78.250		141.583		184.375		635.792		962743.500

 ==================== SET IMPLICIT KEY latency (micros) ======================
perc:50		perc:95		perc:99		perc:99.9		perc:100
31.250		47.250		75.875		150.000		551557.750

Why do we see this improvement in SET IMPLICIT KEY as well?

@HeartSaVioR
Contributor Author

HeartSaVioR commented May 29, 2025

Why do we see this improvement in SET IMPLICIT KEY as well?

Unfortunately I don't know for sure. One of my suspicions is that the network layer might be overloaded by lots of small messages, so reducing the number of round-trips may help overall.

@HeartSaVioR
Contributor Author

HeartSaVioR commented May 29, 2025

IIUC, another benefit of this PR is that it gets rid of the intermediate PandasDataFrame layer and uses Row directly.

That is actually not proven. The Scala side has no way to pass a Row that Python does not need to deserialize, and vice versa; both sides still need serde. The same is no doubt true for Arrow RecordBatch.

So it's really a matter of which approach has more serde overhead, depending on how many records and how many columns are involved. Unfortunately I don't have a definite answer for this.
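
For reference, a rough way to eyeball that trade-off (assuming pyarrow and pyspark are installed; the snippet is illustrative only and not part of this PR) is to serialize the same 100 timer-like records once as an Arrow IPC stream and once as pickled Rows, and compare the payload sizes:

```python
import pickle

import pyarrow as pa
from pyspark.sql import Row

n = 100
keys = ["key-%d" % i for i in range(n)]
timestamps = [1700000000000 + i for i in range(n)]

# Arrow: schema written once, values stored column-wise.
batch = pa.record_batch([pa.array(keys), pa.array(timestamps)],
                        names=["key", "expiryTimestampMs"])
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
arrow_size = sink.getvalue().size

# Pickled Rows: field names repeated in every record.
rows_size = sum(len(pickle.dumps(Row(key=k, expiryTimestampMs=t)))
                for k, t in zip(keys, timestamps))

print("arrow stream bytes:", arrow_size)
print("pickled rows bytes:", rows_size)
```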

@HeartSaVioR
Copy link
Contributor Author

CI failed at the linter; I'll merge once I fix it and CI is back to green.

@HeartSaVioR
Copy link
Contributor Author

CI is green. Thanks! Merging to master.
