cargo fmt

This commit is contained in:
Billy Chan 2021-12-17 11:14:54 +08:00
parent 67a601ecca
commit 4072e74284
No known key found for this signature in database
GPG Key ID: A2D690CAC7DF3CC7
12 changed files with 195 additions and 113 deletions

View File

@ -113,11 +113,7 @@ impl EntityWriter {
pub fn write_mod(&self) -> OutputFile { pub fn write_mod(&self) -> OutputFile {
let mut lines = Vec::new(); let mut lines = Vec::new();
Self::write_doc_comment(&mut lines); Self::write_doc_comment(&mut lines);
let code_blocks: Vec<TokenStream> = self let code_blocks: Vec<TokenStream> = self.entities.iter().map(Self::gen_mod).collect();
.entities
.iter()
.map(Self::gen_mod)
.collect();
Self::write( Self::write(
&mut lines, &mut lines,
vec![quote! { vec![quote! {
@ -143,11 +139,7 @@ impl EntityWriter {
pub fn write_prelude(&self) -> OutputFile { pub fn write_prelude(&self) -> OutputFile {
let mut lines = Vec::new(); let mut lines = Vec::new();
Self::write_doc_comment(&mut lines); Self::write_doc_comment(&mut lines);
let code_blocks = self let code_blocks = self.entities.iter().map(Self::gen_prelude_use).collect();
.entities
.iter()
.map(Self::gen_prelude_use)
.collect();
Self::write(&mut lines, code_blocks); Self::write(&mut lines, code_blocks);
OutputFile { OutputFile {
name: "prelude.rs".to_owned(), name: "prelude.rs".to_owned(),

View File

@ -3,8 +3,8 @@ use crate::{
StatementBuilder, TransactionError, StatementBuilder, TransactionError,
}; };
use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder}; use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder};
use tracing::instrument;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
use tracing::instrument;
use url::Url; use url::Url;
#[cfg(feature = "sqlx-dep")] #[cfg(feature = "sqlx-dep")]
@ -255,10 +255,14 @@ impl DatabaseConnection {
#[cfg(feature = "sqlx-mysql")] #[cfg(feature = "sqlx-mysql")]
DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.set_metric_callback(callback), DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.set_metric_callback(callback),
#[cfg(feature = "sqlx-postgres")] #[cfg(feature = "sqlx-postgres")]
DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.set_metric_callback(callback), DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
conn.set_metric_callback(callback)
}
#[cfg(feature = "sqlx-sqlite")] #[cfg(feature = "sqlx-sqlite")]
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.set_metric_callback(callback), DatabaseConnection::SqlxSqlitePoolConnection(conn) => {
_ => {}, conn.set_metric_callback(callback)
}
_ => {}
} }
} }
} }

View File

@ -4,8 +4,8 @@ use crate::{
Statement, Statement,
}; };
use sea_query::{Value, ValueType, Values}; use sea_query::{Value, ValueType, Values};
use tracing::instrument;
use std::{collections::BTreeMap, sync::Arc}; use std::{collections::BTreeMap, sync::Arc};
use tracing::instrument;
/// Defines a Mock database suitable for testing /// Defines a Mock database suitable for testing
#[derive(Debug)] #[derive(Debug)]

View File

@ -28,29 +28,77 @@ pub struct QueryStream {
} }
#[cfg(feature = "sqlx-mysql")] #[cfg(feature = "sqlx-mysql")]
impl From<(PoolConnection<sqlx::MySql>, Statement, Option<crate::metric::Callback>)> for QueryStream { impl
fn from((conn, stmt, metric_callback): (PoolConnection<sqlx::MySql>, Statement, Option<crate::metric::Callback>)) -> Self { From<(
PoolConnection<sqlx::MySql>,
Statement,
Option<crate::metric::Callback>,
)> for QueryStream
{
fn from(
(conn, stmt, metric_callback): (
PoolConnection<sqlx::MySql>,
Statement,
Option<crate::metric::Callback>,
),
) -> Self {
QueryStream::build(stmt, InnerConnection::MySql(conn), metric_callback) QueryStream::build(stmt, InnerConnection::MySql(conn), metric_callback)
} }
} }
#[cfg(feature = "sqlx-postgres")] #[cfg(feature = "sqlx-postgres")]
impl From<(PoolConnection<sqlx::Postgres>, Statement, Option<crate::metric::Callback>)> for QueryStream { impl
fn from((conn, stmt, metric_callback): (PoolConnection<sqlx::Postgres>, Statement, Option<crate::metric::Callback>)) -> Self { From<(
PoolConnection<sqlx::Postgres>,
Statement,
Option<crate::metric::Callback>,
)> for QueryStream
{
fn from(
(conn, stmt, metric_callback): (
PoolConnection<sqlx::Postgres>,
Statement,
Option<crate::metric::Callback>,
),
) -> Self {
QueryStream::build(stmt, InnerConnection::Postgres(conn), metric_callback) QueryStream::build(stmt, InnerConnection::Postgres(conn), metric_callback)
} }
} }
#[cfg(feature = "sqlx-sqlite")] #[cfg(feature = "sqlx-sqlite")]
impl From<(PoolConnection<sqlx::Sqlite>, Statement, Option<crate::metric::Callback>)> for QueryStream { impl
fn from((conn, stmt, metric_callback): (PoolConnection<sqlx::Sqlite>, Statement, Option<crate::metric::Callback>)) -> Self { From<(
PoolConnection<sqlx::Sqlite>,
Statement,
Option<crate::metric::Callback>,
)> for QueryStream
{
fn from(
(conn, stmt, metric_callback): (
PoolConnection<sqlx::Sqlite>,
Statement,
Option<crate::metric::Callback>,
),
) -> Self {
QueryStream::build(stmt, InnerConnection::Sqlite(conn), metric_callback) QueryStream::build(stmt, InnerConnection::Sqlite(conn), metric_callback)
} }
} }
#[cfg(feature = "mock")] #[cfg(feature = "mock")]
impl From<(Arc<crate::MockDatabaseConnection>, Statement, Option<crate::metric::Callback>)> for QueryStream { impl
fn from((conn, stmt, metric_callback): (Arc<crate::MockDatabaseConnection>, Statement, Option<crate::metric::Callback>)) -> Self { From<(
Arc<crate::MockDatabaseConnection>,
Statement,
Option<crate::metric::Callback>,
)> for QueryStream
{
fn from(
(conn, stmt, metric_callback): (
Arc<crate::MockDatabaseConnection>,
Statement,
Option<crate::metric::Callback>,
),
) -> Self {
QueryStream::build(stmt, InnerConnection::Mock(conn), metric_callback) QueryStream::build(stmt, InnerConnection::Mock(conn), metric_callback)
} }
} }
@ -63,50 +111,52 @@ impl std::fmt::Debug for QueryStream {
impl QueryStream { impl QueryStream {
#[instrument(level = "trace", skip(metric_callback))] #[instrument(level = "trace", skip(metric_callback))]
fn build(stmt: Statement, conn: InnerConnection, metric_callback: Option<crate::metric::Callback>) -> QueryStream { fn build(
stmt: Statement,
conn: InnerConnection,
metric_callback: Option<crate::metric::Callback>,
) -> QueryStream {
QueryStreamBuilder { QueryStreamBuilder {
stmt, stmt,
conn, conn,
metric_callback, metric_callback,
stream_builder: |conn, stmt, metric_callback| { stream_builder: |conn, stmt, metric_callback| match conn {
match conn { #[cfg(feature = "sqlx-mysql")]
#[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(c) => {
InnerConnection::MySql(c) => { let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
let query = crate::driver::sqlx_mysql::sqlx_query(stmt); crate::metric::metric_ok!(metric_callback, stmt, {
crate::metric::metric_ok!(metric_callback, stmt, { Box::pin(
Box::pin( c.fetch(query)
c.fetch(query) .map_ok(Into::into)
.map_ok(Into::into) .map_err(crate::sqlx_error_to_query_err),
.map_err(crate::sqlx_error_to_query_err), )
) })
})
}
#[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(c) => {
let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
crate::metric::metric_ok!(metric_callback, stmt, {
Box::pin(
c.fetch(query)
.map_ok(Into::into)
.map_err(crate::sqlx_error_to_query_err),
)
})
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(c) => {
let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
crate::metric::metric_ok!(metric_callback, stmt, {
Box::pin(
c.fetch(query)
.map_ok(Into::into)
.map_err(crate::sqlx_error_to_query_err),
)
})
}
#[cfg(feature = "mock")]
InnerConnection::Mock(c) => c.fetch(stmt),
} }
} #[cfg(feature = "sqlx-postgres")]
InnerConnection::Postgres(c) => {
let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
crate::metric::metric_ok!(metric_callback, stmt, {
Box::pin(
c.fetch(query)
.map_ok(Into::into)
.map_err(crate::sqlx_error_to_query_err),
)
})
}
#[cfg(feature = "sqlx-sqlite")]
InnerConnection::Sqlite(c) => {
let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
crate::metric::metric_ok!(metric_callback, stmt, {
Box::pin(
c.fetch(query)
.map_ok(Into::into)
.map_err(crate::sqlx_error_to_query_err),
)
})
}
#[cfg(feature = "mock")]
InnerConnection::Mock(c) => c.fetch(stmt),
},
} }
.build() .build()
} }

View File

@ -55,7 +55,8 @@ impl<'a> TransactionStream<'a> {
c.fetch(query) c.fetch(query)
.map_ok(Into::into) .map_ok(Into::into)
.map_err(crate::sqlx_error_to_query_err), .map_err(crate::sqlx_error_to_query_err),
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>> )
as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
}) })
} }
#[cfg(feature = "sqlx-postgres")] #[cfg(feature = "sqlx-postgres")]
@ -66,7 +67,8 @@ impl<'a> TransactionStream<'a> {
c.fetch(query) c.fetch(query)
.map_ok(Into::into) .map_ok(Into::into)
.map_err(crate::sqlx_error_to_query_err), .map_err(crate::sqlx_error_to_query_err),
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>> )
as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
}) })
} }
#[cfg(feature = "sqlx-sqlite")] #[cfg(feature = "sqlx-sqlite")]
@ -77,7 +79,8 @@ impl<'a> TransactionStream<'a> {
c.fetch(query) c.fetch(query)
.map_ok(Into::into) .map_ok(Into::into)
.map_err(crate::sqlx_error_to_query_err), .map_err(crate::sqlx_error_to_query_err),
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>> )
as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
}) })
} }
#[cfg(feature = "mock")] #[cfg(feature = "mock")]

