ConnectOptions

This commit is contained in:
Chris Tsang 2021-10-13 02:25:39 +08:00
parent 069040be8b
commit c673017b97
4 changed files with 136 additions and 29 deletions

View File

@ -1,3 +1,5 @@
use std::time::Duration;
mod connection; mod connection;
mod db_connection; mod db_connection;
#[cfg(feature = "mock")] #[cfg(feature = "mock")]
@ -19,27 +21,116 @@ use crate::DbErr;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct Database; pub struct Database;
#[derive(Debug)]
pub struct ConnectOptions {
pub(crate) url: String,
pub(crate) max_connections: Option<u32>,
pub(crate) min_connections: Option<u32>,
pub(crate) connect_timeout: Option<Duration>,
pub(crate) idle_timeout: Option<Duration>,
}
impl Database { impl Database {
pub async fn connect(string: &str) -> Result<DatabaseConnection, DbErr> { pub async fn connect<C>(opt: C) -> Result<DatabaseConnection, DbErr>
where
C: Into<ConnectOptions>,
{
let opt: ConnectOptions = opt.into();
#[cfg(feature = "sqlx-mysql")] #[cfg(feature = "sqlx-mysql")]
if DbBackend::MySql.is_prefix_of(string) { if DbBackend::MySql.is_prefix_of(&opt.url) {
return crate::SqlxMySqlConnector::connect(string).await; return crate::SqlxMySqlConnector::connect(opt).await;
} }
#[cfg(feature = "sqlx-postgres")] #[cfg(feature = "sqlx-postgres")]
if DbBackend::Postgres.is_prefix_of(string) { if DbBackend::Postgres.is_prefix_of(&opt.url) {
return crate::SqlxPostgresConnector::connect(string).await; return crate::SqlxPostgresConnector::connect(opt).await;
} }
#[cfg(feature = "sqlx-sqlite")] #[cfg(feature = "sqlx-sqlite")]
if DbBackend::Sqlite.is_prefix_of(string) { if DbBackend::Sqlite.is_prefix_of(&opt.url) {
return crate::SqlxSqliteConnector::connect(string).await; return crate::SqlxSqliteConnector::connect(opt).await;
} }
#[cfg(feature = "mock")] #[cfg(feature = "mock")]
if crate::MockDatabaseConnector::accepts(string) { if crate::MockDatabaseConnector::accepts(&opt.url) {
return crate::MockDatabaseConnector::connect(string).await; return crate::MockDatabaseConnector::connect(&opt.url).await;
} }
Err(DbErr::Conn(format!( Err(DbErr::Conn(format!(
"The connection string '{}' has no supporting driver.", "The connection string '{}' has no supporting driver.",
string opt.url
))) )))
} }
} }
impl From<&str> for ConnectOptions {
fn from(string: &str) -> ConnectOptions {
ConnectOptions::from_str(string)
}
}
impl From<&String> for ConnectOptions {
fn from(string: &String) -> ConnectOptions {
ConnectOptions::from_str(string.as_str())
}
}
impl From<String> for ConnectOptions {
fn from(string: String) -> ConnectOptions {
ConnectOptions::new(string)
}
}
impl ConnectOptions {
pub fn new(url: String) -> Self {
Self {
url,
max_connections: None,
min_connections: None,
connect_timeout: None,
idle_timeout: None,
}
}
fn from_str(url: &str) -> Self {
Self::new(url.to_owned())
}
#[cfg(feature = "sqlx-dep")]
pub fn pool_options<DB>(self) -> sqlx::pool::PoolOptions<DB>
where
DB: sqlx::Database,
{
let mut opt = sqlx::pool::PoolOptions::new();
if let Some(max_connections) = self.max_connections {
opt = opt.max_connections(max_connections);
}
if let Some(min_connections) = self.min_connections {
opt = opt.min_connections(min_connections);
}
if let Some(connect_timeout) = self.connect_timeout {
opt = opt.connect_timeout(connect_timeout);
}
if let Some(idle_timeout) = self.idle_timeout {
opt = opt.idle_timeout(Some(idle_timeout));
}
opt
}
/// Set the maximum number of connections of the pool
pub fn max_connections(&mut self, value: u32) {
self.max_connections = Some(value);
}
/// Set the minimum number of connections of the pool
pub fn min_connections(&mut self, value: u32) {
self.min_connections = Some(value);
}
/// Set the timeout duration when acquiring a connection
pub fn connect_timeout(&mut self, value: Duration) {
self.connect_timeout = Some(value);
}
/// Set the idle duration before closing a connection
pub fn idle_timeout(&mut self, value: Duration) {
self.idle_timeout = Some(value);
}
}

View File

