From e7b822c65d9ac8041e96cb954e92aeaa3156b2ff Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Mon, 4 Oct 2021 11:44:02 +0800 Subject: [PATCH] cargo fmt --- examples/rocket_example/src/pool.rs | 4 +- examples/rocket_example/src/setup.rs | 2 +- src/database/connection.rs | 35 +++-- src/database/db_connection.rs | 18 ++- src/database/db_transaction.rs | 224 +++++++++++++++++---------- src/database/mock.rs | 3 +- src/database/mod.rs | 12 +- src/database/stream/query.rs | 76 +++++---- src/database/stream/transaction.rs | 91 ++++++----- src/docs.rs | 2 +- src/driver/mock.rs | 23 ++- src/driver/sqlx_mysql.rs | 19 ++- src/driver/sqlx_postgres.rs | 19 ++- src/driver/sqlx_sqlite.rs | 19 ++- src/executor/delete.rs | 44 +++--- src/executor/insert.rs | 24 ++- src/executor/paginator.rs | 4 +- src/executor/select.rs | 110 ++++++++----- src/executor/update.rs | 39 ++--- src/lib.rs | 2 +- src/query/mod.rs | 2 +- tests/common/setup/mod.rs | 2 +- tests/common/setup/schema.rs | 4 +- tests/query_tests.rs | 2 +- tests/stream_tests.rs | 4 +- tests/transaction_tests.rs | 73 +++++---- 26 files changed, 517 insertions(+), 340 deletions(-) diff --git a/examples/rocket_example/src/pool.rs b/examples/rocket_example/src/pool.rs index 7c8e37cd..c4140c1f 100644 --- a/examples/rocket_example/src/pool.rs +++ b/examples/rocket_example/src/pool.rs @@ -16,9 +16,7 @@ impl rocket_db_pools::Pool for RocketDbPool { let config = figment.extract::().unwrap(); let conn = sea_orm::Database::connect(&config.url).await.unwrap(); - Ok(RocketDbPool { - conn, - }) + Ok(RocketDbPool { conn }) } async fn get(&self) -> Result { diff --git a/examples/rocket_example/src/setup.rs b/examples/rocket_example/src/setup.rs index 91bbb7b1..f5b5a99e 100644 --- a/examples/rocket_example/src/setup.rs +++ b/examples/rocket_example/src/setup.rs @@ -1,5 +1,5 @@ use sea_orm::sea_query::{ColumnDef, TableCreateStatement}; -use sea_orm::{query::*, error::*, sea_query, DbConn, ExecResult}; +use sea_orm::{error::*, query::*, sea_query, DbConn, ExecResult}; async fn create_table(db: &DbConn, stmt: &TableCreateStatement) -> Result { let builder = db.get_database_backend(); diff --git a/src/database/connection.rs b/src/database/connection.rs index 630b8b96..d2283caf 100644 --- a/src/database/connection.rs +++ b/src/database/connection.rs @@ -1,6 +1,9 @@ -use std::{future::Future, pin::Pin, sync::Arc}; -use crate::{DatabaseTransaction, ConnectionTrait, ExecResult, QueryResult, Statement, StatementBuilder, TransactionError, error::*}; +use crate::{ + error::*, ConnectionTrait, DatabaseTransaction, ExecResult, QueryResult, Statement, + StatementBuilder, TransactionError, +}; use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder}; +use std::{future::Future, pin::Pin, sync::Arc}; #[cfg_attr(not(feature = "mock"), derive(Clone))] pub enum DatabaseConnection { @@ -112,7 +115,10 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { } } - fn stream(&'a self, stmt: Statement) -> Pin> + 'a>> { + fn stream( + &'a self, + stmt: Statement, + ) -> Pin> + 'a>> { Box::pin(async move { Ok(match self { #[cfg(feature = "sqlx-mysql")] @@ -122,7 +128,9 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { #[cfg(feature = "sqlx-sqlite")] DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.stream(stmt).await?, #[cfg(feature = "mock")] - DatabaseConnection::MockDatabaseConnection(conn) => crate::QueryStream::from((Arc::clone(conn), stmt)), + DatabaseConnection::MockDatabaseConnection(conn) => { + crate::QueryStream::from((Arc::clone(conn), stmt)) + } DatabaseConnection::Disconnected => panic!("Disconnected"), }) }) @@ -137,7 +145,9 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { #[cfg(feature = "sqlx-sqlite")] DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin().await, #[cfg(feature = "mock")] - DatabaseConnection::MockDatabaseConnection(conn) => DatabaseTransaction::new_mock(Arc::clone(conn)).await, + DatabaseConnection::MockDatabaseConnection(conn) => { + DatabaseTransaction::new_mock(Arc::clone(conn)).await + } DatabaseConnection::Disconnected => panic!("Disconnected"), } } @@ -146,7 +156,10 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. async fn transaction(&self, _callback: F) -> Result> where - F: for<'c> FnOnce(&'c DatabaseTransaction) -> Pin> + Send + 'c>> + Send, + F: for<'c> FnOnce( + &'c DatabaseTransaction, + ) -> Pin> + Send + 'c>> + + Send, T: Send, E: std::error::Error + Send, { @@ -154,14 +167,18 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { #[cfg(feature = "sqlx-mysql")] DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.transaction(_callback).await, #[cfg(feature = "sqlx-postgres")] - DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.transaction(_callback).await, + DatabaseConnection::SqlxPostgresPoolConnection(conn) => { + conn.transaction(_callback).await + } #[cfg(feature = "sqlx-sqlite")] DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.transaction(_callback).await, #[cfg(feature = "mock")] DatabaseConnection::MockDatabaseConnection(conn) => { - let transaction = DatabaseTransaction::new_mock(Arc::clone(conn)).await.map_err(|e| TransactionError::Connection(e))?; + let transaction = DatabaseTransaction::new_mock(Arc::clone(conn)) + .await + .map_err(|e| TransactionError::Connection(e))?; transaction.run(_callback).await - }, + } DatabaseConnection::Disconnected => panic!("Disconnected"), } } diff --git a/src/database/db_connection.rs b/src/database/db_connection.rs index 7f43ae84..6040e452 100644 --- a/src/database/db_connection.rs +++ b/src/database/db_connection.rs @@ -1,8 +1,10 @@ -use std::{future::Future, pin::Pin}; -use crate::{DatabaseTransaction, DbBackend, DbErr, ExecResult, QueryResult, Statement, TransactionError}; +use crate::{ + DatabaseTransaction, DbBackend, DbErr, ExecResult, QueryResult, Statement, TransactionError, +}; use futures::Stream; #[cfg(feature = "sqlx-dep")] use sqlx::pool::PoolConnection; +use std::{future::Future, pin::Pin}; pub(crate) enum InnerConnection { #[cfg(feature = "sqlx-mysql")] @@ -17,7 +19,7 @@ pub(crate) enum InnerConnection { #[async_trait::async_trait] pub trait ConnectionTrait<'a>: Sync { - type Stream: Stream>; + type Stream: Stream>; fn get_database_backend(&self) -> DbBackend; @@ -27,7 +29,10 @@ pub trait ConnectionTrait<'a>: Sync { async fn query_all(&self, stmt: Statement) -> Result, DbErr>; - fn stream(&'a self, stmt: Statement) -> Pin> + 'a>>; + fn stream( + &'a self, + stmt: Statement, + ) -> Pin> + 'a>>; async fn begin(&self) -> Result; @@ -35,7 +40,10 @@ pub trait ConnectionTrait<'a>: Sync { /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. async fn transaction(&self, callback: F) -> Result> where - F: for<'c> FnOnce(&'c DatabaseTransaction) -> Pin> + Send + 'c>> + Send, + F: for<'c> FnOnce( + &'c DatabaseTransaction, + ) -> Pin> + Send + 'c>> + + Send, T: Send, E: std::error::Error + Send; diff --git a/src/database/db_transaction.rs b/src/database/db_transaction.rs index b403971e..ae954097 100644 --- a/src/database/db_transaction.rs +++ b/src/database/db_transaction.rs @@ -1,10 +1,13 @@ -use std::{sync::Arc, future::Future, pin::Pin}; -use crate::{ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, QueryResult, Statement, TransactionStream, debug_print}; -use futures::lock::Mutex; +use crate::{ + debug_print, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, QueryResult, + Statement, TransactionStream, +}; #[cfg(feature = "sqlx-dep")] use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err}; +use futures::lock::Mutex; #[cfg(feature = "sqlx-dep")] use sqlx::{pool::PoolConnection, TransactionManager}; +use std::{future::Future, pin::Pin, sync::Arc}; // a Transaction is just a sugar for a connection where START TRANSACTION has been executed pub struct DatabaseTransaction { @@ -21,27 +24,50 @@ impl std::fmt::Debug for DatabaseTransaction { impl DatabaseTransaction { #[cfg(feature = "sqlx-mysql")] - pub(crate) async fn new_mysql(inner: PoolConnection) -> Result { - Self::build(Arc::new(Mutex::new(InnerConnection::MySql(inner))), DbBackend::MySql).await + pub(crate) async fn new_mysql( + inner: PoolConnection, + ) -> Result { + Self::build( + Arc::new(Mutex::new(InnerConnection::MySql(inner))), + DbBackend::MySql, + ) + .await } #[cfg(feature = "sqlx-postgres")] - pub(crate) async fn new_postgres(inner: PoolConnection) -> Result { - Self::build(Arc::new(Mutex::new(InnerConnection::Postgres(inner))), DbBackend::Postgres).await + pub(crate) async fn new_postgres( + inner: PoolConnection, + ) -> Result { + Self::build( + Arc::new(Mutex::new(InnerConnection::Postgres(inner))), + DbBackend::Postgres, + ) + .await } #[cfg(feature = "sqlx-sqlite")] - pub(crate) async fn new_sqlite(inner: PoolConnection) -> Result { - Self::build(Arc::new(Mutex::new(InnerConnection::Sqlite(inner))), DbBackend::Sqlite).await + pub(crate) async fn new_sqlite( + inner: PoolConnection, + ) -> Result { + Self::build( + Arc::new(Mutex::new(InnerConnection::Sqlite(inner))), + DbBackend::Sqlite, + ) + .await } #[cfg(feature = "mock")] - pub(crate) async fn new_mock(inner: Arc) -> Result { + pub(crate) async fn new_mock( + inner: Arc, + ) -> Result { let backend = inner.get_database_backend(); Self::build(Arc::new(Mutex::new(InnerConnection::Mock(inner))), backend).await } - async fn build(conn: Arc>, backend: DbBackend) -> Result { + async fn build( + conn: Arc>, + backend: DbBackend, + ) -> Result { let res = DatabaseTransaction { conn, backend, @@ -50,35 +76,49 @@ impl DatabaseTransaction { match *res.conn.lock().await { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(ref mut c) => { - ::TransactionManager::begin(c).await.map_err(sqlx_error_to_query_err)? - }, + ::TransactionManager::begin(c) + .await + .map_err(sqlx_error_to_query_err)? + } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(ref mut c) => { - ::TransactionManager::begin(c).await.map_err(sqlx_error_to_query_err)? - }, + ::TransactionManager::begin(c) + .await + .map_err(sqlx_error_to_query_err)? + } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(ref mut c) => { - ::TransactionManager::begin(c).await.map_err(sqlx_error_to_query_err)? - }, + ::TransactionManager::begin(c) + .await + .map_err(sqlx_error_to_query_err)? + } // should we do something for mocked connections? #[cfg(feature = "mock")] - InnerConnection::Mock(_) => {}, + InnerConnection::Mock(_) => {} } Ok(res) } pub(crate) async fn run(self, callback: F) -> Result> where - F: for<'b> FnOnce(&'b DatabaseTransaction) -> Pin> + Send + 'b>> + Send, + F: for<'b> FnOnce( + &'b DatabaseTransaction, + ) -> Pin> + Send + 'b>> + + Send, T: Send, E: std::error::Error + Send, { - let res = callback(&self).await.map_err(|e| TransactionError::Transaction(e)); + let res = callback(&self) + .await + .map_err(|e| TransactionError::Transaction(e)); if res.is_ok() { - self.commit().await.map_err(|e| TransactionError::Connection(e))?; - } - else { - self.rollback().await.map_err(|e| TransactionError::Connection(e))?; + self.commit() + .await + .map_err(|e| TransactionError::Connection(e))?; + } else { + self.rollback() + .await + .map_err(|e| TransactionError::Connection(e))?; } res } @@ -88,19 +128,25 @@ impl DatabaseTransaction { match *self.conn.lock().await { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(ref mut c) => { - ::TransactionManager::commit(c).await.map_err(sqlx_error_to_query_err)? - }, + ::TransactionManager::commit(c) + .await + .map_err(sqlx_error_to_query_err)? + } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(ref mut c) => { - ::TransactionManager::commit(c).await.map_err(sqlx_error_to_query_err)? - }, + ::TransactionManager::commit(c) + .await + .map_err(sqlx_error_to_query_err)? + } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(ref mut c) => { - ::TransactionManager::commit(c).await.map_err(sqlx_error_to_query_err)? - }, + ::TransactionManager::commit(c) + .await + .map_err(sqlx_error_to_query_err)? + } //Should we do something for mocked connections? #[cfg(feature = "mock")] - InnerConnection::Mock(_) => {}, + InnerConnection::Mock(_) => {} } Ok(()) } @@ -110,19 +156,25 @@ impl DatabaseTransaction { match *self.conn.lock().await { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(ref mut c) => { - ::TransactionManager::rollback(c).await.map_err(sqlx_error_to_query_err)? - }, + ::TransactionManager::rollback(c) + .await + .map_err(sqlx_error_to_query_err)? + } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(ref mut c) => { - ::TransactionManager::rollback(c).await.map_err(sqlx_error_to_query_err)? - }, + ::TransactionManager::rollback(c) + .await + .map_err(sqlx_error_to_query_err)? + } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(ref mut c) => { - ::TransactionManager::rollback(c).await.map_err(sqlx_error_to_query_err)? - }, + ::TransactionManager::rollback(c) + .await + .map_err(sqlx_error_to_query_err)? + } //Should we do something for mocked connections? #[cfg(feature = "mock")] - InnerConnection::Mock(_) => {}, + InnerConnection::Mock(_) => {} } Ok(()) } @@ -135,21 +187,20 @@ impl DatabaseTransaction { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(c) => { ::TransactionManager::start_rollback(c); - }, + } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(c) => { ::TransactionManager::start_rollback(c); - }, + } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(c) => { ::TransactionManager::start_rollback(c); - }, + } //Should we do something for mocked connections? #[cfg(feature = "mock")] - InnerConnection::Mock(_) => {}, + InnerConnection::Mock(_) => {} } - } - else { + } else { //this should never happen panic!("Dropping a locked Transaction"); } @@ -179,21 +230,18 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(conn) => { let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); - query.execute(conn).await - .map(Into::into) - }, + query.execute(conn).await.map(Into::into) + } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(conn) => { let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); - query.execute(conn).await - .map(Into::into) - }, + query.execute(conn).await.map(Into::into) + } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(conn) => { let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); - query.execute(conn).await - .map(Into::into) - }, + query.execute(conn).await.map(Into::into) + } #[cfg(feature = "mock")] InnerConnection::Mock(conn) => return conn.execute(stmt), }; @@ -208,29 +256,25 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(conn) => { let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); - query.fetch_one(conn).await - .map(|row| Some(row.into())) - }, + query.fetch_one(conn).await.map(|row| Some(row.into())) + } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(conn) => { let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); - query.fetch_one(conn).await - .map(|row| Some(row.into())) - }, + query.fetch_one(conn).await.map(|row| Some(row.into())) + } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(conn) => { - let query= crate::driver::sqlx_sqlite::sqlx_query(&stmt); - query.fetch_one(conn).await - .map(|row| Some(row.into())) - }, + let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); + query.fetch_one(conn).await.map(|row| Some(row.into())) + } #[cfg(feature = "mock")] InnerConnection::Mock(conn) => return conn.query_one(stmt), }; #[cfg(feature = "sqlx-dep")] if let Err(sqlx::Error::RowNotFound) = _res { Ok(None) - } - else { + } else { _res.map_err(sqlx_error_to_query_err) } } @@ -242,21 +286,27 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(conn) => { let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); - query.fetch_all(conn).await + query + .fetch_all(conn) + .await .map(|rows| rows.into_iter().map(|r| r.into()).collect()) - }, + } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(conn) => { let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); - query.fetch_all(conn).await + query + .fetch_all(conn) + .await .map(|rows| rows.into_iter().map(|r| r.into()).collect()) - }, + } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(conn) => { let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); - query.fetch_all(conn).await + query + .fetch_all(conn) + .await .map(|rows| rows.into_iter().map(|r| r.into()).collect()) - }, + } #[cfg(feature = "mock")] InnerConnection::Mock(conn) => return conn.query_all(stmt), }; @@ -264,10 +314,13 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { _res.map_err(sqlx_error_to_query_err) } - fn stream(&'a self, stmt: Statement) -> Pin> + 'a>> { - Box::pin(async move { - Ok(crate::TransactionStream::build(self.conn.lock().await, stmt).await) - }) + fn stream( + &'a self, + stmt: Statement, + ) -> Pin> + 'a>> { + Box::pin( + async move { Ok(crate::TransactionStream::build(self.conn.lock().await, stmt).await) }, + ) } async fn begin(&self) -> Result { @@ -278,24 +331,34 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. async fn transaction(&self, _callback: F) -> Result> where - F: for<'c> FnOnce(&'c DatabaseTransaction) -> Pin> + Send + 'c>> + Send, + F: for<'c> FnOnce( + &'c DatabaseTransaction, + ) -> Pin> + Send + 'c>> + + Send, T: Send, E: std::error::Error + Send, { - let transaction = self.begin().await.map_err(|e| TransactionError::Connection(e))?; + let transaction = self + .begin() + .await + .map_err(|e| TransactionError::Connection(e))?; transaction.run(_callback).await } } #[derive(Debug)] pub enum TransactionError -where E: std::error::Error { +where + E: std::error::Error, +{ Connection(DbErr), Transaction(E), } impl std::fmt::Display for TransactionError -where E: std::error::Error { +where + E: std::error::Error, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { TransactionError::Connection(e) => std::fmt::Display::fmt(e, f), @@ -304,5 +367,4 @@ where E: std::error::Error { } } -impl std::error::Error for TransactionError -where E: std::error::Error {} +impl std::error::Error for TransactionError where E: std::error::Error {} diff --git a/src/database/mock.rs b/src/database/mock.rs index d9e1f9d0..f42add7a 100644 --- a/src/database/mock.rs +++ b/src/database/mock.rs @@ -100,8 +100,7 @@ impl MockRow { where T: ValueType, { - T::try_from(self.values.get(col).unwrap().clone()) - .map_err(|e| DbErr::Query(e.to_string())) + T::try_from(self.values.get(col).unwrap().clone()).map_err(|e| DbErr::Query(e.to_string())) } pub fn into_column_value_tuples(self) -> impl Iterator { diff --git a/src/database/mod.rs b/src/database/mod.rs index ce4127e3..369ed539 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,20 +1,20 @@ mod connection; +mod db_connection; +mod db_transaction; #[cfg(feature = "mock")] mod mock; mod statement; -mod transaction; -mod db_connection; -mod db_transaction; mod stream; +mod transaction; pub use connection::*; +pub use db_connection::*; +pub use db_transaction::*; #[cfg(feature = "mock")] pub use mock::*; pub use statement::*; -pub use transaction::*; -pub use db_connection::*; -pub use db_transaction::*; pub use stream::*; +pub use transaction::*; use crate::DbErr; diff --git a/src/database/stream/query.rs b/src/database/stream/query.rs index 553d9f7b..73668da0 100644 --- a/src/database/stream/query.rs +++ b/src/database/stream/query.rs @@ -1,4 +1,4 @@ -use std::{pin::Pin, task::Poll, sync::Arc}; +use std::{pin::Pin, sync::Arc, task::Poll}; use futures::Stream; #[cfg(feature = "sqlx-dep")] @@ -57,52 +57,50 @@ impl QueryStream { QueryStreamBuilder { stmt, conn, - stream_builder: |conn, stmt| { - match conn { - #[cfg(feature = "sqlx-mysql")] - InnerConnection::MySql(c) => { - let query = crate::driver::sqlx_mysql::sqlx_query(stmt); - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err) - ) - }, - #[cfg(feature = "sqlx-postgres")] - InnerConnection::Postgres(c) => { - let query = crate::driver::sqlx_postgres::sqlx_query(stmt); - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err) - ) - }, - #[cfg(feature = "sqlx-sqlite")] - InnerConnection::Sqlite(c) => { - let query = crate::driver::sqlx_sqlite::sqlx_query(stmt); - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err) - ) - }, - #[cfg(feature = "mock")] - InnerConnection::Mock(c) => { - c.fetch(stmt) - }, + stream_builder: |conn, stmt| match conn { + #[cfg(feature = "sqlx-mysql")] + InnerConnection::MySql(c) => { + let query = crate::driver::sqlx_mysql::sqlx_query(stmt); + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) } + #[cfg(feature = "sqlx-postgres")] + InnerConnection::Postgres(c) => { + let query = crate::driver::sqlx_postgres::sqlx_query(stmt); + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) + } + #[cfg(feature = "sqlx-sqlite")] + InnerConnection::Sqlite(c) => { + let query = crate::driver::sqlx_sqlite::sqlx_query(stmt); + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) + } + #[cfg(feature = "mock")] + InnerConnection::Mock(c) => c.fetch(stmt), }, - }.build() + } + .build() } } impl Stream for QueryStream { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { let this = self.get_mut(); - this.with_stream_mut(|stream| { - stream.as_mut().poll_next(cx) - }) + this.with_stream_mut(|stream| stream.as_mut().poll_next(cx)) } } diff --git a/src/database/stream/transaction.rs b/src/database/stream/transaction.rs index d945f409..651f3d11 100644 --- a/src/database/stream/transaction.rs +++ b/src/database/stream/transaction.rs @@ -27,56 +27,65 @@ impl<'a> std::fmt::Debug for TransactionStream<'a> { } impl<'a> TransactionStream<'a> { - pub(crate) async fn build(conn: MutexGuard<'a, InnerConnection>, stmt: Statement) -> TransactionStream<'a> { + pub(crate) async fn build( + conn: MutexGuard<'a, InnerConnection>, + stmt: Statement, + ) -> TransactionStream<'a> { TransactionStreamAsyncBuilder { stmt, conn, - stream_builder: |conn, stmt| Box::pin(async move { - match conn.deref_mut() { - #[cfg(feature = "sqlx-mysql")] - InnerConnection::MySql(c) => { - let query = crate::driver::sqlx_mysql::sqlx_query(stmt); - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err) - ) as Pin>>> - }, - #[cfg(feature = "sqlx-postgres")] - InnerConnection::Postgres(c) => { - let query = crate::driver::sqlx_postgres::sqlx_query(stmt); - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err) - ) as Pin>>> - }, - #[cfg(feature = "sqlx-sqlite")] - InnerConnection::Sqlite(c) => { - let query = crate::driver::sqlx_sqlite::sqlx_query(stmt); - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err) - ) as Pin>>> - }, - #[cfg(feature = "mock")] - InnerConnection::Mock(c) => { - c.fetch(stmt) - }, - } - }), - }.build().await + stream_builder: |conn, stmt| { + Box::pin(async move { + match conn.deref_mut() { + #[cfg(feature = "sqlx-mysql")] + InnerConnection::MySql(c) => { + let query = crate::driver::sqlx_mysql::sqlx_query(stmt); + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) + as Pin>>> + } + #[cfg(feature = "sqlx-postgres")] + InnerConnection::Postgres(c) => { + let query = crate::driver::sqlx_postgres::sqlx_query(stmt); + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) + as Pin>>> + } + #[cfg(feature = "sqlx-sqlite")] + InnerConnection::Sqlite(c) => { + let query = crate::driver::sqlx_sqlite::sqlx_query(stmt); + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) + as Pin>>> + } + #[cfg(feature = "mock")] + InnerConnection::Mock(c) => c.fetch(stmt), + } + }) + }, + } + .build() + .await } } impl<'a> Stream for TransactionStream<'a> { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { let this = self.get_mut(); - this.with_stream_mut(|stream| { - stream.as_mut().poll_next(cx) - }) + this.with_stream_mut(|stream| stream.as_mut().poll_next(cx)) } } diff --git a/src/docs.rs b/src/docs.rs index bab054ef..4d1226c3 100644 --- a/src/docs.rs +++ b/src/docs.rs @@ -163,4 +163,4 @@ //! }, //! ) //! } -//! ``` \ No newline at end of file +//! ``` diff --git a/src/driver/mock.rs b/src/driver/mock.rs index 388ac911..96317c6d 100644 --- a/src/driver/mock.rs +++ b/src/driver/mock.rs @@ -2,11 +2,15 @@ use crate::{ debug_print, error::*, DatabaseConnection, DbBackend, ExecResult, MockDatabase, QueryResult, Statement, Transaction, }; -use std::{fmt::Debug, pin::Pin, sync::{Arc, - atomic::{AtomicUsize, Ordering}, - Mutex, -}}; use futures::Stream; +use std::{ + fmt::Debug, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, +}; #[derive(Debug)] pub struct MockDatabaseConnector; @@ -49,9 +53,9 @@ impl MockDatabaseConnector { pub async fn connect(string: &str) -> Result { macro_rules! connect_mock_db { ( $syntax: expr ) => { - Ok(DatabaseConnection::MockDatabaseConnection( - Arc::new(MockDatabaseConnection::new(MockDatabase::new($syntax))), - )) + Ok(DatabaseConnection::MockDatabaseConnection(Arc::new( + MockDatabaseConnection::new(MockDatabase::new($syntax)), + ))) }; } @@ -105,7 +109,10 @@ impl MockDatabaseConnection { self.mocker.lock().unwrap().query(counter, statement) } - pub fn fetch(&self, statement: &Statement) -> Pin>>> { + pub fn fetch( + &self, + statement: &Statement, + ) -> Pin>>> { match self.query_all(statement.clone()) { Ok(v) => Box::pin(futures::stream::iter(v.into_iter().map(|r| Ok(r)))), Err(e) => Box::pin(futures::stream::iter(Some(Err(e)).into_iter())), diff --git a/src/driver/sqlx_mysql.rs b/src/driver/sqlx_mysql.rs index 75e6e5ff..6b6f9507 100644 --- a/src/driver/sqlx_mysql.rs +++ b/src/driver/sqlx_mysql.rs @@ -1,11 +1,17 @@ use std::{future::Future, pin::Pin}; -use sqlx::{MySql, MySqlPool, mysql::{MySqlArguments, MySqlQueryResult, MySqlRow}}; +use sqlx::{ + mysql::{MySqlArguments, MySqlQueryResult, MySqlRow}, + MySql, MySqlPool, +}; sea_query::sea_query_driver_mysql!(); use sea_query_driver_mysql::bind_query; -use crate::{DatabaseConnection, DatabaseTransaction, QueryStream, Statement, TransactionError, debug_print, error::*, executor::*}; +use crate::{ + debug_print, error::*, executor::*, DatabaseConnection, DatabaseTransaction, QueryStream, + Statement, TransactionError, +}; use super::sqlx_common::*; @@ -115,12 +121,17 @@ impl SqlxMySqlPoolConnection { pub async fn transaction(&self, callback: F) -> Result> where - F: for<'b> FnOnce(&'b DatabaseTransaction) -> Pin> + Send + 'b>> + Send, + F: for<'b> FnOnce( + &'b DatabaseTransaction, + ) -> Pin> + Send + 'b>> + + Send, T: Send, E: std::error::Error + Send, { if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_mysql(conn).await.map_err(|e| TransactionError::Connection(e))?; + let transaction = DatabaseTransaction::new_mysql(conn) + .await + .map_err(|e| TransactionError::Connection(e))?; transaction.run(callback).await } else { Err(TransactionError::Connection(DbErr::Query( diff --git a/src/driver/sqlx_postgres.rs b/src/driver/sqlx_postgres.rs index c9949375..13cb51cd 100644 --- a/src/driver/sqlx_postgres.rs +++ b/src/driver/sqlx_postgres.rs @@ -1,11 +1,17 @@ use std::{future::Future, pin::Pin}; -use sqlx::{PgPool, Postgres, postgres::{PgArguments, PgQueryResult, PgRow}}; +use sqlx::{ + postgres::{PgArguments, PgQueryResult, PgRow}, + PgPool, Postgres, +}; sea_query::sea_query_driver_postgres!(); use sea_query_driver_postgres::bind_query; -use crate::{DatabaseConnection, DatabaseTransaction, QueryStream, Statement, TransactionError, debug_print, error::*, executor::*}; +use crate::{ + debug_print, error::*, executor::*, DatabaseConnection, DatabaseTransaction, QueryStream, + Statement, TransactionError, +}; use super::sqlx_common::*; @@ -115,12 +121,17 @@ impl SqlxPostgresPoolConnection { pub async fn transaction(&self, callback: F) -> Result> where - F: for<'b> FnOnce(&'b DatabaseTransaction) -> Pin> + Send + 'b>> + Send, + F: for<'b> FnOnce( + &'b DatabaseTransaction, + ) -> Pin> + Send + 'b>> + + Send, T: Send, E: std::error::Error + Send, { if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_postgres(conn).await.map_err(|e| TransactionError::Connection(e))?; + let transaction = DatabaseTransaction::new_postgres(conn) + .await + .map_err(|e| TransactionError::Connection(e))?; transaction.run(callback).await } else { Err(TransactionError::Connection(DbErr::Query( diff --git a/src/driver/sqlx_sqlite.rs b/src/driver/sqlx_sqlite.rs index bf06a265..0f7548fc 100644 --- a/src/driver/sqlx_sqlite.rs +++ b/src/driver/sqlx_sqlite.rs @@ -1,11 +1,17 @@ use std::{future::Future, pin::Pin}; -use sqlx::{Sqlite, SqlitePool, sqlite::{SqliteArguments, SqliteQueryResult, SqliteRow}}; +use sqlx::{ + sqlite::{SqliteArguments, SqliteQueryResult, SqliteRow}, + Sqlite, SqlitePool, +}; sea_query::sea_query_driver_sqlite!(); use sea_query_driver_sqlite::bind_query; -use crate::{DatabaseConnection, DatabaseTransaction, QueryStream, Statement, TransactionError, debug_print, error::*, executor::*}; +use crate::{ + debug_print, error::*, executor::*, DatabaseConnection, DatabaseTransaction, QueryStream, + Statement, TransactionError, +}; use super::sqlx_common::*; @@ -115,12 +121,17 @@ impl SqlxSqlitePoolConnection { pub async fn transaction(&self, callback: F) -> Result> where - F: for<'b> FnOnce(&'b DatabaseTransaction) -> Pin> + Send + 'b>> + Send, + F: for<'b> FnOnce( + &'b DatabaseTransaction, + ) -> Pin> + Send + 'b>> + + Send, T: Send, E: std::error::Error + Send, { if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_sqlite(conn).await.map_err(|e| TransactionError::Connection(e))?; + let transaction = DatabaseTransaction::new_sqlite(conn) + .await + .map_err(|e| TransactionError::Connection(e))?; transaction.run(callback).await } else { Err(TransactionError::Connection(DbErr::Query( diff --git a/src/executor/delete.rs b/src/executor/delete.rs index 85b37cb0..2fa5f64a 100644 --- a/src/executor/delete.rs +++ b/src/executor/delete.rs @@ -1,4 +1,6 @@ -use crate::{ActiveModelTrait, ConnectionTrait, DeleteMany, DeleteOne, EntityTrait, Statement, error::*}; +use crate::{ + error::*, ActiveModelTrait, ConnectionTrait, DeleteMany, DeleteOne, EntityTrait, Statement, +}; use sea_query::DeleteStatement; use std::future::Future; @@ -16,11 +18,10 @@ impl<'a, A: 'a> DeleteOne where A: ActiveModelTrait, { - pub fn exec( - self, - db: &'a C, - ) -> impl Future> + 'a - where C: ConnectionTrait<'a> { + pub fn exec(self, db: &'a C) -> impl Future> + 'a + where + C: ConnectionTrait<'a>, + { // so that self is dropped before entering await exec_delete_only(self.query, db) } @@ -30,11 +31,10 @@ impl<'a, E> DeleteMany where E: EntityTrait, { - pub fn exec( - self, - db: &'a C, - ) -> impl Future> + 'a - where C: ConnectionTrait<'a> { + pub fn exec(self, db: &'a C) -> impl Future> + 'a + where + C: ConnectionTrait<'a>, + { // so that self is dropped before entering await exec_delete_only(self.query, db) } @@ -45,27 +45,27 @@ impl Deleter { Self { query } } - pub fn exec<'a, C>( - self, - db: &'a C, - ) -> impl Future> + '_ - where C: ConnectionTrait<'a> { + pub fn exec<'a, C>(self, db: &'a C) -> impl Future> + '_ + where + C: ConnectionTrait<'a>, + { let builder = db.get_database_backend(); exec_delete(builder.build(&self.query), db) } } -async fn exec_delete_only<'a, C>( - query: DeleteStatement, - db: &'a C, -) -> Result -where C: ConnectionTrait<'a> { +async fn exec_delete_only<'a, C>(query: DeleteStatement, db: &'a C) -> Result +where + C: ConnectionTrait<'a>, +{ Deleter::new(query).exec(db).await } // Only Statement impl Send async fn exec_delete<'a, C>(statement: Statement, db: &C) -> Result -where C: ConnectionTrait<'a> { +where + C: ConnectionTrait<'a>, +{ let result = db.execute(statement).await?; Ok(DeleteResult { rows_affected: result.rows_affected(), diff --git a/src/executor/insert.rs b/src/executor/insert.rs index 10b5d8d3..7d2e3b11 100644 --- a/src/executor/insert.rs +++ b/src/executor/insert.rs @@ -1,4 +1,7 @@ -use crate::{ActiveModelTrait, ConnectionTrait, DbBackend, EntityTrait, Insert, PrimaryKeyTrait, Statement, TryFromU64, error::*}; +use crate::{ + error::*, ActiveModelTrait, ConnectionTrait, DbBackend, EntityTrait, Insert, PrimaryKeyTrait, + Statement, TryFromU64, +}; use sea_query::InsertStatement; use std::marker::PhantomData; @@ -24,10 +27,7 @@ where A: ActiveModelTrait, { #[allow(unused_mut)] - pub async fn exec<'a, C>( - self, - db: &'a C, - ) -> Result, DbErr> + pub async fn exec<'a, C>(self, db: &'a C) -> Result, DbErr> where C: ConnectionTrait<'a>, A: 'a, @@ -61,10 +61,7 @@ where } } - pub async fn exec<'a, C>( - self, - db: &'a C, - ) -> Result, DbErr> + pub async fn exec<'a, C>(self, db: &'a C) -> Result, DbErr> where C: ConnectionTrait<'a>, A: 'a, @@ -75,10 +72,7 @@ where } // Only Statement impl Send -async fn exec_insert<'a, A, C>( - statement: Statement, - db: &C, -) -> Result, DbErr> +async fn exec_insert<'a, A, C>(statement: Statement, db: &C) -> Result, DbErr> where C: ConnectionTrait<'a>, A: ActiveModelTrait, @@ -93,13 +87,13 @@ where .collect::>(); let res = db.query_one(statement).await?.unwrap(); res.try_get_many("", cols.as_ref()).unwrap_or_default() - }, + } _ => { let last_insert_id = db.execute(statement).await?.last_insert_id(); ValueTypeOf::::try_from_u64(last_insert_id) .ok() .unwrap_or_default() - }, + } }; Ok(InsertResult { last_insert_id }) } diff --git a/src/executor/paginator.rs b/src/executor/paginator.rs index 0d07591f..28f8574b 100644 --- a/src/executor/paginator.rs +++ b/src/executor/paginator.rs @@ -1,4 +1,4 @@ -use crate::{ConnectionTrait, DbBackend, SelectorTrait, error::*}; +use crate::{error::*, ConnectionTrait, DbBackend, SelectorTrait}; use async_stream::stream; use futures::Stream; use sea_query::{Alias, Expr, SelectStatement}; @@ -157,7 +157,7 @@ where #[cfg(feature = "mock")] mod tests { use crate::entity::prelude::*; - use crate::{ConnectionTrait, tests_cfg::*}; + use crate::{tests_cfg::*, ConnectionTrait}; use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction}; use futures::TryStreamExt; use sea_query::{Alias, Expr, SelectStatement, Value}; diff --git a/src/executor/select.rs b/src/executor/select.rs index dc8b7afb..f4ff69d4 100644 --- a/src/executor/select.rs +++ b/src/executor/select.rs @@ -1,10 +1,14 @@ -#[cfg(feature = "sqlx-dep")] -use std::pin::Pin; -use crate::{ConnectionTrait, EntityTrait, FromQueryResult, IdenStatic, Iterable, JsonValue, ModelTrait, Paginator, PrimaryKeyToColumn, QueryResult, Select, SelectA, SelectB, SelectTwo, SelectTwoMany, Statement, TryGetableMany, error::*}; +use crate::{ + error::*, ConnectionTrait, EntityTrait, FromQueryResult, IdenStatic, Iterable, JsonValue, + ModelTrait, Paginator, PrimaryKeyToColumn, QueryResult, Select, SelectA, SelectB, SelectTwo, + SelectTwoMany, Statement, TryGetableMany, +}; #[cfg(feature = "sqlx-dep")] use futures::{Stream, TryStreamExt}; use sea_query::SelectStatement; use std::marker::PhantomData; +#[cfg(feature = "sqlx-dep")] +use std::pin::Pin; #[derive(Clone, Debug)] pub struct Selector @@ -236,17 +240,24 @@ where } pub async fn one<'a, C>(self, db: &C) -> Result, DbErr> - where C: ConnectionTrait<'a> { + where + C: ConnectionTrait<'a>, + { self.into_model().one(db).await } pub async fn all<'a, C>(self, db: &C) -> Result, DbErr> - where C: ConnectionTrait<'a> { + where + C: ConnectionTrait<'a>, + { self.into_model().all(db).await } #[cfg(feature = "sqlx-dep")] - pub async fn stream<'a: 'b, 'b, C>(self, db: &'a C) -> Result> + 'b, DbErr> + pub async fn stream<'a: 'b, 'b, C>( + self, + db: &'a C, + ) -> Result> + 'b, DbErr> where C: ConnectionTrait<'a>, { @@ -258,12 +269,16 @@ where db: &'a C, page_size: usize, ) -> Paginator<'a, C, SelectModel> - where C: ConnectionTrait<'a> { + where + C: ConnectionTrait<'a>, + { self.into_model().paginate(db, page_size) } pub async fn count<'a, C>(self, db: &'a C) -> Result - where C: ConnectionTrait<'a> { + where + C: ConnectionTrait<'a>, + { self.paginate(db, 1).num_items().await } } @@ -292,24 +307,25 @@ where } } - pub async fn one<'a, C>( - self, - db: &C, - ) -> Result)>, DbErr> - where C: ConnectionTrait<'a> { + pub async fn one<'a, C>(self, db: &C) -> Result)>, DbErr> + where + C: ConnectionTrait<'a>, + { self.into_model().one(db).await } - pub async fn all<'a, C>( - self, - db: &C, - ) -> Result)>, DbErr> - where C: ConnectionTrait<'a> { + pub async fn all<'a, C>(self, db: &C) -> Result)>, DbErr> + where + C: ConnectionTrait<'a>, + { self.into_model().all(db).await } #[cfg(feature = "sqlx-dep")] - pub async fn stream<'a: 'b, 'b, C>(self, db: &'a C) -> Result), DbErr>> + 'b, DbErr> + pub async fn stream<'a: 'b, 'b, C>( + self, + db: &'a C, + ) -> Result), DbErr>> + 'b, DbErr> where C: ConnectionTrait<'a>, { @@ -321,12 +337,16 @@ where db: &'a C, page_size: usize, ) -> Paginator<'a, C, SelectTwoModel> - where C: ConnectionTrait<'a> { + where + C: ConnectionTrait<'a>, + { self.into_model().paginate(db, page_size) } pub async fn count<'a, C>(self, db: &'a C) -> Result - where C: ConnectionTrait<'a> { + where + C: ConnectionTrait<'a>, + { self.paginate(db, 1).num_items().await } } @@ -355,27 +375,28 @@ where } } - pub async fn one<'a, C>( - self, - db: &C, - ) -> Result)>, DbErr> - where C: ConnectionTrait<'a> { + pub async fn one<'a, C>(self, db: &C) -> Result)>, DbErr> + where + C: ConnectionTrait<'a>, + { self.into_model().one(db).await } #[cfg(feature = "sqlx-dep")] - pub async fn stream<'a: 'b, 'b, C>(self, db: &'a C) -> Result), DbErr>> + 'b, DbErr> + pub async fn stream<'a: 'b, 'b, C>( + self, + db: &'a C, + ) -> Result), DbErr>> + 'b, DbErr> where C: ConnectionTrait<'a>, { self.into_model().stream(db).await } - pub async fn all<'a, C>( - self, - db: &C, - ) -> Result)>, DbErr> - where C: ConnectionTrait<'a> { + pub async fn all<'a, C>(self, db: &C) -> Result)>, DbErr> + where + C: ConnectionTrait<'a>, + { let rows = self.into_model().all(db).await?; Ok(consolidate_query_result::(rows)) } @@ -411,7 +432,9 @@ where } pub async fn one<'a, C>(mut self, db: &C) -> Result, DbErr> - where C: ConnectionTrait<'a> { + where + C: ConnectionTrait<'a>, + { let builder = db.get_database_backend(); self.query.limit(1); let row = db.query_one(builder.build(&self.query)).await?; @@ -422,7 +445,9 @@ where } pub async fn all<'a, C>(self, db: &C) -> Result, DbErr> - where C: ConnectionTrait<'a> { + where + C: ConnectionTrait<'a>, + { let builder = db.get_database_backend(); let rows = db.query_all(builder.build(&self.query)).await?; let mut models = Vec::new(); @@ -433,7 +458,10 @@ where } #[cfg(feature = "sqlx-dep")] - pub async fn stream<'a: 'b, 'b, C>(self, db: &'a C) -> Result> + 'b>>, DbErr> + pub async fn stream<'a: 'b, 'b, C>( + self, + db: &'a C, + ) -> Result> + 'b>>, DbErr> where C: ConnectionTrait<'a>, S: 'b, @@ -446,7 +474,9 @@ where } pub fn paginate<'a, C>(self, db: &'a C, page_size: usize) -> Paginator<'a, C, S> - where C: ConnectionTrait<'a> { + where + C: ConnectionTrait<'a>, + { Paginator { query: self.query, page: 0, @@ -657,7 +687,9 @@ where /// ); /// ``` pub async fn one<'a, C>(self, db: &C) -> Result, DbErr> - where C: ConnectionTrait<'a> { + where + C: ConnectionTrait<'a>, + { let row = db.query_one(self.stmt).await?; match row { Some(row) => Ok(Some(S::from_raw_query_result(row)?)), @@ -697,7 +729,9 @@ where /// ); /// ``` pub async fn all<'a, C>(self, db: &C) -> Result, DbErr> - where C: ConnectionTrait<'a> { + where + C: ConnectionTrait<'a>, + { let rows = db.query_all(self.stmt).await?; let mut models = Vec::new(); for row in rows.into_iter() { diff --git a/src/executor/update.rs b/src/executor/update.rs index 06cd514e..9e36de57 100644 --- a/src/executor/update.rs +++ b/src/executor/update.rs @@ -1,4 +1,6 @@ -use crate::{ActiveModelTrait, ConnectionTrait, EntityTrait, Statement, UpdateMany, UpdateOne, error::*}; +use crate::{ + error::*, ActiveModelTrait, ConnectionTrait, EntityTrait, Statement, UpdateMany, UpdateOne, +}; use sea_query::UpdateStatement; use std::future::Future; @@ -17,7 +19,9 @@ where A: ActiveModelTrait, { pub async fn exec<'b, C>(self, db: &'b C) -> Result - where C: ConnectionTrait<'b> { + where + C: ConnectionTrait<'b>, + { // so that self is dropped before entering await exec_update_and_return_original(self.query, self.model, db).await } @@ -27,11 +31,10 @@ impl<'a, E> UpdateMany where E: EntityTrait, { - pub fn exec( - self, - db: &'a C, - ) -> impl Future> + 'a - where C: ConnectionTrait<'a> { + pub fn exec(self, db: &'a C) -> impl Future> + 'a + where + C: ConnectionTrait<'a>, + { // so that self is dropped before entering await exec_update_only(self.query, db) } @@ -42,21 +45,19 @@ impl Updater { Self { query } } - pub async fn exec<'a, C>( - self, - db: &'a C, - ) -> Result - where C: ConnectionTrait<'a> { + pub async fn exec<'a, C>(self, db: &'a C) -> Result + where + C: ConnectionTrait<'a>, + { let builder = db.get_database_backend(); exec_update(builder.build(&self.query), db).await } } -async fn exec_update_only<'a, C>( - query: UpdateStatement, - db: &'a C, -) -> Result -where C: ConnectionTrait<'a> { +async fn exec_update_only<'a, C>(query: UpdateStatement, db: &'a C) -> Result +where + C: ConnectionTrait<'a>, +{ Updater::new(query).exec(db).await } @@ -75,7 +76,9 @@ where // Only Statement impl Send async fn exec_update<'a, C>(statement: Statement, db: &'a C) -> Result -where C: ConnectionTrait<'a> { +where + C: ConnectionTrait<'a>, +{ let result = db.execute(statement).await?; Ok(UpdateResult { rows_affected: result.rows_affected(), diff --git a/src/lib.rs b/src/lib.rs index 910044a5..6ddc442c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -265,6 +265,7 @@ )] mod database; +mod docs; mod driver; pub mod entity; pub mod error; @@ -273,7 +274,6 @@ pub mod query; pub mod schema; #[doc(hidden)] pub mod tests_cfg; -mod docs; mod util; pub use database::*; diff --git a/src/query/mod.rs b/src/query/mod.rs index c7f60049..9a2d0aba 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -20,4 +20,4 @@ pub use select::*; pub use traits::*; pub use update::*; -pub use crate::{InsertResult, Statement, UpdateResult, Value, Values, ConnectionTrait}; +pub use crate::{ConnectionTrait, InsertResult, Statement, UpdateResult, Value, Values}; diff --git a/tests/common/setup/mod.rs b/tests/common/setup/mod.rs index 9deb903f..b9195edb 100644 --- a/tests/common/setup/mod.rs +++ b/tests/common/setup/mod.rs @@ -1,4 +1,4 @@ -use sea_orm::{Database, DatabaseBackend, DatabaseConnection, ConnectionTrait, Statement}; +use sea_orm::{ConnectionTrait, Database, DatabaseBackend, DatabaseConnection, Statement}; pub mod schema; pub use schema::*; diff --git a/tests/common/setup/schema.rs b/tests/common/setup/schema.rs index ac34304a..77d85308 100644 --- a/tests/common/setup/schema.rs +++ b/tests/common/setup/schema.rs @@ -1,6 +1,8 @@ pub use super::super::bakery_chain::*; use pretty_assertions::assert_eq; -use sea_orm::{error::*, sea_query, ConnectionTrait, DbBackend, DbConn, EntityTrait, ExecResult, Schema}; +use sea_orm::{ + error::*, sea_query, ConnectionTrait, DbBackend, DbConn, EntityTrait, ExecResult, Schema, +}; use sea_query::{ Alias, ColumnDef, ForeignKey, ForeignKeyAction, Index, Table, TableCreateStatement, }; diff --git a/tests/query_tests.rs b/tests/query_tests.rs index e688b14f..4b117dc6 100644 --- a/tests/query_tests.rs +++ b/tests/query_tests.rs @@ -2,7 +2,7 @@ pub mod common; pub use common::{bakery_chain::*, setup::*, TestContext}; pub use sea_orm::entity::*; -pub use sea_orm::{QueryFilter, ConnectionTrait}; +pub use sea_orm::{ConnectionTrait, QueryFilter}; // Run the test locally: // DATABASE_URL="mysql://root:@localhost" cargo test --features sqlx-mysql,runtime-async-std --test query_tests diff --git a/tests/stream_tests.rs b/tests/stream_tests.rs index 969b93e1..d30063e5 100644 --- a/tests/stream_tests.rs +++ b/tests/stream_tests.rs @@ -1,9 +1,9 @@ pub mod common; pub use common::{bakery_chain::*, setup::*, TestContext}; -pub use sea_orm::entity::*; -pub use sea_orm::{QueryFilter, ConnectionTrait, DbErr}; use futures::StreamExt; +pub use sea_orm::entity::*; +pub use sea_orm::{ConnectionTrait, DbErr, QueryFilter}; #[sea_orm_macros::test] #[cfg(any( diff --git a/tests/transaction_tests.rs b/tests/transaction_tests.rs index 539eaefc..61d194c4 100644 --- a/tests/transaction_tests.rs +++ b/tests/transaction_tests.rs @@ -1,9 +1,9 @@ pub mod common; pub use common::{bakery_chain::*, setup::*, TestContext}; -use sea_orm::{DatabaseTransaction, DbErr}; pub use sea_orm::entity::*; -pub use sea_orm::{QueryFilter, ConnectionTrait}; +pub use sea_orm::{ConnectionTrait, QueryFilter}; +use sea_orm::{DatabaseTransaction, DbErr}; #[sea_orm_macros::test] #[cfg(any( @@ -14,32 +14,37 @@ pub use sea_orm::{QueryFilter, ConnectionTrait}; pub async fn transaction() { let ctx = TestContext::new("transaction_test").await; - ctx.db.transaction::<_, _, DbErr>(|txn| Box::pin(async move { - let _ = bakery::ActiveModel { - name: Set("SeaSide Bakery".to_owned()), - profit_margin: Set(10.4), - ..Default::default() - } - .save(txn) - .await?; + ctx.db + .transaction::<_, _, DbErr>(|txn| { + Box::pin(async move { + let _ = bakery::ActiveModel { + name: Set("SeaSide Bakery".to_owned()), + profit_margin: Set(10.4), + ..Default::default() + } + .save(txn) + .await?; - let _ = bakery::ActiveModel { - name: Set("Top Bakery".to_owned()), - profit_margin: Set(15.0), - ..Default::default() - } - .save(txn) - .await?; + let _ = bakery::ActiveModel { + name: Set("Top Bakery".to_owned()), + profit_margin: Set(15.0), + ..Default::default() + } + .save(txn) + .await?; - let bakeries = Bakery::find() - .filter(bakery::Column::Name.contains("Bakery")) - .all(txn) - .await?; + let bakeries = Bakery::find() + .filter(bakery::Column::Name.contains("Bakery")) + .all(txn) + .await?; - assert_eq!(bakeries.len(), 2); + assert_eq!(bakeries.len(), 2); - Ok(()) - })).await.unwrap(); + Ok(()) + }) + }) + .await + .unwrap(); ctx.delete().await; } @@ -55,28 +60,36 @@ pub async fn transaction_with_reference() { let name1 = "SeaSide Bakery"; let name2 = "Top Bakery"; let search_name = "Bakery"; - ctx.db.transaction(|txn| _transaction_with_reference(txn, name1, name2, search_name)).await.unwrap(); + ctx.db + .transaction(|txn| _transaction_with_reference(txn, name1, name2, search_name)) + .await + .unwrap(); ctx.delete().await; } -fn _transaction_with_reference<'a>(txn: &'a DatabaseTransaction, name1: &'a str, name2: &'a str, search_name: &'a str) -> std::pin::Pin> + Send + 'a>> { +fn _transaction_with_reference<'a>( + txn: &'a DatabaseTransaction, + name1: &'a str, + name2: &'a str, + search_name: &'a str, +) -> std::pin::Pin> + Send + 'a>> { Box::pin(async move { let _ = bakery::ActiveModel { name: Set(name1.to_owned()), profit_margin: Set(10.4), ..Default::default() } - .save(txn) - .await?; + .save(txn) + .await?; let _ = bakery::ActiveModel { name: Set(name2.to_owned()), profit_margin: Set(15.0), ..Default::default() } - .save(txn) - .await?; + .save(txn) + .await?; let bakeries = Bakery::find() .filter(bakery::Column::Name.contains(search_name))