eidetica/backend/database/sql/mod.rs
1//! SQL-based backend implementations for Eidetica storage.
2//!
3//! This module provides SQL database backends that implement the `BackendImpl` trait,
4//! allowing Eidetica entries to be stored in relational databases.
5//!
6//! ## Available Backends
7//!
8//! - **SQLite** (feature: `sqlite`): Embedded database
9//! - **PostgreSQL** (feature: `postgres`): PostgreSQL database
10//!
11//! ## Architecture
12//!
//! The SQL backend uses sqlx's `AnyPool` for multi-database support.
//! All storage operations are async, matching the async `BackendImpl` trait.
15//!
16//! ## Schema and Migrations
17//!
18//! The database schema is defined in the [`schema`] module and automatically
19//! initialized when connecting. Migrations are handled via code-based functions
20//! rather than SQL files to support dialect differences between SQLite and PostgreSQL.
21//!
22//! See [`schema`] module documentation for details on adding migrations.
23
24mod cache;
25mod storage;
26mod traversal;
27
28/// Schema definition and migration system.
29pub mod schema;
30
31use std::any::Any;
32use std::time::Duration;
33
34use async_trait::async_trait;
35use sqlx::AnyPool;
36use sqlx::Executor;
37use sqlx::any::AnyPoolOptions;
38
39use crate::Result;
40use crate::backend::errors::BackendError;
41use crate::backend::{
42 BackendImpl, CacheScope, InstanceMetadata, InstanceSecrets, VerificationStatus,
43};
44use crate::entry::{Entry, ID};
45use crate::snapshot::Snapshot;
46
47/// Extension trait for sqlx Result types to simplify error handling.
48///
49/// Similar to `anyhow::Context`, this trait adds a method to convert
50/// sqlx errors to `BackendError::SqlxError` with a context message.
51pub(crate) trait SqlxResultExt<T> {
52 /// Convert sqlx error to BackendError with context message.
53 fn sql_context(self, context: &str) -> Result<T>;
54}
55
56impl<T> SqlxResultExt<T> for std::result::Result<T, sqlx::Error> {
57 fn sql_context(self, context: &str) -> Result<T> {
58 self.map_err(|e| {
59 BackendError::SqlxError {
60 reason: format!("{context}: {e}"),
61 source: Some(e),
62 }
63 .into()
64 })
65 }
66}
67
/// The SQL engine a backend is connected to.
///
/// Used to select dialect-specific SQL where SQLite and PostgreSQL differ.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DbKind {
    /// Embedded SQLite database.
    Sqlite,
    /// PostgreSQL server.
    Postgres,
}
76
/// SQL-based backend implementing `BackendImpl` using sqlx.
///
/// This backend supports both SQLite and PostgreSQL through sqlx's `AnyPool`.
/// Instances are created by the feature-gated constructors in this module:
/// `open_sqlite`, `connect_sqlite` and `sqlite_in_memory` (feature `sqlite`),
/// and `connect_postgres` and `connect_postgres_isolated` (feature `postgres`).
/// Each constructor initializes the database schema before returning.
///
/// # Thread Safety
///
/// `SqlxBackend` is `Send + Sync` as required by `BackendImpl`. The underlying
/// sqlx pool handles connection pooling and thread safety.
///
/// # Test Isolation
///
/// For PostgreSQL, each backend instance can use its own schema for test isolation.
/// Use `connect_postgres_isolated()` to create an isolated backend for testing.
pub struct SqlxBackend {
    /// sqlx connection pool. The engine behind it is recorded in `kind`.
    pool: AnyPool,
    /// Which database engine `pool` is connected to (SQL dialect selection).
    kind: DbKind,
}
94
95impl SqlxBackend {
96 /// Get a reference to the underlying pool.
97 pub fn pool(&self) -> &AnyPool {
98 &self.pool
99 }
100
101 /// Get the database kind.
102 pub fn kind(&self) -> DbKind {
103 self.kind
104 }
105
106 /// Check if this backend is using SQLite.
107 pub fn is_sqlite(&self) -> bool {
108 self.kind == DbKind::Sqlite
109 }
110
111 /// Check if this backend is using PostgreSQL.
112 pub fn is_postgres(&self) -> bool {
113 self.kind == DbKind::Postgres
114 }
115}
116
// SQLite-specific implementations
#[cfg(feature = "sqlite")]
impl SqlxBackend {
    /// Open a SQLite database at the given path.
    ///
    /// Creates the database file and schema if they don't exist.
    ///
    /// The path is embedded in a `sqlite:` connection URL. sqlx's SQLite URL
    /// parser percent-decodes the file-name part, and `?` / `#` would otherwise
    /// start the query string / fragment. Those three characters (`%`, `?`,
    /// `#`) are therefore percent-encoded first, so file names containing them
    /// open the intended file.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the SQLite database file
    ///
    /// # Errors
    ///
    /// Returns an error if the pool cannot connect, WAL mode cannot be enabled,
    /// or schema initialization fails (see [`Self::connect_sqlite`]).
    ///
    /// # Example
    ///
    /// ```ignore
    /// use eidetica::backend::database::sql::SqlxBackend;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let backend = SqlxBackend::open_sqlite("my_database.db").await.unwrap();
    /// }
    /// ```
    pub async fn open_sqlite<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        // NOTE: `display()` is lossy for non-UTF-8 paths. A connection URL is a
        // string, so such paths cannot be represented exactly here anyway.
        let path = path.as_ref().display().to_string();
        // mode=rwc: read-write-create (create file if it doesn't exist)
        let url = format!(
            "sqlite:{}?mode=rwc",
            Self::percent_encode_sqlite_path(&path)
        );
        Self::connect_sqlite(&url).await
    }

    /// Percent-encode the characters that are significant when a filesystem
    /// path is embedded in a `sqlite:` URL.
    ///
    /// Encodes `%` (escape introducer), `?` (query separator) and `#`
    /// (fragment separator). Every other character is passed through unchanged.
    ///
    /// NOTE(review): relies on sqlx percent-decoding the SQLite file name when
    /// parsing the URL. Confirm against the sqlx version in use.
    fn percent_encode_sqlite_path(path: &str) -> String {
        let mut encoded = String::with_capacity(path.len());
        for ch in path.chars() {
            match ch {
                '%' => encoded.push_str("%25"),
                '?' => encoded.push_str("%3F"),
                '#' => encoded.push_str("%23"),
                other => encoded.push(other),
            }
        }
        encoded
    }

    /// Connect to a SQLite database using a connection URL.
    ///
    /// URLs containing `mode=memory` or `:memory:` are treated as in-memory
    /// databases. They get a pool that keeps at least one connection open
    /// forever, so the shared in-memory database is never dropped.
    ///
    /// Any other URL is treated as file-backed. Each connection gets
    /// `synchronous = NORMAL` and `busy_timeout = 5000`, and the database is
    /// switched to WAL journal mode once.
    ///
    /// # Arguments
    ///
    /// * `url` - SQLite connection URL (e.g., "sqlite:./my.db")
    ///
    /// # Errors
    ///
    /// Returns an error if the pool cannot connect, the WAL pragma fails, or
    /// schema initialization fails.
    pub async fn connect_sqlite(url: &str) -> Result<Self> {
        // Install any driver support
        sqlx::any::install_default_drivers();

        // Detect if this is an in-memory database. Two URL conventions
        // exist: `?mode=memory` (sqlx's explicit query flag, used by
        // `Sqlite::in_memory`) and `:memory:` (SQLite's classic magic
        // filename, embedded in URI-filename forms like
        // `sqlite:file::memory:?cache=shared`).
        let is_in_memory = url.contains("mode=memory") || url.contains(":memory:");

        // For SQLite in-memory databases with shared cache, we must prevent
        // all connections from being closed. When the last connection closes,
        // the in-memory database is destroyed and all data is lost.
        //
        // IMPORTANT: SQLite pragmas like busy_timeout and synchronous are per-connection
        // settings. We use after_connect to ensure every connection in the pool has
        // these configured, not just one.
        let pool = if is_in_memory {
            // NOTE(review): in shared-cache mode, table-lock conflicts surface as
            // SQLITE_LOCKED, which busy_timeout does not retry. Confirm this is
            // acceptable with concurrent writers on the 5-connection pool.
            AnyPoolOptions::new()
                .max_connections(5)
                .min_connections(1)
                .idle_timeout(None)
                .max_lifetime(None)
                .after_connect(|conn, _meta| {
                    Box::pin(async move {
                        // In-memory databases don't need WAL mode (all in RAM)
                        // but still need busy_timeout for lock contention
                        conn.execute("PRAGMA busy_timeout = 5000;").await?;
                        Ok(())
                    })
                })
                .connect(url)
                .await
                .sql_context("Failed to connect to SQLite")?
        } else {
            AnyPoolOptions::new()
                .max_connections(5)
                .after_connect(|conn, _meta| {
                    Box::pin(async move {
                        // File-based SQLite per-connection settings:
                        // - synchronous=NORMAL: Balanced durability (safe with WAL)
                        // - busy_timeout=5000: Wait up to 5s for locks before failing
                        //
                        // Note: journal_mode=WAL is a database-level setting that persists,
                        // so we only set it once after pool creation, not per-connection.
                        conn.execute("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000;")
                            .await?;
                        Ok(())
                    })
                })
                .connect(url)
                .await
                .sql_context("Failed to connect to SQLite")?
        };

        // Set WAL mode once (database-level setting that persists in the file)
        if !is_in_memory {
            sqlx::query("PRAGMA journal_mode = WAL;")
                .execute(&pool)
                .await
                .sql_context("Failed to set SQLite WAL mode")?;
        }

        let backend = Self {
            pool,
            kind: DbKind::Sqlite,
        };

        // Initialize schema
        schema::initialize(&backend).await?;

        Ok(backend)
    }

    /// Create an in-memory SQLite database (async).
    ///
    /// The database exists only for the lifetime of this backend instance.
    /// Useful for testing.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::connect_sqlite`].
    ///
    /// # Example
    ///
    /// ```ignore
    /// use eidetica::backend::database::sql::SqlxBackend;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let backend = SqlxBackend::sqlite_in_memory().await.unwrap();
    /// }
    /// ```
    pub async fn sqlite_in_memory() -> Result<Self> {
        // Use shared cache mode for in-memory SQLite so all connections in the pool
        // share the same database. Without this, each connection gets its own
        // isolated in-memory database.
        // Use a unique name per instance to avoid sharing between tests.
        let unique_id = uuid::Uuid::new_v4();
        let url = format!("sqlite:file:mem_{unique_id}?mode=memory&cache=shared");
        Self::connect_sqlite(&url).await
    }
}
249
// PostgreSQL-specific implementations
#[cfg(feature = "postgres")]
impl SqlxBackend {
    /// Connect to a PostgreSQL database using a connection URL.
    ///
    /// This connects to the default (public) schema. For test isolation,
    /// use `connect_postgres_isolated()` instead.
    ///
    /// # Arguments
    ///
    /// * `url` - PostgreSQL connection URL (e.g., "postgres://user:pass@localhost/dbname")
    ///
    /// # Errors
    ///
    /// Returns an error if the pool cannot connect or schema initialization fails.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use eidetica::backend::database::sql::SqlxBackend;
    ///
    /// let backend = SqlxBackend::connect_postgres("postgres://localhost/eidetica").await.unwrap();
    /// ```
    pub async fn connect_postgres(url: &str) -> Result<Self> {
        Self::connect_postgres_with_schema(url, None).await
    }

