Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/transaction/
mod.rs

1//! Transaction system for atomic database modifications
2//!
3//! This module provides the transaction API for making atomic changes to an Eidetica database.
4//! Transactions ensure that all changes within a transaction are applied atomically and maintain
5//! proper parent-child relationships in the Merkle-CRDT DAG structure.
6//!
7//! # Subtree Parent Management
8//!
9//! One of the critical responsibilities of the transaction system is establishing proper
10//! subtree parent relationships. When a store (subtree) is accessed for the first time
11//! in a transaction, the system must determine the correct parent entries for that subtree.
12//! This involves:
13//!
14//! 1. Checking for existing subtree tips (leaf nodes)
15//! 2. If no tips exist, traversing the DAG to find reachable subtree entries
16//! 3. Setting appropriate parent relationships (empty for first entry, or proper parents)
17
18pub mod errors;
19
20#[cfg(test)]
21mod tests;
22
23use std::{
24    collections::HashMap,
25    sync::{
26        Arc, Mutex,
27        atomic::{AtomicBool, Ordering},
28    },
29};
30
31pub use errors::TransactionError;
32use serde::{Deserialize, Serialize};
33
34use crate::{
35    Database, Result, Snapshot, Store,
36    auth::{
37        AuthSettings,
38        crypto::{PrivateKey, sign_entry},
39        types::{SigInfo, SigKey},
40        validation::AuthValidator,
41    },
42    backend::VerificationStatus,
43    constants::{INDEX, ROOT, SETTINGS},
44    crdt::{CRDT, Data, Doc, doc::Value},
45    entry::{Entry, EntryBuilder, ID},
46    height::HeightStrategy,
47    instance::WriteSource,
48    store::{Registry, SettingsStore, StoreError},
49};
50
51/// Creates a synthetic entry ID for multi-tip merged CRDT state caching.
52///
53/// Tips are sorted to ensure deterministic keys regardless of input order.
54/// The resulting ID has format `merge:{tip1}:{tip2}:...` which is distinct
55/// from real content-addressed entry IDs.
56fn create_merge_cache_id(tip_ids: &[ID]) -> ID {
57    let mut sorted_tips = tip_ids.to_vec();
58    sorted_tips.sort();
59
60    // Create a deterministic cache key by hashing the sorted tip IDs
61    let mut key = String::from("merge");
62    for tip in &sorted_tips {
63        key.push(':');
64        key.push_str(&tip.to_string());
65    }
66    ID::from_bytes(key.as_bytes())
67}
68
69/// Trait for encrypting/decrypting subtree data transparently
70///
71/// Encryptors are registered with a Transaction for specific subtrees, allowing
72/// transparent encryption/decryption at the transaction boundary. When an encryptor
73/// is registered:
74///
75/// - `get_full_state()` decrypts each historical entry before CRDT merging
76/// - `get_local_data()` returns plaintext (cached in EntryBuilder)
77/// - `update_subtree()` stores plaintext in cache, encrypted on commit
78///
79/// This ensures proper CRDT merge semantics while keeping data encrypted at rest.
80///
81/// # Wire Format
82///
83/// The trait operates on raw bytes, allowing implementations to define their own
84/// wire format. For example, AES-GCM implementations typically use `nonce || ciphertext`.
85/// Entry subtree payloads are opaque bytes, so ciphertext is stored verbatim with no
86/// additional encoding.
87///
88/// # Example
89///
90/// ```rust,ignore
91/// struct PasswordEncryptor { /* ... */ }
92///
93/// impl Encryptor for PasswordEncryptor {
94///     fn decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>> {
95///         let (nonce, ct) = ciphertext.split_at(12);
96///         // decrypt with nonce and ciphertext...
97///     }
98///
99///     fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>> {
100///         let nonce = generate_nonce();
101///         let ct = encrypt(plaintext, &nonce);
102///         // return nonce || ciphertext
103///     }
104/// }
105/// ```
106pub(crate) trait Encryptor: Send + Sync {
107    /// Decrypt ciphertext bytes to plaintext bytes
108    ///
109    /// # Arguments
110    /// * `ciphertext` - Encrypted data in implementation-defined format
111    ///
112    /// # Returns
113    /// Plaintext bytes in whatever format the wrapped store produces (e.g.
114    /// JSON for `DocStore`/`Table`, binary Yrs updates for `YDoc`).
115    fn decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>>;
116
117    /// Encrypt plaintext bytes to ciphertext bytes
118    ///
119    /// # Arguments
120    /// * `plaintext` - Bytes to encrypt; format is whatever the wrapped store
121    ///   produces (JSON, binary CRDT update, etc.). The Encryptor itself does
122    ///   not interpret the bytes.
123    ///
124    /// # Returns
125    /// Encrypted data in implementation-defined format
126    fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>>;
127}
128
129/// Metadata structure for entries
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub(crate) struct EntryMetadata {
132    /// Snapshot of the `_settings` subtree at the time this entry was created.
133    /// This is the entry's **pin**: the exact `_settings` state its
134    /// signature must be validated against. Used for sync performance,
135    /// sparse-checkout validation, and deferred re-verification.
136    ///
137    /// Wire name remains `settings_tips` for on-disk stability — `Snapshot`
138    /// serializes as a bare ID array, identical to the legacy `Vec<ID>` shape.
139    #[serde(rename = "settings_tips")]
140    pub(crate) settings_snapshot: Snapshot,
141    /// Random entropy for ensuring unique IDs for root entries
142    pub(crate) entropy: Option<u64>,
143}
144
145/// Represents a single, atomic transaction for modifying a `Database`.
146///
147/// An `Transaction` encapsulates a mutable `EntryBuilder` being constructed. Users interact with
148/// specific `Store` instances obtained via `Transaction::get_store` to stage changes.
149/// All staged changes across different subtrees within the transaction are recorded
150/// in the internal `EntryBuilder`.
151///
152/// When `commit()` is called, the transaction:
153/// 1. Finalizes the `EntryBuilder` by building an immutable `Entry`
154/// 2. Calculates the entry's content-addressable ID
155/// 3. Ensures the correct parent links are set based on the tree's state
156/// 4. Removes any empty subtrees that didn't have data staged
157/// 5. Signs the entry if authentication is configured
158/// 6. Persists the resulting immutable `Entry` to the backend
159///
160/// `Transaction` instances are typically created via `Database::new_transaction()`.
161#[derive(Clone)]
162pub struct Transaction {
163    /// The entry builder being modified, wrapped in Option to support consuming on commit
164    entry_builder: Arc<Mutex<Option<EntryBuilder>>>,
165    /// The database this transaction belongs to
166    db: Database,
167    /// Provided signing key paired with its auth identity
168    provided_signing_key: Option<(PrivateKey, SigKey)>,
169    /// Registered encryptors for transparent encryption/decryption of specific subtrees
170    /// Maps subtree name -> encryptor implementation
171    /// When an encryptor is registered, the transaction automatically encrypts writes
172    /// and decrypts reads for that subtree
173    encryptors: Arc<Mutex<HashMap<String, Box<dyn Encryptor>>>>,
174    /// When true, `get_store` rejects any `_`-prefixed subtree name. Used by
175    /// `Database::create_with_init` to keep its init callback from opening the
176    /// system subtrees (`_settings`, `_root`, `_index`) that `create_with_init`
177    /// itself manages. Legitimate internal paths — `get_index()` (via
178    /// `Registry::new` → `DocStore::load`) and `Store::register`'s own
179    /// `_index` updates — bypass `get_store` and remain unaffected.
180    system_subtrees_locked: Arc<AtomicBool>,
181}
182
/// RAII guard returned by [`Transaction::lock_system_subtrees`].
///
/// The guard shares the transaction's lock flag. Dropping it stores `false`
/// into that flag, which releases the lock on every exit path: normal scope
/// end, early return via `?`, and panic unwinding.
///
/// The type is `#[must_use]` because a guard that is not bound to a variable is
/// dropped at the end of the statement, which would release the lock
/// immediately and leave the protected code unguarded.
#[must_use = "the system-subtree lock is released as soon as the guard is dropped"]
pub(crate) struct SystemSubtreeLockGuard {
    /// Flag shared with the owning transaction (and all of its clones).
    flag: Arc<AtomicBool>,
}

