From 91efa1fae29bd661d8bbf1b347ecfa42a7f9ff9f Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Sat, 9 Oct 2021 21:45:25 +0800 Subject: [PATCH] Test streaming in transaction --- src/database/mock.rs | 59 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/src/database/mock.rs b/src/database/mock.rs index d0fce2cd..8bde4725 100644 --- a/src/database/mock.rs +++ b/src/database/mock.rs @@ -241,6 +241,17 @@ mod tests { Transaction, TransactionError, }; + #[derive(Debug, PartialEq)] + pub struct MyErr(String); + + impl std::error::Error for MyErr {} + + impl std::fmt::Display for MyErr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0.as_str()) + } + } + #[smol_potat::test] async fn test_transaction_1() { let db = MockDatabase::new(DbBackend::Postgres).into_connection(); @@ -286,17 +297,6 @@ mod tests { async fn test_transaction_2() { let db = MockDatabase::new(DbBackend::Postgres).into_connection(); - #[derive(Debug, PartialEq)] - pub struct MyErr(String); - - impl std::error::Error for MyErr {} - - impl std::fmt::Display for MyErr { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.0.as_str()) - } - } - let result = db .transaction::<_, (), MyErr>(|txn| { Box::pin(async move { @@ -346,4 +346,41 @@ mod tests { Ok(()) } + + #[smol_potat::test] + async fn test_stream_in_transaction() -> Result<(), DbErr> { + use futures::TryStreamExt; + + let apple = fruit::Model { + id: 1, + name: "Apple".to_owned(), + cake_id: Some(1), + }; + + let orange = fruit::Model { + id: 2, + name: "orange".to_owned(), + cake_id: None, + }; + + let db = MockDatabase::new(DbBackend::Postgres) + .append_query_results(vec![vec![apple.clone(), orange.clone()]]) + .into_connection(); + + let txn = db.begin().await?; + + let mut stream = fruit::Entity::find().stream(&txn).await?; + + assert_eq!(stream.try_next().await?, Some(apple)); + + assert_eq!(stream.try_next().await?, Some(orange)); + + assert_eq!(stream.try_next().await?, None); + + std::mem::drop(stream); + + txn.commit().await?; + + Ok(()) + } }