View File

@ -7,8 +7,8 @@ use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
use futures::lock::Mutex; use futures::lock::Mutex;
#[cfg(feature = "sqlx-dep")] #[cfg(feature = "sqlx-dep")]
use sqlx::{pool::PoolConnection, TransactionManager}; use sqlx::{pool::PoolConnection, TransactionManager};
use tracing::instrument;
use std::{future::Future, pin::Pin, sync::Arc}; use std::{future::Future, pin::Pin, sync::Arc};
use tracing::instrument;
// a Transaction is just a sugar for a connection where START TRANSACTION has been executed // a Transaction is just a sugar for a connection where START TRANSACTION has been executed
/// Defines a database transaction, whether it is an open transaction and the type of /// Defines a database transaction, whether it is an open transaction and the type of
@ -76,7 +76,8 @@ impl DatabaseTransaction {
Arc::new(Mutex::new(InnerConnection::Mock(inner))), Arc::new(Mutex::new(InnerConnection::Mock(inner))),
backend, backend,
metric_callback, metric_callback,
).await )
.await
} }
#[instrument(level = "trace", skip(metric_callback))] #[instrument(level = "trace", skip(metric_callback))]
@ -354,9 +355,14 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction {
&'a self, &'a self,
stmt: Statement, stmt: Statement,
) -> Pin<Box<dyn Future<Output = Result<Self::Stream, DbErr>> + 'a>> { ) -> Pin<Box<dyn Future<Output = Result<Self::Stream, DbErr>> + 'a>> {
Box::pin( Box::pin(async move {
async move { Ok(crate::TransactionStream::build(self.conn.lock().await, stmt, self.metric_callback.clone()).await) }, Ok(crate::TransactionStream::build(
) self.conn.lock().await,
stmt,
self.metric_callback.clone(),
)
.await)
})
} }
#[instrument(level = "trace")] #[instrument(level = "trace")]
@ -364,8 +370,9 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction {
DatabaseTransaction::begin( DatabaseTransaction::begin(
Arc::clone(&self.conn), Arc::clone(&self.conn),
self.backend, self.backend,
self.metric_callback.clone() self.metric_callback.clone(),
).await )
.await
} }
/// Execute the function inside a transaction. /// Execute the function inside a transaction.

