Move code

This commit is contained in:
Chris Tsang 2021-10-04 20:40:27 +08:00
parent af6665fe4f
commit 6b98a6f395
6 changed files with 634 additions and 637 deletions

View File

@ -1,250 +1,40 @@
use crate::{ use crate::{
error::*, ConnectionTrait, DatabaseTransaction, ExecResult, QueryResult, Statement, DatabaseTransaction, DbBackend, DbErr, ExecResult, QueryResult, Statement, TransactionError,
StatementBuilder, TransactionError,
}; };
use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder}; use futures::Stream;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[cfg(feature = "mock")]
use std::sync::Arc;
#[cfg_attr(not(feature = "mock"), derive(Clone))]
pub enum DatabaseConnection {
#[cfg(feature = "sqlx-mysql")]
SqlxMySqlPoolConnection(crate::SqlxMySqlPoolConnection),
#[cfg(feature = "sqlx-postgres")]
SqlxPostgresPoolConnection(crate::SqlxPostgresPoolConnection),
#[cfg(feature = "sqlx-sqlite")]
SqlxSqlitePoolConnection(crate::SqlxSqlitePoolConnection),
#[cfg(feature = "mock")]
MockDatabaseConnection(Arc<crate::MockDatabaseConnection>),
Disconnected,
}
pub type DbConn = DatabaseConnection;
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum DatabaseBackend {
MySql,
Postgres,
Sqlite,
}
pub type DbBackend = DatabaseBackend;
impl Default for DatabaseConnection {
fn default() -> Self {
Self::Disconnected
}
}
impl std::fmt::Debug for DatabaseConnection {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{}",
match self {
#[cfg(feature = "sqlx-mysql")]
Self::SqlxMySqlPoolConnection(_) => "SqlxMySqlPoolConnection",
#[cfg(feature = "sqlx-postgres")]
Self::SqlxPostgresPoolConnection(_) => "SqlxPostgresPoolConnection",
#[cfg(feature = "sqlx-sqlite")]
Self::SqlxSqlitePoolConnection(_) => "SqlxSqlitePoolConnection",
#[cfg(feature = "mock")]
Self::MockDatabaseConnection(_) => "MockDatabaseConnection",
Self::Disconnected => "Disconnected",
}
)
}
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl<'a> ConnectionTrait<'a> for DatabaseConnection { pub trait ConnectionTrait<'a>: Sync {
type Stream = crate::QueryStream; type Stream: Stream<Item = Result<QueryResult, DbErr>>;
fn get_database_backend(&self) -> DbBackend { fn get_database_backend(&self) -> DbBackend;
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(_) => DbBackend::MySql,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(_) => DbBackend::Postgres,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(_) => DbBackend::Sqlite,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.get_database_backend(),
DatabaseConnection::Disconnected => panic!("Disconnected"),
}
}
async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> { async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr>;
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.execute(stmt).await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.execute(stmt).await,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.execute(stmt).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.execute(stmt),
DatabaseConnection::Disconnected => Err(DbErr::Conn("Disconnected".to_owned())),
}
}
async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> { async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr>;
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.query_one(stmt).await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.query_one(stmt).await,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.query_one(stmt).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.query_one(stmt),
DatabaseConnection::Disconnected => Err(DbErr::Conn("Disconnected".to_owned())),
}
}
async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> { async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr>;
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.query_all(stmt).await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.query_all(stmt).await,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.query_all(stmt).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.query_all(stmt),
DatabaseConnection::Disconnected => Err(DbErr::Conn("Disconnected".to_owned())),
}
}
fn stream( fn stream(
&'a self, &'a self,
stmt: Statement, stmt: Statement,
) -> Pin<Box<dyn Future<Output = Result<Self::Stream, DbErr>> + 'a>> { ) -> Pin<Box<dyn Future<Output = Result<Self::Stream, DbErr>> + 'a>>;
Box::pin(async move {
Ok(match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.stream(stmt).await?,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.stream(stmt).await?,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.stream(stmt).await?,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
crate::QueryStream::from((Arc::clone(conn), stmt))
}
DatabaseConnection::Disconnected => panic!("Disconnected"),
})
})
}
async fn begin(&self) -> Result<DatabaseTransaction, DbErr> { async fn begin(&self) -> Result<DatabaseTransaction, DbErr>;
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.begin().await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.begin().await,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin().await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
DatabaseTransaction::new_mock(Arc::clone(conn)).await
}
DatabaseConnection::Disconnected => panic!("Disconnected"),
}
}
/// Execute the function inside a transaction. /// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
async fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>> async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
where where
F: for<'c> FnOnce( F: for<'c> FnOnce(
&'c DatabaseTransaction, &'c DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>> ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send, + Send,
T: Send, T: Send,
E: std::error::Error + Send, E: std::error::Error + Send;
{
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.transaction(_callback).await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
conn.transaction(_callback).await
}
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.transaction(_callback).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
let transaction = DatabaseTransaction::new_mock(Arc::clone(conn))
.await
.map_err(|e| TransactionError::Connection(e))?;
transaction.run(_callback).await
}
DatabaseConnection::Disconnected => panic!("Disconnected"),
}
}
#[cfg(feature = "mock")]
fn is_mock_connection(&self) -> bool { fn is_mock_connection(&self) -> bool {
match self { false
DatabaseConnection::MockDatabaseConnection(_) => true,
_ => false,
}
}
}
#[cfg(feature = "mock")]
impl DatabaseConnection {
pub fn as_mock_connection(&self) -> &crate::MockDatabaseConnection {
match self {
DatabaseConnection::MockDatabaseConnection(mock_conn) => mock_conn,
_ => panic!("not mock connection"),
}
}
pub fn into_transaction_log(self) -> Vec<crate::Transaction> {
let mut mocker = self.as_mock_connection().get_mocker_mutex().lock().unwrap();
mocker.drain_transaction_log()
}
}
impl DbBackend {
pub fn is_prefix_of(self, base_url: &str) -> bool {
match self {
Self::Postgres => {
base_url.starts_with("postgres://") || base_url.starts_with("postgresql://")
}
Self::MySql => base_url.starts_with("mysql://"),
Self::Sqlite => base_url.starts_with("sqlite:"),
}
}
pub fn build<S>(&self, statement: &S) -> Statement
where
S: StatementBuilder,
{
statement.build(self)
}
pub fn get_query_builder(&self) -> Box<dyn QueryBuilder> {
match self {
Self::MySql => Box::new(MysqlQueryBuilder),
Self::Postgres => Box::new(PostgresQueryBuilder),
Self::Sqlite => Box::new(SqliteQueryBuilder),
}
}
}
#[cfg(test)]
mod tests {
use crate::DatabaseConnection;
#[test]
fn assert_database_connection_traits() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<DatabaseConnection>();
} }
} }