@ -1,7 +1,7 @@
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
use sqlx::{ use sqlx::{
mysql::{MySqlArguments, MySqlQueryResult, MySqlRow}, mysql::{MySqlArguments, MySqlConnectOptions, MySqlQueryResult, MySqlRow},
MySql, MySqlPool, MySql, MySqlPool,
}; };
@ -9,8 +9,8 @@ sea_query::sea_query_driver_mysql!();
use sea_query_driver_mysql::bind_query; use sea_query_driver_mysql::bind_query;
use crate::{ use crate::{
debug_print, error::*, executor::*, DatabaseConnection, DatabaseTransaction, QueryStream, debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction,
Statement, TransactionError, QueryStream, Statement, TransactionError,
}; };
use super::sqlx_common::*; use super::sqlx_common::*;
@ -25,11 +25,16 @@ pub struct SqlxMySqlPoolConnection {
impl SqlxMySqlConnector { impl SqlxMySqlConnector {
pub fn accepts(string: &str) -> bool { pub fn accepts(string: &str) -> bool {
string.starts_with("mysql://") string.starts_with("mysql://") && string.parse::<MySqlConnectOptions>().is_ok()
} }
pub async fn connect(string: &str) -> Result<DatabaseConnection, DbErr> { pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
if let Ok(pool) = MySqlPool::connect(string).await { let opt = options
.url
.parse::<MySqlConnectOptions>()
.map_err(|e| DbErr::Conn(e.to_string()))?;
// opt.disable_statement_logging();
if let Ok(pool) = options.pool_options().connect_with(opt).await {
Ok(DatabaseConnection::SqlxMySqlPoolConnection( Ok(DatabaseConnection::SqlxMySqlPoolConnection(
SqlxMySqlPoolConnection { pool }, SqlxMySqlPoolConnection { pool },
)) ))

View File

@ -1,7 +1,7 @@
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
use sqlx::{ use sqlx::{
postgres::{PgArguments, PgQueryResult, PgRow}, postgres::{PgArguments, PgConnectOptions, PgQueryResult, PgRow},
PgPool, Postgres, PgPool, Postgres,
}; };
@ -9,8 +9,8 @@ sea_query::sea_query_driver_postgres!();
use sea_query_driver_postgres::bind_query; use sea_query_driver_postgres::bind_query;
use crate::{ use crate::{
debug_print, error::*, executor::*, DatabaseConnection, DatabaseTransaction, QueryStream, debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction,
Statement, TransactionError, QueryStream, Statement, TransactionError,
}; };
use super::sqlx_common::*; use super::sqlx_common::*;
@ -25,11 +25,16 @@ pub struct SqlxPostgresPoolConnection {
impl SqlxPostgresConnector { impl SqlxPostgresConnector {
pub fn accepts(string: &str) -> bool { pub fn accepts(string: &str) -> bool {
string.starts_with("postgres://") string.starts_with("postgres://") && string.parse::<PgConnectOptions>().is_ok()
} }
pub async fn connect(string: &str) -> Result<DatabaseConnection, DbErr> { pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
if let Ok(pool) = PgPool::connect(string).await { let opt = options
.url
.parse::<PgConnectOptions>()
.map_err(|e| DbErr::Conn(e.to_string()))?;
// opt.disable_statement_logging();
if let Ok(pool) = options.pool_options().connect_with(opt).await {
Ok(DatabaseConnection::SqlxPostgresPoolConnection( Ok(DatabaseConnection::SqlxPostgresPoolConnection(
SqlxPostgresPoolConnection { pool }, SqlxPostgresPoolConnection { pool },
)) ))

View File

@ -1,7 +1,7 @@
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
use sqlx::{ use sqlx::{
sqlite::{SqliteArguments, SqlitePoolOptions, SqliteQueryResult, SqliteRow}, sqlite::{SqliteArguments, SqliteConnectOptions, SqliteQueryResult, SqliteRow},
Sqlite, SqlitePool, Sqlite, SqlitePool,
}; };
@ -9,8 +9,8 @@ sea_query::sea_query_driver_sqlite!();
use sea_query_driver_sqlite::bind_query; use sea_query_driver_sqlite::bind_query;
use crate::{ use crate::{
debug_print, error::*, executor::*, DatabaseConnection, DatabaseTransaction, QueryStream, debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction,
Statement, TransactionError, QueryStream, Statement, TransactionError,
}; };
use super::sqlx_common::*; use super::sqlx_common::*;
@ -25,13 +25,19 @@ pub struct SqlxSqlitePoolConnection {
impl SqlxSqliteConnector { impl SqlxSqliteConnector {
pub fn accepts(string: &str) -> bool { pub fn accepts(string: &str) -> bool {
string.starts_with("sqlite:") string.starts_with("sqlite:") && string.parse::<SqliteConnectOptions>().is_ok()
} }
pub async fn connect(string: &str) -> Result<DatabaseConnection, DbErr> { pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
if let Ok(pool) = SqlitePoolOptions::new() let opt = options
.url
.parse::<SqliteConnectOptions>()
.map_err(|e| DbErr::Conn(e.to_string()))?;
// opt.disable_statement_logging();
if let Ok(pool) = options
.pool_options()
.max_connections(1) .max_connections(1)
.connect(string) .connect_with(opt)
.await .await
{ {
Ok(DatabaseConnection::SqlxSqlitePoolConnection( Ok(DatabaseConnection::SqlxSqlitePoolConnection(