    /// Connect to a PostgreSQL database with a specific schema for isolation.
    ///
    /// If `schema_name` is provided, that schema is created (if missing), and
    /// every pooled connection gets `search_path` set to it. This lets each
    /// test use its own schema without interfering with the others.
    ///
    /// NOTE(review): nothing in this module drops the schema afterwards.
    /// Presumably cleanup happens elsewhere (or not at all); TODO confirm.
    ///
    /// # Arguments
    ///
    /// * `url` - PostgreSQL connection URL
    /// * `schema_name` - Optional schema name. If None, uses the default (public) schema.
    ///
    /// # Errors
    ///
    /// Returns an error if connecting, creating the schema, or initializing
    /// the tables fails.
    async fn connect_postgres_with_schema(url: &str, schema_name: Option<String>) -> Result<Self> {
        // Install any driver support
        sqlx::any::install_default_drivers();

        // Create the schema up front, then use after_connect to set search_path
        // on each connection. This is more reliable than URL options, which
        // don't work consistently across all network configurations.
        if let Some(name) = schema_name.as_deref() {
            Self::create_postgres_schema(url, name).await?;
        }

        // For isolated (test) connections, use a smaller pool to avoid exhausting
        // PostgreSQL's max_connections when running many tests in parallel.
        let pool_options = if schema_name.is_some() {
            // Test isolation: 2 connections is enough, with longer timeout to wait
            // rather than fail when many tests run in parallel
            AnyPoolOptions::new()
                .max_connections(2)
                .acquire_timeout(Duration::from_secs(30))
        } else {
            // Production: 5 connections for real concurrency needs
            AnyPoolOptions::new().max_connections(5)
        };

        // Build the statement once; the hook clones it for each new connection.
        let set_search_path = schema_name
            .as_deref()
            .map(|name| format!("SET search_path TO {}", Self::quote_postgres_ident(name)));

        let pool = pool_options
            .after_connect(move |conn, _meta| {
                let set_search_path = set_search_path.clone();
                Box::pin(async move {
                    if let Some(sql) = set_search_path {
                        conn.execute(sql.as_str()).await?;
                    }
                    Ok(())
                })
            })
            .connect(url)
            .await
            .sql_context("Failed to connect to PostgreSQL")?;

        let backend = Self {
            pool,
            kind: DbKind::Postgres,
        };

        // Initialize schema (tables will be created in the current search_path)
        schema::initialize(&backend).await?;

        Ok(backend)
    }

