Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/sync/
state.rs

1//! Sync state tracking for managing synchronization progress and metadata.
2//!
3//! This module provides structures and functionality for tracking sync state
4//! between peers, including sync cursors, metadata, and history.
5
6use serde::{Deserialize, Serialize};
7
8use crate::{
9    Error, Result, Transaction,
10    auth::crypto::PublicKey,
11    clock::Clock,
12    crdt::doc::{Value, path},
13    entry::ID,
14    store::{DocStore, StoreError},
15};
16
/// Tracks the synchronization position for a specific peer-tree relationship.
///
/// A sync cursor represents how far synchronization has progressed between
/// this database and a specific peer for a specific tree.
///
/// Instances are serialized to JSON and persisted under the "sync_state"
/// store by [`SyncStateManager`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncCursor {
    /// The peer's public key
    pub peer_pubkey: PublicKey,
    /// The tree ID this cursor applies to
    pub tree_id: ID,
    /// The last entry ID that was successfully synchronized
    /// (`None` until the first successful sync is recorded).
    pub last_synced_entry: Option<ID>,
    /// Timestamp of the last successful sync, as an RFC 3339 string.
    pub last_sync_time: String,
    /// Number of entries synchronized in the last session
    pub last_sync_count: u64,
    /// Total number of entries synchronized with this peer for this tree
    pub total_synced_count: u64,
}
36
37impl SyncCursor {
38    /// Create a new sync cursor for a peer-tree relationship.
39    ///
40    /// # Arguments
41    /// * `peer_pubkey` - The peer's public key
42    /// * `tree_id` - The tree ID this cursor applies to
43    /// * `clock` - The time provider for timestamps
44    pub fn new(peer_pubkey: PublicKey, tree_id: ID, clock: &dyn Clock) -> Self {
45        Self {
46            peer_pubkey,
47            tree_id,
48            last_synced_entry: None,
49            last_sync_time: clock.now_rfc3339(),
50            last_sync_count: 0,
51            total_synced_count: 0,
52        }
53    }
54
55    /// Update the cursor with a successful sync operation.
56    ///
57    /// # Arguments
58    /// * `last_entry` - The ID of the last synced entry
59    /// * `count` - Number of entries synced in this operation
60    /// * `clock` - The time provider for timestamps
61    pub fn update_sync(&mut self, last_entry: ID, count: u64, clock: &dyn Clock) {
62        self.last_synced_entry = Some(last_entry);
63        self.last_sync_time = clock.now_rfc3339();
64        self.last_sync_count = count;
65        self.total_synced_count += count;
66    }
67
68    /// Check if this cursor has any sync history.
69    pub fn has_sync_history(&self) -> bool {
70        self.last_synced_entry.is_some()
71    }
72}
73
/// Metadata about synchronization operations for a peer.
///
/// This tracks overall sync statistics and health information for a peer
/// relationship across all trees.
///
/// Instances are serialized to JSON and persisted under the "sync_state"
/// store by [`SyncStateManager`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncMetadata {
    /// The peer's public key
    pub peer_pubkey: PublicKey,
    /// Timestamp (RFC 3339) when the sync relationship was first established.
    pub sync_established: String,
    /// Timestamp (RFC 3339) of the last sync attempt (successful or failed).
    pub last_sync_attempt: String,
    /// Timestamp (RFC 3339) of the last successful sync, if any.
    pub last_successful_sync: Option<String>,
    /// Total number of successful sync operations
    pub successful_sync_count: u64,
    /// Total number of failed sync operations
    pub failed_sync_count: u64,
    /// Total number of entries synchronized
    pub total_entries_synced: u64,
    /// Estimated total bytes synchronized
    pub total_bytes_synced: u64,
    /// Average sync duration in milliseconds (running average over
    /// successful operations only).
    pub average_sync_duration_ms: f64,
    /// List of trees being synchronized with this peer
    pub synced_trees: Vec<ID>,
}
101
102impl SyncMetadata {
103    /// Create new sync metadata for a peer.
104    ///
105    /// # Arguments
106    /// * `peer_pubkey` - The peer's public key
107    /// * `clock` - The time provider for timestamps
108    pub fn new(peer_pubkey: PublicKey, clock: &dyn Clock) -> Self {
109        let now = clock.now_rfc3339();
110        Self {
111            peer_pubkey,
112            sync_established: now.clone(),
113            last_sync_attempt: now,
114            last_successful_sync: None,
115            successful_sync_count: 0,
116            failed_sync_count: 0,
117            total_entries_synced: 0,
118            total_bytes_synced: 0,
119            average_sync_duration_ms: 0.0,
120            synced_trees: Vec::new(),
121        }
122    }
123
124    /// Record a successful sync operation.
125    ///
126    /// # Arguments
127    /// * `entries_count` - Number of entries synced
128    /// * `bytes` - Estimated bytes transferred
129    /// * `duration_ms` - Duration of sync in milliseconds
130    /// * `clock` - The time provider for timestamps
131    pub fn record_successful_sync(
132        &mut self,
133        entries_count: u64,
134        bytes: u64,
135        duration_ms: f64,
136        clock: &dyn Clock,
137    ) {
138        let now = clock.now_rfc3339();
139        self.last_sync_attempt = now.clone();
140        self.last_successful_sync = Some(now);
141        self.successful_sync_count += 1;
142        self.total_entries_synced += entries_count;
143        self.total_bytes_synced += bytes;
144
145        // Update average duration (simple running average)
146        let total_ops = self.successful_sync_count as f64;
147        self.average_sync_duration_ms =
148            (self.average_sync_duration_ms * (total_ops - 1.0) + duration_ms) / total_ops;
149    }
150
151    /// Record a failed sync operation.
152    ///
153    /// # Arguments
154    /// * `clock` - The time provider for timestamps
155    pub fn record_failed_sync(&mut self, clock: &dyn Clock) {
156        self.last_sync_attempt = clock.now_rfc3339();
157        self.failed_sync_count += 1;
158    }
159
160    /// Add a tree to the list of synced trees if not already present.
161    pub fn add_synced_tree(&mut self, tree_id: ID) {
162        if !self.synced_trees.contains(&tree_id) {
163            self.synced_trees.push(tree_id);
164        }
165    }
166
167    /// Remove a tree from the list of synced trees.
168    pub fn remove_synced_tree(&mut self, tree_id: &ID) {
169        self.synced_trees.retain(|id| id != tree_id);
170    }
171
172    /// Calculate the success rate of sync operations.
173    pub fn sync_success_rate(&self) -> f64 {
174        let total = self.successful_sync_count + self.failed_sync_count;
175        if total == 0 {
176            0.0
177        } else {
178            self.successful_sync_count as f64 / total as f64
179        }
180    }
181}
182
/// Record of a single sync operation for audit and debugging purposes.
///
/// Entries are created at the start of a sync, then finalized via
/// `complete_success` or `complete_failure`, and persisted as JSON by
/// [`SyncStateManager::add_sync_history`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncHistoryEntry {
    /// Unique ID for this sync operation (a freshly generated UUID v4).
    pub sync_id: String,
    /// The peer involved in this sync
    pub peer_pubkey: PublicKey,
    /// The tree that was synchronized
    pub tree_id: ID,
    /// Timestamp (RFC 3339) when sync started.
    pub started_at: String,
    /// Timestamp (RFC 3339) when sync completed (or failed).
    pub completed_at: String,
    /// Whether the sync was successful
    pub success: bool,
    /// Number of entries synchronized
    pub entries_count: u64,
    /// Estimated bytes transferred
    pub bytes_transferred: u64,
    /// Duration in milliseconds, derived from the two timestamps above.
    pub duration_ms: f64,
    /// Error message if sync failed
    pub error_message: Option<String>,
}
207
208impl SyncHistoryEntry {
209    /// Create a new sync history entry.
210    ///
211    /// # Arguments
212    /// * `peer_pubkey` - The peer's public key
213    /// * `tree_id` - The tree that was synchronized
214    /// * `clock` - The time provider for timestamps
215    pub fn new(peer_pubkey: PublicKey, tree_id: ID, clock: &dyn Clock) -> Self {
216        let now = clock.now_rfc3339();
217        Self {
218            sync_id: uuid::Uuid::new_v4().to_string(),
219            peer_pubkey,
220            tree_id,
221            started_at: now.clone(),
222            completed_at: now,
223            success: false,
224            entries_count: 0,
225            bytes_transferred: 0,
226            duration_ms: 0.0,
227            error_message: None,
228        }
229    }
230
231    /// Mark the sync as completed successfully.
232    ///
233    /// # Arguments
234    /// * `entries_count` - Number of entries synced
235    /// * `bytes` - Estimated bytes transferred
236    /// * `clock` - The time provider for timestamps
237    pub fn complete_success(&mut self, entries_count: u64, bytes: u64, clock: &dyn Clock) {
238        self.completed_at = clock.now_rfc3339();
239        self.success = true;
240        self.entries_count = entries_count;
241        self.bytes_transferred = bytes;
242        self.calculate_duration();
243    }
244
245    /// Mark the sync as failed.
246    ///
247    /// # Arguments
248    /// * `error` - Error message describing the failure
249    /// * `clock` - The time provider for timestamps
250    pub fn complete_failure(&mut self, error: String, clock: &dyn Clock) {
251        self.completed_at = clock.now_rfc3339();
252        self.success = false;
253        self.error_message = Some(error);
254        self.calculate_duration();
255    }
256
257    /// Calculate the duration based on start and end times.
258    fn calculate_duration(&mut self) {
259        if let (Ok(start), Ok(end)) = (
260            chrono::DateTime::parse_from_rfc3339(&self.started_at),
261            chrono::DateTime::parse_from_rfc3339(&self.completed_at),
262        ) {
263            self.duration_ms = (end - start).num_milliseconds() as f64;
264        }
265    }
266}
267
/// Manages sync state persistence in the sync tree.
///
/// A thin wrapper over a [`Transaction`]: all reads and writes go through the
/// transaction's "sync_state" `DocStore`, and changes are persisted when the
/// caller commits the transaction.
pub struct SyncStateManager<'a> {
    /// The transaction for modifying the sync tree
    txn: &'a Transaction,
}
273
274impl<'a> SyncStateManager<'a> {
275    /// Create a new sync state manager.
276    pub fn new(txn: &'a Transaction) -> Self {
277        Self { txn }
278    }
279
280    /// Get or create a sync cursor for a peer-tree relationship.
281    ///
282    /// # Arguments
283    /// * `peer_pubkey` - The peer's public key
284    /// * `tree_id` - The tree ID this cursor applies to
285    /// * `clock` - The time provider for timestamps (used when creating new cursor)
286    pub async fn get_sync_cursor(
287        &self,
288        peer_pubkey: &PublicKey,
289        tree_id: &ID,
290        clock: &dyn Clock,
291    ) -> Result<SyncCursor> {
292        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
293        let pk_str = peer_pubkey.to_string();
294        let cursor_path = path!("cursors", &pk_str as &str, tree_id.as_str());
295
296        match sync_state.get_path_as::<String>(&cursor_path).await {
297            Ok(json) => serde_json::from_str(&json).map_err(|e| {
298                Error::Store(StoreError::SerializationFailed {
299                    store: "sync_state".to_string(),
300                    reason: format!("Invalid cursor JSON: {e}"),
301                })
302            }),
303            Err(_) => {
304                // Create new cursor
305                Ok(SyncCursor::new(peer_pubkey.clone(), tree_id.clone(), clock))
306            }
307        }
308    }
309
310    /// Update a sync cursor.
311    pub async fn update_sync_cursor(&self, cursor: &SyncCursor) -> Result<()> {
312        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
313        let pk_str = cursor.peer_pubkey.to_string();
314        let cursor_path = path!("cursors", &pk_str as &str, cursor.tree_id.as_str());
315        let cursor_json = serde_json::to_string(cursor)?;
316        sync_state.set_path(&cursor_path, cursor_json).await?;
317        Ok(())
318    }
319
320    /// Get or create sync metadata for a peer.
321    ///
322    /// # Arguments
323    /// * `peer_pubkey` - The peer's public key
324    /// * `clock` - The time provider for timestamps (used when creating new metadata)
325    pub async fn get_sync_metadata(
326        &self,
327        peer_pubkey: &PublicKey,
328        clock: &dyn Clock,
329    ) -> Result<SyncMetadata> {
330        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
331        let pk_str = peer_pubkey.to_string();
332        let metadata_path = path!("metadata", &pk_str as &str);
333
334        match sync_state.get_path_as::<String>(&metadata_path).await {
335            Ok(json) => serde_json::from_str(&json).map_err(|e| {
336                Error::Store(StoreError::SerializationFailed {
337                    store: "sync_state".to_string(),
338                    reason: format!("Invalid metadata JSON: {e}"),
339                })
340            }),
341            Err(_) => {
342                // Create new metadata
343                Ok(SyncMetadata::new(peer_pubkey.clone(), clock))
344            }
345        }
346    }
347
348    /// Update sync metadata for a peer.
349    pub async fn update_sync_metadata(&self, metadata: &SyncMetadata) -> Result<()> {
350        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
351        let pk_str = metadata.peer_pubkey.to_string();
352        let metadata_path = path!("metadata", &pk_str as &str);
353        let metadata_json = serde_json::to_string(metadata)?;
354        sync_state.set_path(&metadata_path, metadata_json).await?;
355        Ok(())
356    }
357
358    /// Add a sync history entry.
359    pub async fn add_sync_history(&self, history_entry: &SyncHistoryEntry) -> Result<()> {
360        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
361        let history_path = path!("history", history_entry.sync_id);
362        let history_json = serde_json::to_string(history_entry)?;
363        sync_state.set_path(&history_path, history_json).await?;
364        Ok(())
365    }
366
367    /// Get sync history for a peer, optionally limited to recent entries.
368    ///
369    /// # Implementation Note
370    /// This method navigates the nested map structure created by `DocStore::set_path()`.
371    /// When using `set_path("history.sync_id", data)`, it creates a nested structure
372    /// `{ "history": { "sync_id": data } }` rather than a flat key with dots.
373    pub async fn get_sync_history(
374        &self,
375        peer_pubkey: &PublicKey,
376        limit: Option<usize>,
377    ) -> Result<Vec<SyncHistoryEntry>> {
378        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
379        let all_data = sync_state.get_all().await?;
380
381        let mut history_entries = Vec::new();
382
383        // The history data is stored as nested structure under the "history" key
384        if let Some(Value::Doc(history_node)) = all_data.get("history") {
385            // Iterate through all history entries (each is stored under its sync_id)
386            for (_sync_id, value) in history_node.iter() {
387                if let Value::Text(json_str) = value
388                    && let Ok(history_entry) = serde_json::from_str::<SyncHistoryEntry>(json_str)
389                    && history_entry.peer_pubkey == *peer_pubkey
390                {
391                    history_entries.push(history_entry);
392                }
393            }
394        }
395
396        // Sort by start time (most recent first)
397        history_entries.sort_by(|a, b| b.started_at.cmp(&a.started_at));
398
399        // Apply limit if specified
400        if let Some(limit) = limit {
401            history_entries.truncate(limit);
402        }
403
404        Ok(history_entries)
405    }
406
407    /// Get all peers with sync state.
408    ///
409    /// # Implementation Note
410    /// This method navigates the nested map structure created by `DocStore::set_path()`.
411    /// The data is organized in nested maps like `{ "metadata": { "peer_key": data } }`
412    /// and `{ "cursors": { "peer_key": { "tree_id": data } } }`.
413    pub async fn get_peers_with_sync_state(&self) -> Result<Vec<PublicKey>> {
414        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
415        let all_data = sync_state.get_all().await?;
416
417        let mut peers = std::collections::HashSet::new();
418
419        // Check metadata node for peers
420        if let Some(Value::Doc(metadata_node)) = all_data.get("metadata") {
421            for (peer_key, _) in metadata_node.iter() {
422                if let Ok(pk) = PublicKey::from_prefixed_string(peer_key) {
423                    peers.insert(pk);
424                }
425            }
426        }
427
428        // Check cursors node for peers
429        if let Some(Value::Doc(cursors_node)) = all_data.get("cursors") {
430            for (peer_key, _) in cursors_node.iter() {
431                if let Ok(pk) = PublicKey::from_prefixed_string(peer_key) {
432                    peers.insert(pk);
433                }
434            }
435        }
436
437        Ok(peers.into_iter().collect())
438    }
439
440    /// Clean up old sync history entries.
441    ///
442    /// # Arguments
443    /// * `max_age_days` - Maximum age of history entries to keep (older entries are deleted)
444    /// * `clock` - The time provider for determining current time
445    ///
446    /// # Implementation Note
447    /// This method navigates the nested map structure created by `DocStore::set_path()`.
448    /// History entries are stored as `{ "history": { "sync_id": data } }` and the
449    /// method properly navigates this structure to find and clean old entries.
450    pub async fn cleanup_old_history(&self, max_age_days: u32, clock: &dyn Clock) -> Result<usize> {
451        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
452        let all_data = sync_state.get_all().await?;
453
454        // Calculate cutoff timestamp from clock
455        let now_millis = clock.now_millis();
456        let days_millis = max_age_days as u64 * 24 * 60 * 60 * 1000;
457        let cutoff_millis = now_millis.saturating_sub(days_millis);
458
459        // Convert to RFC3339 for comparison
460        let cutoff_time =
461            chrono::DateTime::from_timestamp_millis(cutoff_millis as i64).unwrap_or_default();
462        let cutoff_str = cutoff_time.to_rfc3339();
463
464        let mut cleaned_count = 0;
465
466        // The history data is stored as nested structure under the "history" key
467        // Collect keys to delete first to avoid borrowing issues
468        let mut keys_to_delete = Vec::new();
469        if let Some(Value::Doc(history_node)) = all_data.get("history") {
470            for (sync_id, value) in history_node.iter() {
471                if let Value::Text(json_str) = value
472                    && let Ok(history_entry) = serde_json::from_str::<SyncHistoryEntry>(json_str)
473                    && history_entry.started_at < cutoff_str
474                {
475                    keys_to_delete.push(sync_id.to_string());
476                }
477            }
478        }
479
480        // Delete the collected keys
481        for sync_id in keys_to_delete {
482            sync_state.delete(format!("history.{sync_id}")).await?;
483            cleaned_count += 1;
484        }
485
486        Ok(cleaned_count)
487    }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Entry, Instance, backend::database::InMemory, crdt::Doc};

    /// Cursor lifecycle: fresh cursor has no history; update_sync records
    /// position and counters.
    #[test]
    fn test_sync_cursor() {
        use crate::clock::FixedClock;

        // FixedClock yields deterministic timestamps for reproducible tests.
        let clock = FixedClock::default();
        let peer_pubkey = PublicKey::random();
        // A root entry's ID serves as an arbitrary valid tree ID.
        let tree_id = Entry::root_builder()
            .build()
            .expect("Root entry should build successfully")
            .id()
            .clone();

        let mut cursor = SyncCursor::new(peer_pubkey.clone(), tree_id.clone(), &clock);
        assert_eq!(cursor.peer_pubkey, peer_pubkey);
        assert_eq!(cursor.tree_id, tree_id);
        assert!(!cursor.has_sync_history());

        let entry_id = Entry::root_builder()
            .build()
            .expect("Root entry should build successfully")
            .id()
            .clone();
        cursor.update_sync(entry_id.clone(), 5, &clock);
        assert!(cursor.has_sync_history());
        assert_eq!(cursor.last_synced_entry.unwrap(), entry_id);
        assert_eq!(cursor.last_sync_count, 5);
        assert_eq!(cursor.total_synced_count, 5);
    }

    /// Metadata statistics: success/failure recording and the derived
    /// success rate.
    #[test]
    fn test_sync_metadata() {
        use crate::clock::FixedClock;

        let clock = FixedClock::default();
        let peer_pubkey = PublicKey::random();
        let mut metadata = SyncMetadata::new(peer_pubkey.clone(), &clock);

        assert_eq!(metadata.peer_pubkey, peer_pubkey);
        assert_eq!(metadata.successful_sync_count, 0);
        assert_eq!(metadata.sync_success_rate(), 0.0);

        metadata.record_successful_sync(10, 1024, 100.0, &clock);
        assert_eq!(metadata.successful_sync_count, 1);
        assert_eq!(metadata.total_entries_synced, 10);
        assert_eq!(metadata.average_sync_duration_ms, 100.0);
        assert_eq!(metadata.sync_success_rate(), 1.0);

        // One success + one failure => 50% success rate.
        metadata.record_failed_sync(&clock);
        assert_eq!(metadata.failed_sync_count, 1);
        assert_eq!(metadata.sync_success_rate(), 0.5);
    }

    /// End-to-end: cursor, metadata, and history round-trip through a
    /// committed transaction on the sync tree.
    #[tokio::test]
    async fn test_sync_state_manager() {
        use crate::clock::FixedClock;
        use std::sync::Arc;

        let clock = Arc::new(FixedClock::default());
        let backend = InMemory::new();
        let instance = Instance::open_with_clock(Box::new(backend), clock.clone())
            .await
            .expect("Failed to create test instance");
        instance.enable_sync().await.unwrap();

        // Create a user tree for testing tree ID using User API
        instance.create_user("test", None).await.unwrap();
        let mut user = instance.login_user("test", None).await.unwrap();
        let key_id = user.add_private_key(None).await.unwrap();
        let user_tree = user.create_database(Doc::new(), &key_id).await.unwrap();
        let tree_id = user_tree.root_id().clone();

        // Get the sync instance and its tree
        let sync = instance.sync().unwrap();
        let sync_tree = &sync.sync_tree;
        let txn = sync_tree.new_transaction().await.unwrap();

        let state_manager = SyncStateManager::new(&txn);
        let peer_pubkey = PublicKey::random();

        // Test cursor management: a never-synced peer gets a fresh cursor.
        let mut cursor = state_manager
            .get_sync_cursor(&peer_pubkey, &tree_id, clock.as_ref())
            .await
            .unwrap();
        assert!(!cursor.has_sync_history());

        let entry_id = Entry::root_builder()
            .build()
            .expect("Root entry should build successfully")
            .id()
            .clone();
        cursor.update_sync(entry_id, 3, clock.as_ref());
        state_manager.update_sync_cursor(&cursor).await.unwrap();

        // Test metadata management
        let mut metadata = state_manager
            .get_sync_metadata(&peer_pubkey, clock.as_ref())
            .await
            .unwrap();
        metadata.record_successful_sync(3, 512, 50.0, clock.as_ref());
        state_manager.update_sync_metadata(&metadata).await.unwrap();

        // Test history
        let mut history_entry =
            SyncHistoryEntry::new(peer_pubkey.clone(), tree_id.clone(), clock.as_ref());
        history_entry.complete_success(3, 512, clock.as_ref());
        state_manager
            .add_sync_history(&history_entry)
            .await
            .unwrap();

        // Commit the changes and test
        txn.commit().await.unwrap();

        // Create a new transaction on the sync tree and test that the history is persisted
        let txn2 = sync_tree.new_transaction().await.unwrap();
        let state_manager2 = SyncStateManager::new(&txn2);
        let history = state_manager2
            .get_sync_history(&peer_pubkey, Some(10))
            .await
            .unwrap();

        // Verify that history is properly persisted and retrieved
        assert_eq!(history.len(), 1, "Should have one history entry");
        assert!(
            history[0].success,
            "History entry should be marked as success"
        );
        assert_eq!(history[0].entries_count, 3, "Should have synced 3 entries");
        assert_eq!(
            history[0].bytes_transferred, 512,
            "Should have transferred 512 bytes"
        );
    }
}