use macro to simplify code, add failure boolean
This commit is contained in:
parent
aaf11dd265
commit
9a342546f3
@ -73,56 +73,35 @@ impl QueryStream {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
InnerConnection::MySql(c) => {
|
||||
let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
);
|
||||
if let Some(callback) = metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
InnerConnection::Postgres(c) => {
|
||||
let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
);
|
||||
if let Some(callback) = metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
InnerConnection::Sqlite(c) => {
|
||||
let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
);
|
||||
if let Some(callback) = metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "mock")]
|
||||
InnerConnection::Mock(c) => c.fetch(stmt),
|
||||
|
@ -50,56 +50,35 @@ impl<'a> TransactionStream<'a> {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
InnerConnection::MySql(c) => {
|
||||
let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>;
|
||||
if let Some(callback) = metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
InnerConnection::Postgres(c) => {
|
||||
let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>;
|
||||
if let Some(callback) = metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
InnerConnection::Sqlite(c) => {
|
||||
let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>;
|
||||
if let Some(callback) = metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric_ok!(metric_callback, stmt, {
|
||||
Box::pin(
|
||||
c.fetch(query)
|
||||
.map_ok(Into::into)
|
||||
.map_err(crate::sqlx_error_to_query_err),
|
||||
) as Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>>>>
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "mock")]
|
||||
InnerConnection::Mock(c) => c.fetch(stmt),
|
||||
|
@ -257,44 +257,23 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction {
|
||||
#[cfg(feature = "sqlx-mysql")]
|
||||
InnerConnection::MySql(conn) => {
|
||||
let query = crate::driver::sqlx_mysql::sqlx_query(&stmt);
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = query.execute(conn).await.map(Into::into);
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
query.execute(conn).await.map(Into::into)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-postgres")]
|
||||
InnerConnection::Postgres(conn) => {
|
||||
let query = crate::driver::sqlx_postgres::sqlx_query(&stmt);
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = query.execute(conn).await.map(Into::into);
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
query.execute(conn).await.map(Into::into)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "sqlx-sqlite")]
|
||||
InnerConnection::Sqlite(conn) => {
|
||||
let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt);
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = query.execute(conn).await.map(Into::into);
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
query.execute(conn).await.map(Into::into)
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "mock")]
|
||||
InnerConnection::Mock(conn) => return conn.execute(stmt),
|
||||
|
@ -74,19 +74,12 @@ impl SqlxMySqlPoolConnection {
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
};
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Exec(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -101,22 +94,15 @@ impl SqlxMySqlPoolConnection {
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
};
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -131,19 +117,12 @@ impl SqlxMySqlPoolConnection {
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
};
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
|
@ -74,19 +74,12 @@ impl SqlxPostgresPoolConnection {
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
};
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Exec(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -101,22 +94,15 @@ impl SqlxPostgresPoolConnection {
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
};
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -131,19 +117,12 @@ impl SqlxPostgresPoolConnection {
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
};
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
|
@ -78,19 +78,12 @@ impl SqlxSqlitePoolConnection {
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
};
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.execute(conn).await {
|
||||
Ok(res) => Ok(res.into()),
|
||||
Err(err) => Err(sqlx_error_to_exec_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Exec(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -105,22 +98,15 @@ impl SqlxSqlitePoolConnection {
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
};
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_one(conn).await {
|
||||
Ok(row) => Ok(Some(row.into())),
|
||||
Err(err) => match err {
|
||||
sqlx::Error::RowNotFound => Ok(None),
|
||||
_ => Err(DbErr::Query(err.to_string())),
|
||||
},
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
@ -135,19 +121,12 @@ impl SqlxSqlitePoolConnection {
|
||||
|
||||
let query = sqlx_query(&stmt);
|
||||
if let Ok(conn) = &mut self.pool.acquire().await {
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
};
|
||||
if let Some(callback) = self.metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: &stmt,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
crate::metric::metric!(self.metric_callback, &stmt, {
|
||||
match query.fetch_all(conn).await {
|
||||
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
|
||||
Err(err) => Err(sqlx_error_to_query_err(err)),
|
||||
}
|
||||
})
|
||||
} else {
|
||||
Err(DbErr::Query(
|
||||
"Failed to acquire connection from pool.".to_owned(),
|
||||
|
@ -2,6 +2,8 @@ use std::{time::Duration, sync::Arc};
|
||||
|
||||
pub(crate) type Callback = Arc<dyn Fn(&Info<'_>) + Send + Sync>;
|
||||
|
||||
pub(crate) use inner::{metric, metric_ok};
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Query execution infos
|
||||
pub struct Info<'a> {
|
||||
@ -9,4 +11,45 @@ pub struct Info<'a> {
|
||||
pub elapsed: Duration,
|
||||
/// Query data
|
||||
pub statement: &'a crate::Statement,
|
||||
/// Query execution failed
|
||||
pub failed: bool,
|
||||
}
|
||||
|
||||
mod inner {
|
||||
macro_rules! metric {
|
||||
($metric_callback:expr, $stmt:expr, $code:block) => {
|
||||
{
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = $code;
|
||||
if let Some(callback) = $metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: $stmt,
|
||||
failed: res.is_err(),
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
}
|
||||
};
|
||||
}
|
||||
pub(crate) use metric;
|
||||
macro_rules! metric_ok {
|
||||
($metric_callback:expr, $stmt:expr, $code:block) => {
|
||||
{
|
||||
let _start = std::time::SystemTime::now();
|
||||
let res = $code;
|
||||
if let Some(callback) = $metric_callback.as_deref() {
|
||||
let info = crate::metric::Info {
|
||||
elapsed: _start.elapsed().unwrap_or_default(),
|
||||
statement: $stmt,
|
||||
failed: false,
|
||||
};
|
||||
callback(&info);
|
||||
}
|
||||
res
|
||||
}
|
||||
};
|
||||
}
|
||||
pub(crate) use metric_ok;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user