diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index c865b097..d6d5f7d0 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -318,7 +318,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest] - path: [86, 249, 262, 319, 324, 352, 356] + path: [86, 249, 262, 319, 324, 352, 356, 471] steps: - uses: actions/checkout@v2 diff --git a/issues/471/.env b/issues/471/.env new file mode 100644 index 00000000..fb7fcfb5 --- /dev/null +++ b/issues/471/.env @@ -0,0 +1,3 @@ +HOST=127.0.0.1 +PORT=8000 +DATABASE_URL="postgres://postgres:password@localhost/axum_exmaple" \ No newline at end of file diff --git a/issues/471/Cargo.toml b/issues/471/Cargo.toml new file mode 100644 index 00000000..dffc14c6 --- /dev/null +++ b/issues/471/Cargo.toml @@ -0,0 +1,22 @@ +[workspace] +# A separate workspace + +[package] +name = "sea-orm-issues-400-471" +version = "0.1.0" +authors = ["Sebastian Pütz "] +edition = "2021" +publish = false + +[dependencies] +tokio = { version = "1.14", features = ["full"] } +anyhow = "1" +dotenv = "0.15" +futures-util = "0.3" +serde = "1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +[dependencies.sea-orm] +path = "../../" # remove this line in your own project +features = ["macros", "mock", "sqlx-all", "runtime-tokio-rustls", "debug-print"] +default-features = false diff --git a/issues/471/README.md b/issues/471/README.md new file mode 100644 index 00000000..df30b8be --- /dev/null +++ b/issues/471/README.md @@ -0,0 +1 @@ +Demonstrator for using streaming queries with `tokio::spawn` or in contexts that require `Send` futures. \ No newline at end of file diff --git a/issues/471/src/main.rs b/issues/471/src/main.rs new file mode 100644 index 00000000..9d7f664d --- /dev/null +++ b/issues/471/src/main.rs @@ -0,0 +1,29 @@ +mod post; +mod setup; + +use futures_util::StreamExt; +use post::Entity as Post; +use sea_orm::{prelude::*, Database}; +use std::env; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env::set_var("RUST_LOG", "debug"); + tracing_subscriber::fmt::init(); + + dotenv::dotenv().ok(); + let db_url = env::var("DATABASE_URL").expect("DATABASE_URL is not set in .env file"); + let db = Database::connect(db_url) + .await + .expect("Database connection failed"); + let _ = setup::create_post_table(&db); + tokio::task::spawn(async move { + let mut stream = Post::find().stream(&db).await.unwrap(); + while let Some(item) = stream.next().await { + let item = item?; + println!("got something: {}", item.text); + } + Ok::<(), anyhow::Error>(()) + }) + .await? +} diff --git a/issues/471/src/post.rs b/issues/471/src/post.rs new file mode 100644 index 00000000..3bb4d6a3 --- /dev/null +++ b/issues/471/src/post.rs @@ -0,0 +1,26 @@ +//! SeaORM Entity. Generated by sea-orm-codegen 0.3.2 + +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "posts")] +pub struct Model { + #[sea_orm(primary_key)] + #[serde(skip_deserializing)] + pub id: i32, + pub title: String, + #[sea_orm(column_type = "Text")] + pub text: String, +} + +#[derive(Copy, Clone, Debug, EnumIter)] +pub enum Relation {} + +impl RelationTrait for Relation { + fn def(&self) -> RelationDef { + panic!("No RelationDef") + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/issues/471/src/setup.rs b/issues/471/src/setup.rs new file mode 100644 index 00000000..04677af4 --- /dev/null +++ b/issues/471/src/setup.rs @@ -0,0 +1,33 @@ +use sea_orm::sea_query::{ColumnDef, TableCreateStatement}; +use sea_orm::{error::*, sea_query, ConnectionTrait, DbConn, ExecResult}; + +async fn create_table(db: &DbConn, stmt: &TableCreateStatement) -> Result { + let builder = db.get_database_backend(); + db.execute(builder.build(stmt)).await +} + +pub async fn create_post_table(db: &DbConn) -> Result { + let stmt = sea_query::Table::create() + .table(super::post::Entity) + .if_not_exists() + .col( + ColumnDef::new(super::post::Column::Id) + .integer() + .not_null() + .auto_increment() + .primary_key(), + ) + .col( + ColumnDef::new(super::post::Column::Title) + .string() + .not_null(), + ) + .col( + ColumnDef::new(super::post::Column::Text) + .string() + .not_null(), + ) + .to_owned(); + + create_table(db, &stmt).await +} diff --git a/src/database/connection.rs b/src/database/connection.rs index b6d74fd6..46da564b 100644 --- a/src/database/connection.rs +++ b/src/database/connection.rs @@ -37,13 +37,13 @@ pub trait ConnectionTrait: Sync { #[async_trait::async_trait] pub trait StreamTrait<'a>: Sync { /// Create a stream for the [QueryResult] - type Stream: Stream>; + type Stream: Stream> + Send; /// Execute a [Statement] and return a stream of results fn stream( &'a self, stmt: Statement, - ) -> Pin> + 'a>>; + ) -> Pin> + 'a + Send>>; } /// Spawn database transaction diff --git a/src/database/db_connection.rs b/src/database/db_connection.rs index 5031dc7a..0d4a618e 100644 --- a/src/database/db_connection.rs +++ b/src/database/db_connection.rs @@ -163,7 +163,7 @@ impl<'a> StreamTrait<'a> for DatabaseConnection { fn stream( &'a self, stmt: Statement, - ) -> Pin> + 'a>> { + ) -> Pin> + 'a + Send>> { Box::pin(async move { Ok(match self { #[cfg(feature = "sqlx-mysql")] diff --git a/src/database/stream/query.rs b/src/database/stream/query.rs index bfa66ba8..7a8eba13 100644 --- a/src/database/stream/query.rs +++ b/src/database/stream/query.rs @@ -24,7 +24,7 @@ pub struct QueryStream { metric_callback: Option, #[borrows(mut conn, stmt, metric_callback)] #[not_covariant] - stream: Pin> + 'this>>, + stream: Pin> + Send + 'this>>, } #[cfg(feature = "sqlx-mysql")] diff --git a/src/database/stream/transaction.rs b/src/database/stream/transaction.rs index 4f584274..74b04421 100644 --- a/src/database/stream/transaction.rs +++ b/src/database/stream/transaction.rs @@ -24,7 +24,7 @@ pub struct TransactionStream<'a> { metric_callback: Option, #[borrows(mut conn, stmt, metric_callback)] #[not_covariant] - stream: Pin> + 'this>>, + stream: Pin> + 'this + Send>>, } impl<'a> std::fmt::Debug for TransactionStream<'a> { @@ -35,64 +35,59 @@ impl<'a> std::fmt::Debug for TransactionStream<'a> { impl<'a> TransactionStream<'a> { #[instrument(level = "trace", skip(metric_callback))] - pub(crate) async fn build( + pub(crate) fn build( conn: MutexGuard<'a, InnerConnection>, stmt: Statement, metric_callback: Option, ) -> TransactionStream<'a> { - TransactionStreamAsyncBuilder { + TransactionStreamBuilder { stmt, conn, metric_callback, - stream_builder: |conn, stmt, _metric_callback| { - Box::pin(async move { - match conn.deref_mut() { - #[cfg(feature = "sqlx-mysql")] - InnerConnection::MySql(c) => { - let query = crate::driver::sqlx_mysql::sqlx_query(stmt); - crate::metric::metric_ok!(_metric_callback, stmt, { - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ) - as Pin>>> - }) - } - #[cfg(feature = "sqlx-postgres")] - InnerConnection::Postgres(c) => { - let query = crate::driver::sqlx_postgres::sqlx_query(stmt); - crate::metric::metric_ok!(_metric_callback, stmt, { - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ) - as Pin>>> - }) - } - #[cfg(feature = "sqlx-sqlite")] - InnerConnection::Sqlite(c) => { - let query = crate::driver::sqlx_sqlite::sqlx_query(stmt); - crate::metric::metric_ok!(_metric_callback, stmt, { - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ) - as Pin>>> - }) - } - #[cfg(feature = "mock")] - InnerConnection::Mock(c) => c.fetch(stmt), - #[allow(unreachable_patterns)] - _ => unreachable!(), - } - }) + stream_builder: |conn, stmt, _metric_callback| match conn.deref_mut() { + #[cfg(feature = "sqlx-mysql")] + InnerConnection::MySql(c) => { + let query = crate::driver::sqlx_mysql::sqlx_query(stmt); + crate::metric::metric_ok!(_metric_callback, stmt, { + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) + as Pin> + Send>> + }) + } + #[cfg(feature = "sqlx-postgres")] + InnerConnection::Postgres(c) => { + let query = crate::driver::sqlx_postgres::sqlx_query(stmt); + crate::metric::metric_ok!(_metric_callback, stmt, { + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) + as Pin> + Send>> + }) + } + #[cfg(feature = "sqlx-sqlite")] + InnerConnection::Sqlite(c) => { + let query = crate::driver::sqlx_sqlite::sqlx_query(stmt); + crate::metric::metric_ok!(_metric_callback, stmt, { + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) + as Pin> + Send>> + }) + } + #[cfg(feature = "mock")] + InnerConnection::Mock(c) => c.fetch(stmt), + #[allow(unreachable_patterns)] + _ => unreachable!(), }, } .build() - .await } } diff --git a/src/database/transaction.rs b/src/database/transaction.rs index 6e2df54a..05aec165 100644 --- a/src/database/transaction.rs +++ b/src/database/transaction.rs @@ -365,14 +365,14 @@ impl<'a> StreamTrait<'a> for DatabaseTransaction { fn stream( &'a self, stmt: Statement, - ) -> Pin> + 'a>> { + ) -> Pin> + 'a + Send>> { Box::pin(async move { + let conn = self.conn.lock().await; Ok(crate::TransactionStream::build( - self.conn.lock().await, + conn, stmt, self.metric_callback.clone(), - ) - .await) + )) }) } } diff --git a/src/driver/mock.rs b/src/driver/mock.rs index cfdd22c5..8a163e91 100644 --- a/src/driver/mock.rs +++ b/src/driver/mock.rs @@ -148,7 +148,7 @@ impl MockDatabaseConnection { pub fn fetch( &self, statement: &Statement, - ) -> Pin>>> { + ) -> Pin> + Send>> { match self.query_all(statement.clone()) { Ok(v) => Box::pin(futures::stream::iter(v.into_iter().map(Ok))), Err(e) => Box::pin(futures::stream::iter(Some(Err(e)).into_iter())), diff --git a/src/executor/select.rs b/src/executor/select.rs index aa0b86de..e606d1ed 100644 --- a/src/executor/select.rs +++ b/src/executor/select.rs @@ -273,9 +273,9 @@ where pub async fn stream<'a: 'b, 'b, C>( self, db: &'a C, - ) -> Result> + 'b, DbErr> + ) -> Result> + 'b + Send, DbErr> where - C: ConnectionTrait + StreamTrait<'a>, + C: ConnectionTrait + StreamTrait<'a> + Send, { self.into_model().stream(db).await } @@ -329,7 +329,7 @@ where db: &'a C, ) -> Result), DbErr>> + 'b, DbErr> where - C: ConnectionTrait + StreamTrait<'a>, + C: ConnectionTrait + StreamTrait<'a> + Send, { self.into_model().stream(db).await } @@ -373,9 +373,9 @@ where pub async fn stream<'a: 'b, 'b, C>( self, db: &'a C, - ) -> Result), DbErr>> + 'b, DbErr> + ) -> Result), DbErr>> + 'b + Send, DbErr> where - C: ConnectionTrait + StreamTrait<'a>, + C: ConnectionTrait + StreamTrait<'a> + Send, { self.into_model().stream(db).await } @@ -452,10 +452,11 @@ where pub async fn stream<'a: 'b, 'b, C>( self, db: &'a C, - ) -> Result> + 'b>>, DbErr> + ) -> Result> + 'b + Send>>, DbErr> where - C: ConnectionTrait + StreamTrait<'a>, + C: ConnectionTrait + StreamTrait<'a> + Send, S: 'b, + S::Item: Send, { self.into_selector_raw(db).stream(db).await } @@ -737,10 +738,11 @@ where pub async fn stream<'a: 'b, 'b, C>( self, db: &'a C, - ) -> Result> + 'b>>, DbErr> + ) -> Result> + 'b + Send>>, DbErr> where - C: ConnectionTrait + StreamTrait<'a>, + C: ConnectionTrait + StreamTrait<'a> + Send, S: 'b, + S::Item: Send, { let stream = db.stream(self.stmt).await?; Ok(Box::pin(stream.and_then(|row| {