eidetica/instance/mod.rs
1//!
2//! Provides the main database structures (`Instance` and `Database`).
3//!
//! `Instance` manages multiple `Database` instances and interacts with the storage backend.
5//! `Database` represents a single, independent history of data entries, analogous to a table or branch.
6
7use std::{
8 collections::HashMap,
9 future::Future,
10 path::PathBuf,
11 pin::Pin,
12 sync::{
13 Arc, Mutex, Weak,
14 atomic::{AtomicU64, Ordering},
15 },
16};
17
18use handle_trait::Handle;
19
20use crate::{
21 Clock, Database, Entry, Result, SystemClock,
22 auth::crypto::{PrivateKey, PublicKey},
23 backend::{BackendImpl, InstanceMetadata, InstanceSecrets},
24 entry::ID,
25 sync::Sync,
26 user::User,
27};
28#[cfg(all(unix, feature = "service"))]
29use crate::{auth::SigKey, service::client::RemoteConnection};
30
31pub mod backend;
32pub mod errors;
33pub mod new_user;
34pub mod settings_merge;
35pub mod url;
36
37#[cfg(test)]
38mod tests;
39
40// Re-export main types for easier access
41#[cfg(all(unix, feature = "service"))]
42use backend::RemoteBackend;
43use backend::{Backend, LocalBackend};
44pub use errors::InstanceError;
45pub use new_user::NewUser;
46
/// Indicates whether an entry write originated locally or from a remote source (e.g., sync).
///
/// This distinction allows different callbacks to be triggered based on the write source,
/// enabling behaviors like "only trigger sync for local writes" or "only update UI for remote writes".
///
/// The type is `Copy`, so it is passed and compared by value, and it derives
/// `serde::Serialize` / `serde::Deserialize` so it can be embedded in serialized data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum WriteSource {
    /// Write originated from a local transaction commit
    Local,
    /// Write originated from a remote source (e.g., sync, replication)
    Remote,
}
58
/// Context provided to write callbacks describing what changed in the database.
///
/// For local writes (transaction commits), this contains a single entry.
/// For remote writes (sync), this may contain a batch of entries that were
/// received and stored together.
///
/// All fields are private; callbacks read them through [`WriteEvent::entries`],
/// [`WriteEvent::previous_tips`] and [`WriteEvent::source`].
///
/// # Catching up on missed writes
///
/// The `previous_tips` field contains the DAG tips of the database *before* the
/// write(s) that triggered this callback. Consumers can use this to determine
/// exactly what changed by walking the DAG from the current tips back to these
/// previous tips. This is analogous to `git log previous_tip..HEAD`.
///
/// This design means callbacks never need to "miss" writes — even if multiple
/// entries are batched (as in sync), the consumer can reconstruct the full set
/// of changes from the tip diff.
///
/// # Example
///
/// ```rust,no_run
/// # use eidetica::instance::WriteEvent;
/// # fn example(event: &WriteEvent) {
/// // Check what stores were touched
/// for entry in event.entries() {
///     if entry.in_subtree("messages") {
///         // A write touched the "messages" store
///     }
/// }
///
/// // Use previous_tips to find what's new
/// let prev = event.previous_tips();
/// // Walk DAG from current tips back to prev to find all new entries
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct WriteEvent {
    /// The entries written in this event. For local writes, this is always
    /// exactly one entry. For remote sync, this is the full batch of entries
    /// that were received and stored together.
    entries: Vec<Entry>,
    /// The DAG tips of the database immediately before this write.
    /// Consumers can diff current tips against these to determine what changed.
    previous_tips: Vec<ID>,
    /// Whether this write originated locally or from a remote sync.
    source: WriteSource,
}
105
106impl WriteEvent {
107 /// Get the entries written in this event.
108 ///
109 /// For local writes (transaction commits), this always contains exactly one entry.
110 /// For remote writes (sync), this contains the full batch of entries received together.
111 pub fn entries(&self) -> &[Entry] {
112 &self.entries
113 }
114
115 /// Get the DAG tips of the database before this write.
116 ///
117 /// Use these to determine what changed: walk from the database's current tips
118 /// back to these previous tips to find all new entries.
119 pub fn previous_tips(&self) -> &[ID] {
120 &self.previous_tips
121 }
122
123 /// The source of this write (local commit or remote sync).
124 pub fn source(&self) -> WriteSource {
125 self.source
126 }
127}
128
/// Boxed future returned by the internal async callback dispatcher.
///
/// The future resolves to `Result<()>`, is `Send`, and may borrow data for
/// the lifetime `'a` (see [`AsyncWriteCallbackFn`], which ties `'a` to the
/// event and database references handed to the callback).
pub(crate) type AsyncWriteCallbackFuture<'a> =
    Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
132
/// Internal async callback function type. The user-facing callback contract
/// is documented on [`Database::on_write`](crate::Database::on_write).
///
/// The callback is shared behind an `Arc` and must be `Send + Sync`. It is
/// invoked with a borrowed [`WriteEvent`] and [`Database`], and the future it
/// returns may borrow from both for the same lifetime.
pub(crate) type AsyncWriteCallbackFn = Arc<
    dyn for<'a> Fn(&'a WriteEvent, &'a Database) -> AsyncWriteCallbackFuture<'a>
        + Send
        + std::marker::Sync,
