diff --git a/src/driver/sqlx_mysql.rs b/src/driver/sqlx_mysql.rs index ebea1b83..a777ecb8 100644 --- a/src/driver/sqlx_mysql.rs +++ b/src/driver/sqlx_mysql.rs @@ -82,16 +82,13 @@ impl SqlxMySqlPoolConnection { debug_print!("{}", stmt); let query = sqlx_query(&stmt); - if let Ok(conn) = &mut self.pool.acquire().await { - crate::metric::metric!(self.metric_callback, &stmt, { - match query.execute(conn).await { - Ok(res) => Ok(res.into()), - Err(err) => Err(sqlx_error_to_exec_err(err)), - } - }) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + crate::metric::metric!(self.metric_callback, &stmt, { + match query.execute(conn).await { + Ok(res) => Ok(res.into()), + Err(err) => Err(sqlx_error_to_exec_err(err)), + } + }) } /// Execute an unprepared SQL statement on a MySQL backend @@ -99,13 +96,10 @@ impl SqlxMySqlPoolConnection { pub async fn execute_unprepared(&self, sql: &str) -> Result { debug_print!("{}", sql); - if let Ok(conn) = &mut self.pool.acquire().await { - match conn.execute(sql).await { - Ok(res) => Ok(res.into()), - Err(err) => Err(sqlx_error_to_exec_err(err)), - } - } else { - Err(DbErr::ConnectionAcquire) + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + match conn.execute(sql).await { + Ok(res) => Ok(res.into()), + Err(err) => Err(sqlx_error_to_exec_err(err)), } } @@ -115,19 +109,16 @@ impl SqlxMySqlPoolConnection { debug_print!("{}", stmt); let query = sqlx_query(&stmt); - if let Ok(conn) = &mut self.pool.acquire().await { - crate::metric::metric!(self.metric_callback, &stmt, { - match query.fetch_one(conn).await { - Ok(row) => Ok(Some(row.into())), - Err(err) => match err { - sqlx::Error::RowNotFound => Ok(None), - _ => Err(sqlx_error_to_query_err(err)), - }, - } - }) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_one(conn).await { + Ok(row) => Ok(Some(row.into())), + Err(err) => match err { + sqlx::Error::RowNotFound => Ok(None), + _ => Err(sqlx_error_to_query_err(err)), + }, + } + }) } /// Get the results of a query returning them as a Vec<[QueryResult]> @@ -136,16 +127,13 @@ impl SqlxMySqlPoolConnection { debug_print!("{}", stmt); let query = sqlx_query(&stmt); - if let Ok(conn) = &mut self.pool.acquire().await { - crate::metric::metric!(self.metric_callback, &stmt, { - match query.fetch_all(conn).await { - Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), - Err(err) => Err(sqlx_error_to_query_err(err)), - } - }) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_all(conn).await { + Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), + Err(err) => Err(sqlx_error_to_query_err(err)), + } + }) } /// Stream the results of executing a SQL query @@ -153,15 +141,12 @@ impl SqlxMySqlPoolConnection { pub async fn stream(&self, stmt: Statement) -> Result { debug_print!("{}", stmt); - if let Ok(conn) = self.pool.acquire().await { - Ok(QueryStream::from(( - conn, - stmt, - self.metric_callback.clone(), - ))) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = self.pool.acquire().await.map_err(conn_acquire_err)?; + Ok(QueryStream::from(( + conn, + stmt, + self.metric_callback.clone(), + ))) } /// Bundle a set of SQL statements that execute together. @@ -171,17 +156,14 @@ impl SqlxMySqlPoolConnection { isolation_level: Option, access_mode: Option, ) -> Result { - if let Ok(conn) = self.pool.acquire().await { - DatabaseTransaction::new_mysql( - conn, - self.metric_callback.clone(), - isolation_level, - access_mode, - ) - .await - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = self.pool.acquire().await.map_err(conn_acquire_err)?; + DatabaseTransaction::new_mysql( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await } /// Create a MySQL transaction @@ -200,19 +182,16 @@ impl SqlxMySqlPoolConnection { T: Send, E: std::error::Error + Send, { - if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_mysql( - conn, - self.metric_callback.clone(), - isolation_level, - access_mode, - ) - .await - .map_err(|e| TransactionError::Connection(e))?; - transaction.run(callback).await - } else { - Err(DbErr::ConnectionAcquire.into()) - } + let conn = self.pool.acquire().await.map_err(conn_acquire_err)?; + let transaction = DatabaseTransaction::new_mysql( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await + .map_err(|e| TransactionError::Connection(e))?; + transaction.run(callback).await } pub(crate) fn set_metric_callback(&mut self, callback: F) @@ -224,13 +203,10 @@ impl SqlxMySqlPoolConnection { /// Checks if a connection to the database is still valid. pub async fn ping(&self) -> Result<(), DbErr> { - if let Ok(conn) = &mut self.pool.acquire().await { - match conn.ping().await { - Ok(_) => Ok(()), - Err(err) => Err(sqlx_error_to_conn_err(err)), - } - } else { - Err(DbErr::ConnectionAcquire) + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + match conn.ping().await { + Ok(_) => Ok(()), + Err(err) => Err(sqlx_error_to_conn_err(err)), } } diff --git a/src/driver/sqlx_postgres.rs b/src/driver/sqlx_postgres.rs index e78aa8d8..35f65a61 100644 --- a/src/driver/sqlx_postgres.rs +++ b/src/driver/sqlx_postgres.rs @@ -97,16 +97,13 @@ impl SqlxPostgresPoolConnection { debug_print!("{}", stmt); let query = sqlx_query(&stmt); - if let Ok(conn) = &mut self.pool.acquire().await { - crate::metric::metric!(self.metric_callback, &stmt, { - match query.execute(conn).await { - Ok(res) => Ok(res.into()), - Err(err) => Err(sqlx_error_to_exec_err(err)), - } - }) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + crate::metric::metric!(self.metric_callback, &stmt, { + match query.execute(conn).await { + Ok(res) => Ok(res.into()), + Err(err) => Err(sqlx_error_to_exec_err(err)), + } + }) } /// Execute an unprepared SQL statement on a PostgreSQL backend @@ -114,13 +111,10 @@ impl SqlxPostgresPoolConnection { pub async fn execute_unprepared(&self, sql: &str) -> Result { debug_print!("{}", sql); - if let Ok(conn) = &mut self.pool.acquire().await { - match conn.execute(sql).await { - Ok(res) => Ok(res.into()), - Err(err) => Err(sqlx_error_to_exec_err(err)), - } - } else { - Err(DbErr::ConnectionAcquire) + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + match conn.execute(sql).await { + Ok(res) => Ok(res.into()), + Err(err) => Err(sqlx_error_to_exec_err(err)), } } @@ -130,19 +124,16 @@ impl SqlxPostgresPoolConnection { debug_print!("{}", stmt); let query = sqlx_query(&stmt); - if let Ok(conn) = &mut self.pool.acquire().await { - crate::metric::metric!(self.metric_callback, &stmt, { - match query.fetch_one(conn).await { - Ok(row) => Ok(Some(row.into())), - Err(err) => match err { - sqlx::Error::RowNotFound => Ok(None), - _ => Err(sqlx_error_to_query_err(err)), - }, - } - }) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_one(conn).await { + Ok(row) => Ok(Some(row.into())), + Err(err) => match err { + sqlx::Error::RowNotFound => Ok(None), + _ => Err(sqlx_error_to_query_err(err)), + }, + } + }) } /// Get the results of a query returning them as a Vec<[QueryResult]> @@ -151,16 +142,13 @@ impl SqlxPostgresPoolConnection { debug_print!("{}", stmt); let query = sqlx_query(&stmt); - if let Ok(conn) = &mut self.pool.acquire().await { - crate::metric::metric!(self.metric_callback, &stmt, { - match query.fetch_all(conn).await { - Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), - Err(err) => Err(sqlx_error_to_query_err(err)), - } - }) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_all(conn).await { + Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), + Err(err) => Err(sqlx_error_to_query_err(err)), + } + }) } /// Stream the results of executing a SQL query @@ -168,15 +156,12 @@ impl SqlxPostgresPoolConnection { pub async fn stream(&self, stmt: Statement) -> Result { debug_print!("{}", stmt); - if let Ok(conn) = self.pool.acquire().await { - Ok(QueryStream::from(( - conn, - stmt, - self.metric_callback.clone(), - ))) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = self.pool.acquire().await.map_err(conn_acquire_err)?; + Ok(QueryStream::from(( + conn, + stmt, + self.metric_callback.clone(), + ))) } /// Bundle a set of SQL statements that execute together. @@ -186,17 +171,14 @@ impl SqlxPostgresPoolConnection { isolation_level: Option, access_mode: Option, ) -> Result { - if let Ok(conn) = self.pool.acquire().await { - DatabaseTransaction::new_postgres( - conn, - self.metric_callback.clone(), - isolation_level, - access_mode, - ) - .await - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = self.pool.acquire().await.map_err(conn_acquire_err)?; + DatabaseTransaction::new_postgres( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await } /// Create a PostgreSQL transaction @@ -215,19 +197,16 @@ impl SqlxPostgresPoolConnection { T: Send, E: std::error::Error + Send, { - if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_postgres( - conn, - self.metric_callback.clone(), - isolation_level, - access_mode, - ) - .await - .map_err(|e| TransactionError::Connection(e))?; - transaction.run(callback).await - } else { - Err(DbErr::ConnectionAcquire.into()) - } + let conn = self.pool.acquire().await.map_err(conn_acquire_err)?; + let transaction = DatabaseTransaction::new_postgres( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await + .map_err(|e| TransactionError::Connection(e))?; + transaction.run(callback).await } pub(crate) fn set_metric_callback(&mut self, callback: F) @@ -239,13 +218,10 @@ impl SqlxPostgresPoolConnection { /// Checks if a connection to the database is still valid. pub async fn ping(&self) -> Result<(), DbErr> { - if let Ok(conn) = &mut self.pool.acquire().await { - match conn.ping().await { - Ok(_) => Ok(()), - Err(err) => Err(sqlx_error_to_conn_err(err)), - } - } else { - Err(DbErr::ConnectionAcquire) + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + match conn.ping().await { + Ok(_) => Ok(()), + Err(err) => Err(sqlx_error_to_conn_err(err)), } } diff --git a/src/driver/sqlx_sqlite.rs b/src/driver/sqlx_sqlite.rs index 5e187549..f8d351a9 100644 --- a/src/driver/sqlx_sqlite.rs +++ b/src/driver/sqlx_sqlite.rs @@ -89,16 +89,13 @@ impl SqlxSqlitePoolConnection { debug_print!("{}", stmt); let query = sqlx_query(&stmt); - if let Ok(conn) = &mut self.pool.acquire().await { - crate::metric::metric!(self.metric_callback, &stmt, { - match query.execute(conn).await { - Ok(res) => Ok(res.into()), - Err(err) => Err(sqlx_error_to_exec_err(err)), - } - }) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + crate::metric::metric!(self.metric_callback, &stmt, { + match query.execute(conn).await { + Ok(res) => Ok(res.into()), + Err(err) => Err(sqlx_error_to_exec_err(err)), + } + }) } /// Execute an unprepared SQL statement on a SQLite backend @@ -106,13 +103,10 @@ impl SqlxSqlitePoolConnection { pub async fn execute_unprepared(&self, sql: &str) -> Result { debug_print!("{}", sql); - if let Ok(conn) = &mut self.pool.acquire().await { - match conn.execute(sql).await { - Ok(res) => Ok(res.into()), - Err(err) => Err(sqlx_error_to_exec_err(err)), - } - } else { - Err(DbErr::ConnectionAcquire) + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + match conn.execute(sql).await { + Ok(res) => Ok(res.into()), + Err(err) => Err(sqlx_error_to_exec_err(err)), } } @@ -122,19 +116,16 @@ impl SqlxSqlitePoolConnection { debug_print!("{}", stmt); let query = sqlx_query(&stmt); - if let Ok(conn) = &mut self.pool.acquire().await { - crate::metric::metric!(self.metric_callback, &stmt, { - match query.fetch_one(conn).await { - Ok(row) => Ok(Some(row.into())), - Err(err) => match err { - sqlx::Error::RowNotFound => Ok(None), - _ => Err(sqlx_error_to_query_err(err)), - }, - } - }) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_one(conn).await { + Ok(row) => Ok(Some(row.into())), + Err(err) => match err { + sqlx::Error::RowNotFound => Ok(None), + _ => Err(sqlx_error_to_query_err(err)), + }, + } + }) } /// Get the results of a query returning them as a Vec<[QueryResult]> @@ -143,16 +134,13 @@ impl SqlxSqlitePoolConnection { debug_print!("{}", stmt); let query = sqlx_query(&stmt); - if let Ok(conn) = &mut self.pool.acquire().await { - crate::metric::metric!(self.metric_callback, &stmt, { - match query.fetch_all(conn).await { - Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), - Err(err) => Err(sqlx_error_to_query_err(err)), - } - }) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_all(conn).await { + Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), + Err(err) => Err(sqlx_error_to_query_err(err)), + } + }) } /// Stream the results of executing a SQL query @@ -160,15 +148,12 @@ impl SqlxSqlitePoolConnection { pub async fn stream(&self, stmt: Statement) -> Result { debug_print!("{}", stmt); - if let Ok(conn) = self.pool.acquire().await { - Ok(QueryStream::from(( - conn, - stmt, - self.metric_callback.clone(), - ))) - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = self.pool.acquire().await.map_err(conn_acquire_err)?; + Ok(QueryStream::from(( + conn, + stmt, + self.metric_callback.clone(), + ))) } /// Bundle a set of SQL statements that execute together. @@ -178,17 +163,14 @@ impl SqlxSqlitePoolConnection { isolation_level: Option, access_mode: Option, ) -> Result { - if let Ok(conn) = self.pool.acquire().await { - DatabaseTransaction::new_sqlite( - conn, - self.metric_callback.clone(), - isolation_level, - access_mode, - ) - .await - } else { - Err(DbErr::ConnectionAcquire) - } + let conn = self.pool.acquire().await.map_err(conn_acquire_err)?; + DatabaseTransaction::new_sqlite( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await } /// Create a MySQL transaction @@ -207,19 +189,16 @@ impl SqlxSqlitePoolConnection { T: Send, E: std::error::Error + Send, { - if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_sqlite( - conn, - self.metric_callback.clone(), - isolation_level, - access_mode, - ) - .await - .map_err(|e| TransactionError::Connection(e))?; - transaction.run(callback).await - } else { - Err(DbErr::ConnectionAcquire.into()) - } + let conn = self.pool.acquire().await.map_err(conn_acquire_err)?; + let transaction = DatabaseTransaction::new_sqlite( + conn, + self.metric_callback.clone(), + isolation_level, + access_mode, + ) + .await + .map_err(|e| TransactionError::Connection(e))?; + transaction.run(callback).await } pub(crate) fn set_metric_callback(&mut self, callback: F) @@ -231,13 +210,10 @@ impl SqlxSqlitePoolConnection { /// Checks if a connection to the database is still valid. pub async fn ping(&self) -> Result<(), DbErr> { - if let Ok(conn) = &mut self.pool.acquire().await { - match conn.ping().await { - Ok(_) => Ok(()), - Err(err) => Err(sqlx_error_to_conn_err(err)), - } - } else { - Err(DbErr::ConnectionAcquire) + let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?; + match conn.ping().await { + Ok(_) => Ok(()), + Err(err) => Err(sqlx_error_to_conn_err(err)), } } diff --git a/src/error.rs b/src/error.rs index 125da4f8..a63ac708 100644 --- a/src/error.rs +++ b/src/error.rs @@ -16,8 +16,8 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum DbErr { /// This error can happen when the connection pool is fully-utilized - #[error("Failed to acquire connection from pool")] - ConnectionAcquire, + #[error("Failed to acquire connection from pool: {0}")] + ConnectionAcquire(ConnAcquireErr), /// Runtime type conversion error #[error("Error converting `{from}` into `{into}`: {source}")] TryIntoErr { @@ -75,6 +75,17 @@ pub enum DbErr { RecordNotUpdated, } +/// Connection error +#[derive(Error, Debug, PartialEq, Eq)] +pub enum ConnAcquireErr { + /// Connection Timed Out + #[error("Connection Timed out")] + Timeout, + /// Unavailable connection + #[error("Connection closed by host")] + ConnectionClosed, +} + /// Runtime error #[derive(Error, Debug)] pub enum RuntimeErr { @@ -140,6 +151,16 @@ where DbErr::Json(s.to_string()) } +#[allow(dead_code)] +#[cfg(feature = "sqlx-dep")] +pub(crate) fn conn_acquire_err(sqlx_err: sqlx::Error) -> DbErr { + match sqlx_err { + sqlx::Error::PoolTimedOut => DbErr::ConnectionAcquire(ConnAcquireErr::Timeout), + sqlx::Error::PoolClosed => DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed), + _ => DbErr::Conn(RuntimeErr::SqlxError(sqlx_err)), + } +} + /// An error from unsuccessful SQL query #[derive(Error, Debug, Clone, PartialEq, Eq)] #[non_exhaustive] diff --git a/tests/connection_tests.rs b/tests/connection_tests.rs index 83abb1f9..e5966d10 100644 --- a/tests/connection_tests.rs +++ b/tests/connection_tests.rs @@ -25,7 +25,38 @@ pub async fn connection_ping_closed_mysql() { let ctx_ping = std::rc::Rc::clone(&ctx); ctx.db.get_mysql_connection_pool().close().await; - assert_eq!(ctx_ping.db.ping().await, Err(DbErr::ConnectionAcquire)); + assert_eq!( + ctx_ping.db.ping().await, + Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed)) + ); + + let base_url = std::env::var("DATABASE_URL").unwrap(); + let mut opt = sea_orm::ConnectOptions::new(format!("{base_url}/connection_ping_closed")); + opt + // The connection pool has a single connection only + .max_connections(1) + // A controlled connection acquire timeout + .acquire_timeout(std::time::Duration::from_secs(2)); + + let db = sea_orm::Database::connect(opt).await.unwrap(); + + async fn transaction_blocked(db: &DatabaseConnection) { + let _txn = sea_orm::TransactionTrait::begin(db).await.unwrap(); + // Occupy the only connection, thus forcing others fail to acquire connection + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + } + + async fn transaction(db: &DatabaseConnection) { + // Should fail to acquire + let txn = sea_orm::TransactionTrait::begin(db).await; + assert_eq!( + txn.expect_err("should be a time out"), + crate::DbErr::ConnectionAcquire(ConnAcquireErr::Timeout) + ) + } + + tokio::join!(transaction_blocked(&db), transaction(&db)); + ctx.delete().await; } @@ -36,7 +67,38 @@ pub async fn connection_ping_closed_sqlite() { let ctx_ping = std::rc::Rc::clone(&ctx); ctx.db.get_sqlite_connection_pool().close().await; - assert_eq!(ctx_ping.db.ping().await, Err(DbErr::ConnectionAcquire)); + assert_eq!( + ctx_ping.db.ping().await, + Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed)) + ); + + let base_url = std::env::var("DATABASE_URL").unwrap(); + let mut opt = sea_orm::ConnectOptions::new(base_url); + opt + // The connection pool has a single connection only + .max_connections(1) + // A controlled connection acquire timeout + .acquire_timeout(std::time::Duration::from_secs(2)); + + let db = sea_orm::Database::connect(opt).await.unwrap(); + + async fn transaction_blocked(db: &DatabaseConnection) { + let _txn = sea_orm::TransactionTrait::begin(db).await.unwrap(); + // Occupy the only connection, thus forcing others fail to acquire connection + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + } + + async fn transaction(db: &DatabaseConnection) { + // Should fail to acquire + let txn = sea_orm::TransactionTrait::begin(db).await; + assert_eq!( + txn.expect_err("should be a time out"), + crate::DbErr::ConnectionAcquire(ConnAcquireErr::Timeout) + ) + } + + tokio::join!(transaction_blocked(&db), transaction(&db)); + ctx.delete().await; } @@ -47,6 +109,37 @@ pub async fn connection_ping_closed_postgres() { let ctx_ping = std::rc::Rc::clone(&ctx); ctx.db.get_postgres_connection_pool().close().await; - assert_eq!(ctx_ping.db.ping().await, Err(DbErr::ConnectionAcquire)); + assert_eq!( + ctx_ping.db.ping().await, + Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed)) + ); + + let base_url = std::env::var("DATABASE_URL").unwrap(); + let mut opt = sea_orm::ConnectOptions::new(format!("{base_url}/connection_ping_closed")); + opt + // The connection pool has a single connection only + .max_connections(1) + // A controlled connection acquire timeout + .acquire_timeout(std::time::Duration::from_secs(2)); + + let db = sea_orm::Database::connect(opt).await.unwrap(); + + async fn transaction_blocked(db: &DatabaseConnection) { + let _txn = sea_orm::TransactionTrait::begin(db).await.unwrap(); + // Occupy the only connection, thus forcing others fail to acquire connection + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + } + + async fn transaction(db: &DatabaseConnection) { + // Should fail to acquire + let txn = sea_orm::TransactionTrait::begin(db).await; + assert_eq!( + txn.expect_err("should be a time out"), + crate::DbErr::ConnectionAcquire(ConnAcquireErr::Timeout) + ) + } + + tokio::join!(transaction_blocked(&db), transaction(&db)); + ctx.delete().await; }