Make TransactionStream::build sync

This commit is contained in:
Sebastian Pütz 2022-01-19 23:54:16 +01:00
parent f012ccaec3
commit 949e3115f5
3 changed files with 15 additions and 32 deletions

View File

@ -36,7 +36,7 @@ serde = { version = "^1.0", features = ["derive"] }
serde_json = { version = "^1", optional = true } serde_json = { version = "^1", optional = true }
sqlx = { version = "^0.5", optional = true } sqlx = { version = "^0.5", optional = true }
uuid = { version = "0.8", features = ["serde", "v4"], optional = true } uuid = { version = "0.8", features = ["serde", "v4"], optional = true }
ouroboros = { git = "https://github.com/sebpuetz/ouroboros", branch = "send-builder-functions" } ouroboros = "0.14"
url = "^2.2" url = "^2.2"
once_cell = "1.8" once_cell = "1.8"

View File

@ -2,7 +2,7 @@
use std::{ops::DerefMut, pin::Pin, task::Poll}; use std::{ops::DerefMut, pin::Pin, task::Poll};
use futures::{FutureExt, Stream}; use futures::Stream;
#[cfg(feature = "sqlx-dep")] #[cfg(feature = "sqlx-dep")]
use futures::TryStreamExt; use futures::TryStreamExt;
@ -33,16 +33,18 @@ impl<'a> std::fmt::Debug for TransactionStream<'a> {
} }
} }
type PinStream<'s> = Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>> + Send + 's>>;
impl<'a> TransactionStream<'a> { impl<'a> TransactionStream<'a> {
fn stream_builder<'s>( #[instrument(level = "trace", skip(metric_callback))]
conn: &'s mut MutexGuard<'a, InnerConnection>, pub(crate) fn build(
stmt: &'s Statement, conn: MutexGuard<'a, InnerConnection>,
_metric_callback: &'s Option<crate::metric::Callback>, stmt: Statement,
) -> Pin<Box<dyn std::future::Future<Output = PinStream<'s>> + 's + Send>> { metric_callback: Option<crate::metric::Callback>,
async move { ) -> TransactionStream<'a> {
match conn.deref_mut() { TransactionStreamBuilder {
stmt,
conn,
metric_callback,
stream_builder: |conn, stmt, _metric_callback| match conn.deref_mut() {
#[cfg(feature = "sqlx-mysql")] #[cfg(feature = "sqlx-mysql")]
InnerConnection::MySql(c) => { InnerConnection::MySql(c) => {
let query = crate::driver::sqlx_mysql::sqlx_query(stmt); let query = crate::driver::sqlx_mysql::sqlx_query(stmt);
@ -81,27 +83,9 @@ impl<'a> TransactionStream<'a> {
} }
#[cfg(feature = "mock")] #[cfg(feature = "mock")]
InnerConnection::Mock(c) => c.fetch(stmt), InnerConnection::Mock(c) => c.fetch(stmt),
} },
}
.boxed()
}
#[instrument(level = "trace", skip(metric_callback))]
pub(crate) async fn build(
conn: MutexGuard<'a, InnerConnection>,
stmt: Statement,
metric_callback: Option<crate::metric::Callback>,
) -> TransactionStream<'a> {
let stream_builder = Self::stream_builder;
TransactionStreamAsyncBuilder {
stmt,
conn,
metric_callback,
stream_builder,
} }
.build() .build()
.await
} }
} }

View File

@ -361,8 +361,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction {
conn, conn,
stmt, stmt,
self.metric_callback.clone(), self.metric_callback.clone(),
) ))
.await)
}) })
} }