Database Proxy (#2000)

* feat: Add proxy connection type

* feat: Add the proxy database's function trait.

* fix: Remove some unused impls to fix the unit tests

* test: Create the proxy from an empty declaration.

* test: Try to generate query and exec commands.

* perf: Add more query Debug traits for debugging.

* chore: Add the example for wasi + proxy.

* chore: Try to read string from wasmtime vm.

* chore: Awkward, but how do we do this without tokio::spawn?

* chore: Complete the basic memory read logic.

* chore: Abandon the WASI demo, native demo first...

* refactor: Use single proxy connection generator
to avoid stack overflow

* refactor: Rename the inner structs

* fix: Fix CI clippy and unit test

* fix: Rename the example.

* chore: Try to embed surrealdb for proxy test.

* fix: Transfer the query result correctly.

* refactor: Rename the example.

* chore: Ready to add example for wasmtime proxy.

* feat: Try to compile sea-orm into a wasm binary.
It fails on the wasm32-wasi target because of the socket deps,
but it can be compiled on the wasm32-unknown-unknown target.

* fix: WASM targets can't use sqlx.

* fix: Try to fix CI by removing toml.

* fix: Try to fix CI by removing toml.

* fix: Move vm to the example's root dir.

* fix: Add a pre-build script.

* chore: Add README.

* fix: Try to fix CI.

* feat: Add proxy logic in wasm module.

* fix: Try to run the wasi module.
But WASI does not support multiple threads,
so the module failed to run.

* refactor: Bump wasmtime to 14.

* fix: Now we can use async traits on wasmtime.
The solution is to add the current-thread tag to tokio-wasi.

* build: Use build.rs instead of dynamic command.

* feat: Add the execute result's transfer logic.

* fix: Convert sqlx query result for sea-query.

* fix: Now we can transfer wasm's query to the outside.

* refactor: Convert to ProxyRow first.
This is how we keep the type information of each value.

* fix: Duplicate time library reference.

* feat: Add a new proxy example which uses GlueSQL.

* test: Add test cases for the three new examples.
Just try to run them once...

* ci: Add wasm component's compiler for unit test.

* ci: Add wasi target.

* ci: It may need the wasi target twice...

* feat: Add more keys to the proxy execute result.
To transfer the full information of the execute result.

* fix: Use custom id type instead of json value.

* fix: Wrong reference type.

* fix: Rewrite the transformer.

* perf: Add ToString trait for proxy exec result.

* revert: Again.
Refs: 9bac6e91ca9df04ccd8368906e1613cfc5b96218

* revert: Back to the basic proxy exec result.
Refs: e0330dde73a54d461d5f38c69eec5e13bcc928d4

* refactor: Update GlueSQL and SurrealDB examples. (#1980)

* refactor: Bump gluesql to 0.15
Related to https://github.com/gluesql/gluesql/issues/1438

* Use SQLParser to parse and replace placeholders.

* Use SQLParser for surrealdb demo.

* Transform the query by SQLParser.

* Tweaks

* Remove wasmtime example. (#2001)

* ci: Add additional targets.

* Remove proxy wasmtime example.

* Format

---------

Co-authored-by: 伊欧 <langyo.china@gmail.com>
Co-authored-by: 伊欧 <m13776491897@163.com>
Chris Tsang, 2023-12-14 11:40:55 +00:00, committed by GitHub
parent d4f8e7226e
commit 955bbcbc12
23 changed files with 1900 additions and 9 deletions


@ -16,7 +16,7 @@ keywords = ["async", "orm", "mysql", "postgres", "sqlite"]
rust-version = "1.65"
[package.metadata.docs.rs]
features = ["default", "sqlx-all", "mock", "runtime-async-std-native-tls", "postgres-array", "sea-orm-internal"]
features = ["default", "sqlx-all", "mock", "proxy", "runtime-async-std-native-tls", "postgres-array", "sea-orm-internal"]
rustdoc-args = ["--cfg", "docsrs"]
[lib]
@ -76,6 +76,7 @@ default = [
]
macros = ["sea-orm-macros/derive"]
mock = []
proxy = ["serde_json"]
with-json = ["serde_json", "sea-query/with-json", "chrono?/serde", "time?/serde", "uuid?/serde", "sea-query-binder?/with-json", "sqlx?/json"]
with-chrono = ["chrono", "sea-query/with-chrono", "sea-query-binder?/with-chrono", "sqlx?/chrono"]
with-rust_decimal = ["rust_decimal", "sea-query/with-rust_decimal", "sea-query-binder?/with-rust_decimal", "sqlx?/rust_decimal"]


@ -0,0 +1,29 @@
[package]
name = "sea-orm-proxy-gluesql-example"
version = "0.1.0"
authors = ["Langyo <langyo.china@gmail.com>"]
edition = "2021"
publish = false
[workspace]
[dependencies]
async-std = { version = "1.12", features = ["attributes", "tokio1"] }
serde_json = { version = "1" }
serde = { version = "1" }
futures = { version = "0.3" }
async-stream = { version = "0.3" }
futures-util = { version = "0.3" }
sqlparser = "0.40"
sea-orm = { path = "../../", features = [
"proxy",
"debug-print",
] }
gluesql = { version = "0.15", default-features = false, features = [
"memory-storage",
] }
[dev-dependencies]
smol = { version = "1.2" }
smol-potat = { version = "1.1" }


@ -0,0 +1,7 @@
# SeaORM Proxy Demo for GlueSQL
Run this demo for [GlueSQL](https://gluesql.org/) with the following command:
```bash
cargo run
```


@ -0,0 +1 @@
pub mod post;


@ -0,0 +1,17 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize)]
#[sea_orm(table_name = "posts")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i64,
pub title: String,
pub text: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}


@ -0,0 +1,190 @@
//! Proxy connection example.
#![deny(missing_docs)]
mod entity;
use std::{
collections::BTreeMap,
sync::{Arc, Mutex},
};
use gluesql::{memory_storage::MemoryStorage, prelude::Glue};
use sea_orm::{
ActiveValue::Set, Database, DbBackend, DbErr, EntityTrait, ProxyDatabaseTrait, ProxyExecResult,
ProxyRow, Statement,
};
use entity::post::{ActiveModel, Entity};
struct ProxyDb {
mem: Mutex<Glue<MemoryStorage>>,
}
impl std::fmt::Debug for ProxyDb {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ProxyDb").finish()
}
}
impl ProxyDatabaseTrait for ProxyDb {
fn query(&self, statement: Statement) -> Result<Vec<ProxyRow>, DbErr> {
println!("SQL query: {:?}", statement);
let sql = statement.sql.clone();
let mut ret: Vec<ProxyRow> = vec![];
async_std::task::block_on(async {
for payload in self.mem.lock().unwrap().execute(sql).await.unwrap().iter() {
match payload {
gluesql::prelude::Payload::Select { labels, rows } => {
for row in rows.iter() {
let mut map = BTreeMap::new();
for (label, column) in labels.iter().zip(row.iter()) {
map.insert(
label.to_owned(),
match column {
gluesql::prelude::Value::I64(val) => {
sea_orm::Value::BigInt(Some(*val))
}
gluesql::prelude::Value::Str(val) => {
sea_orm::Value::String(Some(Box::new(val.to_owned())))
}
_ => unreachable!("Unsupported value: {:?}", column),
},
);
}
ret.push(map.into());
}
}
_ => unreachable!("Unsupported payload: {:?}", payload),
}
}
});
Ok(ret)
}
fn execute(&self, statement: Statement) -> Result<ProxyExecResult, DbErr> {
let sql = if let Some(values) = statement.values {
// Replace all the '?' with the statement values
use sqlparser::ast::{Expr, Value};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
let dialect = GenericDialect {};
let mut ast = Parser::parse_sql(&dialect, statement.sql.as_str()).unwrap();
match &mut ast[0] {
sqlparser::ast::Statement::Insert {
columns, source, ..
} => {
for item in columns.iter_mut() {
item.quote_style = Some('"');
}
if let Some(obj) = source {
match &mut *obj.body {
sqlparser::ast::SetExpr::Values(obj) => {
for (mut item, val) in obj.rows[0].iter_mut().zip(values.0.iter()) {
match &mut item {
Expr::Value(item) => {
*item = match val {
sea_orm::Value::String(val) => {
Value::SingleQuotedString(match val {
Some(val) => val.to_string(),
None => "".to_string(),
})
}
sea_orm::Value::BigInt(val) => Value::Number(
val.unwrap_or(0).to_string(),
false,
),
_ => todo!(),
};
}
_ => todo!(),
}
}
}
_ => todo!(),
}
}
}
_ => todo!(),
}
let statement = &ast[0];
statement.to_string()
} else {
statement.sql
};
println!("SQL execute: {}", sql);
async_std::task::block_on(async {
self.mem.lock().unwrap().execute(sql).await.unwrap();
});
Ok(ProxyExecResult {
last_insert_id: 1,
rows_affected: 1,
})
}
}
#[async_std::main]
async fn main() {
let mem = MemoryStorage::default();
let mut glue = Glue::new(mem);
glue.execute(
r#"
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
text TEXT NOT NULL
)
"#,
)
.await
.unwrap();
let db = Database::connect_proxy(
DbBackend::Sqlite,
Arc::new(Mutex::new(Box::new(ProxyDb {
mem: Mutex::new(glue),
}))),
)
.await
.unwrap();
println!("Initialized");
let data = ActiveModel {
id: Set(11),
title: Set("Homo".to_owned()),
text: Set("いいよ、来いよ".to_owned()),
};
Entity::insert(data).exec(&db).await.unwrap();
let data = ActiveModel {
id: Set(45),
title: Set("Homo".to_owned()),
text: Set("そうだよ".to_owned()),
};
Entity::insert(data).exec(&db).await.unwrap();
let data = ActiveModel {
id: Set(14),
title: Set("Homo".to_owned()),
text: Set("悔い改めて".to_owned()),
};
Entity::insert(data).exec(&db).await.unwrap();
let list = Entity::find().all(&db).await.unwrap().to_vec();
println!("Result: {:?}", list);
}
#[cfg(test)]
mod tests {
#[smol_potat::test]
async fn try_run() {
crate::main()
}
}
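In the `execute` method above, the example rewrites the parsed sqlparser AST to splice the bound values into the `INSERT` statement. As a rough illustration of what that substitution accomplishes, here is a naive, std-only sketch (a hypothetical helper, not part of this PR) that replaces `?` placeholders textually. The AST-based approach in the example exists precisely because textual splicing cannot handle quoting, escaping, or `?` characters that appear inside string literals:

```rust
// Naive placeholder substitution: splice each value into the SQL at the
// next `?`. Illustrative only; the real example rewrites the AST instead.
fn bind_placeholders(sql: &str, values: &[String]) -> String {
    let mut out = String::new();
    let mut vals = values.iter();
    for ch in sql.chars() {
        if ch == '?' {
            match vals.next() {
                Some(v) => out.push_str(v),
                // More placeholders than values: leave the `?` as-is.
                None => out.push(ch),
            }
        } else {
            out.push(ch);
        }
    }
    out
}

fn main() {
    let sql = "INSERT INTO posts (id, title) VALUES (?, ?)";
    let bound = bind_placeholders(sql, &["11".to_string(), "'Homo'".to_string()]);
    println!("{bound}");
    // → INSERT INTO posts (id, title) VALUES (11, 'Homo')
}
```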


@ -0,0 +1,27 @@
[package]
name = "sea-orm-proxy-surrealdb-example"
version = "0.1.0"
authors = ["Langyo <langyo.china@gmail.com>"]
edition = "2021"
publish = false
[workspace]
[dependencies]
async-std = { version = "1.12", features = ["attributes", "tokio1"] }
serde_json = { version = "1" }
serde = { version = "1" }
futures = { version = "0.3" }
async-stream = { version = "0.3" }
futures-util = { version = "0.3" }
sqlparser = "0.40"
sea-orm = { path = "../../", features = [
"proxy",
"debug-print",
] }
surrealdb = { version = "1", features = ["kv-mem"] }
[dev-dependencies]
smol = { version = "1.2" }
smol-potat = { version = "1.1" }


@ -0,0 +1,7 @@
# SeaORM Proxy Demo for SurrealDB
Run this demo for [SurrealDB](https://surrealdb.com/) with the following command:
```bash
cargo run
```


@ -0,0 +1 @@
pub mod post;


@ -0,0 +1,17 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize)]
#[sea_orm(table_name = "posts")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: String,
pub title: String,
pub text: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}


@ -0,0 +1,271 @@
//! Proxy connection example.
#![deny(missing_docs)]
mod entity;
use std::{
collections::BTreeMap,
sync::{Arc, Mutex},
};
use sea_orm::{
ActiveValue::Set, Database, DbBackend, DbErr, EntityTrait, ProxyDatabaseTrait, ProxyExecResult,
ProxyRow, Statement,
};
use surrealdb::{
engine::local::{Db, Mem},
Surreal,
};
use entity::post::{ActiveModel, Entity};
#[derive(Debug)]
struct ProxyDb {
mem: Surreal<Db>,
}
impl ProxyDatabaseTrait for ProxyDb {
fn query(&self, statement: Statement) -> Result<Vec<ProxyRow>, DbErr> {
println!("SQL query: {:?}", statement);
let mut ret = async_std::task::block_on(async {
// Surrealdb's grammar is not compatible with sea-orm's
// so we need to remove the extra clauses
// from "SELECT `from`.`col` FROM `from` WHERE `from`.`col` = xx"
// to "SELECT `col` FROM `from` WHERE `col` = xx"
use sqlparser::ast::{Expr, SelectItem, SetExpr, TableFactor};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
let dialect = GenericDialect {};
let mut ast = Parser::parse_sql(&dialect, statement.sql.as_str()).unwrap();
match &mut ast[0] {
sqlparser::ast::Statement::Query(query) => match &mut *query.body {
SetExpr::Select(body) => {
body.projection.iter_mut().for_each(|item| {
match item {
SelectItem::UnnamedExpr(expr) => {
match expr {
Expr::CompoundIdentifier(idents) => {
// Remove the head of the identifier
// e.g. `from`.`col` -> `col`
let ident = idents.pop().unwrap();
*expr = Expr::Identifier(ident);
}
_ => todo!(),
}
}
_ => todo!(),
}
});
body.from.iter_mut().for_each(|item| {
match &mut item.relation {
TableFactor::Table { name, .. } => {
// Remove the head of the identifier
// e.g. `from`.`col` -> `col`
let ident = name.0.pop().unwrap();
name.0 = vec![ident];
}
_ => todo!(),
}
});
}
_ => todo!(),
},
_ => todo!(),
};
let statement = &ast[0];
let sql = statement.to_string();
println!("SQL: {}", sql);
self.mem.query(sql).await
})
.unwrap();
// Convert the result to sea-orm's format
let ret: Vec<serde_json::Value> = ret.take(0).unwrap();
println!("SQL query result: {}", serde_json::to_string(&ret).unwrap());
let ret = ret
.iter()
.map(|row| {
let mut map = serde_json::Map::new();
for (k, v) in row.as_object().unwrap().iter() {
if k == "id" {
// Get `tb` and `id` columns from surrealdb
// and convert them to sea-orm's `id`
let tb = v.as_object().unwrap().get("tb").unwrap().to_string();
let id = v
.as_object()
.unwrap()
.get("id")
.unwrap()
.get("String")
.unwrap();
// Remove the quotes
let tb = tb.to_string().replace("\"", "");
let id = id.to_string().replace("\"", "");
map.insert("id".to_owned(), format!("{}:{}", tb, id).into());
continue;
}
map.insert(k.to_owned(), v.to_owned());
}
serde_json::Value::Object(map)
})
.map(|v: serde_json::Value| {
let mut ret: BTreeMap<String, sea_orm::Value> = BTreeMap::new();
for (k, v) in v.as_object().unwrap().iter() {
ret.insert(
k.to_owned(),
match v {
serde_json::Value::Bool(b) => {
sea_orm::Value::TinyInt(if *b { Some(1) } else { Some(0) })
}
serde_json::Value::Number(n) => {
if n.is_i64() {
sea_orm::Value::BigInt(Some(n.as_i64().unwrap()))
} else if n.is_u64() {
sea_orm::Value::BigUnsigned(Some(n.as_u64().unwrap()))
} else if n.is_f64() {
sea_orm::Value::Double(Some(n.as_f64().unwrap()))
} else {
unreachable!()
}
}
serde_json::Value::String(s) => {
sea_orm::Value::String(Some(Box::new(s.to_owned())))
}
_ => sea_orm::Value::Json(Some(Box::new(v.to_owned()))),
},
);
}
ProxyRow { values: ret }
})
.collect::<Vec<_>>();
Ok(ret)
}
fn execute(&self, statement: Statement) -> Result<ProxyExecResult, DbErr> {
async_std::task::block_on(async {
if let Some(values) = statement.values {
// Replace all the '?' with the statement values
use sqlparser::ast::{Expr, Value};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
let dialect = GenericDialect {};
let mut ast = Parser::parse_sql(&dialect, statement.sql.as_str()).unwrap();
match &mut ast[0] {
sqlparser::ast::Statement::Insert {
table_name,
columns,
source,
..
} => {
// Replace the table name's quote style
table_name.0[0].quote_style = Some('`');
// Replace all the column names' quote style
for item in columns.iter_mut() {
item.quote_style = Some('`');
}
// Convert the values to sea-orm's format
if let Some(obj) = source {
match &mut *obj.body {
sqlparser::ast::SetExpr::Values(obj) => {
for (mut item, val) in
obj.rows[0].iter_mut().zip(values.0.iter())
{
match &mut item {
Expr::Value(item) => {
*item = match val {
sea_orm::Value::String(val) => {
Value::SingleQuotedString(match val {
Some(val) => val.to_string(),
None => "".to_string(),
})
}
sea_orm::Value::BigInt(val) => Value::Number(
val.unwrap_or(0).to_string(),
false,
),
_ => todo!(),
};
}
_ => todo!(),
}
}
}
_ => todo!(),
}
}
}
_ => todo!(),
}
let statement = &ast[0];
let sql = statement.to_string();
println!("SQL: {}", sql);
self.mem.query(sql).await
} else {
self.mem.query(statement.sql).await
}
})
.unwrap();
Ok(ProxyExecResult {
last_insert_id: 1,
rows_affected: 1,
})
}
}
#[async_std::main]
async fn main() {
let mem = Surreal::new::<Mem>(()).await.unwrap();
mem.use_ns("test").use_db("post").await.unwrap();
let db = Database::connect_proxy(
DbBackend::MySql,
Arc::new(Mutex::new(Box::new(ProxyDb { mem }))),
)
.await
.unwrap();
println!("Initialized");
let data = ActiveModel {
title: Set("Homo".to_owned()),
text: Set("いいよ、来いよ".to_owned()),
..Default::default()
};
Entity::insert(data).exec(&db).await.unwrap();
let data = ActiveModel {
title: Set("Homo".to_owned()),
text: Set("そうだよ".to_owned()),
..Default::default()
};
Entity::insert(data).exec(&db).await.unwrap();
let data = ActiveModel {
title: Set("Homo".to_owned()),
text: Set("悔い改めて".to_owned()),
..Default::default()
};
Entity::insert(data).exec(&db).await.unwrap();
let list = Entity::find().all(&db).await.unwrap().to_vec();
println!("Result: {:?}", list);
}
#[cfg(test)]
mod tests {
#[smol_potat::test]
async fn try_run() {
crate::main()
}
}
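The id handling in `query` above flattens SurrealDB's record id (a `tb` table name plus an `id` key) into the single `tb:id` string that the entity's `id: String` column expects, stripping the quotes that serde_json serialization adds. A std-only sketch of that flattening step (a hypothetical helper mirroring the lines in the example):

```rust
// Flatten a SurrealDB record id into the `tb:id` form used by the entity's
// `id: String` column, removing the JSON quotes as the example does.
fn flatten_record_id(tb: &str, id: &str) -> String {
    let tb = tb.replace('"', "");
    let id = id.replace('"', "");
    format!("{tb}:{id}")
}

fn main() {
    // e.g. table "posts" and key "abc123", as serialized by serde_json
    println!("{}", flatten_record_id("\"posts\"", "\"abc123\""));
    // → posts:abc123
}
```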


@ -10,7 +10,7 @@ use url::Url;
#[cfg(feature = "sqlx-dep")]
use sqlx::pool::PoolConnection;
#[cfg(feature = "mock")]
#[cfg(any(feature = "mock", feature = "proxy"))]
use std::sync::Arc;
/// Handle a database connection depending on the backend
@ -20,15 +20,23 @@ pub enum DatabaseConnection {
/// Create a MYSQL database connection and pool
#[cfg(feature = "sqlx-mysql")]
SqlxMySqlPoolConnection(crate::SqlxMySqlPoolConnection),
/// Create a PostgreSQL database connection and pool
/// Create a PostgreSQL database connection and pool
#[cfg(feature = "sqlx-postgres")]
SqlxPostgresPoolConnection(crate::SqlxPostgresPoolConnection),
/// Create a SQLite database connection and pool
/// Create a SQLite database connection and pool
#[cfg(feature = "sqlx-sqlite")]
SqlxSqlitePoolConnection(crate::SqlxSqlitePoolConnection),
/// Create a Mock database connection useful for testing
/// Create a Mock database connection useful for testing
#[cfg(feature = "mock")]
MockDatabaseConnection(Arc<crate::MockDatabaseConnection>),
/// Create a Proxy database connection useful for proxying
#[cfg(feature = "proxy")]
ProxyDatabaseConnection(Arc<crate::ProxyDatabaseConnection>),
/// The connection to the database has been severed
Disconnected,
}
@ -66,7 +74,9 @@ pub(crate) enum InnerConnection {
#[cfg(feature = "sqlx-sqlite")]
Sqlite(PoolConnection<sqlx::Sqlite>),
#[cfg(feature = "mock")]
Mock(std::sync::Arc<crate::MockDatabaseConnection>),
Mock(Arc<crate::MockDatabaseConnection>),
#[cfg(feature = "proxy")]
Proxy(Arc<crate::ProxyDatabaseConnection>),
}
impl std::fmt::Debug for DatabaseConnection {
@ -83,6 +93,8 @@ impl std::fmt::Debug for DatabaseConnection {
Self::SqlxSqlitePoolConnection(_) => "SqlxSqlitePoolConnection",
#[cfg(feature = "mock")]
Self::MockDatabaseConnection(_) => "MockDatabaseConnection",
#[cfg(feature = "proxy")]
Self::ProxyDatabaseConnection(_) => "ProxyDatabaseConnection",
Self::Disconnected => "Disconnected",
}
)
@ -101,6 +113,8 @@ impl ConnectionTrait for DatabaseConnection {
DatabaseConnection::SqlxSqlitePoolConnection(_) => DbBackend::Sqlite,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.get_database_backend(),
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(conn) => conn.get_database_backend(),
DatabaseConnection::Disconnected => panic!("Disconnected"),
}
}
@ -117,6 +131,8 @@ impl ConnectionTrait for DatabaseConnection {
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.execute(stmt).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.execute(stmt),
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(conn) => conn.execute(stmt),
DatabaseConnection::Disconnected => Err(conn_err("Disconnected")),
}
}
@ -141,6 +157,12 @@ impl ConnectionTrait for DatabaseConnection {
let stmt = Statement::from_string(db_backend, sql);
conn.execute(stmt)
}
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(conn) => {
let db_backend = conn.get_database_backend();
let stmt = Statement::from_string(db_backend, sql);
conn.execute(stmt)
}
DatabaseConnection::Disconnected => Err(conn_err("Disconnected")),
}
}
@ -157,6 +179,8 @@ impl ConnectionTrait for DatabaseConnection {
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.query_one(stmt).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.query_one(stmt),
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(conn) => conn.query_one(stmt),
DatabaseConnection::Disconnected => Err(conn_err("Disconnected")),
}
}
@ -173,6 +197,8 @@ impl ConnectionTrait for DatabaseConnection {
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.query_all(stmt).await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.query_all(stmt),
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(conn) => conn.query_all(stmt),
DatabaseConnection::Disconnected => Err(conn_err("Disconnected")),
}
}
@ -205,6 +231,10 @@ impl StreamTrait for DatabaseConnection {
DatabaseConnection::MockDatabaseConnection(conn) => {
Ok(crate::QueryStream::from((Arc::clone(conn), stmt, None)))
}
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(conn) => {
Ok(crate::QueryStream::from((Arc::clone(conn), stmt, None)))
}
DatabaseConnection::Disconnected => Err(conn_err("Disconnected")),
}
})
@ -226,6 +256,10 @@ impl TransactionTrait for DatabaseConnection {
DatabaseConnection::MockDatabaseConnection(conn) => {
DatabaseTransaction::new_mock(Arc::clone(conn), None).await
}
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(conn) => {
DatabaseTransaction::new_proxy(conn.clone(), None).await
}
DatabaseConnection::Disconnected => Err(conn_err("Disconnected")),
}
}
@ -253,6 +287,10 @@ impl TransactionTrait for DatabaseConnection {
DatabaseConnection::MockDatabaseConnection(conn) => {
DatabaseTransaction::new_mock(Arc::clone(conn), None).await
}
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(conn) => {
DatabaseTransaction::new_proxy(conn.clone(), None).await
}
DatabaseConnection::Disconnected => Err(conn_err("Disconnected")),
}
}
@ -289,6 +327,13 @@ impl TransactionTrait for DatabaseConnection {
.map_err(TransactionError::Connection)?;
transaction.run(_callback).await
}
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(conn) => {
let transaction = DatabaseTransaction::new_proxy(conn.clone(), None)
.await
.map_err(TransactionError::Connection)?;
transaction.run(_callback).await
}
DatabaseConnection::Disconnected => Err(conn_err("Disconnected").into()),
}
}
@ -333,6 +378,13 @@ impl TransactionTrait for DatabaseConnection {
.map_err(TransactionError::Connection)?;
transaction.run(_callback).await
}
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(conn) => {
let transaction = DatabaseTransaction::new_proxy(conn.clone(), None)
.await
.map_err(TransactionError::Connection)?;
transaction.run(_callback).await
}
DatabaseConnection::Disconnected => Err(conn_err("Disconnected").into()),
}
}
@ -367,6 +419,21 @@ impl DatabaseConnection {
}
}
#[cfg(feature = "proxy")]
impl DatabaseConnection {
/// Generate a database connection for testing the Proxy database
///
/// # Panics
///
/// Panics if [DbConn] is not a proxy connection.
pub fn as_proxy_connection(&self) -> &crate::ProxyDatabaseConnection {
match self {
DatabaseConnection::ProxyDatabaseConnection(proxy_conn) => proxy_conn,
_ => panic!("Not proxy connection"),
}
}
}
impl DatabaseConnection {
/// Sets a callback to metric this connection
pub fn set_metric_callback<F>(&mut self, _callback: F)
@ -401,6 +468,8 @@ impl DatabaseConnection {
DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.ping().await,
#[cfg(feature = "mock")]
DatabaseConnection::MockDatabaseConnection(conn) => conn.ping(),
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(conn) => conn.ping(),
DatabaseConnection::Disconnected => Err(conn_err("Disconnected")),
}
}
@ -419,6 +488,11 @@ impl DatabaseConnection {
// Nothing to cleanup, we just consume the `DatabaseConnection`
Ok(())
}
#[cfg(feature = "proxy")]
DatabaseConnection::ProxyDatabaseConnection(_) => {
// Nothing to cleanup, we just consume the `DatabaseConnection`
Ok(())
}
DatabaseConnection::Disconnected => Err(conn_err("Disconnected")),
}
}


@ -30,7 +30,8 @@ pub struct MockExecResult {
/// which is just a [BTreeMap]<[String], [Value]>
#[derive(Clone, Debug)]
pub struct MockRow {
values: BTreeMap<String, Value>,
/// The values of the single row
pub values: BTreeMap<String, Value>,
}
/// A trait to get a [MockRow] from a type useful for testing in the [MockDatabase]


@ -5,6 +5,9 @@ mod db_connection;
#[cfg(feature = "mock")]
#[cfg_attr(docsrs, doc(cfg(feature = "mock")))]
mod mock;
#[cfg(feature = "proxy")]
#[cfg_attr(docsrs, doc(cfg(feature = "proxy")))]
mod proxy;
mod statement;
mod stream;
mod transaction;
@ -14,6 +17,9 @@ pub use db_connection::*;
#[cfg(feature = "mock")]
#[cfg_attr(docsrs, doc(cfg(feature = "mock")))]
pub use mock::*;
#[cfg(feature = "proxy")]
#[cfg_attr(docsrs, doc(cfg(feature = "proxy")))]
pub use proxy::*;
pub use statement::*;
use std::borrow::Cow;
pub use stream::*;
@ -79,11 +85,41 @@ impl Database {
if crate::MockDatabaseConnector::accepts(&opt.url) {
return crate::MockDatabaseConnector::connect(&opt.url).await;
}
Err(conn_err(format!(
"The connection string '{}' has no supporting driver.",
opt.url
)))
}
/// Method to create a [DatabaseConnection] on a proxy database
#[cfg(feature = "proxy")]
#[instrument(level = "trace", skip(proxy_func_arc))]
pub async fn connect_proxy(
db_type: DbBackend,
proxy_func_arc: std::sync::Arc<std::sync::Mutex<Box<dyn ProxyDatabaseTrait>>>,
) -> Result<DatabaseConnection, DbErr> {
match db_type {
DbBackend::MySql => {
return crate::ProxyDatabaseConnector::connect(
DbBackend::MySql,
proxy_func_arc.to_owned(),
);
}
DbBackend::Postgres => {
return crate::ProxyDatabaseConnector::connect(
DbBackend::Postgres,
proxy_func_arc.to_owned(),
);
}
DbBackend::Sqlite => {
return crate::ProxyDatabaseConnector::connect(
DbBackend::Sqlite,
proxy_func_arc.to_owned(),
);
}
}
}
}
impl<T> From<T> for ConnectOptions

src/database/proxy.rs (new file, 925 lines)

@ -0,0 +1,925 @@
use crate::{error::*, ExecResult, ExecResultHolder, QueryResult, QueryResultRow, Statement};
use sea_query::{Value, ValueType};
use std::{collections::BTreeMap, fmt::Debug};
/// Defines the [ProxyDatabaseTrait] to save the functions
pub trait ProxyDatabaseTrait: Send + Sync + std::fmt::Debug {
/// Execute a query in the [ProxyDatabase], and return the query results
fn query(&self, statement: Statement) -> Result<Vec<ProxyRow>, DbErr>;
/// Execute a command in the [ProxyDatabase], and report the number of rows affected
fn execute(&self, statement: Statement) -> Result<ProxyExecResult, DbErr>;
/// Begin a transaction in the [ProxyDatabase]
fn begin(&self) {}
/// Commit a transaction in the [ProxyDatabase]
fn commit(&self) {}
/// Rollback a transaction in the [ProxyDatabase]
fn rollback(&self) {}
/// Ping the [ProxyDatabase], it should return an error if the database is not available
fn ping(&self) -> Result<(), DbErr> {
Ok(())
}
}
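A backend only has to provide `query` and `execute`; `begin`, `commit`, `rollback`, and `ping` default to no-ops. As a minimal, self-contained sketch of the shape of such an implementation (using simplified stand-in types rather than sea-orm's real `Statement`, `ProxyRow`, and `ProxyExecResult`), an echo backend could look like this:

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for sea-orm's types, for illustration only.
#[derive(Debug)]
struct Statement {
    sql: String,
}
type ProxyRow = BTreeMap<String, String>;
#[derive(Debug, Default)]
struct ProxyExecResult {
    last_insert_id: u64,
    rows_affected: u64,
}

// Mirrors the shape of ProxyDatabaseTrait: queries return rows, commands
// return an exec result, and the transaction hooks default to no-ops.
trait ProxyDatabase: Send + Sync {
    fn query(&self, statement: Statement) -> Result<Vec<ProxyRow>, String>;
    fn execute(&self, statement: Statement) -> Result<ProxyExecResult, String>;
    fn begin(&self) {}
    fn commit(&self) {}
    fn rollback(&self) {}
}

// An echo backend: returns the SQL it was given as a single-column row.
struct EchoDb;

impl ProxyDatabase for EchoDb {
    fn query(&self, statement: Statement) -> Result<Vec<ProxyRow>, String> {
        let mut row = BTreeMap::new();
        row.insert("sql".to_string(), statement.sql);
        Ok(vec![row])
    }

    fn execute(&self, statement: Statement) -> Result<ProxyExecResult, String> {
        println!("executing: {}", statement.sql);
        Ok(ProxyExecResult {
            last_insert_id: 1,
            rows_affected: 1,
        })
    }
}

fn main() {
    let db = EchoDb;
    let rows = db.query(Statement { sql: "SELECT 1".into() }).unwrap();
    println!("{}", rows[0]["sql"]);
    let res = db.execute(Statement { sql: "INSERT INTO t VALUES (1)".into() }).unwrap();
    println!("rows_affected={}", res.rows_affected);
}
```

In the real trait the backend also chooses how to run its own async client; the GlueSQL and SurrealDB examples above bridge the sync trait methods to async calls with `async_std::task::block_on`.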
/// Defines the results obtained from a [ProxyDatabase]
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct ProxyExecResult {
/// The last inserted id on auto-increment
pub last_insert_id: u64,
/// The number of rows affected by the database operation
pub rows_affected: u64,
}
impl ProxyExecResult {
/// Create a new [ProxyExecResult] from the last inserted id and the number of rows affected
pub fn new(last_insert_id: u64, rows_affected: u64) -> Self {
Self {
last_insert_id,
rows_affected,
}
}
}
impl Default for ExecResultHolder {
fn default() -> Self {
Self::Proxy(ProxyExecResult::default())
}
}
impl From<ProxyExecResult> for ExecResult {
fn from(result: ProxyExecResult) -> Self {
Self {
result: ExecResultHolder::Proxy(result),
}
}
}
impl From<ExecResult> for ProxyExecResult {
fn from(result: ExecResult) -> Self {
match result.result {
#[cfg(feature = "sqlx-mysql")]
ExecResultHolder::SqlxMySql(result) => Self {
last_insert_id: result.last_insert_id() as u64,
rows_affected: result.rows_affected(),
},
#[cfg(feature = "sqlx-postgres")]
ExecResultHolder::SqlxPostgres(result) => Self {
last_insert_id: 0,
rows_affected: result.rows_affected(),
},
#[cfg(feature = "sqlx-sqlite")]
ExecResultHolder::SqlxSqlite(result) => Self {
last_insert_id: result.last_insert_rowid() as u64,
rows_affected: result.rows_affected(),
},
#[cfg(feature = "mock")]
ExecResultHolder::Mock(result) => Self {
last_insert_id: result.last_insert_id,
rows_affected: result.rows_affected,
},
ExecResultHolder::Proxy(result) => result,
}
}
}
/// Defines the structure of a Row for the [ProxyDatabase]
/// which is just a [BTreeMap]<[String], [Value]>
#[derive(Clone, Debug)]
pub struct ProxyRow {
/// The values of the single row
pub values: BTreeMap<String, Value>,
}
impl ProxyRow {
/// Create a new [ProxyRow] from a [BTreeMap]<[String], [Value]>
pub fn new(values: BTreeMap<String, Value>) -> Self {
Self { values }
}
}
impl Default for ProxyRow {
fn default() -> Self {
Self {
values: BTreeMap::new(),
}
}
}
impl From<BTreeMap<String, Value>> for ProxyRow {
fn from(values: BTreeMap<String, Value>) -> Self {
Self { values }
}
}
impl From<ProxyRow> for BTreeMap<String, Value> {
fn from(row: ProxyRow) -> Self {
row.values
}
}
impl From<ProxyRow> for Vec<(String, Value)> {
fn from(row: ProxyRow) -> Self {
row.values.into_iter().collect()
}
}
impl From<ProxyRow> for QueryResult {
fn from(row: ProxyRow) -> Self {
QueryResult {
row: QueryResultRow::Proxy(row),
}
}
}
#[cfg(feature = "with-json")]
impl Into<serde_json::Value> for ProxyRow {
fn into(self) -> serde_json::Value {
self.values
.into_iter()
.map(|(k, v)| (k, sea_query::sea_value_to_json_value(&v)))
.collect()
}
}
/// Convert [QueryResult] to [ProxyRow]
pub fn from_query_result_to_proxy_row(result: &QueryResult) -> ProxyRow {
match &result.row {
#[cfg(feature = "sqlx-mysql")]
QueryResultRow::SqlxMySql(row) => from_sqlx_mysql_row_to_proxy_row(&row),
#[cfg(feature = "sqlx-postgres")]
QueryResultRow::SqlxPostgres(row) => from_sqlx_postgres_row_to_proxy_row(&row),
#[cfg(feature = "sqlx-sqlite")]
QueryResultRow::SqlxSqlite(row) => from_sqlx_sqlite_row_to_proxy_row(&row),
#[cfg(feature = "mock")]
QueryResultRow::Mock(row) => ProxyRow {
values: row.values.clone(),
},
QueryResultRow::Proxy(row) => row.to_owned(),
}
}
#[cfg(feature = "sqlx-mysql")]
pub(crate) fn from_sqlx_mysql_row_to_proxy_row(row: &sqlx::mysql::MySqlRow) -> ProxyRow {
// https://docs.rs/sqlx-mysql/0.7.2/src/sqlx_mysql/protocol/text/column.rs.html
// https://docs.rs/sqlx-mysql/0.7.2/sqlx_mysql/types/index.html
use sqlx::{Column, Row, TypeInfo};
ProxyRow {
values: row
.columns()
.iter()
.map(|c| {
(
c.name().to_string(),
match c.type_info().name() {
"TINYINT(1)" | "BOOLEAN" => Value::Bool(Some(
row.try_get(c.ordinal()).expect("Failed to get boolean"),
)),
"TINYINT UNSIGNED" => Value::TinyUnsigned(Some(
row.try_get(c.ordinal())
.expect("Failed to get unsigned tiny integer"),
)),
"SMALLINT UNSIGNED" => Value::SmallUnsigned(Some(
row.try_get(c.ordinal())
.expect("Failed to get unsigned small integer"),
)),
"INT UNSIGNED" => Value::Unsigned(Some(
row.try_get(c.ordinal())
.expect("Failed to get unsigned integer"),
)),
"MEDIUMINT UNSIGNED" | "BIGINT UNSIGNED" => Value::BigUnsigned(Some(
row.try_get(c.ordinal())
.expect("Failed to get unsigned big integer"),
)),
"TINYINT" => Value::TinyInt(Some(
row.try_get(c.ordinal())
.expect("Failed to get tiny integer"),
)),
"SMALLINT" => Value::SmallInt(Some(
row.try_get(c.ordinal())
.expect("Failed to get small integer"),
)),
"INT" => Value::Int(Some(
row.try_get(c.ordinal()).expect("Failed to get integer"),
)),
"MEDIUMINT" | "BIGINT" => Value::BigInt(Some(
row.try_get(c.ordinal()).expect("Failed to get big integer"),
)),
"FLOAT" => Value::Float(Some(
row.try_get(c.ordinal()).expect("Failed to get float"),
)),
"DOUBLE" => Value::Double(Some(
row.try_get(c.ordinal()).expect("Failed to get double"),
)),
"BIT" | "BINARY" | "VARBINARY" | "TINYBLOB" | "BLOB" | "MEDIUMBLOB"
| "LONGBLOB" => Value::Bytes(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get bytes"),
))),
"CHAR" | "VARCHAR" | "TINYTEXT" | "TEXT" | "MEDIUMTEXT" | "LONGTEXT" => {
Value::String(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get string"),
)))
}
#[cfg(feature = "with-chrono")]
"TIMESTAMP" => Value::ChronoDateTimeUtc(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get timestamp"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"TIMESTAMP" => Value::TimeDateTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get timestamp"),
))),
#[cfg(feature = "with-chrono")]
"DATE" => Value::ChronoDate(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get date"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"DATE" => Value::TimeDate(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get date"),
))),
#[cfg(feature = "with-chrono")]
"TIME" => Value::ChronoTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get time"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"TIME" => Value::TimeTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get time"),
))),
#[cfg(feature = "with-chrono")]
"DATETIME" => Value::ChronoDateTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get datetime"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"DATETIME" => Value::TimeDateTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get datetime"),
))),
#[cfg(feature = "with-chrono")]
"YEAR" => Value::ChronoDate(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get year"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"YEAR" => Value::TimeDate(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get year"),
))),
"ENUM" | "SET" | "GEOMETRY" => Value::String(Some(Box::new(
row.try_get(c.ordinal())
.expect("Failed to get serialized string"),
))),
#[cfg(feature = "with-bigdecimal")]
"DECIMAL" => Value::BigDecimal(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get decimal"),
))),
#[cfg(all(
feature = "with-rust_decimal",
not(feature = "with-bigdecimal")
))]
"DECIMAL" => Value::Decimal(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get decimal"),
))),
#[cfg(feature = "with-json")]
"JSON" => Value::Json(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get json"),
))),
_ => unreachable!("Unknown column type: {}", c.type_info().name()),
},
)
})
.collect(),
}
}
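The conversion above dispatches purely on the column's type-name string and panics on anything unrecognized. A std-only sketch of that dispatch pattern, using a toy value enum and a hypothetical `toy_value` helper (not the sea-orm/sea-query types, and raw values arrive as text here for simplicity):

```rust
// Simplified stand-in for sea_query::Value; illustration only.
#[derive(Debug, PartialEq)]
enum ToyValue {
    Bool(bool),
    BigInt(i64),
    Text(String),
}

// Dispatch on a MySQL-style type name, mirroring the match in
// from_sqlx_mysql_row_to_proxy_row (hypothetical helper).
fn toy_value(type_name: &str, raw: &str) -> ToyValue {
    match type_name {
        "TINYINT(1)" | "BOOLEAN" => ToyValue::Bool(raw == "1"),
        "MEDIUMINT" | "BIGINT" => ToyValue::BigInt(raw.parse().expect("not an integer")),
        "CHAR" | "VARCHAR" | "TEXT" => ToyValue::Text(raw.to_owned()),
        other => unreachable!("Unknown column type: {other}"),
    }
}

fn main() {
    assert_eq!(toy_value("BOOLEAN", "1"), ToyValue::Bool(true));
    assert_eq!(toy_value("BIGINT", "42"), ToyValue::BigInt(42));
    assert_eq!(toy_value("VARCHAR", "hi"), ToyValue::Text("hi".to_owned()));
}
```

The `unreachable!` arm matches the real function's behavior: an unmapped type name is treated as a bug rather than silently coerced.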
#[cfg(feature = "sqlx-postgres")]
pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> ProxyRow {
// https://docs.rs/sqlx-postgres/0.7.2/src/sqlx_postgres/type_info.rs.html
// https://docs.rs/sqlx-postgres/0.7.2/sqlx_postgres/types/index.html
use sqlx::{Column, Row, TypeInfo};
ProxyRow {
values: row
.columns()
.iter()
.map(|c| {
(
c.name().to_string(),
match c.type_info().name() {
"BOOL" => Value::Bool(Some(
row.try_get(c.ordinal()).expect("Failed to get boolean"),
)),
#[cfg(feature = "postgres-array")]
"BOOL[]" => Value::Array(
sea_query::ArrayType::Bool,
Some(Box::new(
row.try_get::<Vec<bool>, _>(c.ordinal())
.expect("Failed to get boolean array")
.iter()
.map(|val| Value::Bool(Some(*val)))
.collect(),
)),
),
"\"CHAR\"" => Value::TinyInt(Some(
row.try_get(c.ordinal())
.expect("Failed to get tiny integer"),
)),
#[cfg(feature = "postgres-array")]
"\"CHAR\"[]" => Value::Array(
sea_query::ArrayType::TinyInt,
Some(Box::new(
row.try_get::<Vec<i8>, _>(c.ordinal())
.expect("Failed to get tiny integer array")
.iter()
.map(|val| Value::TinyInt(Some(*val)))
.collect(),
)),
),
"SMALLINT" | "SMALLSERIAL" | "INT2" => Value::SmallInt(Some(
row.try_get(c.ordinal())
.expect("Failed to get small integer"),
)),
#[cfg(feature = "postgres-array")]
"SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => Value::Array(
sea_query::ArrayType::SmallInt,
Some(Box::new(
row.try_get::<Vec<i16>, _>(c.ordinal())
.expect("Failed to get small integer array")
.iter()
.map(|val| Value::SmallInt(Some(*val)))
.collect(),
)),
),
"INT" | "SERIAL" | "INT4" => Value::Int(Some(
row.try_get(c.ordinal()).expect("Failed to get integer"),
)),
#[cfg(feature = "postgres-array")]
"INT[]" | "SERIAL[]" | "INT4[]" => Value::Array(
sea_query::ArrayType::Int,
Some(Box::new(
row.try_get::<Vec<i32>, _>(c.ordinal())
.expect("Failed to get integer array")
.iter()
.map(|val| Value::Int(Some(*val)))
.collect(),
)),
),
"BIGINT" | "BIGSERIAL" | "INT8" => Value::BigInt(Some(
row.try_get(c.ordinal()).expect("Failed to get big integer"),
)),
#[cfg(feature = "postgres-array")]
"BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => Value::Array(
sea_query::ArrayType::BigInt,
Some(Box::new(
row.try_get::<Vec<i64>, _>(c.ordinal())
.expect("Failed to get big integer array")
.iter()
.map(|val| Value::BigInt(Some(*val)))
.collect(),
)),
),
"FLOAT4" | "REAL" => Value::Float(Some(
row.try_get(c.ordinal()).expect("Failed to get float"),
)),
#[cfg(feature = "postgres-array")]
"FLOAT4[]" | "REAL[]" => Value::Array(
sea_query::ArrayType::Float,
Some(Box::new(
row.try_get::<Vec<f32>, _>(c.ordinal())
.expect("Failed to get float array")
.iter()
.map(|val| Value::Float(Some(*val)))
.collect(),
)),
),
"FLOAT8" | "DOUBLE PRECISION" => Value::Double(Some(
row.try_get(c.ordinal()).expect("Failed to get double"),
)),
#[cfg(feature = "postgres-array")]
"FLOAT8[]" | "DOUBLE PRECISION[]" => Value::Array(
sea_query::ArrayType::Double,
Some(Box::new(
row.try_get::<Vec<f64>, _>(c.ordinal())
.expect("Failed to get double array")
.iter()
.map(|val| Value::Double(Some(*val)))
.collect(),
)),
),
"VARCHAR" | "CHAR" | "TEXT" | "NAME" => Value::String(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get string"),
))),
#[cfg(feature = "postgres-array")]
"VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => Value::Array(
sea_query::ArrayType::String,
Some(Box::new(
row.try_get::<Vec<String>, _>(c.ordinal())
.expect("Failed to get string array")
.iter()
.map(|val| Value::String(Some(Box::new(val.clone()))))
.collect(),
)),
),
"BYTEA" => Value::Bytes(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get bytes"),
))),
#[cfg(feature = "postgres-array")]
"BYTEA[]" => Value::Array(
sea_query::ArrayType::Bytes,
Some(Box::new(
row.try_get::<Vec<Vec<u8>>, _>(c.ordinal())
.expect("Failed to get bytes array")
.iter()
.map(|val| Value::Bytes(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(feature = "with-bigdecimal")]
"NUMERIC" => Value::BigDecimal(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get numeric"),
))),
#[cfg(all(
feature = "with-rust_decimal",
not(feature = "with-bigdecimal")
))]
"NUMERIC" => Value::Decimal(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get numeric"),
))),
#[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))]
"NUMERIC[]" => Value::Array(
sea_query::ArrayType::BigDecimal,
Some(Box::new(
row.try_get::<Vec<bigdecimal::BigDecimal>, _>(c.ordinal())
.expect("Failed to get numeric array")
.iter()
.map(|val| Value::BigDecimal(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(all(
feature = "with-rust_decimal",
not(feature = "with-bigdecimal"),
feature = "postgres-array"
))]
"NUMERIC[]" => Value::Array(
sea_query::ArrayType::Decimal,
Some(Box::new(
row.try_get::<Vec<rust_decimal::Decimal>, _>(c.ordinal())
.expect("Failed to get numeric array")
.iter()
.map(|val| Value::Decimal(Some(Box::new(val.clone()))))
.collect(),
)),
),
"OID" => Value::BigInt(Some(
row.try_get(c.ordinal()).expect("Failed to get oid"),
)),
#[cfg(feature = "postgres-array")]
"OID[]" => Value::Array(
sea_query::ArrayType::BigInt,
Some(Box::new(
row.try_get::<Vec<i64>, _>(c.ordinal())
.expect("Failed to get oid array")
.iter()
.map(|val| Value::BigInt(Some(*val)))
.collect(),
)),
),
"JSON" | "JSONB" => Value::Json(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get json"),
))),
#[cfg(any(feature = "json-array", feature = "postgres-array"))]
"JSON[]" | "JSONB[]" => Value::Array(
sea_query::ArrayType::Json,
Some(Box::new(
row.try_get::<Vec<serde_json::Value>, _>(c.ordinal())
.expect("Failed to get json array")
.iter()
.map(|val| Value::Json(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(feature = "with-ipnetwork")]
"INET" | "CIDR" => Value::IpNetwork(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get ip address"),
))),
#[cfg(feature = "with-ipnetwork")]
"INET[]" | "CIDR[]" => Value::Array(
sea_query::ArrayType::IpNetwork,
Some(Box::new(
row.try_get::<Vec<ipnetwork::IpNetwork>, _>(c.ordinal())
.expect("Failed to get ip address array")
.iter()
.map(|val| Value::IpNetwork(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(feature = "with-mac_address")]
"MACADDR" | "MACADDR8" => Value::MacAddress(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get mac address"),
))),
#[cfg(all(feature = "with-mac_address", feature = "postgres-array"))]
"MACADDR[]" | "MACADDR8[]" => Value::Array(
sea_query::ArrayType::MacAddress,
Some(Box::new(
row.try_get::<Vec<mac_address::MacAddress>, _>(c.ordinal())
.expect("Failed to get mac address array")
.iter()
.map(|val| Value::MacAddress(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(feature = "with-chrono")]
"TIMESTAMP" => Value::ChronoDateTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get timestamp"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"TIMESTAMP" => Value::TimeDateTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get timestamp"),
))),
#[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
"TIMESTAMP[]" => Value::Array(
sea_query::ArrayType::ChronoDateTime,
Some(Box::new(
row.try_get::<Vec<chrono::NaiveDateTime>, _>(c.ordinal())
.expect("Failed to get timestamp array")
.iter()
.map(|val| Value::ChronoDateTime(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(all(
feature = "with-time",
not(feature = "with-chrono"),
feature = "postgres-array"
))]
"TIMESTAMP[]" => Value::Array(
sea_query::ArrayType::TimeDateTime,
Some(Box::new(
row.try_get::<Vec<time::OffsetDateTime>, _>(c.ordinal())
.expect("Failed to get timestamp array")
.iter()
.map(|val| Value::TimeDateTime(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(feature = "with-chrono")]
"DATE" => Value::ChronoDate(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get date"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"DATE" => Value::TimeDate(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get date"),
))),
#[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
"DATE[]" => Value::Array(
sea_query::ArrayType::ChronoDate,
Some(Box::new(
row.try_get::<Vec<chrono::NaiveDate>, _>(c.ordinal())
.expect("Failed to get date array")
.iter()
.map(|val| Value::ChronoDate(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(all(
feature = "with-time",
not(feature = "with-chrono"),
feature = "postgres-array"
))]
"DATE[]" => Value::Array(
sea_query::ArrayType::TimeDate,
Some(Box::new(
row.try_get::<Vec<time::Date>, _>(c.ordinal())
.expect("Failed to get date array")
.iter()
.map(|val| Value::TimeDate(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(feature = "with-chrono")]
"TIME" => Value::ChronoTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get time"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"TIME" => Value::TimeTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get time"),
))),
#[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
"TIME[]" => Value::Array(
sea_query::ArrayType::ChronoTime,
Some(Box::new(
row.try_get::<Vec<chrono::NaiveTime>, _>(c.ordinal())
.expect("Failed to get time array")
.iter()
.map(|val| Value::ChronoTime(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(all(
feature = "with-time",
not(feature = "with-chrono"),
feature = "postgres-array"
))]
"TIME[]" => Value::Array(
sea_query::ArrayType::TimeTime,
Some(Box::new(
row.try_get::<Vec<time::Time>, _>(c.ordinal())
.expect("Failed to get time array")
.iter()
.map(|val| Value::TimeTime(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(feature = "with-chrono")]
"TIMESTAMPTZ" => Value::ChronoDateTimeUtc(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get timestamptz"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"TIMESTAMPTZ" => Value::TimeDateTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get timestamptz"),
))),
#[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
"TIMESTAMPTZ[]" => Value::Array(
sea_query::ArrayType::ChronoDateTimeUtc,
Some(Box::new(
row.try_get::<Vec<chrono::DateTime<chrono::Utc>>, _>(c.ordinal())
.expect("Failed to get timestamptz array")
.iter()
.map(|val| {
Value::ChronoDateTimeUtc(Some(Box::new(val.clone())))
})
.collect(),
)),
),
#[cfg(all(
feature = "with-time",
not(feature = "with-chrono"),
feature = "postgres-array"
))]
"TIMESTAMPTZ[]" => Value::Array(
sea_query::ArrayType::TimeDateTime,
Some(Box::new(
row.try_get::<Vec<time::OffsetDateTime>, _>(c.ordinal())
.expect("Failed to get timestamptz array")
.iter()
.map(|val| Value::TimeDateTime(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(feature = "with-chrono")]
"TIMETZ" => Value::ChronoTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get timetz"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"TIMETZ" => Value::TimeTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get timetz"),
))),
#[cfg(all(feature = "with-chrono", feature = "postgres-array"))]
"TIMETZ[]" => Value::Array(
sea_query::ArrayType::ChronoTime,
Some(Box::new(
row.try_get::<Vec<chrono::NaiveTime>, _>(c.ordinal())
.expect("Failed to get timetz array")
.iter()
.map(|val| Value::ChronoTime(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(all(
feature = "with-time",
not(feature = "with-chrono"),
feature = "postgres-array"
))]
"TIMETZ[]" => Value::Array(
sea_query::ArrayType::TimeTime,
Some(Box::new(
row.try_get::<Vec<time::Time>, _>(c.ordinal())
.expect("Failed to get timetz array")
.iter()
.map(|val| Value::TimeTime(Some(Box::new(val.clone()))))
.collect(),
)),
),
#[cfg(feature = "with-uuid")]
"UUID" => Value::Uuid(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get uuid"),
))),
#[cfg(all(feature = "with-uuid", feature = "postgres-array"))]
"UUID[]" => Value::Array(
sea_query::ArrayType::Uuid,
Some(Box::new(
row.try_get::<Vec<uuid::Uuid>, _>(c.ordinal())
.expect("Failed to get uuid array")
.iter()
.map(|val| Value::Uuid(Some(Box::new(val.clone()))))
.collect(),
)),
),
_ => unreachable!("Unknown column type: {}", c.type_info().name()),
},
)
})
.collect(),
}
}
#[cfg(feature = "sqlx-sqlite")]
pub(crate) fn from_sqlx_sqlite_row_to_proxy_row(row: &sqlx::sqlite::SqliteRow) -> ProxyRow {
// https://docs.rs/sqlx-sqlite/0.7.2/src/sqlx_sqlite/type_info.rs.html
// https://docs.rs/sqlx-sqlite/0.7.2/sqlx_sqlite/types/index.html
use sqlx::{Column, Row, TypeInfo};
ProxyRow {
values: row
.columns()
.iter()
.map(|c| {
(
c.name().to_string(),
match c.type_info().name() {
"BOOLEAN" => Value::Bool(Some(
row.try_get(c.ordinal()).expect("Failed to get boolean"),
)),
"INTEGER" => Value::Int(Some(
row.try_get(c.ordinal()).expect("Failed to get integer"),
)),
"BIGINT" | "INT8" => Value::BigInt(Some(
row.try_get(c.ordinal()).expect("Failed to get big integer"),
)),
"REAL" => Value::Double(Some(
row.try_get(c.ordinal()).expect("Failed to get double"),
)),
"TEXT" => Value::String(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get string"),
))),
"BLOB" => Value::Bytes(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get bytes"),
))),
#[cfg(feature = "with-chrono")]
"DATETIME" => Value::ChronoDateTimeUtc(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get timestamp"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"DATETIME" => Value::TimeDateTimeWithTimeZone(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get timestamp"),
))),
#[cfg(feature = "with-chrono")]
"DATE" => Value::ChronoDate(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get date"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"DATE" => Value::TimeDate(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get date"),
))),
#[cfg(feature = "with-chrono")]
"TIME" => Value::ChronoTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get time"),
))),
#[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
"TIME" => Value::TimeTime(Some(Box::new(
row.try_get(c.ordinal()).expect("Failed to get time"),
))),
_ => unreachable!("Unknown column type: {}", c.type_info().name()),
},
)
})
.collect(),
}
}
impl ProxyRow {
/// Get a value from the [ProxyRow]
pub fn try_get<T, I: crate::ColIdx>(&self, index: I) -> Result<T, DbErr>
where
T: ValueType,
{
if let Some(index) = index.as_str() {
T::try_from(
self.values
.get(index)
.ok_or_else(|| query_err(format!("No column for ColIdx {index:?}")))?
.clone(),
)
.map_err(type_err)
} else if let Some(index) = index.as_usize() {
let (_, value) = self
.values
.iter()
.nth(*index)
.ok_or_else(|| query_err(format!("Column at index {index} not found")))?;
T::try_from(value.clone()).map_err(type_err)
} else {
unreachable!("Missing ColIdx implementation for ProxyRow");
}
}
/// An iterator over the keys and values of a proxy row
pub fn into_column_value_tuples(self) -> impl Iterator<Item = (String, Value)> {
self.values.into_iter()
}
}
#[cfg(test)]
mod tests {
use crate::{
entity::*, tests_cfg::*, Database, DbBackend, DbErr, ProxyDatabaseTrait, ProxyExecResult,
ProxyRow, Statement,
};
use std::sync::{Arc, Mutex};
#[derive(Debug)]
struct ProxyDb {}
impl ProxyDatabaseTrait for ProxyDb {
fn query(&self, statement: Statement) -> Result<Vec<ProxyRow>, DbErr> {
println!("SQL query: {}", statement.sql);
Ok(vec![])
}
fn execute(&self, statement: Statement) -> Result<ProxyExecResult, DbErr> {
println!("SQL execute: {}", statement.sql);
Ok(ProxyExecResult {
last_insert_id: 1,
rows_affected: 1,
})
}
}
#[smol_potat::test]
async fn create_proxy_conn() {
let _db =
Database::connect_proxy(DbBackend::MySql, Arc::new(Mutex::new(Box::new(ProxyDb {}))))
.await
.unwrap();
}
#[smol_potat::test]
async fn select_rows() {
let db =
Database::connect_proxy(DbBackend::MySql, Arc::new(Mutex::new(Box::new(ProxyDb {}))))
.await
.unwrap();
let _ = cake::Entity::find().all(&db).await;
}
#[smol_potat::test]
async fn insert_one_row() {
let db =
Database::connect_proxy(DbBackend::MySql, Arc::new(Mutex::new(Box::new(ProxyDb {}))))
.await
.unwrap();
let item = cake::ActiveModel {
id: NotSet,
name: Set("Alice".to_string()),
};
cake::Entity::insert(item).exec(&db).await.unwrap();
}
}


@ -2,7 +2,7 @@
use tracing::instrument;
#[cfg(feature = "mock")]
#[cfg(any(feature = "mock", feature = "proxy"))]
use std::sync::Arc;
use std::{pin::Pin, task::Poll};
@ -105,6 +105,25 @@ impl
}
}
#[cfg(feature = "proxy")]
impl
From<(
Arc<crate::ProxyDatabaseConnection>,
Statement,
Option<crate::metric::Callback>,
)> for QueryStream
{
fn from(
(conn, stmt, metric_callback): (
Arc<crate::ProxyDatabaseConnection>,
Statement,
Option<crate::metric::Callback>,
),
) -> Self {
QueryStream::build(stmt, InnerConnection::Proxy(conn), metric_callback)
}
}
impl std::fmt::Debug for QueryStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "QueryStream")
@ -163,6 +182,13 @@ impl QueryStream {
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
MetricStream::new(_metric_callback, stmt, elapsed, stream)
}
#[cfg(feature = "proxy")]
InnerConnection::Proxy(c) => {
let _start = _metric_callback.is_some().then(std::time::SystemTime::now);
let stream = c.fetch(stmt);
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
MetricStream::new(_metric_callback, stmt, elapsed, stream)
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
},


@ -86,6 +86,13 @@ impl<'a> TransactionStream<'a> {
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
MetricStream::new(_metric_callback, stmt, elapsed, stream)
}
#[cfg(feature = "proxy")]
InnerConnection::Proxy(c) => {
let _start = _metric_callback.is_some().then(std::time::SystemTime::now);
let stream = c.fetch(stmt);
let elapsed = _start.map(|s| s.elapsed().unwrap_or_default());
MetricStream::new(_metric_callback, stmt, elapsed, stream)
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
},


@ -95,6 +95,22 @@ impl DatabaseTransaction {
.await
}
#[cfg(feature = "proxy")]
pub(crate) async fn new_proxy(
inner: Arc<crate::ProxyDatabaseConnection>,
metric_callback: Option<crate::metric::Callback>,
) -> Result<DatabaseTransaction, DbErr> {
let backend = inner.get_database_backend();
Self::begin(
Arc::new(Mutex::new(InnerConnection::Proxy(inner))),
backend,
metric_callback,
None,
None,
)
.await
}
#[instrument(level = "trace", skip(metric_callback))]
async fn begin(
conn: Arc<Mutex<InnerConnection>>,


@ -1,5 +1,7 @@
#[cfg(feature = "mock")]
mod mock;
#[cfg(feature = "proxy")]
mod proxy;
#[cfg(feature = "sqlx-dep")]
mod sqlx_common;
#[cfg(feature = "sqlx-mysql")]
@ -11,6 +13,8 @@ pub(crate) mod sqlx_sqlite;
#[cfg(feature = "mock")]
pub use mock::*;
#[cfg(feature = "proxy")]
pub use proxy::*;
#[cfg(feature = "sqlx-dep")]
pub use sqlx_common::*;
#[cfg(feature = "sqlx-mysql")]

src/driver/proxy.rs Normal file

@ -0,0 +1,140 @@
use crate::{
debug_print, error::*, DatabaseConnection, DbBackend, ExecResult, ProxyDatabaseTrait,
QueryResult, Statement,
};
use futures::Stream;
use std::{
fmt::Debug,
pin::Pin,
sync::{Arc, Mutex},
};
use tracing::instrument;
/// Defines a database driver for the [ProxyDatabase]
#[derive(Debug)]
pub struct ProxyDatabaseConnector;
/// Defines a connection for the [ProxyDatabase]
#[derive(Debug)]
pub struct ProxyDatabaseConnection {
db_backend: DbBackend,
proxy: Arc<Mutex<Box<dyn ProxyDatabaseTrait>>>,
}
impl ProxyDatabaseConnector {
/// Check whether the given database URI is accepted; the proxy database accepts any URI
#[allow(unused_variables)]
pub fn accepts(string: &str) -> bool {
// As this is a proxy database, it accepts any URI
true
}
/// Connect to the [ProxyDatabase]
#[allow(unused_variables)]
#[instrument(level = "trace")]
pub fn connect(
db_type: DbBackend,
func: Arc<Mutex<Box<dyn ProxyDatabaseTrait>>>,
) -> Result<DatabaseConnection, DbErr> {
Ok(DatabaseConnection::ProxyDatabaseConnection(Arc::new(
ProxyDatabaseConnection::new(db_type, func),
)))
}
}
impl ProxyDatabaseConnection {
/// Create a connection to the [ProxyDatabase]
pub fn new(db_backend: DbBackend, funcs: Arc<Mutex<Box<dyn ProxyDatabaseTrait>>>) -> Self {
Self {
db_backend,
proxy: funcs,
}
}
/// Get the [DatabaseBackend](crate::DatabaseBackend) being used by the [ProxyDatabase]
pub fn get_database_backend(&self) -> DbBackend {
self.db_backend
}
/// Execute the SQL statement in the [ProxyDatabase]
#[instrument(level = "trace")]
pub fn execute(&self, statement: Statement) -> Result<ExecResult, DbErr> {
debug_print!("{}", statement);
Ok(self
.proxy
.lock()
.map_err(exec_err)?
.execute(statement)?
.into())
}
/// Return one [QueryResult] if the query was successful
#[instrument(level = "trace")]
pub fn query_one(&self, statement: Statement) -> Result<Option<QueryResult>, DbErr> {
debug_print!("{}", statement);
let result = self.proxy.lock().map_err(query_err)?.query(statement)?;
Ok(result.first().map(|row| QueryResult {
row: crate::QueryResultRow::Proxy(row.to_owned()),
}))
}
/// Return all [QueryResult]s if the query was successful
#[instrument(level = "trace")]
pub fn query_all(&self, statement: Statement) -> Result<Vec<QueryResult>, DbErr> {
debug_print!("{}", statement);
let result = self.proxy.lock().map_err(query_err)?.query(statement)?;
Ok(result
.into_iter()
.map(|row| QueryResult {
row: crate::QueryResultRow::Proxy(row),
})
.collect())
}
/// Return [QueryResult]s from a multi-query operation
#[instrument(level = "trace")]
pub fn fetch(
&self,
statement: &Statement,
) -> Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>> + Send>> {
match self.query_all(statement.clone()) {
Ok(v) => Box::pin(futures::stream::iter(v.into_iter().map(Ok))),
Err(e) => Box::pin(futures::stream::iter([Err(e)])),
}
}
/// Begin a transaction on the proxy database
#[instrument(level = "trace")]
pub fn begin(&self) {
self.proxy.lock().expect("Failed to acquire proxy").begin()
}
/// Commit a transaction atomically to the database
#[instrument(level = "trace")]
pub fn commit(&self) {
self.proxy
.lock()
.expect("Failed to acquire proxy")
.commit()
}
/// Roll back a faulty transaction
#[instrument(level = "trace")]
pub fn rollback(&self) {
self.proxy
.lock()
.expect("Failed to acquire proxy")
.rollback()
}
/// Checks if a connection to the database is still valid.
pub fn ping(&self) -> Result<(), DbErr> {
self.proxy.lock().map_err(query_err)?.ping()
}
}


@ -22,6 +22,9 @@ pub(crate) enum ExecResultHolder {
/// Holds the result of executing an operation on the Mock database
#[cfg(feature = "mock")]
Mock(crate::MockExecResult),
/// Holds the result of executing an operation on the Proxy database
#[cfg(feature = "proxy")]
Proxy(crate::ProxyExecResult),
}
// ExecResult //
@ -51,6 +54,8 @@ impl ExecResult {
}
#[cfg(feature = "mock")]
ExecResultHolder::Mock(result) => result.last_insert_id,
#[cfg(feature = "proxy")]
ExecResultHolder::Proxy(result) => result.last_insert_id,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -67,6 +72,8 @@ impl ExecResult {
ExecResultHolder::SqlxSqlite(result) => result.rows_affected(),
#[cfg(feature = "mock")]
ExecResultHolder::Mock(result) => result.rows_affected,
#[cfg(feature = "proxy")]
ExecResultHolder::Proxy(result) => result.rows_affected,
#[allow(unreachable_patterns)]
_ => unreachable!(),
}


@ -1,7 +1,7 @@
use crate::{error::*, SelectGetableValue, SelectorRaw, Statement};
use std::fmt;
#[cfg(feature = "mock")]
#[cfg(any(feature = "mock", feature = "proxy"))]
use crate::debug_print;
#[cfg(feature = "sqlx-dep")]
@ -25,6 +25,8 @@ pub(crate) enum QueryResultRow {
SqlxSqlite(sqlx::sqlite::SqliteRow),
#[cfg(feature = "mock")]
Mock(crate::MockRow),
#[cfg(feature = "proxy")]
Proxy(crate::ProxyRow),
}
/// An interface to get a value from the query result
@ -127,6 +129,8 @@ impl fmt::Debug for QueryResultRow {
Self::SqlxSqlite(_) => write!(f, "QueryResultRow::SqlxSqlite cannot be inspected"),
#[cfg(feature = "mock")]
Self::Mock(row) => write!(f, "{row:?}"),
#[cfg(feature = "proxy")]
Self::Proxy(row) => write!(f, "{row:?}"),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -271,6 +275,11 @@ macro_rules! try_getable_all {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[cfg(feature = "proxy")]
QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -306,6 +315,11 @@ macro_rules! try_getable_unsigned {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[cfg(feature = "proxy")]
QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -342,6 +356,11 @@ macro_rules! try_getable_mysql {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[cfg(feature = "proxy")]
QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -383,6 +402,11 @@ macro_rules! try_getable_date_time {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[cfg(feature = "proxy")]
QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -478,6 +502,12 @@ impl TryGetable for Decimal {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[cfg(feature = "proxy")]
#[allow(unused_variables)]
QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -525,6 +555,12 @@ impl TryGetable for BigDecimal {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[cfg(feature = "proxy")]
#[allow(unused_variables)]
QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -559,6 +595,12 @@ macro_rules! try_getable_uuid {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[cfg(feature = "proxy")]
#[allow(unused_variables)]
QueryResultRow::Proxy(row) => row.try_get::<uuid::Uuid, _>(idx).map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[allow(unreachable_patterns)]
_ => unreachable!(),
};
@ -613,6 +655,12 @@ impl TryGetable for u32 {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[cfg(feature = "proxy")]
#[allow(unused_variables)]
QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -658,6 +706,12 @@ mod postgres_array {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[cfg(feature = "proxy")]
#[allow(unused_variables)]
QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -745,6 +799,13 @@ mod postgres_array {
err_null_idx_col(idx)
})
}
#[cfg(feature = "proxy")]
QueryResultRow::Proxy(row) => {
row.try_get::<Vec<uuid::Uuid>, _>(idx).map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
})
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
};
@ -799,6 +860,12 @@ mod postgres_array {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[cfg(feature = "proxy")]
#[allow(unused_variables)]
QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
}),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
@ -1014,6 +1081,14 @@ where
err_null_idx_col(idx)
})
.and_then(|json| serde_json::from_value(json).map_err(|e| json_err(e).into())),
#[cfg(feature = "proxy")]
QueryResultRow::Proxy(row) => row
.try_get::<serde_json::Value, I>(idx)
.map_err(|e| {
debug_print!("{:#?}", e.to_string());
err_null_idx_col(idx)
})
.and_then(|json| serde_json::from_value(json).map_err(|e| json_err(e).into())),
#[allow(unreachable_patterns)]
_ => unreachable!(),
}


@ -214,6 +214,18 @@ impl FromQueryResult for JsonValue {
}
Ok(JsonValue::Object(map))
}
#[cfg(feature = "proxy")]
crate::QueryResultRow::Proxy(row) => {
for (column, value) in row.clone().into_column_value_tuples() {
let col = match column.strip_prefix(pre) {
Some(col) => col.to_owned(),
None => continue,
};
map.insert(col, sea_query::sea_value_to_json_value(&value));
}
Ok(JsonValue::Object(map))
}
#[allow(unreachable_patterns)]
_ => unreachable!(),
}