cargo fmt
This commit is contained in:
parent
6bc76698cc
commit
80d35a9d18
@ -1,8 +1,8 @@
|
|||||||
use std::{time::Duration, pin::Pin, task::Poll};
|
use std::{pin::Pin, task::Poll, time::Duration};
|
||||||
|
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
|
|
||||||
use crate::{QueryResult, DbErr, Statement};
|
use crate::{DbErr, QueryResult, Statement};
|
||||||
|
|
||||||
pub(crate) struct MetricStream<'a> {
|
pub(crate) struct MetricStream<'a> {
|
||||||
metric_callback: &'a Option<crate::metric::Callback>,
|
metric_callback: &'a Option<crate::metric::Callback>,
|
||||||
@ -12,7 +12,12 @@ pub(crate) struct MetricStream<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> MetricStream<'a> {
|
impl<'a> MetricStream<'a> {
|
||||||
pub(crate) fn new<S>(metric_callback: &'a Option<crate::metric::Callback>, stmt: &'a Statement, elapsed: Option<Duration>, stream: S) -> Self
|
pub(crate) fn new<S>(
|
||||||
|
metric_callback: &'a Option<crate::metric::Callback>,
|
||||||
|
stmt: &'a Statement,
|
||||||
|
elapsed: Option<Duration>,
|
||||||
|
stream: S,
|
||||||
|
) -> Self
|
||||||
where
|
where
|
||||||
S: Stream<Item = Result<QueryResult, DbErr>> + 'a + Send,
|
S: Stream<Item = Result<QueryResult, DbErr>> + 'a + Send,
|
||||||
{
|
{
|
||||||
@ -33,7 +38,10 @@ impl<'a> Stream for MetricStream<'a> {
|
|||||||
cx: &mut std::task::Context<'_>,
|
cx: &mut std::task::Context<'_>,
|
||||||
) -> Poll<Option<Self::Item>> {
|
) -> Poll<Option<Self::Item>> {
|
||||||
let this = self.get_mut();
|
let this = self.get_mut();
|
||||||
let _start = this.metric_callback.is_some().then(std::time::SystemTime::now);
|
let _start = this
|
||||||
|
.metric_callback
|
||||||
|
.is_some()
|
||||||
|
.then(std::time::SystemTime::now);
|
||||||
let res = Pin::new(&mut this.stream).poll_next(cx);
|
let res = Pin::new(&mut this.stream).poll_next(cx);
|
||||||
if let (Some(_start), Some(elapsed)) = (_start, &mut this.elapsed) {
|
if let (Some(_start), Some(elapsed)) = (_start, &mut this.elapsed) {
|
||||||
*elapsed += _start.elapsed().unwrap_or_default();
|
*elapsed += _start.elapsed().unwrap_or_default();
|
||||||
|
@ -127,9 +127,10 @@ impl QueryStream {
|
|||||||
InnerConnection::MySql(c) => {
|
InnerConnection::MySql(c) => {
|
||||||
let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
|
let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
|
||||||
let _start = _metric_callback.is_some().then(SystemTime::now);
|
let _start = _metric_callback.is_some().then(SystemTime::now);
|
||||||
let stream = c.fetch(query)
|
let stream = c
|
||||||
.map_ok(Into::into)
|
.fetch(query)
|
||||||
.map_err(crate::sqlx_error_to_query_err);
|
.map_ok(Into::into)
|
||||||
|
.map_err(crate::sqlx_error_to_query_err);
|
||||||
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
||||||
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
||||||
}
|
}
|
||||||
@ -137,9 +138,10 @@ impl QueryStream {
|
|||||||
InnerConnection::Postgres(c) => {
|
InnerConnection::Postgres(c) => {
|
||||||
let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
|
let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
|
||||||
let _start = _metric_callback.is_some().then(SystemTime::now);
|
let _start = _metric_callback.is_some().then(SystemTime::now);
|
||||||
let stream = c.fetch(query)
|
let stream = c
|
||||||
.map_ok(Into::into)
|
.fetch(query)
|
||||||
.map_err(crate::sqlx_error_to_query_err);
|
.map_ok(Into::into)
|
||||||
|
.map_err(crate::sqlx_error_to_query_err);
|
||||||
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
||||||
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
||||||
}
|
}
|
||||||
@ -147,9 +149,10 @@ impl QueryStream {
|
|||||||
InnerConnection::Sqlite(c) => {
|
InnerConnection::Sqlite(c) => {
|
||||||
let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
|
let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
|
||||||
let _start = _metric_callback.is_some().then(SystemTime::now);
|
let _start = _metric_callback.is_some().then(SystemTime::now);
|
||||||
let stream = c.fetch(query)
|
let stream = c
|
||||||
.map_ok(Into::into)
|
.fetch(query)
|
||||||
.map_err(crate::sqlx_error_to_query_err);
|
.map_ok(Into::into)
|
||||||
|
.map_err(crate::sqlx_error_to_query_err);
|
||||||
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
||||||
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
||||||
}
|
}
|
||||||
@ -159,7 +162,7 @@ impl QueryStream {
|
|||||||
let stream = c.fetch(stmt);
|
let stream = c.fetch(stmt);
|
||||||
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
||||||
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
||||||
},
|
}
|
||||||
#[allow(unreachable_patterns)]
|
#[allow(unreachable_patterns)]
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
},
|
},
|
||||||
|
@ -51,9 +51,10 @@ impl<'a> TransactionStream<'a> {
|
|||||||
InnerConnection::MySql(c) => {
|
InnerConnection::MySql(c) => {
|
||||||
let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
|
let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
|
||||||
let _start = _metric_callback.is_some().then(SystemTime::now);
|
let _start = _metric_callback.is_some().then(SystemTime::now);
|
||||||
let stream = c.fetch(query)
|
let stream = c
|
||||||
.map_ok(Into::into)
|
.fetch(query)
|
||||||
.map_err(crate::sqlx_error_to_query_err);
|
.map_ok(Into::into)
|
||||||
|
.map_err(crate::sqlx_error_to_query_err);
|
||||||
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
||||||
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
||||||
}
|
}
|
||||||
@ -61,9 +62,10 @@ impl<'a> TransactionStream<'a> {
|
|||||||
InnerConnection::Postgres(c) => {
|
InnerConnection::Postgres(c) => {
|
||||||
let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
|
let query = crate::driver::sqlx_postgres::sqlx_query(stmt);
|
||||||
let _start = _metric_callback.is_some().then(SystemTime::now);
|
let _start = _metric_callback.is_some().then(SystemTime::now);
|
||||||
let stream = c.fetch(query)
|
let stream = c
|
||||||
.map_ok(Into::into)
|
.fetch(query)
|
||||||
.map_err(crate::sqlx_error_to_query_err);
|
.map_ok(Into::into)
|
||||||
|
.map_err(crate::sqlx_error_to_query_err);
|
||||||
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
||||||
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
||||||
}
|
}
|
||||||
@ -71,9 +73,10 @@ impl<'a> TransactionStream<'a> {
|
|||||||
InnerConnection::Sqlite(c) => {
|
InnerConnection::Sqlite(c) => {
|
||||||
let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
|
let query = crate::driver::sqlx_sqlite::sqlx_query(stmt);
|
||||||
let _start = _metric_callback.is_some().then(SystemTime::now);
|
let _start = _metric_callback.is_some().then(SystemTime::now);
|
||||||
let stream = c.fetch(query)
|
let stream = c
|
||||||
.map_ok(Into::into)
|
.fetch(query)
|
||||||
.map_err(crate::sqlx_error_to_query_err);
|
.map_ok(Into::into)
|
||||||
|
.map_err(crate::sqlx_error_to_query_err);
|
||||||
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
||||||
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
||||||
}
|
}
|
||||||
@ -83,7 +86,7 @@ impl<'a> TransactionStream<'a> {
|
|||||||
let stream = c.fetch(stmt);
|
let stream = c.fetch(stmt);
|
||||||
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
|
||||||
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
MetricStream::new(_metric_callback, stmt, elapsed, stream)
|
||||||
},
|
}
|
||||||
#[allow(unreachable_patterns)]
|
#[allow(unreachable_patterns)]
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
},
|
},
|
||||||
|
Loading…
x
Reference in New Issue
Block a user