1use serde::{Deserialize, Serialize};
7
8use crate::{
9 Error, Result, Transaction,
10 auth::crypto::PublicKey,
11 clock::Clock,
12 crdt::doc::{Value, path},
13 entry::ID,
14 store::{DocStore, StoreError},
15};
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
22pub struct SyncCursor {
23 pub peer_pubkey: PublicKey,
25 pub tree_id: ID,
27 pub last_synced_entry: Option<ID>,
29 pub last_sync_time: String,
31 pub last_sync_count: u64,
33 pub total_synced_count: u64,
35}
36
37impl SyncCursor {
38 pub fn new(peer_pubkey: PublicKey, tree_id: ID, clock: &dyn Clock) -> Self {
45 Self {
46 peer_pubkey,
47 tree_id,
48 last_synced_entry: None,
49 last_sync_time: clock.now_rfc3339(),
50 last_sync_count: 0,
51 total_synced_count: 0,
52 }
53 }
54
55 pub fn update_sync(&mut self, last_entry: ID, count: u64, clock: &dyn Clock) {
62 self.last_synced_entry = Some(last_entry);
63 self.last_sync_time = clock.now_rfc3339();
64 self.last_sync_count = count;
65 self.total_synced_count += count;
66 }
67
68 pub fn has_sync_history(&self) -> bool {
70 self.last_synced_entry.is_some()
71 }
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
79pub struct SyncMetadata {
80 pub peer_pubkey: PublicKey,
82 pub sync_established: String,
84 pub last_sync_attempt: String,
86 pub last_successful_sync: Option<String>,
88 pub successful_sync_count: u64,
90 pub failed_sync_count: u64,
92 pub total_entries_synced: u64,
94 pub total_bytes_synced: u64,
96 pub average_sync_duration_ms: f64,
98 pub synced_trees: Vec<ID>,
100}
101
102impl SyncMetadata {
103 pub fn new(peer_pubkey: PublicKey, clock: &dyn Clock) -> Self {
109 let now = clock.now_rfc3339();
110 Self {
111 peer_pubkey,
112 sync_established: now.clone(),
113 last_sync_attempt: now,
114 last_successful_sync: None,
115 successful_sync_count: 0,
116 failed_sync_count: 0,
117 total_entries_synced: 0,
118 total_bytes_synced: 0,
119 average_sync_duration_ms: 0.0,
120 synced_trees: Vec::new(),
121 }
122 }
123
124 pub fn record_successful_sync(
132 &mut self,
133 entries_count: u64,
134 bytes: u64,
135 duration_ms: f64,
136 clock: &dyn Clock,
137 ) {
138 let now = clock.now_rfc3339();
139 self.last_sync_attempt = now.clone();
140 self.last_successful_sync = Some(now);
141 self.successful_sync_count += 1;
142 self.total_entries_synced += entries_count;
143 self.total_bytes_synced += bytes;
144
145 let total_ops = self.successful_sync_count as f64;
147 self.average_sync_duration_ms =
148 (self.average_sync_duration_ms * (total_ops - 1.0) + duration_ms) / total_ops;
149 }
150
151 pub fn record_failed_sync(&mut self, clock: &dyn Clock) {
156 self.last_sync_attempt = clock.now_rfc3339();
157 self.failed_sync_count += 1;
158 }
159
160 pub fn add_synced_tree(&mut self, tree_id: ID) {
162 if !self.synced_trees.contains(&tree_id) {
163 self.synced_trees.push(tree_id);
164 }
165 }
166
167 pub fn remove_synced_tree(&mut self, tree_id: &ID) {
169 self.synced_trees.retain(|id| id != tree_id);
170 }
171
172 pub fn sync_success_rate(&self) -> f64 {
174 let total = self.successful_sync_count + self.failed_sync_count;
175 if total == 0 {
176 0.0
177 } else {
178 self.successful_sync_count as f64 / total as f64
179 }
180 }
181}
182
183#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
185pub struct SyncHistoryEntry {
186 pub sync_id: String,
188 pub peer_pubkey: PublicKey,
190 pub tree_id: ID,
192 pub started_at: String,
194 pub completed_at: String,
196 pub success: bool,
198 pub entries_count: u64,
200 pub bytes_transferred: u64,
202 pub duration_ms: f64,
204 pub error_message: Option<String>,
206}
207
208impl SyncHistoryEntry {
209 pub fn new(peer_pubkey: PublicKey, tree_id: ID, clock: &dyn Clock) -> Self {
216 let now = clock.now_rfc3339();
217 Self {
218 sync_id: uuid::Uuid::new_v4().to_string(),
219 peer_pubkey,
220 tree_id,
221 started_at: now.clone(),
222 completed_at: now,
223 success: false,
224 entries_count: 0,
225 bytes_transferred: 0,
226 duration_ms: 0.0,
227 error_message: None,
228 }
229 }
230
231 pub fn complete_success(&mut self, entries_count: u64, bytes: u64, clock: &dyn Clock) {
238 self.completed_at = clock.now_rfc3339();
239 self.success = true;
240 self.entries_count = entries_count;
241 self.bytes_transferred = bytes;
242 self.calculate_duration();
243 }
244
245 pub fn complete_failure(&mut self, error: String, clock: &dyn Clock) {
251 self.completed_at = clock.now_rfc3339();
252 self.success = false;
253 self.error_message = Some(error);
254 self.calculate_duration();
255 }
256
257 fn calculate_duration(&mut self) {
259 if let (Ok(start), Ok(end)) = (
260 chrono::DateTime::parse_from_rfc3339(&self.started_at),
261 chrono::DateTime::parse_from_rfc3339(&self.completed_at),
262 ) {
263 self.duration_ms = (end - start).num_milliseconds() as f64;
264 }
265 }
266}
267
268pub struct SyncStateManager<'a> {
270 txn: &'a Transaction,
272}
273
274impl<'a> SyncStateManager<'a> {
275 pub fn new(txn: &'a Transaction) -> Self {
277 Self { txn }
278 }
279
280 pub async fn get_sync_cursor(
287 &self,
288 peer_pubkey: &PublicKey,
289 tree_id: &ID,
290 clock: &dyn Clock,
291 ) -> Result<SyncCursor> {
292 let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
293 let pk_str = peer_pubkey.to_string();
294 let tree_id_str = tree_id.to_string();
295 let cursor_path = path!("cursors", &pk_str as &str, &tree_id_str as &str);
296
297 match sync_state.get_path_as::<String>(&cursor_path).await {
298 Ok(json) => serde_json::from_str(&json).map_err(|e| {
299 Error::Store(Box::new(StoreError::SerializationFailed {
300 store: "sync_state".to_string(),
301 reason: format!("Invalid cursor JSON: {e}"),
302 }))
303 }),
304 Err(_) => {
305 Ok(SyncCursor::new(peer_pubkey.clone(), tree_id.clone(), clock))
307 }
308 }
309 }
310
311 pub async fn update_sync_cursor(&self, cursor: &SyncCursor) -> Result<()> {
313 let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
314 let pk_str = cursor.peer_pubkey.to_string();
315 let cursor_tree_id_str = cursor.tree_id.to_string();
316 let cursor_path = path!("cursors", &pk_str as &str, &cursor_tree_id_str as &str);
317 let cursor_json = serde_json::to_string(cursor)?;
318 sync_state.set_path(&cursor_path, cursor_json).await?;
319 Ok(())
320 }
321
322 pub async fn get_sync_metadata(
328 &self,
329 peer_pubkey: &PublicKey,
330 clock: &dyn Clock,
331 ) -> Result<SyncMetadata> {
332 let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
333 let pk_str = peer_pubkey.to_string();
334 let metadata_path = path!("metadata", &pk_str as &str);
335
336 match sync_state.get_path_as::<String>(&metadata_path).await {
337 Ok(json) => serde_json::from_str(&json).map_err(|e| {
338 Error::Store(Box::new(StoreError::SerializationFailed {
339 store: "sync_state".to_string(),
340 reason: format!("Invalid metadata JSON: {e}"),
341 }))
342 }),
343 Err(_) => {
344 Ok(SyncMetadata::new(peer_pubkey.clone(), clock))
346 }
347 }
348 }
349
350 pub async fn update_sync_metadata(&self, metadata: &SyncMetadata) -> Result<()> {
352 let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
353 let pk_str = metadata.peer_pubkey.to_string();
354 let metadata_path = path!("metadata", &pk_str as &str);
355 let metadata_json = serde_json::to_string(metadata)?;
356 sync_state.set_path(&metadata_path, metadata_json).await?;
357 Ok(())
358 }
359
360 pub async fn add_sync_history(&self, history_entry: &SyncHistoryEntry) -> Result<()> {
362 let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
363 let history_path = path!("history", history_entry.sync_id);
364 let history_json = serde_json::to_string(history_entry)?;
365 sync_state.set_path(&history_path, history_json).await?;
366 Ok(())
367 }
368
369 pub async fn get_sync_history(
376 &self,
377 peer_pubkey: &PublicKey,
378 limit: Option<usize>,
379 ) -> Result<Vec<SyncHistoryEntry>> {
380 let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
381 let all_data = sync_state.get_all().await?;
382
383 let mut history_entries = Vec::new();
384
385 if let Some(Value::Doc(history_node)) = all_data.get("history") {
387 for (_sync_id, value) in history_node.iter() {
389 if let Value::Text(json_str) = value
390 && let Ok(history_entry) = serde_json::from_str::<SyncHistoryEntry>(json_str)
391 && history_entry.peer_pubkey == *peer_pubkey
392 {
393 history_entries.push(history_entry);
394 }
395 }
396 }
397
398 history_entries.sort_by(|a, b| b.started_at.cmp(&a.started_at));
400
401 if let Some(limit) = limit {
403 history_entries.truncate(limit);
404 }
405
406 Ok(history_entries)
407 }
408
409 pub async fn get_peers_with_sync_state(&self) -> Result<Vec<PublicKey>> {
416 let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
417 let all_data = sync_state.get_all().await?;
418
419 let mut peers = std::collections::HashSet::new();
420
421 if let Some(Value::Doc(metadata_node)) = all_data.get("metadata") {
423 for (peer_key, _) in metadata_node.iter() {
424 if let Ok(pk) = PublicKey::from_prefixed_string(peer_key) {
425 peers.insert(pk);
426 }
427 }
428 }
429
430 if let Some(Value::Doc(cursors_node)) = all_data.get("cursors") {
432 for (peer_key, _) in cursors_node.iter() {
433 if let Ok(pk) = PublicKey::from_prefixed_string(peer_key) {
434 peers.insert(pk);
435 }
436 }
437 }
438
439 Ok(peers.into_iter().collect())
440 }
441
442 pub async fn cleanup_old_history(&self, max_age_days: u32, clock: &dyn Clock) -> Result<usize> {
453 let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
454 let all_data = sync_state.get_all().await?;
455
456 let now_millis = clock.now_millis();
458 let days_millis = max_age_days as u64 * 24 * 60 * 60 * 1000;
459 let cutoff_millis = now_millis.saturating_sub(days_millis);
460
461 let cutoff_time =
463 chrono::DateTime::from_timestamp_millis(cutoff_millis as i64).unwrap_or_default();
464 let cutoff_str = cutoff_time.to_rfc3339();
465
466 let mut cleaned_count = 0;
467
468 let mut keys_to_delete = Vec::new();
471 if let Some(Value::Doc(history_node)) = all_data.get("history") {
472 for (sync_id, value) in history_node.iter() {
473 if let Value::Text(json_str) = value
474 && let Ok(history_entry) = serde_json::from_str::<SyncHistoryEntry>(json_str)
475 && history_entry.started_at < cutoff_str
476 {
477 keys_to_delete.push(sync_id.to_string());
478 }
479 }
480 }
481
482 for sync_id in keys_to_delete {
484 sync_state.delete(format!("history.{sync_id}")).await?;
485 cleaned_count += 1;
486 }
487
488 Ok(cleaned_count)
489 }
490}
491
492#[cfg(test)]
493mod tests {
494 use super::*;
495 use crate::{Entry, Instance, backend::database::InMemory};
496
497 #[test]
498 fn test_sync_cursor() {
499 use crate::clock::FixedClock;
500
501 let clock = FixedClock::default();
502 let peer_pubkey = PublicKey::random();
503 let tree_id = Entry::root_builder()
504 .build()
505 .expect("Root entry should build successfully")
506 .id()
507 .clone();
508
509 let mut cursor = SyncCursor::new(peer_pubkey.clone(), tree_id.clone(), &clock);
510 assert_eq!(cursor.peer_pubkey, peer_pubkey);
511 assert_eq!(cursor.tree_id, tree_id);
512 assert!(!cursor.has_sync_history());
513
514 let entry_id = Entry::root_builder()
515 .build()
516 .expect("Root entry should build successfully")
517 .id()
518 .clone();
519 cursor.update_sync(entry_id.clone(), 5, &clock);
520 assert!(cursor.has_sync_history());
521 assert_eq!(cursor.last_synced_entry.unwrap(), entry_id);
522 assert_eq!(cursor.last_sync_count, 5);
523 assert_eq!(cursor.total_synced_count, 5);
524 }
525
526 #[test]
527 fn test_sync_metadata() {
528 use crate::clock::FixedClock;
529
530 let clock = FixedClock::default();
531 let peer_pubkey = PublicKey::random();
532 let mut metadata = SyncMetadata::new(peer_pubkey.clone(), &clock);
533
534 assert_eq!(metadata.peer_pubkey, peer_pubkey);
535 assert_eq!(metadata.successful_sync_count, 0);
536 assert_eq!(metadata.sync_success_rate(), 0.0);
537
538 metadata.record_successful_sync(10, 1024, 100.0, &clock);
539 assert_eq!(metadata.successful_sync_count, 1);
540 assert_eq!(metadata.total_entries_synced, 10);
541 assert_eq!(metadata.average_sync_duration_ms, 100.0);
542 assert_eq!(metadata.sync_success_rate(), 1.0);
543
544 metadata.record_failed_sync(&clock);
545 assert_eq!(metadata.failed_sync_count, 1);
546 assert_eq!(metadata.sync_success_rate(), 0.5);
547 }
548
549 #[tokio::test]
550 async fn test_sync_state_manager() {
551 use crate::clock::FixedClock;
552 use std::sync::Arc;
553
554 let clock = Arc::new(FixedClock::default());
555 let (instance, mut user) = Instance::create_backend_with_clock(
556 Box::new(InMemory::new()),
557 clock.clone(),
558 crate::NewUser::passwordless("test"),
559 )
560 .await
561 .expect("Failed to create test instance");
562 instance.enable_sync().await.unwrap();
563
564 let (user_tree, _) = user.new_database().build().await.unwrap();
565 let tree_id = user_tree.root_id().clone();
566
567 let sync = instance.sync().unwrap();
569 let sync_tree = &sync.sync_tree;
570 let txn = sync_tree.new_transaction().await.unwrap();
571
572 let state_manager = SyncStateManager::new(&txn);
573 let peer_pubkey = PublicKey::random();
574
575 let mut cursor = state_manager
577 .get_sync_cursor(&peer_pubkey, &tree_id, clock.as_ref())
578 .await
579 .unwrap();
580 assert!(!cursor.has_sync_history());
581
582 let entry_id = Entry::root_builder()
583 .build()
584 .expect("Root entry should build successfully")
585 .id()
586 .clone();
587 cursor.update_sync(entry_id, 3, clock.as_ref());
588 state_manager.update_sync_cursor(&cursor).await.unwrap();
589
590 let mut metadata = state_manager
592 .get_sync_metadata(&peer_pubkey, clock.as_ref())
593 .await
594 .unwrap();
595 metadata.record_successful_sync(3, 512, 50.0, clock.as_ref());
596 state_manager.update_sync_metadata(&metadata).await.unwrap();
597
598 let mut history_entry =
600 SyncHistoryEntry::new(peer_pubkey.clone(), tree_id.clone(), clock.as_ref());
601 history_entry.complete_success(3, 512, clock.as_ref());
602 state_manager
603 .add_sync_history(&history_entry)
604 .await
605 .unwrap();
606
607 txn.commit().await.unwrap();
609
610 let txn2 = sync_tree.new_transaction().await.unwrap();
612 let state_manager2 = SyncStateManager::new(&txn2);
613 let history = state_manager2
614 .get_sync_history(&peer_pubkey, Some(10))
615 .await
616 .unwrap();
617
618 assert_eq!(history.len(), 1, "Should have one history entry");
620 assert!(
621 history[0].success,
622 "History entry should be marked as success"
623 );
624 assert_eq!(history[0].entries_count, 3, "Should have synced 3 entries");
625 assert_eq!(
626 history[0].bytes_transferred, 512,
627 "Should have transferred 512 bytes"
628 );
629 }
630}