Skip to content

Commit fead869

Browse files
authored
Merge pull request #101 from weiznich/feature/more_pool_configs
Add a `RecyclingMethod` configuration to the connection pool
2 parents 7884a32 + 6df08b8 commit fead869

File tree

9 files changed

+155
-48
lines changed

9 files changed

+155
-48
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ for Rust libraries in [RFC #1105](https://github.com/rust-lang/rfcs/blob/master/
77
## Unreleased
88

99
* Add a `AsyncConnectionWrapper` type to turn a `diesel_async::AsyncConnection` into a `diesel::Connection`. This might be used to execute migrations via `diesel_migrations`.
10+
* Add some connection pool configurations to specify how connections
11+
in the pool should be checked if they are still valid
1012

1113
## [0.3.2] - 2023-07-24
1214

@@ -56,3 +58,5 @@ for Rust libraries in [RFC #1105](https://github.com/rust-lang/rfcs/blob/master/
5658
[0.2.1]: https://github.com/weiznich/diesel_async/compare/v0.2.0...v0.2.1
5759
[0.2.2]: https://github.com/weiznich/diesel_async/compare/v0.2.1...v0.2.2
5860
[0.3.0]: https://github.com/weiznich/diesel_async/compare/v0.2.0...v0.3.0
61+
[0.3.1]: https://github.com/weiznich/diesel_async/compare/v0.3.0...v0.3.1
62+
[0.3.2]: https://github.com/weiznich/diesel_async/compare/v0.3.1...v0.3.2

examples/postgres/pooled-with-rustls/src/main.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use diesel::{ConnectionError, ConnectionResult};
22
use diesel_async::pooled_connection::bb8::Pool;
33
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
4+
use diesel_async::pooled_connection::ManagerConfig;
45
use diesel_async::AsyncPgConnection;
56
use futures_util::future::BoxFuture;
67
use futures_util::FutureExt;
@@ -10,12 +11,14 @@ use std::time::Duration;
1011
async fn main() -> Result<(), Box<dyn std::error::Error>> {
1112
let db_url = std::env::var("DATABASE_URL").expect("Env var `DATABASE_URL` not set");
1213

14+
let config = ManagerConfig {
15+
custom_setup: Box::new(establish_connection),
16+
..ManagerConfig::default()
17+
};
18+
1319
// First we have to construct a connection manager with our custom `establish_connection`
1420
// function
15-
let mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_setup(
16-
db_url,
17-
establish_connection,
18-
);
21+
let mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config(db_url, config);
1922
// From that connection we can then create a pool, here given with some example settings.
2023
//
2124
// This creates a TLS configuration that's equivalent to `libpq'` `sslmode=verify-full`, which

src/async_connection_wrapper.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -258,16 +258,22 @@ mod implementation {
258258
B: BlockOn,
259259
Self: diesel::Connection,
260260
C: crate::AsyncConnection<Backend = <Self as diesel::Connection>::Backend>
261-
+ crate::pooled_connection::PoolableConnection,
261+
+ crate::pooled_connection::PoolableConnection
262+
+ 'static,
263+
diesel::dsl::BareSelect<diesel::dsl::AsExprOf<i32, diesel::sql_types::Integer>>:
264+
crate::methods::ExecuteDsl<C>,
265+
diesel::query_builder::SqlQuery: crate::methods::ExecuteDsl<C>,
262266
{
263267
fn ping(&mut self) -> diesel::QueryResult<()> {
264-
diesel::Connection::execute_returning_count(self, &C::make_ping_query()).map(|_| ())
268+
let fut = crate::pooled_connection::PoolableConnection::ping(
269+
&mut self.inner,
270+
&crate::pooled_connection::RecyclingMethod::Verified,
271+
);
272+
self.runtime.block_on(fut)
265273
}
266274

267275
fn is_broken(&mut self) -> bool {
268-
<C::TransactionManager as crate::TransactionManager<_>>::is_broken_transaction_manager(
269-
&mut self.inner,
270-
)
276+
crate::pooled_connection::PoolableConnection::is_broken(&mut self.inner)
271277
}
272278
}
273279

src/mysql/mod.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -307,13 +307,7 @@ impl AsyncMysqlConnection {
307307
feature = "mobc",
308308
feature = "r2d2"
309309
))]
310-
impl crate::pooled_connection::PoolableConnection for AsyncMysqlConnection {
311-
type PingQuery = crate::pooled_connection::CheckConnectionQuery;
312-
313-
fn make_ping_query() -> Self::PingQuery {
314-
crate::pooled_connection::CheckConnectionQuery
315-
}
316-
}
310+
impl crate::pooled_connection::PoolableConnection for AsyncMysqlConnection {}
317311

318312
#[cfg(test)]
319313
mod tests {

src/pg/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -477,10 +477,10 @@ async fn lookup_type(
477477
feature = "r2d2"
478478
))]
479479
impl crate::pooled_connection::PoolableConnection for AsyncPgConnection {
480-
type PingQuery = crate::pooled_connection::CheckConnectionQuery;
480+
fn is_broken(&mut self) -> bool {
481+
use crate::TransactionManager;
481482

482-
fn make_ping_query() -> Self::PingQuery {
483-
crate::pooled_connection::CheckConnectionQuery
483+
Self::TransactionManager::is_broken_transaction_manager(self) || self.conn.is_closed()
484484
}
485485
}
486486

src/pooled_connection/bb8.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
4444
use super::{AsyncDieselConnectionManager, PoolError, PoolableConnection};
4545
use bb8::ManageConnection;
46+
use diesel::query_builder::QueryFragment;
4647

4748
/// Type alias for using [`bb8::Pool`] with [`diesel-async`]
4849
pub type Pool<C> = bb8::Pool<AsyncDieselConnectionManager<C>>;
@@ -55,19 +56,24 @@ pub type RunError = bb8::RunError<super::PoolError>;
5556
impl<C> ManageConnection for AsyncDieselConnectionManager<C>
5657
where
5758
C: PoolableConnection + 'static,
59+
diesel::dsl::BareSelect<diesel::dsl::AsExprOf<i32, diesel::sql_types::Integer>>:
60+
crate::methods::ExecuteDsl<C>,
61+
diesel::query_builder::SqlQuery: QueryFragment<C::Backend>,
5862
{
5963
type Connection = C;
6064

6165
type Error = PoolError;
6266

6367
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
64-
(self.setup)(&self.connection_url)
68+
(self.manager_config.custom_setup)(&self.connection_url)
6569
.await
6670
.map_err(PoolError::ConnectionError)
6771
}
6872

6973
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
70-
conn.ping().await.map_err(PoolError::QueryError)
74+
conn.ping(&self.manager_config.recycling_method)
75+
.await
76+
.map_err(PoolError::QueryError)
7177
}
7278

7379
fn has_broken(&self, conn: &mut Self::Connection) -> bool {

src/pooled_connection/deadpool.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
//! ```
4242
use super::{AsyncDieselConnectionManager, PoolableConnection};
4343
use deadpool::managed::Manager;
44+
use diesel::query_builder::QueryFragment;
4445

4546
/// Type alias for using [`deadpool::managed::Pool`] with [`diesel-async`]
4647
pub type Pool<C> = deadpool::managed::Pool<AsyncDieselConnectionManager<C>>;
@@ -63,13 +64,16 @@ pub type HookErrorCause = deadpool::managed::HookErrorCause<super::PoolError>;
6364
impl<C> Manager for AsyncDieselConnectionManager<C>
6465
where
6566
C: PoolableConnection + Send + 'static,
67+
diesel::dsl::BareSelect<diesel::dsl::AsExprOf<i32, diesel::sql_types::Integer>>:
68+
crate::methods::ExecuteDsl<C>,
69+
diesel::query_builder::SqlQuery: QueryFragment<C::Backend>,
6670
{
6771
type Type = C;
6872

6973
type Error = super::PoolError;
7074

7175
async fn create(&self) -> Result<Self::Type, Self::Error> {
72-
(self.setup)(&self.connection_url)
76+
(self.manager_config.custom_setup)(&self.connection_url)
7377
.await
7478
.map_err(super::PoolError::ConnectionError)
7579
}
@@ -80,7 +84,9 @@ where
8084
"Broken connection",
8185
));
8286
}
83-
obj.ping().await.map_err(super::PoolError::QueryError)?;
87+
obj.ping(&self.manager_config.recycling_method)
88+
.await
89+
.map_err(super::PoolError::QueryError)?;
8490
Ok(())
8591
}
8692
}

