Merge pull request #373 from nappa85/master
This commit is contained in:
commit
7da5b6bf90
@ -27,7 +27,7 @@ async-trait = { version = "^0.1" }
|
||||
chrono = { version = "^0", optional = true }
|
||||
futures = { version = "^0.3" }
|
||||
futures-util = { version = "^0.3" }
|
||||
log = { version = "^0.4", optional = true }
|
||||
tracing = "0.1"
|
||||
rust_decimal = { version = "^1", optional = true }
|
||||
sea-orm-macros = { version = "^0.4.2", path = "sea-orm-macros", optional = true }
|
||||
sea-query = { version = "^0.19.4", features = ["thread-safe"] }
|
||||
@ -36,8 +36,9 @@ serde = { version = "^1.0", features = ["derive"] }
|
||||
serde_json = { version = "^1", optional = true }
|
||||
sqlx = { version = "^0.5", optional = true }
|
||||
uuid = { version = "0.8", features = ["serde", "v4"], optional = true }
|
||||
ouroboros = "0.11"
|
||||
ouroboros = "0.14"
|
||||
url = "^2.2"
|
||||
once_cell = "1.8"
|
||||
|
||||
[dev-dependencies]
|
||||
smol = { version = "^1.2" }
|
||||
@ -47,12 +48,12 @@ tokio = { version = "^1.6", features = ["full"] }
|
||||
actix-rt = { version = "2.2.0" }
|
||||
maplit = { version = "^1" }
|
||||
rust_decimal_macros = { version = "^1" }
|
||||
env_logger = { version = "^0.9" }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
sea-orm = { path = ".", features = ["debug-print"] }
|
||||
pretty_assertions = { version = "^0.7" }
|
||||
|
||||
[features]
|
||||
debug-print = ["log"]
|
||||
debug-print = []
|
||||
default = [
|
||||
"macros",
|
||||
"mock",
|
||||
|
@ -596,9 +596,8 @@ pub fn test(_: TokenStream, input: TokenStream) -> TokenStream {
|
||||
#[test]
|
||||
#(#attrs)*
|
||||
fn #name() #ret {
|
||||
let _ = ::env_logger::builder()
|
||||
.filter_level(::log::LevelFilter::Debug)
|
||||
.is_test(true)
|
||||
let _ = ::tracing_subscriber::fmt()
|
||||
.with_max_level(::tracing::Level::DEBUG)
|
||||
.try_init();
|
||||
crate::block_on!(async { #body })
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ use crate::{
|
||||
StatementBuilder, TransactionError,
|
||||
};
|
||||
use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder};
|
||||
use tracing::instrument;
|
||||
use std::{future::Future, pin::Pin};
|
||||
use url::Url;
|
||||
|
||||
@ -49,6 +50,7 @@ pub enum DatabaseBackend {
|
||||
|
||||
/// The same as [DatabaseBackend] just shorter :)
|
||||
pub type DbBackend = DatabaseBackend;
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum InnerConnection {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
MySql(PoolConnection<sqlx::MySql>),
|
||||
@ -104,6 +106,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
|
||||
match self {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
@ -118,6 +121,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
|
||||
match self {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
@ -132,6 +136,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
|
||||
match self {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
@ -146,6 +151,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
fn stream(
|
||||
&'a self,
|
||||
stmt: Statement,
|
||||
@ -160,13 +166,14 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection {
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.stream(stmt).await?,
|
||||
#[cfg(feature = "mock")]
|
||||
DatabaseConnection::MockDatabaseConnection(conn) => {
|
||||
crate::QueryStream::from((Arc::clone(conn), stmt))
|
||||
crate::QueryStream::from((Arc::clone(conn), stmt, None))
|
||||
}
|
||||
DatabaseConnection::Disconnected => panic!("Disconnected"),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
|
||||
match self {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
@ -177,7 +184,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection {
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin().await,
|
||||
#[cfg(feature = "mock")]
|
||||
DatabaseConnection::MockDatabaseConnection(conn) => {
|
||||
DatabaseTransaction::new_mock(Arc::clone(conn)).await
|
||||
DatabaseTransaction::new_mock(Arc::clone(conn), None).await
|
||||
}
|
||||
DatabaseConnection::Disconnected => panic!("Disconnected"),
|
||||
}
|
||||
@ -185,6 +192,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection {
|
||||
|
||||
/// Execute the function inside a transaction.
|
||||
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
|
||||
#[instrument(level = "trace", skip(_callback))]
|
||||
async fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'c> FnOnce(
|
||||
@ -205,7 +213,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection {
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.transaction(_callback).await,
|
||||
#[cfg(feature = "mock")]
|
||||
DatabaseConnection::MockDatabaseConnection(conn) => {
|
||||
let transaction = DatabaseTransaction::new_mock(Arc::clone(conn))
|
||||
let transaction = DatabaseTransaction::new_mock(Arc::clone(conn), None)
|
||||
.await
|
||||
.map_err(TransactionError::Connection)?;
|
||||
transaction.run(_callback).await
|
||||
@ -237,6 +245,24 @@ impl DatabaseConnection {
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseConnection {
|
||||
/// Sets a callback to metric this connection
|
||||
pub fn set_metric_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
|
||||
{
|
||||
match self {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.set_metric_callback(callback),
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.set_metric_callback(callback),
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.set_metric_callback(callback),
|
||||
_ => {},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DbBackend {
|
||||
/// Check if the URI is the same as the specified database backend.
|
||||
/// Returns true if they match.
|
||||
|
@ -4,6 +4,7 @@ use crate::{
|
||||
Statement,
|
||||
};
|
||||
use sea_query::{Value, ValueType, Values};
|
||||
use tracing::instrument;
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
/// Defines a Mock database suitable for testing
|
||||
@ -89,6 +90,7 @@ impl MockDatabase {
|
||||
}
|
||||
|
||||
impl MockDatabaseTrait for MockDatabase {
|
||||
#[instrument(level = "trace")]
|
||||
fn execute(&mut self, counter: usize, statement: Statement) -> Result<ExecResult, DbErr> {
|
||||
if let Some(transaction) = &mut self.transaction {
|
||||
transaction.push(statement);
|
||||
@ -104,6 +106,7 @@ impl MockDatabaseTrait for MockDatabase {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
fn query(&mut self, counter: usize, statement: Statement) -> Result<Vec<QueryResult>, DbErr> {
|
||||
if let Some(transaction) = &mut self.transaction {
|
||||
transaction.push(statement);
|
||||
@ -122,6 +125,7 @@ impl MockDatabaseTrait for MockDatabase {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
fn begin(&mut self) {
|
||||
if self.transaction.is_some() {
|
||||
self.transaction
|
||||
@ -133,6 +137,7 @@ impl MockDatabaseTrait for MockDatabase {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
fn commit(&mut self) {
|
||||
if self.transaction.is_some() {
|
||||
if self.transaction.as_mut().unwrap().commit(self.db_backend) {
|
||||
@ -144,6 +149,7 @@ impl MockDatabaseTrait for MockDatabase {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
fn rollback(&mut self) {
|
||||
if self.transaction.is_some() {
|
||||
if self.transaction.as_mut().unwrap().rollback(self.db_backend) {
|
||||
|
@ -14,6 +14,7 @@ pub use db_connection::*;
|
||||
pub use mock::*;
|
||||
pub use statement::*;
|
||||
pub use stream::*;
|
||||
use tracing::instrument;
|
||||
pub use transaction::*;
|
||||
|
||||
use crate::DbErr;
|
||||
@ -42,6 +43,7 @@ pub struct ConnectOptions {
|
||||
|
||||
impl Database {
|
||||
/// Method to create a [DatabaseConnection] on a database
|
||||
#[instrument(level = "trace", skip(opt))]
|
||||
pub async fn connect<C>(opt: C) -> Result<DatabaseConnection, DbErr>
|
||||
where
|
||||
C: Into<ConnectOptions>,
|
||||
|
@ -12,6 +12,8 @@ use futures::TryStreamExt;
|
||||
#[cfg(feature = "sqlx-dep")]
|
||||
use sqlx::{pool::PoolConnection, Executor};
|
||||
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::{DbErr, InnerConnection, QueryResult, Statement};
|
||||
|
||||
/// Creates a stream from a [QueryResult]
|
||||
@ -19,36 +21,37 @@ use crate::{DbErr, InnerConnection, QueryResult, Statement};
|
||||
pub struct QueryStream {
|
||||
stmt: Statement,
|
||||
conn: InnerConnection,
|
||||
#[borrows(mut conn, stmt)]
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
#[borrows(mut conn, stmt, metric_callback)]
|
||||
#[not_covariant]
|
||||
stream: Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>> + 'this>>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
impl From<(PoolConnection<sqlx::MySql>, Statement)> for QueryStream {
|
||||
fn from((conn, stmt): (PoolConnection<sqlx::MySql>, Statement)) -> Self {
|
||||
QueryStream::build(stmt, InnerConnection::MySql(conn))
|
||||
impl From<(PoolConnection<sqlx::MySql>, Statement, Option<crate::metric::Callback>)> for QueryStream {
|
||||
fn from((conn, stmt, metric_callback): (PoolConnection<sqlx::MySql>, Statement, Option<crate::metric::Callback>)) -> Self {
|
||||
QueryStream::build(stmt, InnerConnection::MySql(conn), metric_callback)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
impl From<(PoolConnection<sqlx::Postgres>, Statement)> for QueryStream {
|
||||
fn from((conn, stmt): (PoolConnection<sqlx::Postgres>, Statement)) -> Self {
|
||||
QueryStream::build(stmt, InnerConnection::Postgres(conn))
|
||||
impl From<(PoolConnection<sqlx::Postgres>, Statement, Option<crate::metric::Callback>)> for QueryStream {
|
||||
fn from((conn, stmt, metric_callback): (PoolConnection<sqlx::Postgres>, Statement, Option<crate::metric::Callback>)) -> Self {
|
||||
QueryStream::build(stmt, InnerConnection::Postgres(conn), metric_callback)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
impl From<(PoolConnection<sqlx::Sqlite>, Statement)> for QueryStream {
|
||||
fn from((conn, stmt): (PoolConnection<sqlx::Sqlite>, Statement)) -> Self {
|
||||
QueryStream::build(stmt, InnerConnection::Sqlite(conn))
|
||||
impl From<(PoolConnection<sqlx::Sqlite>, Statement, Option<crate::metric::Callback>)> for QueryStream {
|
||||
fn from((conn, stmt, metric_callback): (PoolConnection<sqlx::Sqlite>, Statement, Option<crate::metric::Callback>)) -> Self {
|
||||
QueryStream::build(stmt, InnerConnection::Sqlite(conn), metric_callback)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "mock")]
|
||||
impl From<(Arc<crate::MockDatabaseConnection>, Statement)> for QueryStream {
|
||||
fn from((conn, stmt): (Arc<crate::MockDatabaseConnection>, Statement)) -> Self {
|
||||
QueryStream::build(stmt, InnerConnection::Mock(conn))
|
||||
impl From<(Arc<crate::MockDatabaseConnection>, Statement, Option<crate::metric::Callback>)> for QueryStream {
|
||||
fn from((conn, stmt, metric_callback): (Arc<crate::MockDatabaseConnection>, Statement, Option<crate::metric::Callback>)) -> Self {
|
||||
QueryStream::build(stmt, InnerConnection::Mock(conn), metric_callback)
|
||||
}
|
||||
}
|
||||
|
||||
@ -59,41 +62,51 @@ impl std::fmt::Debug for QueryStream {
|
||||
}
|
||||
|
||||
impl QueryStream {
|
||||
fn build(stmt: Statement, conn: InnerConnection) -> QueryStream {
|
||||
#[instrument(level = "trace", skip(metric_callback))]
|
||||
fn build(stmt: Statement, conn: InnerConnection, metric_callback: Option<crate::metric::Callback>) -> QueryStream {
|
||||
QueryStreamBuilder {
|
||||
stmt,
|
||||
conn,
|
||||
stream_builder: |conn, stmt| match conn {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
InnerConnection::MySql(c) => {
|
||||
let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
metric_callback,
|
||||
stream_builder: |conn, stmt, metric_callback| {
|
||||
match conn {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
InnerConnection::MySql(c) => {
|
||||
let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
InnerConnection::Postgres(c) => {
|
||||
let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
InnerConnection::Sqlite(c) => {
|
||||
let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "mock")]
|
||||
InnerConnection::Mock(c) => c.fetch(stmt),
|
||||
}
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
InnerConnection::Postgres(c) => {
|
||||
let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
}
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
InnerConnection::Sqlite(c) => {
|
||||
let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
}
|
||||
#[cfg(feature = "mock")]
|
||||
InnerConnection::Mock(c) => c.fetch(stmt),
|
||||
},
|
||||
}
|
||||
}
|
||||
.build()
|
||||
}
|
||||
|
@ -11,6 +11,8 @@ use sqlx::Executor;
|
||||
|
||||
use futures::lock::MutexGuard;
|
||||
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::{DbErr, InnerConnection, QueryResult, Statement};
|
||||
|
||||
/// `TransactionStream` cannot be used in a `transaction` closure as it does not impl `Send`.
|
||||
@ -19,7 +21,8 @@ use crate::{DbErr, InnerConnection, QueryResult, Statement};
|
||||
pub struct TransactionStream<'a> {
|
||||
stmt: Statement,
|
||||
conn: MutexGuard<'a, InnerConnection>,
|
||||
#[borrows(mut conn, stmt)]
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
#[borrows(mut conn, stmt, metric_callback)]
|
||||
#[not_covariant]
|
||||
stream: Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>> + 'this>>,
|
||||
}
|
||||
@ -31,45 +34,51 @@ impl<'a> std::fmt::Debug for TransactionStream<'a> {
|
||||
}
|
||||
|
||||
impl<'a> TransactionStream<'a> {
|
||||
#[instrument(level = "trace", skip(metric_callback))]
|
||||
pub(crate) async fn build(
|
||||
conn: MutexGuard<'a, InnerConnection>,
|
||||
stmt: Statement,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
) -> TransactionStream<'a> {
|
||||
TransactionStreamAsyncBuilder {
|
||||
stmt,
|
||||
conn,
|
||||
stream_builder: |conn, stmt| {
|
||||
metric_callback,
|
||||
stream_builder: |conn, stmt, metric_callback| {
|
||||
Box::pin(async move {
|
||||
match conn.deref_mut() {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
InnerConnection::MySql(c) => {
|
||||
let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
InnerConnection::Postgres(c) => {
|
||||
let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
InnerConnection::Sqlite(c) => {
|
||||
let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "mock")]
|
||||
InnerConnection::Mock(c) => c.fetch(stmt),
|
||||
|
@ -7,6 +7,7 @@ use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
|
||||
use futures::lock::Mutex;
|
||||
#[cfg(feature = "sqlx-dep")]
|
||||
use sqlx::{pool::PoolConnection, TransactionManager};
|
||||
use tracing::instrument;
|
||||
use std::{future::Future, pin::Pin, sync::Arc};
|
||||
|
||||
// a Transaction is just a sugar for a connection where START TRANSACTION has been executed
|
||||
@ -16,6 +17,7 @@ pub struct DatabaseTransaction {
|
||||
conn: Arc<Mutex<InnerConnection>>,
|
||||
backend: DbBackend,
|
||||
open: bool,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DatabaseTransaction {
|
||||
@ -28,10 +30,12 @@ impl DatabaseTransaction {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
pub(crate) async fn new_mysql(
|
||||
inner: PoolConnection<sqlx::MySql>,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
Self::begin(
|
||||
Arc::new(Mutex::new(InnerConnection::MySql(inner))),
|
||||
DbBackend::MySql,
|
||||
metric_callback,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@ -39,10 +43,12 @@ impl DatabaseTransaction {
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
pub(crate) async fn new_postgres(
|
||||
inner: PoolConnection<sqlx::Postgres>,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
Self::begin(
|
||||
Arc::new(Mutex::new(InnerConnection::Postgres(inner))),
|
||||
DbBackend::Postgres,
|
||||
metric_callback,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@ -50,10 +56,12 @@ impl DatabaseTransaction {
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
pub(crate) async fn new_sqlite(
|
||||
inner: PoolConnection<sqlx::Sqlite>,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
Self::begin(
|
||||
Arc::new(Mutex::new(InnerConnection::Sqlite(inner))),
|
||||
DbBackend::Sqlite,
|
||||
metric_callback,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@ -61,19 +69,27 @@ impl DatabaseTransaction {
|
||||
#[cfg(feature = "mock")]
|
||||
pub(crate) async fn new_mock(
|
||||
inner: Arc<crate::MockDatabaseConnection>,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
let backend = inner.get_database_backend();
|
||||
Self::begin(Arc::new(Mutex::new(InnerConnection::Mock(inner))), backend).await
|
||||
Self::begin(
|
||||
Arc::new(Mutex::new(InnerConnection::Mock(inner))),
|
||||
backend,
|
||||
metric_callback,
|
||||
).await
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(metric_callback))]
|
||||
async fn begin(
|
||||
conn: Arc<Mutex<InnerConnection>>,
|
||||
backend: DbBackend,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
let res = DatabaseTransaction {
|
||||
conn,
|
||||
backend,
|
||||
open: true,
|
||||
metric_callback,
|
||||
};
|
||||
match *res.conn.lock().await {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
@ -104,6 +120,7 @@ impl DatabaseTransaction {
|
||||
|
||||
/// Runs a transaction to completion returning an rolling back the transaction on
|
||||
/// encountering an error if it fails
|
||||
#[instrument(level = "trace", skip(callback))]
|
||||
pub(crate) async fn run<F, T, E>(self, callback: F) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'b> FnOnce(
|
||||
@ -125,6 +142,7 @@ impl DatabaseTransaction {
|
||||
}
|
||||
|
||||
/// Commit a transaction atomically
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn commit(mut self) -> Result<(), DbErr> {
|
||||
self.open = false;
|
||||
match *self.conn.lock().await {
|
||||
@ -155,6 +173,7 @@ impl DatabaseTransaction {
|
||||
}
|
||||
|
||||
/// rolls back a transaction in case error are encountered during the operation
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn rollback(mut self) -> Result<(), DbErr> {
|
||||
self.open = false;
|
||||
match *self.conn.lock().await {
|
||||
@ -185,6 +204,7 @@ impl DatabaseTransaction {
|
||||
}
|
||||
|
||||
// the rollback is queued and will be performed on next async operation, like returning the connection to the pool
|
||||
#[instrument(level = "trace")]
|
||||
fn start_rollback(&mut self) {
|
||||
if self.open {
|
||||
if let Some(mut conn) = self.conn.try_lock() {
|
||||
@ -229,6 +249,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction {
|
||||
self.backend
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
@ -236,17 +257,23 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
InnerConnection::MySql(conn) => {
|
||||
let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
|
||||
query.execute(conn).await.map(Into::into)
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
query.execute(conn).await.map(Into::into)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
InnerConnection::Postgres(conn) => {
|
||||
let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
|
||||
query.execute(conn).await.map(Into::into)
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
query.execute(conn).await.map(Into::into)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
InnerConnection::Sqlite(conn) => {
|
||||
let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
|
||||
query.execute(conn).await.map(Into::into)
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
query.execute(conn).await.map(Into::into)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "mock")]
|
||||
InnerConnection::Mock(conn) => return conn.execute(stmt),
|
||||
@ -255,6 +282,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction {
|
||||
_res.map_err(sqlx_error_to_exec_err)
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
@ -285,6 +313,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
@ -320,21 +349,28 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction {
|
||||
_res.map_err(sqlx_error_to_query_err)
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
fn stream(
|
||||
&'a self,
|
||||
stmt: Statement,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Self::Stream, DbErr>> + 'a>> {
|
||||
Box::pin(
|
||||
async move { Ok(crate::TransactionStream::build(self.conn.lock().await, stmt).await) },
|
||||
async move { Ok(crate::TransactionStream::build(self.conn.lock().await, stmt, self.metric_callback.clone()).await) },
|
||||
)
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
|
||||
DatabaseTransaction::begin(Arc::clone(&self.conn), self.backend).await
|
||||
DatabaseTransaction::begin(
|
||||
Arc::clone(&self.conn),
|
||||
self.backend,
|
||||
self.metric_callback.clone()
|
||||
).await
|
||||
}
|
||||
|
||||
/// Execute the function inside a transaction.
|
||||
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
|
||||
#[instrument(level = "trace", skip(_callback))]
|
||||
async fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'c> FnOnce(
|
||||
|
@ -3,6 +3,7 @@ use crate::{
|
||||
Statement, Transaction,
|
||||
};
|
||||
use futures::Stream;
|
||||
use tracing::instrument;
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
pin::Pin,
|
||||
@ -69,6 +70,7 @@ impl MockDatabaseConnector {
|
||||
|
||||
/// Cpnnect to the [MockDatabase]
|
||||
#[allow(unused_variables)]
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn connect(string: &str) -> Result<DatabaseConnection, DbErr> {
|
||||
macro_rules! connect_mock_db {
|
||||
( $syntax: expr ) => {
|
||||
@ -117,6 +119,7 @@ impl MockDatabaseConnection {
|
||||
}
|
||||
|
||||
/// Execute the SQL statement in the [MockDatabase]
|
||||
#[instrument(level = "trace")]
|
||||
pub fn execute(&self, statement: Statement) -> Result<ExecResult, DbErr> {
|
||||
debug_print!("{}", statement);
|
||||
let counter = self.execute_counter.fetch_add(1, Ordering::SeqCst);
|
||||
@ -124,6 +127,7 @@ impl MockDatabaseConnection {
|
||||
}
|
||||
|
||||
/// Return one [QueryResult] if the query was successful
|
||||
#[instrument(level = "trace")]
|
||||
pub fn query_one(&self, statement: Statement) -> Result<Option<QueryResult>, DbErr> {
|
||||
debug_print!("{}", statement);
|
||||
let counter = self.query_counter.fetch_add(1, Ordering::SeqCst);
|
||||
@ -132,6 +136,7 @@ impl MockDatabaseConnection {
|
||||
}
|
||||
|
||||
/// Return all [QueryResult]s if the query was successful
|
||||
#[instrument(level = "trace")]
|
||||
pub fn query_all(&self, statement: Statement) -> Result<Vec<QueryResult>, DbErr> {
|
||||
debug_print!("{}", statement);
|
||||
let counter = self.query_counter.fetch_add(1, Ordering::SeqCst);
|
||||
@ -139,6 +144,7 @@ impl MockDatabaseConnection {
|
||||
}
|
||||
|
||||
/// Return [QueryResult]s from a multi-query operation
|
||||
#[instrument(level = "trace")]
|
||||
pub fn fetch(
|
||||
&self,
|
||||
statement: &Statement,
|
||||
@ -150,16 +156,19 @@ impl MockDatabaseConnection {
|
||||
}
|
||||
|
||||
/// Create a statement block of SQL statements that execute together.
|
||||
#[instrument(level = "trace")]
|
||||
pub fn begin(&self) {
|
||||
self.mocker.lock().unwrap().begin()
|
||||
}
|
||||
|
||||
/// Commit a transaction atomically to the database
|
||||
#[instrument(level = "trace")]
|
||||
pub fn commit(&self) {
|
||||
self.mocker.lock().unwrap().commit()
|
||||
}
|
||||
|
||||
/// Roll back a faulty transaction
|
||||
#[instrument(level = "trace")]
|
||||
pub fn rollback(&self) {
|
||||
self.mocker.lock().unwrap().rollback()
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
use std::{future::Future, pin::Pin};
|
||||
use std::{future::Future, pin::Pin, sync::Arc};
|
||||
|
||||
use sqlx::{
|
||||
mysql::{MySqlArguments, MySqlConnectOptions, MySqlQueryResult, MySqlRow},
|
||||
@ -7,6 +7,7 @@ use sqlx::{
|
||||
|
||||
sea_query::sea_query_driver_mysql!();
|
||||
use sea_query_driver_mysql::bind_query;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::{
|
||||
debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction,
|
||||
@ -20,9 +21,16 @@ use super::sqlx_common::*;
|
||||
pub struct SqlxMySqlConnector;
|
||||
|
||||
/// Defines a sqlx MySQL pool
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Clone)]
|
||||
pub struct SqlxMySqlPoolConnection {
|
||||
pool: MySqlPool,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for SqlxMySqlPoolConnection {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "SqlxMySqlPoolConnection {{ pool: {:?} }}", self.pool)
|
||||
}
|
||||
}
|
||||
|
||||
impl SqlxMySqlConnector {
|
||||
@ -32,6 +40,7 @@ impl SqlxMySqlConnector {
|
||||
}
|
||||
|
||||
/// Add configuration options for the MySQL database
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
|
||||
let mut opt = options
|
||||
.url
|
||||
@ -43,7 +52,7 @@ impl SqlxMySqlConnector {
|
||||
}
|
||||
match options.pool_options().connect_with(opt).await {
|
||||
Ok(pool) => Ok(DatabaseConnection::SqlxMySqlPoolConnection(
|
||||
SqlxMySqlPoolConnection { pool },
|
||||
SqlxMySqlPoolConnection { pool, metric_callback: None },
|
||||
)),
|
||||
Err(e) => Err(sqlx_error_to_conn_err(e)),
|
||||
}
|
||||
@ -53,21 +62,24 @@ impl SqlxMySqlConnector {
|
||||
impl SqlxMySqlConnector {
|
||||
/// Instantiate a sqlx pool connection to a [DatabaseConnection]
|
||||
pub fn from_sqlx_mysql_pool(pool: MySqlPool) -> DatabaseConnection {
|
||||
DatabaseConnection::SqlxMySqlPoolConnection(SqlxMySqlPoolConnection { pool })
|
||||
DatabaseConnection::SqlxMySqlPoolConnection(SqlxMySqlPoolConnection { pool, metric_callback: None })
|
||||
}
|
||||
}
|
||||
|
||||
impl SqlxMySqlPoolConnection {
|
||||
/// Execute a [Statement] on a MySQL backend
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
}
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Exec(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -76,18 +88,21 @@ impl SqlxMySqlPoolConnection {
|
||||
}
|
||||
|
||||
/// Get one result from a SQL query. Returns [Option::None] if no match was found
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
}
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -96,15 +111,18 @@ impl SqlxMySqlPoolConnection {
|
||||
}
|
||||
|
||||
/// Get the results of a query returning them as a Vec<[QueryResult]>
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
}
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -113,11 +131,12 @@ impl SqlxMySqlPoolConnection {
|
||||
}
|
||||
|
||||
/// Stream the results of executing a SQL query
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
Ok(QueryStream::from((conn, stmt)))
|
||||
Ok(QueryStream::from((conn, stmt, self.metric_callback.clone())))
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -126,9 +145,10 @@ impl SqlxMySqlPoolConnection {
|
||||
}
|
||||
|
||||
/// Bundle a set of SQL statements that execute together.
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
DatabaseTransaction::new_mysql(conn).await
|
||||
DatabaseTransaction::new_mysql(conn, self.metric_callback.clone()).await
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -137,6 +157,7 @@ impl SqlxMySqlPoolConnection {
|
||||
}
|
||||
|
||||
/// Create a MySQL transaction
|
||||
#[instrument(level = "trace", skip(callback))]
|
||||
pub async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'b> FnOnce(
|
||||
@ -147,7 +168,7 @@ impl SqlxMySqlPoolConnection {
|
||||
E: std::error::Error + Send,
|
||||
{
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
let transaction = DatabaseTransaction::new_mysql(conn)
|
||||
let transaction = DatabaseTransaction::new_mysql(conn, self.metric_callback.clone())
|
||||
.await
|
||||
.map_err(|e| TransactionError::Connection(e))?;
|
||||
transaction.run(callback).await
|
||||
@ -157,6 +178,13 @@ impl SqlxMySqlPoolConnection {
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
|
||||
{
|
||||
self.metric_callback = Some(Arc::new(callback));
|
||||
}
|
||||
}
|
||||
|
||||
impl From<MySqlRow> for QueryResult {
|
||||
|
@ -1,4 +1,4 @@
|
||||
use std::{future::Future, pin::Pin};
|
||||
use std::{future::Future, pin::Pin, sync::Arc};
|
||||
|
||||
use sqlx::{
|
||||
postgres::{PgArguments, PgConnectOptions, PgQueryResult, PgRow},
|
||||
@ -7,6 +7,7 @@ use sqlx::{
|
||||
|
||||
sea_query::sea_query_driver_postgres!();
|
||||
use sea_query_driver_postgres::bind_query;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::{
|
||||
debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction,
|
||||
@ -20,9 +21,16 @@ use super::sqlx_common::*;
|
||||
pub struct SqlxPostgresConnector;
|
||||
|
||||
/// Defines a sqlx PostgreSQL pool
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Clone)]
|
||||
pub struct SqlxPostgresPoolConnection {
|
||||
pool: PgPool,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for SqlxPostgresPoolConnection {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool)
|
||||
}
|
||||
}
|
||||
|
||||
impl SqlxPostgresConnector {
|
||||
@ -32,6 +40,7 @@ impl SqlxPostgresConnector {
|
||||
}
|
||||
|
||||
/// Add configuration options for the MySQL database
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
|
||||
let mut opt = options
|
||||
.url
|
||||
@ -43,7 +52,7 @@ impl SqlxPostgresConnector {
|
||||
}
|
||||
match options.pool_options().connect_with(opt).await {
|
||||
Ok(pool) => Ok(DatabaseConnection::SqlxPostgresPoolConnection(
|
||||
SqlxPostgresPoolConnection { pool },
|
||||
SqlxPostgresPoolConnection { pool, metric_callback: None },
|
||||
)),
|
||||
Err(e) => Err(sqlx_error_to_conn_err(e)),
|
||||
}
|
||||
@ -53,21 +62,24 @@ impl SqlxPostgresConnector {
|
||||
impl SqlxPostgresConnector {
|
||||
/// Instantiate a sqlx pool connection to a [DatabaseConnection]
|
||||
pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
|
||||
DatabaseConnection::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection { pool })
|
||||
DatabaseConnection::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection { pool, metric_callback: None })
|
||||
}
|
||||
}
|
||||
|
||||
impl SqlxPostgresPoolConnection {
|
||||
/// Execute a [Statement] on a PostgreSQL backend
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
}
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Exec(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -76,18 +88,21 @@ impl SqlxPostgresPoolConnection {
|
||||
}
|
||||
|
||||
/// Get one result from a SQL query. Returns [Option::None] if no match was found
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
}
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -96,15 +111,18 @@ impl SqlxPostgresPoolConnection {
|
||||
}
|
||||
|
||||
/// Get the results of a query returning them as a Vec<[QueryResult]>
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
}
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -113,11 +131,12 @@ impl SqlxPostgresPoolConnection {
|
||||
}
|
||||
|
||||
/// Stream the results of executing a SQL query
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
Ok(QueryStream::from((conn, stmt)))
|
||||
Ok(QueryStream::from((conn, stmt, self.metric_callback.clone())))
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -126,9 +145,10 @@ impl SqlxPostgresPoolConnection {
|
||||
}
|
||||
|
||||
/// Bundle a set of SQL statements that execute together.
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
DatabaseTransaction::new_postgres(conn).await
|
||||
DatabaseTransaction::new_postgres(conn, self.metric_callback.clone()).await
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -137,6 +157,7 @@ impl SqlxPostgresPoolConnection {
|
||||
}
|
||||
|
||||
/// Create a PostgreSQL transaction
|
||||
#[instrument(level = "trace", skip(callback))]
|
||||
pub async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'b> FnOnce(
|
||||
@ -147,7 +168,7 @@ impl SqlxPostgresPoolConnection {
|
||||
E: std::error::Error + Send,
|
||||
{
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
let transaction = DatabaseTransaction::new_postgres(conn)
|
||||
let transaction = DatabaseTransaction::new_postgres(conn, self.metric_callback.clone())
|
||||
.await
|
||||
.map_err(|e| TransactionError::Connection(e))?;
|
||||
transaction.run(callback).await
|
||||
@ -157,6 +178,13 @@ impl SqlxPostgresPoolConnection {
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
|
||||
{
|
||||
self.metric_callback = Some(Arc::new(callback));
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PgRow> for QueryResult {
|
||||
|
@ -1,4 +1,4 @@
|
||||
use std::{future::Future, pin::Pin};
|
||||
use std::{future::Future, pin::Pin, sync::Arc};
|
||||
|
||||
use sqlx::{
|
||||
sqlite::{SqliteArguments, SqliteConnectOptions, SqliteQueryResult, SqliteRow},
|
||||
@ -7,6 +7,7 @@ use sqlx::{
|
||||
|
||||
sea_query::sea_query_driver_sqlite!();
|
||||
use sea_query_driver_sqlite::bind_query;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::{
|
||||
debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction,
|
||||
@ -20,9 +21,16 @@ use super::sqlx_common::*;
|
||||
pub struct SqlxSqliteConnector;
|
||||
|
||||
/// Defines a sqlx SQLite pool
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Clone)]
|
||||
pub struct SqlxSqlitePoolConnection {
|
||||
pool: SqlitePool,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for SqlxSqlitePoolConnection {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "SqlxSqlitePoolConnection {{ pool: {:?} }}", self.pool)
|
||||
}
|
||||
}
|
||||
|
||||
impl SqlxSqliteConnector {
|
||||
@ -32,6 +40,7 @@ impl SqlxSqliteConnector {
|
||||
}
|
||||
|
||||
/// Add configuration options for the SQLite database
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
|
||||
let mut options = options;
|
||||
let mut opt = options
|
||||
@ -47,7 +56,7 @@ impl SqlxSqliteConnector {
|
||||
}
|
||||
match options.pool_options().connect_with(opt).await {
|
||||
Ok(pool) => Ok(DatabaseConnection::SqlxSqlitePoolConnection(
|
||||
SqlxSqlitePoolConnection { pool },
|
||||
SqlxSqlitePoolConnection { pool, metric_callback: None },
|
||||
)),
|
||||
Err(e) => Err(sqlx_error_to_conn_err(e)),
|
||||
}
|
||||
@ -57,21 +66,24 @@ impl SqlxSqliteConnector {
|
||||
impl SqlxSqliteConnector {
|
||||
/// Instantiate a sqlx pool connection to a [DatabaseConnection]
|
||||
pub fn from_sqlx_sqlite_pool(pool: SqlitePool) -> DatabaseConnection {
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(SqlxSqlitePoolConnection { pool })
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(SqlxSqlitePoolConnection { pool, metric_callback: None })
|
||||
}
|
||||
}
|
||||
|
||||
impl SqlxSqlitePoolConnection {
|
||||
/// Execute a [Statement] on a SQLite backend
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
}
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Exec(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -80,18 +92,21 @@ impl SqlxSqlitePoolConnection {
|
||||
}
|
||||
|
||||
/// Get one result from a SQL query. Returns [Option::None] if no match was found
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
}
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -100,15 +115,18 @@ impl SqlxSqlitePoolConnection {
|
||||
}
|
||||
|
||||
/// Get the results of a query returning them as a Vec<[QueryResult]>
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
}
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -117,11 +135,12 @@ impl SqlxSqlitePoolConnection {
|
||||
}
|
||||
|
||||
/// Stream the results of executing a SQL query
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
|
||||
debug_print!("{}", stmt);
|
||||
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
Ok(QueryStream::from((conn, stmt)))
|
||||
Ok(QueryStream::from((conn, stmt, self.metric_callback.clone())))
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -130,9 +149,10 @@ impl SqlxSqlitePoolConnection {
|
||||
}
|
||||
|
||||
/// Bundle a set of SQL statements that execute together.
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
DatabaseTransaction::new_sqlite(conn).await
|
||||
DatabaseTransaction::new_sqlite(conn, self.metric_callback.clone()).await
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -141,6 +161,7 @@ impl SqlxSqlitePoolConnection {
|
||||
}
|
||||
|
||||
/// Create a MySQL transaction
|
||||
#[instrument(level = "trace", skip(callback))]
|
||||
pub async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'b> FnOnce(
|
||||
@ -151,7 +172,7 @@ impl SqlxSqlitePoolConnection {
|
||||
E: std::error::Error + Send,
|
||||
{
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
let transaction = DatabaseTransaction::new_sqlite(conn)
|
||||
let transaction = DatabaseTransaction::new_sqlite(conn, self.metric_callback.clone())
|
||||
.await
|
||||
.map_err(|e| TransactionError::Connection(e))?;
|
||||
transaction.run(callback).await
|
||||
@ -161,6 +182,13 @@ impl SqlxSqlitePoolConnection {
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
|
||||
{
|
||||
self.metric_callback = Some(Arc::new(callback));
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SqliteRow> for QueryResult {
|
||||
|
@ -284,6 +284,8 @@ pub mod schema;
|
||||
#[cfg(feature = "macros")]
|
||||
pub mod tests_cfg;
|
||||
mod util;
|
||||
/// Holds types and methods to perform metric collection
|
||||
pub mod metric;
|
||||
|
||||
pub use database::*;
|
||||
pub use driver::*;
|
||||
|
55
src/metric.rs
Normal file
55
src/metric.rs
Normal file
@ -0,0 +1,55 @@
|
||||
use std::{time::Duration, sync::Arc};
|
||||
|
||||
pub(crate) type Callback = Arc<dyn Fn(&Info<'_>) + Send + Sync>;
|
||||
|
||||
pub(crate) use inner::{metric, metric_ok};
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Query execution infos
|
||||
pub struct Info<'a> {
|
||||
/// Query executiuon duration
|
||||
pub elapsed: Duration,
|
||||
/// Query data
|
||||
pub statement: &'a crate::Statement,
|
||||
/// Query execution failed
|
||||
pub failed: bool,
|
||||
}
|
||||
|
||||
mod inner {
|
||||
macro_rules! metric {
|
||||
($metric_callback:expr, $stmt:expr, $code:block) => {
|
||||
{
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = $code;
|
||||
if let Some(callback) = $metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: $stmt,
|
||||
failed: res.is_err(),
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
}
|
||||
};
|
||||
}
|
||||
pub(crate) use metric;
|
||||
macro_rules! metric_ok {
|
||||
($metric_callback:expr, $stmt:expr, $code:block) => {
|
||||
{
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = $code;
|
||||
if let Some(callback) = $metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: $stmt,
|
||||
failed: false,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
}
|
||||
};
|
||||
}
|
||||
pub(crate) use metric_ok;
|
||||
}
|
@ -15,7 +15,7 @@
|
||||
#[macro_export]
|
||||
#[cfg(feature = "debug-print")]
|
||||
macro_rules! debug_print {
|
||||
($( $args:expr ),*) => { log::debug!( $( $args ),* ); }
|
||||
($( $args:expr ),*) => { tracing::debug!( $( $args ),* ); }
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
|
Loading…
x
Reference in New Issue
Block a user