From 6ffed93b46d9536e7a2b4f4e39604e821540b02b Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Fri, 7 May 2021 15:09:07 +0800 Subject: [PATCH] DatabaseConnection --- Cargo.toml | 2 +- src/connector/mod.rs | 68 +++++++++++++++++++++++++++++++ src/database/mod.rs | 51 +++++++++++++++++++++++ src/driver/mod.rs | 4 +- src/driver/sqlx_mysql.rs | 66 ++++++++++++++++++------------ src/executor/mod.rs | 44 -------------------- src/executor/query.rs | 1 - src/lib.rs | 6 ++- src/query/mod.rs | 4 +- src/query/{types.rs => result.rs} | 0 src/query/select.rs | 14 ++++++- 11 files changed, 181 insertions(+), 79 deletions(-) create mode 100644 src/connector/mod.rs create mode 100644 src/database/mod.rs delete mode 100644 src/executor/mod.rs delete mode 100644 src/executor/query.rs rename src/query/{types.rs => result.rs} (100%) diff --git a/Cargo.toml b/Cargo.toml index ed529945..2dfc0072 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ path = "src/lib.rs" async-trait = "^0.1" async-std = { version = "^1.9", features = [ "attributes" ] } futures = { version = "^0.3" } -sea-query = { version = "^0.10", features = [ "sqlx-mysql" ] } +sea-query = { path = "../sea-query", version = "^0.10", features = [ "sqlx-mysql" ] } # sea-schema = { path = "../sea-schema" } serde = { version = "^1.0", features = [ "derive" ] } sqlx = { version = "^0.5", features = [ "runtime-async-std-native-tls", "mysql", "any", "chrono", "time", "bigdecimal", "decimal", "uuid", "json" ] } diff --git a/src/connector/mod.rs b/src/connector/mod.rs new file mode 100644 index 00000000..e63c398e --- /dev/null +++ b/src/connector/mod.rs @@ -0,0 +1,68 @@ +use crate::{DatabaseConnection, QueryResult}; +use async_trait::async_trait; +use sea_query::{inject_parameters, MySqlQueryBuilder, Values}; +use std::{error::Error, fmt}; + +pub struct Statement { + pub sql: String, + pub values: Values, +} + +#[async_trait] +pub trait Connector { + fn accepts(string: &str) -> bool; + + async fn connect(string: &str) -> Result; +} + +#[async_trait] +pub trait Connection { + async fn query_one(&self, stmt: Statement) -> Result; + + async fn query_all(&self, stmt: Statement) -> Result, QueryErr>; +} + +#[derive(Debug)] +pub struct QueryErr; + +#[derive(Debug)] +pub struct ConnectionErr; + +// Statement // + +impl From<(String, Values)> for Statement { + fn from(stmt: (String, Values)) -> Statement { + Statement { + sql: stmt.0, + values: stmt.1, + } + } +} + +impl fmt::Display for Statement { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let builder = MySqlQueryBuilder::default(); + let string = inject_parameters(&self.sql, self.values.0.clone(), &builder); + write!(f, "{}", &string) + } +} + +// QueryErr // + +impl Error for QueryErr {} + +impl fmt::Display for QueryErr { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +// ConnectionErr // + +impl Error for ConnectionErr {} + +impl fmt::Display for ConnectionErr { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", self) + } +} diff --git a/src/database/mod.rs b/src/database/mod.rs new file mode 100644 index 00000000..c6efa441 --- /dev/null +++ b/src/database/mod.rs @@ -0,0 +1,51 @@ +use crate::{Connection, ConnectionErr, Connector, SqlxMySqlConnector, SqlxMySqlPoolConnection}; + +#[derive(Debug, Default)] +pub struct Database { + connection: DatabaseConnection, +} + +pub enum DatabaseConnection { + SqlxMySqlPoolConnection(SqlxMySqlPoolConnection), + Disconnected, +} + +// DatabaseConnection // + +impl Default for DatabaseConnection { + fn default() -> Self { + Self::Disconnected + } +} + +impl std::fmt::Debug for DatabaseConnection { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "{}", + match self { + Self::SqlxMySqlPoolConnection(_) => "SqlxMySqlPoolConnection", + Self::Disconnected => "Disconnected", + } + ) + } +} + +// Database // + +impl Database { + pub async fn connect(&mut self, string: &str) -> Result<(), ConnectionErr> { + if SqlxMySqlConnector::accepts(string) { + self.connection = SqlxMySqlConnector::connect(string).await?; + return Ok(()); + } + Err(ConnectionErr) + } + + pub fn get_connection(&self) -> impl Connection + '_ { + match &self.connection { + DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn, + DatabaseConnection::Disconnected => panic!("Disconnected"), + } + } +} diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 3944f04f..f49ce557 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -1 +1,3 @@ -pub mod sqlx_mysql; +mod sqlx_mysql; + +pub use sqlx_mysql::*; diff --git a/src/driver/sqlx_mysql.rs b/src/driver/sqlx_mysql.rs index eee999d0..49bc7ae3 100644 --- a/src/driver/sqlx_mysql.rs +++ b/src/driver/sqlx_mysql.rs @@ -1,46 +1,58 @@ use async_trait::async_trait; use sqlx::{mysql::MySqlRow, MySqlPool}; -use sea_query::MysqlQueryBuilder; sea_query::sea_query_driver_mysql!(); use sea_query_driver_mysql::bind_query; -use crate::{debug_print, executor::*, query::*}; +use crate::{connector::*, debug_print, query::*, DatabaseConnection}; -pub struct SqlxMySqlExecutor { +pub struct SqlxMySqlConnector; + +pub struct SqlxMySqlPoolConnection { pool: MySqlPool, } #[async_trait] -impl Executor for SqlxMySqlExecutor { - type QueryBuilder = MysqlQueryBuilder; - - async fn query_one(&self, stmt: Statement) -> Result { - debug_print!("{}, {:?}", sql, values); - - let query = bind_query(sqlx::query(&stmt.sql), &stmt.values); - if let Ok(row) = query - .fetch_one(&mut self.pool.acquire().await.unwrap()) - .await - { - Ok(row.into()) - } else { - Err(ExecErr) - } +impl Connector for SqlxMySqlConnector { + fn accepts(string: &str) -> bool { + string.starts_with("mysql://") } - async fn query_all(&self, stmt: Statement) -> Result, ExecErr> { - debug_print!("{}, {:?}", sql, values); + async fn connect(string: &str) -> Result { + if let Ok(conn) = MySqlPool::connect(string).await { + Ok(DatabaseConnection::SqlxMySqlPoolConnection( + SqlxMySqlPoolConnection { pool: conn }, + )) + } else { + Err(ConnectionErr) + } + } +} + +#[async_trait] +impl Connection for &SqlxMySqlPoolConnection { + async fn query_one(&self, stmt: Statement) -> Result { + debug_print!("{}", stmt); let query = bind_query(sqlx::query(&stmt.sql), &stmt.values); - if let Ok(rows) = query - .fetch_all(&mut self.pool.acquire().await.unwrap()) - .await - { - Ok(rows.into_iter().map(|r| r.into()).collect()) - } else { - Err(ExecErr) + if let Ok(conn) = &mut self.pool.acquire().await { + if let Ok(row) = query.fetch_one(conn).await { + return Ok(row.into()); + } } + Err(QueryErr) + } + + async fn query_all(&self, stmt: Statement) -> Result, QueryErr> { + debug_print!("{}", stmt); + + let query = bind_query(sqlx::query(&stmt.sql), &stmt.values); + if let Ok(conn) = &mut self.pool.acquire().await { + if let Ok(rows) = query.fetch_all(conn).await { + return Ok(rows.into_iter().map(|r| r.into()).collect()); + } + } + Err(QueryErr) } } diff --git a/src/executor/mod.rs b/src/executor/mod.rs deleted file mode 100644 index 88e549bb..00000000 --- a/src/executor/mod.rs +++ /dev/null @@ -1,44 +0,0 @@ -mod query; - -pub use query::*; - -use crate::QueryResult; -use async_trait::async_trait; -use sea_query::{GenericBuilder, Values}; -use std::{error::Error, fmt}; - -pub struct Statement { - pub sql: String, - pub values: Values, -} - -#[async_trait] -pub trait Executor { - type QueryBuilder: GenericBuilder; - - async fn query_one(&self, stmt: Statement) -> Result; - - async fn query_all(&self, stmt: Statement) -> Result, ExecErr>; -} - -#[derive(Debug)] -pub struct ExecErr; - -// ----- // - -impl From<(String, Values)> for Statement { - fn from(stmt: (String, Values)) -> Statement { - Statement { - sql: stmt.0, - values: stmt.1, - } - } -} - -impl Error for ExecErr {} - -impl fmt::Display for ExecErr { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{:?}", self) - } -} diff --git a/src/executor/query.rs b/src/executor/query.rs deleted file mode 100644 index 8b137891..00000000 --- a/src/executor/query.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/lib.rs b/src/lib.rs index 0a3ed2c3..5db1fead 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,12 @@ +mod connector; +mod database; mod driver; mod entity; -mod executor; mod query; mod util; +pub use connector::*; +pub use database::*; pub use driver::*; pub use entity::*; -pub use executor::*; pub use query::*; diff --git a/src/query/mod.rs b/src/query/mod.rs index b44778cd..33091d5a 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -1,5 +1,5 @@ +mod result; mod select; -mod types; +pub use result::*; pub use select::*; -pub use types::*; diff --git a/src/query/types.rs b/src/query/result.rs similarity index 100% rename from src/query/types.rs rename to src/query/result.rs diff --git a/src/query/select.rs b/src/query/select.rs index 783a2bcd..46aeb672 100644 --- a/src/query/select.rs +++ b/src/query/select.rs @@ -1,8 +1,9 @@ +use crate::Statement; use crate::{entity::*, RelationDef}; use core::fmt::Debug; use core::marker::PhantomData; pub use sea_query::JoinType; -use sea_query::{Expr, Iden, IntoIden, SelectStatement}; +use sea_query::{Expr, Iden, IntoIden, QueryBuilder, SelectStatement}; use std::rc::Rc; use strum::IntoEnumIterator; @@ -71,7 +72,18 @@ where &mut self.select } + pub fn as_query(&self) -> &SelectStatement { + &self.select + } + pub fn into_query(self) -> SelectStatement { self.select } + + pub fn build(&self, builder: B) -> Statement + where + B: QueryBuilder, + { + self.as_query().build(builder).into() + } }