Feat: Expose ping method from SQLx (#1627)

* feat: add connection.ping method

* fmt

* impl `ConnectionTrait::ping` method for `SchemaManagerConnection`

* Mock connection should always be online

* remove needless clippy lint

* fmt

* Remove needless instrument

* Update src/driver/sqlx_sqlite.rs

* <feat>(Ping) removed ping() from ConnectionTrait

* removed ping from transaction

* removed unused imports

* Revert

---------

Co-authored-by: Billy Chan <ccw.billy.123@gmail.com>
Co-authored-by: Chris Tsang <chris.2y3@outlook.com>
This commit is contained in:
Clément Guiton 2023-06-01 12:21:38 +02:00 committed by GitHub
parent 3343e2118b
commit 3ccb9cdbf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 119 additions and 3 deletions

View File

@ -390,6 +390,21 @@ impl DatabaseConnection {
} }
} }
/// Checks if a connection to the database is still valid.
pub async fn ping(&self) -> Result<(), DbErr> {
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.ping().await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.ping().await,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.ping().await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.ping(),
DatabaseConnection::Disconnected => Err(conn_err("Disconnected")),
}
}
/// Explicitly close the database connection /// Explicitly close the database connection
pub async fn close(self) -> Result<(), DbErr> { pub async fn close(self) -> Result<(), DbErr> {
match self { match self {

View File

@ -203,6 +203,10 @@ impl MockDatabaseTrait for MockDatabase {
fn get_database_backend(&self) -> DbBackend { fn get_database_backend(&self) -> DbBackend {
self.db_backend self.db_backend
} }
fn ping(&self) -> Result<(), DbErr> {
Ok(())
}
} }
impl MockRow { impl MockRow {

View File

@ -47,6 +47,9 @@ pub trait MockDatabaseTrait: Send + Debug {
/// Get the backend being used in the [MockDatabase] /// Get the backend being used in the [MockDatabase]
fn get_database_backend(&self) -> DbBackend; fn get_database_backend(&self) -> DbBackend;
/// Ping the [MockDatabase]
fn ping(&self) -> Result<(), DbErr>;
} }
impl MockDatabaseConnector { impl MockDatabaseConnector {
@ -194,4 +197,9 @@ impl MockDatabaseConnection {
.expect("Failed to acquire mocker") .expect("Failed to acquire mocker")
.rollback() .rollback()
} }
/// Checks if a connection to the database is still valid.
pub fn ping(&self) -> Result<(), DbErr> {
self.mocker.lock().map_err(query_err)?.ping()
}
} }

View File

@ -4,7 +4,7 @@ use std::{future::Future, pin::Pin, sync::Arc};
use sqlx::{ use sqlx::{
mysql::{MySqlConnectOptions, MySqlQueryResult, MySqlRow}, mysql::{MySqlConnectOptions, MySqlQueryResult, MySqlRow},
pool::PoolConnection, pool::PoolConnection,
Executor, MySql, MySqlPool, Connection, Executor, MySql, MySqlPool,
}; };
use sea_query_binder::SqlxValues; use sea_query_binder::SqlxValues;
@ -222,6 +222,18 @@ impl SqlxMySqlPoolConnection {
self.metric_callback = Some(Arc::new(callback)); self.metric_callback = Some(Arc::new(callback));
} }
/// Checks if a connection to the database is still valid.
pub async fn ping(&self) -> Result<(), DbErr> {
if let Ok(conn) = &mut self.pool.acquire().await {
match conn.ping().await {
Ok(_) => Ok(()),
Err(err) => Err(sqlx_error_to_conn_err(err)),
}
} else {
Err(DbErr::ConnectionAcquire)
}
}
/// Explicitly close the MySQL connection /// Explicitly close the MySQL connection
pub async fn close(self) -> Result<(), DbErr> { pub async fn close(self) -> Result<(), DbErr> {
self.pool.close().await; self.pool.close().await;

View File

@ -4,7 +4,7 @@ use std::{future::Future, pin::Pin, sync::Arc};
use sqlx::{ use sqlx::{
pool::PoolConnection, pool::PoolConnection,
postgres::{PgConnectOptions, PgQueryResult, PgRow}, postgres::{PgConnectOptions, PgQueryResult, PgRow},
Executor, PgPool, Postgres, Connection, Executor, PgPool, Postgres,
}; };
use sea_query_binder::SqlxValues; use sea_query_binder::SqlxValues;
@ -237,6 +237,18 @@ impl SqlxPostgresPoolConnection {
self.metric_callback = Some(Arc::new(callback)); self.metric_callback = Some(Arc::new(callback));
} }
/// Checks if a connection to the database is still valid.
pub async fn ping(&self) -> Result<(), DbErr> {
if let Ok(conn) = &mut self.pool.acquire().await {
match conn.ping().await {
Ok(_) => Ok(()),
Err(err) => Err(sqlx_error_to_conn_err(err)),
}
} else {
Err(DbErr::ConnectionAcquire)
}
}
/// Explicitly close the Postgres connection /// Explicitly close the Postgres connection
pub async fn close(self) -> Result<(), DbErr> { pub async fn close(self) -> Result<(), DbErr> {
self.pool.close().await; self.pool.close().await;

View File

@ -4,7 +4,7 @@ use std::{future::Future, pin::Pin, sync::Arc};
use sqlx::{ use sqlx::{
pool::PoolConnection, pool::PoolConnection,
sqlite::{SqliteConnectOptions, SqliteQueryResult, SqliteRow}, sqlite::{SqliteConnectOptions, SqliteQueryResult, SqliteRow},
Executor, Sqlite, SqlitePool, Connection, Executor, Sqlite, SqlitePool,
}; };
use sea_query_binder::SqlxValues; use sea_query_binder::SqlxValues;
@ -229,6 +229,18 @@ impl SqlxSqlitePoolConnection {
self.metric_callback = Some(Arc::new(callback)); self.metric_callback = Some(Arc::new(callback));
} }
/// Checks if a connection to the database is still valid.
pub async fn ping(&self) -> Result<(), DbErr> {
if let Ok(conn) = &mut self.pool.acquire().await {
match conn.ping().await {
Ok(_) => Ok(()),
Err(err) => Err(sqlx_error_to_conn_err(err)),
}
} else {
Err(DbErr::ConnectionAcquire)
}
}
/// Explicitly close the SQLite connection /// Explicitly close the SQLite connection
pub async fn close(self) -> Result<(), DbErr> { pub async fn close(self) -> Result<(), DbErr> {
self.pool.close().await; self.pool.close().await;

53
tests/connection_tests.rs Normal file
View File

@ -0,0 +1,53 @@
pub mod common;
pub use common::{bakery_chain::*, setup::*, TestContext};
use pretty_assertions::assert_eq;
pub use sea_orm::entity::*;
pub use sea_orm::*;
#[sea_orm_macros::test]
#[cfg(any(
feature = "sqlx-mysql",
feature = "sqlx-sqlite",
feature = "sqlx-postgres"
))]
pub async fn connection_ping() {
let ctx = TestContext::new("connection_ping").await;
ctx.db.ping().await.unwrap();
ctx.delete().await;
}
#[sea_orm_macros::test]
#[cfg(feature = "sqlx-mysql")]
pub async fn connection_ping_closed_mysql() {
let ctx = std::rc::Rc::new(Box::new(TestContext::new("connection_ping_closed").await));
let ctx_ping = std::rc::Rc::clone(&ctx);
ctx.db.get_mysql_connection_pool().close().await;
assert_eq!(ctx_ping.db.ping().await, Err(DbErr::ConnectionAcquire));
ctx.delete().await;
}
#[sea_orm_macros::test]
#[cfg(feature = "sqlx-sqlite")]
pub async fn connection_ping_closed_sqlite() {
let ctx = std::rc::Rc::new(Box::new(TestContext::new("connection_ping_closed").await));
let ctx_ping = std::rc::Rc::clone(&ctx);
ctx.db.get_sqlite_connection_pool().close().await;
assert_eq!(ctx_ping.db.ping().await, Err(DbErr::ConnectionAcquire));
ctx.delete().await;
}
#[sea_orm_macros::test]
#[cfg(feature = "sqlx-postgres")]
pub async fn connection_ping_closed_postgres() {
let ctx = std::rc::Rc::new(Box::new(TestContext::new("connection_ping_closed").await));
let ctx_ping = std::rc::Rc::clone(&ctx);
ctx.db.get_postgres_connection_pool().close().await;
assert_eq!(ctx_ping.db.ping().await, Err(DbErr::ConnectionAcquire));
ctx.delete().await;
}