View File

@ -3,7 +3,6 @@ use crate::{
Statement, Transaction, Statement, Transaction,
}; };
use futures::Stream; use futures::Stream;
use tracing::instrument;
use std::{ use std::{
fmt::Debug, fmt::Debug,
pin::Pin, pin::Pin,
@ -12,6 +11,7 @@ use std::{
Arc, Mutex, Arc, Mutex,
}, },
}; };
use tracing::instrument;
/// Defines a database driver for the [MockDatabase] /// Defines a database driver for the [MockDatabase]
#[derive(Debug)] #[derive(Debug)]

View File

@ -52,7 +52,10 @@ impl SqlxMySqlConnector {
} }
match options.pool_options().connect_with(opt).await { match options.pool_options().connect_with(opt).await {
Ok(pool) => Ok(DatabaseConnection::SqlxMySqlPoolConnection( Ok(pool) => Ok(DatabaseConnection::SqlxMySqlPoolConnection(
SqlxMySqlPoolConnection { pool, metric_callback: None }, SqlxMySqlPoolConnection {
pool,
metric_callback: None,
},
)), )),
Err(e) => Err(sqlx_error_to_conn_err(e)), Err(e) => Err(sqlx_error_to_conn_err(e)),
} }
@ -62,7 +65,10 @@ impl SqlxMySqlConnector {
impl SqlxMySqlConnector { impl SqlxMySqlConnector {
/// Instantiate a sqlx pool connection to a [DatabaseConnection] /// Instantiate a sqlx pool connection to a [DatabaseConnection]
pub fn from_sqlx_mysql_pool(pool: MySqlPool) -> DatabaseConnection { pub fn from_sqlx_mysql_pool(pool: MySqlPool) -> DatabaseConnection {
DatabaseConnection::SqlxMySqlPoolConnection(SqlxMySqlPoolConnection { pool, metric_callback: None }) DatabaseConnection::SqlxMySqlPoolConnection(SqlxMySqlPoolConnection {
pool,
metric_callback: None,
})
} }
} }
@ -136,7 +142,11 @@ impl SqlxMySqlPoolConnection {
debug_print!("{}", stmt); debug_print!("{}", stmt);
if let Ok(conn) = self.pool.acquire().await { if let Ok(conn) = self.pool.acquire().await {
Ok(QueryStream::from((conn, stmt, self.metric_callback.clone()))) Ok(QueryStream::from((
conn,
stmt,
self.metric_callback.clone(),
)))
} else { } else {
Err(DbErr::Query( Err(DbErr::Query(
"Failed to acquire connection from pool.".to_owned(), "Failed to acquire connection from pool.".to_owned(),

View File

@ -52,7 +52,10 @@ impl SqlxPostgresConnector {
} }
match options.pool_options().connect_with(opt).await { match options.pool_options().connect_with(opt).await {
Ok(pool) => Ok(DatabaseConnection::SqlxPostgresPoolConnection( Ok(pool) => Ok(DatabaseConnection::SqlxPostgresPoolConnection(
SqlxPostgresPoolConnection { pool, metric_callback: None }, SqlxPostgresPoolConnection {
pool,
metric_callback: None,
},
)), )),
Err(e) => Err(sqlx_error_to_conn_err(e)), Err(e) => Err(sqlx_error_to_conn_err(e)),
} }
@ -62,7 +65,10 @@ impl SqlxPostgresConnector {
impl SqlxPostgresConnector { impl SqlxPostgresConnector {
/// Instantiate a sqlx pool connection to a [DatabaseConnection] /// Instantiate a sqlx pool connection to a [DatabaseConnection]
pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection { pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection {
DatabaseConnection::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection { pool, metric_callback: None }) DatabaseConnection::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection {
pool,
metric_callback: None,
})
} }
} }
@ -136,7 +142,11 @@ impl SqlxPostgresPoolConnection {
debug_print!("{}", stmt); debug_print!("{}", stmt);
if let Ok(conn) = self.pool.acquire().await { if let Ok(conn) = self.pool.acquire().await {
Ok(QueryStream::from((conn, stmt, self.metric_callback.clone()))) Ok(QueryStream::from((
conn,
stmt,
self.metric_callback.clone(),
)))
} else { } else {
Err(DbErr::Query( Err(DbErr::Query(
"Failed to acquire connection from pool.".to_owned(), "Failed to acquire connection from pool.".to_owned(),

View File

@ -56,7 +56,10 @@ impl SqlxSqliteConnector {
} }
match options.pool_options().connect_with(opt).await { match options.pool_options().connect_with(opt).await {
Ok(pool) => Ok(DatabaseConnection::SqlxSqlitePoolConnection( Ok(pool) => Ok(DatabaseConnection::SqlxSqlitePoolConnection(
SqlxSqlitePoolConnection { pool, metric_callback: None }, SqlxSqlitePoolConnection {
pool,
metric_callback: None,
},
)), )),
Err(e) => Err(sqlx_error_to_conn_err(e)), Err(e) => Err(sqlx_error_to_conn_err(e)),
} }
@ -66,7 +69,10 @@ impl SqlxSqliteConnector {
impl SqlxSqliteConnector { impl SqlxSqliteConnector {
/// Instantiate a sqlx pool connection to a [DatabaseConnection] /// Instantiate a sqlx pool connection to a [DatabaseConnection]
pub fn from_sqlx_sqlite_pool(pool: SqlitePool) -> DatabaseConnection { pub fn from_sqlx_sqlite_pool(pool: SqlitePool) -> DatabaseConnection {
DatabaseConnection::SqlxSqlitePoolConnection(SqlxSqlitePoolConnection { pool, metric_callback: None }) DatabaseConnection::SqlxSqlitePoolConnection(SqlxSqlitePoolConnection {
pool,
metric_callback: None,
})
} }
} }
@ -140,7 +146,11 @@ impl SqlxSqlitePoolConnection {
debug_print!("{}", stmt); debug_print!("{}", stmt);
if let Ok(conn) = self.pool.acquire().await { if let Ok(conn) = self.pool.acquire().await {
Ok(QueryStream::from((conn, stmt, self.metric_callback.clone()))) Ok(QueryStream::from((
conn,
stmt,
self.metric_callback.clone(),
)))
} else { } else {
Err(DbErr::Query( Err(DbErr::Query(
"Failed to acquire connection from pool.".to_owned(), "Failed to acquire connection from pool.".to_owned(),

View File

@ -276,6 +276,8 @@ pub mod entity;
pub mod error; pub mod error;
/// This module performs execution of queries on a Model or ActiveModel /// This module performs execution of queries on a Model or ActiveModel
mod executor; mod executor;
/// Holds types and methods to perform metric collection
pub mod metric;
/// Holds types and methods to perform queries /// Holds types and methods to perform queries
pub mod query; pub mod query;
/// Holds types that defines the schemas of an Entity /// Holds types that defines the schemas of an Entity
@ -284,8 +286,6 @@ pub mod schema;
#[cfg(feature = "macros")] #[cfg(feature = "macros")]
pub mod tests_cfg; pub mod tests_cfg;
mod util; mod util;
/// Holds types and methods to perform metric collection
pub mod metric;
pub use database::*; pub use database::*;
pub use driver::*; pub use driver::*;

View File

@ -1,4 +1,4 @@
use std::{time::Duration, sync::Arc}; use std::{sync::Arc, time::Duration};
pub(crate) type Callback = Arc<dyn Fn(&Info<'_>) + Send + Sync>; pub(crate) type Callback = Arc<dyn Fn(&Info<'_>) + Send + Sync>;
@ -17,39 +17,35 @@ pub struct Info<'a> {
mod inner { mod inner {
macro_rules! metric { macro_rules! metric {
($metric_callback:expr, $stmt:expr, $code:block) => { ($metric_callback:expr, $stmt:expr, $code:block) => {{
{ let _start = std::time::SystemTime::now();
let _start = std::time::SystemTime::now(); let res = $code;
let res = $code; if let Some(callback) = $metric_callback.as_deref() {
if let Some(callback) = $metric_callback.as_deref() { let info = crate::metric::Info {
let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(),
elapsed: _start.elapsed().unwrap_or_default(), statement: $stmt,
statement: $stmt, failed: res.is_err(),
failed: res.is_err(), };
}; callback(&info);
callback(&info);
}
res
} }
}; res
}};
} }
pub(crate) use metric; pub(crate) use metric;
macro_rules! metric_ok { macro_rules! metric_ok {
($metric_callback:expr, $stmt:expr, $code:block) => { ($metric_callback:expr, $stmt:expr, $code:block) => {{
{ let _start = std::time::SystemTime::now();
let _start = std::time::SystemTime::now(); let res = $code;
let res = $code; if let Some(callback) = $metric_callback.as_deref() {
if let Some(callback) = $metric_callback.as_deref() { let info = crate::metric::Info {
let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(),
elapsed: _start.elapsed().unwrap_or_default(), statement: $stmt,
statement: $stmt, failed: false,
failed: false, };
}; callback(&info);
callback(&info);
}
res
} }
}; res
}};
} }
pub(crate) use metric_ok; pub(crate) use metric_ok;
} }