-
Notifications
You must be signed in to change notification settings - Fork 77
Pool based async persister #535
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Pool based async persister #535
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Caution
Changes requested ❌
Reviewed everything up to c4d6293 in 2 minutes and 30 seconds. Click for details.
- Reviewed
612
lines of code in5
files - Skipped
0
files when reviewing. - Skipped posting
17
draft comments. View those below. - Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. burr/integrations/persisters/b_asyncpg.py:154
- Draft comment:
The copy() method’s docstring states that if using a direct connection, it should raise an error (since connections can’t be shared). However, the current implementation returns a new persister that reuses the same connection. Please align the behavior with the documentation—either update the docstring to reflect that direct connections are reused, or modify the implementation to raise an error when not using a pool. - Reason this comment was not posted:
Marked as duplicate.
2. burr/integrations/persisters/b_asyncpg.py:380
- Draft comment:
In cleanup(), only the direct connection is closed, and the pool is left untouched. If the pool is shared across instances, this may be intended, but consider adding a comment clarifying that the pool’s lifecycle is managed separately. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50% The comment is technically correct but not actionable. The pool's lifecycle being managed separately is intentional design, as evidenced by the class-level _pool variable and the warning in the docstring. Adding a comment about this would be redundant since it's already documented in the class docstring. The current behavior is correct and doesn't need modification. The comment does identify a potential source of confusion for developers who might expect cleanup() to handle all resources. Maybe there is some value in making the behavior more explicit? While the concern is valid, the class already has clear documentation about pool management in its docstring. Adding redundant comments would not improve code quality. Delete the comment since the pool lifecycle management is already well-documented in the class docstring and the current behavior is intentional.
3. docs/concepts/parallelism.rst:680
- Draft comment:
The documentation example refers to AsyncPGPersister, but the class implemented is AsyncPostgreSQLPersister. For consistency and to avoid confusion, please ensure the naming is consistent in both the documentation and the code. - Reason this comment was not posted:
Comment looked like it was already resolved.
4. docs/concepts/sync-vs-async.rst:43
- Draft comment:
The checklist for Async Applications is clear; just ensure that it mentions the importance of using persisters with connection pools (as seen in the async persister implementation) to avoid connection sharing issues. A note could be added here to emphasize that direct connections aren’t safe for parallel asynchronous tasks. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50% The checklist already includes the key point about using async persisters with connection pools. The suggested addition is more implementation detail that belongs in API docs, not a high-level concepts guide. The current bullet point is appropriately concise for a checklist while covering the essential requirement. The comment does point to important technical details about connection safety that could prevent bugs. Maybe this level of detail would help developers avoid mistakes. While the technical details are valid, they belong in API documentation. The concepts guide should stay high-level and the existing bullet point already captures the key requirement. Delete the comment. The existing bullet point about connection pools is sufficient for a high-level checklist, and implementation details belong in API docs.
5. burr/integrations/persisters/b_asyncpg.py:61
- Draft comment:
The global _pool class variable is shared among all persister instances. This might cause issues if different DB parameters are needed per instance. Consider clarifying this behavior in the documentation or providing instance-specific pool support. - Reason this comment was not posted:
Confidence changes required:50%
<= threshold50%
None
6. burr/integrations/persisters/b_asyncpg.py:380
- Draft comment:
The cleanup() method only closes the direct connection and does not handle closing the connection pool. Consider documenting this behavior or providing an optional method to close the pool when it's no longer needed. - Reason this comment was not posted:
Confidence changes required:50%
<= threshold50%
None
7. docs/concepts/actions.rst:10
- Draft comment:
Typo: 'asynchonous' should be corrected to 'asynchronous'. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
8. burr/core/parallelism.py:538
- Draft comment:
Typo: In the docstrings for the state_persister, state_initializer, and tracker methods, 'reserverd' is used instead of 'reserved'. Please update the spelling to ensure the documentation is correct. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
9. burr/core/parallelism.py:278
- Draft comment:
Typo: The docstring for the async 'state_generator' function includes the misspelling 'exhause'. Please change it to 'exhaust' for clarity. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
10. docs/concepts/actions.rst:10
- Draft comment:
Typographical error: 'asynchonous' should be corrected to 'asynchronous'. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
11. docs/concepts/actions.rst:80
- Draft comment:
Syntax typo in code snippet: 'def custom_action(state: State) -> State' is missing a colon at the end. Please add a ':' after the function signature. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
12. docs/concepts/parallelism.rst:497
- Draft comment:
Typographical error: 'hte' should be 'the'. Please fix this minor typo. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
13. docs/concepts/parallelism.rst:513
- Draft comment:
Typographical error: 'mst' should be corrected to 'most'. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
14. docs/concepts/parallelism.rst:588
- Draft comment:
Typographical errors: replace 'providea' with 'provide a' and change 'it's' to 'its' to correctly show possession. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
15. docs/concepts/sync-vs-async.rst:60
- Draft comment:
Typo: The phrase "on the same as thread" should be corrected to "on the same thread". - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
16. docs/concepts/sync-vs-async.rst:67
- Draft comment:
Typo: "suports" should be corrected to "supports". - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
17. docs/concepts/sync-vs-async.rst:77
- Draft comment:
Typo: "bellow" should be corrected to "below". - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
Workflow ID: wflow_xnM0evJjqB1qijfg
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me. Will defer to @elijahbenizzy since I wasn't too close to the async persister stuff.
Also seems like our pre-commit hook doesn't like a couple of things...
black....................................................................Failed
- hook id: black
- files were modified by this hook
reformatted burr/core/parallelism.py
All done! ✨ 🍰 ✨
1 file reformatted, 203 files left unchanged.
trim trailing whitespace.................................................Passed
fix end of files.........................................................Failed
- hook id: end-of-file-fixer
- exit code: 1
- files were modified by this hook
Fixing docs/concepts/parallelism.rst
fix requirements.txt.....................................................Passed
check python ast.........................................................Passed
isort....................................................................Failed
- hook id: isort
- files were modified by this hook
Fixing /home/runner/work/burr/burr/burr/integrations/persisters/b_asyncpg.py
flake8...................................................................Failed
- hook id: flake8
- exit code: 1
<unknown>:24: SyntaxWarning: invalid escape sequence '\ '
<unknown>:84: SyntaxWarning: invalid escape sequence '\-'
burr/integrations/persisters/b_asyncpg.py:154:23: F821 undefined name 'Self'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great! Only thing missing is testing, LMK if you want to add it, but otherwise just add a TODO. I think this one is tricky to tests TBH.
Also pre-commit hooks.
I pushed fixes for the pre-commit hooks. I'd still like to know your opinion on the following @elijahbenizzy : I have kept things backwards compatible, but should we really let people instantiate async persisters with direct connections? This means people that use this will still run into the issue when using parallelism. In async setting, I would argue it'd be better to "force" connection pools to be used Regarding tests, there are already some in the test folder that I can draw inspiration from, but if you have something specific in mind lmk |
@gamarin2 Ok, I'm not sure this is "backwards compatible" as much as "preserves prior behavior". E.G. will anyone's code break if this is released and the default is set to I don't think so, although you have a better sense. If not: let's set the default to the optimal behavior and save people a headache. @jernejfrank -- any thoughts? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @gamarin2 , great work and thanks for opening the PR! I left some minor comments.
Don't let this be a blocker, but would be great if you could squeeze in at least an E2E test with the pool. Up to you, like @elijahbenizzy said you can leave it as a TODO.
@elijahbenizzy RE default setting:
I don't think opening a connection pool instead of a connection will go terribly wrong for existing stuff. The one place I could see it go wrong is some multithreading happening outside of Burr and then opening the connection pool would result in a deadlock.
+1 from me to make the default using the pool (as it is the more sensible option for Burr).
def __init__( | ||
self, | ||
connection=None, | ||
pool=None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would move this to last place to preserve original function signature.
self.connection = connection | ||
self.pool = pool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would here be sensible to create a check that you can only have either a connection or a pool but not both?
.from_values(...)
is our preferred entyr point, but you could also create an instance by passing in a connection and a pool. I don't think anybody who uses the direct way would do that, but you never know.
async def cleanup(self): | ||
"""Closes the connection to the database.""" | ||
await self.connection.close() | ||
if self.connection is not None: | ||
await self.connection.close() | ||
self.connection = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did I miss it or are we missing a way to close the connection pool?
In our docs we point to this method for manual cleanup / Context manager uses the same, but this just closes the connection not the pool.
This PR adds a way to instantiate the async postgres persister with a connection pool. The goal is to prevent a bug in parallelism: when the user implements async versions of map-reduce classes (i.e. set is_async to True), burr calls
asyncio.gather
instead of threads, meaning we can't reuse the connection of the async persister.One solution is to update
b_asyncpg
to use a connection pool instead of a single connection and add a copy method to it, meaning that when burr cascades to get persisters for each sub-app, we get a new connection from the pool each time.Changes
AsyncPostgresPersister
with a pool instead of a connectionHow I tested this
Manual testing
Notes
So far I only implemented a proposed solution in
b_asyncpg
, but the issue is still present in other async persister implementations. Once we agree on a definitive solution, I can update them as well.Open question:
Checklist
Important
Add connection pool support to
AsyncPostgreSQLPersister
for better parallelism handling in async contexts.AsyncPostgreSQLPersister
can now be instantiated with a connection pool usingfrom_values()
withuse_pool=True
.copy()
method toAsyncPostgreSQLPersister
to create new instances using the pool._get_connection()
and_release_connection()
to handle pooled connections.actions.rst
,parallelism.rst
, andsync-vs-async.rst
to include information about using connection pools with async persisters.b_asyncpg.py
.This description was created by
for c4d6293. You can customize this summary. It will automatically update as commits are pushed.