Refactor and add stream() to SelectorRaw

This commit is contained in:
Chris Tsang 2021-11-14 20:49:04 +08:00
parent 5f2fa55253
commit aaf6c2555d

View File

@ -416,18 +416,25 @@ where
}
}
fn into_selector_raw<'a, C>(self, db: &C) -> SelectorRaw<S>
where
C: ConnectionTrait<'a>,
{
let builder = db.get_database_backend();
let stmt = builder.build(&self.query);
SelectorRaw {
stmt,
selector: self.selector,
}
}
/// Get a Model from a Select operation
pub async fn one<'a, C>(mut self, db: &C) -> Result<Option<S::Item>, DbErr>
where
C: ConnectionTrait<'a>,
{
let builder = db.get_database_backend();
self.query.limit(1);
let row = db.query_one(builder.build(&self.query)).await?;
match row {
Some(row) => Ok(Some(S::from_raw_query_result(row)?)),
None => Ok(None),
}
self.into_selector_raw(db).one(db).await
}
/// Get all results from a Select operation
@ -435,16 +442,10 @@ where
where
C: ConnectionTrait<'a>,
{
let builder = db.get_database_backend();
let rows = db.query_all(builder.build(&self.query)).await?;
let mut models = Vec::new();
for row in rows.into_iter() {
models.push(S::from_raw_query_result(row)?);
}
Ok(models)
self.into_selector_raw(db).all(db).await
}
/// Stream the results of the operation
/// Stream the results of the Select operation
pub async fn stream<'a: 'b, 'b, C>(
self,
db: &'a C,
@ -453,11 +454,7 @@ where
C: ConnectionTrait<'a>,
S: 'b,
{
let builder = db.get_database_backend();
let stream = db.stream(builder.build(&self.query)).await?;
Ok(Box::pin(stream.and_then(|row| {
futures::future::ready(S::from_raw_query_result(row))
})))
self.into_selector_raw(db).stream(db).await
}
}
@ -465,8 +462,7 @@ impl<S> SelectorRaw<S>
where
S: SelectorTrait,
{
/// Create `SelectorRaw` from Statment. Executing this `SelectorRaw` will
/// return a type `M` which implement `FromQueryResult`.
/// Select a custom Model from a raw SQL [Statement].
pub fn from_statement<M>(stmt: Statement) -> SelectorRaw<SelectModel<M>>
where
M: FromQueryResult,
@ -629,6 +625,7 @@ where
}
}
/// Get a Model from a Select operation
/// ```
/// # #[cfg(feature = "mock")]
/// # use sea_orm::{error::*, tests_cfg::*, MockDatabase, Transaction, DbBackend};
@ -671,6 +668,7 @@ where
}
}
/// Get all results from a Select operation
/// ```
/// # #[cfg(feature = "mock")]
/// # use sea_orm::{error::*, tests_cfg::*, MockDatabase, Transaction, DbBackend};
@ -713,6 +711,21 @@ where
}
Ok(models)
}
/// Stream the results of the Select operation
pub async fn stream<'a: 'b, 'b, C>(
self,
db: &'a C,
) -> Result<Pin<Box<dyn Stream<Item = Result<S::Item, DbErr>> + 'b>>, DbErr>
where
C: ConnectionTrait<'a>,
S: 'b,
{
let stream = db.stream(self.stmt).await?;
Ok(Box::pin(stream.and_then(|row| {
futures::future::ready(S::from_raw_query_result(row))
})))
}
}
fn consolidate_query_result<L, R>(