Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/sync/
state.rs

//! Sync state tracking for managing synchronization progress and metadata.
//!
//! This module provides structures and functionality for tracking sync state
//! between peers, including sync cursors, metadata, and history.
5
6use serde::{Deserialize, Serialize};
7
8use crate::{
9    Error, Result, Transaction,
10    auth::crypto::PublicKey,
11    clock::Clock,
12    crdt::doc::{Value, path},
13    entry::ID,
14    store::{DocStore, StoreError},
15};
16
17/// Tracks the synchronization position for a specific peer-tree relationship.
18///
19/// A sync cursor represents how far synchronization has progressed between
20/// this database and a specific peer for a specific tree.
21#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
22pub struct SyncCursor {
23    /// The peer's public key
24    pub peer_pubkey: PublicKey,
25    /// The tree ID this cursor applies to
26    pub tree_id: ID,
27    /// The last entry ID that was successfully synchronized
28    pub last_synced_entry: Option<ID>,
29    /// Timestamp of the last successful sync
30    pub last_sync_time: String,
31    /// Number of entries synchronized in the last session
32    pub last_sync_count: u64,
33    /// Total number of entries synchronized with this peer for this tree
34    pub total_synced_count: u64,
35}
36
37impl SyncCursor {
38    /// Create a new sync cursor for a peer-tree relationship.
39    ///
40    /// # Arguments
41    /// * `peer_pubkey` - The peer's public key
42    /// * `tree_id` - The tree ID this cursor applies to
43    /// * `clock` - The time provider for timestamps
44    pub fn new(peer_pubkey: PublicKey, tree_id: ID, clock: &dyn Clock) -> Self {
45        Self {
46            peer_pubkey,
47            tree_id,
48            last_synced_entry: None,
49            last_sync_time: clock.now_rfc3339(),
50            last_sync_count: 0,
51            total_synced_count: 0,
52        }
53    }
54
55    /// Update the cursor with a successful sync operation.
56    ///
57    /// # Arguments
58    /// * `last_entry` - The ID of the last synced entry
59    /// * `count` - Number of entries synced in this operation
60    /// * `clock` - The time provider for timestamps
61    pub fn update_sync(&mut self, last_entry: ID, count: u64, clock: &dyn Clock) {
62        self.last_synced_entry = Some(last_entry);
63        self.last_sync_time = clock.now_rfc3339();
64        self.last_sync_count = count;
65        self.total_synced_count += count;
66    }
67
68    /// Check if this cursor has any sync history.
69    pub fn has_sync_history(&self) -> bool {
70        self.last_synced_entry.is_some()
71    }
72}
73
74/// Metadata about synchronization operations for a peer.
75///
76/// This tracks overall sync statistics and health information for a peer
77/// relationship across all trees.
78#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
79pub struct SyncMetadata {
80    /// The peer's public key
81    pub peer_pubkey: PublicKey,
82    /// Timestamp when sync relationship was first established
83    pub sync_established: String,
84    /// Timestamp of the last sync attempt (successful or failed)
85    pub last_sync_attempt: String,
86    /// Timestamp of the last successful sync
87    pub last_successful_sync: Option<String>,
88    /// Total number of successful sync operations
89    pub successful_sync_count: u64,
90    /// Total number of failed sync operations
91    pub failed_sync_count: u64,
92    /// Total number of entries synchronized
93    pub total_entries_synced: u64,
94    /// Estimated total bytes synchronized
95    pub total_bytes_synced: u64,
96    /// Average sync duration in milliseconds
97    pub average_sync_duration_ms: f64,
98    /// List of trees being synchronized with this peer
99    pub synced_trees: Vec<ID>,
100}
101
102impl SyncMetadata {
103    /// Create new sync metadata for a peer.
104    ///
105    /// # Arguments
106    /// * `peer_pubkey` - The peer's public key
107    /// * `clock` - The time provider for timestamps
108    pub fn new(peer_pubkey: PublicKey, clock: &dyn Clock) -> Self {
109        let now = clock.now_rfc3339();
110        Self {
111            peer_pubkey,
112            sync_established: now.clone(),
113            last_sync_attempt: now,
114            last_successful_sync: None,
115            successful_sync_count: 0,
116            failed_sync_count: 0,
117            total_entries_synced: 0,
118            total_bytes_synced: 0,
119            average_sync_duration_ms: 0.0,
120            synced_trees: Vec::new(),
121        }
122    }
123
124    /// Record a successful sync operation.
125    ///
126    /// # Arguments
127    /// * `entries_count` - Number of entries synced
128    /// * `bytes` - Estimated bytes transferred
129    /// * `duration_ms` - Duration of sync in milliseconds
130    /// * `clock` - The time provider for timestamps
131    pub fn record_successful_sync(
132        &mut self,
133        entries_count: u64,
134        bytes: u64,
135        duration_ms: f64,
136        clock: &dyn Clock,
137    ) {
138        let now = clock.now_rfc3339();
139        self.last_sync_attempt = now.clone();
140        self.last_successful_sync = Some(now);
141        self.successful_sync_count += 1;
142        self.total_entries_synced += entries_count;
143        self.total_bytes_synced += bytes;
144
145        // Update average duration (simple running average)
146        let total_ops = self.successful_sync_count as f64;
147        self.average_sync_duration_ms =
148            (self.average_sync_duration_ms * (total_ops - 1.0) + duration_ms) / total_ops;
149    }
150
151    /// Record a failed sync operation.
152    ///
153    /// # Arguments
154    /// * `clock` - The time provider for timestamps
155    pub fn record_failed_sync(&mut self, clock: &dyn Clock) {
156        self.last_sync_attempt = clock.now_rfc3339();
157        self.failed_sync_count += 1;
158    }
159
160    /// Add a tree to the list of synced trees if not already present.
161    pub fn add_synced_tree(&mut self, tree_id: ID) {
162        if !self.synced_trees.contains(&tree_id) {
163            self.synced_trees.push(tree_id);
164        }
165    }
166
167    /// Remove a tree from the list of synced trees.
168    pub fn remove_synced_tree(&mut self, tree_id: &ID) {
169        self.synced_trees.retain(|id| id != tree_id);
170    }
171
172    /// Calculate the success rate of sync operations.
173    pub fn sync_success_rate(&self) -> f64 {
174        let total = self.successful_sync_count + self.failed_sync_count;
175        if total == 0 {
176            0.0
177        } else {
178            self.successful_sync_count as f64 / total as f64
179        }
180    }
181}
182
183/// Record of a single sync operation for audit and debugging purposes.
184#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
185pub struct SyncHistoryEntry {
186    /// Unique ID for this sync operation
187    pub sync_id: String,
188    /// The peer involved in this sync
189    pub peer_pubkey: PublicKey,
190    /// The tree that was synchronized
191    pub tree_id: ID,
192    /// Timestamp when sync started
193    pub started_at: String,
194    /// Timestamp when sync completed (or failed)
195    pub completed_at: String,
196    /// Whether the sync was successful
197    pub success: bool,
198    /// Number of entries synchronized
199    pub entries_count: u64,
200    /// Estimated bytes transferred
201    pub bytes_transferred: u64,
202    /// Duration in milliseconds
203    pub duration_ms: f64,
204    /// Error message if sync failed
205    pub error_message: Option<String>,
206}
207
208impl SyncHistoryEntry {
209    /// Create a new sync history entry.
210    ///
211    /// # Arguments
212    /// * `peer_pubkey` - The peer's public key
213    /// * `tree_id` - The tree that was synchronized
214    /// * `clock` - The time provider for timestamps
215    pub fn new(peer_pubkey: PublicKey, tree_id: ID, clock: &dyn Clock) -> Self {
216        let now = clock.now_rfc3339();
217        Self {
218            sync_id: uuid::Uuid::new_v4().to_string(),
219            peer_pubkey,
220            tree_id,
221            started_at: now.clone(),
222            completed_at: now,
223            success: false,
224            entries_count: 0,
225            bytes_transferred: 0,
226            duration_ms: 0.0,
227            error_message: None,
228        }
229    }
230
231    /// Mark the sync as completed successfully.
232    ///
233    /// # Arguments
234    /// * `entries_count` - Number of entries synced
235    /// * `bytes` - Estimated bytes transferred
236    /// * `clock` - The time provider for timestamps
237    pub fn complete_success(&mut self, entries_count: u64, bytes: u64, clock: &dyn Clock) {
238        self.completed_at = clock.now_rfc3339();
239        self.success = true;
240        self.entries_count = entries_count;
241        self.bytes_transferred = bytes;
242        self.calculate_duration();
243    }
244
245    /// Mark the sync as failed.
246    ///
247    /// # Arguments
248    /// * `error` - Error message describing the failure
249    /// * `clock` - The time provider for timestamps
250    pub fn complete_failure(&mut self, error: String, clock: &dyn Clock) {
251        self.completed_at = clock.now_rfc3339();
252        self.success = false;
253        self.error_message = Some(error);
254        self.calculate_duration();
255    }
256
257    /// Calculate the duration based on start and end times.
258    fn calculate_duration(&mut self) {
259        if let (Ok(start), Ok(end)) = (
260            chrono::DateTime::parse_from_rfc3339(&self.started_at),
261            chrono::DateTime::parse_from_rfc3339(&self.completed_at),
262        ) {
263            self.duration_ms = (end - start).num_milliseconds() as f64;
264        }
265    }
266}
267
268/// Manages sync state persistence in the sync tree.
269pub struct SyncStateManager<'a> {
270    /// The transaction for modifying the sync tree
271    txn: &'a Transaction,
272}
273
274impl<'a> SyncStateManager<'a> {
275    /// Create a new sync state manager.
276    pub fn new(txn: &'a Transaction) -> Self {
277        Self { txn }
278    }
279
280    /// Get or create a sync cursor for a peer-tree relationship.
281    ///
282    /// # Arguments
283    /// * `peer_pubkey` - The peer's public key
284    /// * `tree_id` - The tree ID this cursor applies to
285    /// * `clock` - The time provider for timestamps (used when creating new cursor)
286    pub async fn get_sync_cursor(
287        &self,
288        peer_pubkey: &PublicKey,
289        tree_id: &ID,
290        clock: &dyn Clock,
291    ) -> Result<SyncCursor> {
292        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
293        let pk_str = peer_pubkey.to_string();
294        let tree_id_str = tree_id.to_string();
295        let cursor_path = path!("cursors", &pk_str as &str, &tree_id_str as &str);
296
297        match sync_state.get_path_as::<String>(&cursor_path).await {
298            Ok(json) => serde_json::from_str(&json).map_err(|e| {
299                Error::Store(Box::new(StoreError::SerializationFailed {
300                    store: "sync_state".to_string(),
301                    reason: format!("Invalid cursor JSON: {e}"),
302                }))
303            }),
304            Err(_) => {
305                // Create new cursor
306                Ok(SyncCursor::new(peer_pubkey.clone(), tree_id.clone(), clock))
307            }
308        }
309    }
310
311    /// Update a sync cursor.
312    pub async fn update_sync_cursor(&self, cursor: &SyncCursor) -> Result<()> {
313        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
314        let pk_str = cursor.peer_pubkey.to_string();
315        let cursor_tree_id_str = cursor.tree_id.to_string();
316        let cursor_path = path!("cursors", &pk_str as &str, &cursor_tree_id_str as &str);
317        let cursor_json = serde_json::to_string(cursor)?;
318        sync_state.set_path(&cursor_path, cursor_json).await?;
319        Ok(())
320    }
321
322    /// Get or create sync metadata for a peer.
323    ///
324    /// # Arguments
325    /// * `peer_pubkey` - The peer's public key
326    /// * `clock` - The time provider for timestamps (used when creating new metadata)
327    pub async fn get_sync_metadata(
328        &self,
329        peer_pubkey: &PublicKey,
330        clock: &dyn Clock,
331    ) -> Result<SyncMetadata> {
332        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
333        let pk_str = peer_pubkey.to_string();
334        let metadata_path = path!("metadata", &pk_str as &str);
335
336        match sync_state.get_path_as::<String>(&metadata_path).await {
337            Ok(json) => serde_json::from_str(&json).map_err(|e| {
338                Error::Store(Box::new(StoreError::SerializationFailed {
339                    store: "sync_state".to_string(),
340                    reason: format!("Invalid metadata JSON: {e}"),
341                }))
342            }),
343            Err(_) => {
344                // Create new metadata
345                Ok(SyncMetadata::new(peer_pubkey.clone(), clock))
346            }
347        }
348    }
349
350    /// Update sync metadata for a peer.
351    pub async fn update_sync_metadata(&self, metadata: &SyncMetadata) -> Result<()> {
352        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
353        let pk_str = metadata.peer_pubkey.to_string();
354        let metadata_path = path!("metadata", &pk_str as &str);
355        let metadata_json = serde_json::to_string(metadata)?;
356        sync_state.set_path(&metadata_path, metadata_json).await?;
357        Ok(())
358    }
359
360    /// Add a sync history entry.
361    pub async fn add_sync_history(&self, history_entry: &SyncHistoryEntry) -> Result<()> {
362        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
363        let history_path = path!("history", history_entry.sync_id);
364        let history_json = serde_json::to_string(history_entry)?;
365        sync_state.set_path(&history_path, history_json).await?;
366        Ok(())
367    }
368
369    /// Get sync history for a peer, optionally limited to recent entries.
370    ///
371    /// # Implementation Note
372    /// This method navigates the nested map structure created by `DocStore::set_path()`.
373    /// When using `set_path("history.sync_id", data)`, it creates a nested structure
374    /// `{ "history": { "sync_id": data } }` rather than a flat key with dots.
375    pub async fn get_sync_history(
376        &self,
377        peer_pubkey: &PublicKey,
378        limit: Option<usize>,
379    ) -> Result<Vec<SyncHistoryEntry>> {
380        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
381        let all_data = sync_state.get_all().await?;
382
383        let mut history_entries = Vec::new();
384
385        // The history data is stored as nested structure under the "history" key
386        if let Some(Value::Doc(history_node)) = all_data.get("history") {
387            // Iterate through all history entries (each is stored under its sync_id)
388            for (_sync_id, value) in history_node.iter() {
389                if let Value::Text(json_str) = value
390                    && let Ok(history_entry) = serde_json::from_str::<SyncHistoryEntry>(json_str)
391                    && history_entry.peer_pubkey == *peer_pubkey
392                {
393                    history_entries.push(history_entry);
394                }
395            }
396        }
397
398        // Sort by start time (most recent first)
399        history_entries.sort_by(|a, b| b.started_at.cmp(&a.started_at));
400
401        // Apply limit if specified
402        if let Some(limit) = limit {
403            history_entries.truncate(limit);
404        }
405
406        Ok(history_entries)
407    }
408
409    /// Get all peers with sync state.
410    ///
411    /// # Implementation Note
412    /// This method navigates the nested map structure created by `DocStore::set_path()`.
413    /// The data is organized in nested maps like `{ "metadata": { "peer_key": data } }`
414    /// and `{ "cursors": { "peer_key": { "tree_id": data } } }`.
415    pub async fn get_peers_with_sync_state(&self) -> Result<Vec<PublicKey>> {
416        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
417        let all_data = sync_state.get_all().await?;
418
419        let mut peers = std::collections::HashSet::new();
420
421        // Check metadata node for peers
422        if let Some(Value::Doc(metadata_node)) = all_data.get("metadata") {
423            for (peer_key, _) in metadata_node.iter() {
424                if let Ok(pk) = PublicKey::from_prefixed_string(peer_key) {
425                    peers.insert(pk);
426                }
427            }
428        }
429
430        // Check cursors node for peers
431        if let Some(Value::Doc(cursors_node)) = all_data.get("cursors") {
432            for (peer_key, _) in cursors_node.iter() {
433                if let Ok(pk) = PublicKey::from_prefixed_string(peer_key) {
434                    peers.insert(pk);
435                }
436            }
437        }
438
439        Ok(peers.into_iter().collect())
440    }
441
442    /// Clean up old sync history entries.
443    ///
444    /// # Arguments
445    /// * `max_age_days` - Maximum age of history entries to keep (older entries are deleted)
446    /// * `clock` - The time provider for determining current time
447    ///
448    /// # Implementation Note
449    /// This method navigates the nested map structure created by `DocStore::set_path()`.
450    /// History entries are stored as `{ "history": { "sync_id": data } }` and the
451    /// method properly navigates this structure to find and clean old entries.
452    pub async fn cleanup_old_history(&self, max_age_days: u32, clock: &dyn Clock) -> Result<usize> {
453        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
454        let all_data = sync_state.get_all().await?;
455
456        // Calculate cutoff timestamp from clock
457        let now_millis = clock.now_millis();
458        let days_millis = max_age_days as u64 * 24 * 60 * 60 * 1000;
459        let cutoff_millis = now_millis.saturating_sub(days_millis);
460
461        // Convert to RFC3339 for comparison
462        let cutoff_time =
463            chrono::DateTime::from_timestamp_millis(cutoff_millis as i64).unwrap_or_default();
464        let cutoff_str = cutoff_time.to_rfc3339();
465
466        let mut cleaned_count = 0;
467
468        // The history data is stored as nested structure under the "history" key
469        // Collect keys to delete first to avoid borrowing issues
470        let mut keys_to_delete = Vec::new();
471        if let Some(Value::Doc(history_node)) = all_data.get("history") {
472            for (sync_id, value) in history_node.iter() {
473                if let Value::Text(json_str) = value
474                    && let Ok(history_entry) = serde_json::from_str::<SyncHistoryEntry>(json_str)
475                    && history_entry.started_at < cutoff_str
476                {
477                    keys_to_delete.push(sync_id.to_string());
478                }
479            }
480        }
481
482        // Delete the collected keys
483        for sync_id in keys_to_delete {
484            sync_state.delete(format!("history.{sync_id}")).await?;
485            cleaned_count += 1;
486        }
487
488        Ok(cleaned_count)
489    }
490}
491
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{Entry, Instance, backend::database::InMemory, clock::FixedClock};

    /// Build a throwaway root entry and return its ID.
    fn fresh_root_id() -> ID {
        Entry::root_builder()
            .build()
            .expect("Root entry should build successfully")
            .id()
            .clone()
    }

    /// A new cursor has no history; recording a sync fills in entry and counts.
    #[test]
    fn test_sync_cursor() {
        let clock = FixedClock::default();
        let peer_pubkey = PublicKey::random();
        let tree_id = fresh_root_id();

        let mut cursor = SyncCursor::new(peer_pubkey.clone(), tree_id.clone(), &clock);
        assert_eq!(cursor.peer_pubkey, peer_pubkey);
        assert_eq!(cursor.tree_id, tree_id);
        assert!(!cursor.has_sync_history());

        let entry_id = fresh_root_id();
        cursor.update_sync(entry_id.clone(), 5, &clock);
        assert!(cursor.has_sync_history());
        assert_eq!(cursor.last_synced_entry.unwrap(), entry_id);
        assert_eq!(cursor.last_sync_count, 5);
        assert_eq!(cursor.total_synced_count, 5);
    }

    /// Success and failure recording drive the counters and the success rate.
    #[test]
    fn test_sync_metadata() {
        let clock = FixedClock::default();
        let peer_pubkey = PublicKey::random();
        let mut metadata = SyncMetadata::new(peer_pubkey.clone(), &clock);

        // Freshly created: nothing recorded yet.
        assert_eq!(metadata.peer_pubkey, peer_pubkey);
        assert_eq!(metadata.successful_sync_count, 0);
        assert_eq!(metadata.sync_success_rate(), 0.0);

        // One success.
        metadata.record_successful_sync(10, 1024, 100.0, &clock);
        assert_eq!(metadata.successful_sync_count, 1);
        assert_eq!(metadata.total_entries_synced, 10);
        assert_eq!(metadata.average_sync_duration_ms, 100.0);
        assert_eq!(metadata.sync_success_rate(), 1.0);

        // One failure halves the success rate.
        metadata.record_failed_sync(&clock);
        assert_eq!(metadata.failed_sync_count, 1);
        assert_eq!(metadata.sync_success_rate(), 0.5);
    }

    /// Cursor, metadata and history written in one transaction are visible
    /// from a second transaction after commit.
    #[tokio::test]
    async fn test_sync_state_manager() {
        let clock = Arc::new(FixedClock::default());
        let (instance, mut user) = Instance::create_backend_with_clock(
            Box::new(InMemory::new()),
            clock.clone(),
            crate::NewUser::passwordless("test"),
        )
        .await
        .expect("Failed to create test instance");
        instance.enable_sync().await.unwrap();

        let (user_tree, _) = user.new_database().build().await.unwrap();
        let tree_id = user_tree.root_id().clone();

        // Open a transaction on the sync instance's own tree.
        let sync = instance.sync().unwrap();
        let sync_tree = &sync.sync_tree;
        let txn = sync_tree.new_transaction().await.unwrap();

        let state_manager = SyncStateManager::new(&txn);
        let peer_pubkey = PublicKey::random();

        // Cursor: load (fresh), update, persist.
        let mut cursor = state_manager
            .get_sync_cursor(&peer_pubkey, &tree_id, clock.as_ref())
            .await
            .unwrap();
        assert!(!cursor.has_sync_history());

        let entry_id = fresh_root_id();
        cursor.update_sync(entry_id, 3, clock.as_ref());
        state_manager.update_sync_cursor(&cursor).await.unwrap();

        // Metadata: load (fresh), record a success, persist.
        let mut metadata = state_manager
            .get_sync_metadata(&peer_pubkey, clock.as_ref())
            .await
            .unwrap();
        metadata.record_successful_sync(3, 512, 50.0, clock.as_ref());
        state_manager.update_sync_metadata(&metadata).await.unwrap();

        // History: create a completed entry and store it.
        let mut history_entry =
            SyncHistoryEntry::new(peer_pubkey.clone(), tree_id.clone(), clock.as_ref());
        history_entry.complete_success(3, 512, clock.as_ref());
        state_manager
            .add_sync_history(&history_entry)
            .await
            .unwrap();

        txn.commit().await.unwrap();

        // Read the history back through a brand-new transaction.
        let txn2 = sync_tree.new_transaction().await.unwrap();
        let state_manager2 = SyncStateManager::new(&txn2);
        let history = state_manager2
            .get_sync_history(&peer_pubkey, Some(10))
            .await
            .unwrap();

        assert_eq!(history.len(), 1, "Should have one history entry");
        assert!(
            history[0].success,
            "History entry should be marked as success"
        );
        assert_eq!(history[0].entries_count, 3, "Should have synced 3 entries");
        assert_eq!(
            history[0].bytes_transferred, 512,
            "Should have transferred 512 bytes"
        );
    }
}