impl Drop for SystemSubtreeLockGuard {
    /// Clears the shared flag, unlocking the system subtrees.
    fn drop(&mut self) {
        self.flag.store(false, Ordering::Release);
    }
}
194
195impl Transaction {
196    /// Creates a new atomic transaction for a specific `Database` anchored at a snapshot.
197    ///
198    /// Initializes an internal `EntryBuilder` with its main parent pointers set to the
199    /// snapshot's tips instead of the database's current state. This allows creating
200    /// transactions that branch from specific points in the database history (e.g.
201    /// diamond patterns).
202    ///
203    /// # Arguments
204    /// * `database` - The `Database` this transaction will modify.
205    /// * `snapshot` - The snapshot to anchor the transaction at. Must contain at least one tip,
206    ///   unless this transaction is creating the database's root entry.
207    pub(crate) async fn new_at(database: &Database, snapshot: &Snapshot) -> Result<Self> {
208        let tips = snapshot.tips();
209        // Validate that tips are not empty, unless we're creating the root entry
210        if tips.is_empty() {
211            // Check if this is a root entry creation by seeing if the database root exists in backend
212            let root_exists = database.ops().get(database.root_id()).await.is_ok();
213
214            if root_exists {
215                return Err(TransactionError::EmptyTipsNotAllowed.into());
216            }
217            // If root doesn't exist, this is valid (creating the root entry)
218        }
219
220        // Validate that all tips belong to the same tree
221        let backend = database.ops();
222        for tip_id in tips {
223            let entry = backend.get(tip_id).await?;
224            if !entry.in_tree(database.root_id()) {
225                return Err(TransactionError::InvalidTip {
226                    tip_id: tip_id.clone(),
227                }
228                .into());
229            }
230        }
231
232        // Start with a basic entry linked to the database's root.
233        // Data and parents will be filled based on the transaction type.
234        let mut builder = Entry::builder(database.root_id().clone());
235
236        // Use the provided tips as parents (only if not empty)
237        if !tips.is_empty() {
238            builder.set_parents_mut(tips.to_vec());
239        }
240
241        Ok(Self {
242            entry_builder: Arc::new(Mutex::new(Some(builder))),
243            db: database.clone(),
244            provided_signing_key: None,
245            encryptors: Arc::new(Mutex::new(HashMap::new())),
246            system_subtrees_locked: Arc::new(AtomicBool::new(false)),
247        })
248    }
249
250    /// Lock the system subtrees (`_settings`, `_root`, `_index`) for the
251    /// returned guard's lifetime.
252    ///
253    /// While the guard is live, [`Self::get_store`] rejects any subtree name
254    /// beginning with `_` (the system-subtree prefix). Used by
255    /// [`Database::create_with_init`] to guard its init callback against
256    /// clobbering `_settings`/`_root`/`_index`, all of which `create_with_init`
257    /// manages itself or via a dedicated accessor. The lock releases on the
258    /// guard's `Drop`, so it covers both early-return-via-`?` and panic-unwind
259    /// paths.
260    ///
261    /// The lock is process-local state on the (cloned-by-Arc) transaction;
262    /// clones share the same flag. Legitimate internal paths — `get_index()`
263    /// (via `Registry::new` → `DocStore::load`) and `Store::register`'s own
264    /// `_index` writes — bypass `get_store` and are unaffected.
265    pub(crate) fn lock_system_subtrees(&self) -> SystemSubtreeLockGuard {
266        self.system_subtrees_locked.store(true, Ordering::Release);
267        SystemSubtreeLockGuard {
268            flag: self.system_subtrees_locked.clone(),
269        }
270    }
271
272    /// Set signing key directly for user context (internal API).
273    ///
274    /// This method is used when a Database has a key attached
275    /// (via `Database::open().with_key()`). The provided SigningKey is already
276    /// decrypted and ready to use, eliminating the need for backend key lookup.
277    ///
278    /// # Arguments
279    /// * `signing_key` - The decrypted signing key from UserKeyManager
280    /// * `identity` - The SigKey identity used in database auth settings
281    pub(crate) fn set_provided_key(&mut self, signing_key: PrivateKey, identity: SigKey) {
282        self.provided_signing_key = Some((signing_key, identity));
283    }
284
285    /// Get current time as RFC3339 string.
286    ///
287    /// Delegates to the underlying instance's clock.
288    pub(crate) fn now_rfc3339(&self) -> Result<String> {
289        Ok(self.db.instance()?.clock().now_rfc3339())
290    }
291
292    /// Register an encryptor for transparent encryption/decryption of a specific subtree.
293    ///
294    /// Once registered, the transaction will automatically:
295    /// - Decrypt each historical entry before CRDT merging in `get_full_state()`
296    /// - Return plaintext data from `get_local_data()` (cached in EntryBuilder)
297    /// - Encrypt plaintext data before persisting in `commit()`
298    ///
299    /// This ensures proper CRDT merge semantics while keeping data encrypted at rest.
300    ///
301    /// # Arguments
302    /// * `subtree` - The name of the subtree to encrypt/decrypt
303    /// * `encryptor` - The encryptor implementation to use
304    ///
305    /// # Example
306    ///
307    /// For password-based encryption, use [`PasswordStore`] which handles
308    /// encryptor registration automatically:
309    ///
310    /// ```rust,ignore
311    /// let mut encrypted = tx.get_store::<PasswordStore<DocStore>>("secrets")?;
312    /// encrypted.initialize("my_password", Doc::new())?;
313    ///
314    /// // PasswordStore registers the encryptor internally
315    /// let docstore = encrypted.inner()?;
316    /// ```
317    ///
318    /// For custom encryption, implement the [`Encryptor`] trait:
319    ///
320    /// ```rust,ignore
321    /// struct MyEncryptor { /* ... */ }
322    /// impl Encryptor for MyEncryptor {
323    ///     fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>> { /* ... */ }
324    ///     fn decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>> { /* ... */ }
325    /// }
326    ///
327    /// transaction.register_encryptor("secrets", Box::new(MyEncryptor::new()))?;
328    /// ```
329    ///
330    /// [`PasswordStore`]: crate::store::PasswordStore
331    /// [`Encryptor`]: crate::Encryptor
332    pub(crate) fn register_encryptor(
333        &self,
334        subtree: impl Into<String>,
335        encryptor: Box<dyn Encryptor>,
336    ) -> Result<()> {
337        self.encryptors
338            .lock()
339            .unwrap()
340            .insert(subtree.into(), encryptor);
341        Ok(())
342    }
343
344    /// Decrypt bytes if an encryptor is registered, otherwise return them unchanged.
345    ///
346    /// This is used throughout Transaction to transparently decrypt encrypted payloads
347    /// before deserializing into CRDT types.
348    fn decrypt_if_needed(&self, subtree: &str, data: &[u8]) -> Result<Vec<u8>> {
349        if let Some(encryptor) = self.encryptors.lock().unwrap().get(subtree) {
350            encryptor.decrypt(data)
351        } else {
352            Ok(data.to_vec())
353        }
354    }
355
356    /// Encrypt bytes if an encryptor is registered for the subtree, otherwise return them unchanged.
357    fn encrypt_if_needed(&self, subtree: &str, plaintext: &[u8]) -> Result<Vec<u8>> {
358        if let Some(encryptor) = self.encryptors.lock().unwrap().get(subtree) {
359            encryptor.encrypt(plaintext)
360        } else {
361            Ok(plaintext.to_vec())
362        }
363    }
364
365    /// Get a SettingsStore handle for the settings subtree within this transaction.
366    ///
367    /// This method returns a `SettingsStore` that provides specialized access to the `_settings` subtree,
368    /// allowing you to read and modify settings data within this atomic transaction.
369    /// The DocStore automatically merges historical settings from the database with any
370    /// staged changes in this transaction.
371    ///
372    /// # Returns
373    ///
374    /// Returns a `Result<SettingsStore>` that can be used to:
375    /// - Read current settings values (including both historical and staged data)
376    /// - Stage new settings changes within this transaction
377    /// - Access nested settings structures
378    ///
379    /// # Example
380    ///
381    /// ```rust,no_run
382    /// # use eidetica::Database;
383    /// # async fn example(database: Database) -> eidetica::Result<()> {
384    /// let txn = database.new_transaction().await?;
385    /// let settings = txn.get_settings()?;
386    ///
387    /// // Read a setting
388    /// if let Ok(name) = settings.get_name().await {
389    ///     println!("Database name: {}", name);
390    /// }
391    ///
392    /// // Modify a setting
393    /// settings.set_name("Updated Database Name").await?;
394    /// # Ok(())
395    /// # }
396    /// ```
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if:
401    /// - Unable to create the SettingsStore for the settings subtree
402    /// - Operation has already been committed
403    pub fn get_settings(&self) -> Result<SettingsStore> {
404        // Create a SettingsStore for the settings subtree
405        SettingsStore::new(self)
406    }
407
408    /// Gets a handle to the Index for managing subtree registry and metadata.
409    ///
410    /// The Index provides access to the `_index` subtree, which stores metadata
411    /// about all subtrees in the database including their type identifiers and configurations.
412    ///
413    /// # Returns
414    ///
415    /// A `Result<Registry>` containing the handle for managing the index.
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if:
420    /// - Unable to create the Registry for the _index subtree
421    /// - Operation has already been committed
422    pub async fn get_index(&self) -> Result<Registry> {
423        Registry::new(self, INDEX).await
424    }
425
426    /// Set the tree root field for the entry being built.
427    ///
428    /// Called by `Database::create()` to override the placeholder root with
429    /// `ID::default()`, making the entry a proper top-level root.
430    ///
431    /// # Arguments
432    /// * `root` - The tree root ID to set (use `ID::default()` for top-level roots)
433    pub(crate) fn set_entry_root(&self, root: ID) -> Result<()> {
434        let mut builder_ref = self.entry_builder.lock().unwrap();
435        let builder = builder_ref
436            .as_mut()
437            .ok_or(TransactionError::TransactionAlreadyCommitted)?;
438        builder.set_root_mut(root);
439        Ok(())
440    }
441
442    /// Set entropy in the entry metadata.
443    ///
444    /// This is used during database creation to ensure unique IDs for databases
445    /// even when they have identical settings.
446    ///
447    /// # Arguments
448    /// * `entropy` - Random entropy value
449    pub(crate) fn set_metadata_entropy(&self, entropy: u64) -> Result<()> {
450        let mut builder_ref = self.entry_builder.lock().unwrap();
451        let builder = builder_ref
452            .as_mut()
453            .ok_or(TransactionError::TransactionAlreadyCommitted)?;
454
455        // Parse existing metadata if present, or create new
456        let mut metadata = builder
457            .metadata()
458            .and_then(|m| serde_json::from_slice::<EntryMetadata>(m).ok())
459            .unwrap_or(EntryMetadata {
460                settings_snapshot: Snapshot::EMPTY,
461                entropy: None,
462            });
463
464        // Set entropy
465        metadata.entropy = Some(entropy);
466
467        // Serialize and set metadata
468        let metadata_json = serde_json::to_vec(&metadata)?;
469        builder.set_metadata_mut(metadata_json);
470
471        Ok(())
472    }
473
474    /// Stages an update for a specific subtree within this atomic transaction.
475    ///
476    /// This method is primarily intended for internal use by `Store` implementations
477    /// (like `DocStore::set`). It records the serialized `data` for the given `subtree`
478    /// name within the transaction's internal `EntryBuilder`.
479    ///
480    /// If this is the first modification to the named subtree within this transaction,
481    /// it also fetches and records the current tips of that subtree from the backend
482    /// to set the correct `subtree_parents` for the new entry.
483    ///
484    /// # Arguments
485    /// * `subtree` - The name of the subtree to update.
486    /// * `data` - The serialized CRDT data to stage for the subtree.
487    ///
488    /// # Returns
489    /// A `Result<()>` indicating success or an error.
490    pub(crate) async fn update_subtree(
491        &self,
492        subtree: impl AsRef<str>,
493        data: impl Into<Vec<u8>>,
494    ) -> Result<()> {
495        let subtree = subtree.as_ref();
496        let data = data.into();
497
498        // Check if we need to fetch tips (check without holding borrow across await)
499        let needs_tips = {
500            let builder_ref = self.entry_builder.lock().unwrap();
501            let builder = builder_ref
502                .as_ref()
503                .ok_or(TransactionError::TransactionAlreadyCommitted)?;
504            !builder.subtrees().contains(&subtree.to_string())
505        };
506
507        // Fetch tips if needed (no borrow held across this await)
508        let tips = if needs_tips {
509            let backend = self.db.ops();
510            // FIXME: we should get the subtree snapshot while still using the parent pointers
511            Some(
512                backend
513                    .store_snapshot(self.db.root_id(), subtree)
514                    .await?
515                    .into_tips(),
516            )
517        } else {
518            None
519        };
520
521        // Now update the builder
522        let mut builder_ref = self.entry_builder.lock().unwrap();
523        let builder = builder_ref
524            .as_mut()
525            .ok_or(TransactionError::TransactionAlreadyCommitted)?;
526
527        builder.set_subtree_data_mut(subtree.to_string(), data);
528        if let Some(tips) = tips {
529            builder.set_subtree_parents_mut(subtree, tips);
530        }
531
532        Ok(())
533    }
534
535    /// Gets a handle to a specific `Store` for modification within this transaction.
536    ///
537    /// This method creates and returns an instance of the specified `Store` type `T`,
538    /// associated with this `Transaction`. The returned `Store` handle can be used to
539    /// stage changes (e.g., using `DocStore::set`).
540    /// These changes are recorded within this `Transaction`.
541    ///
542    /// If this is the first time this subtree is accessed within the transaction,
543    /// its parent tips will be fetched and stored.
544    ///
545    /// # Type Parameters
546    /// * `T` - The concrete `Store` implementation type to create.
547    ///
548    /// # Arguments
549    /// * `subtree_name` - The name of the subtree to get a modification handle for.
550    ///
551    /// # Returns
552    /// A `Result<T>` containing the `Store` handle.
553    pub async fn get_store<T>(&self, subtree_name: impl Into<String> + Send) -> Result<T>
554    where
555        T: Store + Send,
556    {
557        let subtree_name = subtree_name.into();
558
559        // Skip special system subtrees to avoid circular dependencies
560        let is_system_subtree =
561            subtree_name == INDEX || subtree_name == SETTINGS || subtree_name == ROOT;
562
563        if is_system_subtree && self.system_subtrees_locked.load(Ordering::Acquire) {
564            return Err(TransactionError::SystemSubtreeLocked { name: subtree_name }.into());
565        }
566
567        // Initialize subtree parents before checking _index
568        self.init_subtree_parents(&subtree_name).await?;
569
570        if is_system_subtree {
571            // System subtrees don't use _index registration
572            return T::load(self, subtree_name).await;
573        }
574
575        // Check _index to determine if this is a new or existing subtree
576        let index_store = self.get_index().await?;
577        if index_store.contains(&subtree_name).await {
578            // Type validation for existing subtree
579            let subtree_info = index_store.get_entry(&subtree_name).await?;
580
581            if !T::supports_type_id(&subtree_info.type_id) {
582                return Err(StoreError::TypeMismatch {
583                    store: subtree_name,
584                    expected: T::type_id().to_string(),
585                    actual: subtree_info.type_id,
586                }
587                .into());
588            }
589
590            // Type supported - create the Store
591            T::load(self, subtree_name).await
592        } else {
593            // New subtree - register adds it to _index
594            T::register(self, subtree_name).await
595        }
596    }
597
598    /// Get the subtree tips reachable from the given main tree entries.
599    async fn get_subtree_tips(&self, subtree_name: &str, main_parents: &[ID]) -> Result<Vec<ID>> {
600        let boundary = Snapshot::from(main_parents.to_vec());
601        self.db
602            .ops()
603            .store_snapshot_at(self.db.root_id(), subtree_name, &boundary)
604            .await
605            .map(Snapshot::into_tips)
606    }
607
608    /// Initialize subtree parents if this is the first time accessing this subtree
609    /// in this transaction.
610    pub(crate) async fn init_subtree_parents(&self, subtree_name: &str) -> Result<()> {
611        let main_parents = {
612            let builder_ref = self.entry_builder.lock().unwrap();
613            let builder = builder_ref
614                .as_ref()
615                .ok_or(TransactionError::TransactionAlreadyCommitted)?;
616
617            let subtrees = builder.subtrees();
618            if subtrees.contains(&subtree_name.to_string()) {
619                return Ok(()); // Already initialized
620            }
621            builder.parents().unwrap_or_default()
622        };
623
624        let tips = self.get_subtree_tips(subtree_name, &main_parents).await?;
625
626        let mut builder_ref = self.entry_builder.lock().unwrap();
627        let builder = builder_ref
628            .as_mut()
629            .ok_or(TransactionError::TransactionAlreadyCommitted)?;
630
631        // Initialize the subtree with proper parent relationships
632        // set_subtree_parents_mut creates the subtree with data=None if it doesn't exist
633        builder.set_subtree_parents_mut(subtree_name, tips);
634
635        Ok(())
636    }
637
638    /// Gets the currently staged data for a specific subtree within this transaction.
639    ///
640    /// This is intended for use by `Store` implementations to retrieve the data
641    /// they have staged locally within the `Transaction` before potentially merging
642    /// it with historical data.
643    ///
644    /// # Type Parameters
645    /// * `T` - The data type (expected to be a CRDT) to deserialize the staged data into.
646    ///
647    /// # Arguments
648    /// * `subtree_name` - The name of the subtree whose staged data is needed.
649    ///
650    /// # Returns
651    /// A `Result<Option<T>>`:
652    ///
653    /// # Behavior
654    /// - If the subtree doesn't exist or has no data, returns `Ok(None)`
655    /// - If the subtree exists but has empty data (empty string or whitespace), returns `Ok(None)`
656    /// - Otherwise deserializes the JSON data to type `T` and returns `Ok(Some(T))`
657    ///
658    /// # Errors
659    /// Returns an error if the transaction has already been committed or if the
660    /// subtree data exists but cannot be deserialized to type `T`.
661    pub fn get_local_data<T>(&self, subtree_name: impl AsRef<str>) -> Result<Option<T>>
662    where
663        T: Data,
664    {
665        let subtree_name = subtree_name.as_ref();
666        let builder_ref = self.entry_builder.lock().unwrap();
667        let builder = builder_ref
668            .as_ref()
669            .ok_or(TransactionError::TransactionAlreadyCommitted)?;
670
671        if let Ok(data) = builder.data(subtree_name) {
672            if data.is_empty() {
673                Ok(None)
674            } else {
675                serde_json::from_slice(data).map(Some).map_err(|e| {
676                    TransactionError::StoreDeserializationFailed {
677                        store: subtree_name.to_string(),
678                        reason: e.to_string(),
679                    }
680                    .into()
681                })
682            }
683        } else {
684            Ok(None)
685        }
686    }
687
688    /// Gets the fully merged historical state of a subtree up to the point this transaction began.
689    ///
690    /// This retrieves all relevant historical entries for the `subtree_name` from the backend,
691    /// considering the parent tips recorded when this `Transaction` was created (or when the
692    /// subtree was first accessed within the transaction). It deserializes the data from each
693    /// relevant entry into the CRDT type `T` and merges them according to `T`'s `CRDT::merge`
694    /// implementation.
695    ///
696    /// This is intended for use by `Store` implementations (e.g., in their `get` or `get_all` methods)
697    /// to provide the historical context against which staged changes might be applied or compared.
698    ///
699    /// # Type Parameters
700    /// * `T` - The CRDT type to deserialize and merge the historical subtree data into.
701    ///
702    /// # Arguments
703    /// * `subtree_name` - The name of the subtree.
704    ///
705    /// # Returns
706    /// A `Result<T>` containing the merged historical data of type `T`. Returns `Ok(T::default())`
707    /// if the subtree has no history prior to this transaction.
708    pub(crate) async fn get_full_state<T>(&self, subtree_name: impl AsRef<str> + Send) -> Result<T>
709    where
710        T: CRDT + Send,
711    {
712        let subtree_name = subtree_name.as_ref();
713
714        // Check if we need to initialize subtree tips (get data from RefCell before await)
715        let (needs_init, main_parents) = {
716            let builder_ref = self.entry_builder.lock().unwrap();
717            let builder = builder_ref
718                .as_ref()
719                .ok_or(TransactionError::TransactionAlreadyCommitted)?;
720
721            let subtrees = builder.subtrees();
722            if subtrees.contains(&subtree_name.to_string()) {
723                (false, Vec::new())
724            } else {
725                (true, builder.parents().unwrap_or_default())
726            }
727        };
728
729        // Initialize subtree tips if needed (async operations)
730        if needs_init {
731            let current_database_snapshot = self.db.ops().snapshot(self.db.root_id()).await?;
732
733            // Set-equal comparison via Snapshot canonical form.
734            let parents_snapshot = Snapshot::from(main_parents.clone());
735            let tips = if parents_snapshot == current_database_snapshot {
736                let backend = self.db.ops();
737                backend
738                    .store_snapshot(self.db.root_id(), subtree_name)
739                    .await?
740                    .into_tips()
741            } else {
742                // This transaction uses custom tips - use special handler
743                self.db
744                    .ops()
745                    .store_snapshot_at(self.db.root_id(), subtree_name, &parents_snapshot)
746                    .await?
747                    .into_tips()
748            };
749
750            // Update RefCell after async operations
751            let mut builder_ref = self.entry_builder.lock().unwrap();
752            let builder = builder_ref
753                .as_mut()
754                .ok_or(TransactionError::TransactionAlreadyCommitted)?;
755            builder.set_subtree_parents_mut(subtree_name, tips);
756        }
757
758        // Get the parent pointers for this subtree
759        let parents = {
760            let builder_ref = self.entry_builder.lock().unwrap();
761            let builder = builder_ref
762                .as_ref()
763                .ok_or(TransactionError::TransactionAlreadyCommitted)?;
764            builder.subtree_parents(subtree_name).unwrap_or_default()
765        };
766
767        // If there are no parents, return a default
768        if parents.is_empty() {
769            return Ok(T::default());
770        }
771
772        // Compute the CRDT state using merge-base ROOT-to-target computation
773        self.compute_subtree_state_merge_based(subtree_name, &parents)
774            .await
775    }
776
777    /// Computes the CRDT state for a subtree using correct recursive merge-base algorithm.
778    ///
779    /// Algorithm:
780    /// 1. If no entries, return default state
781    /// 2. If single entry, compute its state recursively
782    /// 3. If multiple entries, find their merge base and compute state from there
783    ///
784    /// # Type Parameters
785    /// * `T` - The CRDT type to compute the state for
786    ///
787    /// # Arguments
788    /// * `subtree_name` - The name of the subtree
789    /// * `entry_ids` - The entry IDs to compute the merged state for (tips)
790    ///
791    /// # Returns
792    /// A `Result<T>` containing the computed CRDT state
793    async fn compute_subtree_state_merge_based<T>(
794        &self,
795        subtree_name: impl AsRef<str> + Send,
796        entry_ids: &[ID],
797    ) -> Result<T>
798    where
799        T: CRDT + Send,
800    {
801        // Base case: no entries
802        if entry_ids.is_empty() {
803            return Ok(T::default());
804        }
805
806        let subtree_name = subtree_name.as_ref();
807
808        // If we have a single entry, compute its state recursively
809        if entry_ids.len() == 1 {
810            return self
811                .compute_single_entry_state_recursive(subtree_name, &entry_ids[0])
812                .await;
813        }
814
815        // Multiple entries: check multi-tip cache first
816        let cache_id = create_merge_cache_id(entry_ids);
817
818        if let Some(cached_state) = self
819            .db
820            .ops()
821            .get_cached_crdt_state(self.db.root_id(), &cache_id, subtree_name)
822            .await?
823        {
824            let decrypted = self.decrypt_if_needed(subtree_name, &cached_state)?;
825            let result: T = serde_json::from_slice(&decrypted)?;
826            return Ok(result);
827        }
828
829        // Cache miss: find merge base and compute state from there
830        let merge_base_id = self
831            .db
832            .ops()
833            .find_merge_base(self.db.root_id(), subtree_name, entry_ids)
834            .await?;
835
836        // Get the merge base state recursively
837        let mut result = self
838            .compute_single_entry_state_recursive(subtree_name, &merge_base_id)
839            .await?;
840
841        // Get all entries from merge base to all tip entries (deduplicated and sorted)
842        let path_entries = {
843            self.db
844                .ops()
845                .get_path_from_to(self.db.root_id(), subtree_name, &merge_base_id, entry_ids)
846                .await?
847        };
848
849        // Merge all path entries in order
850        result = self
851            .merge_path_entries(subtree_name, result, &path_entries)
852            .await?;
853
854        // Cache the computed merge result
855        let serialized = serde_json::to_vec(&result)?;
856        let to_cache = self.encrypt_if_needed(subtree_name, &serialized)?;
857        // FIXME: Multiple tips in the cache is a hack
858        // cache_crdt_state is supposed to take in (ID, subtree, data)
859        // This caching only technically works by constructing a custom ID
860        self.db
861            .ops()
862            .cache_crdt_state(self.db.root_id(), &cache_id, subtree_name, to_cache)
863            .await?;
864
865        Ok(result)
866    }
867
868    /// Computes the CRDT state for a single entry using batch fetching.
869    ///
870    /// Algorithm:
871    /// 1. Check if entry state is cached → return it
872    /// 2. Fetch all ancestors in one batch query (sorted by height)
873    /// 3. Merge all entries in order from root to target
874    /// 4. Cache only the final result
875    ///
876    /// # Type Parameters
877    /// * `T` - The CRDT type to compute the state for
878    ///
879    /// # Arguments
880    /// * `subtree_name` - The name of the subtree
881    /// * `entry_id` - The entry ID to compute the state for
882    ///
883    /// # Returns
884    /// A `Result<T>` containing the computed CRDT state for the entry
885    fn compute_single_entry_state_recursive<'a, T>(
886        &'a self,
887        subtree_name: &'a str,
888        entry_id: &'a ID,
889    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T>> + Send + 'a>>
890    where
891        T: CRDT + Send + 'a,
892    {
893        Box::pin(async move {
894            // Step 1: Check if already cached
895            if let Some(cached_state) = self
896                .db
897                .ops()
898                .get_cached_crdt_state(self.db.root_id(), entry_id, subtree_name)
899                .await?
900            {
901                // Decrypt cached state if encryptor is registered
902                let decrypted = self.decrypt_if_needed(subtree_name, &cached_state)?;
903                let result: T = serde_json::from_slice(&decrypted)?;
904                return Ok(result);
905            }
906
907            // Step 2: Batch fetch all ancestors sorted by height (root first)
908            // This single query replaces N recursive queries
909            let boundary = Snapshot::from([entry_id.clone()]);
910            let entries = self
911                .db
912                .ops()
913                .store_at(self.db.root_id(), subtree_name, &boundary)
914                .await?;
915
916            // Step 3: Merge all entries in order (already sorted by height, root first)
917            let mut result = T::default();
918            for entry in &entries {
919                let local_data = if let Ok(data) = entry.data(subtree_name) {
920                    // Decrypt before deserializing
921                    let plaintext = self.decrypt_if_needed(subtree_name, data)?;
922                    serde_json::from_slice::<T>(&plaintext)?
923                } else {
924                    T::default()
925                };
926                result = result.merge(&local_data)?;
927            }
928
929            // Step 4: Cache only the final result (encrypted if encryptor is registered)
930            let serialized_state = serde_json::to_vec(&result)?;
931            let to_cache = self.encrypt_if_needed(subtree_name, &serialized_state)?;
932            self.db
933                .ops()
934                .cache_crdt_state(self.db.root_id(), entry_id, subtree_name, to_cache)
935                .await?;
936
937            Ok(result)
938        })
939    }
940
941    /// Merges a sequence of entries into a CRDT state.
942    ///
943    /// # Arguments
944    /// * `subtree_name` - The name of the subtree
945    /// * `initial_state` - The initial CRDT state to merge into
946    /// * `entry_ids` - The entry IDs to merge in order
947    ///
948    /// # Returns
949    /// A `Result<T>` containing the merged CRDT state
950    async fn merge_path_entries<T>(
951        &self,
952        subtree_name: &str,
953        mut state: T,
954        entry_ids: &[ID],
955    ) -> Result<T>
956    where
957        T: CRDT,
958    {
959        for entry_id in entry_ids {
960            let entry = self.db.ops().get(entry_id).await?;
961
962            // Get local data for this entry in the subtree
963            let local_data = if let Ok(data) = entry.data(subtree_name) {
964                // Decrypt before deserializing
965                let plaintext = self.decrypt_if_needed(subtree_name, data)?;
966                serde_json::from_slice::<T>(&plaintext)?
967            } else {
968                T::default()
969            };
970
971            state = state.merge(&local_data)?;
972        }
973
974        Ok(state)
975    }
976
977    /// Commits the transaction, finalizing and persisting the entry to the backend.
978    ///
979    /// This method:
980    /// 1. Takes ownership of the `EntryBuilder` from the internal `Option`
981    /// 2. Removes any empty subtrees
982    /// 3. Adds metadata if appropriate
983    /// 4. Sets authentication if configured
984    /// 5. Builds the immutable `Entry` using `EntryBuilder::build()`
985    /// 6. Signs the entry if authentication is configured
986    /// 7. Validates authentication if present
987    /// 8. Calculates the entry's content-addressable ID
988    /// 9. Persists the entry to the backend
989    /// 10. Returns the ID of the newly created entry
990    ///
991    /// After commit, the transaction cannot be used again, as the internal
992    /// `EntryBuilder` has been consumed.
993    ///
994    /// # Returns
995    /// A `Result<ID>` containing the ID of the committed entry.
996    pub async fn commit(self) -> Result<ID> {
997        // Check if this is a settings subtree update and get the effective settings before any borrowing
998        let has_settings_update = {
999            let builder_cell = self.entry_builder.lock().unwrap();
1000            let builder = builder_cell
1001                .as_ref()
1002                .ok_or(TransactionError::TransactionAlreadyCommitted)?;
1003            builder.subtrees().contains(&SETTINGS.to_string())
1004        };
1005
1006        // Get settings using full CRDT state computation
1007        let historical_settings = self.get_full_state::<Doc>(SETTINGS).await?;
1008
1009        // However, if this is a settings update and there's no historical auth but staged auth exists,
1010        // use the staged settings for validation (this handles initial database creation with auth)
1011        let effective_settings_for_validation = if has_settings_update {
1012            let historical_has_auth = matches!(historical_settings.get("auth"), Some(Value::Doc(auth_map)) if !auth_map.is_empty());
1013            if !historical_has_auth {
1014                let staged_settings = self.get_local_data::<Doc>(SETTINGS)?.unwrap_or_default();
1015                let staged_has_auth = matches!(staged_settings.get("auth"), Some(Value::Doc(auth_map)) if !auth_map.is_empty());
1016                if staged_has_auth {
1017                    staged_settings
1018                } else {
1019                    historical_settings
1020                }
1021            } else {
1022                historical_settings
1023            }
1024        } else {
1025            historical_settings
1026        };
1027
1028        // VALIDATION: Ensure that the new settings state (after this transaction) doesn't corrupt auth
1029        // This prevents committing entries that would corrupt the database's auth configuration
1030        if has_settings_update {
1031            // Compute what the new settings state will be after merging local changes
1032            let local_settings = self.get_local_data::<Doc>(SETTINGS)?.unwrap_or_default();
1033            let new_settings = effective_settings_for_validation.merge(&local_settings)?;
1034
1035            // Check if the new settings would have corrupted auth
1036            if new_settings.is_tombstone("auth") {
1037                // Auth was explicitly deleted - this would corrupt the database
1038                return Err(TransactionError::CorruptedAuthConfiguration.into());
1039            } else if let Some(auth_value) = new_settings.get("auth") {
1040                // Auth exists in new settings - check if it's the right type
1041                if !matches!(auth_value, Value::Doc(_)) {
1042                    // Auth exists but has wrong type (not a Doc) - this would corrupt the database
1043                    return Err(TransactionError::CorruptedAuthConfiguration.into());
1044                }
1045            }
1046            // If auth is None (not configured), that's fine - we allow empty auth
1047        }
1048
1049        // Ensure _index constraint: subtrees referenced in _index must appear in Entry.
1050        // This adds subtrees with None data if they're referenced in _index but not yet in builder.
1051        // First, get the data we need before any async operations
1052        let (_index_data_opt, main_parents, missing_subtrees) = {
1053            let builder_ref = self.entry_builder.lock().unwrap();
1054            let builder = builder_ref
1055                .as_ref()
1056                .ok_or(TransactionError::TransactionAlreadyCommitted)?;
1057
1058            let index_data_opt = builder.data(INDEX).ok().cloned();
1059            let main_parents = builder.parents().unwrap_or_default();
1060            let existing_subtrees = builder.subtrees();
1061
1062            // Find missing subtrees
1063            let missing = if let Some(ref index_data) = index_data_opt
1064                && let Ok(index_doc) = serde_json::from_slice::<Doc>(index_data)
1065            {
1066                index_doc
1067                    .keys()
1068                    .filter(|name| !existing_subtrees.contains(&name.to_string()))
1069                    .cloned()
1070                    .collect::<Vec<_>>()
1071            } else {
1072                Vec::new()
1073            };
1074
1075            (index_data_opt, main_parents, missing)
1076        };
1077
1078        // Get tips for missing subtrees (async)
1079        let mut subtree_tips: Vec<(String, Vec<ID>)> = Vec::new();
1080        for subtree_name in missing_subtrees {
1081            let tips = self.get_subtree_tips(&subtree_name, &main_parents).await?;
1082            subtree_tips.push((subtree_name, tips));
1083        }
1084
1085        // Now update the builder with the tips
1086        {
1087            let mut builder_ref = self.entry_builder.lock().unwrap();
1088            let builder = builder_ref
1089                .as_mut()
1090                .ok_or(TransactionError::TransactionAlreadyCommitted)?;
1091
1092            for (subtree_name, tips) in subtree_tips {
1093                builder.set_subtree_parents_mut(&subtree_name, tips);
1094            }
1095
1096            builder.remove_empty_subtrees_mut()?;
1097        }
1098
1099        // Add metadata with settings snapshot for all entries
1100        // Get the backend to access the settings snapshot (do async ops before RefCell borrow)
1101        let db_snapshot = self.db.snapshot().await?;
1102        let settings_snapshot = self
1103            .db
1104            .ops()
1105            .store_snapshot_at(self.db.root_id(), SETTINGS, &db_snapshot)
1106            .await?;
1107
1108        // Clone the builder from RefCell (limit borrow scope to avoid holding across await)
1109        let mut builder = {
1110            let builder_cell = self.entry_builder.lock().unwrap();
1111            let builder_from_cell = builder_cell
1112                .as_ref()
1113                .ok_or(TransactionError::TransactionAlreadyCommitted)?;
1114            builder_from_cell.clone()
1115        };
1116
1117        // Parse existing metadata if present, or create new
1118        let mut metadata = builder
1119            .metadata()
1120            .and_then(|m| serde_json::from_slice::<EntryMetadata>(m).ok())
1121            .unwrap_or(EntryMetadata {
1122                settings_snapshot: Snapshot::EMPTY,
1123                entropy: None,
1124            });
1125
1126        // Update settings snapshot
1127        metadata.settings_snapshot = settings_snapshot;
1128
1129        // Serialize the metadata
1130        let metadata_json = serde_json::to_vec(&metadata)?;
1131
1132        // Add metadata to the entry builder
1133        builder.set_metadata_mut(metadata_json);
1134
1135        // Handle authentication configuration before building
1136        // All entries must now be authenticated - fail if no auth key is configured
1137
1138        // Use provided signing key
1139        let signing_key = if let Some((ref provided_key, ref identity)) = self.provided_signing_key
1140        {
1141            // Use provided signing key directly (already decrypted from UserKeyManager or device key)
1142            let key_clone = provided_key.clone();
1143
1144            // Build SigInfo from the already-typed SigKey identity
1145            let sig_builder = SigInfo::builder().key(identity.clone());
1146
1147            // Set auth ID on the entry builder (without signature initially)
1148            builder.set_sig_mut(sig_builder.build());
1149
1150            Some(key_clone)
1151        } else {
1152            // No authentication key configured
1153            return Err(TransactionError::AuthenticationRequired.into());
1154        };
1155        // Encrypt subtree data if encryptors are registered
1156        // This must happen before building the entry to ensure encrypted data is persisted
1157        {
1158            let encryptors = self.encryptors.lock().unwrap();
1159            for subtree_name in builder.subtrees() {
1160                if let Some(encryptor) = encryptors.get(&subtree_name)
1161                    && let Ok(plaintext_data) = builder.data(&subtree_name)
1162                    && !plaintext_data.is_empty()
1163                {
1164                    let ciphertext = encryptor.encrypt(plaintext_data)?;
1165                    builder.set_subtree_data_mut(subtree_name.clone(), ciphertext);
1166                }
1167            }
1168        }
1169
1170        // Extract height strategy from settings (defaults to Incremental)
1171        // If this transaction includes settings updates, merge them to get the effective strategy
1172        let settings_for_height = if has_settings_update {
1173            let local_settings = self.get_local_data::<Doc>(SETTINGS)?.unwrap_or_default();
1174            effective_settings_for_validation.merge(&local_settings)?
1175        } else {
1176            effective_settings_for_validation.clone()
1177        };
1178        let height_strategy: HeightStrategy = settings_for_height
1179            .get_json("height_strategy")
1180            .unwrap_or_default();
1181
1182        // Compute heights from parent entries using the configured strategy
1183        {
1184            let backend = self.db.ops();
1185            let instance = self.db.instance()?;
1186            let calculator = height_strategy.into_calculator(instance.clock_arc());
1187
1188            // Compute main tree height using the height strategy
1189            let main_parents = builder.parents().unwrap_or_default();
1190            let max_parent_height = if main_parents.is_empty() {
1191                None
1192            } else {
1193                let mut max_height = 0u64;
1194                for parent_id in &main_parents {
1195                    if let Ok(parent) = backend.get(parent_id).await {
1196                        max_height = max_height.max(parent.height());
1197                    }
1198                }
1199                Some(max_height)
1200            };
1201            let tree_height = calculator.calculate_height(max_parent_height);
1202            builder.set_height_mut(tree_height);
1203
1204            // Compute subtree heights based on per-subtree settings from _index
1205            // System subtrees (prefixed with _) always inherit from tree.
1206            // Regular subtrees check _index for a height_strategy override.
1207            //
1208            // If a subtree has no override, its height is left as None, which means
1209            // Entry.subtree_height() will return the tree height (inheritance).
1210            let index = self.get_index().await.ok();
1211
1212            for subtree_name in builder.subtrees() {
1213                // Determine the effective strategy for this subtree:
1214                // - System subtrees (_settings, _index, etc.): inherit (None)
1215                // - User subtrees: look up in _index, default to inherit (None)
1216                let subtree_strategy: Option<HeightStrategy> = if subtree_name.starts_with('_') {
1217                    // System subtrees always inherit from tree
1218                    None
1219                } else if let Some(ref idx) = index {
1220                    idx.get_subtree_settings(&subtree_name)
1221                        .await
1222                        .ok()
1223                        .and_then(|s| s.height_strategy)
1224                } else {
1225                    None
1226                };
1227
1228                match subtree_strategy {
1229                    None => {
1230                        // Inherit from tree - height stays None (default)
1231                        // Entry.subtree_height() will return tree height
1232                    }
1233                    Some(strategy) => {
1234                        // Calculate independent height from subtree parents
1235                        let subtree_calculator = strategy.into_calculator(instance.clock_arc());
1236                        let subtree_parents =
1237                            builder.subtree_parents(&subtree_name).unwrap_or_default();
1238                        let max_subtree_parent_height = if subtree_parents.is_empty() {
1239                            None
1240                        } else {
1241                            let mut max_height = 0u64;
1242                            for parent_id in &subtree_parents {
1243                                if let Ok(parent) = backend.get(parent_id).await
1244                                    && let Ok(height) = parent.subtree_height(&subtree_name)
1245                                {
1246                                    max_height = max_height.max(height);
1247                                }
1248                            }
1249                            Some(max_height)
1250                        };
1251                        let subtree_height =
1252                            subtree_calculator.calculate_height(max_subtree_parent_height);
1253                        builder.set_subtree_height_mut(&subtree_name, Some(subtree_height));
1254                    }
1255                }
1256            }
1257        }
1258
1259        // Build the final immutable Entry
1260        let mut entry = builder.build()?;
1261
1262        // CRITICAL VALIDATION: Ensure entry structural integrity before commit
1263        //
1264        // This validation is crucial because the transaction layer has already:
1265        // 1. Discovered proper parent relationships through DAG traversal
1266        // 2. Set up correct subtree parents via find_subtree_parents_from_main_parents()
1267        // 3. Ensured all references point to valid entries in the backend
1268        //
1269        // The validate() call here ensures that:
1270        // - Non-root entries have main tree parents (preventing orphaned nodes)
1271        // - Parent IDs are not empty strings (preventing reference errors)
1272        // - The entry structure is valid before signing and storage
1273        //
1274        // This catches any issues early in the transaction, providing clear error
1275        // messages before the entry is signed or reaches the backend storage layer.
1276        entry.validate()?;
1277
1278        // Sign the entry if we have a signing key
1279        if let Some(signing_key) = signing_key {
1280            let signature = sign_entry(&entry, &signing_key)?;
1281            entry.sig.sig = Some(signature);
1282        }
1283
1284        // Validate authentication (all entries must be authenticated)
1285        let mut validator = AuthValidator::new();
1286
1287        // Get the final settings state for validation
1288        // IMPORTANT: For permission checking, we must use the historical auth configuration
1289        // (before this transaction), not the auth configuration from the current entry.
1290        // This prevents operations from modifying their own permission requirements.
1291
1292        // Extract AuthSettings from effective settings for validation
1293        // IMPORTANT: Distinguish between empty auth vs corrupted/deleted auth:
1294        // - None: No auth ever configured → Allow unsigned operations (empty AuthSettings)
1295        // - Some(Doc): Normal auth configuration → Use it for validation
1296        // - Tombstone (deleted): Auth was configured then deleted → CORRUPTED (fail-safe)
1297        // - Some(other types): Wrong type in auth field → CORRUPTED (fail-safe)
1298        //
1299        // NOTE: Doc::get() hides tombstones (returns None for deleted values), so we need
1300        // to check for tombstones explicitly using is_tombstone() before using get().
1301        let auth_settings_for_validation = if effective_settings_for_validation.is_tombstone("auth")
1302        {
1303            // Auth was configured then explicitly deleted - this is corrupted
1304            return Err(TransactionError::CorruptedAuthConfiguration.into());
1305        } else {
1306            match effective_settings_for_validation.get("auth") {
1307                Some(Value::Doc(auth_doc)) => auth_doc.clone().into(),
1308                None => AuthSettings::new(), // Empty auth - never configured
1309                Some(_) => {
1310                    // Auth exists but has wrong type (not a Doc) - this is corrupted
1311                    return Err(TransactionError::CorruptedAuthConfiguration.into());
1312                }
1313            }
1314        };
1315
1316        let instance = self.db.instance()?;
1317
1318        // Validate entry (signature + permissions)
1319        let is_valid = validator
1320            .validate_entry(&entry, &auth_settings_for_validation, Some(&instance))
1321            .await?;
1322
1323        if !is_valid {
1324            return Err(TransactionError::EntryValidationFailed.into());
1325        }
1326
1327        let verification_status = VerificationStatus::Verified;
1328
1329        // Get the entry's ID
1330        let id = entry.id();
1331
1332        // Write entry through Instance which handles backend storage and callback dispatch
1333        let instance = self.db.instance()?;
1334        instance
1335            .put_entry(
1336                self.db.root_id(),
1337                verification_status,
1338                entry.clone(),
1339                WriteSource::Local,
1340            )
1341            .await?;
1342
1343        Ok(id)
1344    }
1345}