    /// Run `CREATE SCHEMA IF NOT EXISTS` for `schema_name` on a short-lived,
    /// single-connection pool.
    ///
    /// The temporary pool is closed whether or not the statement succeeds.
    async fn create_postgres_schema(url: &str, schema_name: &str) -> Result<()> {
        let temp_pool = AnyPoolOptions::new()
            .max_connections(1)
            .connect(url)
            .await
            .sql_context("Failed to connect to PostgreSQL")?;

        let create_schema = format!(
            "CREATE SCHEMA IF NOT EXISTS {}",
            Self::quote_postgres_ident(schema_name)
        );
        let created = sqlx::query(&create_schema)
            .execute(&temp_pool)
            .await
            .map(|_| ())
            .sql_context(&format!("Failed to create schema {schema_name}"));

        // Close before propagating any error, so the connection is shut down
        // gracefully rather than left to the pool's drop path.
        temp_pool.close().await;
        created
    }

    /// Quote `name` as a PostgreSQL identifier.
    ///
    /// Wraps the name in double quotes and doubles any embedded double quote.
    /// For the lowercase names generated in this module, this refers to the
    /// same object as the unquoted form.
    fn quote_postgres_ident(name: &str) -> String {
        format!("\"{}\"", name.replace('"', "\"\""))
    }

