Connection acquire expand (#1737)
* Try connection pool acquire timeout * expanded ConnectionAcquire Errors, and adjusted relevant functions * updated to include ClosedConnection removed unknown as variant of ConnectionAcquireError updated DbErr Eq * revert mistakenly edited code * prolonged the timeout time for connection timeout tests * Revert "prolonged the timeout time for connection timeout tests" This reverts commit 04d98cf1fdafe64dbe6168b3bbcdcc813329ac4c. * Error PartialEq & Eq * fn conn_acquire_err * refactor code with conn_acquire_err * fmt * error msg --------- Co-authored-by: Billy Chan <ccw.billy.123@gmail.com>
This commit is contained in:
parent
500e761a68
commit
fd4aab7a8c
@ -82,16 +82,13 @@ impl SqlxMySqlPoolConnection {
|
|||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
let query = sqlx_query(&stmt);
|
let query = sqlx_query(&stmt);
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||||
match query.execute(conn).await {
|
match query.execute(conn).await {
|
||||||
Ok(res) => Ok(res.into()),
|
Ok(res) => Ok(res.into()),
|
||||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute an unprepared SQL statement on a MySQL backend
|
/// Execute an unprepared SQL statement on a MySQL backend
|
||||||
@ -99,14 +96,11 @@ impl SqlxMySqlPoolConnection {
|
|||||||
pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
|
pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
|
||||||
debug_print!("{}", sql);
|
debug_print!("{}", sql);
|
||||||
|
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
match conn.execute(sql).await {
|
match conn.execute(sql).await {
|
||||||
Ok(res) => Ok(res.into()),
|
Ok(res) => Ok(res.into()),
|
||||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get one result from a SQL query. Returns [Option::None] if no match was found
|
/// Get one result from a SQL query. Returns [Option::None] if no match was found
|
||||||
@ -115,7 +109,7 @@ impl SqlxMySqlPoolConnection {
|
|||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
let query = sqlx_query(&stmt);
|
let query = sqlx_query(&stmt);
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||||
match query.fetch_one(conn).await {
|
match query.fetch_one(conn).await {
|
||||||
Ok(row) => Ok(Some(row.into())),
|
Ok(row) => Ok(Some(row.into())),
|
||||||
@ -125,9 +119,6 @@ impl SqlxMySqlPoolConnection {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the results of a query returning them as a Vec<[QueryResult]>
|
/// Get the results of a query returning them as a Vec<[QueryResult]>
|
||||||
@ -136,16 +127,13 @@ impl SqlxMySqlPoolConnection {
|
|||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
let query = sqlx_query(&stmt);
|
let query = sqlx_query(&stmt);
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||||
match query.fetch_all(conn).await {
|
match query.fetch_all(conn).await {
|
||||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stream the results of executing a SQL query
|
/// Stream the results of executing a SQL query
|
||||||
@ -153,15 +141,12 @@ impl SqlxMySqlPoolConnection {
|
|||||||
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
|
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
|
||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
if let Ok(conn) = self.pool.acquire().await {
|
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
Ok(QueryStream::from((
|
Ok(QueryStream::from((
|
||||||
conn,
|
conn,
|
||||||
stmt,
|
stmt,
|
||||||
self.metric_callback.clone(),
|
self.metric_callback.clone(),
|
||||||
)))
|
)))
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bundle a set of SQL statements that execute together.
|
/// Bundle a set of SQL statements that execute together.
|
||||||
@ -171,7 +156,7 @@ impl SqlxMySqlPoolConnection {
|
|||||||
isolation_level: Option<IsolationLevel>,
|
isolation_level: Option<IsolationLevel>,
|
||||||
access_mode: Option<AccessMode>,
|
access_mode: Option<AccessMode>,
|
||||||
) -> Result<DatabaseTransaction, DbErr> {
|
) -> Result<DatabaseTransaction, DbErr> {
|
||||||
if let Ok(conn) = self.pool.acquire().await {
|
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
DatabaseTransaction::new_mysql(
|
DatabaseTransaction::new_mysql(
|
||||||
conn,
|
conn,
|
||||||
self.metric_callback.clone(),
|
self.metric_callback.clone(),
|
||||||
@ -179,9 +164,6 @@ impl SqlxMySqlPoolConnection {
|
|||||||
access_mode,
|
access_mode,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a MySQL transaction
|
/// Create a MySQL transaction
|
||||||
@ -200,7 +182,7 @@ impl SqlxMySqlPoolConnection {
|
|||||||
T: Send,
|
T: Send,
|
||||||
E: std::error::Error + Send,
|
E: std::error::Error + Send,
|
||||||
{
|
{
|
||||||
if let Ok(conn) = self.pool.acquire().await {
|
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
let transaction = DatabaseTransaction::new_mysql(
|
let transaction = DatabaseTransaction::new_mysql(
|
||||||
conn,
|
conn,
|
||||||
self.metric_callback.clone(),
|
self.metric_callback.clone(),
|
||||||
@ -210,9 +192,6 @@ impl SqlxMySqlPoolConnection {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| TransactionError::Connection(e))?;
|
.map_err(|e| TransactionError::Connection(e))?;
|
||||||
transaction.run(callback).await
|
transaction.run(callback).await
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire.into())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
|
pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
|
||||||
@ -224,14 +203,11 @@ impl SqlxMySqlPoolConnection {
|
|||||||
|
|
||||||
/// Checks if a connection to the database is still valid.
|
/// Checks if a connection to the database is still valid.
|
||||||
pub async fn ping(&self) -> Result<(), DbErr> {
|
pub async fn ping(&self) -> Result<(), DbErr> {
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
match conn.ping().await {
|
match conn.ping().await {
|
||||||
Ok(_) => Ok(()),
|
Ok(_) => Ok(()),
|
||||||
Err(err) => Err(sqlx_error_to_conn_err(err)),
|
Err(err) => Err(sqlx_error_to_conn_err(err)),
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Explicitly close the MySQL connection
|
/// Explicitly close the MySQL connection
|
||||||
|
@ -97,16 +97,13 @@ impl SqlxPostgresPoolConnection {
|
|||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
let query = sqlx_query(&stmt);
|
let query = sqlx_query(&stmt);
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||||
match query.execute(conn).await {
|
match query.execute(conn).await {
|
||||||
Ok(res) => Ok(res.into()),
|
Ok(res) => Ok(res.into()),
|
||||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute an unprepared SQL statement on a PostgreSQL backend
|
/// Execute an unprepared SQL statement on a PostgreSQL backend
|
||||||
@ -114,14 +111,11 @@ impl SqlxPostgresPoolConnection {
|
|||||||
pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
|
pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
|
||||||
debug_print!("{}", sql);
|
debug_print!("{}", sql);
|
||||||
|
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
match conn.execute(sql).await {
|
match conn.execute(sql).await {
|
||||||
Ok(res) => Ok(res.into()),
|
Ok(res) => Ok(res.into()),
|
||||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get one result from a SQL query. Returns [Option::None] if no match was found
|
/// Get one result from a SQL query. Returns [Option::None] if no match was found
|
||||||
@ -130,7 +124,7 @@ impl SqlxPostgresPoolConnection {
|
|||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
let query = sqlx_query(&stmt);
|
let query = sqlx_query(&stmt);
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||||
match query.fetch_one(conn).await {
|
match query.fetch_one(conn).await {
|
||||||
Ok(row) => Ok(Some(row.into())),
|
Ok(row) => Ok(Some(row.into())),
|
||||||
@ -140,9 +134,6 @@ impl SqlxPostgresPoolConnection {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the results of a query returning them as a Vec<[QueryResult]>
|
/// Get the results of a query returning them as a Vec<[QueryResult]>
|
||||||
@ -151,16 +142,13 @@ impl SqlxPostgresPoolConnection {
|
|||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
let query = sqlx_query(&stmt);
|
let query = sqlx_query(&stmt);
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||||
match query.fetch_all(conn).await {
|
match query.fetch_all(conn).await {
|
||||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stream the results of executing a SQL query
|
/// Stream the results of executing a SQL query
|
||||||
@ -168,15 +156,12 @@ impl SqlxPostgresPoolConnection {
|
|||||||
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
|
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
|
||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
if let Ok(conn) = self.pool.acquire().await {
|
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
Ok(QueryStream::from((
|
Ok(QueryStream::from((
|
||||||
conn,
|
conn,
|
||||||
stmt,
|
stmt,
|
||||||
self.metric_callback.clone(),
|
self.metric_callback.clone(),
|
||||||
)))
|
)))
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bundle a set of SQL statements that execute together.
|
/// Bundle a set of SQL statements that execute together.
|
||||||
@ -186,7 +171,7 @@ impl SqlxPostgresPoolConnection {
|
|||||||
isolation_level: Option<IsolationLevel>,
|
isolation_level: Option<IsolationLevel>,
|
||||||
access_mode: Option<AccessMode>,
|
access_mode: Option<AccessMode>,
|
||||||
) -> Result<DatabaseTransaction, DbErr> {
|
) -> Result<DatabaseTransaction, DbErr> {
|
||||||
if let Ok(conn) = self.pool.acquire().await {
|
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
DatabaseTransaction::new_postgres(
|
DatabaseTransaction::new_postgres(
|
||||||
conn,
|
conn,
|
||||||
self.metric_callback.clone(),
|
self.metric_callback.clone(),
|
||||||
@ -194,9 +179,6 @@ impl SqlxPostgresPoolConnection {
|
|||||||
access_mode,
|
access_mode,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a PostgreSQL transaction
|
/// Create a PostgreSQL transaction
|
||||||
@ -215,7 +197,7 @@ impl SqlxPostgresPoolConnection {
|
|||||||
T: Send,
|
T: Send,
|
||||||
E: std::error::Error + Send,
|
E: std::error::Error + Send,
|
||||||
{
|
{
|
||||||
if let Ok(conn) = self.pool.acquire().await {
|
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
let transaction = DatabaseTransaction::new_postgres(
|
let transaction = DatabaseTransaction::new_postgres(
|
||||||
conn,
|
conn,
|
||||||
self.metric_callback.clone(),
|
self.metric_callback.clone(),
|
||||||
@ -225,9 +207,6 @@ impl SqlxPostgresPoolConnection {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| TransactionError::Connection(e))?;
|
.map_err(|e| TransactionError::Connection(e))?;
|
||||||
transaction.run(callback).await
|
transaction.run(callback).await
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire.into())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
|
pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
|
||||||
@ -239,14 +218,11 @@ impl SqlxPostgresPoolConnection {
|
|||||||
|
|
||||||
/// Checks if a connection to the database is still valid.
|
/// Checks if a connection to the database is still valid.
|
||||||
pub async fn ping(&self) -> Result<(), DbErr> {
|
pub async fn ping(&self) -> Result<(), DbErr> {
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
match conn.ping().await {
|
match conn.ping().await {
|
||||||
Ok(_) => Ok(()),
|
Ok(_) => Ok(()),
|
||||||
Err(err) => Err(sqlx_error_to_conn_err(err)),
|
Err(err) => Err(sqlx_error_to_conn_err(err)),
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Explicitly close the Postgres connection
|
/// Explicitly close the Postgres connection
|
||||||
|
@ -89,16 +89,13 @@ impl SqlxSqlitePoolConnection {
|
|||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
let query = sqlx_query(&stmt);
|
let query = sqlx_query(&stmt);
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||||
match query.execute(conn).await {
|
match query.execute(conn).await {
|
||||||
Ok(res) => Ok(res.into()),
|
Ok(res) => Ok(res.into()),
|
||||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute an unprepared SQL statement on a SQLite backend
|
/// Execute an unprepared SQL statement on a SQLite backend
|
||||||
@ -106,14 +103,11 @@ impl SqlxSqlitePoolConnection {
|
|||||||
pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
|
pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
|
||||||
debug_print!("{}", sql);
|
debug_print!("{}", sql);
|
||||||
|
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
match conn.execute(sql).await {
|
match conn.execute(sql).await {
|
||||||
Ok(res) => Ok(res.into()),
|
Ok(res) => Ok(res.into()),
|
||||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get one result from a SQL query. Returns [Option::None] if no match was found
|
/// Get one result from a SQL query. Returns [Option::None] if no match was found
|
||||||
@ -122,7 +116,7 @@ impl SqlxSqlitePoolConnection {
|
|||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
let query = sqlx_query(&stmt);
|
let query = sqlx_query(&stmt);
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||||
match query.fetch_one(conn).await {
|
match query.fetch_one(conn).await {
|
||||||
Ok(row) => Ok(Some(row.into())),
|
Ok(row) => Ok(Some(row.into())),
|
||||||
@ -132,9 +126,6 @@ impl SqlxSqlitePoolConnection {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the results of a query returning them as a Vec<[QueryResult]>
|
/// Get the results of a query returning them as a Vec<[QueryResult]>
|
||||||
@ -143,16 +134,13 @@ impl SqlxSqlitePoolConnection {
|
|||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
let query = sqlx_query(&stmt);
|
let query = sqlx_query(&stmt);
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||||
match query.fetch_all(conn).await {
|
match query.fetch_all(conn).await {
|
||||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stream the results of executing a SQL query
|
/// Stream the results of executing a SQL query
|
||||||
@ -160,15 +148,12 @@ impl SqlxSqlitePoolConnection {
|
|||||||
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
|
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
|
||||||
debug_print!("{}", stmt);
|
debug_print!("{}", stmt);
|
||||||
|
|
||||||
if let Ok(conn) = self.pool.acquire().await {
|
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
Ok(QueryStream::from((
|
Ok(QueryStream::from((
|
||||||
conn,
|
conn,
|
||||||
stmt,
|
stmt,
|
||||||
self.metric_callback.clone(),
|
self.metric_callback.clone(),
|
||||||
)))
|
)))
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bundle a set of SQL statements that execute together.
|
/// Bundle a set of SQL statements that execute together.
|
||||||
@ -178,7 +163,7 @@ impl SqlxSqlitePoolConnection {
|
|||||||
isolation_level: Option<IsolationLevel>,
|
isolation_level: Option<IsolationLevel>,
|
||||||
access_mode: Option<AccessMode>,
|
access_mode: Option<AccessMode>,
|
||||||
) -> Result<DatabaseTransaction, DbErr> {
|
) -> Result<DatabaseTransaction, DbErr> {
|
||||||
if let Ok(conn) = self.pool.acquire().await {
|
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
DatabaseTransaction::new_sqlite(
|
DatabaseTransaction::new_sqlite(
|
||||||
conn,
|
conn,
|
||||||
self.metric_callback.clone(),
|
self.metric_callback.clone(),
|
||||||
@ -186,9 +171,6 @@ impl SqlxSqlitePoolConnection {
|
|||||||
access_mode,
|
access_mode,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a MySQL transaction
|
/// Create a MySQL transaction
|
||||||
@ -207,7 +189,7 @@ impl SqlxSqlitePoolConnection {
|
|||||||
T: Send,
|
T: Send,
|
||||||
E: std::error::Error + Send,
|
E: std::error::Error + Send,
|
||||||
{
|
{
|
||||||
if let Ok(conn) = self.pool.acquire().await {
|
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
let transaction = DatabaseTransaction::new_sqlite(
|
let transaction = DatabaseTransaction::new_sqlite(
|
||||||
conn,
|
conn,
|
||||||
self.metric_callback.clone(),
|
self.metric_callback.clone(),
|
||||||
@ -217,9 +199,6 @@ impl SqlxSqlitePoolConnection {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| TransactionError::Connection(e))?;
|
.map_err(|e| TransactionError::Connection(e))?;
|
||||||
transaction.run(callback).await
|
transaction.run(callback).await
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire.into())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
|
pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
|
||||||
@ -231,14 +210,11 @@ impl SqlxSqlitePoolConnection {
|
|||||||
|
|
||||||
/// Checks if a connection to the database is still valid.
|
/// Checks if a connection to the database is still valid.
|
||||||
pub async fn ping(&self) -> Result<(), DbErr> {
|
pub async fn ping(&self) -> Result<(), DbErr> {
|
||||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
|
||||||
match conn.ping().await {
|
match conn.ping().await {
|
||||||
Ok(_) => Ok(()),
|
Ok(_) => Ok(()),
|
||||||
Err(err) => Err(sqlx_error_to_conn_err(err)),
|
Err(err) => Err(sqlx_error_to_conn_err(err)),
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
Err(DbErr::ConnectionAcquire)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Explicitly close the SQLite connection
|
/// Explicitly close the SQLite connection
|
||||||
|
25
src/error.rs
25
src/error.rs
@ -16,8 +16,8 @@ use thiserror::Error;
|
|||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum DbErr {
|
pub enum DbErr {
|
||||||
/// This error can happen when the connection pool is fully-utilized
|
/// This error can happen when the connection pool is fully-utilized
|
||||||
#[error("Failed to acquire connection from pool")]
|
#[error("Failed to acquire connection from pool: {0}")]
|
||||||
ConnectionAcquire,
|
ConnectionAcquire(ConnAcquireErr),
|
||||||
/// Runtime type conversion error
|
/// Runtime type conversion error
|
||||||
#[error("Error converting `{from}` into `{into}`: {source}")]
|
#[error("Error converting `{from}` into `{into}`: {source}")]
|
||||||
TryIntoErr {
|
TryIntoErr {
|
||||||
@ -75,6 +75,17 @@ pub enum DbErr {
|
|||||||
RecordNotUpdated,
|
RecordNotUpdated,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Connection error
|
||||||
|
#[derive(Error, Debug, PartialEq, Eq)]
|
||||||
|
pub enum ConnAcquireErr {
|
||||||
|
/// Connection Timed Out
|
||||||
|
#[error("Connection Timed out")]
|
||||||
|
Timeout,
|
||||||
|
/// Unavailable connection
|
||||||
|
#[error("Connection closed by host")]
|
||||||
|
ConnectionClosed,
|
||||||
|
}
|
||||||
|
|
||||||
/// Runtime error
|
/// Runtime error
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum RuntimeErr {
|
pub enum RuntimeErr {
|
||||||
@ -140,6 +151,16 @@ where
|
|||||||
DbErr::Json(s.to_string())
|
DbErr::Json(s.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
#[cfg(feature = "sqlx-dep")]
|
||||||
|
pub(crate) fn conn_acquire_err(sqlx_err: sqlx::Error) -> DbErr {
|
||||||
|
match sqlx_err {
|
||||||
|
sqlx::Error::PoolTimedOut => DbErr::ConnectionAcquire(ConnAcquireErr::Timeout),
|
||||||
|
sqlx::Error::PoolClosed => DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed),
|
||||||
|
_ => DbErr::Conn(RuntimeErr::SqlxError(sqlx_err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// An error from unsuccessful SQL query
|
/// An error from unsuccessful SQL query
|
||||||
#[derive(Error, Debug, Clone, PartialEq, Eq)]
|
#[derive(Error, Debug, Clone, PartialEq, Eq)]
|
||||||
#[non_exhaustive]
|
#[non_exhaustive]
|
||||||
|
@ -25,7 +25,38 @@ pub async fn connection_ping_closed_mysql() {
|
|||||||
let ctx_ping = std::rc::Rc::clone(&ctx);
|
let ctx_ping = std::rc::Rc::clone(&ctx);
|
||||||
|
|
||||||
ctx.db.get_mysql_connection_pool().close().await;
|
ctx.db.get_mysql_connection_pool().close().await;
|
||||||
assert_eq!(ctx_ping.db.ping().await, Err(DbErr::ConnectionAcquire));
|
assert_eq!(
|
||||||
|
ctx_ping.db.ping().await,
|
||||||
|
Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed))
|
||||||
|
);
|
||||||
|
|
||||||
|
let base_url = std::env::var("DATABASE_URL").unwrap();
|
||||||
|
let mut opt = sea_orm::ConnectOptions::new(format!("{base_url}/connection_ping_closed"));
|
||||||
|
opt
|
||||||
|
// The connection pool has a single connection only
|
||||||
|
.max_connections(1)
|
||||||
|
// A controlled connection acquire timeout
|
||||||
|
.acquire_timeout(std::time::Duration::from_secs(2));
|
||||||
|
|
||||||
|
let db = sea_orm::Database::connect(opt).await.unwrap();
|
||||||
|
|
||||||
|
async fn transaction_blocked(db: &DatabaseConnection) {
|
||||||
|
let _txn = sea_orm::TransactionTrait::begin(db).await.unwrap();
|
||||||
|
// Occupy the only connection, thus forcing others fail to acquire connection
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn transaction(db: &DatabaseConnection) {
|
||||||
|
// Should fail to acquire
|
||||||
|
let txn = sea_orm::TransactionTrait::begin(db).await;
|
||||||
|
assert_eq!(
|
||||||
|
txn.expect_err("should be a time out"),
|
||||||
|
crate::DbErr::ConnectionAcquire(ConnAcquireErr::Timeout)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::join!(transaction_blocked(&db), transaction(&db));
|
||||||
|
|
||||||
ctx.delete().await;
|
ctx.delete().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,7 +67,38 @@ pub async fn connection_ping_closed_sqlite() {
|
|||||||
let ctx_ping = std::rc::Rc::clone(&ctx);
|
let ctx_ping = std::rc::Rc::clone(&ctx);
|
||||||
|
|
||||||
ctx.db.get_sqlite_connection_pool().close().await;
|
ctx.db.get_sqlite_connection_pool().close().await;
|
||||||
assert_eq!(ctx_ping.db.ping().await, Err(DbErr::ConnectionAcquire));
|
assert_eq!(
|
||||||
|
ctx_ping.db.ping().await,
|
||||||
|
Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed))
|
||||||
|
);
|
||||||
|
|
||||||
|
let base_url = std::env::var("DATABASE_URL").unwrap();
|
||||||
|
let mut opt = sea_orm::ConnectOptions::new(base_url);
|
||||||
|
opt
|
||||||
|
// The connection pool has a single connection only
|
||||||
|
.max_connections(1)
|
||||||
|
// A controlled connection acquire timeout
|
||||||
|
.acquire_timeout(std::time::Duration::from_secs(2));
|
||||||
|
|
||||||
|
let db = sea_orm::Database::connect(opt).await.unwrap();
|
||||||
|
|
||||||
|
async fn transaction_blocked(db: &DatabaseConnection) {
|
||||||
|
let _txn = sea_orm::TransactionTrait::begin(db).await.unwrap();
|
||||||
|
// Occupy the only connection, thus forcing others fail to acquire connection
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn transaction(db: &DatabaseConnection) {
|
||||||
|
// Should fail to acquire
|
||||||
|
let txn = sea_orm::TransactionTrait::begin(db).await;
|
||||||
|
assert_eq!(
|
||||||
|
txn.expect_err("should be a time out"),
|
||||||
|
crate::DbErr::ConnectionAcquire(ConnAcquireErr::Timeout)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::join!(transaction_blocked(&db), transaction(&db));
|
||||||
|
|
||||||
ctx.delete().await;
|
ctx.delete().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -47,6 +109,37 @@ pub async fn connection_ping_closed_postgres() {
|
|||||||
let ctx_ping = std::rc::Rc::clone(&ctx);
|
let ctx_ping = std::rc::Rc::clone(&ctx);
|
||||||
|
|
||||||
ctx.db.get_postgres_connection_pool().close().await;
|
ctx.db.get_postgres_connection_pool().close().await;
|
||||||
assert_eq!(ctx_ping.db.ping().await, Err(DbErr::ConnectionAcquire));
|
assert_eq!(
|
||||||
|
ctx_ping.db.ping().await,
|
||||||
|
Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed))
|
||||||
|
);
|
||||||
|
|
||||||
|
let base_url = std::env::var("DATABASE_URL").unwrap();
|
||||||
|
let mut opt = sea_orm::ConnectOptions::new(format!("{base_url}/connection_ping_closed"));
|
||||||
|
opt
|
||||||
|
// The connection pool has a single connection only
|
||||||
|
.max_connections(1)
|
||||||
|
// A controlled connection acquire timeout
|
||||||
|
.acquire_timeout(std::time::Duration::from_secs(2));
|
||||||
|
|
||||||
|
let db = sea_orm::Database::connect(opt).await.unwrap();
|
||||||
|
|
||||||
|
async fn transaction_blocked(db: &DatabaseConnection) {
|
||||||
|
let _txn = sea_orm::TransactionTrait::begin(db).await.unwrap();
|
||||||
|
// Occupy the only connection, thus forcing others fail to acquire connection
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn transaction(db: &DatabaseConnection) {
|
||||||
|
// Should fail to acquire
|
||||||
|
let txn = sea_orm::TransactionTrait::begin(db).await;
|
||||||
|
assert_eq!(
|
||||||
|
txn.expect_err("should be a time out"),
|
||||||
|
crate::DbErr::ConnectionAcquire(ConnAcquireErr::Timeout)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::join!(transaction_blocked(&db), transaction(&db));
|
||||||
|
|
||||||
ctx.delete().await;
|
ctx.delete().await;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user