Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/instance/
mod.rs

1//!
2//! Provides the main database structures (`Instance` and `Database`).
3//!
4//! `Instance` manages multiple `Database` instances and interacts with the storage backend.
5//! `Database` represents a single, independent history of data entries, analogous to a table or branch.
6
7use std::{
8    collections::HashMap,
9    future::Future,
10    path::PathBuf,
11    pin::Pin,
12    sync::{
13        Arc, Mutex, Weak,
14        atomic::{AtomicU64, Ordering},
15    },
16};
17
18use handle_trait::Handle;
19
20use crate::{
21    Clock, Database, Entry, Result, SystemClock,
22    auth::crypto::{PrivateKey, PublicKey},
23    backend::{BackendImpl, InstanceMetadata, InstanceSecrets},
24    entry::ID,
25    sync::Sync,
26    user::User,
27};
28#[cfg(all(unix, feature = "service"))]
29use crate::{auth::SigKey, service::client::RemoteConnection};
30
31pub mod backend;
32pub mod errors;
33pub mod new_user;
34pub mod settings_merge;
35pub mod url;
36
37#[cfg(test)]
38mod tests;
39
40// Re-export main types for easier access
41#[cfg(all(unix, feature = "service"))]
42use backend::RemoteBackend;
43use backend::{Backend, LocalBackend};
44pub use errors::InstanceError;
45pub use new_user::NewUser;
46
47/// Indicates whether an entry write originated locally or from a remote source (e.g., sync).
48///
49/// This distinction allows different callbacks to be triggered based on the write source,
50/// enabling behaviors like "only trigger sync for local writes" or "only update UI for remote writes".
51#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
52pub enum WriteSource {
53    /// Write originated from a local transaction commit
54    Local,
55    /// Write originated from a remote source (e.g., sync, replication)
56    Remote,
57}
58
59/// Context provided to write callbacks describing what changed in the database.
60///
61/// For local writes (transaction commits), this contains a single entry.
62/// For remote writes (sync), this may contain a batch of entries that were
63/// received and stored together.
64///
65/// # Catching up on missed writes
66///
67/// The `previous_tips` field contains the DAG tips of the database *before* the
68/// write(s) that triggered this callback. Consumers can use this to determine
69/// exactly what changed by walking the DAG from the current tips back to these
70/// previous tips. This is analogous to `git log previous_tip..HEAD`.
71///
72/// This design means callbacks never need to "miss" writes — even if multiple
73/// entries are batched (as in sync), the consumer can reconstruct the full set
74/// of changes from the tip diff.
75///
76/// # Example
77///
78/// ```rust,no_run
79/// # use eidetica::instance::WriteEvent;
80/// # fn example(event: &WriteEvent) {
81/// // Check what stores were touched
82/// for entry in event.entries() {
83///     if entry.in_subtree("messages") {
84///         // A write touched the "messages" store
85///     }
86/// }
87///
88/// // Use previous_tips to find what's new
89/// let prev = event.previous_tips();
90/// // Walk DAG from current tips back to prev to find all new entries
91/// # }
92/// ```
93#[derive(Debug, Clone)]
94pub struct WriteEvent {
95    /// The entries written in this event. For local writes, this is always
96    /// exactly one entry. For remote sync, this is the full batch of entries
97    /// that were received and stored together.
98    entries: Vec<Entry>,
99    /// The DAG tips of the database immediately before this write.
100    /// Consumers can diff current tips against these to determine what changed.
101    previous_tips: Vec<ID>,
102    /// Whether this write originated locally or from a remote sync.
103    source: WriteSource,
104}
105
106impl WriteEvent {
107    /// Get the entries written in this event.
108    ///
109    /// For local writes (transaction commits), this always contains exactly one entry.
110    /// For remote writes (sync), this contains the full batch of entries received together.
111    pub fn entries(&self) -> &[Entry] {
112        &self.entries
113    }
114
115    /// Get the DAG tips of the database before this write.
116    ///
117    /// Use these to determine what changed: walk from the database's current tips
118    /// back to these previous tips to find all new entries.
119    pub fn previous_tips(&self) -> &[ID] {
120        &self.previous_tips
121    }
122
123    /// The source of this write (local commit or remote sync).
124    pub fn source(&self) -> WriteSource {
125        self.source
126    }
127}
128
129/// Boxed future returned by the internal async callback dispatcher.
130pub(crate) type AsyncWriteCallbackFuture<'a> =
131    Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
132
133/// Internal async callback function type. The user-facing callback contract
134/// is documented on [`Database::on_write`](crate::Database::on_write).
135pub(crate) type AsyncWriteCallbackFn = Arc<
136    dyn for<'a> Fn(&'a WriteEvent, &'a Database) -> AsyncWriteCallbackFuture<'a>
137        + Send
138        + std::marker::Sync,
139>;
140
/// Opaque id of one registered callback; it does not change while the
/// registration exists.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct CallbackId(u64);
144
145/// Type alias for a collection of write callbacks paired with their ids.
146type CallbackVec = Vec<(CallbackId, AsyncWriteCallbackFn)>;
147
148/// Handle to a registered write callback. **Drop to unregister.**
149///
150/// Returned by [`Database::on_write`](crate::Database::on_write). While this
151/// value is alive the callback fires on writes; dropping it removes the
152/// registration. Use [`detach`](Self::detach) to keep the callback registered
153/// for the life of the [`Instance`] when you don't want to manage the lifetime
154/// yourself.
155///
156/// Holds a weak reference to the [`Instance`], so a `WriteCallback` will not
157/// keep the Instance alive on its own.
158#[must_use = "dropping a WriteCallback unregisters it; call .detach() to keep the callback registered"]
159pub struct WriteCallback {
160    instance: WeakInstance,
161    tree_id: ID,
162    id: CallbackId,
163    detached: bool,
164}
165
166impl WriteCallback {
167    pub(crate) fn new_per_database(instance: WeakInstance, tree_id: ID, id: CallbackId) -> Self {
168        Self {
169            instance,
170            tree_id,
171            id,
172            detached: false,
173        }
174    }
175
176    /// Consume the handle without unregistering. The callback remains active
177    /// for the life of the [`Instance`].
178    ///
179    /// Implementation note: this sets a flag rather than calling `mem::forget`
180    /// so that field destructors (the `WeakInstance`'s weak count, the
181    /// `tree_id`'s heap allocation) still run — only our `Drop` impl is
182    /// short-circuited.
183    pub fn detach(mut self) {
184        self.detached = true;
185    }
186}
187
188impl std::fmt::Debug for WriteCallback {
189    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190        f.debug_struct("WriteCallback")
191            .field("id", &self.id)
192            .field("tree_id", &self.tree_id)
193            .field("detached", &self.detached)
194            .finish()
195    }
196}
197
198impl Drop for WriteCallback {
199    fn drop(&mut self) {
200        if self.detached {
201            return;
202        }
203        if let Some(instance) = self.instance.upgrade() {
204            instance.remove_write_callback(&self.tree_id, self.id);
205        }
206    }
207}
208
209/// Internal state for Instance
210///
211/// This structure holds the actual implementation data for Instance.
212/// Instance itself is just a cheap-to-clone handle wrapping Arc<InstanceInternal>.
213pub(crate) struct InstanceInternal {
214    /// The database storage backend
215    backend: Arc<dyn Backend>,
216    /// Time provider for timestamps
217    clock: Arc<dyn Clock>,
218    /// Synchronization module for this database instance
219    /// TODO: Overengineered, Sync can be created by default but disabled
220    sync: std::sync::OnceLock<Arc<Sync>>,
221    /// Public instance metadata (device identity, system database IDs)
222    metadata: InstanceMetadata,
223    /// Private instance secrets (None for remote instances without key access)
224    secrets: Option<InstanceSecrets>,
225    /// JSON snapshot file path for an in-memory backend constructed via
226    /// `memory:///path.json` (or set explicitly through
227    /// [`Instance::snapshot_to_path`]). [`Instance::flush`] and the
228    /// [`Drop`] safety net write through this. `None` on any non-snapshot
229    /// backend.
230    ///
231    /// The mutex serves double duty: it guards the path slot itself (so
232    /// `set_snapshot_path` doesn't race with readers) AND serializes the
233    /// actual write so concurrent callers from `flush` / `snapshot_to_path`
234    /// / `Drop` don't race on the shared `<path>.tmp` staging file in
235    /// [`InMemory::save_to_file`]. Held across sync I/O only — never
236    /// across an `.await`. Poison-tolerant: a panic mid-write leaves the
237    /// on-disk snapshot unchanged but must not strand the [`Instance`].
238    snapshot_path: Mutex<Option<PathBuf>>,
239    /// Per-database callbacks keyed by tree_id. Each callback fires for both
240    /// local and remote writes; consumers branch on [`WriteEvent::source`] if
241    /// they only care about one.
242    write_callbacks: Mutex<HashMap<ID, CallbackVec>>,
243    /// Global callbacks fired for every write across every database.
244    global_write_callbacks: Mutex<CallbackVec>,
245    /// Monotonic id source for [`CallbackId`].
246    next_callback_id: AtomicU64,
247    /// Per-tree async locks serializing the
248    /// `snapshot` → backend write → callback dispatch sequence so
249    /// `WriteEvent::previous_tips` is consistent for concurrent writers
250    /// to the same tree.
251    tree_locks: Mutex<HashMap<ID, Arc<tokio::sync::Mutex<()>>>>,
252}
253
254impl std::fmt::Debug for InstanceInternal {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        f.debug_struct("InstanceInternal")
257            .field("backend", &"<BackendDB>")
258            .field("clock", &self.clock)
259            .field("sync", &self.sync)
260            .field("metadata", &self.metadata)
261            .field("secrets", &self.secrets.is_some())
262            .field(
263                "write_callbacks",
264                &format!(
265                    "<{} per-db callbacks>",
266                    self.write_callbacks.lock().unwrap().len()
267                ),
268            )
269            .field(
270                "global_write_callbacks",
271                &format!(
272                    "<{} global callbacks>",
273                    self.global_write_callbacks.lock().unwrap().len()
274                ),
275            )
276            .field(
277                "next_callback_id",
278                &self.next_callback_id.load(Ordering::Relaxed),
279            )
280            .finish()
281    }
282}
283
284impl InstanceInternal {
285    /// Synchronously write a JSON snapshot of the underlying backend to `path`.
286    ///
287    /// Returns [`InstanceError::SnapshotNotSupported`] for any backend other
288    /// than the local in-memory backend. Shared by [`Instance::snapshot_to_path`],
289    /// [`Instance::flush`], and the [`Drop`] fallback so the three can't drift.
290    ///
291    /// **Caller must hold the [`snapshot_path`](Self::snapshot_path) mutex.**
292    /// That lock serializes the write — without it, concurrent callers
293    /// would race on the shared `<path>.tmp` staging file in
294    /// [`InMemory::save_to_file`]. The critical section is fully sync; no
295    /// `.await` happens while the lock is held.
296    fn save_snapshot_locked(&self, path: &std::path::Path) -> Result<()> {
297        use crate::backend::database::InMemory;
298        let engine = self
299            .backend
300            .local_engine()
301            .ok_or(InstanceError::SnapshotNotSupported)?;
302        let in_memory = engine
303            .as_any()
304            .downcast_ref::<InMemory>()
305            .ok_or(InstanceError::SnapshotNotSupported)?;
306        in_memory.save_to_file(path)
307    }
308}
309
310/// Best-effort snapshot save on the *last* `InstanceInternal` drop.
311///
312/// Fires when the `Arc<InstanceInternal>` reaches refcount 0 and a snapshot
313/// path is armed (i.e. the `Instance` was constructed via a
314/// `memory:///path.json` URL). [`Instance::flush`] does **not** clear the
315/// snapshot path, so Drop fires even after a successful `flush()` — the
316/// write is idempotent (same atomic tmp+rename), so the worst case is one
317/// extra write of unchanged JSON.
318///
319/// **Errors are logged via `tracing::error!`, not surfaced** — `Drop` can't
320/// return a `Result` and panicking would be worse than logging. Apps that
321/// care about snapshot durability should call [`Instance::flush`] at
322/// well-defined checkpoints and inspect its `Result`; Drop is a safety net,
323/// not the primary persistence path. If `flush()` failed with a permanent
324/// error (e.g. nonexistent parent directory), Drop will fail the same way
325/// and emit a second log line — accept this redundancy as the cost of a
326/// best-effort fallback.
327///
328/// **Blocking I/O warning:** the snapshot write is synchronous
329/// (`std::fs::write` + `rename`). If the `Instance` is dropped on a tokio
330/// worker thread, this blocks that worker for the duration of the write —
331/// negligible for small snapshots, but pathological for very large ones.
332/// Prefer `flush().await` (which still blocks briefly, but does so under
333/// explicit caller control).
334impl Drop for InstanceInternal {
335    fn drop(&mut self) {
336        // Drop runs at Arc refcount 0, so no other handle can race here —
337        // any in-flight `flush()` future holds `&self` and thus an Arc
338        // clone, which would have prevented Drop from firing. We still
339        // acquire the lock for the write so the locking discipline in
340        // `save_snapshot_locked`'s doc-comment holds uniformly. The lock
341        // is uncontended at this point.
342        let mut guard = match self.snapshot_path.lock() {
343            Ok(g) => g,
344            Err(p) => p.into_inner(),
345        };
346        let Some(path) = guard.take() else { return };
347
348        if let Err(e) = self.save_snapshot_locked(&path) {
349            tracing::error!(
350                snapshot_path = %path.display(),
351                error = %e,
352                "Drop: snapshot save failed. Call `Instance::flush().await` at \
353                 checkpoints to inspect the error via Result; Drop is a safety net only.",
354            );
355        }
356    }
357}
358/// Database implementation on top of the storage backend.
359///
360/// Instance manages infrastructure only:
361/// - Backend storage and device identity
362/// - System databases (_users, _databases, _sync)
363/// - User account management (create, login, list)
364///
365/// All database creation and key operations happen through User after login.
366///
367/// Instance is a cheap-to-clone handle around `Arc<InstanceInternal>`.
368///
369/// ## Example
370///
371/// ```
372/// # use eidetica::{Instance, NewUser, crdt::Doc};
373/// # #[tokio::main]
374/// # async fn main() -> eidetica::Result<()> {
375/// // Bootstrap a fresh instance with an initial admin user. The first user
376/// // created on an instance is automatically granted Admin on the system
377/// // databases.
378/// let (instance, maybe_user) = Instance::connect_or_create(
379///     "memory://",
380///     NewUser::passwordless("alice"),
381/// ).await?;
382/// let mut user = maybe_user.expect("memory:// is always fresh");
383///
384/// // Use User API for operations
385/// let mut settings = Doc::new();
386/// settings.set("name", "my_database");
387/// let default_key = user.get_default_key()?;
388/// let db = user.create_database(settings, &default_key).await?;
389/// # Ok(())
390/// # }
391/// ```
392#[derive(Clone, Debug, Handle)]
393pub struct Instance {
394    inner: Arc<InstanceInternal>,
395}
396
397/// Weak reference to an Instance.
398///
399/// This is a weak handle that does not prevent the Instance from being dropped.
400/// Dependent objects (Database, Sync, BackgroundSync) hold weak references to avoid
401/// circular reference cycles that would leak memory.
402///
403/// Use `upgrade()` to convert to a strong `Instance` reference.
404#[derive(Clone, Debug, Handle)]
405pub struct WeakInstance {
406    inner: Weak<InstanceInternal>,
407}
408
409impl Instance {
410    /// Open a connection to an eidetica instance described by a connection URL.
411    ///
412    /// Strict load: returns [`InstanceError::NotInitialized`] when the URL
413    /// points at an embedded backend (`sqlite://`, `postgres://`, `memory://`)
414    /// that has no eidetica metadata yet. Use
415    /// [`Instance::connect_or_create`] to bootstrap an embedded backend on
416    /// first run.
417    ///
418    /// Supported URL schemes:
419    /// - `sqlite://./app.db` — embedded sqlite backend; URL is passed through
420    ///   to `sqlx::sqlite`, so any sqlx-accepted form works
421    ///   (`?mode=rwc&journal_mode=WAL` etc.).
422    /// - `postgres://user:pwd@host/db` — embedded postgres backend; URL is
423    ///   passed through to `sqlx::postgres`.
424    /// - `unix:///run/eidetica/sock` — thin client to a running daemon.
425    /// - `memory://` — empty in-memory backend. Strict load against an
426    ///   empty in-memory backend always errors `NotInitialized`; use
427    ///   `connect_or_create` for a fresh in-memory instance.
428    /// - `memory:///path/to/snap.json` — in-memory backend with a JSON
429    ///   snapshot file (load-on-start; snapshot writes via
430    ///   [`Instance::flush`] / [`Instance::snapshot_to_path`] / Drop fallback).
431    ///
432    /// See [`crate::instance::url`] for the full URL grammar.
433    ///
434    /// # Example
435    ///
436    /// `connect()` only succeeds against an already-initialised backend.
437    /// The two-phase pattern below bootstraps once, then re-opens with
438    /// the strict load:
439    ///
440    /// ```
441    /// # #[tokio::main]
442    /// # async fn main() -> eidetica::Result<()> {
443    /// use eidetica::{Instance, NewUser};
444    ///
445    /// let temp = tempfile::tempdir()?;
446    /// let snapshot = temp.path().join("app.json");
447    /// let url = format!("memory://{}", snapshot.display());
448    ///
449    /// // First run: bootstrap and flush a snapshot to disk.
450    /// {
451    ///     let (instance, maybe_user) =
452    ///         Instance::connect_or_create(&url, NewUser::passwordless("alice")).await?;
453    ///     let _user = maybe_user.expect("fresh bootstrap on first run");
454    ///     instance.flush()?;
455    /// }
456    ///
457    /// // Later: strict connect against the persisted snapshot.
458    /// let instance = Instance::connect(&url).await?;
459    /// let _user = instance.login_user("alice", None).await?;
460    /// # Ok(())
461    /// # }
462    /// ```
463    ///
464    /// Calling `connect()` on a backend with no eidetica metadata returns
465    /// [`InstanceError::NotInitialized`]; reach for
466    /// [`Instance::connect_or_create`] when first-run bootstrap is part
467    /// of the expected lifecycle.
468    pub async fn connect(url: impl AsRef<str>) -> Result<Self> {
469        Self::connect_impl(url.as_ref(), Arc::new(SystemClock)).await
470    }
471
472    /// Open or initialise an eidetica instance described by a connection URL.
473    ///
474    /// On the load arm: identical to [`Instance::connect`]; `initial` is
475    /// silently ignored and the second tuple element is `None`.
476    ///
477    /// On the bootstrap arm: initialises the backend at the URL with the
478    /// supplied [`NewUser`] as the first admin and returns
479    /// `(Instance, Some(User))`. Only embedded backends
480    /// (sqlite/postgres/memory) ever take the bootstrap arm — `unix://` URLs
481    /// degrade to `connect` (the daemon owns its own initialisation), so the
482    /// returned `Option<User>` is always `None` for `unix://`.
483    ///
484    /// # Example
485    /// ```
486    /// # use eidetica::{Instance, NewUser};
487    /// # #[tokio::main]
488    /// # async fn main() -> eidetica::Result<()> {
489    /// let (instance, maybe_user) = Instance::connect_or_create(
490    ///     "memory://",
491    ///     NewUser::passwordless("alice"),
492    /// ).await?;
493    /// let mut user = match maybe_user {
494    ///     Some(u) => u,
495    ///     None => instance.login_user("alice", None).await?,
496    /// };
497    /// # let _ = user.get_default_key()?;
498    /// # Ok(())
499    /// # }
500    /// ```
501    pub async fn connect_or_create(
502        url: impl AsRef<str>,
503        initial: NewUser,
504    ) -> Result<(Self, Option<User>)> {
505        Self::connect_or_create_impl(url.as_ref(), initial, Arc::new(SystemClock)).await
506    }
507
508    /// Escape hatch: open or initialise an eidetica instance against a
509    /// pre-built [`BackendImpl`] (sqlite, postgres, in-memory, or custom).
510    ///
511    /// Same load-or-bootstrap semantics as [`Instance::connect_or_create`]
512    /// but skips URL parsing. Useful for tests, embedded apps that want to
513    /// configure the backend's pool/runtime manually, or backends not yet
514    /// exposed via a URL scheme.
515    pub async fn connect_or_create_backend(
516        backend: Box<dyn BackendImpl>,
517        initial: NewUser,
518    ) -> Result<(Self, Option<User>)> {
519        Self::connect_or_create_backend_impl(
520            Arc::from(backend),
521            initial,
522            Arc::new(SystemClock),
523            None,
524        )
525        .await
526    }
527
528    /// Strict-load escape hatch: open an eidetica instance against a
529    /// pre-built [`BackendImpl`] that's already been initialised. Mirrors
530    /// [`Instance::connect`]'s strict semantics for the URL-less case.
531    ///
532    /// Errors with [`InstanceError::NotInitialized`] if the backend has no
533    /// instance metadata; use [`Instance::connect_or_create_backend`] when you want
534    /// to bootstrap on an empty backend.
535    pub async fn open_backend(backend: Box<dyn BackendImpl>) -> Result<Self> {
536        Self::open_impl(backend, Arc::new(SystemClock)).await
537    }
538
539    /// Test variant of [`Instance::open_backend`] with an injectable clock.
540    #[cfg(any(test, feature = "testing"))]
541    pub async fn open_backend_with_clock(
542        backend: Box<dyn BackendImpl>,
543        clock: Arc<dyn Clock>,
544    ) -> Result<Self> {
545        Self::open_impl(backend, clock).await
546    }
547
548    /// Strict-create escape hatch: initialise an eidetica instance on a
549    /// fresh pre-built [`BackendImpl`] and bootstrap an initial admin user.
550    ///
551    /// Errors with [`InstanceError::InstanceAlreadyExists`] if the backend
552    /// is already initialised; use [`Instance::connect_or_create_backend`] when the
553    /// caller doesn't want to choose between load and create up front.
554    pub async fn create_backend(
555        backend: Box<dyn BackendImpl>,
556        initial: NewUser,
557    ) -> Result<(Self, User)> {
558        Self::create_backend_impl(backend, initial, Arc::new(SystemClock)).await
559    }
560
561    /// Test variant of [`Instance::create_backend`] with an injectable clock.
562    ///
563    /// Arg order: backend, clock, initial — clock goes in the middle so
564    /// migrating from the prior `create_with_clock` is a pure rename.
565    #[cfg(any(test, feature = "testing"))]
566    pub async fn create_backend_with_clock(
567        backend: Box<dyn BackendImpl>,
568        clock: Arc<dyn Clock>,
569        initial: NewUser,
570    ) -> Result<(Self, User)> {
571        Self::create_backend_impl(backend, initial, clock).await
572    }
573
574    async fn create_backend_impl(
575        backend: Box<dyn BackendImpl>,
576        initial: NewUser,
577        clock: Arc<dyn Clock>,
578    ) -> Result<(Self, User)> {
579        let backend: Arc<dyn BackendImpl> = Arc::from(backend);
580        if backend.get_instance_metadata().await?.is_some() {
581            return Err(InstanceError::InstanceAlreadyExists.into());
582        }
583        Self::create_internal(backend, clock, initial).await
584    }
585
586    // Clock injection is exposed only through the pre-built-backend
587    // variants ([`open_backend_with_clock`] and [`create_backend_with_clock`]).
588    // The URL-based `connect_*` constructors deliberately have no
589    // `_with_clock` siblings: every existing test that needs deterministic
590    // timestamps already builds an `InMemory` backend directly, so a URL-
591    // shaped clock entry point would be dead weight.
592
593    // ============ Internal URL dispatchers ============
594
595    async fn connect_impl(url: &str, clock: Arc<dyn Clock>) -> Result<Self> {
596        let parsed = url::parse(url)?;
597        match parsed {
598            url::ConnectionUrl::Sqlite { url } => Self::connect_sqlite(&url, clock).await,
599            url::ConnectionUrl::Postgres { url } => Self::connect_postgres(&url, clock).await,
600            url::ConnectionUrl::Unix { socket_path } => {
601                Self::connect_unix_socket(socket_path, clock).await
602            }
603            url::ConnectionUrl::Memory { snapshot_path } => {
604                Self::connect_memory(snapshot_path, clock).await
605            }
606        }
607    }
608
609    async fn connect_or_create_impl(
610        url: &str,
611        initial: NewUser,
612        clock: Arc<dyn Clock>,
613    ) -> Result<(Self, Option<User>)> {
614        let parsed = url::parse(url)?;
615        match parsed {
616            url::ConnectionUrl::Sqlite { url } => {
617                let backend = open_sqlite_backend(&url).await?;
618                Self::connect_or_create_backend_impl(Arc::from(backend), initial, clock, None).await
619            }
620            url::ConnectionUrl::Postgres { url } => {
621                let backend = open_postgres_backend(&url).await?;
622                Self::connect_or_create_backend_impl(Arc::from(backend), initial, clock, None).await
623            }
624            url::ConnectionUrl::Unix { socket_path } => {
625                // Daemons own their own initialisation. connect_or_create
626                // against `unix://` degrades to a plain connect; `initial`
627                // is unused on this arm. Log it so the silent drop is
628                // discoverable when debugging "why didn't my initial user
629                // get created?" on a remote URL.
630                tracing::debug!(
631                    socket_path = %socket_path.display(),
632                    username = %initial.username,
633                    "connect_or_create against `unix://` is degrading to `connect`; \
634                     `initial` is ignored — daemons own their own initialisation. \
635                     Run `eidetica daemon init` to bootstrap a daemon-side instance."
636                );
637                let instance = Self::connect_unix_socket(socket_path, clock).await?;
638                Ok((instance, None))
639            }
640            url::ConnectionUrl::Memory { snapshot_path } => {
641                use crate::backend::database::InMemory;
642                // Build the backend from a single `try_load_from_file` call.
643                // `Ok(None)` means the file didn't exist at read time (the
644                // bootstrap-friendly "first run" case → empty backend).
645                // `Ok(Some(loaded))` means the file existed and parsed; if
646                // it carries no instance metadata it's foreign data that
647                // happened to satisfy the `SerializableDatabase` shape, and
648                // we refuse to bootstrap over it (the next snapshot would
649                // silently overwrite the caller's file). Doing the existence
650                // test and the read in one call removes the TOCTOU window a
651                // separate `path.exists()` check would open.
652                let backend: Box<dyn BackendImpl> = match snapshot_path.as_deref() {
653                    None => Box::new(InMemory::new()),
654                    Some(path) => {
655                        let loaded = InMemory::try_load_from_file(path).await.map_err(|e| {
656                            InstanceError::InvalidSnapshot {
657                                path: path.to_path_buf(),
658                                reason: e.to_string(),
659                            }
660                        })?;
661                        match loaded {
662                            None => Box::new(InMemory::new()),
663                            Some(loaded) => {
664                                let boxed: Box<dyn BackendImpl> = Box::new(loaded);
665                                if boxed.get_instance_metadata().await?.is_none() {
666                                    return Err(InstanceError::InvalidSnapshot {
667                                        path: path.to_path_buf(),
668                                        reason: "snapshot file exists but contains no instance \
669                                             metadata; refusing to bootstrap on top of foreign \
670                                             data. Delete or move the file to create a fresh \
671                                             instance at this path."
672                                            .into(),
673                                    }
674                                    .into());
675                                }
676                                boxed
677                            }
678                        }
679                    }
680                };
681                Self::connect_or_create_backend_impl(
682                    Arc::from(backend),
683                    initial,
684                    clock,
685                    snapshot_path,
686                )
687                .await
688            }
689        }
690    }
691
692    /// Internal: load-or-bootstrap against a pre-built backend, optionally
693    /// remembering a snapshot path so Drop / flush can write to it.
694    async fn connect_or_create_backend_impl(
695        backend: Arc<dyn BackendImpl>,
696        initial: NewUser,
697        clock: Arc<dyn Clock>,
698        snapshot_path: Option<PathBuf>,
699    ) -> Result<(Self, Option<User>)> {
700        if let Some(metadata) = backend.get_instance_metadata().await? {
701            let instance = Self::open_impl_arc_with_metadata(backend, clock, metadata).await?;
702            instance.set_snapshot_path(snapshot_path);
703            Ok((instance, None))
704        } else {
705            let (instance, user) = Self::create_internal(backend, clock, initial).await?;
706            instance.set_snapshot_path(snapshot_path);
707            Ok((instance, Some(user)))
708        }
709    }
710
711    // ============ Backend connection helpers ============
712
713    #[cfg(all(unix, feature = "service"))]
714    async fn connect_unix_socket(socket_path: PathBuf, clock: Arc<dyn Clock>) -> Result<Self> {
715        let conn = crate::service::client::RemoteConnection::connect(&socket_path).await?;
716        let backend: Arc<dyn Backend> = Arc::new(RemoteBackend::new(conn, None));
717
718        // Load metadata from the remote backend
719        let metadata = backend
720            .get_instance_metadata()
721            .await?
722            .ok_or(InstanceError::DeviceKeyNotFound)?;
723
724        // No local secrets — keys are held server-side after login.
725        let inner = Arc::new(InstanceInternal {
726            backend,
727            clock,
728            sync: std::sync::OnceLock::new(),
729            metadata,
730            secrets: None,
731            snapshot_path: Mutex::new(None),
732            write_callbacks: Mutex::new(HashMap::new()),
733            global_write_callbacks: Mutex::new(Vec::new()),
734            next_callback_id: AtomicU64::new(0),
735            tree_locks: Mutex::new(HashMap::new()),
736        });
737        Ok(Self { inner })
738    }
739
740    #[cfg(not(all(unix, feature = "service")))]
741    async fn connect_unix_socket(_socket_path: PathBuf, _clock: Arc<dyn Clock>) -> Result<Self> {
742        Err(InstanceError::BackendUnavailable {
743            scheme: "unix",
744            missing_feature: "service",
745        }
746        .into())
747    }
748
749    async fn connect_sqlite(url: &str, clock: Arc<dyn Clock>) -> Result<Self> {
750        let backend = open_sqlite_backend(url).await?;
751        Self::open_impl(backend, clock).await
752    }
753
754    async fn connect_postgres(url: &str, clock: Arc<dyn Clock>) -> Result<Self> {
755        let backend = open_postgres_backend(url).await?;
756        Self::open_impl(backend, clock).await
757    }
758
759    async fn connect_memory(snapshot_path: Option<PathBuf>, clock: Arc<dyn Clock>) -> Result<Self> {
760        use crate::backend::database::InMemory;
761        // Strict load: a snapshot URL that points at a non-existent file
762        // cannot satisfy `connect`'s "must already be initialised" contract.
763        // `try_load_from_file` returns `Ok(None)` when the file doesn't
764        // exist, which we translate into a pointed `InvalidSnapshot`. Using
765        // the same call for the existence test and the read removes the
766        // TOCTOU window a separate `path.exists()` check would open: if the
767        // file vanishes mid-call, the underlying `read_to_string` surfaces
768        // `NotFound` and lands us in the same `None` arm.
769        let backend: Box<dyn BackendImpl> = match snapshot_path.as_deref() {
770            None => Box::new(InMemory::new()),
771            Some(path) => {
772                let loaded = InMemory::try_load_from_file(path).await.map_err(|e| {
773                    InstanceError::InvalidSnapshot {
774                        path: path.to_path_buf(),
775                        reason: e.to_string(),
776                    }
777                })?;
778                match loaded {
779                    Some(loaded) => Box::new(loaded),
780                    None => {
781                        return Err(InstanceError::InvalidSnapshot {
782                            path: path.to_path_buf(),
783                            reason: "snapshot file does not exist; \
784                                     use `Instance::connect_or_create` to bootstrap a new instance \
785                                     at this path, or pass `memory://` for an ephemeral instance"
786                                .into(),
787                        }
788                        .into());
789                    }
790                }
791            }
792        };
793        let instance = Self::open_impl(backend, clock).await?;
794        instance.set_snapshot_path(snapshot_path);
795        Ok(instance)
796    }
797
798    /// Flush deferred persistence state to disk.
799    ///
800    /// For an `Instance` constructed via a `memory:///path.json` URL, this
801    /// writes the current backend state to the snapshot path (atomic on
802    /// POSIX — `<path>.tmp` then rename). For sqlite/postgres/unix
803    /// backends this is a no-op; those storage layers handle persistence
804    /// inline.
805    ///
806    /// Idempotent and reentrant — call it as often as you like at
807    /// well-defined checkpoints. The snapshot path stays armed, so the
808    /// [`Drop`] fallback continues to fire on the last handle as a safety
809    /// net. The `Instance` (and any clones) remain fully usable after
810    /// `flush()` returns; this is not a shutdown.
811    ///
812    /// If `flush()` fails (e.g. nonexistent parent directory), the error
813    /// surfaces in the `Result`. Drop will later try the same write and
814    /// fail the same way, logging via `tracing::error!`. The duplicate
815    /// signal is intentional — Drop must report what it sees.
816    ///
817    /// **Blocking I/O note:** the snapshot write is synchronous
818    /// (`std::fs::write` + `rename`) and runs inline on the caller. Hence
819    /// the sync signature — there is no `.await` inside. If you're calling
820    /// from a tokio task, this briefly blocks the runtime worker;
821    /// negligible for small snapshots.
822    pub fn flush(&self) -> Result<()> {
823        // Acquire the snapshot_path lock once and hold it across the
824        // write — the lock both gates the path slot and serializes the
825        // sync I/O so concurrent flushes don't clobber each other's
826        // staging tempfile. The path stays armed (we read, don't take)
827        // so subsequent flushes and the Drop safety net keep working.
828        let guard = self
829            .inner
830            .snapshot_path
831            .lock()
832            .unwrap_or_else(|p| p.into_inner());
833        if let Some(path) = guard.as_deref() {
834            self.inner.save_snapshot_locked(path)?;
835        }
836        Ok(())
837    }
838
839    /// Write a JSON snapshot of the in-memory backend to `path`.
840    ///
841    /// The write goes to `<path>.tmp` and then renames into place. On POSIX
842    /// the rename is atomic; on Windows it is not atomic when the
843    /// destination already exists. Returns
844    /// [`InstanceError::SnapshotNotSupported`] on any backend other than
845    /// the in-memory backend.
846    pub fn snapshot_to_path(&self, path: impl AsRef<std::path::Path>) -> Result<()> {
847        let _guard = self
848            .inner
849            .snapshot_path
850            .lock()
851            .unwrap_or_else(|p| p.into_inner());
852        self.inner.save_snapshot_locked(path.as_ref())
853    }
854
855    /// Stash the snapshot path on the InstanceInternal so Drop / close can
856    /// find it. Only meaningful for in-memory backends — no-op for others.
857    fn set_snapshot_path(&self, path: Option<PathBuf>) {
858        if path.is_none() {
859            return;
860        }
861        // Poison-tolerant: a panic in another holder must not strand the
862        // Instance — the snapshot path is a simple swappable Option.
863        let mut guard = self
864            .inner
865            .snapshot_path
866            .lock()
867            .unwrap_or_else(|p| p.into_inner());
868        *guard = path;
869    }
870
871    /// Internal load-only implementation that works with any clock.
872    async fn open_impl(backend: Box<dyn BackendImpl>, clock: Arc<dyn Clock>) -> Result<Self> {
873        let backend: Arc<dyn BackendImpl> = Arc::from(backend);
874
875        // Strict: require existing InstanceMetadata. Initialisation is the
876        // caller's responsibility (`connect_or_create` / `connect_or_create_backend`).
877        let metadata = backend
878            .get_instance_metadata()
879            .await?
880            .ok_or(InstanceError::NotInitialized)?;
881
882        // Load secrets (contains the private key)
883        let secrets = backend.get_instance_secrets().await?;
884
885        // If secrets are present, verify they match the metadata
886        if let Some(ref secrets) = secrets {
887            let derived_id = secrets.signing_key.public_key();
888            if derived_id != metadata.id {
889                return Err(InstanceError::DeviceKeyMismatch.into());
890            }
891        }
892
893        // Existing backend: load from metadata + secrets
894        let inner = Arc::new(InstanceInternal {
895            backend: Arc::new(LocalBackend::new(backend)),
896            clock,
897            sync: std::sync::OnceLock::new(),
898            metadata,
899            secrets,
900            snapshot_path: Mutex::new(None),
901            write_callbacks: Mutex::new(HashMap::new()),
902            global_write_callbacks: Mutex::new(Vec::new()),
903            next_callback_id: AtomicU64::new(0),
904            tree_locks: Mutex::new(HashMap::new()),
905        });
906        Ok(Self { inner })
907    }
908
909    /// Load-only helper that accepts an already-arc'd backend and the
910    /// already-fetched metadata. Used by `connect_or_create_backend_impl`,
911    /// which has already inspected metadata to choose between the load
912    /// and bootstrap arms — passing it through avoids a redundant
913    /// `get_instance_metadata` round-trip.
914    async fn open_impl_arc_with_metadata(
915        backend: Arc<dyn BackendImpl>,
916        clock: Arc<dyn Clock>,
917        metadata: InstanceMetadata,
918    ) -> Result<Self> {
919        let secrets = backend.get_instance_secrets().await?;
920        if let Some(ref secrets) = secrets {
921            let derived_id = secrets.signing_key.public_key();
922            if derived_id != metadata.id {
923                return Err(InstanceError::DeviceKeyMismatch.into());
924            }
925        }
926        let inner = Arc::new(InstanceInternal {
927            backend: Arc::new(LocalBackend::new(backend)),
928            clock,
929            sync: std::sync::OnceLock::new(),
930            metadata,
931            secrets,
932            snapshot_path: Mutex::new(None),
933            write_callbacks: Mutex::new(HashMap::new()),
934            global_write_callbacks: Mutex::new(Vec::new()),
935            next_callback_id: AtomicU64::new(0),
936            tree_locks: Mutex::new(HashMap::new()),
937        });
938        Ok(Self { inner })
939    }
940
941    /// Internal create implementation. Returns the new `Instance` along with
942    /// the just-bootstrapped initial `User`, materialised directly from the
943    /// keys we generated (no redundant login round-trip).
944    pub(crate) async fn create_internal(
945        backend: Arc<dyn BackendImpl>,
946        clock: Arc<dyn Clock>,
947        initial: NewUser,
948    ) -> Result<(Self, User)> {
949        use crate::user::system_databases::{create_databases_tracking, create_users_database};
950
951        // 1. Generate device key
952        let device_key = PrivateKey::generate();
953        let device_id = device_key.public_key();
954
955        // 2. Create system databases with device_key passed directly
956        // Create a temporary Instance for database creation (databases will store full IDs later)
957        //
958        // SAFETY: The temporary instance has empty users_db_id and databases_db_id placeholders.
959        // This is safe because:
960        // 1. We only use it to create new system databases via Database::create()
961        // 2. Database::create() doesn't access the instance's system database IDs
962        // 3. The system databases don't exist yet, so their IDs can't be referenced
963        // 4. The temporary instance is only used during initial setup and discarded
964        // 5. The real instance is constructed afterward with the correct database IDs
965        let temp_instance = Self {
966            inner: Arc::new(InstanceInternal {
967                backend: Arc::new(LocalBackend::new(Arc::clone(&backend))),
968                clock: Arc::clone(&clock),
969                sync: std::sync::OnceLock::new(),
970                metadata: InstanceMetadata {
971                    id: device_id.clone(),
972                    users_db: ID::default(), // Placeholder - system DBs don't exist yet
973                    databases_db: ID::default(), // Placeholder - system DBs don't exist yet
974                    sync_db: None,
975                },
976                secrets: Some(InstanceSecrets {
977                    signing_key: device_key.clone(),
978                }),
979                snapshot_path: Mutex::new(None),
980                write_callbacks: Mutex::new(HashMap::new()),
981                global_write_callbacks: Mutex::new(Vec::new()),
982                next_callback_id: AtomicU64::new(0),
983                tree_locks: Mutex::new(HashMap::new()),
984            }),
985        };
986        let users_db = create_users_database(&temp_instance, &device_key).await?;
987        let databases_db = create_databases_tracking(&temp_instance, &device_key).await?;
988
989        // 3. Save metadata and secrets (marks instance as initialized)
990        // NB: Ordering matters. Secrets are stored first, then Metadata.
991        // The presence of the Metadata indicates the instance is fully initialized.
992        let secrets = InstanceSecrets {
993            signing_key: device_key,
994        };
995        backend.set_instance_secrets(&secrets).await?;
996
997        let metadata = InstanceMetadata {
998            id: device_id,
999            users_db: users_db.root_id().clone(),
1000            databases_db: databases_db.root_id().clone(),
1001            sync_db: None,
1002        };
1003        backend.set_instance_metadata(&metadata).await?;
1004
1005        // 4. Build real instance
1006        let inner = Arc::new(InstanceInternal {
1007            backend: Arc::new(LocalBackend::new(backend)),
1008            clock,
1009            sync: std::sync::OnceLock::new(),
1010            metadata,
1011            secrets: Some(secrets),
1012            snapshot_path: Mutex::new(None),
1013            write_callbacks: Mutex::new(HashMap::new()),
1014            global_write_callbacks: Mutex::new(Vec::new()),
1015            next_callback_id: AtomicU64::new(0),
1016            tree_locks: Mutex::new(HashMap::new()),
1017        });
1018
1019        let instance = Self { inner };
1020
1021        // 5. Bootstrap the initial user. The first user created on an
1022        // instance is automatically promoted to Admin on the system
1023        // databases by `system_databases::create_user`'s
1024        // first-user-becomes-admin logic.
1025        let users_db = instance.users_db().await?;
1026        let (user_uuid, user_info, root_key) = crate::user::system_databases::create_user(
1027            &users_db,
1028            &instance,
1029            &initial.username,
1030            initial.password.as_deref(),
1031        )
1032        .await?;
1033
1034        // 6. Materialise the User session directly from the keys we just
1035        // generated — skips a redundant `login_user` round-trip that would
1036        // otherwise re-derive the encryption key from the password.
1037        let user = crate::user::system_databases::build_user_session(
1038            &instance,
1039            &user_uuid,
1040            &user_info,
1041            root_key,
1042            initial.password.as_deref(),
1043        )
1044        .await?;
1045
1046        Ok((instance, user))
1047    }
1048
1049    /// Get a reference to the backend seam.
1050    pub fn backend(&self) -> &Arc<dyn Backend> {
1051        &self.inner.backend
1052    }
1053
1054    /// The concrete in-process storage engine, or [`OperationNotSupported`] on
1055    /// a remote instance.
1056    ///
1057    /// Off-seam local-only operations (instance secrets, verification-status
1058    /// mutation, `all_roots`/`get_tree` raw dumps, scope-keyed cache) are
1059    /// performed through this accessor, so they are reachable only where a
1060    /// concrete local backend exists.
1061    ///
1062    /// [`OperationNotSupported`]: InstanceError::OperationNotSupported
1063    pub(crate) fn require_local_engine(&self) -> Result<Arc<dyn BackendImpl>> {
1064        self.inner.backend.local_engine().ok_or_else(|| {
1065            InstanceError::OperationNotSupported {
1066                operation: "local backend engine on remote instance".to_string(),
1067            }
1068            .into()
1069        })
1070    }
1071
1072    /// The remote connection backing this instance, if it was created via
1073    /// [`connect`](Self::connect). Returns `None` for local instances.
1074    ///
1075    /// Useful for constructing a [`Database`](crate::Database) that routes
1076    /// reads through the Database-level wire API while sharing the same
1077    /// connection and session as the instance's write path.
1078    #[cfg(all(unix, feature = "service"))]
1079    pub fn remote_connection(&self) -> Option<RemoteConnection> {
1080        self.inner.backend.remote_connection()
1081    }
1082
1083    /// Check if an entry exists in storage.
1084    pub async fn has_entry(&self, id: &ID) -> bool {
1085        self.inner.backend.get(id).await.is_ok()
1086    }
1087
1088    /// Check if a database is present locally.
1089    ///
1090    /// This differs from `has_entry` in that it checks for the active tracking
1091    /// of the database by the Instance. This method checks if we're tracking
1092    /// the database's tip state.
1093    pub async fn has_database(&self, root_id: &ID) -> bool {
1094        match self.inner.backend.snapshot(root_id).await {
1095            Ok(snap) => !snap.is_empty(),
1096            Err(_) => false,
1097        }
1098    }
1099
1100    /// Get a reference to the clock.
1101    ///
1102    /// The clock is used for timestamps in height calculations and peer tracking.
1103    pub(crate) fn clock(&self) -> &dyn Clock {
1104        &*self.inner.clock
1105    }
1106
1107    /// Get a cloned Arc of the clock.
1108    ///
1109    /// Used when passing the clock to components that need ownership (e.g., HeightCalculator).
1110    pub(crate) fn clock_arc(&self) -> Arc<dyn Clock> {
1111        self.inner.clock.clone()
1112    }
1113
1114    // === Backend pass-through methods (pub(crate) for internal use) ===
1115
1116    /// Get an entry from the backend
1117    pub(crate) async fn get(&self, id: &crate::entry::ID) -> Result<crate::entry::Entry> {
1118        self.inner.backend.get(id).await
1119    }
1120
1121    /// Put an entry into the backend. Always stored Unverified — see
1122    /// [`crate::backend::BackendImpl::put`].
1123    pub(crate) async fn put(&self, entry: crate::entry::Entry) -> Result<()> {
1124        self.inner.backend.put(entry).await
1125    }
1126
1127    /// Returns the current [`crate::Snapshot`] of `tree` — its DAG tips. See
1128    /// [`Database::snapshot`] for the public entry point.
1129    pub(crate) async fn snapshot(
1130        &self,
1131        tree: &crate::entry::ID,
1132    ) -> Result<crate::snapshot::Snapshot> {
1133        self.inner.backend.snapshot(tree).await
1134    }
1135
1136    // === System database accessors ===
1137
1138    /// Get the _users database
1139    ///
1140    /// This constructs a Database instance on-the-fly to avoid circular references.
1141    /// On a local instance the device signing key is attached so users-table
1142    /// writes (e.g., the local `create_user` path) can sign. On a remote
1143    /// instance the device key lives on the daemon side and isn't available
1144    /// locally, so no key is attached — the returned handle is read-only.
1145    /// Write paths on a remote instance must instead go through
1146    /// [`Instance::users_db_for_session`], which attaches the caller's
1147    /// session signing key (e.g. admin's key on the `InstanceAdmin`
1148    /// `create_user` path) and routes through `Database::open_remote`.
1149    pub(crate) async fn users_db(&self) -> Result<Database> {
1150        let db = Database::open(self, &self.inner.metadata.users_db).await?;
1151        #[cfg(all(unix, feature = "service"))]
1152        if self.remote_connection().is_some() {
1153            return Ok(db);
1154        }
1155        Ok(db.with_key(self.signing_key()?.clone()))
1156    }
1157
1158    /// Open the _users system database with a specific signing key (not the device
1159    /// key).  Used by the admin-session paths
1160    /// ([`InstanceAdmin`](crate::user::InstanceAdmin), `User::admin_check`) on
1161    /// remote instances where the device key is unavailable.
1162    pub(crate) async fn users_db_for_session(&self, signing_key: &PrivateKey) -> Result<Database> {
1163        self.open_system_db_for_session(&self.inner.metadata.users_db, signing_key)
1164            .await
1165    }
1166
1167    /// Open a system database for an authenticated session.
1168    ///
1169    /// On a remote instance this routes every read through the connection's
1170    /// Database wire protocol ([`Database::open_remote`], a per-handle
1171    /// `RemoteBackend`), gated by the session key's identity — the plain
1172    /// [`Database::open`] path instead clones the instance's session backend,
1173    /// so on a connected instance its reads carry the connection's login
1174    /// identity. On a local instance it opens against the local backend as
1175    /// before. The signing key is attached for writes.
1176    pub(crate) async fn open_system_db_for_session(
1177        &self,
1178        root_id: &ID,
1179        signing_key: &PrivateKey,
1180    ) -> Result<Database> {
1181        #[cfg(all(unix, feature = "service"))]
1182        if let Some(conn) = self.remote_connection() {
1183            // The daemon gates per-tree reads against the acting pubkey
1184            // from the request's identity hint, and the hint here is
1185            // `signing_key.public_key()` (the caller's chosen identity for
1186            // this DB). The hint must be in the connection's session keyset
1187            // — register it now so subsequent reads through the returned
1188            // `RemoteBackend` are accepted.
1189            conn.register_session_key(signing_key).await?;
1190            let identity = SigKey::from_pubkey(&signing_key.public_key());
1191            return Ok(Database::open_remote(self, conn, root_id, identity)
1192                .await?
1193                .with_key(signing_key.clone()));
1194        }
1195        Ok(Database::open(self, root_id)
1196            .await?
1197            .with_key(signing_key.clone()))
1198    }
1199
1200    /// Get the _databases tracking database
1201    ///
1202    /// Parallel to `users_db()` — opens the instance's database-registry
1203    /// system DB with the device signing key attached. Used by the
1204    /// instance-admin bootstrap path (`system_databases::create_user`) to add
1205    /// the first user's pubkey as `Admin(0)` on the registry, so subsequent
1206    /// admin-gated instance ops (e.g., `SetInstanceMetadata`) can authorize
1207    /// against the user's key instead of the device key.
1208    pub(crate) async fn databases_db(&self) -> Result<Database> {
1209        Ok(Database::open(self, &self.inner.metadata.databases_db)
1210            .await?
1211            .with_key(self.signing_key()?.clone()))
1212    }
1213
1214    /// Root id of the `_databases` system DB.
1215    ///
1216    /// The service daemon uses this to gate admin-only ops
1217    /// (e.g., `SetInstanceMetadata`) against `_databases.auth_settings`:
1218    /// an instance admin is a user with `Admin` on `_databases`.
1219    pub(crate) fn databases_db_id(&self) -> &ID {
1220        &self.inner.metadata.databases_db
1221    }
1222
1223    /// Root id of the `_users` system DB.
1224    ///
1225    /// Parallel to `databases_db_id()`. Lets an instance admin open `_users`
1226    /// keyed by their own signing key (rather than the device key that
1227    /// `users_db()` attaches), so admin-gated edits to `_users.auth_settings`
1228    /// resolve against the admin's identity.
1229    pub(crate) fn users_db_id(&self) -> &ID {
1230        &self.inner.metadata.users_db
1231    }
1232
1233    // === User Management ===
1234
1235    /// Login a user with flexible password handling.
1236    ///
1237    /// Returns a User session object that provides access to user operations.
1238    /// For password-protected users, provide the password. For passwordless users, pass None.
1239    ///
1240    /// # Arguments
1241    /// * `user_id` - User identifier (username)
1242    /// * `password` - Optional password. None for passwordless users.
1243    ///
1244    /// # Returns
1245    /// A Result containing the User session
1246    pub async fn login_user(&self, user_id: &str, password: Option<&str>) -> Result<User> {
1247        // On a remote instance, the `TrustedLogin*` handshake authenticates the
1248        // socket connection AND ships back the user's full `UserInfo` plus the
1249        // decrypted root signing key. Build the `User` session from those —
1250        // the per-tree gate means a freshly-logged-in user with no permissions
1251        // on `_users` couldn't re-read it over the wire anyway, so we don't try.
1252        #[cfg(all(unix, feature = "service"))]
1253        if let Some(conn) = self.remote_connection() {
1254            let (user_uuid, user_info, signing_key) = conn.trusted_login(user_id, password).await?;
1255            return crate::user::system_databases::build_user_session(
1256                self,
1257                &user_uuid,
1258                &user_info,
1259                signing_key,
1260                password,
1261            )
1262            .await;
1263        }
1264
1265        use crate::user::system_databases::login_user;
1266        let users_db = self.users_db().await?;
1267        login_user(&users_db, self, user_id, password).await
1268    }
1269
1270    // === User-Sync Integration ===
1271
1272    // === Device Identity Management ===
1273    //
1274    // The Instance's public identity is stored in InstanceMetadata, and the private
1275    // signing key is stored in InstanceSecrets. Both are cached in memory.
1276
1277    /// Get the device signing key.
1278    ///
1279    /// # Internal Use Only
1280    ///
1281    /// This method provides direct access to the instance's cryptographic identity
1282    /// and is intended for internal operations that require the device key (sync,
1283    /// system database creation, authentication validation, etc.).
1284    ///
1285    /// These operations should only be performed by the server/instance administrator,
1286    /// but we don't verify that yet. Future versions may add admin permission checks.
1287    ///
1288    /// Similar to `Database::open` (without a key), this is a controlled escape hatch
1289    /// for internal library operations. Use with care - prefer User API for normal operations.
1290    ///
1291    /// Returns an error if this is a remote Instance that does not have access to the
1292    /// device key (e.g., connected via RPC where secrets are never transmitted).
1293    #[cfg(not(any(test, feature = "testing")))]
1294    pub(crate) fn signing_key(&self) -> Result<&PrivateKey> {
1295        self.inner
1296            .secrets
1297            .as_ref()
1298            .map(|s| &s.signing_key)
1299            .ok_or_else(|| InstanceError::DeviceKeyNotFound.into())
1300    }
1301
1302    /// Test-only: Get the device signing key.
1303    ///
1304    /// This is exposed for testing purposes only. In production, use the User API.
1305    ///
1306    /// Returns an error if this is a remote Instance that does not have access to the
1307    /// device key.
1308    #[cfg(any(test, feature = "testing"))]
1309    pub fn signing_key(&self) -> Result<&PrivateKey> {
1310        self.inner
1311            .secrets
1312            .as_ref()
1313            .map(|s| &s.signing_key)
1314            .ok_or_else(|| InstanceError::DeviceKeyNotFound.into())
1315    }
1316
1317    /// Get the instance identity (public key).
1318    ///
1319    /// # Returns
1320    /// The instance's public key identity.
1321    pub fn id(&self) -> PublicKey {
1322        self.inner.metadata.id.clone()
1323    }
1324
1325    // === Synchronization Management ===
1326    //
1327    // These methods provide access to the Sync module for managing synchronization
1328    // settings and state for this database instance.
1329
1330    /// Initializes the Sync module for this instance.
1331    ///
1332    /// Enables synchronization operations for this instance. This method is idempotent;
1333    /// calling it multiple times has no effect.
1334    ///
1335    /// # Errors
1336    /// Returns an error if the sync settings database cannot be created or if device key
1337    /// generation/storage fails.
1338    pub async fn enable_sync(&self) -> Result<()> {
1339        // Check if there is an existing Sync already loaded
1340        if self.inner.sync.get().is_some() {
1341            return Ok(());
1342        }
1343
1344        // A remote Instance must not run sync client-side: building a Sync
1345        // here would spin up a background sync engine that drives RPCs against
1346        // the daemon's backend — duplicating (and racing) the daemon's own
1347        // sync. Sync is owned by the process that owns the Instance.
1348        //
1349        // Return `Ok(())` so callers on a connected instance get the same
1350        // no-op success they would on a local instance where sync is already
1351        // running daemon-side. Long-term this should become an admin-gated
1352        // operation that lets a client ask the daemon to enable its sync
1353        // subsystem; until that ships, the client-side `enable_sync` is
1354        // intentionally a silent no-op because the daemon either already
1355        // has sync running or it doesn't, and the client can't change that.
1356        //
1357        // TODO(service): expose an admin-gated `enable_sync` on
1358        // `InstanceAdmin` so a client can enable sync remotely.
1359        #[cfg(all(unix, feature = "service"))]
1360        if self.remote_connection().is_some() {
1361            return Ok(());
1362        }
1363
1364        // Check InstanceMetadata for existing sync_db
1365        let metadata = self
1366            .backend()
1367            .get_instance_metadata()
1368            .await?
1369            .ok_or(InstanceError::DeviceKeyNotFound)?; // Metadata must exist if instance is initialized
1370
1371        let sync = if let Some(ref sync_db) = metadata.sync_db {
1372            // Load existing sync tree
1373            Sync::load(self.clone(), sync_db).await?
1374        } else {
1375            // Create new sync tree
1376            let sync = Sync::new(self.clone()).await?;
1377
1378            // Save sync_db to metadata
1379            let mut new_metadata = metadata;
1380            new_metadata.sync_db = Some(sync.sync_tree_root_id().clone());
1381            self.backend().set_instance_metadata(&new_metadata).await?;
1382
1383            sync
1384        };
1385
1386        let sync_arc = Arc::new(sync);
1387
1388        // Initialize the sync engine (no transports registered yet)
1389        // Users should call register_transport() to add transports
1390        sync_arc.start_background_sync()?;
1391
1392        // Register global callback for automatic sync on local writes.
1393        // Detached: lives for the life of the Instance.
1394        let sync_for_callback = Arc::clone(&sync_arc);
1395        self.register_global_write_callback(move |event, database| {
1396            let sync = Arc::clone(&sync_for_callback);
1397            let event = event.clone();
1398            let database = database.clone();
1399            async move {
1400                if event.source() != WriteSource::Local {
1401                    return Ok(());
1402                }
1403                // Local writes always carry exactly one entry today, but
1404                // the loop in `Sync::on_local_write` is intentionally
1405                // ready for future multi-entry local events.
1406                sync.on_local_write(&event, &database).await
1407            }
1408        });
1409
1410        let _ = self.inner.sync.set(sync_arc);
1411        Ok(())
1412    }
1413
1414    /// Get a reference to the Sync module.
1415    ///
1416    /// Returns a cheap-to-clone Arc handle to the Sync module. The Sync module
1417    /// uses interior mutability (AtomicBool and OnceLock) so &self methods are sufficient.
1418    ///
1419    /// # Returns
1420    /// An `Option` containing an `Arc<Sync>` if the Sync module is initialized.
1421    pub fn sync(&self) -> Option<Arc<Sync>> {
1422        self.inner.sync.get().map(Arc::clone)
1423    }
1424
1425    /// Flush all pending sync operations.
1426    ///
1427    /// This is a convenience method that processes all queued entries and
1428    /// retries any failed sends. If sync is not enabled, returns Ok(()).
1429    ///
1430    /// This is useful to force pending syncs to complete, e.g. on program shutdown.
1431    ///
1432    /// # Returns
1433    /// `Ok(())` if sync is not enabled or all operations completed successfully,
1434    /// or an error if sends failed.
1435    pub async fn flush_sync(&self) -> Result<()> {
1436        if let Some(sync) = self.sync() {
1437            sync.flush().await
1438        } else {
1439            Ok(())
1440        }
1441    }
1442
1443    // === Entry Write Coordination ===
1444    //
1445    // All entry writes go through Instance::put_entry() which handles backend storage
1446    // and callback dispatch. This centralizes write coordination and ensures hooks fire.
1447
1448    /// Register a per-database callback. Fires for both local and remote writes.
1449    ///
1450    /// Returns the [`CallbackId`] of the registration. Callers wrap this in a
1451    /// [`WriteCallback`] handle (see [`Database::on_write`]) to manage lifetime.
1452    pub(crate) fn register_write_callback<F, Fut>(&self, tree_id: ID, callback: F) -> CallbackId
1453    where
1454        F: for<'a> Fn(&'a WriteEvent, &'a Database) -> Fut + Send + std::marker::Sync + 'static,
1455        Fut: Future<Output = Result<()>> + Send + 'static,
1456    {
1457        let id = CallbackId(self.inner.next_callback_id.fetch_add(1, Ordering::Relaxed));
1458        let cb: AsyncWriteCallbackFn = Arc::new(move |event: &WriteEvent, database: &Database| {
1459            let fut = callback(event, database);
1460            Box::pin(fut) as AsyncWriteCallbackFuture<'_>
1461        });
1462        self.inner
1463            .write_callbacks
1464            .lock()
1465            .unwrap()
1466            .entry(tree_id)
1467            .or_default()
1468            .push((id, cb));
1469        id
1470    }
1471
1472    /// Register a global callback fired for every write across every database.
1473    ///
1474    /// Callers branch on [`WriteEvent::source`] inside the closure if they
1475    /// only care about one source.
1476    ///
1477    /// Note: there is intentionally no `WriteCallback` handle or remove path
1478    /// for global callbacks. The only current caller is sync's permanent hook
1479    /// (OnceLock-guarded) which is registered for the life of the Instance.
1480    /// If a future caller needs lifecycle management on a global callback,
1481    /// add `remove_global_write_callback` and a global variant of
1482    /// `WriteCallback`.
1483    pub(crate) fn register_global_write_callback<F, Fut>(&self, callback: F)
1484    where
1485        F: for<'a> Fn(&'a WriteEvent, &'a Database) -> Fut + Send + std::marker::Sync + 'static,
1486        Fut: Future<Output = Result<()>> + Send + 'static,
1487    {
1488        let id = CallbackId(self.inner.next_callback_id.fetch_add(1, Ordering::Relaxed));
1489        let cb: AsyncWriteCallbackFn = Arc::new(move |event: &WriteEvent, database: &Database| {
1490            let fut = callback(event, database);
1491            Box::pin(fut) as AsyncWriteCallbackFuture<'_>
1492        });
1493        self.inner
1494            .global_write_callbacks
1495            .lock()
1496            .unwrap()
1497            .push((id, cb));
1498    }
1499
1500    /// Remove a per-database callback by id. No-op if not found.
1501    pub(crate) fn remove_write_callback(&self, tree_id: &ID, id: CallbackId) {
1502        let mut callbacks = self.inner.write_callbacks.lock().unwrap();
1503        if let Some(vec) = callbacks.get_mut(tree_id) {
1504            vec.retain(|(cb_id, _)| *cb_id != id);
1505            if vec.is_empty() {
1506                callbacks.remove(tree_id);
1507            }
1508        }
1509    }
1510
1511    /// Acquire (or create) the per-tree async lock that serializes the
1512    /// `snapshot` → backend write → callback dispatch sequence.
1513    ///
1514    /// Without this, two concurrent writers to the same tree both snapshot
1515    /// `previous_tips` before either writes, so the second callback's
1516    /// `previous_tips` would not reflect the first write — breaking the
1517    /// "diff against current tips" contract documented on [`WriteEvent`].
1518    fn tree_lock(&self, tree_id: &ID) -> Arc<tokio::sync::Mutex<()>> {
1519        let mut locks = self.inner.tree_locks.lock().unwrap();
1520        Arc::clone(
1521            locks
1522                .entry(tree_id.clone())
1523                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
1524        )
1525    }
1526
1527    /// Write an entry to the backend and dispatch callbacks.
1528    ///
1529    /// This is the central coordination point for all entry writes in the system.
1530    /// All writes must go through this method to ensure:
1531    /// - Entries are persisted to the backend
1532    /// - Appropriate callbacks are triggered based on write source
1533    /// - Hooks have full context (entry, database, instance)
1534    ///
1535    /// Serialized per-tree against [`Self::put_remote_entries`] so
1536    /// [`WriteEvent::previous_tips`] is consistent.
1537    ///
1538    /// # Arguments
1539    /// * `tree_id` - The root ID of the database being written to
1540    /// * `verification` - Authentication verification status of the entry
1541    /// * `entry` - The entry to write
1542    /// * `source` - Whether this is a local or remote write
1543    ///
1544    /// # Returns
1545    /// A Result indicating success or failure
1546    pub async fn put_entry(
1547        &self,
1548        tree_id: &ID,
1549        verification: crate::backend::VerificationStatus,
1550        entry: Entry,
1551        source: WriteSource,
1552    ) -> Result<()> {
1553        let lock = self.tree_lock(tree_id);
1554        let _guard = lock.lock().await;
1555
1556        // 1. Capture tips before the write so callbacks know what changed.
1557        //
1558        // On a connected (remote) instance, the daemon owns the canonical
1559        // DAG and the client's local backend has nothing to read; reading
1560        // tips here would also gate against the *connection's* login pubkey
1561        // (not the per-DB acting identity from the Database handle), which
1562        // breaks the legitimate "create-a-tree-with-a-non-login-key" flow.
1563        // Remote callbacks therefore see an empty `previous_tips` — this is
1564        // a documented limitation of `Database::on_write` on a connected
1565        // Instance, lifted when the server-push notification path lands.
1566        #[cfg(all(unix, feature = "service"))]
1567        let previous_tips = if self.remote_connection().is_some() {
1568            Vec::new()
1569        } else {
1570            self.snapshot(tree_id).await?.into_tips()
1571        };
1572        #[cfg(not(all(unix, feature = "service")))]
1573        let previous_tips = self.snapshot(tree_id).await?.into_tips();
1574
1575        // 2. Persist to backend storage (and notify server for remote backends)
1576        self.backend()
1577            .write_entry(verification, entry.clone(), source)
1578            .await?;
1579
1580        // 3. Build event and fire callbacks
1581        let event = WriteEvent {
1582            entries: vec![entry],
1583            previous_tips,
1584            source,
1585        };
1586        self.fire_write_callbacks(tree_id, &event).await;
1587
1588        Ok(())
1589    }
1590
1591    /// Store a batch of remotely-received entries and fire callbacks once.
1592    ///
1593    /// This is the correct way to ingest entries from sync. All entries are
1594    /// persisted first, then callbacks fire exactly once with the full batch
1595    /// and the tips from before ingestion. This ensures:
1596    ///
1597    /// - The database is fully consistent when callbacks execute
1598    /// - Callbacks fire once per sync exchange, not once per entry
1599    /// - `previous_tips` lets consumers reconstruct exactly what changed
1600    ///
1601    /// Entries that fail to store are logged and skipped — remaining entries
1602    /// are still stored and callbacks still fire for whatever was persisted.
1603    /// Returns the number of entries that were successfully persisted.
1604    ///
1605    /// Serialized per-tree against [`Self::put_entry`] and other concurrent
1606    /// `put_remote_entries` calls so `previous_tips` is consistent across
1607    /// writers.
1608    ///
1609    /// Entries are stored as [`VerificationStatus::Unverified`] without
1610    /// exception: they arrive from outside this node's local validation pass,
1611    /// so this node has not verified them and a peer cannot assert that it
1612    /// did. A later local re-verification pass may promote them.
1613    ///
1614    /// # Arguments
1615    /// * `tree_id` - The root ID of the database receiving the batch
1616    /// * `entries` - The entries to ingest
1617    pub(crate) async fn put_remote_entries(
1618        &self,
1619        tree_id: &ID,
1620        entries: Vec<Entry>,
1621    ) -> Result<usize> {
1622        if entries.is_empty() {
1623            return Ok(0);
1624        }
1625
1626        let lock = self.tree_lock(tree_id);
1627        let _guard = lock.lock().await;
1628
1629        // 1. Capture tips before any writes
1630        let previous_tips = self.snapshot(tree_id).await?.into_tips();
1631
1632        // 2. Store all entries
1633        let mut stored_entries = Vec::with_capacity(entries.len());
1634        for entry in entries {
1635            match self.backend().put(entry.clone()).await {
1636                Ok(_) => stored_entries.push(entry),
1637                Err(e) => {
1638                    tracing::error!(
1639                        tree_id = %tree_id,
1640                        entry_id = %entry.id(),
1641                        "Failed to store remote entry: {}", e
1642                    );
1643                }
1644            }
1645        }
1646
1647        let stored_count = stored_entries.len();
1648
1649        // 3. Fire callbacks once for the whole batch
1650        if !stored_entries.is_empty() {
1651            let event = WriteEvent {
1652                entries: stored_entries,
1653                previous_tips,
1654                source: WriteSource::Remote,
1655            };
1656            self.fire_write_callbacks(tree_id, &event).await;
1657        }
1658
1659        Ok(stored_count)
1660    }
1661
1662    /// Dispatch callbacks for a write event.
1663    ///
1664    /// Fires per-database callbacks for `tree_id` then global callbacks. Each
1665    /// callback fires for both local and remote writes; consumers branch on
1666    /// [`WriteEvent::source`] internally.
1667    async fn fire_write_callbacks(&self, tree_id: &ID, event: &WriteEvent) {
1668        let per_db_callbacks = self
1669            .inner
1670            .write_callbacks
1671            .lock()
1672            .unwrap()
1673            .get(tree_id)
1674            .cloned();
1675
1676        let global_callbacks = self.inner.global_write_callbacks.lock().unwrap().clone();
1677
1678        let has_callbacks = per_db_callbacks.is_some() || !global_callbacks.is_empty();
1679        if !has_callbacks {
1680            return;
1681        }
1682
1683        // Create a Database handle for the callbacks
1684        let database = match Database::open(self, tree_id).await {
1685            Ok(db) => db,
1686            Err(e) => {
1687                tracing::error!(tree_id = %tree_id, "Failed to open database for callbacks: {}", e);
1688                return;
1689            }
1690        };
1691
1692        if let Some(callbacks) = per_db_callbacks {
1693            for (id, callback) in callbacks {
1694                if let Err(e) = callback(event, &database).await {
1695                    tracing::error!(
1696                        tree_id = %tree_id,
1697                        source = ?event.source(),
1698                        callback_id = ?id,
1699                        "Per-database callback failed: {}", e
1700                    );
1701                }
1702            }
1703        }
1704
1705        for (id, callback) in global_callbacks {
1706            if let Err(e) = callback(event, &database).await {
1707                tracing::error!(
1708                    tree_id = %tree_id,
1709                    source = ?event.source(),
1710                    callback_id = ?id,
1711                    "Global callback failed: {}", e
1712                );
1713            }
1714        }
1715    }
1716
1717    /// Downgrade to a weak reference.
1718    ///
1719    /// Creates a weak reference that does not prevent the Instance from being dropped.
1720    /// This is useful for preventing circular reference cycles in dependent objects.
1721    ///
1722    /// # Returns
1723    /// A `WeakInstance` that can be upgraded back to a strong reference.
1724    pub fn downgrade(&self) -> WeakInstance {
1725        WeakInstance {
1726            inner: Arc::downgrade(&self.inner),
1727        }
1728    }
1729}
1730
1731impl WeakInstance {
1732    /// Upgrade to a strong reference.
1733    ///
1734    /// Attempts to upgrade this weak reference to a strong `Instance` reference.
1735    /// Returns `None` if the Instance has already been dropped.
1736    ///
1737    /// # Returns
1738    /// `Some(Instance)` if the Instance still exists, `None` otherwise.
1739    ///
1740    /// # Example
1741    /// ```
1742    /// # use eidetica::{Instance, NewUser};
1743    /// # #[tokio::main]
1744    /// # async fn main() -> eidetica::Result<()> {
1745    /// let (instance, maybe_user) = Instance::connect_or_create(
1746    ///     "memory://",
1747    ///     NewUser::passwordless("alice"),
1748    /// ).await?;
1749    /// let user = maybe_user.expect("memory:// is always fresh");
1750    /// let weak = instance.downgrade();
1751    ///
1752    /// // Upgrade works while instance exists
1753    /// assert!(weak.upgrade().is_some());
1754    ///
1755    /// // User holds its own strong handle to the Instance — drop it too so
1756    /// // the weak upgrade can fail.
1757    /// drop(user);
1758    /// drop(instance);
1759    /// // Upgrade fails after instance is dropped
1760    /// assert!(weak.upgrade().is_none());
1761    /// # Ok(())
1762    /// # }
1763    /// ```
1764    pub fn upgrade(&self) -> Option<Instance> {
1765        self.inner.upgrade().map(|inner| Instance { inner })
1766    }
1767}
1768
1769// ============ URL-dispatch backend constructors ============
1770
/// Connect to the SQLite backend at `url` and return it as a boxed
/// [`BackendImpl`]. Compiled only when the `sqlite` feature is enabled.
#[cfg(feature = "sqlite")]
async fn open_sqlite_backend(url: &str) -> Result<Box<dyn BackendImpl>> {
    Ok(Box::new(
        crate::backend::database::Sqlite::connect(url).await?,
    ))
}
1776
1777#[cfg(not(feature = "sqlite"))]
1778async fn open_sqlite_backend(_url: &str) -> Result<Box<dyn BackendImpl>> {
1779    Err(InstanceError::BackendUnavailable {
1780        scheme: "sqlite",
1781        missing_feature: "sqlite",
1782    }
1783    .into())
1784}
1785
/// Connect to the Postgres backend at `url` and return it as a boxed
/// [`BackendImpl`]. Compiled only when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
async fn open_postgres_backend(url: &str) -> Result<Box<dyn BackendImpl>> {
    Ok(Box::new(
        crate::backend::database::Postgres::connect(url).await?,
    ))
}
1791
1792#[cfg(not(feature = "postgres"))]
1793async fn open_postgres_backend(_url: &str) -> Result<Box<dyn BackendImpl>> {
1794    Err(InstanceError::BackendUnavailable {
1795        scheme: "postgres",
1796        missing_feature: "postgres",
1797    }
1798    .into())
1799}