    /// Connect to a PostgreSQL database with test isolation.
    ///
    /// Creates a unique schema for this backend instance, ensuring tests
    /// don't interfere with each other when run in parallel.
    ///
    /// # Arguments
    ///
    /// * `url` - PostgreSQL connection URL (e.g., "postgres://user:pass@localhost/dbname")
    ///
    /// # Errors
    ///
    /// Returns an error if connecting, creating the schema, or initializing
    /// the tables fails.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use eidetica::backend::database::sql::SqlxBackend;
    ///
    /// let backend = SqlxBackend::connect_postgres_isolated("postgres://localhost/eidetica").await.unwrap();
    /// // This backend uses its own isolated schema
    /// ```
    pub async fn connect_postgres_isolated(url: &str) -> Result<Self> {
        // Generate a unique schema name using UUID
        // PostgreSQL schema names must start with a letter and be lowercase
        let unique_id = uuid::Uuid::new_v4().simple().to_string();
        let schema_name = format!("test_{unique_id}");
        Self::connect_postgres_with_schema(url, Some(schema_name)).await
    }
}
376
377#[async_trait]
378impl BackendImpl for SqlxBackend {
379 async fn get(&self, id: &ID) -> Result<Entry> {
380 storage::get(self, id).await
381 }
382
383 async fn get_verification_status(&self, id: &ID) -> Result<VerificationStatus> {
384 storage::get_verification_status(self, id).await
385 }
386
387 async fn put(&self, entry: Entry) -> Result<()> {
388 storage::put(self, entry).await
389 }
390
391 async fn update_verification_status(
392 &self,
393 id: &ID,
394 verification_status: VerificationStatus,
395 ) -> Result<()> {
396 storage::update_verification_status(self, id, verification_status).await
397 }
398
399 async fn get_entries_by_verification_status(
400 &self,
401 status: VerificationStatus,
402 ) -> Result<Vec<ID>> {
403 storage::get_entries_by_verification_status(self, status).await
404 }
405
406 async fn snapshot(&self, tree: &ID) -> Result<Snapshot> {
407 traversal::snapshot(self, tree).await.map(Snapshot::new)
408 }
409
410 async fn store_snapshot(&self, tree: &ID, store: &str) -> Result<Snapshot> {
411 traversal::store_snapshot(self, tree, store)
412 .await
413 .map(Snapshot::new)
414 }
415
416 async fn store_snapshot_at(
417 &self,
418 tree: &ID,
419 store: &str,
420 main_snapshot: &Snapshot,
421 ) -> Result<Snapshot> {
422 traversal::store_snapshot_at(self, tree, store, main_snapshot.tips())
423 .await
424 .map(Snapshot::new)
425 }
426
427 async fn all_roots(&self) -> Result<Vec<ID>> {
428 storage::all_roots(self).await
429 }
430
431 async fn find_merge_base(&self, tree: &ID, store: &str, entry_ids: &[ID]) -> Result<ID> {
432 traversal::find_merge_base(self, tree, store, entry_ids).await
433 }
434
435 async fn collect_root_to_target(
436 &self,
437 tree: &ID,
438 store: &str,
439 target_entry: &ID,
440 ) -> Result<Vec<ID>> {
441 traversal::collect_root_to_target(self, tree, store, target_entry).await
442 }
443
444 fn as_any(&self) -> &dyn Any {
445 self
446 }
447
448 async fn get_tree(&self, tree: &ID) -> Result<Vec<Entry>> {
449 storage::get_tree(self, tree).await
450 }
451
452 async fn get_store(&self, tree: &ID, store: &str) -> Result<Vec<Entry>> {
453 storage::get_store(self, tree, store).await
454 }
455
456 async fn get_tree_from_tips(&self, tree: &ID, tips: &[ID]) -> Result<Vec<Entry>> {
457 traversal::get_tree_from_tips(self, tree, tips).await
458 }
459
460 async fn store_at(&self, tree: &ID, store: &str, snapshot: &Snapshot) -> Result<Vec<Entry>> {
461 traversal::store_at(self, tree, store, snapshot.tips()).await
462 }
463
464 async fn get_cached_crdt_state(
465 &self,
466 scope: &CacheScope,
467 entry_id: &ID,
468 store: &str,
469 ) -> Result<Option<Vec<u8>>> {
470 storage::get_cached_crdt_state(self, scope, entry_id, store).await
471 }
472
473 async fn cache_crdt_state(
474 &self,
475 scope: CacheScope,
476 entry_id: &ID,
477 store: &str,
478 state: Vec<u8>,
479 ) -> Result<()> {
480 storage::cache_crdt_state(self, scope, entry_id, store, state).await
481 }
482
483 async fn clear_crdt_cache(&self) -> Result<()> {
484 storage::clear_crdt_cache(self).await
485 }
486
487 async fn get_sorted_store_parents(
488 &self,
489 tree_id: &ID,
490 entry_id: &ID,
491 store: &str,
492 ) -> Result<Vec<ID>> {
493 traversal::get_sorted_store_parents(self, tree_id, entry_id, store).await
494 }
495
496 async fn get_path_from_to(
497 &self,
498 tree_id: &ID,
499 store: &str,
500 from_id: &ID,
501 to_ids: &[ID],
502 ) -> Result<Vec<ID>> {
503 traversal::get_path_from_to(self, tree_id, store, from_id, to_ids).await
504 }
505
506 async fn get_instance_metadata(&self) -> Result<Option<InstanceMetadata>> {
507 storage::get_instance_metadata(self).await
508 }
509
510 async fn set_instance_metadata(&self, metadata: &InstanceMetadata) -> Result<()> {
511 storage::set_instance_metadata(self, metadata).await
512 }
513
514 async fn get_instance_secrets(&self) -> Result<Option<InstanceSecrets>> {
515 storage::get_instance_secrets(self).await
516 }
517
518 async fn set_instance_secrets(&self, secrets: &InstanceSecrets) -> Result<()> {
519 storage::set_instance_secrets(self, secrets).await
520 }
521}
522
/// Namespace for SQLite database constructors.
///
/// `Sqlite` holds no data. It groups factory functions that build a
/// SQLite-backed `SqlxBackend`, which implements `BackendImpl`.
///
/// # Example
///
/// ```ignore
/// use eidetica::backend::database::Sqlite;
///
/// // File-based storage
/// let backend = Sqlite::open("my_data.db").await?;
///
/// // In-memory (for testing)
/// let backend = Sqlite::in_memory().await?;
/// ```
#[cfg(feature = "sqlite")]
pub struct Sqlite;

