Skip to content

Pool based async persister #535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

gamarin2
Copy link

@gamarin2 gamarin2 commented May 6, 2025

This PR adds a way to instantiate the async postgres persister with a connection pool. The goal is to prevent a bug in parallelism: when the user implements async versions of map-reduce classes (i.e. set is_async to True), burr calls asyncio.gather instead of threads, meaning we can't reuse the connection of the async persister.

One solution is to update b_asyncpg to use a connection pool instead of a single connection and add a copy method to it, meaning that when burr cascades to get persisters for each sub-app, we get a new connection from the pool each time.

Changes

  • Added a way to instantiate AsyncPostgresPersister with a pool instead of a connection
  • Added a copy method that favors using the pool in copied instances if self.pool is not None
  • Kept everything backwards compatible
  • Also added missing async related docs

How I tested this

Manual testing

Notes

So far I only implemented a proposed solution in b_asyncpg, but the issue is still present in other async persister implementations. Once we agree on a definitive solution, I can update them as well.

Open question:

  • Do we really want to keep things backwards compatible and let user instantiate with a simple connection? In most scenarios a pool is safer, especially in async context

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

Important

Add connection pool support to AsyncPostgreSQLPersister for better parallelism handling in async contexts.

  • Behavior:
    • AsyncPostgreSQLPersister can now be instantiated with a connection pool using from_values() with use_pool=True.
    • Adds copy() method to AsyncPostgreSQLPersister to create new instances using the pool.
    • Updates _get_connection() and _release_connection() to handle pooled connections.
  • Documentation:
    • Updates actions.rst, parallelism.rst, and sync-vs-async.rst to include information about using connection pools with async persisters.
  • Misc:
    • Maintains backward compatibility with direct connections.
    • Fixes minor formatting issues in b_asyncpg.py.

This description was created by Ellipsis for c4d6293. You can customize this summary. It will automatically update as commits are pushed.

Copy link
Contributor

@ellipsis-dev ellipsis-dev bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Changes requested ❌

Reviewed everything up to c4d6293 in 2 minutes and 30 seconds. Click for details.
  • Reviewed 612 lines of code in 5 files
  • Skipped 0 files when reviewing.
  • Skipped posting 17 draft comments. View those below.
  • Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. burr/integrations/persisters/b_asyncpg.py:154
  • Draft comment:
    The copy() method’s docstring states that if using a direct connection, it should raise an error (since connections can’t be shared). However, the current implementation returns a new persister that reuses the same connection. Please align the behavior with the documentation—either update the docstring to reflect that direct connections are reused, or modify the implementation to raise an error when not using a pool.
  • Reason this comment was not posted:
    Marked as duplicate.
2. burr/integrations/persisters/b_asyncpg.py:380
  • Draft comment:
    In cleanup(), only the direct connection is closed, and the pool is left untouched. If the pool is shared across instances, this may be intended, but consider adding a comment clarifying that the pool’s lifecycle is managed separately.
  • Reason this comment was not posted:
    Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50% The comment is technically correct but not actionable. The pool's lifecycle being managed separately is intentional design, as evidenced by the class-level _pool variable and the warning in the docstring. Adding a comment about this would be redundant since it's already documented in the class docstring. The current behavior is correct and doesn't need modification. The comment does identify a potential source of confusion for developers who might expect cleanup() to handle all resources. Maybe there is some value in making the behavior more explicit? While the concern is valid, the class already has clear documentation about pool management in its docstring. Adding redundant comments would not improve code quality. Delete the comment since the pool lifecycle management is already well-documented in the class docstring and the current behavior is intentional.
3. docs/concepts/parallelism.rst:680
  • Draft comment:
    The documentation example refers to AsyncPGPersister, but the class implemented is AsyncPostgreSQLPersister. For consistency and to avoid confusion, please ensure the naming is consistent in both the documentation and the code.
  • Reason this comment was not posted:
    Comment looked like it was already resolved.
4. docs/concepts/sync-vs-async.rst:43
  • Draft comment:
    The checklist for Async Applications is clear; just ensure that it mentions the importance of using persisters with connection pools (as seen in the async persister implementation) to avoid connection sharing issues. A note could be added here to emphasize that direct connections aren’t safe for parallel asynchronous tasks.
  • Reason this comment was not posted:
    Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50% The checklist already includes the key point about using async persisters with connection pools. The suggested addition is more implementation detail that belongs in API docs, not a high-level concepts guide. The current bullet point is appropriately concise for a checklist while covering the essential requirement. The comment does point to important technical details about connection safety that could prevent bugs. Maybe this level of detail would help developers avoid mistakes. While the technical details are valid, they belong in API documentation. The concepts guide should stay high-level and the existing bullet point already captures the key requirement. Delete the comment. The existing bullet point about connection pools is sufficient for a high-level checklist, and implementation details belong in API docs.
5. burr/integrations/persisters/b_asyncpg.py:61
  • Draft comment:
    The global _pool class variable is shared among all persister instances. This might cause issues if different DB parameters are needed per instance. Consider clarifying this behavior in the documentation or providing instance-specific pool support.
  • Reason this comment was not posted:
    Confidence changes required: 50% <= threshold 50% None
6. burr/integrations/persisters/b_asyncpg.py:380
  • Draft comment:
    The cleanup() method only closes the direct connection and does not handle closing the connection pool. Consider documenting this behavior or providing an optional method to close the pool when it's no longer needed.
  • Reason this comment was not posted:
    Confidence changes required: 50% <= threshold 50% None
7. docs/concepts/actions.rst:10
  • Draft comment:
    Typo: 'asynchonous' should be corrected to 'asynchronous'.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
