eidetica/instance/mod.rs
1//!
2//! Provides the main database structures (`Instance` and `Database`).
3//!
4//! `Instance` manages multiple `Database` instances and interacts with the storage `Database`.
5//! `Database` represents a single, independent history of data entries, analogous to a table or branch.
6
7use std::{
8 collections::HashMap,
9 future::Future,
10 pin::Pin,
11 sync::{Arc, Mutex, Weak},
12};
13
14use handle_trait::Handle;
15
16use crate::{
17 Clock, Database, Entry, Result, SystemClock,
18 auth::crypto::{PrivateKey, PublicKey},
19 backend::{BackendImpl, InstanceMetadata},
20 database::DatabaseKey,
21 entry::ID,
22 sync::Sync,
23 user::User,
24};
25
26pub mod backend;
27pub mod errors;
28pub mod settings_merge;
29
30#[cfg(test)]
31mod tests;
32
33// Re-export main types for easier access
34use backend::Backend;
35pub use errors::InstanceError;
36
37/// Indicates whether an entry write originated locally or from a remote source (e.g., sync).
38///
39/// This distinction allows different callbacks to be triggered based on the write source,
40/// enabling behaviors like "only trigger sync for local writes" or "only update UI for remote writes".
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
42pub enum WriteSource {
43 /// Write originated from a local transaction commit
44 Local,
45 /// Write originated from a remote source (e.g., sync, replication)
46 Remote,
47}
48
49/// Type alias for async write callback return type.
50///
51/// We use a boxed future for callbacks. The future is `Send` since internal
52/// operations use `Arc`/`Mutex` for thread-safety.
53pub type AsyncWriteCallbackFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
54
55/// Async callback function type for write operations.
56///
57/// Receives the entry that was written, the database it was written to, and the instance.
58/// Returns a boxed future that resolves to a Result.
59/// Used for both local and remote write callbacks.
60pub type AsyncWriteCallback = Arc<
61 dyn for<'a> Fn(&'a Entry, &'a Database, &'a Instance) -> AsyncWriteCallbackFuture<'a>
62 + Send
63 + std::marker::Sync,
64>;
65
66/// Type alias for a collection of write callbacks
67type CallbackVec = Vec<AsyncWriteCallback>;
68
69/// Type alias for the per-database callback map key
70type CallbackKey = (WriteSource, ID);
71
72/// Internal state for Instance
73///
74/// This structure holds the actual implementation data for Instance.
75/// Instance itself is just a cheap-to-clone handle wrapping Arc<InstanceInternal>.
76pub(crate) struct InstanceInternal {
77 /// The database storage backend
78 backend: Backend,
79 /// Time provider for timestamps
80 clock: Arc<dyn Clock>,
81 /// Synchronization module for this database instance
82 /// TODO: Overengineered, Sync can be created by default but disabled
83 sync: std::sync::OnceLock<Arc<Sync>>,
84 /// Root ID of the _users system database
85 users_db_id: ID,
86 /// Root ID of the _databases system database
87 databases_db_id: ID,
88 /// Device signing key - the instance's cryptographic identity
89 device_key: PrivateKey,
90 /// Per-database callbacks keyed by (WriteSource, tree_id)
91 write_callbacks: Mutex<HashMap<CallbackKey, CallbackVec>>,
92 /// Global callbacks keyed by WriteSource (triggered regardless of database)
93 global_write_callbacks: Mutex<HashMap<WriteSource, CallbackVec>>,
94}
95
96impl std::fmt::Debug for InstanceInternal {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.debug_struct("InstanceInternal")
99 .field("backend", &"<BackendDB>")
100 .field("clock", &self.clock)
101 .field("sync", &self.sync)
102 .field("users_db_id", &self.users_db_id)
103 .field("databases_db_id", &self.databases_db_id)
104 .field(
105 "write_callbacks",
106 &format!(
107 "<{} per-db callbacks>",
108 self.write_callbacks.lock().unwrap().len()
109 ),
110 )
111 .field(
112 "global_write_callbacks",
113 &format!(
114 "<{} global callbacks>",
115 self.global_write_callbacks.lock().unwrap().len()
116 ),
117 )
118 .finish()
119 }
120}
121/// Database implementation on top of the storage backend.
122///
123/// Instance manages infrastructure only:
124/// - Backend storage and device identity
125/// - System databases (_users, _databases, _sync)
126/// - User account management (create, login, list)
127///
128/// All database creation and key operations happen through User after login.
129///
130/// Instance is a cheap-to-clone handle around `Arc<InstanceInternal>`.
131///
132/// ## Example
133///
134/// ```
135/// # use eidetica::{backend::database::InMemory, Instance, crdt::Doc};
136/// # #[tokio::main]
137/// # async fn main() -> eidetica::Result<()> {
138/// let instance = Instance::open(Box::new(InMemory::new())).await?;
139///
140/// // Create passwordless user
141/// instance.create_user("alice", None).await?;
142/// let mut user = instance.login_user("alice", None).await?;
143///
144/// // Use User API for operations
145/// let mut settings = Doc::new();
146/// settings.set("name", "my_database");
147/// let default_key = user.get_default_key()?;
148/// let db = user.create_database(settings, &default_key).await?;
149/// # Ok(())
150/// # }
151/// ```
152#[derive(Clone, Debug, Handle)]
153pub struct Instance {
154 inner: Arc<InstanceInternal>,
155}
156
157/// Weak reference to an Instance.
158///
159/// This is a weak handle that does not prevent the Instance from being dropped.
160/// Dependent objects (Database, Sync, BackgroundSync) hold weak references to avoid
161/// circular reference cycles that would leak memory.
162///
163/// Use `upgrade()` to convert to a strong `Instance` reference.
164#[derive(Clone, Debug, Handle)]
165pub struct WeakInstance {
166 inner: Weak<InstanceInternal>,
167}
168
169impl Instance {
170 /// Load an existing Instance or create a new one (recommended).
171 ///
172 /// This is the recommended method for initializing an Instance. It automatically detects
173 /// whether the backend contains existing system state (device key and system databases)
174 /// and loads them, or creates new ones if starting fresh.
175 ///
176 /// Instance manages infrastructure only:
177 /// - Backend storage and device identity
178 /// - System databases (_users, _databases, _sync)
179 /// - User account management (create, login, list)
180 ///
181 /// All database creation and key operations require explicit User login.
182 ///
183 /// # Arguments
184 /// * `backend` - The storage backend to use
185 ///
186 /// # Returns
187 /// A Result containing the configured Instance
188 ///
189 /// # Example
190 /// ```
191 /// # use eidetica::{backend::database::InMemory, Instance, crdt::Doc};
192 /// # #[tokio::main]
193 /// # async fn main() -> eidetica::Result<()> {
194 /// let backend = InMemory::new();
195 /// let instance = Instance::open(Box::new(backend)).await?;
196 ///
197 /// // Create and login user explicitly
198 /// instance.create_user("alice", None).await?;
199 /// let mut user = instance.login_user("alice", None).await?;
200 ///
201 /// // Use User API for operations
202 /// let mut settings = Doc::new();
203 /// settings.set("name", "my_database");
204 /// let default_key = user.get_default_key()?;
205 /// let db = user.create_database(settings, &default_key).await?;
206 /// # Ok(())
207 /// # }
208 /// ```
209 pub async fn open(backend: Box<dyn BackendImpl>) -> Result<Self> {
210 Self::open_impl(backend, Arc::new(SystemClock)).await
211 }
212
213 /// Load an existing Instance or create a new one with a custom clock.
214 ///
215 /// This is the same as [`Instance::open`] but allows injecting a custom clock
216 /// for controllable timestamps in tests. The clock is used for timestamps in
217 /// height calculations and peer tracking.
218 ///
219 /// Only available with the `testing` feature or in test builds.
220 ///
221 /// # Arguments
222 /// * `backend` - The storage backend to use
223 /// * `clock` - The time provider to use (typically [`FixedClock`](crate::FixedClock))
224 ///
225 /// # Returns
226 /// A Result containing the configured Instance
227 #[cfg(any(test, feature = "testing"))]
228 pub async fn open_with_clock(
229 backend: Box<dyn BackendImpl>,
230 clock: Arc<dyn Clock>,
231 ) -> Result<Self> {
232 Self::open_impl(backend, clock).await
233 }
234
235 /// Internal implementation of open that works with any clock.
236 async fn open_impl(backend: Box<dyn BackendImpl>, clock: Arc<dyn Clock>) -> Result<Self> {
237 let backend: Arc<dyn BackendImpl> = Arc::from(backend);
238
239 // Check for existing InstanceMetadata
240 match backend.get_instance_metadata().await? {
241 Some(metadata) => {
242 // Existing backend: load from metadata
243 let inner = Arc::new(InstanceInternal {
244 backend: Backend::new(backend),
245 clock,
246 sync: std::sync::OnceLock::new(),
247 users_db_id: metadata.users_db,
248 databases_db_id: metadata.databases_db,
249 device_key: metadata.device_key,
250 write_callbacks: Mutex::new(HashMap::new()),
251 global_write_callbacks: Mutex::new(HashMap::new()),
252 });
253 Ok(Self { inner })
254 }
255 None => {
256 // New backend: initialize
257 Self::create_internal(backend, clock).await
258 }
259 }
260 }
261
262 /// Create a new Instance on a fresh backend (strict creation).
263 ///
264 /// This method creates a new Instance and fails if the backend is already initialized
265 /// (contains a device key and system databases). Use this when you want to ensure
266 /// you're creating a fresh instance.
267 ///
268 /// Instance manages infrastructure only:
269 /// - Backend storage and device identity
270 /// - System databases (_users, _databases, _sync)
271 /// - User account management (create, login, list)
272 ///
273 /// All database creation and key operations require explicit User login.
274 ///
275 /// For most use cases, prefer `Instance::open()` which automatically handles both
276 /// new and existing backends.
277 ///
278 /// # Arguments
279 /// * `backend` - The storage backend to use (must be uninitialized)
280 ///
281 /// # Returns
282 /// A Result containing the configured Instance, or InstanceAlreadyExists error
283 /// if the backend is already initialized.
284 ///
285 /// # Example
286 /// ```
287 /// # use eidetica::{backend::database::InMemory, Instance, crdt::Doc};
288 /// # #[tokio::main]
289 /// # async fn main() -> eidetica::Result<()> {
290 /// let backend = InMemory::new();
291 /// let instance = Instance::create(Box::new(backend)).await?;
292 ///
293 /// // Create and login user explicitly
294 /// instance.create_user("alice", None).await?;
295 /// let mut user = instance.login_user("alice", None).await?;
296 ///
297 /// // Use User API for operations
298 /// let mut settings = Doc::new();
299 /// settings.set("name", "my_database");
300 /// let default_key = user.get_default_key()?;
301 /// let db = user.create_database(settings, &default_key).await?;
302 /// # Ok(())
303 /// # }
304 /// ```
305 pub async fn create(backend: Box<dyn BackendImpl>) -> Result<Self> {
306 let backend: Arc<dyn BackendImpl> = Arc::from(backend);
307
308 // Check if already initialized
309 if backend.get_instance_metadata().await?.is_some() {
310 return Err(InstanceError::InstanceAlreadyExists.into());
311 }
312
313 // Create new instance
314 Self::create_internal(backend, Arc::new(SystemClock)).await
315 }
316
317 /// Internal implementation of new that works with Arc<dyn BackendImpl>
318 pub(crate) async fn create_internal(
319 backend: Arc<dyn BackendImpl>,
320 clock: Arc<dyn Clock>,
321 ) -> Result<Self> {
322 use crate::{
323 auth::crypto::generate_keypair,
324 user::system_databases::{create_databases_tracking, create_users_database},
325 };
326
327 // 1. Generate device key
328 let (device_key, _device_pubkey) = generate_keypair();
329
330 // 2. Create system databases with device_key passed directly
331 // Create a temporary Instance for database creation (databases will store full IDs later)
332 //
333 // SAFETY: The temporary instance has empty users_db_id and databases_db_id placeholders.
334 // This is safe because:
335 // 1. We only use it to create new system databases via Database::create()
336 // 2. Database::create() doesn't access the instance's system database IDs
337 // 3. The system databases don't exist yet, so their IDs can't be referenced
338 // 4. The temporary instance is only used during initial setup and discarded
339 // 5. The real instance is constructed afterward with the correct database IDs
340 let temp_instance = Self {
341 inner: Arc::new(InstanceInternal {
342 backend: Backend::new(Arc::clone(&backend)),
343 clock: Arc::clone(&clock),
344 sync: std::sync::OnceLock::new(),
345 users_db_id: ID::from(""), // Placeholder - system DBs don't exist yet
346 databases_db_id: ID::from(""), // Placeholder - system DBs don't exist yet
347 device_key: device_key.clone(), // Use the actual key for signing
348 write_callbacks: Mutex::new(HashMap::new()),
349 global_write_callbacks: Mutex::new(HashMap::new()),
350 }),
351 };
352 let users_db = create_users_database(&temp_instance, &device_key).await?;
353 let databases_db = create_databases_tracking(&temp_instance, &device_key).await?;
354
355 // 3. Save metadata (marks instance as initialized)
356 let metadata = InstanceMetadata {
357 device_key: device_key.clone(),
358 users_db: users_db.root_id().clone(),
359 databases_db: databases_db.root_id().clone(),
360 sync_db: None,
361 };
362 backend.set_instance_metadata(&metadata).await?;
363
364 // 4. Build real instance and return
365 let inner = Arc::new(InstanceInternal {
366 backend: Backend::new(backend),
367 clock,
368 sync: std::sync::OnceLock::new(),
369 users_db_id: users_db.root_id().clone(),
370 databases_db_id: databases_db.root_id().clone(),
371 device_key,
372 write_callbacks: Mutex::new(HashMap::new()),
373 global_write_callbacks: Mutex::new(HashMap::new()),
374 });
375
376 Ok(Self { inner })
377 }
378
379 /// Get a reference to the backend
380 pub fn backend(&self) -> &Backend {
381 &self.inner.backend
382 }
383
384 /// Check if an entry exists in storage.
385 pub async fn has_entry(&self, id: &ID) -> bool {
386 self.inner.backend.get(id).await.is_ok()
387 }
388
389 /// Check if a database is present locally.
390 ///
391 /// This differs from `has_entry` in that it checks for the active tracking
392 /// of the database by the Instance. This method checks if we're tracking
393 /// the database's tip state.
394 pub async fn has_database(&self, root_id: &ID) -> bool {
395 match self.inner.backend.get_tips(root_id).await {
396 Ok(tips) => !tips.is_empty(),
397 Err(_) => false,
398 }
399 }
400
401 /// Get a reference to the clock.
402 ///
403 /// The clock is used for timestamps in height calculations and peer tracking.
404 pub(crate) fn clock(&self) -> &dyn Clock {
405 &*self.inner.clock
406 }
407
408 /// Get a cloned Arc of the clock.
409 ///
410 /// Used when passing the clock to components that need ownership (e.g., HeightCalculator).
411 pub(crate) fn clock_arc(&self) -> Arc<dyn Clock> {
412 self.inner.clock.clone()
413 }
414
415 // === Backend pass-through methods (pub(crate) for internal use) ===
416
417 /// Get an entry from the backend
418 pub(crate) async fn get(&self, id: &crate::entry::ID) -> Result<crate::entry::Entry> {
419 self.inner.backend.get(id).await
420 }
421
422 /// Put an entry into the backend
423 pub(crate) async fn put(
424 &self,
425 verification_status: crate::backend::VerificationStatus,
426 entry: crate::entry::Entry,
427 ) -> Result<()> {
428 self.inner.backend.put(verification_status, entry).await
429 }
430
431 /// Get tips for a tree
432 pub(crate) async fn get_tips(&self, tree: &crate::entry::ID) -> Result<Vec<crate::entry::ID>> {
433 self.inner.backend.get_tips(tree).await
434 }
435
436 // === System database accessors ===
437
438 /// Get the _users database
439 ///
440 /// This constructs a Database instance on-the-fly to avoid circular references.
441 pub(crate) async fn users_db(&self) -> Result<Database> {
442 Database::open(
443 self.clone(),
444 &self.inner.users_db_id,
445 DatabaseKey::new(self.inner.device_key.clone()),
446 )
447 .await
448 }
449
450 // === User Management ===
451
452 /// Create a new user account with flexible password handling.
453 ///
454 /// Creates a user with or without password protection. Passwordless users are appropriate
455 /// for embedded applications where filesystem access = database access.
456 ///
457 /// # Arguments
458 /// * `user_id` - Unique user identifier (username)
459 /// * `password` - Optional password. If None, user is passwordless (instant login, no encryption)
460 ///
461 /// # Returns
462 /// A Result containing the user's UUID (stable internal identifier)
463 pub async fn create_user(&self, user_id: &str, password: Option<&str>) -> Result<String> {
464 use crate::user::system_databases::create_user;
465
466 let users_db = self.users_db().await?;
467 let (user_uuid, _user_info) = create_user(&users_db, self, user_id, password).await?;
468 Ok(user_uuid)
469 }
470
471 /// Login a user with flexible password handling.
472 ///
473 /// Returns a User session object that provides access to user operations.
474 /// For password-protected users, provide the password. For passwordless users, pass None.
475 ///
476 /// # Arguments
477 /// * `user_id` - User identifier (username)
478 /// * `password` - Optional password. None for passwordless users.
479 ///
480 /// # Returns
481 /// A Result containing the User session
482 pub async fn login_user(&self, user_id: &str, password: Option<&str>) -> Result<User> {
483 use crate::user::system_databases::login_user;
484
485 let users_db = self.users_db().await?;
486 login_user(&users_db, self, user_id, password).await
487 }
488
489 /// List all user IDs.
490 ///
491 /// # Returns
492 /// A Result containing a vector of user IDs
493 pub async fn list_users(&self) -> Result<Vec<String>> {
494 use crate::user::system_databases::list_users;
495
496 let users_db = self.users_db().await?;
497 list_users(&users_db).await
498 }
499
500 // === User-Sync Integration ===
501
502 // === Device Identity Management ===
503 //
504 // The Instance's device identity is stored in InstanceMetadata and cached in memory.
505
506 /// Get the device signing key.
507 ///
508 /// # Internal Use Only
509 ///
510 /// This method provides direct access to the instance's cryptographic identity
511 /// and is intended for internal operations that require the device key (sync,
512 /// system database creation, authentication validation, etc.).
513 ///
514 /// These operations should only be performed by the server/instance administrator,
515 /// but we don't verify that yet. Future versions may add admin permission checks.
516 ///
517 /// Similar to `Database::open_unauthenticated`, this is a controlled escape hatch
518 /// for internal library operations. Use with care - prefer User API for normal operations.
519 #[cfg(not(any(test, feature = "testing")))]
520 pub(crate) fn device_key(&self) -> &PrivateKey {
521 &self.inner.device_key
522 }
523
524 /// Test-only: Get the device signing key.
525 ///
526 /// This is exposed for testing purposes only. In production, use the User API.
527 #[cfg(any(test, feature = "testing"))]
528 pub fn device_key(&self) -> &PrivateKey {
529 &self.inner.device_key
530 }
531
532 /// Get the device ID (public key).
533 ///
534 /// # Returns
535 /// The device's public key (device ID).
536 pub fn device_id(&self) -> PublicKey {
537 self.inner.device_key.public_key()
538 }
539
540 // === Synchronization Management ===
541 //
542 // These methods provide access to the Sync module for managing synchronization
543 // settings and state for this database instance.
544
545 /// Initializes the Sync module for this instance.
546 ///
547 /// Enables synchronization operations for this instance. This method is idempotent;
548 /// calling it multiple times has no effect.
549 ///
550 /// # Errors
551 /// Returns an error if the sync settings database cannot be created or if device key
552 /// generation/storage fails.
553 pub async fn enable_sync(&self) -> Result<()> {
554 // Check if there is an existing Sync already loaded
555 if self.inner.sync.get().is_some() {
556 return Ok(());
557 }
558
559 // Check InstanceMetadata for existing sync_db
560 let metadata = self
561 .backend()
562 .get_instance_metadata()
563 .await?
564 .ok_or(InstanceError::DeviceKeyNotFound)?; // Metadata must exist if instance is initialized
565
566 let sync = if let Some(ref sync_db) = metadata.sync_db {
567 // Load existing sync tree
568 Sync::load(self.clone(), sync_db).await?
569 } else {
570 // Create new sync tree
571 let sync = Sync::new(self.clone()).await?;
572
573 // Save sync_db to metadata
574 let mut new_metadata = metadata;
575 new_metadata.sync_db = Some(sync.sync_tree_root_id().clone());
576 self.backend().set_instance_metadata(&new_metadata).await?;
577
578 sync
579 };
580
581 let sync_arc = Arc::new(sync);
582
583 // Initialize the sync engine (no transports registered yet)
584 // Users should call register_transport() to add transports
585 sync_arc.start_background_sync()?;
586
587 // Register global callback for automatic sync on local writes
588 let sync_for_callback = Arc::clone(&sync_arc);
589 self.register_global_write_callback(
590 WriteSource::Local,
591 move |entry, database, instance| {
592 let sync = Arc::clone(&sync_for_callback);
593 let entry = entry.clone();
594 let database = database.clone();
595 let instance = instance.clone();
596 async move { sync.on_local_write(&entry, &database, &instance).await }
597 },
598 )?;
599
600 let _ = self.inner.sync.set(sync_arc);
601 Ok(())
602 }
603
604 /// Get a reference to the Sync module.
605 ///
606 /// Returns a cheap-to-clone Arc handle to the Sync module. The Sync module
607 /// uses interior mutability (AtomicBool and OnceLock) so &self methods are sufficient.
608 ///
609 /// # Returns
610 /// An `Option` containing an `Arc<Sync>` if the Sync module is initialized.
611 pub fn sync(&self) -> Option<Arc<Sync>> {
612 self.inner.sync.get().map(Arc::clone)
613 }
614
615 /// Flush all pending sync operations.
616 ///
617 /// This is a convenience method that processes all queued entries and
618 /// retries any failed sends. If sync is not enabled, returns Ok(()).
619 ///
620 /// This is useful to force pending syncs to complete, e.g. on program shutdown.
621 ///
622 /// # Returns
623 /// `Ok(())` if sync is not enabled or all operations completed successfully,
624 /// or an error if sends failed.
625 pub async fn flush_sync(&self) -> Result<()> {
626 if let Some(sync) = self.sync() {
627 sync.flush().await
628 } else {
629 Ok(())
630 }
631 }
632
633 // === Entry Write Coordination ===
634 //
635 // All entry writes go through Instance::put_entry() which handles backend storage
636 // and callback dispatch. This centralizes write coordination and ensures hooks fire.
637
638 /// Register a callback to be invoked when entries are written to a database.
639 ///
640 /// The callback receives the entry, database, and instance as parameters.
641 ///
642 /// # Arguments
643 /// * `source` - The write source to monitor (Local or Remote)
644 /// * `tree_id` - The root ID of the database tree to monitor
645 /// * `callback` - Function to invoke on writes
646 ///
647 /// # Returns
648 /// A Result indicating success or failure
649 pub(crate) fn register_write_callback<F, Fut>(
650 &self,
651 source: WriteSource,
652 tree_id: ID,
653 callback: F,
654 ) -> Result<()>
655 where
656 F: for<'a> Fn(&'a Entry, &'a Database, &'a Instance) -> Fut
657 + Send
658 + std::marker::Sync
659 + 'static,
660 Fut: Future<Output = Result<()>> + Send + 'static,
661 {
662 let mut callbacks = self.inner.write_callbacks.lock().unwrap();
663 callbacks
664 .entry((source, tree_id))
665 .or_default()
666 .push(Arc::new(
667 move |entry: &Entry, database: &Database, instance: &Instance| {
668 let fut = callback(entry, database, instance);
669 Box::pin(fut) as AsyncWriteCallbackFuture<'_>
670 },
671 ));
672 Ok(())
673 }
674
675 /// Register a global callback to be invoked on all writes of a specific source.
676 ///
677 /// Global callbacks are invoked for all writes of the specified source across all databases.
678 /// This is useful for system-wide operations like synchronization that need to track
679 /// changes across all databases.
680 ///
681 /// # Arguments
682 /// * `source` - The write source to monitor (Local or Remote)
683 /// * `callback` - Function to invoke on all writes
684 ///
685 /// # Returns
686 /// A Result indicating success or failure
687 pub(crate) fn register_global_write_callback<F, Fut>(
688 &self,
689 source: WriteSource,
690 callback: F,
691 ) -> Result<()>
692 where
693 F: for<'a> Fn(&'a Entry, &'a Database, &'a Instance) -> Fut
694 + Send
695 + std::marker::Sync
696 + 'static,
697 Fut: Future<Output = Result<()>> + Send + 'static,
698 {
699 let mut callbacks = self.inner.global_write_callbacks.lock().unwrap();
700 callbacks.entry(source).or_default().push(Arc::new(
701 move |entry: &Entry, database: &Database, instance: &Instance| {
702 let fut = callback(entry, database, instance);
703 Box::pin(fut) as AsyncWriteCallbackFuture<'_>
704 },
705 ));
706 Ok(())
707 }
708
709 /// Write an entry to the backend and dispatch callbacks.
710 ///
711 /// This is the central coordination point for all entry writes in the system.
712 /// All writes must go through this method to ensure:
713 /// - Entries are persisted to the backend
714 /// - Appropriate callbacks are triggered based on write source
715 /// - Hooks have full context (entry, database, instance)
716 ///
717 /// # Arguments
718 /// * `tree_id` - The root ID of the database being written to
719 /// * `verification` - Authentication verification status of the entry
720 /// * `entry` - The entry to write
721 /// * `source` - Whether this is a local or remote write
722 ///
723 /// # Returns
724 /// A Result indicating success or failure
725 pub async fn put_entry(
726 &self,
727 tree_id: &ID,
728 verification: crate::backend::VerificationStatus,
729 entry: Entry,
730 source: WriteSource,
731 ) -> Result<()> {
732 // 1. Persist to backend storage
733 self.backend().put(verification, entry.clone()).await?;
734
735 // 2. Look up and execute callbacks based on write source
736 // Clone the callbacks to avoid holding the lock while executing callbacks.
737 let per_db_callbacks = self
738 .inner
739 .write_callbacks
740 .lock()
741 .unwrap()
742 .get(&(source, tree_id.clone()))
743 .cloned();
744
745 let global_callbacks = self
746 .inner
747 .global_write_callbacks
748 .lock()
749 .unwrap()
750 .get(&source)
751 .cloned();
752
753 // 3. Execute callbacks if any are registered
754 let has_callbacks = per_db_callbacks.is_some() || global_callbacks.is_some();
755 if has_callbacks {
756 // Create a Database handle for the callbacks
757 // Use open_readonly since we only need it for callback context
758 let database = Database::open_unauthenticated(tree_id.clone(), self)?;
759
760 // Execute per-database callbacks
761 if let Some(callbacks) = per_db_callbacks {
762 for callback in callbacks {
763 if let Err(e) = callback(&entry, &database, self).await {
764 tracing::error!(
765 tree_id = %tree_id,
766 entry_id = %entry.id(),
767 source = ?source,
768 "Per-database callback failed: {}", e
769 );
770 // Continue executing other callbacks even if one fails
771 }
772 }
773 }
774
775 // Execute global callbacks
776 if let Some(callbacks) = global_callbacks {
777 for callback in callbacks {
778 if let Err(e) = callback(&entry, &database, self).await {
779 tracing::error!(
780 tree_id = %tree_id,
781 entry_id = %entry.id(),
782 source = ?source,
783 "Global callback failed: {}", e
784 );
785 // Continue executing other callbacks even if one fails
786 }
787 }
788 }
789 }
790
791 Ok(())
792 }
793
794 /// Downgrade to a weak reference.
795 ///
796 /// Creates a weak reference that does not prevent the Instance from being dropped.
797 /// This is useful for preventing circular reference cycles in dependent objects.
798 ///
799 /// # Returns
800 /// A `WeakInstance` that can be upgraded back to a strong reference.
801 pub fn downgrade(&self) -> WeakInstance {
802 WeakInstance {
803 inner: Arc::downgrade(&self.inner),
804 }
805 }
806}
807
808impl WeakInstance {
809 /// Upgrade to a strong reference.
810 ///
811 /// Attempts to upgrade this weak reference to a strong `Instance` reference.
812 /// Returns `None` if the Instance has already been dropped.
813 ///
814 /// # Returns
815 /// `Some(Instance)` if the Instance still exists, `None` otherwise.
816 ///
817 /// # Example
818 /// ```
819 /// # use eidetica::{backend::database::InMemory, Instance};
820 /// # #[tokio::main]
821 /// # async fn main() -> eidetica::Result<()> {
822 /// let instance = Instance::open(Box::new(InMemory::new())).await?;
823 /// let weak = instance.downgrade();
824 ///
825 /// // Upgrade works while instance exists
826 /// assert!(weak.upgrade().is_some());
827 ///
828 /// drop(instance);
829 /// // Upgrade fails after instance is dropped
830 /// assert!(weak.upgrade().is_none());
831 /// # Ok(())
832 /// # }
833 /// ```
834 pub fn upgrade(&self) -> Option<Instance> {
835 self.inner.upgrade().map(|inner| Instance { inner })
836 }
837}