src/pooled_connection/mobc.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
//! # }
4141
//! ```
4242
use super::{AsyncDieselConnectionManager, PoolError, PoolableConnection};
43+
use diesel::query_builder::QueryFragment;
4344
use mobc::Manager;
4445

4546
/// Type alias for using [`mobc::Pool`] with [`diesel-async`]
@@ -52,19 +53,24 @@ pub type Builder<C> = mobc::Builder<AsyncDieselConnectionManager<C>>;
5253
impl<C> Manager for AsyncDieselConnectionManager<C>
5354
where
5455
C: PoolableConnection + 'static,
56+
diesel::dsl::BareSelect<diesel::dsl::AsExprOf<i32, diesel::sql_types::Integer>>:
57+
crate::methods::ExecuteDsl<C>,
58+
diesel::query_builder::SqlQuery: QueryFragment<C::Backend>,
5559
{
5660
type Connection = C;
5761

5862
type Error = PoolError;
5963

6064
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
61-
(self.setup)(&self.connection_url)
65+
(self.manager_config.custom_setup)(&self.connection_url)
6266
.await
6367
.map_err(PoolError::ConnectionError)
6468
}
6569

6670
async fn check(&self, mut conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
67-
conn.ping().await.map_err(PoolError::QueryError)?;
71+
conn.ping(&self.manager_config.recycling_method)
72+
.await
73+
.map_err(PoolError::QueryError)?;
6874
Ok(conn)
6975
}
7076
}

