Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/backend/database/sql/
mod.rs

1//! SQL-based backend implementations for Eidetica storage.
2//!
3//! This module provides SQL database backends that implement the `BackendImpl` trait,
4//! allowing Eidetica entries to be stored in relational databases.
5//!
6//! ## Available Backends
7//!
8//! - **SQLite** (feature: `sqlite`): Embedded database
9//! - **PostgreSQL** (feature: `postgres`): PostgreSQL database
10//!
11//! ## Architecture
12//!
13//! The SQL backend uses sqlx with `AnyPool` for multi-database support.
14//! All methods are async to match the async `BackendImpl` trait.
15//!
16//! ## Schema and Migrations
17//!
18//! The database schema is defined in the [`schema`] module and automatically
19//! initialized when connecting. Migrations are handled via code-based functions
20//! rather than SQL files to support dialect differences between SQLite and PostgreSQL.
21//!
22//! See [`schema`] module documentation for details on adding migrations.
23
24mod cache;
25mod storage;
26mod traversal;
27
28/// Schema definition and migration system.
29pub mod schema;
30
31use std::any::Any;
32use std::time::Duration;
33
34use async_trait::async_trait;
35use sqlx::AnyPool;
36use sqlx::Executor;
37use sqlx::any::AnyPoolOptions;
38
39use crate::Result;
40use crate::backend::errors::BackendError;
41use crate::backend::{BackendImpl, InstanceMetadata, VerificationStatus};
42use crate::entry::{Entry, ID};
43
44/// Extension trait for sqlx Result types to simplify error handling.
45///
46/// Similar to `anyhow::Context`, this trait adds a method to convert
47/// sqlx errors to `BackendError::SqlxError` with a context message.
48pub(crate) trait SqlxResultExt<T> {
49    /// Convert sqlx error to BackendError with context message.
50    fn sql_context(self, context: &str) -> Result<T>;
51}
52
53impl<T> SqlxResultExt<T> for std::result::Result<T, sqlx::Error> {
54    fn sql_context(self, context: &str) -> Result<T> {
55        self.map_err(|e| {
56            BackendError::SqlxError {
57                reason: format!("{context}: {e}"),
58                source: Some(e),
59            }
60            .into()
61        })
62    }
63}
64
65/// Database backend kind for SQL dialect selection.
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub enum DbKind {
68    /// SQLite database
69    Sqlite,
70    /// PostgreSQL database
71    Postgres,
72}
73
74/// SQL-based backend implementing `BackendImpl` using sqlx.
75///
76/// This backend supports both SQLite and PostgreSQL through sqlx's `AnyPool`.
77///
78/// # Thread Safety
79///
80/// `SqlxBackend` is `Send + Sync` as required by `BackendImpl`. The underlying
81/// sqlx pool handles connection pooling and thread safety.
82///
83/// # Test Isolation
84///
85/// For PostgreSQL, each backend instance can use its own schema for test isolation.
86/// Use `connect_postgres_isolated()` to create an isolated backend for testing.
87pub struct SqlxBackend {
88    pool: AnyPool,
89    kind: DbKind,
90}
91
92impl SqlxBackend {
93    /// Get a reference to the underlying pool.
94    pub fn pool(&self) -> &AnyPool {
95        &self.pool
96    }
97
98    /// Get the database kind.
99    pub fn kind(&self) -> DbKind {
100        self.kind
101    }
102
103    /// Check if this backend is using SQLite.
104    pub fn is_sqlite(&self) -> bool {
105        self.kind == DbKind::Sqlite
106    }
107
108    /// Check if this backend is using PostgreSQL.
109    pub fn is_postgres(&self) -> bool {
110        self.kind == DbKind::Postgres
111    }
112}
113
114// SQLite-specific implementations
115#[cfg(feature = "sqlite")]
116impl SqlxBackend {
117    /// Open a SQLite database at the given path.
118    ///
119    /// Creates the database file and schema if they don't exist.
120    ///
121    /// # Arguments
122    ///
123    /// * `path` - Path to the SQLite database file
124    ///
125    /// # Example
126    ///
127    /// ```ignore
128    /// use eidetica::backend::database::sql::SqlxBackend;
129    ///
130    /// #[tokio::main]
131    /// async fn main() {
132    ///     let backend = SqlxBackend::open_sqlite("my_database.db").await.unwrap();
133    /// }
134    /// ```
135    pub async fn open_sqlite<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
136        // mode=rwc: read-write-create (create file if it doesn't exist)
137        let url = format!("sqlite:{}?mode=rwc", path.as_ref().display());
138        Self::connect_sqlite(&url).await
139    }
140
141    /// Connect to a SQLite database using a connection URL.
142    ///
143    /// # Arguments
144    ///
145    /// * `url` - SQLite connection URL (e.g., "sqlite:./my.db")
146    pub async fn connect_sqlite(url: &str) -> Result<Self> {
147        // Install any driver support
148        sqlx::any::install_default_drivers();
149
150        // Detect if this is an in-memory database
151        let is_in_memory = url.contains("mode=memory");
152
153        // For SQLite in-memory databases with shared cache, we must prevent
154        // all connections from being closed. When the last connection closes,
155        // the in-memory database is destroyed and all data is lost.
156        //
157        // IMPORTANT: SQLite pragmas like busy_timeout and synchronous are per-connection
158        // settings. We use after_connect to ensure every connection in the pool has
159        // these configured, not just one.
160        let pool = if is_in_memory {
161            AnyPoolOptions::new()
162                .max_connections(5)
163                .min_connections(1)
164                .idle_timeout(None)
165                .max_lifetime(None)
166                .after_connect(|conn, _meta| {
167                    Box::pin(async move {
168                        // In-memory databases don't need WAL mode (all in RAM)
169                        // but still need busy_timeout for lock contention
170                        conn.execute("PRAGMA busy_timeout = 5000;").await?;
171                        Ok(())
172                    })
173                })
174                .connect(url)
175                .await
176                .sql_context("Failed to connect to SQLite")?
177        } else {
178            AnyPoolOptions::new()
179                .max_connections(5)
180                .after_connect(|conn, _meta| {
181                    Box::pin(async move {
182                        // File-based SQLite per-connection settings:
183                        // - synchronous=NORMAL: Balanced durability (safe with WAL)
184                        // - busy_timeout=5000: Wait up to 5s for locks before failing
185                        //
186                        // Note: journal_mode=WAL is a database-level setting that persists,
187                        // so we only set it once after pool creation, not per-connection.
188                        conn.execute("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000;")
189                            .await?;
190                        Ok(())
191                    })
192                })
193                .connect(url)
194                .await
195                .sql_context("Failed to connect to SQLite")?
196        };
197
198        // Set WAL mode once (database-level setting that persists in the file)
199        if !is_in_memory {
200            sqlx::query("PRAGMA journal_mode = WAL;")
201                .execute(&pool)
202                .await
203                .sql_context("Failed to set SQLite WAL mode")?;
204        }
205
206        let backend = Self {
207            pool,
208            kind: DbKind::Sqlite,
209        };
210
211        // Initialize schema
212        schema::initialize(&backend).await?;
213
214        Ok(backend)
215    }
216
217    /// Create an in-memory SQLite database (async).
218    ///
219    /// The database exists only for the lifetime of this backend instance.
220    /// Useful for testing.
221    ///
222    /// # Example
223    ///
224    /// ```ignore
225    /// use eidetica::backend::database::sql::SqlxBackend;
226    ///
227    /// #[tokio::main]
228    /// async fn main() {
229    ///     let backend = SqlxBackend::sqlite_in_memory().await.unwrap();
230    /// }
231    /// ```
232    pub async fn sqlite_in_memory() -> Result<Self> {
233        // Use shared cache mode for in-memory SQLite so all connections in the pool
234        // share the same database. Without this, each connection gets its own
235        // isolated in-memory database.
236        // Use a unique name per instance to avoid sharing between tests.
237        let unique_id = uuid::Uuid::new_v4();
238        let url = format!("sqlite:file:mem_{unique_id}?mode=memory&cache=shared");
239        Self::connect_sqlite(&url).await
240    }
241}
242
243// PostgreSQL-specific implementations
244#[cfg(feature = "postgres")]
245impl SqlxBackend {
246    /// Connect to a PostgreSQL database using a connection URL.
247    ///
248    /// This connects to the default (public) schema. For test isolation,
249    /// use `connect_postgres_isolated()` instead.
250    ///
251    /// # Arguments
252    ///
253    /// * `url` - PostgreSQL connection URL (e.g., "postgres://user:pass@localhost/dbname")
254    ///
255    /// # Example
256    ///
257    /// ```ignore
258    /// use eidetica::backend::database::sql::SqlxBackend;
259    ///
260    /// let backend = SqlxBackend::connect_postgres("postgres://localhost/eidetica").await.unwrap();
261    /// ```
262    pub async fn connect_postgres(url: &str) -> Result<Self> {
263        Self::connect_postgres_with_schema(url, None).await
264    }
265
266    /// Connect to a PostgreSQL database with a specific schema for isolation.
267    ///
268    /// Creates a unique schema if `schema_name` is provided, providing test isolation.
269    /// Each test can use its own schema so they don't interfere with each other.
270    ///
271    /// # Arguments
272    ///
273    /// * `url` - PostgreSQL connection URL
274    /// * `schema_name` - Optional schema name. If None, uses the default (public) schema.
275    async fn connect_postgres_with_schema(url: &str, schema_name: Option<String>) -> Result<Self> {
276        // Install any driver support
277        sqlx::any::install_default_drivers();
278
279        // If schema_name is provided, first create the schema, then use after_connect
280        // to set search_path on each connection. This is more reliable than URL options
281        // which don't work consistently across all network configurations.
282        if let Some(ref schema) = schema_name {
283            // First connect to create the schema if needed
284            let temp_pool = AnyPoolOptions::new()
285                .max_connections(1)
286                .connect(url)
287                .await
288                .sql_context("Failed to connect to PostgreSQL")?;
289
290            // Create schema if it doesn't exist
291            let create_schema = format!("CREATE SCHEMA IF NOT EXISTS {schema}");
292            sqlx::query(&create_schema)
293                .execute(&temp_pool)
294                .await
295                .sql_context(&format!("Failed to create schema {schema}"))?;
296
297            temp_pool.close().await;
298        }
299
300        // Build pool with after_connect hook to set search_path on each connection
301        // For isolated (test) connections, use smaller pool to avoid exhausting
302        // PostgreSQL's max_connections when running many tests in parallel.
303        let schema_for_hook = schema_name.clone();
304        let is_isolated = schema_name.is_some();
305        let mut pool_options = AnyPoolOptions::new();
306
307        if is_isolated {
308            // Test isolation: 2 connections is enough, with longer timeout to wait
309            // rather than fail when many tests run in parallel
310            pool_options = pool_options
311                .max_connections(2)
312                .acquire_timeout(Duration::from_secs(30));
313        } else {
314            // Production: 5 connections for real concurrency needs
315            pool_options = pool_options.max_connections(5);
316        }
317
318        let pool = pool_options
319            .after_connect(move |conn, _meta| {
320                let schema = schema_for_hook.clone();
321                Box::pin(async move {
322                    if let Some(ref s) = schema {
323                        let set_path = format!("SET search_path TO {s}");
324                        conn.execute(set_path.as_str()).await?;
325                    }
326                    Ok(())
327                })
328            })
329            .connect(url)
330            .await
331            .sql_context("Failed to connect to PostgreSQL")?;
332
333        let backend = Self {
334            pool,
335            kind: DbKind::Postgres,
336        };
337
338        // Initialize schema (tables will be created in the current search_path)
339        schema::initialize(&backend).await?;
340
341        Ok(backend)
342    }
343
344    /// Connect to a PostgreSQL database with test isolation.
345    ///
346    /// Creates a unique schema for this backend instance, ensuring tests
347    /// don't interfere with each other when run in parallel.
348    ///
349    /// # Arguments
350    ///
351    /// * `url` - PostgreSQL connection URL (e.g., "postgres://user:pass@localhost/dbname")
352    ///
353    /// # Example
354    ///
355    /// ```ignore
356    /// use eidetica::backend::database::sql::SqlxBackend;
357    ///
358    /// let backend = SqlxBackend::connect_postgres_isolated("postgres://localhost/eidetica").await.unwrap();
359    /// // This backend uses its own isolated schema
360    /// ```
361    pub async fn connect_postgres_isolated(url: &str) -> Result<Self> {
362        // Generate a unique schema name using UUID
363        // PostgreSQL schema names must start with a letter and be lowercase
364        let unique_id = uuid::Uuid::new_v4().simple().to_string();
365        let schema_name = format!("test_{unique_id}");
366        Self::connect_postgres_with_schema(url, Some(schema_name)).await
367    }
368}
369
370#[async_trait]
371impl BackendImpl for SqlxBackend {
372    async fn get(&self, id: &ID) -> Result<Entry> {
373        storage::get(self, id).await
374    }
375
376    async fn get_verification_status(&self, id: &ID) -> Result<VerificationStatus> {
377        storage::get_verification_status(self, id).await
378    }
379
380    async fn put(&self, verification_status: VerificationStatus, entry: Entry) -> Result<()> {
381        storage::put(self, verification_status, entry).await
382    }
383
384    async fn update_verification_status(
385        &self,
386        id: &ID,
387        verification_status: VerificationStatus,
388    ) -> Result<()> {
389        storage::update_verification_status(self, id, verification_status).await
390    }
391
392    async fn get_entries_by_verification_status(
393        &self,
394        status: VerificationStatus,
395    ) -> Result<Vec<ID>> {
396        storage::get_entries_by_verification_status(self, status).await
397    }
398
399    async fn get_tips(&self, tree: &ID) -> Result<Vec<ID>> {
400        traversal::get_tips(self, tree).await
401    }
402
403    async fn get_store_tips(&self, tree: &ID, store: &str) -> Result<Vec<ID>> {
404        traversal::get_store_tips(self, tree, store).await
405    }
406
407    async fn get_store_tips_up_to_entries(
408        &self,
409        tree: &ID,
410        store: &str,
411        main_entries: &[ID],
412    ) -> Result<Vec<ID>> {
413        traversal::get_store_tips_up_to_entries(self, tree, store, main_entries).await
414    }
415
416    async fn all_roots(&self) -> Result<Vec<ID>> {
417        storage::all_roots(self).await
418    }
419
420    async fn find_merge_base(&self, tree: &ID, store: &str, entry_ids: &[ID]) -> Result<ID> {
421        traversal::find_merge_base(self, tree, store, entry_ids).await
422    }
423
424    async fn collect_root_to_target(
425        &self,
426        tree: &ID,
427        store: &str,
428        target_entry: &ID,
429    ) -> Result<Vec<ID>> {
430        traversal::collect_root_to_target(self, tree, store, target_entry).await
431    }
432
433    fn as_any(&self) -> &dyn Any {
434        self
435    }
436
437    async fn get_tree(&self, tree: &ID) -> Result<Vec<Entry>> {
438        storage::get_tree(self, tree).await
439    }
440
441    async fn get_store(&self, tree: &ID, store: &str) -> Result<Vec<Entry>> {
442        storage::get_store(self, tree, store).await
443    }
444
445    async fn get_tree_from_tips(&self, tree: &ID, tips: &[ID]) -> Result<Vec<Entry>> {
446        traversal::get_tree_from_tips(self, tree, tips).await
447    }
448
449    async fn get_store_from_tips(&self, tree: &ID, store: &str, tips: &[ID]) -> Result<Vec<Entry>> {
450        traversal::get_store_from_tips(self, tree, store, tips).await
451    }
452
453    async fn get_cached_crdt_state(&self, entry_id: &ID, store: &str) -> Result<Option<String>> {
454        storage::get_cached_crdt_state(self, entry_id, store).await
455    }
456
457    async fn cache_crdt_state(&self, entry_id: &ID, store: &str, state: String) -> Result<()> {
458        storage::cache_crdt_state(self, entry_id, store, state).await
459    }
460
461    async fn clear_crdt_cache(&self) -> Result<()> {
462        storage::clear_crdt_cache(self).await
463    }
464
465    async fn get_sorted_store_parents(
466        &self,
467        tree_id: &ID,
468        entry_id: &ID,
469        store: &str,
470    ) -> Result<Vec<ID>> {
471        traversal::get_sorted_store_parents(self, tree_id, entry_id, store).await
472    }
473
474    async fn get_path_from_to(
475        &self,
476        tree_id: &ID,
477        store: &str,
478        from_id: &ID,
479        to_ids: &[ID],
480    ) -> Result<Vec<ID>> {
481        traversal::get_path_from_to(self, tree_id, store, from_id, to_ids).await
482    }
483
484    async fn get_instance_metadata(&self) -> Result<Option<InstanceMetadata>> {
485        storage::get_instance_metadata(self).await
486    }
487
488    async fn set_instance_metadata(&self, metadata: &InstanceMetadata) -> Result<()> {
489        storage::set_instance_metadata(self, metadata).await
490    }
491}
492
493/// Namespace for SQLite database constructors.
494///
495/// Provides ergonomic factory methods for creating SQLite-backed storage.
496/// All methods return `SqlxBackend` which implements `BackendImpl`.
497///
498/// # Example
499///
500/// ```ignore
501/// use eidetica::backend::database::Sqlite;
502///
503/// // File-based storage
504/// let backend = Sqlite::open("my_data.db").await?;
505///
506/// // In-memory (for testing)
507/// let backend = Sqlite::in_memory().await?;
508/// ```
509#[cfg(feature = "sqlite")]
510pub struct Sqlite;
511
512#[cfg(feature = "sqlite")]
513impl Sqlite {
514    /// Open a SQLite database at the given path.
515    ///
516    /// Creates the database file and schema if they don't exist.
517    ///
518    /// # Arguments
519    ///
520    /// * `path` - Path to the SQLite database file
521    pub async fn open<P: AsRef<std::path::Path>>(path: P) -> Result<SqlxBackend> {
522        SqlxBackend::open_sqlite(path).await
523    }
524
525    /// Create an in-memory SQLite database.
526    ///
527    /// The database exists only for the lifetime of the returned backend.
528    /// Useful for testing.
529    pub async fn in_memory() -> Result<SqlxBackend> {
530        SqlxBackend::sqlite_in_memory().await
531    }
532
533    /// Connect to a SQLite database using a connection URL.
534    ///
535    /// # Arguments
536    ///
537    /// * `url` - SQLite connection URL (e.g., "sqlite:./my.db")
538    pub async fn connect(url: &str) -> Result<SqlxBackend> {
539        SqlxBackend::connect_sqlite(url).await
540    }
541}
542
543/// Namespace for PostgreSQL database constructors.
544///
545/// Provides ergonomic factory methods for creating PostgreSQL-backed storage.
546/// All methods return `SqlxBackend` which implements `BackendImpl`.
547///
548/// # Example
549///
550/// ```ignore
551/// use eidetica::backend::database::Postgres;
552///
553/// // Connect to PostgreSQL
554/// let backend = Postgres::connect("postgres://user:pass@localhost/mydb").await?;
555///
556/// // With test isolation (unique schema per instance)
557/// let backend = Postgres::connect_isolated("postgres://localhost/test").await?;
558/// ```
559#[cfg(feature = "postgres")]
560pub struct Postgres;
561
562#[cfg(feature = "postgres")]
563impl Postgres {
564    /// Connect to a PostgreSQL database using a connection URL.
565    ///
566    /// This connects to the default (public) schema. For test isolation,
567    /// use `connect_isolated()` instead.
568    ///
569    /// # Arguments
570    ///
571    /// * `url` - PostgreSQL connection URL (e.g., "postgres://user:pass@localhost/dbname")
572    pub async fn connect(url: &str) -> Result<SqlxBackend> {
573        SqlxBackend::connect_postgres(url).await
574    }
575
576    /// Connect to a PostgreSQL database with test isolation.
577    ///
578    /// Creates a unique schema for this backend instance, ensuring tests
579    /// don't interfere with each other when run in parallel.
580    ///
581    /// # Arguments
582    ///
583    /// * `url` - PostgreSQL connection URL
584    pub async fn connect_isolated(url: &str) -> Result<SqlxBackend> {
585        SqlxBackend::connect_postgres_isolated(url).await
586    }
587}