>;
140
/// Opaque identifier for a registered callback. Stable for the life of the registration.
///
/// Newtype over `u64`; it is `Copy` and hashable so it can be stored next to
/// the callback and compared cheaply when a registration is removed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct CallbackId(u64);
144
/// Type alias for a collection of write callbacks paired with their ids.
///
/// Each element is `(id, callback)`; the id is what a [`WriteCallback`] handle
/// passes back when it unregisters.
type CallbackVec = Vec<(CallbackId, AsyncWriteCallbackFn)>;
147
/// Handle to a registered write callback. **Drop to unregister.**
///
/// Returned by [`Database::on_write`](crate::Database::on_write). While this
/// value is alive the callback fires on writes; dropping it removes the
/// registration. Use [`detach`](Self::detach) to keep the callback registered
/// for the life of the [`Instance`] when you don't want to manage the lifetime
/// yourself.
///
/// Holds a weak reference to the [`Instance`], so a `WriteCallback` will not
/// keep the Instance alive on its own.
#[must_use = "dropping a WriteCallback unregisters it; call .detach() to keep the callback registered"]
pub struct WriteCallback {
    /// Weak handle to the owning instance; upgraded in `Drop` to reach the
    /// callback registry. If the instance is already gone, `Drop` does nothing.
    instance: WeakInstance,
    /// Tree id handed to `remove_write_callback` on drop
    /// (presumably the database the callback was registered on — see
    /// `new_per_database`).
    tree_id: ID,
    /// Identifier of this registration, handed to `remove_write_callback` on drop.
    id: CallbackId,
    /// Set by [`detach`](Self::detach); when `true`, `Drop` skips unregistering.
    detached: bool,
}
165
166impl WriteCallback {
167 pub(crate) fn new_per_database(instance: WeakInstance, tree_id: ID, id: CallbackId) -> Self {
168 Self {
169 instance,
170 tree_id,
171 id,
172 detached: false,
173 }
174 }
175
176 /// Consume the handle without unregistering. The callback remains active
177 /// for the life of the [`Instance`].
178 ///
179 /// Implementation note: this sets a flag rather than calling `mem::forget`
180 /// so that field destructors (the `WeakInstance`'s weak count, the
181 /// `tree_id`'s heap allocation) still run — only our `Drop` impl is
182 /// short-circuited.
183 pub fn detach(mut self) {
184 self.detached = true;
185 }
186}
187
188impl std::fmt::Debug for WriteCallback {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 f.debug_struct("WriteCallback")
191 .field("id", &self.id)
192 .field("tree_id", &self.tree_id)
193 .field("detached", &self.detached)
194 .finish()
195 }
196}
197
198impl Drop for WriteCallback {
199 fn drop(&mut self) {
200 if self.detached {
201 return;
202 }
203 if let Some(instance) = self.instance.upgrade() {
204 instance.remove_write_callback(&self.tree_id, self.id);
205 }
206 }
207}
208
/// Internal state for Instance
///
/// This structure holds the actual implementation data for Instance.
/// Instance itself is just a cheap-to-clone handle wrapping `Arc<InstanceInternal>`.
pub(crate) struct InstanceInternal {
    /// The database storage backend
    backend: Arc<dyn Backend>,
    /// Time provider for timestamps
    clock: Arc<dyn Clock>,
    /// Synchronization module for this database instance.
    /// Stored in a `OnceLock`, so it can be set at most once after construction.
    /// TODO: Overengineered, Sync can be created by default but disabled
    sync: std::sync::OnceLock<Arc<Sync>>,
    /// Public instance metadata (device identity, system database IDs)
    metadata: InstanceMetadata,
    /// Private instance secrets (None for remote instances without key access)
    secrets: Option<InstanceSecrets>,
    /// JSON snapshot file path for an in-memory backend constructed via
    /// `memory:///path.json`. [`Instance::flush`] and the [`Drop`] safety
    /// net write through this. `None` on any non-snapshot backend.
    ///
    /// The mutex serves double duty: it guards the path slot itself (so
    /// `set_snapshot_path` doesn't race with readers) AND serializes the
    /// actual write so concurrent callers from `flush` / `snapshot_to_path`
    /// / `Drop` don't race on the shared `<path>.tmp` staging file in
    /// [`InMemory::save_to_file`]. Held across sync I/O only — never
    /// across an `.await`. Poison-tolerant: a panic mid-write leaves the
    /// on-disk snapshot unchanged but must not strand the [`Instance`].
    snapshot_path: Mutex<Option<PathBuf>>,
    /// Per-database callbacks keyed by tree_id. Each callback fires for both
    /// local and remote writes; consumers branch on [`WriteEvent::source`] if
    /// they only care about one.
    write_callbacks: Mutex<HashMap<ID, CallbackVec>>,
    /// Global callbacks fired for every write across every database.
    global_write_callbacks: Mutex<CallbackVec>,
    /// Monotonic id source for [`CallbackId`].
    next_callback_id: AtomicU64,
    /// Per-tree async locks serializing the
    /// `snapshot` → backend write → callback dispatch sequence so
    /// `WriteEvent::previous_tips` is consistent for concurrent writers
    /// to the same tree.
    tree_locks: Mutex<HashMap<ID, Arc<tokio::sync::Mutex<()>>>>,
}
253
254impl std::fmt::Debug for InstanceInternal {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 f.debug_struct("InstanceInternal")
257 .field("backend", &"<BackendDB>")
258 .field("clock", &self.clock)
259 .field("sync", &self.sync)
260 .field("metadata", &self.metadata)
261 .field("secrets", &self.secrets.is_some())
262 .field(
263 "write_callbacks",
264 &format!(
265 "<{} per-db callbacks>",
266 self.write_callbacks.lock().unwrap().len()
267 ),
268 )
269 .field(
270 "global_write_callbacks",
271 &format!(
272 "<{} global callbacks>",
273 self.global_write_callbacks.lock().unwrap().len()
274 ),
275 )
276 .field(
277 "next_callback_id",
278 &self.next_callback_id.load(Ordering::Relaxed),
279 )
280 .finish()
281 }
282}
283
284impl InstanceInternal {
285 /// Synchronously write a JSON snapshot of the underlying backend to `path`.
286 ///
287 /// Returns [`InstanceError::SnapshotNotSupported`] for any backend other
288 /// than the local in-memory backend. Shared by [`Instance::snapshot_to_path`],
289 /// [`Instance::flush`], and the [`Drop`] fallback so the three can't drift.
290 ///
291 /// **Caller must hold the [`snapshot_path`](Self::snapshot_path) mutex.**
292 /// That lock serializes the write — without it, concurrent callers
293 /// would race on the shared `<path>.tmp` staging file in
294 /// [`InMemory::save_to_file`]. The critical section is fully sync; no
295 /// `.await` happens while the lock is held.
296 fn save_snapshot_locked(&self, path: &std::path::Path) -> Result<()> {
297 use crate::backend::database::InMemory;
298 let engine = self
299 .backend
300 .local_engine()
301 .ok_or(InstanceError::SnapshotNotSupported)?;
302 let in_memory = engine
303 .as_any()
304 .downcast_ref::<InMemory>()
305 .ok_or(InstanceError::SnapshotNotSupported)?;
306 in_memory.save_to_file(path)
307 }
308}
309
310/// Best-effort snapshot save on the *last* `InstanceInternal` drop.
311///
312/// Fires when the `Arc<InstanceInternal>` reaches refcount 0 and a snapshot
313/// path is armed (i.e. the `Instance` was constructed via a
314/// `memory:///path.json` URL). [`Instance::flush`] does **not** clear the
315/// snapshot path, so Drop fires even after a successful `flush()` — the
316/// write is idempotent (same atomic tmp+rename), so the worst case is one
317/// extra write of unchanged JSON.
318///
319/// **Errors are logged via `tracing::error!`, not surfaced** — `Drop` can't
320/// return a `Result` and panicking would be worse than logging. Apps that
321/// care about snapshot durability should call [`Instance::flush`] at
322/// well-defined checkpoints and inspect its `Result`; Drop is a safety net,
323/// not the primary persistence path. If `flush()` failed with a permanent
324/// error (e.g. nonexistent parent directory), Drop will fail the same way
325/// and emit a second log line — accept this redundancy as the cost of a
326/// best-effort fallback.
327///
328/// **Blocking I/O warning:** the snapshot write is synchronous
329/// (`std::fs::write` + `rename`). If the `Instance` is dropped on a tokio
330/// worker thread, this blocks that worker for the duration of the write —
331/// negligible for small snapshots, but pathological for very large ones.
332/// Prefer `flush().await` (which still blocks briefly, but does so under
333/// explicit caller control).
334impl Drop for InstanceInternal {
335 fn drop(&mut self) {
336 // Drop runs at Arc refcount 0, so no other handle can race here —
337 // any in-flight `flush()` future holds `&self` and thus an Arc
338 // clone, which would have prevented Drop from firing. We still
339 // acquire the lock for the write so the locking discipline in
340 // `save_snapshot_locked`'s doc-comment holds uniformly. The lock
341 // is uncontended at this point.
342 let mut guard = match self.snapshot_path.lock() {
343 Ok(g) => g,
344 Err(p) => p.into_inner(),
345 };
346 let Some(path) = guard.take() else { return };
347
348 if let Err(e) = self.save_snapshot_locked(&path) {
349 tracing::error!(
350 snapshot_path = %path.display(),
351 error = %e,
352 "Drop: snapshot save failed. Call `Instance::flush().await` at \
353 checkpoints to inspect the error via Result; Drop is a safety net only.",
354 );
355 }
356 }
357}
/// Database implementation on top of the storage backend.
///
/// Instance manages infrastructure only:
/// - Backend storage and device identity
/// - System databases (_users, _databases, _sync)
/// - User account management (create, login, list)
///
/// All database creation and key operations happen through User after login.
///
/// Instance is a cheap-to-clone handle around `Arc<InstanceInternal>`: clones
/// share the same internal state, and that state is dropped when the last
/// clone goes away.
///
/// ## Example
///
/// ```
/// # use eidetica::{Instance, NewUser, crdt::Doc};
/// # #[tokio::main]
/// # async fn main() -> eidetica::Result<()> {
/// // Bootstrap a fresh instance with an initial admin user. The first user
/// // created on an instance is automatically granted Admin on the system
/// // databases.
/// let (instance, maybe_user) = Instance::connect_or_create(
///     "memory://",
///     NewUser::passwordless("alice"),
/// ).await?;
/// let mut user = maybe_user.expect("memory:// is always fresh");
///
/// // Use User API for operations
/// let mut settings = Doc::new();
/// settings.set("name", "my_database");
/// let default_key = user.get_default_key()?;
/// let db = user.create_database(settings, &default_key).await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug, Handle)]
pub struct Instance {
    /// Shared implementation state; every clone points at the same allocation.
    inner: Arc<InstanceInternal>,
}
396
/// Weak reference to an Instance.
///
/// This is a weak handle that does not prevent the Instance from being dropped.
/// Dependent objects (Database, Sync, BackgroundSync) hold weak references to avoid
/// circular reference cycles that would leak memory.
///
/// Use `upgrade()` to convert to a strong `Instance` reference.
#[derive(Clone, Debug, Handle)]
pub struct WeakInstance {
    /// Non-owning pointer to the state shared by all `Instance` clones.
    inner: Weak<InstanceInternal>,
}
408
409impl Instance {
410 /// Open a connection to an eidetica instance described by a connection URL.
411 ///
412 /// Strict load: returns [`InstanceError::NotInitialized`] when the URL
413 /// points at an embedded backend (`sqlite://`, `postgres://`, `memory://`)
414 /// that has no eidetica metadata yet. Use
415 /// [`Instance::connect_or_create`] to bootstrap an embedded backend on
416 /// first run.
417 ///
418 /// Supported URL schemes:
419 /// - `sqlite://./app.db` — embedded sqlite backend; URL is passed through
420 /// to `sqlx::sqlite`, so any sqlx-accepted form works
421 /// (`?mode=rwc&journal_mode=WAL` etc.).
422 /// - `postgres://user:pwd@host/db` — embedded postgres backend; URL is
423 /// passed through to `sqlx::postgres`.
424 /// - `unix:///run/eidetica/sock` — thin client to a running daemon.
425 /// - `memory://` — empty in-memory backend. Strict load against an
426 /// empty in-memory backend always errors `NotInitialized`; use
427 /// `connect_or_create` for a fresh in-memory instance.
428 /// - `memory:///path/to/snap.json` — in-memory backend with a JSON
429 /// snapshot file (load-on-start; snapshot writes via
430 /// [`Instance::flush`] / [`Instance::snapshot_to_path`] / Drop fallback).
431 ///
432 /// See [`crate::instance::url`] for the full URL grammar.
433 ///
434 /// # Example
435 ///
436 /// `connect()` only succeeds against an already-initialised backend.
437 /// The two-phase pattern below bootstraps once, then re-opens with
438 /// the strict load:
439 ///
440 /// ```
441 /// # #[tokio::main]
442 /// # async fn main() -> eidetica::Result<()> {
443 /// use eidetica::{Instance, NewUser};
444 ///
445 /// let temp = tempfile::tempdir()?;
446 /// let snapshot = temp.path().join("app.json");
447 /// let url = format!("memory://{}", snapshot.display());
448 ///
449 /// // First run: bootstrap and flush a snapshot to disk.
450 /// {
451 /// let (instance, maybe_user) =
452 /// Instance::connect_or_create(&url, NewUser::passwordless("alice")).await?;
453 /// let _user = maybe_user.expect("fresh bootstrap on first run");
454 /// instance.flush()?;
455 /// }
456 ///
457 /// // Later: strict connect against the persisted snapshot.
458 /// let instance = Instance::connect(&url).await?;
459 /// let _user = instance.login_user("alice", None).await?;
460 /// # Ok(())
461 /// # }
462 /// ```
463 ///
464 /// Calling `connect()` on a backend with no eidetica metadata returns
465 /// [`InstanceError::NotInitialized`]; reach for
466 /// [`Instance::connect_or_create`] when first-run bootstrap is part
467 /// of the expected lifecycle.
468 pub async fn connect(url: impl AsRef<str>) -> Result<Self> {
469 Self::connect_impl(url.as_ref(), Arc::new(SystemClock)).await
470 }
471
472 /// Open or initialise an eidetica instance described by a connection URL.
473 ///
474 /// On the load arm: identical to [`Instance::connect`]; `initial` is
475 /// silently ignored and the second tuple element is `None`.
476 ///
477 /// On the bootstrap arm: initialises the backend at the URL with the
478 /// supplied [`NewUser`] as the first admin and returns
479 /// `(Instance, Some(User))`. Only embedded backends
480 /// (sqlite/postgres/memory) ever take the bootstrap arm — `unix://` URLs
481 /// degrade to `connect` (the daemon owns its own initialisation), so the
482 /// returned `Option<User>` is always `None` for `unix://`.
483 ///
484 /// # Example
485 /// ```
486 /// # use eidetica::{Instance, NewUser};
487 /// # #[tokio::main]
488 /// # async fn main() -> eidetica::Result<()> {
489 /// let (instance, maybe_user) = Instance::connect_or_create(
490 /// "memory://",
491 /// NewUser::passwordless("alice"),
492 /// ).await?;
493 /// let mut user = match maybe_user {
494 /// Some(u) => u,
495 /// None => instance.login_user("alice", None).await?,
496 /// };
497 /// # let _ = user.get_default_key()?;
498 /// # Ok(())
499 /// # }
500 /// ```
501 pub async fn connect_or_create(
502 url: impl AsRef<str>,
503 initial: NewUser,
504 ) -> Result<(Self, Option<User>)> {
505 Self::connect_or_create_impl(url.as_ref(), initial, Arc::new(SystemClock)).await
506 }
507
508 /// Escape hatch: open or initialise an eidetica instance against a
509 /// pre-built [`BackendImpl`] (sqlite, postgres, in-memory, or custom).
510 ///
511 /// Same load-or-bootstrap semantics as [`Instance::connect_or_create`]
512 /// but skips URL parsing. Useful for tests, embedded apps that want to
513 /// configure the backend's pool/runtime manually, or backends not yet
514 /// exposed via a URL scheme.
515 pub async fn connect_or_create_backend(
516 backend: Box<dyn BackendImpl>,
517 initial: NewUser,
518 ) -> Result<(Self, Option<User>)> {
519 Self::connect_or_create_backend_impl(
520 Arc::from(backend),
521 initial,
522 Arc::new(SystemClock),
523 None,
524 )
525 .await
526 }
527
528 /// Strict-load escape hatch: open an eidetica instance against a
529 /// pre-built [`BackendImpl`] that's already been initialised. Mirrors
530 /// [`Instance::connect`]'s strict semantics for the URL-less case.
531 ///
532 /// Errors with [`InstanceError::NotInitialized`] if the backend has no
533 /// instance metadata; use [`Instance::connect_or_create_backend`] when you want
534 /// to bootstrap on an empty backend.
535 pub async fn open_backend(backend: Box<dyn BackendImpl>) -> Result<Self> {
536 Self::open_impl(backend, Arc::new(SystemClock)).await
537 }
538
    /// Test variant of [`Instance::open_backend`] with an injectable clock.
    ///
    /// Only compiled for tests or with the `testing` feature. Delegates to
    /// the same internal loader as `open_backend`, passing `clock` instead of
    /// the system clock.
    #[cfg(any(test, feature = "testing"))]
    pub async fn open_backend_with_clock(
        backend: Box<dyn BackendImpl>,
        clock: Arc<dyn Clock>,
    ) -> Result<Self> {
        Self::open_impl(backend, clock).await
    }
547
548 /// Strict-create escape hatch: initialise an eidetica instance on a
549 /// fresh pre-built [`BackendImpl`] and bootstrap an initial admin user.
550 ///
551 /// Errors with [`InstanceError::InstanceAlreadyExists`] if the backend
552 /// is already initialised; use [`Instance::connect_or_create_backend`] when the
553 /// caller doesn't want to choose between load and create up front.
554 pub async fn create_backend(
555 backend: Box<dyn BackendImpl>,
556 initial: NewUser,
557 ) -> Result<(Self, User)> {
558 Self::create_backend_impl(backend, initial, Arc::new(SystemClock)).await
559 }
560
    /// Test variant of [`Instance::create_backend`] with an injectable clock.
    ///
    /// Only compiled for tests or with the `testing` feature.
    ///
    /// Arg order: backend, clock, initial — clock goes in the middle so
    /// migrating from the prior `create_with_clock` is a pure rename.
    /// Note that the internal helper it forwards to takes `initial` before
    /// `clock`, hence the swapped order in the call below.
    #[cfg(any(test, feature = "testing"))]
    pub async fn create_backend_with_clock(
        backend: Box<dyn BackendImpl>,
        clock: Arc<dyn Clock>,
        initial: NewUser,
    ) -> Result<(Self, User)> {
        Self::create_backend_impl(backend, initial, clock).await
    }
573
574 async fn create_backend_impl(
575 backend: Box<dyn BackendImpl>,
576 initial: NewUser,
577 clock: Arc<dyn Clock>,
578 ) -> Result<(Self, User)> {
579 let backend: Arc<dyn BackendImpl> = Arc::from(backend);
580 if backend.get_instance_metadata().await?.is_some() {
581 return Err(InstanceError::InstanceAlreadyExists.into());
582 }
583 Self::create_internal(backend, clock, initial).await
584 }
585
586 // Clock injection is exposed only through the pre-built-backend
587 // variants ([`open_backend_with_clock`] and [`create_backend_with_clock`]).
588 // The URL-based `connect_*` constructors deliberately have no
589 // `_with_clock` siblings: every existing test that needs deterministic
590 // timestamps already builds an `InMemory` backend directly, so a URL-
591 // shaped clock entry point would be dead weight.
592
593 // ============ Internal URL dispatchers ============
594
595 async fn connect_impl(url: &str, clock: Arc<dyn Clock>) -> Result<Self> {
596 let parsed = url::parse(url)?;
597 match parsed {
598 url::ConnectionUrl::Sqlite { url } => Self::connect_sqlite(&url, clock).await,
599 url::ConnectionUrl::Postgres { url } => Self::connect_postgres(&url, clock).await,
600 url::ConnectionUrl::Unix { socket_path } => {
601 Self::connect_unix_socket(socket_path, clock).await
602 }
603 url::ConnectionUrl::Memory { snapshot_path } => {
604 Self::connect_memory(snapshot_path, clock).await
605 }
606 }
607 }
608
609 async fn connect_or_create_impl(
610 url: &str,
611 initial: NewUser,
612 clock: Arc<dyn Clock>,
613 ) -> Result<(Self, Option<User>)> {
614 let parsed = url::parse(url)?;
615 match parsed {
616 url::ConnectionUrl::Sqlite { url } => {
617 let backend = open_sqlite_backend(&url).await?;
618 Self::connect_or_create_backend_impl(Arc::from(backend), initial, clock, None).await
619 }
620 url::ConnectionUrl::Postgres { url } => {
621 let backend = open_postgres_backend(&url).await?;
622 Self::connect_or_create_backend_impl(Arc::from(backend), initial, clock, None).await
623 }
624 url::ConnectionUrl::Unix { socket_path } => {
625 // Daemons own their own initialisation. connect_or_create
626 // against `unix://` degrades to a plain connect; `initial`
627 // is unused on this arm. Log it so the silent drop is
628 // discoverable when debugging "why didn't my initial user
629 // get created?" on a remote URL.
630 tracing::debug!(
631 socket_path = %socket_path.display(),
632 username = %initial.username,
633 "connect_or_create against `unix://` is degrading to `connect`; \
634 `initial` is ignored — daemons own their own initialisation. \
635 Run `eidetica daemon init` to bootstrap a daemon-side instance."
636 );
637 let instance = Self::connect_unix_socket(socket_path, clock).await?;
638 Ok((instance, None))
639 }
640 url::ConnectionUrl::Memory { snapshot_path } => {
641 use crate::backend::database::InMemory;
642 // Build the backend from a single `try_load_from_file` call.
643 // `Ok(None)` means the file didn't exist at read time (the
644 // bootstrap-friendly "first run" case → empty backend).
645 // `Ok(Some(loaded))` means the file existed and parsed; if
646 // it carries no instance metadata it's foreign data that
647 // happened to satisfy the `SerializableDatabase` shape, and
648 // we refuse to bootstrap over it (the next snapshot would
649 // silently overwrite the caller's file). Doing the existence
650 // test and the read in one call removes the TOCTOU window a
651 // separate `path.exists()` check would open.
652 let backend: Box<dyn BackendImpl> = match snapshot_path.as_deref() {
653 None => Box::new(InMemory::new()),
654 Some(path) => {
655 let loaded = InMemory::try_load_from_file(path).await.map_err(|e| {
656 InstanceError::InvalidSnapshot {
657 path: path.to_path_buf(),
658 reason: e.to_string(),
659 }
660 })?;
661 match loaded {
662 None => Box::new(InMemory::new()),
663 Some(loaded) => {
664 let boxed: Box<dyn BackendImpl> = Box::new(loaded);
665 if boxed.get_instance_metadata().await?.is_none() {
666 return Err(InstanceError::InvalidSnapshot {
667 path: path.to_path_buf(),
668 reason: "snapshot file exists but contains no instance \
669 metadata; refusing to bootstrap on top of foreign \
670 data. Delete or move the file to create a fresh \
671 instance at this path."
672 .into(),
673 }
674 .into());
675 }
676 boxed
677 }
678 }
679 }
680 };
681 Self::connect_or_create_backend_impl(
682 Arc::from(backend),
683 initial,
684 clock,
685 snapshot_path,
686 )
687 .await
688 }
689 }
690 }
691
692 /// Internal: load-or-bootstrap against a pre-built backend, optionally
693 /// remembering a snapshot path so Drop / flush can write to it.
694 async fn connect_or_create_backend_impl(
695 backend: Arc<dyn BackendImpl>,
696 initial: NewUser,
697 clock: Arc<dyn Clock>,
698 snapshot_path: Option<PathBuf>,
699 ) -> Result<(Self, Option<User>)> {
700 if let Some(metadata) = backend.get_instance_metadata().await? {
701 let instance = Self::open_impl_arc_with_metadata(backend, clock, metadata).await?;
702 instance.set_snapshot_path(snapshot_path);
703 Ok((instance, None))
704 } else {
705 let (instance, user) = Self::create_internal(backend, clock, initial).await?;
706 instance.set_snapshot_path(snapshot_path);
707 Ok((instance, Some(user)))
708 }
709 }
710
711 // ============ Backend connection helpers ============
712
713 #[cfg(all(unix, feature = "service"))]
714 async fn connect_unix_socket(socket_path: PathBuf, clock: Arc<dyn Clock>) -> Result<Self> {
715 let conn = crate::service::client::RemoteConnection::connect(&socket_path).await?;
716 let backend: Arc<dyn Backend> = Arc::new(RemoteBackend::new(conn, None));
717
718 // Load metadata from the remote backend
719 let metadata = backend
720 .get_instance_metadata()
721 .await?
722 .ok_or(InstanceError::DeviceKeyNotFound)?;
723
724 // No local secrets — keys are held server-side after login.
725 let inner = Arc::new(InstanceInternal {
726 backend,
727 clock,
728 sync: std::sync::OnceLock::new(),
729 metadata,
730 secrets: None,
731 snapshot_path: Mutex::new(None),
732 write_callbacks: Mutex::new(HashMap::new()),
733 global_write_callbacks: Mutex::new(Vec::new()),
734 next_callback_id: AtomicU64::new(0),
735 tree_locks: Mutex::new(HashMap::new()),
736 });
737 Ok(Self { inner })
738 }
739
740 #[cfg(not(all(unix, feature = "service")))]
741 async fn connect_unix_socket(_socket_path: PathBuf, _clock: Arc<dyn Clock>) -> Result<Self> {
742 Err(InstanceError::BackendUnavailable {
743 scheme: "unix",
744 missing_feature: "service",
745 }
746 .into())
747 }
748
749 async fn connect_sqlite(url: &str, clock: Arc<dyn Clock>) -> Result<Self> {
750 let backend = open_sqlite_backend(url).await?;
751 Self::open_impl(backend, clock).await
752 }
753
754 async fn connect_postgres(url: &str, clock: Arc<dyn Clock>) -> Result<Self> {
755 let backend = open_postgres_backend(url).await?;
756 Self::open_impl(backend, clock).await
757 }
758
759 async fn connect_memory(snapshot_path: Option<PathBuf>, clock: Arc<dyn Clock>) -> Result<Self> {
760 use crate::backend::database::InMemory;
761 // Strict load: a snapshot URL that points at a non-existent file
762 // cannot satisfy `connect`'s "must already be initialised" contract.
763 // `try_load_from_file` returns `Ok(None)` when the file doesn't
764 // exist, which we translate into a pointed `InvalidSnapshot`. Using
765 // the same call for the existence test and the read removes the
766 // TOCTOU window a separate `path.exists()` check would open: if the
767 // file vanishes mid-call, the underlying `read_to_string` surfaces
768 // `NotFound` and lands us in the same `None` arm.
769 let backend: Box<dyn BackendImpl> = match snapshot_path.as_deref() {
770 None => Box::new(InMemory::new()),
771 Some(path) => {
772 let loaded = InMemory::try_load_from_file(path).await.map_err(|e| {
773 InstanceError::InvalidSnapshot {
774 path: path.to_path_buf(),
775 reason: e.to_string(),
776 }
777 })?;
778 match loaded {
779 Some(loaded) => Box::new(loaded),
780 None => {
781 return Err(InstanceError::InvalidSnapshot {
782 path: path.to_path_buf(),
783 reason: "snapshot file does not exist; \
784 use `Instance::connect_or_create` to bootstrap a new instance \
785 at this path, or pass `memory://` for an ephemeral instance"
786 .into(),
787 }
788 .into());
789 }
790 }
791 }
792 };
793 let instance = Self::open_impl(backend, clock).await?;
794 instance.set_snapshot_path(snapshot_path);
795 Ok(instance)
796 }
797
798 /// Flush deferred persistence state to disk.
799 ///
800 /// For an `Instance` constructed via a `memory:///path.json` URL, this
801 /// writes the current backend state to the snapshot path (atomic on
802 /// POSIX — `<path>.tmp` then rename). For sqlite/postgres/unix
803 /// backends this is a no-op; those storage layers handle persistence
804 /// inline.
805 ///
806 /// Idempotent and reentrant — call it as often as you like at
807 /// well-defined checkpoints. The snapshot path stays armed, so the
808 /// [`Drop`] fallback continues to fire on the last handle as a safety
809 /// net. The `Instance` (and any clones) remain fully usable after
810 /// `flush()` returns; this is not a shutdown.
811 ///
812 /// If `flush()` fails (e.g. nonexistent parent directory), the error
813 /// surfaces in the `Result`. Drop will later try the same write and
814 /// fail the same way, logging via `tracing::error!`. The duplicate
815 /// signal is intentional — Drop must report what it sees.
816 ///
817 /// **Blocking I/O note:** the snapshot write is synchronous
818 /// (`std::fs::write` + `rename`) and runs inline on the caller. Hence
819 /// the sync signature — there is no `.await` inside. If you're calling
820 /// from a tokio task, this briefly blocks the runtime worker;
821 /// negligible for small snapshots.
822 pub fn flush(&self) -> Result<()> {
823 // Acquire the snapshot_path lock once and hold it across the
824 // write — the lock both gates the path slot and serializes the
825 // sync I/O so concurrent flushes don't clobber each other's
826 // staging tempfile. The path stays armed (we read, don't take)
827 // so subsequent flushes and the Drop safety net keep working.
828 let guard = self
829 .inner
830 .snapshot_path
831 .lock()
832 .unwrap_or_else(|p| p.into_inner());
833 if let Some(path) = guard.as_deref() {
834 self.inner.save_snapshot_locked(path)?;
835 }
836 Ok(())
837 }
838
839 /// Write a JSON snapshot of the in-memory backend to `path`.
840 ///
841 /// The write goes to `<path>.tmp` and then renames into place. On POSIX
842 /// the rename is atomic; on Windows it is not atomic when the
843 /// destination already exists. Returns
844 /// [`InstanceError::SnapshotNotSupported`] on any backend other than
845 /// the in-memory backend.
846 pub fn snapshot_to_path(&self, path: impl AsRef<std::path::Path>) -> Result<()> {
847 let _guard = self
848 .inner
849 .snapshot_path
850 .lock()
851 .unwrap_or_else(|p| p.into_inner());
852 self.inner.save_snapshot_locked(path.as_ref())
853 }
854
855 /// Stash the snapshot path on the InstanceInternal so Drop / close can
856 /// find it. Only meaningful for in-memory backends — no-op for others.
857 fn set_snapshot_path(&self, path: Option<PathBuf>) {
858 if path.is_none() {
859 return;
860 }
861 // Poison-tolerant: a panic in another holder must not strand the
862 // Instance — the snapshot path is a simple swappable Option.
863 let mut guard = self
864 .inner
865 .snapshot_path
866 .lock()
867 .unwrap_or_else(|p| p.into_inner());
868 *guard = path;
869 }
870
871 /// Internal load-only implementation that works with any clock.
872 async fn open_impl(backend: Box<dyn BackendImpl>, clock: Arc<dyn Clock>) -> Result<Self> {
873 let backend: Arc<dyn BackendImpl> = Arc::from(backend);
874
875 // Strict: require existing InstanceMetadata. Initialisation is the
876 // caller's responsibility (`connect_or_create` / `connect_or_create_backend`).
877 let metadata = backend
878 .get_instance_metadata()
879 .await?
880 .ok_or(InstanceError::NotInitialized)?;
881
882 // Load secrets (contains the private key)
883 let secrets = backend.get_instance_secrets().await?;
884
885 // If secrets are present, verify they match the metadata
886 if let Some(ref secrets) = secrets {
887 let derived_id = secrets.signing_key.public_key();
888 if derived_id != metadata.id {
889 return Err(InstanceError::DeviceKeyMismatch.into());
890 }
891 }
892
893 // Existing backend: load from metadata + secrets
894 let inner = Arc::new(InstanceInternal {
895 backend: Arc::new(LocalBackend::new(backend)),
896 clock,
897 sync: std::sync::OnceLock::new(),
898 metadata,
899 secrets,
900 snapshot_path: Mutex::new(None),
901 write_callbacks: Mutex::new(HashMap::new()),
902 global_write_callbacks: Mutex::new(Vec::new()),
903 next_callback_id: AtomicU64::new(0),
904 tree_locks: Mutex::new(HashMap::new()),
905 });
906 Ok(Self { inner })
907 }
908
909 /// Load-only helper that accepts an already-arc'd backend and the
910 /// already-fetched metadata. Used by `connect_or_create_backend_impl`,
911 /// which has already inspected metadata to choose between the load
912 /// and bootstrap arms — passing it through avoids a redundant
913 /// `get_instance_metadata` round-trip.
914 async fn open_impl_arc_with_metadata(
915 backend: Arc<dyn BackendImpl>,
916 clock: Arc<dyn Clock>,
917 metadata: InstanceMetadata,
918 ) -> Result<Self> {
919 let secrets = backend.get_instance_secrets().await?;
920 if let Some(ref secrets) = secrets {
921 let derived_id = secrets.signing_key.public_key();
922 if derived_id != metadata.id {
923 return Err(InstanceError::DeviceKeyMismatch.into());
924 }
925 }
926 let inner = Arc::new(InstanceInternal {
927 backend: Arc::new(LocalBackend::new(backend)),
928 clock,
929 sync: std::sync::OnceLock::new(),
930 metadata,
931 secrets,
932 snapshot_path: Mutex::new(None),
933 write_callbacks: Mutex::new(HashMap::new()),
934 global_write_callbacks: Mutex::new(Vec::new()),
935 next_callback_id: AtomicU64::new(0),
936 tree_locks: Mutex::new(HashMap::new()),
937 });
938 Ok(Self { inner })
939 }
940
941 /// Internal create implementation. Returns the new `Instance` along with
942 /// the just-bootstrapped initial `User`, materialised directly from the
943 /// keys we generated (no redundant login round-trip).
944 pub(crate) async fn create_internal(
945 backend: Arc<dyn BackendImpl>,
946 clock: Arc<dyn Clock>,
947 initial: NewUser,
948 ) -> Result<(Self, User)> {
949 use crate::user::system_databases::{create_databases_tracking, create_users_database};
950
951 // 1. Generate device key
952 let device_key = PrivateKey::generate();
953 let device_id = device_key.public_key();
954
955 // 2. Create system databases with device_key passed directly
956 // Create a temporary Instance for database creation (databases will store full IDs later)
957 //
958 // SAFETY: The temporary instance has empty users_db_id and databases_db_id placeholders.
959 // This is safe because:
960 // 1. We only use it to create new system databases via Database::create()
961 // 2. Database::create() doesn't access the instance's system database IDs
962 // 3. The system databases don't exist yet, so their IDs can't be referenced
963 // 4. The temporary instance is only used during initial setup and discarded
964 // 5. The real instance is constructed afterward with the correct database IDs
965 let temp_instance = Self {
966 inner: Arc::new(InstanceInternal {
967 backend: Arc::new(LocalBackend::new(Arc::clone(&backend))),
968 clock: Arc::clone(&clock),
969 sync: std::sync::OnceLock::new(),
970 metadata: InstanceMetadata {
971 id: device_id.clone(),
972 users_db: ID::default(), // Placeholder - system DBs don't exist yet
973 databases_db: ID::default(), // Placeholder - system DBs don't exist yet
974 sync_db: None,
975 },
976 secrets: Some(InstanceSecrets {
977 signing_key: device_key.clone(),
978 }),
979 snapshot_path: Mutex::new(None),
980 write_callbacks: Mutex::new(HashMap::new()),
981 global_write_callbacks: Mutex::new(Vec::new()),
982 next_callback_id: AtomicU64::new(0),
983 tree_locks: Mutex::new(HashMap::new()),
984 }),
985 };
986 let users_db = create_users_database(&temp_instance, &device_key).await?;
987 let databases_db = create_databases_tracking(&temp_instance, &device_key).await?;
988
989 // 3. Save metadata and secrets (marks instance as initialized)
990 // NB: Ordering matters. Secrets are stored first, then Metadata.
991 // The presence of the Metadata indicates the instance is fully initialized.
992 let secrets = InstanceSecrets {
993 signing_key: device_key,
994 };
995 backend.set_instance_secrets(&secrets).await?;
996
997 let metadata = InstanceMetadata {
998 id: device_id,
999 users_db: users_db.root_id().clone(),
1000 databases_db: databases_db.root_id().clone(),
1001 sync_db: None,
1002 };
1003 backend.set_instance_metadata(&metadata).await?;
1004
1005 // 4. Build real instance
1006 let inner = Arc::new(InstanceInternal {
1007 backend: Arc::new(LocalBackend::new(backend)),
1008 clock,
1009 sync: std::sync::OnceLock::new(),
1010 metadata,
1011 secrets: Some(secrets),
1012 snapshot_path: Mutex::new(None),
1013 write_callbacks: Mutex::new(HashMap::new()),
1014 global_write_callbacks: Mutex::new(Vec::new()),
1015 next_callback_id: AtomicU64::new(0),
1016 tree_locks: Mutex::new(HashMap::new()),
1017 });
1018
1019 let instance = Self { inner };
1020
1021 // 5. Bootstrap the initial user. The first user created on an
1022 // instance is automatically promoted to Admin on the system
1023 // databases by `system_databases::create_user`'s
1024 // first-user-becomes-admin logic.
1025 let users_db = instance.users_db().await?;
1026 let (user_uuid, user_info, root_key) = crate::user::system_databases::create_user(
1027 &users_db,
1028 &instance,
1029 &initial.username,
1030 initial.password.as_deref(),
1031 )
1032 .await?;
1033
1034 // 6. Materialise the User session directly from the keys we just
1035 // generated — skips a redundant `login_user` round-trip that would
1036 // otherwise re-derive the encryption key from the password.
1037 let user = crate::user::system_databases::build_user_session(
1038 &instance,
1039 &user_uuid,
1040 &user_info,
1041 root_key,
1042 initial.password.as_deref(),
1043 )
1044 .await?;
1045
1046 Ok((instance, user))
1047 }
1048
1049 /// Get a reference to the backend seam.
1050 pub fn backend(&self) -> &Arc<dyn Backend> {
1051 &self.inner.backend
1052 }
1053
1054 /// The concrete in-process storage engine, or [`OperationNotSupported`] on
1055 /// a remote instance.
1056 ///
1057 /// Off-seam local-only operations (instance secrets, verification-status
1058 /// mutation, `all_roots`/`get_tree` raw dumps, scope-keyed cache) are
1059 /// performed through this accessor, so they are reachable only where a
1060 /// concrete local backend exists.
1061 ///
1062 /// [`OperationNotSupported`]: InstanceError::OperationNotSupported
1063 pub(crate) fn require_local_engine(&self) -> Result<Arc<dyn BackendImpl>> {
1064 self.inner.backend.local_engine().ok_or_else(|| {
1065 InstanceError::OperationNotSupported {
1066 operation: "local backend engine on remote instance".to_string(),
1067 }
1068 .into()
1069 })
1070 }
1071
/// The remote connection backing this instance, if it was created via
/// [`connect`](Self::connect); `None` for local instances.
///
/// Handy for building a [`Database`](crate::Database) that routes reads
/// through the Database-level wire API while sharing the same connection
/// and session as the instance's write path.
#[cfg(all(unix, feature = "service"))]
pub fn remote_connection(&self) -> Option<RemoteConnection> {
    let backend = &self.inner.backend;
    backend.remote_connection()
}
1082
1083 /// Check if an entry exists in storage.
1084 pub async fn has_entry(&self, id: &ID) -> bool {
1085 self.inner.backend.get(id).await.is_ok()
1086 }
1087
1088 /// Check if a database is present locally.
1089 ///
1090 /// This differs from `has_entry` in that it checks for the active tracking
1091 /// of the database by the Instance. This method checks if we're tracking
1092 /// the database's tip state.
1093 pub async fn has_database(&self, root_id: &ID) -> bool {
1094 match self.inner.backend.snapshot(root_id).await {
1095 Ok(snap) => !snap.is_empty(),
1096 Err(_) => false,
1097 }
1098 }
1099
1100 /// Get a reference to the clock.
1101 ///
1102 /// The clock is used for timestamps in height calculations and peer tracking.
1103 pub(crate) fn clock(&self) -> &dyn Clock {
1104 &*self.inner.clock
1105 }
1106
1107 /// Get a cloned Arc of the clock.
1108 ///
1109 /// Used when passing the clock to components that need ownership (e.g., HeightCalculator).
1110 pub(crate) fn clock_arc(&self) -> Arc<dyn Clock> {
1111 self.inner.clock.clone()
1112 }
1113
1114 // === Backend pass-through methods (pub(crate) for internal use) ===
1115
1116 /// Get an entry from the backend
1117 pub(crate) async fn get(&self, id: &crate::entry::ID) -> Result<crate::entry::Entry> {
1118 self.inner.backend.get(id).await
1119 }
1120
1121 /// Put an entry into the backend. Always stored Unverified — see
1122 /// [`crate::backend::BackendImpl::put`].
1123 pub(crate) async fn put(&self, entry: crate::entry::Entry) -> Result<()> {
1124 self.inner.backend.put(entry).await
1125 }
1126
1127 /// Returns the current [`crate::Snapshot`] of `tree` — its DAG tips. See
1128 /// [`Database::snapshot`] for the public entry point.
1129 pub(crate) async fn snapshot(
1130 &self,
1131 tree: &crate::entry::ID,
1132 ) -> Result<crate::snapshot::Snapshot> {
1133 self.inner.backend.snapshot(tree).await
1134 }
1135
1136 // === System database accessors ===
1137
1138 /// Get the _users database
1139 ///
1140 /// This constructs a Database instance on-the-fly to avoid circular references.
1141 /// On a local instance the device signing key is attached so users-table
1142 /// writes (e.g., the local `create_user` path) can sign. On a remote
1143 /// instance the device key lives on the daemon side and isn't available
1144 /// locally, so no key is attached — the returned handle is read-only.
1145 /// Write paths on a remote instance must instead go through
1146 /// [`Instance::users_db_for_session`], which attaches the caller's
1147 /// session signing key (e.g. admin's key on the `InstanceAdmin`
1148 /// `create_user` path) and routes through `Database::open_remote`.
1149 pub(crate) async fn users_db(&self) -> Result<Database> {
1150 let db = Database::open(self, &self.inner.metadata.users_db).await?;
1151 #[cfg(all(unix, feature = "service"))]
1152 if self.remote_connection().is_some() {
1153 return Ok(db);
1154 }
1155 Ok(db.with_key(self.signing_key()?.clone()))
1156 }
1157
1158 /// Open the _users system database with a specific signing key (not the device
1159 /// key). Used by the admin-session paths
1160 /// ([`InstanceAdmin`](crate::user::InstanceAdmin), `User::admin_check`) on
1161 /// remote instances where the device key is unavailable.
1162 pub(crate) async fn users_db_for_session(&self, signing_key: &PrivateKey) -> Result<Database> {
1163 self.open_system_db_for_session(&self.inner.metadata.users_db, signing_key)
1164 .await
1165 }
1166
1167 /// Open a system database for an authenticated session.
1168 ///
1169 /// On a remote instance this routes every read through the connection's
1170 /// Database wire protocol ([`Database::open_remote`], a per-handle
1171 /// `RemoteBackend`), gated by the session key's identity — the plain
1172 /// [`Database::open`] path instead clones the instance's session backend,
1173 /// so on a connected instance its reads carry the connection's login
1174 /// identity. On a local instance it opens against the local backend as
1175 /// before. The signing key is attached for writes.
1176 pub(crate) async fn open_system_db_for_session(
1177 &self,
1178 root_id: &ID,
1179 signing_key: &PrivateKey,
1180 ) -> Result<Database> {
1181 #[cfg(all(unix, feature = "service"))]
1182 if let Some(conn) = self.remote_connection() {
1183 // The daemon gates per-tree reads against the acting pubkey
1184 // from the request's identity hint, and the hint here is
1185 // `signing_key.public_key()` (the caller's chosen identity for
1186 // this DB). The hint must be in the connection's session keyset
1187 // — register it now so subsequent reads through the returned
1188 // `RemoteBackend` are accepted.
1189 conn.register_session_key(signing_key).await?;
1190 let identity = SigKey::from_pubkey(&signing_key.public_key());
1191 return Ok(Database::open_remote(self, conn, root_id, identity)
1192 .await?
1193 .with_key(signing_key.clone()));
1194 }
1195 Ok(Database::open(self, root_id)
1196 .await?
1197 .with_key(signing_key.clone()))
1198 }
1199
1200 /// Get the _databases tracking database
1201 ///
1202 /// Parallel to `users_db()` — opens the instance's database-registry
1203 /// system DB with the device signing key attached. Used by the
1204 /// instance-admin bootstrap path (`system_databases::create_user`) to add
1205 /// the first user's pubkey as `Admin(0)` on the registry, so subsequent
1206 /// admin-gated instance ops (e.g., `SetInstanceMetadata`) can authorize
1207 /// against the user's key instead of the device key.
1208 pub(crate) async fn databases_db(&self) -> Result<Database> {
1209 Ok(Database::open(self, &self.inner.metadata.databases_db)
1210 .await?
1211 .with_key(self.signing_key()?.clone()))
1212 }
1213
1214 /// Root id of the `_databases` system DB.
1215 ///
1216 /// The service daemon uses this to gate admin-only ops
1217 /// (e.g., `SetInstanceMetadata`) against `_databases.auth_settings`:
1218 /// an instance admin is a user with `Admin` on `_databases`.
1219 pub(crate) fn databases_db_id(&self) -> &ID {
1220 &self.inner.metadata.databases_db
1221 }
1222
1223 /// Root id of the `_users` system DB.
1224 ///
1225 /// Parallel to `databases_db_id()`. Lets an instance admin open `_users`
1226 /// keyed by their own signing key (rather than the device key that
1227 /// `users_db()` attaches), so admin-gated edits to `_users.auth_settings`
1228 /// resolve against the admin's identity.
1229 pub(crate) fn users_db_id(&self) -> &ID {
1230 &self.inner.metadata.users_db
1231 }
1232
1233 // === User Management ===
1234
1235 /// Login a user with flexible password handling.
1236 ///
1237 /// Returns a User session object that provides access to user operations.
1238 /// For password-protected users, provide the password. For passwordless users, pass None.
1239 ///
1240 /// # Arguments
1241 /// * `user_id` - User identifier (username)
1242 /// * `password` - Optional password. None for passwordless users.
1243 ///
1244 /// # Returns
1245 /// A Result containing the User session
1246 pub async fn login_user(&self, user_id: &str, password: Option<&str>) -> Result<User> {
1247 // On a remote instance, the `TrustedLogin*` handshake authenticates the
1248 // socket connection AND ships back the user's full `UserInfo` plus the
1249 // decrypted root signing key. Build the `User` session from those —
1250 // the per-tree gate means a freshly-logged-in user with no permissions
1251 // on `_users` couldn't re-read it over the wire anyway, so we don't try.
1252 #[cfg(all(unix, feature = "service"))]
1253 if let Some(conn) = self.remote_connection() {
1254 let (user_uuid, user_info, signing_key) = conn.trusted_login(user_id, password).await?;
1255 return crate::user::system_databases::build_user_session(
1256 self,
1257 &user_uuid,
1258 &user_info,
1259 signing_key,
1260 password,
1261 )
1262 .await;
1263 }
1264
1265 use crate::user::system_databases::login_user;
1266 let users_db = self.users_db().await?;
1267 login_user(&users_db, self, user_id, password).await
1268 }
1269
1270 // === User-Sync Integration ===
1271
1272 // === Device Identity Management ===
1273 //
1274 // The Instance's public identity is stored in InstanceMetadata, and the private
1275 // signing key is stored in InstanceSecrets. Both are cached in memory.
1276
1277 /// Get the device signing key.
1278 ///
1279 /// # Internal Use Only
1280 ///
1281 /// This method provides direct access to the instance's cryptographic identity
1282 /// and is intended for internal operations that require the device key (sync,
1283 /// system database creation, authentication validation, etc.).
1284 ///
1285 /// These operations should only be performed by the server/instance administrator,
1286 /// but we don't verify that yet. Future versions may add admin permission checks.
1287 ///
1288 /// Similar to `Database::open` (without a key), this is a controlled escape hatch
1289 /// for internal library operations. Use with care - prefer User API for normal operations.
1290 ///
1291 /// Returns an error if this is a remote Instance that does not have access to the
1292 /// device key (e.g., connected via RPC where secrets are never transmitted).
1293 #[cfg(not(any(test, feature = "testing")))]
1294 pub(crate) fn signing_key(&self) -> Result<&PrivateKey> {
1295 self.inner
1296 .secrets
1297 .as_ref()
1298 .map(|s| &s.signing_key)
1299 .ok_or_else(|| InstanceError::DeviceKeyNotFound.into())
1300 }
1301
/// Test-only: borrow the device signing key.
///
/// Publicly exposed for tests and the `testing` feature only. In
/// production, use the User API.
///
/// # Errors
/// Returns [`InstanceError::DeviceKeyNotFound`] when no secrets are cached
/// on this Instance (e.g. a remote Instance without access to the device
/// key).
#[cfg(any(test, feature = "testing"))]
pub fn signing_key(&self) -> Result<&PrivateKey> {
    match self.inner.secrets.as_ref() {
        Some(secrets) => Ok(&secrets.signing_key),
        None => Err(InstanceError::DeviceKeyNotFound.into()),
    }
}
1316
1317 /// Get the instance identity (public key).
1318 ///
1319 /// # Returns
1320 /// The instance's public key identity.
1321 pub fn id(&self) -> PublicKey {
1322 self.inner.metadata.id.clone()
1323 }
1324
1325 // === Synchronization Management ===
1326 //
1327 // These methods provide access to the Sync module for managing synchronization
1328 // settings and state for this database instance.
1329
1330 /// Initializes the Sync module for this instance.
1331 ///
1332 /// Enables synchronization operations for this instance. This method is idempotent;
1333 /// calling it multiple times has no effect.
1334 ///
1335 /// # Errors
1336 /// Returns an error if the sync settings database cannot be created or if device key
1337 /// generation/storage fails.
1338 pub async fn enable_sync(&self) -> Result<()> {
1339 // Check if there is an existing Sync already loaded
1340 if self.inner.sync.get().is_some() {
1341 return Ok(());
1342 }
1343
1344 // A remote Instance must not run sync client-side: building a Sync
1345 // here would spin up a background sync engine that drives RPCs against
1346 // the daemon's backend — duplicating (and racing) the daemon's own
1347 // sync. Sync is owned by the process that owns the Instance.
1348 //
1349 // Return `Ok(())` so callers on a connected instance get the same
1350 // no-op success they would on a local instance where sync is already
1351 // running daemon-side. Long-term this should become an admin-gated
1352 // operation that lets a client ask the daemon to enable its sync
1353 // subsystem; until that ships, the client-side `enable_sync` is
1354 // intentionally a silent no-op because the daemon either already
1355 // has sync running or it doesn't, and the client can't change that.
1356 //
1357 // TODO(service): expose an admin-gated `enable_sync` on
1358 // `InstanceAdmin` so a client can enable sync remotely.
1359 #[cfg(all(unix, feature = "service"))]
1360 if self.remote_connection().is_some() {
1361 return Ok(());
1362 }
1363
1364 // Check InstanceMetadata for existing sync_db
1365 let metadata = self
1366 .backend()
1367 .get_instance_metadata()
1368 .await?
1369 .ok_or(InstanceError::DeviceKeyNotFound)?; // Metadata must exist if instance is initialized
1370
1371 let sync = if let Some(ref sync_db) = metadata.sync_db {
1372 // Load existing sync tree
1373 Sync::load(self.clone(), sync_db).await?
1374 } else {
1375 // Create new sync tree
1376 let sync = Sync::new(self.clone()).await?;
1377
1378 // Save sync_db to metadata
1379 let mut new_metadata = metadata;
1380 new_metadata.sync_db = Some(sync.sync_tree_root_id().clone());
1381 self.backend().set_instance_metadata(&new_metadata).await?;
1382
1383 sync
1384 };
1385
1386 let sync_arc = Arc::new(sync);
1387
1388 // Initialize the sync engine (no transports registered yet)
1389 // Users should call register_transport() to add transports
1390 sync_arc.start_background_sync()?;
1391
1392 // Register global callback for automatic sync on local writes.
1393 // Detached: lives for the life of the Instance.
1394 let sync_for_callback = Arc::clone(&sync_arc);
1395 self.register_global_write_callback(move |event, database| {
1396 let sync = Arc::clone(&sync_for_callback);
1397 let event = event.clone();
1398 let database = database.clone();
1399 async move {
1400 if event.source() != WriteSource::Local {
1401 return Ok(());
1402 }
1403 // Local writes always carry exactly one entry today, but
1404 // the loop in `Sync::on_local_write` is intentionally
1405 // ready for future multi-entry local events.
1406 sync.on_local_write(&event, &database).await
1407 }
1408 });
1409
1410 let _ = self.inner.sync.set(sync_arc);
1411 Ok(())
1412 }
1413
1414 /// Get a reference to the Sync module.
1415 ///
1416 /// Returns a cheap-to-clone Arc handle to the Sync module. The Sync module
1417 /// uses interior mutability (AtomicBool and OnceLock) so &self methods are sufficient.
1418 ///
1419 /// # Returns
1420 /// An `Option` containing an `Arc<Sync>` if the Sync module is initialized.
1421 pub fn sync(&self) -> Option<Arc<Sync>> {
1422 self.inner.sync.get().map(Arc::clone)
1423 }
1424
1425 /// Flush all pending sync operations.
1426 ///
1427 /// This is a convenience method that processes all queued entries and
1428 /// retries any failed sends. If sync is not enabled, returns Ok(()).
1429 ///
1430 /// This is useful to force pending syncs to complete, e.g. on program shutdown.
1431 ///
1432 /// # Returns
1433 /// `Ok(())` if sync is not enabled or all operations completed successfully,
1434 /// or an error if sends failed.
1435 pub async fn flush_sync(&self) -> Result<()> {
1436 if let Some(sync) = self.sync() {
1437 sync.flush().await
1438 } else {
1439 Ok(())
1440 }
1441 }
1442
1443 // === Entry Write Coordination ===
1444 //
1445 // All entry writes go through Instance::put_entry() which handles backend storage
1446 // and callback dispatch. This centralizes write coordination and ensures hooks fire.
1447
1448 /// Register a per-database callback. Fires for both local and remote writes.
1449 ///
1450 /// Returns the [`CallbackId`] of the registration. Callers wrap this in a
1451 /// [`WriteCallback`] handle (see [`Database::on_write`]) to manage lifetime.
1452 pub(crate) fn register_write_callback<F, Fut>(&self, tree_id: ID, callback: F) -> CallbackId
1453 where
1454 F: for<'a> Fn(&'a WriteEvent, &'a Database) -> Fut + Send + std::marker::Sync + 'static,
1455 Fut: Future<Output = Result<()>> + Send + 'static,
1456 {
1457 let id = CallbackId(self.inner.next_callback_id.fetch_add(1, Ordering::Relaxed));
1458 let cb: AsyncWriteCallbackFn = Arc::new(move |event: &WriteEvent, database: &Database| {
1459 let fut = callback(event, database);
1460 Box::pin(fut) as AsyncWriteCallbackFuture<'_>
1461 });
1462 self.inner
1463 .write_callbacks
1464 .lock()
1465 .unwrap()
1466 .entry(tree_id)
1467 .or_default()
1468 .push((id, cb));
1469 id
1470 }
1471
1472 /// Register a global callback fired for every write across every database.
1473 ///
1474 /// Callers branch on [`WriteEvent::source`] inside the closure if they
1475 /// only care about one source.
1476 ///
1477 /// Note: there is intentionally no `WriteCallback` handle or remove path
1478 /// for global callbacks. The only current caller is sync's permanent hook
1479 /// (OnceLock-guarded) which is registered for the life of the Instance.
1480 /// If a future caller needs lifecycle management on a global callback,
1481 /// add `remove_global_write_callback` and a global variant of
1482 /// `WriteCallback`.
1483 pub(crate) fn register_global_write_callback<F, Fut>(&self, callback: F)
1484 where
1485 F: for<'a> Fn(&'a WriteEvent, &'a Database) -> Fut + Send + std::marker::Sync + 'static,
1486 Fut: Future<Output = Result<()>> + Send + 'static,
1487 {
1488 let id = CallbackId(self.inner.next_callback_id.fetch_add(1, Ordering::Relaxed));
1489 let cb: AsyncWriteCallbackFn = Arc::new(move |event: &WriteEvent, database: &Database| {
1490 let fut = callback(event, database);
1491 Box::pin(fut) as AsyncWriteCallbackFuture<'_>
1492 });
1493 self.inner
1494 .global_write_callbacks
1495 .lock()
1496 .unwrap()
1497 .push((id, cb));
1498 }
1499
1500 /// Remove a per-database callback by id. No-op if not found.
1501 pub(crate) fn remove_write_callback(&self, tree_id: &ID, id: CallbackId) {
1502 let mut callbacks = self.inner.write_callbacks.lock().unwrap();
1503 if let Some(vec) = callbacks.get_mut(tree_id) {
1504 vec.retain(|(cb_id, _)| *cb_id != id);
1505 if vec.is_empty() {
1506 callbacks.remove(tree_id);
1507 }
1508 }
1509 }
1510
1511 /// Acquire (or create) the per-tree async lock that serializes the
1512 /// `snapshot` → backend write → callback dispatch sequence.
1513 ///
1514 /// Without this, two concurrent writers to the same tree both snapshot
1515 /// `previous_tips` before either writes, so the second callback's
1516 /// `previous_tips` would not reflect the first write — breaking the
1517 /// "diff against current tips" contract documented on [`WriteEvent`].
1518 fn tree_lock(&self, tree_id: &ID) -> Arc<tokio::sync::Mutex<()>> {
1519 let mut locks = self.inner.tree_locks.lock().unwrap();
1520 Arc::clone(
1521 locks
1522 .entry(tree_id.clone())
1523 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
1524 )
1525 }
1526
1527 /// Write an entry to the backend and dispatch callbacks.
1528 ///
1529 /// This is the central coordination point for all entry writes in the system.
1530 /// All writes must go through this method to ensure:
1531 /// - Entries are persisted to the backend
1532 /// - Appropriate callbacks are triggered based on write source
1533 /// - Hooks have full context (entry, database, instance)
1534 ///
1535 /// Serialized per-tree against [`Self::put_remote_entries`] so
1536 /// [`WriteEvent::previous_tips`] is consistent.
1537 ///
1538 /// # Arguments
1539 /// * `tree_id` - The root ID of the database being written to
1540 /// * `verification` - Authentication verification status of the entry
1541 /// * `entry` - The entry to write
1542 /// * `source` - Whether this is a local or remote write
1543 ///
1544 /// # Returns
1545 /// A Result indicating success or failure
1546 pub async fn put_entry(
1547 &self,
1548 tree_id: &ID,
1549 verification: crate::backend::VerificationStatus,
1550 entry: Entry,
1551 source: WriteSource,
1552 ) -> Result<()> {
1553 let lock = self.tree_lock(tree_id);
1554 let _guard = lock.lock().await;
1555
1556 // 1. Capture tips before the write so callbacks know what changed.
1557 //
1558 // On a connected (remote) instance, the daemon owns the canonical
1559 // DAG and the client's local backend has nothing to read; reading
1560 // tips here would also gate against the *connection's* login pubkey
1561 // (not the per-DB acting identity from the Database handle), which
1562 // breaks the legitimate "create-a-tree-with-a-non-login-key" flow.
1563 // Remote callbacks therefore see an empty `previous_tips` — this is
1564 // a documented limitation of `Database::on_write` on a connected
1565 // Instance, lifted when the server-push notification path lands.
1566 #[cfg(all(unix, feature = "service"))]
1567 let previous_tips = if self.remote_connection().is_some() {
1568 Vec::new()
1569 } else {
1570 self.snapshot(tree_id).await?.into_tips()
1571 };
1572 #[cfg(not(all(unix, feature = "service")))]
1573 let previous_tips = self.snapshot(tree_id).await?.into_tips();
1574
1575 // 2. Persist to backend storage (and notify server for remote backends)
1576 self.backend()
1577 .write_entry(verification, entry.clone(), source)
1578 .await?;
1579
1580 // 3. Build event and fire callbacks
1581 let event = WriteEvent {
1582 entries: vec![entry],
1583 previous_tips,
1584 source,
1585 };
1586 self.fire_write_callbacks(tree_id, &event).await;
1587
1588 Ok(())
1589 }
1590
1591 /// Store a batch of remotely-received entries and fire callbacks once.
1592 ///
1593 /// This is the correct way to ingest entries from sync. All entries are
1594 /// persisted first, then callbacks fire exactly once with the full batch
1595 /// and the tips from before ingestion. This ensures:
1596 ///
1597 /// - The database is fully consistent when callbacks execute
1598 /// - Callbacks fire once per sync exchange, not once per entry
1599 /// - `previous_tips` lets consumers reconstruct exactly what changed
1600 ///
1601 /// Entries that fail to store are logged and skipped — remaining entries
1602 /// are still stored and callbacks still fire for whatever was persisted.
1603 /// Returns the number of entries that were successfully persisted.
1604 ///
1605 /// Serialized per-tree against [`Self::put_entry`] and other concurrent
1606 /// `put_remote_entries` calls so `previous_tips` is consistent across
1607 /// writers.
1608 ///
1609 /// Entries are stored as [`VerificationStatus::Unverified`] without
1610 /// exception: they arrive from outside this node's local validation pass,
1611 /// so this node has not verified them and a peer cannot assert that it
1612 /// did. A later local re-verification pass may promote them.
1613 ///
1614 /// # Arguments
1615 /// * `tree_id` - The root ID of the database receiving the batch
1616 /// * `entries` - The entries to ingest
1617 pub(crate) async fn put_remote_entries(
1618 &self,
1619 tree_id: &ID,
1620 entries: Vec<Entry>,
1621 ) -> Result<usize> {
1622 if entries.is_empty() {
1623 return Ok(0);
1624 }
1625
1626 let lock = self.tree_lock(tree_id);
1627 let _guard = lock.lock().await;
1628
1629 // 1. Capture tips before any writes
1630 let previous_tips = self.snapshot(tree_id).await?.into_tips();
1631
1632 // 2. Store all entries
1633 let mut stored_entries = Vec::with_capacity(entries.len());
1634 for entry in entries {
1635 match self.backend().put(entry.clone()).await {
1636 Ok(_) => stored_entries.push(entry),
1637 Err(e) => {
1638 tracing::error!(
1639 tree_id = %tree_id,
1640 entry_id = %entry.id(),
1641 "Failed to store remote entry: {}", e
1642 );
1643 }
1644 }
1645 }
1646
1647 let stored_count = stored_entries.len();
1648
1649 // 3. Fire callbacks once for the whole batch
1650 if !stored_entries.is_empty() {
1651 let event = WriteEvent {
1652 entries: stored_entries,
1653 previous_tips,
1654 source: WriteSource::Remote,
1655 };
1656 self.fire_write_callbacks(tree_id, &event).await;
1657 }
1658
1659 Ok(stored_count)
1660 }
1661
1662 /// Dispatch callbacks for a write event.
1663 ///
1664 /// Fires per-database callbacks for `tree_id` then global callbacks. Each
1665 /// callback fires for both local and remote writes; consumers branch on
1666 /// [`WriteEvent::source`] internally.
1667 async fn fire_write_callbacks(&self, tree_id: &ID, event: &WriteEvent) {
1668 let per_db_callbacks = self
1669 .inner
1670 .write_callbacks
1671 .lock()
1672 .unwrap()
1673 .get(tree_id)
1674 .cloned();
1675
1676 let global_callbacks = self.inner.global_write_callbacks.lock().unwrap().clone();
1677
1678 let has_callbacks = per_db_callbacks.is_some() || !global_callbacks.is_empty();
1679 if !has_callbacks {
1680 return;
1681 }
1682
1683 // Create a Database handle for the callbacks
1684 let database = match Database::open(self, tree_id).await {
1685 Ok(db) => db,
1686 Err(e) => {
1687 tracing::error!(tree_id = %tree_id, "Failed to open database for callbacks: {}", e);
1688 return;
1689 }
1690 };
1691
1692 if let Some(callbacks) = per_db_callbacks {
1693 for (id, callback) in callbacks {
1694 if let Err(e) = callback(event, &database).await {
1695 tracing::error!(
1696 tree_id = %tree_id,
1697 source = ?event.source(),
1698 callback_id = ?id,
1699 "Per-database callback failed: {}", e
1700 );
1701 }
1702 }
1703 }
1704
1705 for (id, callback) in global_callbacks {
1706 if let Err(e) = callback(event, &database).await {
1707 tracing::error!(
1708 tree_id = %tree_id,
1709 source = ?event.source(),
1710 callback_id = ?id,
1711 "Global callback failed: {}", e
1712 );
1713 }
1714 }
1715 }
1716
1717 /// Downgrade to a weak reference.
1718 ///
1719 /// Creates a weak reference that does not prevent the Instance from being dropped.
1720 /// This is useful for preventing circular reference cycles in dependent objects.
1721 ///
1722 /// # Returns
1723 /// A `WeakInstance` that can be upgraded back to a strong reference.
1724 pub fn downgrade(&self) -> WeakInstance {
1725 WeakInstance {
1726 inner: Arc::downgrade(&self.inner),
1727 }
1728 }
1729}
1730
1731impl WeakInstance {
1732 /// Upgrade to a strong reference.
1733 ///
1734 /// Attempts to upgrade this weak reference to a strong `Instance` reference.
1735 /// Returns `None` if the Instance has already been dropped.
1736 ///
1737 /// # Returns
1738 /// `Some(Instance)` if the Instance still exists, `None` otherwise.
1739 ///
1740 /// # Example
1741 /// ```
1742 /// # use eidetica::{Instance, NewUser};
1743 /// # #[tokio::main]
1744 /// # async fn main() -> eidetica::Result<()> {
1745 /// let (instance, maybe_user) = Instance::connect_or_create(
1746 /// "memory://",
1747 /// NewUser::passwordless("alice"),
1748 /// ).await?;
1749 /// let user = maybe_user.expect("memory:// is always fresh");
1750 /// let weak = instance.downgrade();
1751 ///
1752 /// // Upgrade works while instance exists
1753 /// assert!(weak.upgrade().is_some());
1754 ///
1755 /// // User holds its own strong handle to the Instance — drop it too so
1756 /// // the weak upgrade can fail.
1757 /// drop(user);
1758 /// drop(instance);
1759 /// // Upgrade fails after instance is dropped
1760 /// assert!(weak.upgrade().is_none());
1761 /// # Ok(())
1762 /// # }
1763 /// ```
1764 pub fn upgrade(&self) -> Option<Instance> {
1765 self.inner.upgrade().map(|inner| Instance { inner })
1766 }
1767}
1768
1769// ============ URL-dispatch backend constructors ============
1770
/// Connect to the SQLite backend at `url` and return it as a boxed
/// `BackendImpl`. Compiled only when the `sqlite` feature is enabled.
///
/// # Errors
/// Propagates any error returned by `Sqlite::connect`.
#[cfg(feature = "sqlite")]
async fn open_sqlite_backend(url: &str) -> Result<Box<dyn BackendImpl>> {
    Ok(Box::new(
        crate::backend::database::Sqlite::connect(url).await?,
    ))
}
1776
1777#[cfg(not(feature = "sqlite"))]
1778async fn open_sqlite_backend(_url: &str) -> Result<Box<dyn BackendImpl>> {
1779 Err(InstanceError::BackendUnavailable {
1780 scheme: "sqlite",
1781 missing_feature: "sqlite",
1782 }
1783 .into())
1784}
1785
/// Connect to the Postgres backend at `url` and return it as a boxed
/// `BackendImpl`. Compiled only when the `postgres` feature is enabled.
///
/// # Errors
/// Propagates any error returned by `Postgres::connect`.
#[cfg(feature = "postgres")]
async fn open_postgres_backend(url: &str) -> Result<Box<dyn BackendImpl>> {
    Ok(Box::new(
        crate::backend::database::Postgres::connect(url).await?,
    ))
}
1791
1792#[cfg(not(feature = "postgres"))]
1793async fn open_postgres_backend(_url: &str) -> Result<Box<dyn BackendImpl>> {
1794 Err(InstanceError::BackendUnavailable {
1795 scheme: "postgres",
1796 missing_feature: "postgres",
1797 }
1798 .into())
1799}