View File

@ -1,10 +1,39 @@
use crate::{ use crate::{
DatabaseTransaction, DbBackend, DbErr, ExecResult, QueryResult, Statement, TransactionError, error::*, ConnectionTrait, DatabaseTransaction, ExecResult, QueryResult, Statement,
StatementBuilder, TransactionError,
}; };
use futures::Stream; use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder};
use std::{future::Future, pin::Pin};
#[cfg(feature = "sqlx-dep")] #[cfg(feature = "sqlx-dep")]
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use std::{future::Future, pin::Pin};
#[cfg(feature = "mock")]
use std::sync::Arc;
#[cfg_attr(not(feature = "mock"), derive(Clone))]
pub enum DatabaseConnection {
#[cfg(feature = "sqlx-mysql")]
SqlxMySqlPoolConnection(crate::SqlxMySqlPoolConnection),
#[cfg(feature = "sqlx-postgres")]
SqlxPostgresPoolConnection(crate::SqlxPostgresPoolConnection),
#[cfg(feature = "sqlx-sqlite")]
SqlxSqlitePoolConnection(crate::SqlxSqlitePoolConnection),
#[cfg(feature = "mock")]
MockDatabaseConnection(Arc<crate::MockDatabaseConnection>),
Disconnected,
}
pub type DbConn = DatabaseConnection;
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum DatabaseBackend {
MySql,
Postgres,
Sqlite,
}
pub type DbBackend = DatabaseBackend;
pub(crate) enum InnerConnection { pub(crate) enum InnerConnection {
#[cfg(feature = "sqlx-mysql")] #[cfg(feature = "sqlx-mysql")]
@ -17,37 +46,219 @@ pub(crate) enum InnerConnection {
Mock(std::sync::Arc<crate::MockDatabaseConnection>), Mock(std::sync::Arc<crate::MockDatabaseConnection>),
} }
impl Default for DatabaseConnection {
fn default() -> Self {
Self::Disconnected
}
}
impl std::fmt::Debug for DatabaseConnection {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{}",
match self {
#[cfg(feature = "sqlx-mysql")]
Self::SqlxMySqlPoolConnection(_) => "SqlxMySqlPoolConnection",
#[cfg(feature = "sqlx-postgres")]
Self::SqlxPostgresPoolConnection(_) => "SqlxPostgresPoolConnection",
#[cfg(feature = "sqlx-sqlite")]
Self::SqlxSqlitePoolConnection(_) => "SqlxSqlitePoolConnection",
#[cfg(feature = "mock")]
Self::MockDatabaseConnection(_) => "MockDatabaseConnection",
Self::Disconnected => "Disconnected",
}
)
}
}
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait ConnectionTrait<'a>: Sync { impl<'a> ConnectionTrait<'a> for DatabaseConnection {
type Stream: Stream<Item = Result<QueryResult, DbErr>>; type Stream = crate::QueryStream;
fn get_database_backend(&self) -> DbBackend; fn get_database_backend(&self) -> DbBackend {
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(_) => DbBackend::MySql,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(_) => DbBackend::Postgres,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(_) => DbBackend::Sqlite,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.get_database_backend(),
DatabaseConnection::Disconnected => panic!("Disconnected"),
}
}
async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr>; async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.execute(stmt).await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.execute(stmt).await,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.execute(stmt).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.execute(stmt),
DatabaseConnection::Disconnected => Err(DbErr::Conn("Disconnected".to_owned())),
}
}
async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr>; async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.query_one(stmt).await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.query_one(stmt).await,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.query_one(stmt).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.query_one(stmt),
DatabaseConnection::Disconnected => Err(DbErr::Conn("Disconnected".to_owned())),
}
}
async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr>; async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.query_all(stmt).await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.query_all(stmt).await,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.query_all(stmt).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.query_all(stmt),
DatabaseConnection::Disconnected => Err(DbErr::Conn("Disconnected".to_owned())),
}
}
fn stream( fn stream(
&'a self, &'a self,
stmt: Statement, stmt: Statement,
) -> Pin<Box<dyn Future<Output = Result<Self::Stream, DbErr>> + 'a>>; ) -> Pin<Box<dyn Future<Output = Result<Self::Stream, DbErr>> + 'a>> {
Box::pin(async move {
Ok(match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.stream(stmt).await?,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.stream(stmt).await?,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.stream(stmt).await?,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
crate::QueryStream::from((Arc::clone(conn), stmt))
}
DatabaseConnection::Disconnected => panic!("Disconnected"),
})
})
}
async fn begin(&self) -> Result<DatabaseTransaction, DbErr>; async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.begin().await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.begin().await,
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin().await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
DatabaseTransaction::new_mock(Arc::clone(conn)).await
}
DatabaseConnection::Disconnected => panic!("Disconnected"),
}
}
/// Execute the function inside a transaction. /// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>> async fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>>
where where
F: for<'c> FnOnce( F: for<'c> FnOnce(
&'c DatabaseTransaction, &'c DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>> ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send, + Send,
T: Send, T: Send,
E: std::error::Error + Send; E: std::error::Error + Send,
{
match self {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.transaction(_callback).await,
#[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
conn.transaction(_callback).await
}
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.transaction(_callback).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => {
let transaction = DatabaseTransaction::new_mock(Arc::clone(conn))
.await
.map_err(|e| TransactionError::Connection(e))?;
transaction.run(_callback).await
}
DatabaseConnection::Disconnected => panic!("Disconnected"),
}
}
#[cfg(feature = "mock")]
fn is_mock_connection(&self) -> bool { fn is_mock_connection(&self) -> bool {
false match self {
DatabaseConnection::MockDatabaseConnection(_) => true,
_ => false,
}
}
}
#[cfg(feature = "mock")]
impl DatabaseConnection {
pub fn as_mock_connection(&self) -> &crate::MockDatabaseConnection {
match self {
DatabaseConnection::MockDatabaseConnection(mock_conn) => mock_conn,
_ => panic!("not mock connection"),
}
}
pub fn into_transaction_log(self) -> Vec<crate::Transaction> {
let mut mocker = self.as_mock_connection().get_mocker_mutex().lock().unwrap();
mocker.drain_transaction_log()
}
}
impl DbBackend {
pub fn is_prefix_of(self, base_url: &str) -> bool {
match self {
Self::Postgres => {
base_url.starts_with("postgres://") || base_url.starts_with("postgresql://")
}
Self::MySql => base_url.starts_with("mysql://"),
Self::Sqlite => base_url.starts_with("sqlite:"),
}
}
pub fn build<S>(&self, statement: &S) -> Statement
where
S: StatementBuilder,
{
statement.build(self)
}
pub fn get_query_builder(&self) -> Box<dyn QueryBuilder> {
match self {
Self::MySql => Box::new(MysqlQueryBuilder),
Self::Postgres => Box::new(PostgresQueryBuilder),
Self::Sqlite => Box::new(SqliteQueryBuilder),
}
}
}
#[cfg(test)]
mod tests {
use crate::DatabaseConnection;
#[test]
fn assert_database_connection_traits() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<DatabaseConnection>();
} }
} }

