Transactions Isolation level and Access mode (#1230)
* Transactions Isolation level and Access mode * Fix typo * Fix clippy lints
This commit is contained in:
parent
95157e6e6b
commit
824158457b
@ -47,6 +47,48 @@ pub trait StreamTrait: Send + Sync {
|
||||
) -> Pin<Box<dyn Future<Output = Result<Self::Stream<'a>, DbErr>> + 'a + Send>>;
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
/// Isolation level
|
||||
pub enum IsolationLevel {
|
||||
/// Consistent reads within the same transaction read the snapshot established by the first read.
|
||||
RepeatableRead,
|
||||
/// Each consistent read, even within the same transaction, sets and reads its own fresh snapshot.
|
||||
ReadCommitted,
|
||||
/// SELECT statements are performed in a nonlocking fashion, but a possible earlier version of a row might be used.
|
||||
ReadUncommitted,
|
||||
/// All statements of the current transaction can only see rows committed before the first query or data-modification statement was executed in this transaction.
|
||||
Serializable,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for IsolationLevel {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
IsolationLevel::RepeatableRead => write!(f, "REPEATABLE READ"),
|
||||
IsolationLevel::ReadCommitted => write!(f, "READ COMMITTED"),
|
||||
IsolationLevel::ReadUncommitted => write!(f, "READ UNCOMMITTED"),
|
||||
IsolationLevel::Serializable => write!(f, "SERIALIZABLE"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
/// Access mode
|
||||
pub enum AccessMode {
|
||||
/// Data can't be modified in this transaction
|
||||
ReadOnly,
|
||||
/// Data can be modified in this transaction (default)
|
||||
ReadWrite,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for AccessMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
AccessMode::ReadOnly => write!(f, "READ ONLY"),
|
||||
AccessMode::ReadWrite => write!(f, "READ WRITE"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn database transaction
|
||||
#[async_trait::async_trait]
|
||||
pub trait TransactionTrait {
|
||||
@ -54,6 +96,14 @@ pub trait TransactionTrait {
|
||||
/// Returns a Transaction that can be committed or rolled back
|
||||
async fn begin(&self) -> Result<DatabaseTransaction, DbErr>;
|
||||
|
||||
/// Execute SQL `BEGIN` transaction with isolation level and/or access mode.
|
||||
/// Returns a Transaction that can be committed or rolled back
|
||||
async fn begin_with_config(
|
||||
&self,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<DatabaseTransaction, DbErr>;
|
||||
|
||||
/// Execute the function inside a transaction.
|
||||
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
|
||||
async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
|
||||
@ -64,4 +114,20 @@ pub trait TransactionTrait {
|
||||
+ Send,
|
||||
T: Send,
|
||||
E: std::error::Error + Send;
|
||||
|
||||
/// Execute the function inside a transaction with isolation level and/or access mode.
|
||||
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
|
||||
async fn transaction_with_config<F, T, E>(
|
||||
&self,
|
||||
callback: F,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'c> FnOnce(
|
||||
&'c DatabaseTransaction,
|
||||
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
|
||||
+ Send,
|
||||
T: Send,
|
||||
E: std::error::Error + Send;
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
use crate::{
|
||||
error::*, ConnectionTrait, DatabaseTransaction, ExecResult, QueryResult, Statement,
|
||||
StatementBuilder, StreamTrait, TransactionError, TransactionTrait,
|
||||
error::*, AccessMode, ConnectionTrait, DatabaseTransaction, ExecResult, IsolationLevel,
|
||||
QueryResult, Statement, StatementBuilder, StreamTrait, TransactionError, TransactionTrait,
|
||||
};
|
||||
use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder};
|
||||
use std::{future::Future, pin::Pin};
|
||||
@ -198,11 +198,38 @@ impl TransactionTrait for DatabaseConnection {
|
||||
async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
|
||||
match self {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.begin().await,
|
||||
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.begin(None, None).await,
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.begin().await,
|
||||
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.begin(None, None).await,
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin().await,
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin(None, None).await,
|
||||
#[cfg(feature = "mock")]
|
||||
DatabaseConnection::MockDatabaseConnection(conn) => {
|
||||
DatabaseTransaction::new_mock(Arc::clone(conn), None).await
|
||||
}
|
||||
DatabaseConnection::Disconnected => panic!("Disconnected"),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
async fn begin_with_config(
|
||||
&self,
|
||||
_isolation_level: Option<IsolationLevel>,
|
||||
_access_mode: Option<AccessMode>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
match self {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
DatabaseConnection::SqlxMySqlPoolConnection(conn) => {
|
||||
conn.begin(_isolation_level, _access_mode).await
|
||||
}
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
|
||||
conn.begin(_isolation_level, _access_mode).await
|
||||
}
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(conn) => {
|
||||
conn.begin(_isolation_level, _access_mode).await
|
||||
}
|
||||
#[cfg(feature = "mock")]
|
||||
DatabaseConnection::MockDatabaseConnection(conn) => {
|
||||
DatabaseTransaction::new_mock(Arc::clone(conn), None).await
|
||||
@ -225,13 +252,61 @@ impl TransactionTrait for DatabaseConnection {
|
||||
{
|
||||
match self {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.transaction(_callback).await,
|
||||
DatabaseConnection::SqlxMySqlPoolConnection(conn) => {
|
||||
conn.transaction(_callback, None, None).await
|
||||
}
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
|
||||
conn.transaction(_callback).await
|
||||
conn.transaction(_callback, None, None).await
|
||||
}
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.transaction(_callback).await,
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(conn) => {
|
||||
conn.transaction(_callback, None, None).await
|
||||
}
|
||||
#[cfg(feature = "mock")]
|
||||
DatabaseConnection::MockDatabaseConnection(conn) => {
|
||||
let transaction = DatabaseTransaction::new_mock(Arc::clone(conn), None)
|
||||
.await
|
||||
.map_err(TransactionError::Connection)?;
|
||||
transaction.run(_callback).await
|
||||
}
|
||||
DatabaseConnection::Disconnected => panic!("Disconnected"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the function inside a transaction.
|
||||
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
|
||||
#[instrument(level = "trace", skip(_callback))]
|
||||
async fn transaction_with_config<F, T, E>(
|
||||
&self,
|
||||
_callback: F,
|
||||
_isolation_level: Option<IsolationLevel>,
|
||||
_access_mode: Option<AccessMode>,
|
||||
) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'c> FnOnce(
|
||||
&'c DatabaseTransaction,
|
||||
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
|
||||
+ Send,
|
||||
T: Send,
|
||||
E: std::error::Error + Send,
|
||||
{
|
||||
match self {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
DatabaseConnection::SqlxMySqlPoolConnection(conn) => {
|
||||
conn.transaction(_callback, _isolation_level, _access_mode)
|
||||
.await
|
||||
}
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
|
||||
conn.transaction(_callback, _isolation_level, _access_mode)
|
||||
.await
|
||||
}
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
DatabaseConnection::SqlxSqlitePoolConnection(conn) => {
|
||||
conn.transaction(_callback, _isolation_level, _access_mode)
|
||||
.await
|
||||
}
|
||||
#[cfg(feature = "mock")]
|
||||
DatabaseConnection::MockDatabaseConnection(conn) => {
|
||||
let transaction = DatabaseTransaction::new_mock(Arc::clone(conn), None)
|
||||
|
@ -1,6 +1,6 @@
|
||||
use crate::{
|
||||
debug_print, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, QueryResult,
|
||||
Statement, StreamTrait, TransactionStream, TransactionTrait,
|
||||
debug_print, AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection,
|
||||
IsolationLevel, QueryResult, Statement, StreamTrait, TransactionStream, TransactionTrait,
|
||||
};
|
||||
#[cfg(feature = "sqlx-dep")]
|
||||
use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
|
||||
@ -31,11 +31,15 @@ impl DatabaseTransaction {
|
||||
pub(crate) async fn new_mysql(
|
||||
inner: PoolConnection<sqlx::MySql>,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
Self::begin(
|
||||
Arc::new(Mutex::new(InnerConnection::MySql(inner))),
|
||||
DbBackend::MySql,
|
||||
metric_callback,
|
||||
isolation_level,
|
||||
access_mode,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@ -44,11 +48,15 @@ impl DatabaseTransaction {
|
||||
pub(crate) async fn new_postgres(
|
||||
inner: PoolConnection<sqlx::Postgres>,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
Self::begin(
|
||||
Arc::new(Mutex::new(InnerConnection::Postgres(inner))),
|
||||
DbBackend::Postgres,
|
||||
metric_callback,
|
||||
isolation_level,
|
||||
access_mode,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@ -57,11 +65,15 @@ impl DatabaseTransaction {
|
||||
pub(crate) async fn new_sqlite(
|
||||
inner: PoolConnection<sqlx::Sqlite>,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
Self::begin(
|
||||
Arc::new(Mutex::new(InnerConnection::Sqlite(inner))),
|
||||
DbBackend::Sqlite,
|
||||
metric_callback,
|
||||
isolation_level,
|
||||
access_mode,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@ -76,6 +88,8 @@ impl DatabaseTransaction {
|
||||
Arc::new(Mutex::new(InnerConnection::Mock(inner))),
|
||||
backend,
|
||||
metric_callback,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@ -86,6 +100,8 @@ impl DatabaseTransaction {
|
||||
conn: Arc<Mutex<InnerConnection>>,
|
||||
backend: DbBackend,
|
||||
metric_callback: Option<crate::metric::Callback>,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
let res = DatabaseTransaction {
|
||||
conn,
|
||||
@ -96,21 +112,34 @@ impl DatabaseTransaction {
|
||||
match *res.conn.lock().await {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
InnerConnection::MySql(ref mut c) => {
|
||||
// in MySQL SET TRANSACTION operations must be executed before transaction start
|
||||
crate::driver::sqlx_mysql::set_transaction_config(c, isolation_level, access_mode)
|
||||
.await?;
|
||||
<sqlx::MySql as sqlx::Database>::TransactionManager::begin(c)
|
||||
.await
|
||||
.map_err(sqlx_error_to_query_err)?
|
||||
.map_err(sqlx_error_to_query_err)?;
|
||||
}
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
InnerConnection::Postgres(ref mut c) => {
|
||||
<sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c)
|
||||
.await
|
||||
.map_err(sqlx_error_to_query_err)?
|
||||
.map_err(sqlx_error_to_query_err)?;
|
||||
// in PostgreSQL SET TRANSACTION operations must be executed inside transaction
|
||||
crate::driver::sqlx_postgres::set_transaction_config(
|
||||
c,
|
||||
isolation_level,
|
||||
access_mode,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
InnerConnection::Sqlite(ref mut c) => {
|
||||
// in SQLite isolation level and access mode are global settings
|
||||
crate::driver::sqlx_sqlite::set_transaction_config(c, isolation_level, access_mode)
|
||||
.await?;
|
||||
<sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c)
|
||||
.await
|
||||
.map_err(sqlx_error_to_query_err)?
|
||||
.map_err(sqlx_error_to_query_err)?;
|
||||
}
|
||||
#[cfg(feature = "mock")]
|
||||
InnerConnection::Mock(ref mut c) => {
|
||||
@ -415,6 +444,24 @@ impl TransactionTrait for DatabaseTransaction {
|
||||
Arc::clone(&self.conn),
|
||||
self.backend,
|
||||
self.metric_callback.clone(),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "trace")]
|
||||
async fn begin_with_config(
|
||||
&self,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
DatabaseTransaction::begin(
|
||||
Arc::clone(&self.conn),
|
||||
self.backend,
|
||||
self.metric_callback.clone(),
|
||||
isolation_level,
|
||||
access_mode,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@ -434,6 +481,30 @@ impl TransactionTrait for DatabaseTransaction {
|
||||
let transaction = self.begin().await.map_err(TransactionError::Connection)?;
|
||||
transaction.run(_callback).await
|
||||
}
|
||||
|
||||
/// Execute the function inside a transaction with isolation level and/or access mode.
|
||||
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
|
||||
#[instrument(level = "trace", skip(_callback))]
|
||||
async fn transaction_with_config<F, T, E>(
|
||||
&self,
|
||||
_callback: F,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'c> FnOnce(
|
||||
&'c DatabaseTransaction,
|
||||
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
|
||||
+ Send,
|
||||
T: Send,
|
||||
E: std::error::Error + Send,
|
||||
{
|
||||
let transaction = self
|
||||
.begin_with_config(isolation_level, access_mode)
|
||||
.await
|
||||
.map_err(TransactionError::Connection)?;
|
||||
transaction.run(_callback).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Defines errors for handling transaction failures
|
||||
|
@ -3,15 +3,16 @@ use std::{future::Future, pin::Pin, sync::Arc};
|
||||
|
||||
use sqlx::{
|
||||
mysql::{MySqlConnectOptions, MySqlQueryResult, MySqlRow},
|
||||
MySql, MySqlPool,
|
||||
pool::PoolConnection,
|
||||
Executor, MySql, MySqlPool,
|
||||
};
|
||||
|
||||
use sea_query_binder::SqlxValues;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::{
|
||||
debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction,
|
||||
QueryStream, Statement, TransactionError,
|
||||
debug_print, error::*, executor::*, AccessMode, ConnectOptions, DatabaseConnection,
|
||||
DatabaseTransaction, DbBackend, IsolationLevel, QueryStream, Statement, TransactionError,
|
||||
};
|
||||
|
||||
use super::sqlx_common::*;
|
||||
@ -150,9 +151,19 @@ impl SqlxMySqlPoolConnection {
|
||||
|
||||
/// Bundle a set of SQL statements that execute together.
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
|
||||
pub async fn begin(
|
||||
&self,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
DatabaseTransaction::new_mysql(conn, self.metric_callback.clone()).await
|
||||
DatabaseTransaction::new_mysql(
|
||||
conn,
|
||||
self.metric_callback.clone(),
|
||||
isolation_level,
|
||||
access_mode,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
Err(DbErr::ConnectionAcquire)
|
||||
}
|
||||
@ -160,7 +171,12 @@ impl SqlxMySqlPoolConnection {
|
||||
|
||||
/// Create a MySQL transaction
|
||||
#[instrument(level = "trace", skip(callback))]
|
||||
pub async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
|
||||
pub async fn transaction<F, T, E>(
|
||||
&self,
|
||||
callback: F,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'b> FnOnce(
|
||||
&'b DatabaseTransaction,
|
||||
@ -170,9 +186,14 @@ impl SqlxMySqlPoolConnection {
|
||||
E: std::error::Error + Send,
|
||||
{
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
let transaction = DatabaseTransaction::new_mysql(conn, self.metric_callback.clone())
|
||||
.await
|
||||
.map_err(|e| TransactionError::Connection(e))?;
|
||||
let transaction = DatabaseTransaction::new_mysql(
|
||||
conn,
|
||||
self.metric_callback.clone(),
|
||||
isolation_level,
|
||||
access_mode,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| TransactionError::Connection(e))?;
|
||||
transaction.run(callback).await
|
||||
} else {
|
||||
Err(TransactionError::Connection(DbErr::ConnectionAcquire))
|
||||
@ -215,3 +236,29 @@ pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, MySql, Sqlx
|
||||
.map_or(Values(Vec::new()), |values| values.clone());
|
||||
sqlx::query_with(&stmt.sql, SqlxValues(values))
|
||||
}
|
||||
|
||||
pub(crate) async fn set_transaction_config(
|
||||
conn: &mut PoolConnection<MySql>,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<(), DbErr> {
|
||||
if let Some(isolation_level) = isolation_level {
|
||||
let stmt = Statement {
|
||||
sql: format!("SET TRANSACTION ISOLATION LEVEL {isolation_level}"),
|
||||
values: None,
|
||||
db_backend: DbBackend::MySql,
|
||||
};
|
||||
let query = sqlx_query(&stmt);
|
||||
conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
|
||||
}
|
||||
if let Some(access_mode) = access_mode {
|
||||
let stmt = Statement {
|
||||
sql: format!("SET TRANSACTION {access_mode}"),
|
||||
values: None,
|
||||
db_backend: DbBackend::MySql,
|
||||
};
|
||||
let query = sqlx_query(&stmt);
|
||||
conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -2,16 +2,17 @@ use sea_query::Values;
|
||||
use std::{future::Future, pin::Pin, sync::Arc};
|
||||
|
||||
use sqlx::{
|
||||
pool::PoolConnection,
|
||||
postgres::{PgConnectOptions, PgQueryResult, PgRow},
|
||||
PgPool, Postgres,
|
||||
Executor, PgPool, Postgres,
|
||||
};
|
||||
|
||||
use sea_query_binder::SqlxValues;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::{
|
||||
debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction,
|
||||
QueryStream, Statement, TransactionError,
|
||||
debug_print, error::*, executor::*, AccessMode, ConnectOptions, DatabaseConnection,
|
||||
DatabaseTransaction, DbBackend, IsolationLevel, QueryStream, Statement, TransactionError,
|
||||
};
|
||||
|
||||
use super::sqlx_common::*;
|
||||
@ -165,9 +166,19 @@ impl SqlxPostgresPoolConnection {
|
||||
|
||||
/// Bundle a set of SQL statements that execute together.
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
|
||||
pub async fn begin(
|
||||
&self,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
DatabaseTransaction::new_postgres(conn, self.metric_callback.clone()).await
|
||||
DatabaseTransaction::new_postgres(
|
||||
conn,
|
||||
self.metric_callback.clone(),
|
||||
isolation_level,
|
||||
access_mode,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
Err(DbErr::ConnectionAcquire)
|
||||
}
|
||||
@ -175,7 +186,12 @@ impl SqlxPostgresPoolConnection {
|
||||
|
||||
/// Create a PostgreSQL transaction
|
||||
#[instrument(level = "trace", skip(callback))]
|
||||
pub async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
|
||||
pub async fn transaction<F, T, E>(
|
||||
&self,
|
||||
callback: F,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'b> FnOnce(
|
||||
&'b DatabaseTransaction,
|
||||
@ -185,9 +201,14 @@ impl SqlxPostgresPoolConnection {
|
||||
E: std::error::Error + Send,
|
||||
{
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
let transaction = DatabaseTransaction::new_postgres(conn, self.metric_callback.clone())
|
||||
.await
|
||||
.map_err(|e| TransactionError::Connection(e))?;
|
||||
let transaction = DatabaseTransaction::new_postgres(
|
||||
conn,
|
||||
self.metric_callback.clone(),
|
||||
isolation_level,
|
||||
access_mode,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| TransactionError::Connection(e))?;
|
||||
transaction.run(callback).await
|
||||
} else {
|
||||
Err(TransactionError::Connection(DbErr::ConnectionAcquire))
|
||||
@ -230,3 +251,29 @@ pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, S
|
||||
.map_or(Values(Vec::new()), |values| values.clone());
|
||||
sqlx::query_with(&stmt.sql, SqlxValues(values))
|
||||
}
|
||||
|
||||
pub(crate) async fn set_transaction_config(
|
||||
conn: &mut PoolConnection<Postgres>,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<(), DbErr> {
|
||||
if let Some(isolation_level) = isolation_level {
|
||||
let stmt = Statement {
|
||||
sql: format!("SET TRANSACTION ISOLATION LEVEL {isolation_level}"),
|
||||
values: None,
|
||||
db_backend: DbBackend::Postgres,
|
||||
};
|
||||
let query = sqlx_query(&stmt);
|
||||
conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
|
||||
}
|
||||
if let Some(access_mode) = access_mode {
|
||||
let stmt = Statement {
|
||||
sql: format!("SET TRANSACTION {access_mode}"),
|
||||
values: None,
|
||||
db_backend: DbBackend::Postgres,
|
||||
};
|
||||
let query = sqlx_query(&stmt);
|
||||
conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -2,16 +2,17 @@ use sea_query::Values;
|
||||
use std::{future::Future, pin::Pin, sync::Arc};
|
||||
|
||||
use sqlx::{
|
||||
pool::PoolConnection,
|
||||
sqlite::{SqliteConnectOptions, SqliteQueryResult, SqliteRow},
|
||||
Sqlite, SqlitePool,
|
||||
};
|
||||
|
||||
use sea_query_binder::SqlxValues;
|
||||
use tracing::instrument;
|
||||
use tracing::{instrument, warn};
|
||||
|
||||
use crate::{
|
||||
debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction,
|
||||
QueryStream, Statement, TransactionError,
|
||||
debug_print, error::*, executor::*, AccessMode, ConnectOptions, DatabaseConnection,
|
||||
DatabaseTransaction, IsolationLevel, QueryStream, Statement, TransactionError,
|
||||
};
|
||||
|
||||
use super::sqlx_common::*;
|
||||
@ -157,9 +158,19 @@ impl SqlxSqlitePoolConnection {
|
||||
|
||||
/// Bundle a set of SQL statements that execute together.
|
||||
#[instrument(level = "trace")]
|
||||
pub async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
|
||||
pub async fn begin(
|
||||
&self,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<DatabaseTransaction, DbErr> {
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
DatabaseTransaction::new_sqlite(conn, self.metric_callback.clone()).await
|
||||
DatabaseTransaction::new_sqlite(
|
||||
conn,
|
||||
self.metric_callback.clone(),
|
||||
isolation_level,
|
||||
access_mode,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
Err(DbErr::ConnectionAcquire)
|
||||
}
|
||||
@ -167,7 +178,12 @@ impl SqlxSqlitePoolConnection {
|
||||
|
||||
/// Create a MySQL transaction
|
||||
#[instrument(level = "trace", skip(callback))]
|
||||
pub async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
|
||||
pub async fn transaction<F, T, E>(
|
||||
&self,
|
||||
callback: F,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<T, TransactionError<E>>
|
||||
where
|
||||
F: for<'b> FnOnce(
|
||||
&'b DatabaseTransaction,
|
||||
@ -177,9 +193,14 @@ impl SqlxSqlitePoolConnection {
|
||||
E: std::error::Error + Send,
|
||||
{
|
||||
if let Ok(conn) = self.pool.acquire().await {
|
||||
let transaction = DatabaseTransaction::new_sqlite(conn, self.metric_callback.clone())
|
||||
.await
|
||||
.map_err(|e| TransactionError::Connection(e))?;
|
||||
let transaction = DatabaseTransaction::new_sqlite(
|
||||
conn,
|
||||
self.metric_callback.clone(),
|
||||
isolation_level,
|
||||
access_mode,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| TransactionError::Connection(e))?;
|
||||
transaction.run(callback).await
|
||||
} else {
|
||||
Err(TransactionError::Connection(DbErr::ConnectionAcquire))
|
||||
@ -222,3 +243,17 @@ pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Sqlite, Sql
|
||||
.map_or(Values(Vec::new()), |values| values.clone());
|
||||
sqlx::query_with(&stmt.sql, SqlxValues(values))
|
||||
}
|
||||
|
||||
pub(crate) async fn set_transaction_config(
|
||||
_conn: &mut PoolConnection<Sqlite>,
|
||||
isolation_level: Option<IsolationLevel>,
|
||||
access_mode: Option<AccessMode>,
|
||||
) -> Result<(), DbErr> {
|
||||
if isolation_level.is_some() {
|
||||
warn!("Setting isolation level in a SQLite transaction isn't supported");
|
||||
}
|
||||
if access_mode.is_some() {
|
||||
warn!("Setting access mode in a SQLite transaction isn't supported");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user