#[cfg(feature = "sqlite")]
impl Sqlite {
    /// Open the SQLite database file at `path`, creating the file and schema
    /// when they do not exist yet.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the SQLite database file
    ///
    /// # Errors
    ///
    /// Propagates any error from `SqlxBackend::open_sqlite`.
    pub async fn open<P: AsRef<std::path::Path>>(path: P) -> Result<SqlxBackend> {
        SqlxBackend::open_sqlite(path).await
    }

    /// Build a fresh in-memory SQLite database.
    ///
    /// Its contents live only as long as the returned backend, which makes it
    /// convenient for tests.
    ///
    /// # Errors
    ///
    /// Propagates any error from `SqlxBackend::sqlite_in_memory`.
    pub async fn in_memory() -> Result<SqlxBackend> {
        SqlxBackend::sqlite_in_memory().await
    }

    /// Connect using an explicit SQLite connection URL.
    ///
    /// # Arguments
    ///
    /// * `url` - SQLite connection URL (e.g., "sqlite:./my.db")
    ///
    /// # Errors
    ///
    /// Propagates any error from `SqlxBackend::connect_sqlite`.
    pub async fn connect(url: &str) -> Result<SqlxBackend> {
        SqlxBackend::connect_sqlite(url).await
    }
}
572
/// Namespace for PostgreSQL database constructors.
///
/// `Postgres` holds no data. It groups factory functions that build a
/// PostgreSQL-backed `SqlxBackend`, which implements `BackendImpl`.
///
/// # Example
///
/// ```ignore
/// use eidetica::backend::database::Postgres;
///
/// // Connect to PostgreSQL
/// let backend = Postgres::connect("postgres://user:pass@localhost/mydb").await?;
///
/// // With test isolation (unique schema per instance)
/// let backend = Postgres::connect_isolated("postgres://localhost/test").await?;
/// ```
#[cfg(feature = "postgres")]
pub struct Postgres;

#[cfg(feature = "postgres")]
impl Postgres {
    /// Connect to PostgreSQL using the default (public) schema.
    ///
    /// Use `connect_isolated()` when each caller needs its own schema.
    ///
    /// # Arguments
    ///
    /// * `url` - PostgreSQL connection URL (e.g., "postgres://user:pass@localhost/dbname")
    ///
    /// # Errors
    ///
    /// Propagates any error from `SqlxBackend::connect_postgres`.
    pub async fn connect(url: &str) -> Result<SqlxBackend> {
        SqlxBackend::connect_postgres(url).await
    }

    /// Connect to PostgreSQL inside a freshly created, uniquely named schema.
    ///
    /// Each returned backend works in its own schema, so tests running in
    /// parallel do not see each other's data.
    ///
    /// # Arguments
    ///
    /// * `url` - PostgreSQL connection URL
    ///
    /// # Errors
    ///
    /// Propagates any error from `SqlxBackend::connect_postgres_isolated`.
    pub async fn connect_isolated(url: &str) -> Result<SqlxBackend> {
        SqlxBackend::connect_postgres_isolated(url).await
    }
}