src/pooled_connection/mod.rs

Lines changed: 105 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
use crate::{AsyncConnection, SimpleAsyncConnection};
99
use crate::{TransactionManager, UpdateAndFetchResults};
1010
use diesel::associations::HasTable;
11-
use diesel::query_builder::{QueryFragment, QueryId};
1211
use diesel::QueryResult;
1312
use futures_util::{future, FutureExt};
13+
use std::borrow::Cow;
1414
use std::fmt;
1515
use std::ops::DerefMut;
1616

@@ -42,18 +42,83 @@ impl fmt::Display for PoolError {
4242

4343
impl std::error::Error for PoolError {}
4444

45-
type SetupCallback<C> =
45+
/// Type of the custom setup closure passed to [`ManagerConfig::custom_setup`]
46+
pub type SetupCallback<C> =
4647
Box<dyn Fn(&str) -> future::BoxFuture<diesel::ConnectionResult<C>> + Send + Sync>;
4748

49+
/// Type of the recycle check callback for the [`RecyclingMethod::CustomFunction`] variant
50+
pub type RecycleCheckCallback<C> =
51+
dyn Fn(&mut C) -> future::BoxFuture<QueryResult<()>> + Send + Sync;
52+
53+
/// Possible methods of how a connection is recycled.
54+
#[derive(Default)]
55+
pub enum RecyclingMethod<C> {
56+
/// Only check for open transactions when recycling existing connections
57+
/// Unless you have special needs this is a safe choice.
58+
///
59+
/// If the database connection is closed you will recieve an error on the first place
60+
/// you actually try to use the connection
61+
Fast,
62+
/// In addition to checking for open transactions a test query is executed
63+
///
64+
/// This is slower, but guarantees that the database connection is ready to be used.
65+
#[default]
66+
Verified,
67+
/// Like `Verified` but with a custom query
68+
CustomQuery(Cow<'static, str>),
69+
/// Like `Verified` but with a custom callback that allows to perform more checks
70+
///
71+
/// The connection is only recycled if the callback returns `Ok(())`
72+
CustomFunction(Box<RecycleCheckCallback<C>>),
73+
}
74+
75+
impl<C: fmt::Debug> fmt::Debug for RecyclingMethod<C> {
76+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77+
match self {
78+
Self::Fast => write!(f, "Fast"),
79+
Self::Verified => write!(f, "Verified"),
80+
Self::CustomQuery(arg0) => f.debug_tuple("CustomQuery").field(arg0).finish(),
81+
Self::CustomFunction(_) => f.debug_tuple("CustomFunction").finish(),
82+
}
83+
}
84+
}
85+
86+
/// Configuration object for a Manager.
87+
///
88+
/// This currently only makes it possible to specify which [`RecyclingMethod`]
89+
/// should be used when retrieving existing objects from the [`Pool`].
90+
pub struct ManagerConfig<C> {
91+
/// Method of how a connection is recycled. See [RecyclingMethod].
92+
pub recycling_method: RecyclingMethod<C>,
93+
/// Construct a new connection manger
94+
/// with a custom setup procedure
95+
///
96+
/// This can be used to for example establish a SSL secured
97+
/// postgres connection
98+
pub custom_setup: SetupCallback<C>,
99+
}
100+
101+
impl<C> Default for ManagerConfig<C>
102+
where
103+
C: AsyncConnection + 'static,
104+
{
105+
fn default() -> Self {
106+
Self {
107+
recycling_method: Default::default(),
108+
custom_setup: Box::new(|url| C::establish(url).boxed()),
109+
}
110+
}
111+
}
112+
48113
/// An connection manager for use with diesel-async.
49114
///
50115
/// See the concrete pool implementations for examples:
51116
/// * [deadpool](self::deadpool)
52117
/// * [bb8](self::bb8)
53118
/// * [mobc](self::mobc)
54119
pub struct AsyncDieselConnectionManager<C> {
55-
setup: SetupCallback<C>,
56120
connection_url: String,
121+
manager_config: ManagerConfig<C>,
57122
}
58123

59124
impl<C> fmt::Debug for AsyncDieselConnectionManager<C> {
@@ -66,28 +131,31 @@ impl<C> fmt::Debug for AsyncDieselConnectionManager<C> {
66131
}
67132
}
68133

69-
impl<C> AsyncDieselConnectionManager<C> {
134+
impl<C> AsyncDieselConnectionManager<C>
135+
where
136+
C: AsyncConnection + 'static,
137+
{
70138
/// Returns a new connection manager,
71139
/// which establishes connections to the given database URL.
140+
#[must_use]
72141
pub fn new(connection_url: impl Into<String>) -> Self
73142
where
74143
C: AsyncConnection + 'static,
75144
{
76-
Self::new_with_setup(connection_url, |url| C::establish(url).boxed())
145+
Self::new_with_config(connection_url, Default::default())
77146
}
78147

79-
/// Construct a new connection manger
80-
/// with a custom setup procedure
81-
///
82-
/// This can be used to for example establish a SSL secured
83-
/// postgres connection
84-
pub fn new_with_setup(
148+
/// Returns a new connection manager,
149+
/// which establishes connections with the given database URL
150+
/// and that uses the specified configuration
151+
#[must_use]
152+
pub fn new_with_config(
85153
connection_url: impl Into<String>,
86-
setup: impl Fn(&str) -> future::BoxFuture<diesel::ConnectionResult<C>> + Send + Sync + 'static,
154+
manager_config: ManagerConfig<C>,
87155
) -> Self {
88156
Self {
89-
setup: Box::new(setup),
90157
connection_url: connection_url.into(),
158+
manager_config,
91159
}
92160
}
93161
}
@@ -218,9 +286,8 @@ where
218286
}
219287
}
220288

221-
#[doc(hidden)]
222289
#[derive(diesel::query_builder::QueryId)]
223-
pub struct CheckConnectionQuery;
290+
struct CheckConnectionQuery;
224291

225292
impl<DB> diesel::query_builder::QueryFragment<DB> for CheckConnectionQuery
226293
where
@@ -244,19 +311,34 @@ impl<C> diesel::query_dsl::RunQueryDsl<C> for CheckConnectionQuery {}
244311
#[doc(hidden)]
245312
#[async_trait::async_trait]
246313
pub trait PoolableConnection: AsyncConnection {
247-
type PingQuery: QueryFragment<Self::Backend> + QueryId + Send;
248-
249-
fn make_ping_query() -> Self::PingQuery;
250-
251314
/// Check if a connection is still valid
252315
///
253-
/// The default implementation performs a `SELECT 1` query
254-
async fn ping(&mut self) -> diesel::QueryResult<()>
316+
/// The default implementation will perform a check based on the provided
317+
/// recycling method variant
318+
async fn ping(&mut self, config: &RecyclingMethod<Self>) -> diesel::QueryResult<()>
255319
where
256320
for<'a> Self: 'a,
321+
diesel::dsl::BareSelect<diesel::dsl::AsExprOf<i32, diesel::sql_types::Integer>>:
322+
crate::methods::ExecuteDsl<Self>,
323+
diesel::query_builder::SqlQuery: crate::methods::ExecuteDsl<Self>,
257324
{
258-
use crate::RunQueryDsl;
259-
Self::make_ping_query().execute(self).await.map(|_| ())
325+
use crate::run_query_dsl::RunQueryDsl;
326+
use diesel::IntoSql;
327+
328+
match config {
329+
RecyclingMethod::Fast => Ok(()),
330+
RecyclingMethod::Verified => {
331+
diesel::select(1_i32.into_sql::<diesel::sql_types::Integer>())
332+
.execute(self)
333+
.await
334+
.map(|_| ())
335+
}
336+
RecyclingMethod::CustomQuery(query) => diesel::sql_query(query.as_ref())
337+
.execute(self)
338+
.await
339+
.map(|_| ()),
340+
RecyclingMethod::CustomFunction(c) => c(self).await,
341+
}
260342
}
261343

262344
/// Checks if the connection is broken and should not be reused

0 commit comments

Comments
 (0)