eidetica/store/ydoc.rs
1//! Y-CRDT integration for Eidetica
2//!
3//! This module provides seamless integration between Eidetica's transaction system
4//! and Y-CRDT (Yjs) for real-time collaborative editing. The main component is `YDoc`,
5//! which implements differential saving to minimize storage overhead while maintaining
6//! full compatibility with Y-CRDT's conflict resolution algorithms.
7//!
8//! # Key Features
9//!
10//! - **Differential Saving**: Only stores incremental changes, not full document state
11//! - **Efficient Caching**: Caches expensive backend data retrieval operations
12//! - **Seamless Integration**: Works with Eidetica's transaction and viewer model
13//! - **Full Y-CRDT API**: Exposes the complete yrs library functionality
14//!
15//! # Performance Considerations
16//!
17//! The implementation caches the expensive `get_full_state()` backend operation and
18//! constructs documents and state vectors on-demand from this cached data. This
19//! approach minimizes both I/O overhead and memory usage.
20//!
21//! This module is only available when the "y-crdt" feature is enabled.
22
23use std::sync::Mutex;
24
25use async_trait::async_trait;
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28use yrs::{Doc, ReadTxn, Transact, Update, updates::decoder::Decode};
29
30use crate::{
31 Result, Store, Transaction,
32 crdt::{CRDT, Data},
33 store::{Registered, errors::StoreError},
34};
35
/// Errors specific to Y-CRDT operations.
///
/// Each variant carries human-readable context that is interpolated into the
/// `Display` message declared by its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum YDocError {
    /// Y-CRDT operation failed.
    ///
    /// `operation` is a short label for the step that failed (for example
    /// `"apply_update"`); `reason` describes the underlying failure.
    #[error("Y-CRDT operation failed: {operation} - {reason}")]
    Operation { operation: String, reason: String },

    /// Y-CRDT binary data is invalid.
    ///
    /// `reason` describes why the bytes could not be decoded.
    #[error("Invalid Y-CRDT binary data: {reason}")]
    InvalidData { reason: String },

    /// Y-CRDT merge operation failed.
    ///
    /// `reason` describes why a decoded update could not be applied during a merge.
    #[error("Y-CRDT merge failed: {reason}")]
    Merge { reason: String },
}
51
52impl From<YDocError> for StoreError {
53 fn from(err: YDocError) -> Self {
54 StoreError::ImplementationError {
55 store: "YDoc".to_string(),
56 reason: err.to_string(),
57 }
58 }
59}
60
/// A CRDT wrapper for Y-CRDT binary update data.
///
/// This wrapper implements the required `Data` and `CRDT` traits to allow
/// Y-CRDT binary updates to be stored and merged within the Eidetica system.
///
/// ## Design
///
/// Y-CRDT represents document state as binary updates that can be efficiently
/// merged and applied. This wrapper enables these binary updates to participate
/// in Eidetica's CRDT-based data storage and synchronization system.
///
/// ## Merging Strategy
///
/// When two `YrsBinary` instances are merged, both updates are applied to a new
/// Y-CRDT document, and the resulting merged state is returned as a new binary
/// update. This ensures that Y-CRDT's sophisticated conflict resolution algorithms
/// are preserved within Eidetica's merge operations.
///
/// ## Empty Values
///
/// The `Default` value holds no bytes. Code in this module treats an empty
/// `YrsBinary` as "no state" and skips decoding it.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct YrsBinary {
    /// Encoded Y-CRDT update bytes. This module always decodes them with
    /// `Update::decode_v1`, so they are expected to be in the v1 update encoding.
    data: Vec<u8>,
}
82
// Empty impl: no methods are overridden here; this only opts `YrsBinary` into the `Data` trait.
impl Data for YrsBinary {}
84
85impl CRDT for YrsBinary {
86 /// Merges two Y-CRDT binary updates by applying both to a new document
87 /// and returning the resulting state as a binary update.
88 fn merge(&self, other: &Self) -> Result<Self> {
89 let doc = Doc::new();
90
91 // Apply self's update if not empty
92 if !self.data.is_empty() {
93 let update = Update::decode_v1(&self.data).map_err(|e| {
94 StoreError::from(YDocError::InvalidData {
95 reason: format!("Failed to decode Y-CRDT update (self): {e}"),
96 })
97 })?;
98 let mut txn = doc.transact_mut();
99 txn.apply_update(update).map_err(|e| {
100 StoreError::from(YDocError::Merge {
101 reason: format!("Failed to apply Y-CRDT update (self): {e}"),
102 })
103 })?;
104 }
105
106 // Apply other's update if not empty
107 if !other.data.is_empty() {
108 let other_update = Update::decode_v1(&other.data).map_err(|e| {
109 StoreError::from(YDocError::InvalidData {
110 reason: format!("Failed to decode Y-CRDT update (other): {e}"),
111 })
112 })?;
113 let mut txn = doc.transact_mut();
114 txn.apply_update(other_update).map_err(|e| {
115 StoreError::from(YDocError::Merge {
116 reason: format!("Failed to apply Y-CRDT update (other): {e}"),
117 })
118 })?;
119 }
120
121 // Return the merged state as a binary update
122 let txn = doc.transact();
123 let merged_update = txn.encode_state_as_update_v1(&yrs::StateVector::default());
124
125 Ok(YrsBinary {
126 data: merged_update,
127 })
128 }
129}
130
131impl YrsBinary {
132 pub fn new(data: Vec<u8>) -> Self {
133 Self { data }
134 }
135
136 pub fn as_bytes(&self) -> &[u8] {
137 &self.data
138 }
139
140 pub fn is_empty(&self) -> bool {
141 self.data.is_empty()
142 }
143}
144
/// A Y-CRDT based Store implementation with efficient differential saving.
///
/// `YDoc` provides a CRDT-based storage abstraction using the yrs library,
/// which is a Rust port of Yjs. This allows for real-time collaborative editing
/// and automatic conflict resolution through the Y-CRDT algorithms.
///
/// ## Architecture
///
/// The `YDoc` integrates with Eidetica's transaction system to provide:
/// - **Differential Updates**: Only saves incremental changes, not full document state
/// - **Efficient Caching**: Caches expensive backend data retrieval operations
/// - **Transaction/Viewer Model**: Compatible with Eidetica's transaction patterns
/// - **Full Y-CRDT API**: Direct access to the complete yrs library functionality
///
/// ## Caching Strategy
///
/// To optimize performance, `YDoc` caches the expensive `get_full_state()` operation
/// from the backend and constructs documents and state vectors on-demand from this
/// cached data. This approach minimizes I/O operations while keeping memory usage low.
///
/// ## Differential Saving
///
/// When saving documents, `YDoc` calculates diffs relative to the current backend
/// state rather than saving full document snapshots. This significantly reduces storage
/// overhead for large documents with incremental changes.
///
/// ## Usage
///
/// The `YDoc` exposes the underlying Y-CRDT document directly, allowing users
/// to work with the full yrs API. Changes made through `with_doc_mut` are staged
/// in the transaction and stored when the transaction is committed.
///
/// ```rust,no_run
/// use eidetica::store::YDoc;
/// use yrs::{Map, Text, Transact};
/// # use eidetica::Result;
/// # async fn example(store: &YDoc) -> Result<()> {
/// // Work directly with the yrs document
/// store.with_doc_mut(|doc| {
///     let map = doc.get_or_insert_map("root");
///     let text = doc.get_or_insert_text("document");
///
///     let mut txn = doc.transact_mut();
///     map.insert(&mut txn, "key", "value");
///     text.insert(&mut txn, 0, "Hello, World!");
///
///     Ok(())
/// }).await?;
/// # Ok(())
/// # }
/// ```
pub struct YDoc {
    /// The name identifier for this subtree within the transaction
    name: String,
    /// The Transaction used for data access (stored as a clone of the one
    /// passed to `new`)
    txn: Transaction,
    /// Cached backend data to avoid expensive get_full_state() calls.
    /// This contains the merged historical state as Y-CRDT binary data.
    ///
    /// `None` until the first backend read. The `Mutex` provides interior
    /// mutability so the cache can be filled from methods that take `&self`.
    cached_backend_data: Mutex<Option<YrsBinary>>,
}
205
206impl Registered for YDoc {
207 fn type_id() -> &'static str {
208 "ydoc:v0"
209 }
210}
211
212#[async_trait]
213impl Store for YDoc {
214 type Data = YrsBinary;
215
216 async fn new(txn: &Transaction, subtree_name: String) -> Result<Self> {
217 Ok(Self {
218 name: subtree_name,
219 txn: txn.clone(),
220 cached_backend_data: Mutex::new(None),
221 })
222 }
223
224 fn name(&self) -> &str {
225 &self.name
226 }
227
228 fn transaction(&self) -> &Transaction {
229 &self.txn
230 }
231}
232
233impl YDoc {
234 /// Gets the current Y-CRDT document, merging all historical state.
235 ///
236 /// This method reconstructs the current state of the Y-CRDT document by:
237 /// 1. Loading the full historical state from the backend (cached)
238 /// 2. Applying any local changes from the current transaction
239 /// 3. Returning a Y-Doc that can be used for reading and further modifications
240 ///
241 /// ## Performance
242 ///
243 /// The expensive backend data retrieval is cached, so subsequent calls are fast.
244 /// Documents are constructed fresh each time to ensure isolation between operations.
245 ///
246 /// ## Returns
247 /// A `Result` containing the merged `Doc` (Y-CRDT document).
248 ///
249 /// ## Errors
250 /// Returns an error if there are issues deserializing the Y-CRDT updates.
251 pub async fn doc(&self) -> Result<Doc> {
252 let doc = self.get_initial_doc().await?;
253
254 // Apply local changes if they exist
255 if let Some(local_data) = self.local_data()?
256 && !local_data.is_empty()
257 {
258 let local_update = Update::decode_v1(local_data.as_bytes()).map_err(|e| {
259 StoreError::from(YDocError::InvalidData {
260 reason: format!("Failed to decode local Y-CRDT update: {e}"),
261 })
262 })?;
263
264 let mut txn = doc.transact_mut();
265 txn.apply_update(local_update).map_err(|e| {
266 StoreError::from(YDocError::Operation {
267 operation: "apply_local_update".to_string(),
268 reason: format!("Failed to apply local Y-CRDT update: {e}"),
269 })
270 })?;
271 }
272
273 Ok(doc)
274 }
275
276 /// Executes a function with read-only access to the Y-Doc.
277 ///
278 /// This method provides access to the current state of the document
279 /// for read-only operations. No changes are persisted.
280 ///
281 /// ## Arguments
282 /// * `f` - A function that receives the Y-Doc for reading
283 ///
284 /// ## Returns
285 /// A `Result` containing the return value of the function.
286 ///
287 /// ## Example
288 /// ```rust,no_run
289 /// # use eidetica::Result;
290 /// # use yrs::{Transact, GetString};
291 /// # async fn example(store: &eidetica::store::YDoc) -> Result<()> {
292 /// let content = store.with_doc(|doc| {
293 /// let text = doc.get_or_insert_text("document");
294 /// let txn = doc.transact();
295 /// Ok(text.get_string(&txn))
296 /// }).await?;
297 /// # Ok(())
298 /// # }
299 /// ```
300 pub async fn with_doc<F, R>(&self, f: F) -> Result<R>
301 where
302 F: FnOnce(&Doc) -> Result<R>,
303 {
304 let doc = self.doc().await?;
305 f(&doc)
306 }
307
308 /// Executes a function with access to the Y-Doc and automatically saves changes.
309 ///
310 /// This is the preferred way to make changes to the document as it
311 /// ensures all changes are captured using differential saving and staged
312 /// in the transaction for later commit.
313 ///
314 /// ## Differential Saving
315 ///
316 /// Changes are saved as diffs relative to the current backend state, which
317 /// significantly reduces storage overhead compared to saving full document
318 /// snapshots.
319 ///
320 /// ## Arguments
321 /// * `f` - A function that receives the Y-Doc and can make modifications
322 ///
323 /// ## Returns
324 /// A `Result` containing the return value of the function.
325 ///
326 /// ## Example
327 /// ```rust,no_run
328 /// # use eidetica::Result;
329 /// # use yrs::{Transact, Text};
330 /// # async fn example(store: &eidetica::store::YDoc) -> Result<()> {
331 /// store.with_doc_mut(|doc| {
332 /// let text = doc.get_or_insert_text("document");
333 /// let mut txn = doc.transact_mut();
334 /// text.insert(&mut txn, 0, "Hello, World!");
335 /// Ok(())
336 /// }).await?;
337 /// # Ok(())
338 /// # }
339 /// ```
340 pub async fn with_doc_mut<F, R>(&self, f: F) -> Result<R>
341 where
342 F: FnOnce(&Doc) -> Result<R>,
343 {
344 let doc = self.doc().await?;
345 let result = f(&doc)?;
346 self.save_doc(&doc).await?;
347 Ok(result)
348 }
349
350 /// Applies a Y-CRDT update to the document.
351 ///
352 /// This method is useful for receiving updates from other collaborators or
353 /// applying updates received through a network provider. The update is applied
354 /// to the current document state and saved using differential saving.
355 ///
356 /// ## Use Cases
357 /// - Applying updates from remote collaborators
358 /// - Synchronizing with external Y-CRDT instances
359 /// - Replaying historical updates
360 ///
361 /// ## Arguments
362 /// * `update_data` - The binary Y-CRDT update data
363 ///
364 /// ## Returns
365 /// A `Result<()>` indicating success or failure.
366 ///
367 /// ## Errors
368 /// Returns an error if the update data is malformed or cannot be applied.
369 pub async fn apply_update(&self, update_data: &[u8]) -> Result<()> {
370 let doc = self.doc().await?;
371 let update = Update::decode_v1(update_data).map_err(|e| {
372 StoreError::from(YDocError::InvalidData {
373 reason: format!("Failed to decode Y-CRDT update: {e}"),
374 })
375 })?;
376
377 {
378 let mut txn = doc.transact_mut();
379 txn.apply_update(update).map_err(|e| {
380 StoreError::from(YDocError::Operation {
381 operation: "apply_update".to_string(),
382 reason: format!("Failed to apply Y-CRDT update: {e}"),
383 })
384 })?;
385 }
386
387 self.save_doc(&doc).await
388 }
389
390 /// Gets the current state of the document as a binary update.
391 ///
392 /// This method encodes the complete current document state as a Y-CRDT binary
393 /// update that can be used to synchronize with other instances or persist
394 /// the current state.
395 ///
396 /// ## Use Cases
397 /// - Synchronizing document state with other instances
398 /// - Creating snapshots of the current state
399 /// - Sharing the complete document with new collaborators
400 ///
401 /// ## Returns
402 /// A `Result` containing the binary update data representing the full document state.
403 ///
404 /// ## Performance
405 /// This method constructs the full document state, so it may be expensive for
406 /// large documents. For incremental synchronization, consider using the
407 /// differential updates automatically saved by `with_doc_mut()`.
408 pub async fn get_update(&self) -> Result<Vec<u8>> {
409 let doc = self.doc().await?;
410 let txn = doc.transact();
411 let update = txn.encode_state_as_update_v1(&yrs::StateVector::default());
412 Ok(update)
413 }
414
415 /// Saves the complete document state to the transaction.
416 ///
417 /// This method captures the entire current state of the document and stages it
418 /// in the transaction. Unlike `save_doc()`, this saves the full document
419 /// state rather than just the incremental changes.
420 ///
421 /// ## When to Use
422 /// - When you need to ensure the complete state is captured
423 /// - For creating clean snapshots without incremental history
424 /// - When differential saving is not suitable for your use case
425 ///
426 /// ## Performance Impact
427 /// This method is less storage-efficient than `save_doc()` as it saves the
428 /// complete document state regardless of what changes were made.
429 ///
430 /// ## Arguments
431 /// * `doc` - The Y-CRDT document to save
432 ///
433 /// ## Returns
434 /// A `Result<()>` indicating success or failure.
435 pub async fn save_doc_full(&self, doc: &Doc) -> Result<()> {
436 let txn = doc.transact();
437 let update = txn.encode_state_as_update_v1(&yrs::StateVector::default());
438
439 let yrs_binary = YrsBinary::new(update);
440 let serialized = serde_json::to_string(&yrs_binary)?;
441 self.txn.update_subtree(&self.name, &serialized).await
442 }
443
444 /// Saves the document state using efficient differential encoding.
445 ///
446 /// This method captures only the changes since the current backend state and
447 /// stages them in the transaction. This is the preferred saving method
448 /// as it significantly reduces storage overhead for incremental changes.
449 ///
450 /// ## Differential Encoding
451 ///
452 /// The method works by:
453 /// 1. Getting the current backend state vector (cached for efficiency)
454 /// 2. Encoding only the changes since that state
455 /// 3. Saving only the incremental diff, not the full document
456 ///
457 /// ## Storage Efficiency
458 ///
459 /// For a document with small incremental changes, this can reduce storage
460 /// requirements by orders of magnitude compared to saving full snapshots.
461 ///
462 /// ## Arguments
463 /// * `doc` - The Y-CRDT document to save differentially
464 ///
465 /// ## Returns
466 /// A `Result<()>` indicating success or failure.
467 ///
468 /// ## Performance
469 /// This method is optimized for performance - the expensive backend state
470 /// retrieval is cached, and only minimal diff calculation is performed.
471 pub async fn save_doc(&self, doc: &Doc) -> Result<()> {
472 let txn = doc.transact();
473
474 // Get the backend state vector efficiently
475 let backend_state_vector = self.get_initial_state_vector().await?;
476
477 // Encode only the changes since the backend state
478 let diff_update = txn.encode_state_as_update_v1(&backend_state_vector);
479
480 // Only save if there are actual changes
481 if !diff_update.is_empty() {
482 let yrs_binary = YrsBinary::new(diff_update);
483 let serialized = serde_json::to_string(&yrs_binary)?;
484 self.txn.update_subtree(&self.name, &serialized).await?;
485 }
486
487 Ok(())
488 }
489
490 /// Gets the state vector of the backend data efficiently without constructing the full document.
491 ///
492 /// This method extracts just the state vector from the cached backend data,
493 /// which is used for efficient differential encoding. The state vector represents
494 /// the "version" information for each client that has contributed to the document.
495 ///
496 /// ## Implementation
497 ///
498 /// Rather than constructing the full document just to get the state vector,
499 /// this method creates a minimal temporary document, applies the backend data,
500 /// and extracts only the state vector information.
501 ///
502 /// ## Caching
503 ///
504 /// This method leverages the cached backend data, so the expensive `get_full_state()`
505 /// operation is only performed once per `YDoc` instance.
506 ///
507 /// ## Returns
508 /// A `Result` containing the state vector representing the backend document state.
509 ///
510 /// ## Errors
511 /// Returns an error if the cached backend data cannot be decoded or applied.
512 async fn get_initial_state_vector(&self) -> Result<yrs::StateVector> {
513 // Get the cached backend data
514 let backend_data = self.get_cached_backend_data().await?;
515
516 if backend_data.is_empty() {
517 return Ok(yrs::StateVector::default());
518 }
519
520 // Construct a temporary document to extract the state vector
521 let temp_doc = Doc::new();
522 let backend_update = Update::decode_v1(backend_data.as_bytes()).map_err(|e| {
523 StoreError::from(YDocError::InvalidData {
524 reason: format!("Failed to decode backend Y-CRDT update: {e}"),
525 })
526 })?;
527 let mut temp_txn = temp_doc.transact_mut();
528 temp_txn.apply_update(backend_update).map_err(|e| {
529 StoreError::from(YDocError::Operation {
530 operation: "get_initial_state_vector".to_string(),
531 reason: format!("Failed to apply backend Y-CRDT update: {e}"),
532 })
533 })?;
534 drop(temp_txn);
535 let temp_txn = temp_doc.transact();
536 Ok(temp_txn.state_vector())
537 }
538
539 /// Constructs a Y-CRDT document from the cached backend data.
540 ///
541 /// This method creates a fresh document instance and applies the historical
542 /// backend state to it. Each call returns a new document instance to ensure
543 /// proper isolation between different operations and viewers.
544 ///
545 /// ## Caching Strategy
546 ///
547 /// The expensive `get_full_state()` operation is cached, but documents are
548 /// constructed fresh each time. This balances performance (avoiding expensive
549 /// I/O) with safety (ensuring document isolation).
550 ///
551 /// ## Returns
552 /// A `Result` containing a new `Doc` instance with the backend state applied.
553 ///
554 /// ## Errors
555 /// Returns an error if the cached backend data cannot be decoded or applied.
556 async fn get_initial_doc(&self) -> Result<Doc> {
557 // Get the cached backend data
558 let backend_data = self.get_cached_backend_data().await?;
559
560 // Create a new doc and apply backend data if it exists
561 let doc = Doc::new();
562 if !backend_data.is_empty() {
563 let update = Update::decode_v1(backend_data.as_bytes()).map_err(|e| {
564 StoreError::from(YDocError::InvalidData {
565 reason: format!("Failed to decode Y-CRDT update: {e}"),
566 })
567 })?;
568
569 let mut txn = doc.transact_mut();
570 txn.apply_update(update).map_err(|e| {
571 StoreError::from(YDocError::Operation {
572 operation: "get_initial_doc".to_string(),
573 reason: format!("Failed to apply Y-CRDT update from backend: {e}"),
574 })
575 })?;
576 }
577
578 Ok(doc)
579 }
580
581 /// Retrieves backend data with caching to avoid expensive repeated `get_full_state()` calls.
582 ///
583 /// This is the core caching mechanism for `YDoc`. The first call performs the
584 /// expensive `txn.get_full_state()` operation and caches the result. All
585 /// subsequent calls return the cached data immediately.
586 ///
587 /// ## Performance Impact
588 ///
589 /// The `get_full_state()` operation can be expensive as it involves reading and
590 /// merging potentially large amounts of historical data from the backend storage.
591 /// By caching this data, we avoid repeating this expensive operation multiple times
592 /// within the same transaction scope.
593 ///
594 /// ## Cache Lifetime
595 ///
596 /// The cache is tied to the lifetime of the `YDoc` instance, which typically
597 /// corresponds to a single transaction. This ensures that:
598 /// - Data is cached for the duration of the transaction
599 /// - Fresh data is loaded for each new transaction
600 /// - Memory usage is bounded to the transaction scope
601 ///
602 /// ## Returns
603 /// A `Result` containing the cached `YrsBinary` backend data.
604 ///
605 /// ## Errors
606 /// Returns an error if the backend data cannot be retrieved or deserialized.
607 async fn get_cached_backend_data(&self) -> Result<YrsBinary> {
608 // Check if we already have the backend data cached
609 if let Some(backend_data) = self.cached_backend_data.lock().unwrap().as_ref() {
610 return Ok(backend_data.clone());
611 }
612
613 // Perform the expensive operation once
614 let backend_data = self.txn.get_full_state::<YrsBinary>(&self.name).await?;
615
616 // Cache it for future use
617 *self.cached_backend_data.lock().unwrap() = Some(backend_data.clone());
618
619 Ok(backend_data)
620 }
621}