View File

@ -1,370 +0,0 @@
use crate::{
debug_print, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, QueryResult,
Statement, TransactionStream,
};
#[cfg(feature = "sqlx-dep")]
use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
use futures::lock::Mutex;
#[cfg(feature = "sqlx-dep")]
use sqlx::{pool::PoolConnection, TransactionManager};
use std::{future::Future, pin::Pin, sync::Arc};
// a Transaction is just a sugar for a connection where START TRANSACTION has been executed
pub struct DatabaseTransaction {
conn: Arc<Mutex<InnerConnection>>,
backend: DbBackend,
open: bool,
}
impl std::fmt::Debug for DatabaseTransaction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "DatabaseTransaction")
}
}
impl DatabaseTransaction {
#[cfg(feature = "sqlx-mysql")]
pub(crate) async fn new_mysql(
inner: PoolConnection<sqlx::MySql>,
) -> Result<DatabaseTransaction, DbErr> {
Self::build(
Arc::new(Mutex::new(InnerConnection::MySql(inner))),
DbBackend::MySql,
)
.await
}
#[cfg(feature = "sqlx-postgres")]
pub(crate) async fn new_postgres(
inner: PoolConnection<sqlx::Postgres>,
) -> Result<DatabaseTransaction, DbErr> {
Self::build(
Arc::new(Mutex::new(InnerConnection::Postgres(inner))),
DbBackend::Postgres,
)
.await
}
#[cfg(feature = "sqlx-sqlite")]
pub(crate) async fn new_sqlite(
inner: PoolConnection<sqlx::Sqlite>,
) -> Result<DatabaseTransaction, DbErr> {
Self::build(
Arc::new(Mutex::new(InnerConnection::Sqlite(inner))),
DbBackend::Sqlite,
)
.await
}
#[cfg(feature = "mock")]
pub(crate) async fn new_mock(
inner: Arc<crate::MockDatabaseConnection>,
) -> Result<DatabaseTransaction, DbErr> {
let backend = inner.get_database_backend();
Self::build(Arc::new(Mutex::new(InnerConnection::Mock(inner))), backend).await
}
async fn build(
conn: Arc<Mutex<InnerConnection>>,
backend: DbBackend,
) -> Result<DatabaseTransaction, DbErr> {
let res = DatabaseTransaction {
conn,
backend,
open: true,
};
match *res.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(ref mut c) => {
<sqlx::MySql as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(ref mut c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(ref mut c) => {
<sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
}
// should we do something for mocked connections?
#[cfg(feature = "mock")]
InnerConnection::Mock(_) => {}
}
Ok(res)
}
pub(crate) async fn run<F, T, E>(self, callback: F) -> Result<T, TransactionError<E>>
where
F: for<'b> FnOnce(
&'b DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
+ Send,
T: Send,
E: std::error::Error + Send,
{
let res = callback(&self)
.await
.map_err(|e| TransactionError::Transaction(e));
if res.is_ok() {
self.commit()
.await
.map_err(|e| TransactionError::Connection(e))?;
} else {
self.rollback()
.await
.map_err(|e| TransactionError::Connection(e))?;
}
res
}
pub async fn commit(mut self) -> Result<(), DbErr> {
self.open = false;
match *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(ref mut c) => {
<sqlx::MySql as sqlx::Database>::TransactionManager::commit(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(ref mut c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::commit(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(ref mut c) => {
<sqlx::Sqlite as sqlx::Database>::TransactionManager::commit(c)
.await
.map_err(sqlx_error_to_query_err)?
}
//Should we do something for mocked connections?
#[cfg(feature = "mock")]
InnerConnection::Mock(_) => {}
}
Ok(())
}
pub async fn rollback(mut self) -> Result<(), DbErr> {
self.open = false;
match *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(ref mut c) => {
<sqlx::MySql as sqlx::Database>::TransactionManager::rollback(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(ref mut c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::rollback(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(ref mut c) => {
<sqlx::Sqlite as sqlx::Database>::TransactionManager::rollback(c)
.await
.map_err(sqlx_error_to_query_err)?
}
//Should we do something for mocked connections?
#[cfg(feature = "mock")]
InnerConnection::Mock(_) => {}
}
Ok(())
}
// the rollback is queued and will be performed on next async operation, like returning the connection to the pool
fn start_rollback(&mut self) {
if self.open {
if let Some(mut conn) = self.conn.try_lock() {
match &mut *conn {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(c) => {
<sqlx::MySql as sqlx::Database>::TransactionManager::start_rollback(c);
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::start_rollback(c);
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(c) => {
<sqlx::Sqlite as sqlx::Database>::TransactionManager::start_rollback(c);
}
//Should we do something for mocked connections?
#[cfg(feature = "mock")]
InnerConnection::Mock(_) => {}
}
} else {
//this should never happen
panic!("Dropping a locked Transaction");
}
}
}
}
impl Drop for DatabaseTransaction {
fn drop(&mut self) {
self.start_rollback();
}
}
#[async_trait::async_trait]
impl<'a> ConnectionTrait<'a> for DatabaseTransaction {
type Stream = TransactionStream<'a>;
fn get_database_backend(&self) -> DbBackend {
// this way we don't need to lock
self.backend
}
async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
debug_print!("{}", stmt);
let _res = match &mut *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(conn) => {
let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
query.execute(conn).await.map(Into::into)
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(conn) => {
let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
query.execute(conn).await.map(Into::into)
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(conn) => {
let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
query.execute(conn).await.map(Into::into)
}
#[cfg(feature = "mock")]
InnerConnection::Mock(conn) => return conn.execute(stmt),
};
#[cfg(feature = "sqlx-dep")]
_res.map_err(sqlx_error_to_exec_err)
}
async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
debug_print!("{}", stmt);
let _res = match &mut *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(conn) => {
let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
query.fetch_one(conn).await.map(|row| Some(row.into()))
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(conn) => {
let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
query.fetch_one(conn).await.map(|row| Some(row.into()))
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(conn) => {
let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
query.fetch_one(conn).await.map(|row| Some(row.into()))
}
#[cfg(feature = "mock")]
InnerConnection::Mock(conn) => return conn.query_one(stmt),
};
#[cfg(feature = "sqlx-dep")]
if let Err(sqlx::Error::RowNotFound) = _res {
Ok(None)
} else {
_res.map_err(sqlx_error_to_query_err)
}
}
async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
debug_print!("{}", stmt);
let _res = match &mut *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(conn) => {
let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
query
.fetch_all(conn)
.await
.map(|rows| rows.into_iter().map(|r| r.into()).collect())
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(conn) => {
let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
query
.fetch_all(conn)
.await
.map(|rows| rows.into_iter().map(|r| r.into()).collect())
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(conn) => {
let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
query
.fetch_all(conn)
.await
.map(|rows| rows.into_iter().map(|r| r.into()).collect())
}
#[cfg(feature = "mock")]
InnerConnection::Mock(conn) => return conn.query_all(stmt),
};
#[cfg(feature = "sqlx-dep")]
_res.map_err(sqlx_error_to_query_err)
}
fn stream(
&'a self,
stmt: Statement,
) -> Pin<Box<dyn Future<Output = Result<Self::Stream, DbErr>> + 'a>> {
Box::pin(
async move { Ok(crate::TransactionStream::build(self.conn.lock().await, stmt).await) },
)
}
async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
DatabaseTransaction::build(Arc::clone(&self.conn), self.backend).await
}
/// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
async fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>>
where
F: for<'c> FnOnce(
&'c DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send,
T: Send,
E: std::error::Error + Send,
{
let transaction = self
.begin()
.await
.map_err(|e| TransactionError::Connection(e))?;
transaction.run(_callback).await
}
}
#[derive(Debug)]
pub enum TransactionError<E>
where
E: std::error::Error,
{
Connection(DbErr),
Transaction(E),
}
impl<E> std::fmt::Display for TransactionError<E>
where
E: std::error::Error,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TransactionError::Connection(e) => std::fmt::Display::fmt(e, f),
TransactionError::Transaction(e) => std::fmt::Display::fmt(e, f),
}
}
}
impl<E> std::error::Error for TransactionError<E> where E: std::error::Error {}

View File

@ -1,9 +1,9 @@
use crate::{ use crate::{
error::*, DatabaseConnection, DbBackend, EntityTrait, ExecResult, ExecResultHolder, Iden, error::*, DatabaseConnection, DbBackend, EntityTrait, ExecResult, ExecResultHolder, Iden,
Iterable, MockDatabaseConnection, MockDatabaseTrait, ModelTrait, QueryResult, QueryResultRow, Iterable, MockDatabaseConnection, MockDatabaseTrait, ModelTrait, QueryResult, QueryResultRow,
Statement, Transaction, Statement,
}; };
use sea_query::{Value, ValueType}; use sea_query::{Value, ValueType, Values};
use std::{collections::BTreeMap, sync::Arc}; use std::{collections::BTreeMap, sync::Arc};
#[derive(Debug)] #[derive(Debug)]
@ -29,6 +29,11 @@ pub trait IntoMockRow {
fn into_mock_row(self) -> MockRow; fn into_mock_row(self) -> MockRow;
} }
#[derive(Debug, Clone, PartialEq)]
pub struct Transaction {
stmts: Vec<Statement>,
}
impl MockDatabase { impl MockDatabase {
pub fn new(db_backend: DbBackend) -> Self { pub fn new(db_backend: DbBackend) -> Self {
Self { Self {
@ -134,3 +139,38 @@ impl IntoMockRow for BTreeMap<&str, Value> {
} }
} }
} }
impl Transaction {
pub fn from_sql_and_values<I>(db_backend: DbBackend, sql: &str, values: I) -> Self
where
I: IntoIterator<Item = Value>,
{
Self::one(Statement::from_string_values_tuple(
db_backend,
(sql.to_string(), Values(values.into_iter().collect())),
))
}
/// Create a Transaction with one statement
pub fn one(stmt: Statement) -> Self {
Self { stmts: vec![stmt] }
}
/// Create a Transaction with many statements
pub fn many<I>(stmts: I) -> Self
where
I: IntoIterator<Item = Statement>,
{
Self {
stmts: stmts.into_iter().collect(),
}
}
/// Wrap each Statement as a single-statement Transaction
pub fn wrap<I>(stmts: I) -> Vec<Self>
where
I: IntoIterator<Item = Statement>,
{
stmts.into_iter().map(Self::one).collect()
}
}

View File

@ -1,6 +1,5 @@
mod connection; mod connection;
mod db_connection; mod db_connection;
mod db_transaction;
#[cfg(feature = "mock")] #[cfg(feature = "mock")]
mod mock; mod mock;
mod statement; mod statement;
@ -9,7 +8,6 @@ mod transaction;
pub use connection::*; pub use connection::*;
pub use db_connection::*; pub use db_connection::*;
pub use db_transaction::*;
#[cfg(feature = "mock")] #[cfg(feature = "mock")]
pub use mock::*; pub use mock::*;
pub use statement::*; pub use statement::*;

View File

@ -1,42 +1,370 @@
use crate::{DbBackend, Statement}; use crate::{
use sea_query::{Value, Values}; debug_print, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, QueryResult,
Statement, TransactionStream,
};
#[cfg(feature = "sqlx-dep")]
use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
use futures::lock::Mutex;
#[cfg(feature = "sqlx-dep")]
use sqlx::{pool::PoolConnection, TransactionManager};
use std::{future::Future, pin::Pin, sync::Arc};
#[derive(Debug, Clone, PartialEq)] // a Transaction is just a sugar for a connection where START TRANSACTION has been executed
pub struct Transaction { pub struct DatabaseTransaction {
stmts: Vec<Statement>, conn: Arc<Mutex<InnerConnection>>,
backend: DbBackend,
open: bool,
} }
impl Transaction { impl std::fmt::Debug for DatabaseTransaction {
pub fn from_sql_and_values<I>(db_backend: DbBackend, sql: &str, values: I) -> Self fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
where write!(f, "DatabaseTransaction")
I: IntoIterator<Item = Value>, }
{ }
Self::one(Statement::from_string_values_tuple(
db_backend, impl DatabaseTransaction {
(sql.to_string(), Values(values.into_iter().collect())), #[cfg(feature = "sqlx-mysql")]
)) pub(crate) async fn new_mysql(
inner: PoolConnection<sqlx::MySql>,
) -> Result<DatabaseTransaction, DbErr> {
Self::begin(
Arc::new(Mutex::new(InnerConnection::MySql(inner))),
DbBackend::MySql,
)
.await
} }
/// Create a Transaction with one statement #[cfg(feature = "sqlx-postgres")]
pub fn one(stmt: Statement) -> Self { pub(crate) async fn new_postgres(
Self { stmts: vec![stmt] } inner: PoolConnection<sqlx::Postgres>,
) -> Result<DatabaseTransaction, DbErr> {
Self::begin(
Arc::new(Mutex::new(InnerConnection::Postgres(inner))),
DbBackend::Postgres,
)
.await
} }
/// Create a Transaction with many statements #[cfg(feature = "sqlx-sqlite")]
pub fn many<I>(stmts: I) -> Self pub(crate) async fn new_sqlite(
inner: PoolConnection<sqlx::Sqlite>,
) -> Result<DatabaseTransaction, DbErr> {
Self::begin(
Arc::new(Mutex::new(InnerConnection::Sqlite(inner))),
DbBackend::Sqlite,
)
.await
}
#[cfg(feature = "mock")]
pub(crate) async fn new_mock(
inner: Arc<crate::MockDatabaseConnection>,
) -> Result<DatabaseTransaction, DbErr> {
let backend = inner.get_database_backend();
Self::begin(Arc::new(Mutex::new(InnerConnection::Mock(inner))), backend).await
}
async fn begin(
conn: Arc<Mutex<InnerConnection>>,
backend: DbBackend,
) -> Result<DatabaseTransaction, DbErr> {
let res = DatabaseTransaction {
conn,
backend,
open: true,
};
match *res.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(ref mut c) => {
<sqlx::MySql as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(ref mut c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(ref mut c) => {
<sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c)
.await
.map_err(sqlx_error_to_query_err)?
}
// should we do something for mocked connections?
#[cfg(feature = "mock")]
InnerConnection::Mock(_) => {}
}
Ok(res)
}
pub(crate) async fn run<F, T, E>(self, callback: F) -> Result<T, TransactionError<E>>
where where
I: IntoIterator<Item = Statement>, F: for<'b> FnOnce(
&'b DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
+ Send,
T: Send,
E: std::error::Error + Send,
{ {
Self { let res = callback(&self)
stmts: stmts.into_iter().collect(), .await
.map_err(|e| TransactionError::Transaction(e));
if res.is_ok() {
self.commit()
.await
.map_err(|e| TransactionError::Connection(e))?;
} else {
self.rollback()
.await
.map_err(|e| TransactionError::Connection(e))?;
}
res
}
pub async fn commit(mut self) -> Result<(), DbErr> {
self.open = false;
match *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(ref mut c) => {
<sqlx::MySql as sqlx::Database>::TransactionManager::commit(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(ref mut c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::commit(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(ref mut c) => {
<sqlx::Sqlite as sqlx::Database>::TransactionManager::commit(c)
.await
.map_err(sqlx_error_to_query_err)?
}
//Should we do something for mocked connections?
#[cfg(feature = "mock")]
InnerConnection::Mock(_) => {}
}
Ok(())
}
pub async fn rollback(mut self) -> Result<(), DbErr> {
self.open = false;
match *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(ref mut c) => {
<sqlx::MySql as sqlx::Database>::TransactionManager::rollback(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(ref mut c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::rollback(c)
.await
.map_err(sqlx_error_to_query_err)?
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(ref mut c) => {
<sqlx::Sqlite as sqlx::Database>::TransactionManager::rollback(c)
.await
.map_err(sqlx_error_to_query_err)?
}
//Should we do something for mocked connections?
#[cfg(feature = "mock")]
InnerConnection::Mock(_) => {}
}
Ok(())
}
// the rollback is queued and will be performed on next async operation, like returning the connection to the pool
fn start_rollback(&mut self) {
if self.open {
if let Some(mut conn) = self.conn.try_lock() {
match &mut *conn {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(c) => {
<sqlx::MySql as sqlx::Database>::TransactionManager::start_rollback(c);
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(c) => {
<sqlx::Postgres as sqlx::Database>::TransactionManager::start_rollback(c);
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(c) => {
<sqlx::Sqlite as sqlx::Database>::TransactionManager::start_rollback(c);
}
//Should we do something for mocked connections?
#[cfg(feature = "mock")]
InnerConnection::Mock(_) => {}
}
} else {
//this should never happen
panic!("Dropping a locked Transaction");
}
}
}
}
impl Drop for DatabaseTransaction {
fn drop(&mut self) {
self.start_rollback();
}
}
#[async_trait::async_trait]
impl<'a> ConnectionTrait<'a> for DatabaseTransaction {
type Stream = TransactionStream<'a>;
fn get_database_backend(&self) -> DbBackend {
// this way we don't need to lock
self.backend
}
async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
debug_print!("{}", stmt);
let _res = match &mut *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(conn) => {
let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
query.execute(conn).await.map(Into::into)
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(conn) => {
let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
query.execute(conn).await.map(Into::into)
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(conn) => {
let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
query.execute(conn).await.map(Into::into)
}
#[cfg(feature = "mock")]
InnerConnection::Mock(conn) => return conn.execute(stmt),
};
#[cfg(feature = "sqlx-dep")]
_res.map_err(sqlx_error_to_exec_err)
}
async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
debug_print!("{}", stmt);
let _res = match &mut *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(conn) => {
let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
query.fetch_one(conn).await.map(|row| Some(row.into()))
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(conn) => {
let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
query.fetch_one(conn).await.map(|row| Some(row.into()))
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(conn) => {
let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
query.fetch_one(conn).await.map(|row| Some(row.into()))
}
#[cfg(feature = "mock")]
InnerConnection::Mock(conn) => return conn.query_one(stmt),
};
#[cfg(feature = "sqlx-dep")]
if let Err(sqlx::Error::RowNotFound) = _res {
Ok(None)
} else {
_res.map_err(sqlx_error_to_query_err)
} }
} }
/// Wrap each Statement as a single-statement Transaction async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
pub fn wrap<I>(stmts: I) -> Vec<Self> debug_print!("{}", stmt);
let _res = match &mut *self.conn.lock().await {
#[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(conn) => {
let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
query
.fetch_all(conn)
.await
.map(|rows| rows.into_iter().map(|r| r.into()).collect())
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(conn) => {
let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
query
.fetch_all(conn)
.await
.map(|rows| rows.into_iter().map(|r| r.into()).collect())
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(conn) => {
let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
query
.fetch_all(conn)
.await
.map(|rows| rows.into_iter().map(|r| r.into()).collect())
}
#[cfg(feature = "mock")]
InnerConnection::Mock(conn) => return conn.query_all(stmt),
};
#[cfg(feature = "sqlx-dep")]
_res.map_err(sqlx_error_to_query_err)
}
fn stream(
&'a self,
stmt: Statement,
) -> Pin<Box<dyn Future<Output = Result<Self::Stream, DbErr>> + 'a>> {
Box::pin(
async move { Ok(crate::TransactionStream::build(self.conn.lock().await, stmt).await) },
)
}
async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
DatabaseTransaction::begin(Arc::clone(&self.conn), self.backend).await
}
/// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
async fn transaction<F, T, E>(&self, _callback: F) -> Result<T, TransactionError<E>>
where where
I: IntoIterator<Item = Statement>, F: for<'c> FnOnce(
&'c DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>>
+ Send,
T: Send,
E: std::error::Error + Send,
{ {
stmts.into_iter().map(Self::one).collect() let transaction = self
.begin()
.await
.map_err(|e| TransactionError::Connection(e))?;
transaction.run(_callback).await
} }
} }
#[derive(Debug)]
pub enum TransactionError<E>
where
E: std::error::Error,
{
Connection(DbErr),
Transaction(E),
}
impl<E> std::fmt::Display for TransactionError<E>
where
E: std::error::Error,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TransactionError::Connection(e) => std::fmt::Display::fmt(e, f),
TransactionError::Transaction(e) => std::fmt::Display::fmt(e, f),
}
}
}
impl<E> std::error::Error for TransactionError<E> where E: std::error::Error {}