Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/store/
ydoc.rs

//! Y-CRDT integration for Eidetica
//!
//! This module provides seamless integration between Eidetica's transaction system
//! and Y-CRDT (Yjs) for real-time collaborative editing. The main component is `YDoc`,
//! which implements differential saving to minimize storage overhead while maintaining
//! full compatibility with Y-CRDT's conflict resolution algorithms.
//!
//! # Key Features
//!
//! - **Differential Saving**: Only stores incremental changes, not full document state
//! - **Efficient Caching**: Caches expensive backend data retrieval operations
//! - **Seamless Integration**: Works with Eidetica's transaction and viewer model
//! - **Full Y-CRDT API**: Exposes the complete yrs library functionality
//!
//! # Performance Considerations
//!
//! The implementation caches the expensive `get_full_state()` backend operation and
//! constructs documents and state vectors on-demand from this cached data. This
//! approach minimizes both I/O overhead and memory usage.
//!
//! This module is only available when the "y-crdt" feature is enabled.
22
23use std::sync::Mutex;
24
25use async_trait::async_trait;
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28use yrs::{Doc, ReadTxn, Transact, Update, updates::decoder::Decode};
29
30use crate::{
31    Result, Store, Transaction,
32    crdt::{CRDT, Data},
33    store::{Registered, errors::StoreError},
34};
35
36/// Errors specific to Y-CRDT operations
37#[derive(Debug, Error)]
38pub enum YDocError {
39    /// Y-CRDT operation failed
40    #[error("Y-CRDT operation failed: {operation} - {reason}")]
41    Operation { operation: String, reason: String },
42
43    /// Y-CRDT binary data is invalid
44    #[error("Invalid Y-CRDT binary data: {reason}")]
45    InvalidData { reason: String },
46
47    /// Y-CRDT merge operation failed
48    #[error("Y-CRDT merge failed: {reason}")]
49    Merge { reason: String },
50}
51
52impl From<YDocError> for StoreError {
53    fn from(err: YDocError) -> Self {
54        StoreError::ImplementationError {
55            store: "YDoc".to_string(),
56            reason: err.to_string(),
57        }
58    }
59}
60
61/// A CRDT wrapper for Y-CRDT binary update data.
62///
63/// This wrapper implements the required `Data` and `CRDT` traits to allow
64/// Y-CRDT binary updates to be stored and merged within the Eidetica system.
65///
66/// ## Design
67///
68/// Y-CRDT represents document state as binary updates that can be efficiently
69/// merged and applied. This wrapper enables these binary updates to participate
70/// in Eidetica's CRDT-based data storage and synchronization system.
71///
72/// ## Merging Strategy
73///
74/// When two `YrsBinary` instances are merged, both updates are applied to a new
75/// Y-CRDT document, and the resulting merged state is returned as a new binary
76/// update. This ensures that Y-CRDT's sophisticated conflict resolution algorithms
77/// are preserved within Eidetica's merge operations.
78#[derive(Default, Debug, Clone, Serialize, Deserialize)]
79pub struct YrsBinary {
80    data: Vec<u8>,
81}
82
83impl Data for YrsBinary {}
84
85impl CRDT for YrsBinary {
86    /// Merges two Y-CRDT binary updates by applying both to a new document
87    /// and returning the resulting state as a binary update.
88    fn merge(&self, other: &Self) -> Result<Self> {
89        let doc = Doc::new();
90
91        // Apply self's update if not empty
92        if !self.data.is_empty() {
93            let update = Update::decode_v1(&self.data).map_err(|e| {
94                StoreError::from(YDocError::InvalidData {
95                    reason: format!("Failed to decode Y-CRDT update (self): {e}"),
96                })
97            })?;
98            let mut txn = doc.transact_mut();
99            txn.apply_update(update).map_err(|e| {
100                StoreError::from(YDocError::Merge {
101                    reason: format!("Failed to apply Y-CRDT update (self): {e}"),
102                })
103            })?;
104        }
105
106        // Apply other's update if not empty
107        if !other.data.is_empty() {
108            let other_update = Update::decode_v1(&other.data).map_err(|e| {
109                StoreError::from(YDocError::InvalidData {
110                    reason: format!("Failed to decode Y-CRDT update (other): {e}"),
111                })
112            })?;
113            let mut txn = doc.transact_mut();
114            txn.apply_update(other_update).map_err(|e| {
115                StoreError::from(YDocError::Merge {
116                    reason: format!("Failed to apply Y-CRDT update (other): {e}"),
117                })
118            })?;
119        }
120
121        // Return the merged state as a binary update
122        let txn = doc.transact();
123        let merged_update = txn.encode_state_as_update_v1(&yrs::StateVector::default());
124
125        Ok(YrsBinary {
126            data: merged_update,
127        })
128    }
129}
130
131impl YrsBinary {
132    pub fn new(data: Vec<u8>) -> Self {
133        Self { data }
134    }
135
136    pub fn as_bytes(&self) -> &[u8] {
137        &self.data
138    }
139
140    pub fn is_empty(&self) -> bool {
141        self.data.is_empty()
142    }
143}
144
145/// A Y-CRDT based Store implementation with efficient differential saving.
146///
147/// `YDoc` provides a CRDT-based storage abstraction using the yrs library,
148/// which is a Rust port of Yjs. This allows for real-time collaborative editing
149/// and automatic conflict resolution through the Y-CRDT algorithms.
150///
151/// ## Architecture
152///
153/// The `YDoc` integrates with Eidetica's transaction system to provide:
154/// - **Differential Updates**: Only saves incremental changes, not full document state
155/// - **Efficient Caching**: Caches expensive backend data retrieval operations
156/// - **Transaction/Viewer Model**: Compatible with Eidetica's transaction patterns
157/// - **Full Y-CRDT API**: Direct access to the complete yrs library functionality
158///
159/// ## Caching Strategy
160///
161/// To optimize performance, `YDoc` caches the expensive `get_full_state()` operation
162/// from the backend and constructs documents and state vectors on-demand from this
163/// cached data. This approach minimizes I/O operations while keeping memory usage low.
164///
165/// ## Differential Saving
166///
167/// When saving documents, `YDoc` calculates diffs relative to the current backend
168/// state rather than saving full document snapshots. This significantly reduces storage
169/// overhead for large documents with incremental changes.
170///
171/// ## Usage
172///
173/// The `YDoc` exposes the underlying Y-CRDT document directly, allowing users
174/// to work with the full yrs API. Changes are automatically captured and stored
175/// when the transaction is committed.
176///
177/// ```rust,no_run
178/// use eidetica::store::YDoc;
179/// use yrs::{Map, Text, Transact};
180/// # use eidetica::Result;
181/// # async fn example(store: &YDoc) -> Result<()> {
182/// // Work directly with the yrs document
183/// store.with_doc_mut(|doc| {
184///     let map = doc.get_or_insert_map("root");
185///     let text = doc.get_or_insert_text("document");
186///
187///     let mut txn = doc.transact_mut();
188///     map.insert(&mut txn, "key", "value");
189///     text.insert(&mut txn, 0, "Hello, World!");
190///
191///     Ok(())
192/// }).await?;
193/// # Ok(())
194/// # }
195/// ```
196pub struct YDoc {
197    /// The name identifier for this subtree within the transaction
198    name: String,
199    /// Reference to the Transaction for data access
200    txn: Transaction,
201    /// Cached backend data to avoid expensive get_full_state() calls
202    /// This contains the merged historical state as Y-CRDT binary data
203    cached_backend_data: Mutex<Option<YrsBinary>>,
204}
205
206impl Registered for YDoc {
207    fn type_id() -> &'static str {
208        "ydoc:v0"
209    }
210}
211
212#[async_trait]
213impl Store for YDoc {
214    type Data = YrsBinary;
215
216    async fn new(txn: &Transaction, subtree_name: String) -> Result<Self> {
217        Ok(Self {
218            name: subtree_name,
219            txn: txn.clone(),
220            cached_backend_data: Mutex::new(None),
221        })
222    }
223
224    fn name(&self) -> &str {
225        &self.name
226    }
227
228    fn transaction(&self) -> &Transaction {
229        &self.txn
230    }
231}
232
233impl YDoc {
234    /// Gets the current Y-CRDT document, merging all historical state.
235    ///
236    /// This method reconstructs the current state of the Y-CRDT document by:
237    /// 1. Loading the full historical state from the backend (cached)
238    /// 2. Applying any local changes from the current transaction
239    /// 3. Returning a Y-Doc that can be used for reading and further modifications
240    ///
241    /// ## Performance
242    ///
243    /// The expensive backend data retrieval is cached, so subsequent calls are fast.
244    /// Documents are constructed fresh each time to ensure isolation between operations.
245    ///
246    /// ## Returns
247    /// A `Result` containing the merged `Doc` (Y-CRDT document).
248    ///
249    /// ## Errors
250    /// Returns an error if there are issues deserializing the Y-CRDT updates.
251    pub async fn doc(&self) -> Result<Doc> {
252        let doc = self.get_initial_doc().await?;
253
254        // Apply local changes if they exist
255        if let Some(local_data) = self.local_data()?
256            && !local_data.is_empty()
257        {
258            let local_update = Update::decode_v1(local_data.as_bytes()).map_err(|e| {
259                StoreError::from(YDocError::InvalidData {
260                    reason: format!("Failed to decode local Y-CRDT update: {e}"),
261                })
262            })?;
263
264            let mut txn = doc.transact_mut();
265            txn.apply_update(local_update).map_err(|e| {
266                StoreError::from(YDocError::Operation {
267                    operation: "apply_local_update".to_string(),
268                    reason: format!("Failed to apply local Y-CRDT update: {e}"),
269                })
270            })?;
271        }
272
273        Ok(doc)
274    }
275
276    /// Executes a function with read-only access to the Y-Doc.
277    ///
278    /// This method provides access to the current state of the document
279    /// for read-only operations. No changes are persisted.
280    ///
281    /// ## Arguments
282    /// * `f` - A function that receives the Y-Doc for reading
283    ///
284    /// ## Returns
285    /// A `Result` containing the return value of the function.
286    ///
287    /// ## Example
288    /// ```rust,no_run
289    /// # use eidetica::Result;
290    /// # use yrs::{Transact, GetString};
291    /// # async fn example(store: &eidetica::store::YDoc) -> Result<()> {
292    /// let content = store.with_doc(|doc| {
293    ///     let text = doc.get_or_insert_text("document");
294    ///     let txn = doc.transact();
295    ///     Ok(text.get_string(&txn))
296    /// }).await?;
297    /// # Ok(())
298    /// # }
299    /// ```
300    pub async fn with_doc<F, R>(&self, f: F) -> Result<R>
301    where
302        F: FnOnce(&Doc) -> Result<R>,
303    {
304        let doc = self.doc().await?;
305        f(&doc)
306    }
307
308    /// Executes a function with access to the Y-Doc and automatically saves changes.
309    ///
310    /// This is the preferred way to make changes to the document as it
311    /// ensures all changes are captured using differential saving and staged
312    /// in the transaction for later commit.
313    ///
314    /// ## Differential Saving
315    ///
316    /// Changes are saved as diffs relative to the current backend state, which
317    /// significantly reduces storage overhead compared to saving full document
318    /// snapshots.
319    ///
320    /// ## Arguments
321    /// * `f` - A function that receives the Y-Doc and can make modifications
322    ///
323    /// ## Returns
324    /// A `Result` containing the return value of the function.
325    ///
326    /// ## Example
327    /// ```rust,no_run
328    /// # use eidetica::Result;
329    /// # use yrs::{Transact, Text};
330    /// # async fn example(store: &eidetica::store::YDoc) -> Result<()> {
331    /// store.with_doc_mut(|doc| {
332    ///     let text = doc.get_or_insert_text("document");
333    ///     let mut txn = doc.transact_mut();
334    ///     text.insert(&mut txn, 0, "Hello, World!");
335    ///     Ok(())
336    /// }).await?;
337    /// # Ok(())
338    /// # }
339    /// ```
340    pub async fn with_doc_mut<F, R>(&self, f: F) -> Result<R>
341    where
342        F: FnOnce(&Doc) -> Result<R>,
343    {
344        let doc = self.doc().await?;
345        let result = f(&doc)?;
346        self.save_doc(&doc).await?;
347        Ok(result)
348    }
349
350    /// Applies a Y-CRDT update to the document.
351    ///
352    /// This method is useful for receiving updates from other collaborators or
353    /// applying updates received through a network provider. The update is applied
354    /// to the current document state and saved using differential saving.
355    ///
356    /// ## Use Cases
357    /// - Applying updates from remote collaborators
358    /// - Synchronizing with external Y-CRDT instances
359    /// - Replaying historical updates
360    ///
361    /// ## Arguments
362    /// * `update_data` - The binary Y-CRDT update data
363    ///
364    /// ## Returns
365    /// A `Result<()>` indicating success or failure.
366    ///
367    /// ## Errors
368    /// Returns an error if the update data is malformed or cannot be applied.
369    pub async fn apply_update(&self, update_data: &[u8]) -> Result<()> {
370        let doc = self.doc().await?;
371        let update = Update::decode_v1(update_data).map_err(|e| {
372            StoreError::from(YDocError::InvalidData {
373                reason: format!("Failed to decode Y-CRDT update: {e}"),
374            })
375        })?;
376
377        {
378            let mut txn = doc.transact_mut();
379            txn.apply_update(update).map_err(|e| {
380                StoreError::from(YDocError::Operation {
381                    operation: "apply_update".to_string(),
382                    reason: format!("Failed to apply Y-CRDT update: {e}"),
383                })
384            })?;
385        }
386
387        self.save_doc(&doc).await
388    }
389
390    /// Gets the current state of the document as a binary update.
391    ///
392    /// This method encodes the complete current document state as a Y-CRDT binary
393    /// update that can be used to synchronize with other instances or persist
394    /// the current state.
395    ///
396    /// ## Use Cases
397    /// - Synchronizing document state with other instances
398    /// - Creating snapshots of the current state
399    /// - Sharing the complete document with new collaborators
400    ///
401    /// ## Returns
402    /// A `Result` containing the binary update data representing the full document state.
403    ///
404    /// ## Performance
405    /// This method constructs the full document state, so it may be expensive for
406    /// large documents. For incremental synchronization, consider using the
407    /// differential updates automatically saved by `with_doc_mut()`.
408    pub async fn get_update(&self) -> Result<Vec<u8>> {
409        let doc = self.doc().await?;
410        let txn = doc.transact();
411        let update = txn.encode_state_as_update_v1(&yrs::StateVector::default());
412        Ok(update)
413    }
414
415    /// Saves the complete document state to the transaction.
416    ///
417    /// This method captures the entire current state of the document and stages it
418    /// in the transaction. Unlike `save_doc()`, this saves the full document
419    /// state rather than just the incremental changes.
420    ///
421    /// ## When to Use
422    /// - When you need to ensure the complete state is captured
423    /// - For creating clean snapshots without incremental history
424    /// - When differential saving is not suitable for your use case
425    ///
426    /// ## Performance Impact
427    /// This method is less storage-efficient than `save_doc()` as it saves the
428    /// complete document state regardless of what changes were made.
429    ///
430    /// ## Arguments
431    /// * `doc` - The Y-CRDT document to save
432    ///
433    /// ## Returns
434    /// A `Result<()>` indicating success or failure.
435    pub async fn save_doc_full(&self, doc: &Doc) -> Result<()> {
436        let txn = doc.transact();
437        let update = txn.encode_state_as_update_v1(&yrs::StateVector::default());
438
439        let yrs_binary = YrsBinary::new(update);
440        let serialized = serde_json::to_string(&yrs_binary)?;
441        self.txn.update_subtree(&self.name, &serialized).await
442    }
443
444    /// Saves the document state using efficient differential encoding.
445    ///
446    /// This method captures only the changes since the current backend state and
447    /// stages them in the transaction. This is the preferred saving method
448    /// as it significantly reduces storage overhead for incremental changes.
449    ///
450    /// ## Differential Encoding
451    ///
452    /// The method works by:
453    /// 1. Getting the current backend state vector (cached for efficiency)
454    /// 2. Encoding only the changes since that state
455    /// 3. Saving only the incremental diff, not the full document
456    ///
457    /// ## Storage Efficiency
458    ///
459    /// For a document with small incremental changes, this can reduce storage
460    /// requirements by orders of magnitude compared to saving full snapshots.
461    ///
462    /// ## Arguments
463    /// * `doc` - The Y-CRDT document to save differentially
464    ///
465    /// ## Returns
466    /// A `Result<()>` indicating success or failure.
467    ///
468    /// ## Performance
469    /// This method is optimized for performance - the expensive backend state
470    /// retrieval is cached, and only minimal diff calculation is performed.
471    pub async fn save_doc(&self, doc: &Doc) -> Result<()> {
472        let txn = doc.transact();
473
474        // Get the backend state vector efficiently
475        let backend_state_vector = self.get_initial_state_vector().await?;
476
477        // Encode only the changes since the backend state
478        let diff_update = txn.encode_state_as_update_v1(&backend_state_vector);
479
480        // Only save if there are actual changes
481        if !diff_update.is_empty() {
482            let yrs_binary = YrsBinary::new(diff_update);
483            let serialized = serde_json::to_string(&yrs_binary)?;
484            self.txn.update_subtree(&self.name, &serialized).await?;
485        }
486
487        Ok(())
488    }
489
490    /// Gets the state vector of the backend data efficiently without constructing the full document.
491    ///
492    /// This method extracts just the state vector from the cached backend data,
493    /// which is used for efficient differential encoding. The state vector represents
494    /// the "version" information for each client that has contributed to the document.
495    ///
496    /// ## Implementation
497    ///
498    /// Rather than constructing the full document just to get the state vector,
499    /// this method creates a minimal temporary document, applies the backend data,
500    /// and extracts only the state vector information.
501    ///
502    /// ## Caching
503    ///
504    /// This method leverages the cached backend data, so the expensive `get_full_state()`
505    /// operation is only performed once per `YDoc` instance.
506    ///
507    /// ## Returns
508    /// A `Result` containing the state vector representing the backend document state.
509    ///
510    /// ## Errors
511    /// Returns an error if the cached backend data cannot be decoded or applied.
512    async fn get_initial_state_vector(&self) -> Result<yrs::StateVector> {
513        // Get the cached backend data
514        let backend_data = self.get_cached_backend_data().await?;
515
516        if backend_data.is_empty() {
517            return Ok(yrs::StateVector::default());
518        }
519
520        // Construct a temporary document to extract the state vector
521        let temp_doc = Doc::new();
522        let backend_update = Update::decode_v1(backend_data.as_bytes()).map_err(|e| {
523            StoreError::from(YDocError::InvalidData {
524                reason: format!("Failed to decode backend Y-CRDT update: {e}"),
525            })
526        })?;
527        let mut temp_txn = temp_doc.transact_mut();
528        temp_txn.apply_update(backend_update).map_err(|e| {
529            StoreError::from(YDocError::Operation {
530                operation: "get_initial_state_vector".to_string(),
531                reason: format!("Failed to apply backend Y-CRDT update: {e}"),
532            })
533        })?;
534        drop(temp_txn);
535        let temp_txn = temp_doc.transact();
536        Ok(temp_txn.state_vector())
537    }
538
539    /// Constructs a Y-CRDT document from the cached backend data.
540    ///
541    /// This method creates a fresh document instance and applies the historical
542    /// backend state to it. Each call returns a new document instance to ensure
543    /// proper isolation between different operations and viewers.
544    ///
545    /// ## Caching Strategy
546    ///
547    /// The expensive `get_full_state()` operation is cached, but documents are
548    /// constructed fresh each time. This balances performance (avoiding expensive
549    /// I/O) with safety (ensuring document isolation).
550    ///
551    /// ## Returns
552    /// A `Result` containing a new `Doc` instance with the backend state applied.
553    ///
554    /// ## Errors
555    /// Returns an error if the cached backend data cannot be decoded or applied.
556    async fn get_initial_doc(&self) -> Result<Doc> {
557        // Get the cached backend data
558        let backend_data = self.get_cached_backend_data().await?;
559
560        // Create a new doc and apply backend data if it exists
561        let doc = Doc::new();
562        if !backend_data.is_empty() {
563            let update = Update::decode_v1(backend_data.as_bytes()).map_err(|e| {
564                StoreError::from(YDocError::InvalidData {
565                    reason: format!("Failed to decode Y-CRDT update: {e}"),
566                })
567            })?;
568
569            let mut txn = doc.transact_mut();
570            txn.apply_update(update).map_err(|e| {
571                StoreError::from(YDocError::Operation {
572                    operation: "get_initial_doc".to_string(),
573                    reason: format!("Failed to apply Y-CRDT update from backend: {e}"),
574                })
575            })?;
576        }
577
578        Ok(doc)
579    }
580
581    /// Retrieves backend data with caching to avoid expensive repeated `get_full_state()` calls.
582    ///
583    /// This is the core caching mechanism for `YDoc`. The first call performs the
584    /// expensive `txn.get_full_state()` operation and caches the result. All
585    /// subsequent calls return the cached data immediately.
586    ///
587    /// ## Performance Impact
588    ///
589    /// The `get_full_state()` operation can be expensive as it involves reading and
590    /// merging potentially large amounts of historical data from the backend storage.
591    /// By caching this data, we avoid repeating this expensive operation multiple times
592    /// within the same transaction scope.
593    ///
594    /// ## Cache Lifetime
595    ///
596    /// The cache is tied to the lifetime of the `YDoc` instance, which typically
597    /// corresponds to a single transaction. This ensures that:
598    /// - Data is cached for the duration of the transaction
599    /// - Fresh data is loaded for each new transaction
600    /// - Memory usage is bounded to the transaction scope
601    ///
602    /// ## Returns
603    /// A `Result` containing the cached `YrsBinary` backend data.
604    ///
605    /// ## Errors
606    /// Returns an error if the backend data cannot be retrieved or deserialized.
607    async fn get_cached_backend_data(&self) -> Result<YrsBinary> {
608        // Check if we already have the backend data cached
609        if let Some(backend_data) = self.cached_backend_data.lock().unwrap().as_ref() {
610            return Ok(backend_data.clone());
611        }
612
613        // Perform the expensive operation once
614        let backend_data = self.txn.get_full_state::<YrsBinary>(&self.name).await?;
615
616        // Cache it for future use
617        *self.cached_backend_data.lock().unwrap() = Some(backend_data.clone());
618
619        Ok(backend_data)
620    }
621}