8. burr/core/parallelism.py:538
  • Draft comment:
    Typo: In the docstrings for the state_persister, state_initializer, and tracker methods, 'reserverd' is used instead of 'reserved'. Please update the spelling to ensure the documentation is correct.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
9. burr/core/parallelism.py:278
  • Draft comment:
    Typo: The docstring for the async 'state_generator' function includes the misspelling 'exhause'. Please change it to 'exhaust' for clarity.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
10. docs/concepts/actions.rst:10
  • Draft comment:
    Typographical error: 'asynchonous' should be corrected to 'asynchronous'.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
11. docs/concepts/actions.rst:80
  • Draft comment:
    Syntax typo in code snippet: 'def custom_action(state: State) -> State' is missing a colon at the end. Please add a ':' after the function signature.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
12. docs/concepts/parallelism.rst:497
  • Draft comment:
    Typographical error: 'hte' should be 'the'. Please fix this minor typo.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
13. docs/concepts/parallelism.rst:513
  • Draft comment:
    Typographical error: 'mst' should be corrected to 'most'.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
14. docs/concepts/parallelism.rst:588
  • Draft comment:
    Typographical errors: replace 'providea' with 'provide a' and change 'it's' to 'its' to correctly show possession.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
15. docs/concepts/sync-vs-async.rst:60
  • Draft comment:
    Typo: The phrase "on the same as thread" should be corrected to "on the same thread".
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
16. docs/concepts/sync-vs-async.rst:67
  • Draft comment:
    Typo: "suports" should be corrected to "supports".
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
17. docs/concepts/sync-vs-async.rst:77
  • Draft comment:
    Typo: "bellow" should be corrected to "below".
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.

Workflow ID: wflow_xnM0evJjqB1qijfg

You can customize Ellipsis by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.

Copy link
Contributor

@skrawcz skrawcz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me. Will defer to @elijahbenizzy since I wasn't too close to the async persister stuff.

Also seems like our pre-commit hook doesn't like a couple of things...

black....................................................................Failed
- hook id: black
- files were modified by this hook

reformatted burr/core/parallelism.py

All done! ✨ 🍰 ✨
1 file reformatted, 203 files left unchanged.

trim trailing whitespace.................................................Passed
fix end of files.........................................................Failed
- hook id: end-of-file-fixer
- exit code: 1
- files were modified by this hook

Fixing docs/concepts/parallelism.rst

fix requirements.txt.....................................................Passed
check python ast.........................................................Passed
isort....................................................................Failed
- hook id: isort
- files were modified by this hook

Fixing /home/runner/work/burr/burr/burr/integrations/persisters/b_asyncpg.py

flake8...................................................................Failed
- hook id: flake8
- exit code: 1

<unknown>:24: SyntaxWarning: invalid escape sequence '\ '
<unknown>:84: SyntaxWarning: invalid escape sequence '\-'
burr/integrations/persisters/b_asyncpg.py:154:23: F821 undefined name 'Self'

Copy link
Contributor

@elijahbenizzy elijahbenizzy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great! Only thing missing is testing, LMK if you want to add it, but otherwise just add a TODO. I think this one is tricky to tests TBH.

Also pre-commit hooks.

@gamarin2
Copy link
Author

I pushed fixes for the pre-commit hooks. I'd still like to know your opinion on the following @elijahbenizzy : I have kept things backwards compatible, but should we really let people instantiate async persisters with direct connections? This means people that use this will still run into the issue when using parallelism. In async setting, I would argue it'd be better to "force" connection pools to be used

Regarding tests, there are already some in the test folder that I can draw inspiration from, but if you have something specific in mind lmk

@elijahbenizzy
Copy link
Contributor

I pushed fixes for the pre-commit hooks. I'd still like to know your opinion on the following @elijahbenizzy : I have kept things backwards compatible, but should we really let people instantiate async persisters with direct connections? This means people that use this will still run into the issue when using parallelism. In async setting, I would argue it'd be better to "force" connection pools to be used

Regarding tests, there are already some in the test folder that I can draw inspiration from, but if you have something specific in mind lmk

@gamarin2 Ok, I'm not sure this is "backwards compatible" as much as "preserves prior behavior". E.G. will anyone's code break if this is released and the default is set to True rather than False for use_pool?

I don't think so, although you have a better sense.

If not: let's set the default to the optimal behavior and save people a headache.
If so: let's think more carefully about this -- we're still in 0.X.X so we have the right, but we want to be smart about this.

@jernejfrank -- any thoughts?

Copy link
Contributor

@jernejfrank jernejfrank left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @gamarin2 , great work and thanks for opening the PR! I left some minor comments.

Don't let this be a blocker, but would be great if you could squeeze in at least an E2E test with the pool. Up to you, like @elijahbenizzy said you can leave it as a TODO.

@elijahbenizzy RE default setting:

I don't think opening a connection pool instead of a connection will go terribly wrong for existing stuff. The one place I could see it go wrong is some multithreading happening outside of Burr and then opening the connection pool would result in a deadlock.

+1 from me to make the default using the pool (as it is the more sensible option for Burr).

def __init__(
self,
connection=None,
pool=None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would move this to last place to preserve original function signature.

Comment on lines 155 to +156
self.connection = connection
self.pool = pool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would here be sensible to create a check that you can only have either a connection or a pool but not both?

.from_values(...) is our preferred entyr point, but you could also create an instance by passing in a connection and a pool. I don't think anybody who uses the direct way would do that, but you never know.

Comment on lines 386 to +390
async def cleanup(self):
"""Closes the connection to the database."""
await self.connection.close()
if self.connection is not None:
await self.connection.close()
self.connection = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did I miss it or are we missing a way to close the connection pool?

In our docs we point to this method for manual cleanup / Context manager uses the same, but this just closes the connection not the pool.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants