Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/backend/database/sql/
mod.rs

1//! SQL-based backend implementations for Eidetica storage.
2//!
3//! This module provides SQL database backends that implement the `BackendImpl` trait,
4//! allowing Eidetica entries to be stored in relational databases.
5//!
6//! ## Available Backends
7//!
8//! - **SQLite** (feature: `sqlite`): Embedded database
9//! - **PostgreSQL** (feature: `postgres`): PostgreSQL database
10//!
11//! ## Architecture
12//!
13//! The SQL backend uses sqlx with `AnyPool` for multi-database support.
14//! All methods are async to match the async `BackendImpl` trait.
15//!
16//! ## Schema and Migrations
17//!
18//! The database schema is defined in the [`schema`] module and automatically
19//! initialized when connecting. Migrations are handled via code-based functions
20//! rather than SQL files to support dialect differences between SQLite and PostgreSQL.
21//!
22//! See [`schema`] module documentation for details on adding migrations.
23
24mod cache;
25mod storage;
26mod traversal;
27
28/// Schema definition and migration system.
29pub mod schema;
30
31use std::any::Any;
32use std::time::Duration;
33
34use async_trait::async_trait;
35use sqlx::AnyPool;
36use sqlx::Executor;
37use sqlx::any::AnyPoolOptions;
38
39use crate::Result;
40use crate::backend::errors::BackendError;
41use crate::backend::{
42    BackendImpl, CacheScope, InstanceMetadata, InstanceSecrets, VerificationStatus,
43};
44use crate::entry::{Entry, ID};
45use crate::snapshot::Snapshot;
46
47/// Extension trait for sqlx Result types to simplify error handling.
48///
49/// Similar to `anyhow::Context`, this trait adds a method to convert
50/// sqlx errors to `BackendError::SqlxError` with a context message.
51pub(crate) trait SqlxResultExt<T> {
52    /// Convert sqlx error to BackendError with context message.
53    fn sql_context(self, context: &str) -> Result<T>;
54}
55
56impl<T> SqlxResultExt<T> for std::result::Result<T, sqlx::Error> {
57    fn sql_context(self, context: &str) -> Result<T> {
58        self.map_err(|e| {
59            BackendError::SqlxError {
60                reason: format!("{context}: {e}"),
61                source: Some(e),
62            }
63            .into()
64        })
65    }
66}
67
/// Identifies which SQL dialect a backend is talking to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DbKind {
    /// Backed by SQLite.
    Sqlite,
    /// Backed by PostgreSQL.
    Postgres,
}
76
77/// SQL-based backend implementing `BackendImpl` using sqlx.
78///
79/// This backend supports both SQLite and PostgreSQL through sqlx's `AnyPool`.
80///
81/// # Thread Safety
82///
83/// `SqlxBackend` is `Send + Sync` as required by `BackendImpl`. The underlying
84/// sqlx pool handles connection pooling and thread safety.
85///
86/// # Test Isolation
87///
88/// For PostgreSQL, each backend instance can use its own schema for test isolation.
89/// Use `connect_postgres_isolated()` to create an isolated backend for testing.
90pub struct SqlxBackend {
91    pool: AnyPool,
92    kind: DbKind,
93}
94
95impl SqlxBackend {
96    /// Get a reference to the underlying pool.
97    pub fn pool(&self) -> &AnyPool {
98        &self.pool
99    }
100
101    /// Get the database kind.
102    pub fn kind(&self) -> DbKind {
103        self.kind
104    }
105
106    /// Check if this backend is using SQLite.
107    pub fn is_sqlite(&self) -> bool {
108        self.kind == DbKind::Sqlite
109    }
110
111    /// Check if this backend is using PostgreSQL.
112    pub fn is_postgres(&self) -> bool {
113        self.kind == DbKind::Postgres
114    }
115}
116
// SQLite-specific implementations
#[cfg(feature = "sqlite")]
impl SqlxBackend {
    /// Open the SQLite database file at `path`, creating it if necessary.
    ///
    /// Builds a `sqlite:` URL with `mode=rwc` (read-write-create) and hands it
    /// to [`Self::connect_sqlite`], which also initializes the schema.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the SQLite database file
    ///
    /// # Example
    ///
    /// ```ignore
    /// use eidetica::backend::database::sql::SqlxBackend;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let backend = SqlxBackend::open_sqlite("my_database.db").await.unwrap();
    /// }
    /// ```
    pub async fn open_sqlite<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        let db_path: &std::path::Path = path.as_ref();
        // NOTE(review): the path is interpolated into the URL verbatim (and
        // lossily, via `display()`); paths containing `?` or `#` may need
        // escaping — confirm against sqlx's URL parsing.
        let url = format!("sqlite:{}?mode=rwc", db_path.display());
        Self::connect_sqlite(&url).await
    }

    /// Connect to a SQLite database using a connection URL.
    ///
    /// Builds the pool, applies per-connection pragmas, enables WAL for
    /// file-backed databases and initializes the schema.
    ///
    /// # Arguments
    ///
    /// * `url` - SQLite connection URL (e.g., "sqlite:./my.db")
    pub async fn connect_sqlite(url: &str) -> Result<Self> {
        // Register the compiled-in drivers with sqlx's `Any` layer.
        sqlx::any::install_default_drivers();

        // An in-memory database can be requested in two ways: the
        // `?mode=memory` query flag (used by `Sqlite::in_memory`) or SQLite's
        // classic `:memory:` filename, which may be embedded in URI-filename
        // forms such as `sqlite:file::memory:?cache=shared`.
        let is_in_memory = [":memory:", "mode=memory"]
            .iter()
            .any(|marker| url.contains(*marker));

        // Pragmas such as busy_timeout and synchronous are per-connection
        // settings, so they are applied from `after_connect` to every pooled
        // connection rather than just once.
        //
        // - In-memory: no WAL needed (everything is in RAM), but busy_timeout
        //   is still wanted for lock contention.
        // - File-based: synchronous=NORMAL (balanced durability, safe with
        //   WAL) plus busy_timeout=5000 (wait up to 5s for locks).
        let per_connection_pragmas: &'static str = if is_in_memory {
            "PRAGMA busy_timeout = 5000;"
        } else {
            "PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000;"
        };

        let base_options = AnyPoolOptions::new().max_connections(5);
        let pool_options = if is_in_memory {
            // A shared-cache in-memory database is destroyed when its last
            // connection closes, so keep at least one connection alive and
            // never expire connections.
            base_options
                .min_connections(1)
                .idle_timeout(None)
                .max_lifetime(None)
        } else {
            base_options
        };

        let pool = pool_options
            .after_connect(move |conn, _meta| {
                Box::pin(async move {
                    conn.execute(per_connection_pragmas).await?;
                    Ok(())
                })
            })
            .connect(url)
            .await
            .sql_context("Failed to connect to SQLite")?;

        // journal_mode=WAL is a database-level setting that persists in the
        // file, so it is issued once here instead of per connection.
        if !is_in_memory {
            sqlx::query("PRAGMA journal_mode = WAL;")
                .execute(&pool)
                .await
                .sql_context("Failed to set SQLite WAL mode")?;
        }

        let backend = Self {
            pool,
            kind: DbKind::Sqlite,
        };

        // Create tables / run migrations before handing the backend out.
        schema::initialize(&backend).await?;

        Ok(backend)
    }

    /// Create an in-memory SQLite database (async).
    ///
    /// The database exists only for the lifetime of this backend instance.
    /// Useful for testing.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use eidetica::backend::database::sql::SqlxBackend;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let backend = SqlxBackend::sqlite_in_memory().await.unwrap();
    /// }
    /// ```
    pub async fn sqlite_in_memory() -> Result<Self> {
        // Shared-cache mode makes every pooled connection see the same
        // in-memory database; without it each connection would get its own.
        // A fresh UUID in the name keeps separate instances (e.g. parallel
        // tests) from sharing one database.
        let db_name = uuid::Uuid::new_v4();
        Self::connect_sqlite(&format!(
            "sqlite:file:mem_{db_name}?mode=memory&cache=shared"
        ))
        .await
    }
}
249
// PostgreSQL-specific implementations
#[cfg(feature = "postgres")]
impl SqlxBackend {
    /// Connect to a PostgreSQL database using a connection URL.
    ///
    /// No schema is selected, so the connection's default (public) schema is
    /// used. For test isolation, use `connect_postgres_isolated()` instead.
    ///
    /// # Arguments
    ///
    /// * `url` - PostgreSQL connection URL (e.g., "postgres://user:pass@localhost/dbname")
    ///
    /// # Example
    ///
    /// ```ignore
    /// use eidetica::backend::database::sql::SqlxBackend;
    ///
    /// let backend = SqlxBackend::connect_postgres("postgres://localhost/eidetica").await.unwrap();
    /// ```
    pub async fn connect_postgres(url: &str) -> Result<Self> {
        Self::connect_postgres_with_schema(url, None).await
    }

    /// Connect to PostgreSQL, optionally confined to a dedicated schema.
    ///
    /// When `schema_name` is given, the schema is created if missing and every
    /// pooled connection gets its `search_path` pointed at it, so each test
    /// can work in its own schema without interfering with others.
    ///
    /// The schema name is interpolated into SQL as-is (unquoted); callers must
    /// pass a plain, trusted identifier.
    ///
    /// # Arguments
    ///
    /// * `url` - PostgreSQL connection URL
    /// * `schema_name` - Optional schema name. If None, uses the default (public) schema.
    async fn connect_postgres_with_schema(url: &str, schema_name: Option<String>) -> Result<Self> {
        // Register the compiled-in drivers with sqlx's `Any` layer.
        sqlx::any::install_default_drivers();

        // Step 1: make sure the schema exists. A throwaway single-connection
        // pool is used for this. The search_path is then set via an
        // `after_connect` hook rather than URL options, which don't work
        // consistently across all network configurations.
        if let Some(schema) = schema_name.as_deref() {
            let bootstrap_pool = AnyPoolOptions::new()
                .max_connections(1)
                .connect(url)
                .await
                .sql_context("Failed to connect to PostgreSQL")?;

            let ddl = format!("CREATE SCHEMA IF NOT EXISTS {schema}");
            sqlx::query(&ddl)
                .execute(&bootstrap_pool)
                .await
                .sql_context(&format!("Failed to create schema {schema}"))?;

            bootstrap_pool.close().await;
        }

        // Step 2: size the real pool.
        let is_isolated = schema_name.is_some();
        let pool_options = if is_isolated {
            // Test isolation: 2 connections is enough, and a longer acquire
            // timeout makes tests wait rather than fail when many run in
            // parallel against PostgreSQL's max_connections limit.
            AnyPoolOptions::new()
                .max_connections(2)
                .acquire_timeout(Duration::from_secs(30))
        } else {
            // Production: 5 connections for real concurrency needs.
            AnyPoolOptions::new().max_connections(5)
        };

        // Step 3: the statement each new connection must run, if any. It is
        // the same for every connection, so it is built once here.
        let search_path_sql = schema_name.map(|name| format!("SET search_path TO {name}"));

        let pool = pool_options
            .after_connect(move |conn, _meta| {
                // The future must own its data, so give it a private copy.
                let statement = search_path_sql.clone();
                Box::pin(async move {
                    if let Some(sql) = statement.as_deref() {
                        conn.execute(sql).await?;
                    }
                    Ok(())
                })
            })
            .connect(url)
            .await
            .sql_context("Failed to connect to PostgreSQL")?;

        let backend = Self {
            pool,
            kind: DbKind::Postgres,
        };

        // Tables are created in whatever schema the search_path points at.
        schema::initialize(&backend).await?;

        Ok(backend)
    }

    /// Connect to a PostgreSQL database with test isolation.
    ///
    /// Generates a unique schema for this backend instance, ensuring tests
    /// don't interfere with each other when run in parallel.
    ///
    /// # Arguments
    ///
    /// * `url` - PostgreSQL connection URL (e.g., "postgres://user:pass@localhost/dbname")
    ///
    /// # Example
    ///
    /// ```ignore
    /// use eidetica::backend::database::sql::SqlxBackend;
    ///
    /// let backend = SqlxBackend::connect_postgres_isolated("postgres://localhost/eidetica").await.unwrap();
    /// // This backend uses its own isolated schema
    /// ```
    pub async fn connect_postgres_isolated(url: &str) -> Result<Self> {
        // The `test_` prefix makes the name start with a letter, and the
        // "simple" UUID form has no hyphens, so the result is usable as a
        // schema name without quoting.
        let suffix = uuid::Uuid::new_v4().simple().to_string();
        Self::connect_postgres_with_schema(url, Some(format!("test_{suffix}"))).await
    }
}
376
377#[async_trait]
378impl BackendImpl for SqlxBackend {
379    async fn get(&self, id: &ID) -> Result<Entry> {
380        storage::get(self, id).await
381    }
382
383    async fn get_verification_status(&self, id: &ID) -> Result<VerificationStatus> {
384        storage::get_verification_status(self, id).await
385    }
386
387    async fn put(&self, entry: Entry) -> Result<()> {
388        storage::put(self, entry).await
389    }
390
391    async fn update_verification_status(
392        &self,
393        id: &ID,
394        verification_status: VerificationStatus,
395    ) -> Result<()> {
396        storage::update_verification_status(self, id, verification_status).await
397    }
398
399    async fn get_entries_by_verification_status(
400        &self,
401        status: VerificationStatus,
402    ) -> Result<Vec<ID>> {
403        storage::get_entries_by_verification_status(self, status).await
404    }
405
406    async fn snapshot(&self, tree: &ID) -> Result<Snapshot> {
407        traversal::snapshot(self, tree).await.map(Snapshot::new)
408    }
409
410    async fn store_snapshot(&self, tree: &ID, store: &str) -> Result<Snapshot> {
411        traversal::store_snapshot(self, tree, store)
412            .await
413            .map(Snapshot::new)
414    }
415
416    async fn store_snapshot_at(
417        &self,
418        tree: &ID,
419        store: &str,
420        main_snapshot: &Snapshot,
421    ) -> Result<Snapshot> {
422        traversal::store_snapshot_at(self, tree, store, main_snapshot.tips())
423            .await
424            .map(Snapshot::new)
425    }
426
427    async fn all_roots(&self) -> Result<Vec<ID>> {
428        storage::all_roots(self).await
429    }
430
431    async fn find_merge_base(&self, tree: &ID, store: &str, entry_ids: &[ID]) -> Result<ID> {
432        traversal::find_merge_base(self, tree, store, entry_ids).await
433    }
434
435    async fn collect_root_to_target(
436        &self,
437        tree: &ID,
438        store: &str,
439        target_entry: &ID,
440    ) -> Result<Vec<ID>> {
441        traversal::collect_root_to_target(self, tree, store, target_entry).await
442    }
443
444    fn as_any(&self) -> &dyn Any {
445        self
446    }
447
448    async fn get_tree(&self, tree: &ID) -> Result<Vec<Entry>> {
449        storage::get_tree(self, tree).await
450    }
451
452    async fn get_store(&self, tree: &ID, store: &str) -> Result<Vec<Entry>> {
453        storage::get_store(self, tree, store).await
454    }
455
456    async fn get_tree_from_tips(&self, tree: &ID, tips: &[ID]) -> Result<Vec<Entry>> {
457        traversal::get_tree_from_tips(self, tree, tips).await
458    }
459
460    async fn store_at(&self, tree: &ID, store: &str, snapshot: &Snapshot) -> Result<Vec<Entry>> {
461        traversal::store_at(self, tree, store, snapshot.tips()).await
462    }
463
464    async fn get_cached_crdt_state(
465        &self,
466        scope: &CacheScope,
467        entry_id: &ID,
468        store: &str,
469    ) -> Result<Option<Vec<u8>>> {
470        storage::get_cached_crdt_state(self, scope, entry_id, store).await
471    }
472
473    async fn cache_crdt_state(
474        &self,
475        scope: CacheScope,
476        entry_id: &ID,
477        store: &str,
478        state: Vec<u8>,
479    ) -> Result<()> {
480        storage::cache_crdt_state(self, scope, entry_id, store, state).await
481    }
482
483    async fn clear_crdt_cache(&self) -> Result<()> {
484        storage::clear_crdt_cache(self).await
485    }
486
487    async fn get_sorted_store_parents(
488        &self,
489        tree_id: &ID,
490        entry_id: &ID,
491        store: &str,
492    ) -> Result<Vec<ID>> {
493        traversal::get_sorted_store_parents(self, tree_id, entry_id, store).await
494    }
495
496    async fn get_path_from_to(
497        &self,
498        tree_id: &ID,
499        store: &str,
500        from_id: &ID,
501        to_ids: &[ID],
502    ) -> Result<Vec<ID>> {
503        traversal::get_path_from_to(self, tree_id, store, from_id, to_ids).await
504    }
505
506    async fn get_instance_metadata(&self) -> Result<Option<InstanceMetadata>> {
507        storage::get_instance_metadata(self).await
508    }
509
510    async fn set_instance_metadata(&self, metadata: &InstanceMetadata) -> Result<()> {
511        storage::set_instance_metadata(self, metadata).await
512    }
513
514    async fn get_instance_secrets(&self) -> Result<Option<InstanceSecrets>> {
515        storage::get_instance_secrets(self).await
516    }
517
518    async fn set_instance_secrets(&self, secrets: &InstanceSecrets) -> Result<()> {
519        storage::set_instance_secrets(self, secrets).await
520    }
521}
522
/// Entry point for constructing SQLite-backed storage.
///
/// A unit type used purely as a namespace: each associated function forwards
/// to the matching `SqlxBackend` constructor and returns a `SqlxBackend`,
/// which implements `BackendImpl`.
///
/// # Example
///
/// ```ignore
/// use eidetica::backend::database::Sqlite;
///
/// // File-based storage
/// let backend = Sqlite::open("my_data.db").await?;
///
/// // In-memory (for testing)
/// let backend = Sqlite::in_memory().await?;
/// ```
#[cfg(feature = "sqlite")]
pub struct Sqlite;

#[cfg(feature = "sqlite")]
impl Sqlite {
    /// Open a SQLite database at the given path.
    ///
    /// Forwards to `SqlxBackend::open_sqlite`, which creates the database
    /// file and schema if they don't exist.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the SQLite database file
    pub async fn open<P: AsRef<std::path::Path>>(path: P) -> Result<SqlxBackend> {
        let backend = SqlxBackend::open_sqlite(path).await?;
        Ok(backend)
    }

    /// Create an in-memory SQLite database.
    ///
    /// Forwards to `SqlxBackend::sqlite_in_memory`. The database exists only
    /// for the lifetime of the returned backend. Useful for testing.
    pub async fn in_memory() -> Result<SqlxBackend> {
        let backend = SqlxBackend::sqlite_in_memory().await?;
        Ok(backend)
    }

    /// Connect to a SQLite database using a connection URL.
    ///
    /// Forwards to `SqlxBackend::connect_sqlite`.
    ///
    /// # Arguments
    ///
    /// * `url` - SQLite connection URL (e.g., "sqlite:./my.db")
    pub async fn connect(url: &str) -> Result<SqlxBackend> {
        let backend = SqlxBackend::connect_sqlite(url).await?;
        Ok(backend)
    }
}
572
/// Entry point for constructing PostgreSQL-backed storage.
///
/// A unit type used purely as a namespace: each associated function forwards
/// to the matching `SqlxBackend` constructor and returns a `SqlxBackend`,
/// which implements `BackendImpl`.
///
/// # Example
///
/// ```ignore
/// use eidetica::backend::database::Postgres;
///
/// // Connect to PostgreSQL
/// let backend = Postgres::connect("postgres://user:pass@localhost/mydb").await?;
///
/// // With test isolation (unique schema per instance)
/// let backend = Postgres::connect_isolated("postgres://localhost/test").await?;
/// ```
#[cfg(feature = "postgres")]
pub struct Postgres;

#[cfg(feature = "postgres")]
impl Postgres {
    /// Connect to a PostgreSQL database using a connection URL.
    ///
    /// Forwards to `SqlxBackend::connect_postgres`, which uses the default
    /// (public) schema. For test isolation, use `connect_isolated()` instead.
    ///
    /// # Arguments
    ///
    /// * `url` - PostgreSQL connection URL (e.g., "postgres://user:pass@localhost/dbname")
    pub async fn connect(url: &str) -> Result<SqlxBackend> {
        let backend = SqlxBackend::connect_postgres(url).await?;
        Ok(backend)
    }

    /// Connect to a PostgreSQL database with test isolation.
    ///
    /// Forwards to `SqlxBackend::connect_postgres_isolated`, which creates a
    /// unique schema for this backend instance so tests don't interfere with
    /// each other when run in parallel.
    ///
    /// # Arguments
    ///
    /// * `url` - PostgreSQL connection URL
    pub async fn connect_isolated(url: &str) -> Result<SqlxBackend> {
        let backend = SqlxBackend::connect_postgres_isolated(url).await?;
        Ok(backend)
    }
}