1use serde::{Deserialize, Serialize};
7
8use crate::{
9 Error, Result, Transaction,
10 auth::crypto::PublicKey,
11 clock::Clock,
12 crdt::doc::{Value, path},
13 entry::ID,
14 store::{DocStore, StoreError},
15};
16
/// Tracks incremental sync progress with one peer for one tree.
///
/// Persisted as JSON under `cursors/<peer>/<tree>` in the `sync_state` store
/// (see `SyncStateManager`), so every field is serde-serializable.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncCursor {
    /// Public key identifying the remote peer this cursor belongs to.
    pub peer_pubkey: PublicKey,
    /// Root ID of the tree whose progress is tracked.
    pub tree_id: ID,
    /// Last entry successfully synced; `None` until the first sync completes.
    pub last_synced_entry: Option<ID>,
    /// RFC 3339 timestamp of the most recent sync (creation time before any sync).
    pub last_sync_time: String,
    /// Number of entries transferred in the most recent sync batch.
    pub last_sync_count: u64,
    /// Cumulative number of entries synced over this cursor's lifetime.
    pub total_synced_count: u64,
}
36
37impl SyncCursor {
38 pub fn new(peer_pubkey: PublicKey, tree_id: ID, clock: &dyn Clock) -> Self {
45 Self {
46 peer_pubkey,
47 tree_id,
48 last_synced_entry: None,
49 last_sync_time: clock.now_rfc3339(),
50 last_sync_count: 0,
51 total_synced_count: 0,
52 }
53 }
54
55 pub fn update_sync(&mut self, last_entry: ID, count: u64, clock: &dyn Clock) {
62 self.last_synced_entry = Some(last_entry);
63 self.last_sync_time = clock.now_rfc3339();
64 self.last_sync_count = count;
65 self.total_synced_count += count;
66 }
67
68 pub fn has_sync_history(&self) -> bool {
70 self.last_synced_entry.is_some()
71 }
72}
73
/// Aggregate sync statistics for a single peer across all trees.
///
/// Persisted as JSON under `metadata/<peer>` in the `sync_state` store
/// (see `SyncStateManager`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncMetadata {
    /// Public key identifying the remote peer.
    pub peer_pubkey: PublicKey,
    /// RFC 3339 timestamp when sync with this peer was first established.
    pub sync_established: String,
    /// RFC 3339 timestamp of the most recent attempt, successful or not.
    pub last_sync_attempt: String,
    /// RFC 3339 timestamp of the most recent successful sync, if any.
    pub last_successful_sync: Option<String>,
    /// Number of sync operations that completed successfully.
    pub successful_sync_count: u64,
    /// Number of sync operations that failed.
    pub failed_sync_count: u64,
    /// Total entries transferred across all successful syncs.
    pub total_entries_synced: u64,
    /// Total bytes transferred across all successful syncs.
    pub total_bytes_synced: u64,
    /// Running average duration of successful syncs, in milliseconds.
    pub average_sync_duration_ms: f64,
    /// Trees currently synced with this peer (deduplicated, insertion order).
    pub synced_trees: Vec<ID>,
}
101
102impl SyncMetadata {
103 pub fn new(peer_pubkey: PublicKey, clock: &dyn Clock) -> Self {
109 let now = clock.now_rfc3339();
110 Self {
111 peer_pubkey,
112 sync_established: now.clone(),
113 last_sync_attempt: now,
114 last_successful_sync: None,
115 successful_sync_count: 0,
116 failed_sync_count: 0,
117 total_entries_synced: 0,
118 total_bytes_synced: 0,
119 average_sync_duration_ms: 0.0,
120 synced_trees: Vec::new(),
121 }
122 }
123
124 pub fn record_successful_sync(
132 &mut self,
133 entries_count: u64,
134 bytes: u64,
135 duration_ms: f64,
136 clock: &dyn Clock,
137 ) {
138 let now = clock.now_rfc3339();
139 self.last_sync_attempt = now.clone();
140 self.last_successful_sync = Some(now);
141 self.successful_sync_count += 1;
142 self.total_entries_synced += entries_count;
143 self.total_bytes_synced += bytes;
144
145 let total_ops = self.successful_sync_count as f64;
147 self.average_sync_duration_ms =
148 (self.average_sync_duration_ms * (total_ops - 1.0) + duration_ms) / total_ops;
149 }
150
151 pub fn record_failed_sync(&mut self, clock: &dyn Clock) {
156 self.last_sync_attempt = clock.now_rfc3339();
157 self.failed_sync_count += 1;
158 }
159
160 pub fn add_synced_tree(&mut self, tree_id: ID) {
162 if !self.synced_trees.contains(&tree_id) {
163 self.synced_trees.push(tree_id);
164 }
165 }
166
167 pub fn remove_synced_tree(&mut self, tree_id: &ID) {
169 self.synced_trees.retain(|id| id != tree_id);
170 }
171
172 pub fn sync_success_rate(&self) -> f64 {
174 let total = self.successful_sync_count + self.failed_sync_count;
175 if total == 0 {
176 0.0
177 } else {
178 self.successful_sync_count as f64 / total as f64
179 }
180 }
181}
182
/// One record in the per-sync audit log.
///
/// Persisted as JSON under `history/<sync_id>` in the `sync_state` store
/// (see `SyncStateManager`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncHistoryEntry {
    /// Random UUID (v4) uniquely identifying this sync operation.
    pub sync_id: String,
    /// Public key of the peer involved in the sync.
    pub peer_pubkey: PublicKey,
    /// Root ID of the tree that was synced.
    pub tree_id: ID,
    /// RFC 3339 timestamp when the sync started.
    pub started_at: String,
    /// RFC 3339 timestamp when the sync finished (equals `started_at` until
    /// `complete_success`/`complete_failure` is called).
    pub completed_at: String,
    /// Whether the sync completed successfully.
    pub success: bool,
    /// Number of entries transferred.
    pub entries_count: u64,
    /// Number of bytes transferred.
    pub bytes_transferred: u64,
    /// Wall-clock duration derived from the two timestamps, in milliseconds.
    pub duration_ms: f64,
    /// Error description when the sync failed; `None` on success.
    pub error_message: Option<String>,
}
207
208impl SyncHistoryEntry {
209 pub fn new(peer_pubkey: PublicKey, tree_id: ID, clock: &dyn Clock) -> Self {
216 let now = clock.now_rfc3339();
217 Self {
218 sync_id: uuid::Uuid::new_v4().to_string(),
219 peer_pubkey,
220 tree_id,
221 started_at: now.clone(),
222 completed_at: now,
223 success: false,
224 entries_count: 0,
225 bytes_transferred: 0,
226 duration_ms: 0.0,
227 error_message: None,
228 }
229 }
230
231 pub fn complete_success(&mut self, entries_count: u64, bytes: u64, clock: &dyn Clock) {
238 self.completed_at = clock.now_rfc3339();
239 self.success = true;
240 self.entries_count = entries_count;
241 self.bytes_transferred = bytes;
242 self.calculate_duration();
243 }
244
245 pub fn complete_failure(&mut self, error: String, clock: &dyn Clock) {
251 self.completed_at = clock.now_rfc3339();
252 self.success = false;
253 self.error_message = Some(error);
254 self.calculate_duration();
255 }
256
257 fn calculate_duration(&mut self) {
259 if let (Ok(start), Ok(end)) = (
260 chrono::DateTime::parse_from_rfc3339(&self.started_at),
261 chrono::DateTime::parse_from_rfc3339(&self.completed_at),
262 ) {
263 self.duration_ms = (end - start).num_milliseconds() as f64;
264 }
265 }
266}
267
/// Transaction-scoped accessor for the `sync_state` document store:
/// per-peer cursors, per-peer metadata, and the sync history log.
pub struct SyncStateManager<'a> {
    // Transaction that scopes every store read/write this manager performs;
    // changes become visible to other readers only after the caller commits.
    txn: &'a Transaction,
}
273
impl<'a> SyncStateManager<'a> {
    /// Wraps a transaction for sync-state access.
    pub fn new(txn: &'a Transaction) -> Self {
        Self { txn }
    }

    /// Loads the sync cursor for `(peer_pubkey, tree_id)` from
    /// `cursors/<peer>/<tree>`, or returns a fresh cursor (no sync history)
    /// when the lookup does not yield a value.
    ///
    /// # Errors
    /// Fails if the store cannot be opened or the stored JSON is malformed.
    pub async fn get_sync_cursor(
        &self,
        peer_pubkey: &PublicKey,
        tree_id: &ID,
        clock: &dyn Clock,
    ) -> Result<SyncCursor> {
        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
        let pk_str = peer_pubkey.to_string();
        let cursor_path = path!("cursors", &pk_str as &str, tree_id.as_str());

        match sync_state.get_path_as::<String>(&cursor_path).await {
            Ok(json) => serde_json::from_str(&json).map_err(|e| {
                Error::Store(StoreError::SerializationFailed {
                    store: "sync_state".to_string(),
                    reason: format!("Invalid cursor JSON: {e}"),
                })
            }),
            // NOTE(review): any lookup error — not only "key missing" — falls
            // back to a brand-new cursor; confirm this is intentional.
            Err(_) => {
                Ok(SyncCursor::new(peer_pubkey.clone(), tree_id.clone(), clock))
            }
        }
    }

    /// Persists `cursor` as JSON under `cursors/<peer>/<tree>`.
    pub async fn update_sync_cursor(&self, cursor: &SyncCursor) -> Result<()> {
        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
        let pk_str = cursor.peer_pubkey.to_string();
        let cursor_path = path!("cursors", &pk_str as &str, cursor.tree_id.as_str());
        let cursor_json = serde_json::to_string(cursor)?;
        sync_state.set_path(&cursor_path, cursor_json).await?;
        Ok(())
    }

    /// Loads the sync metadata for `peer_pubkey` from `metadata/<peer>`, or
    /// returns fresh metadata when the lookup does not yield a value.
    ///
    /// # Errors
    /// Fails if the store cannot be opened or the stored JSON is malformed.
    pub async fn get_sync_metadata(
        &self,
        peer_pubkey: &PublicKey,
        clock: &dyn Clock,
    ) -> Result<SyncMetadata> {
        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
        let pk_str = peer_pubkey.to_string();
        let metadata_path = path!("metadata", &pk_str as &str);

        match sync_state.get_path_as::<String>(&metadata_path).await {
            Ok(json) => serde_json::from_str(&json).map_err(|e| {
                Error::Store(StoreError::SerializationFailed {
                    store: "sync_state".to_string(),
                    reason: format!("Invalid metadata JSON: {e}"),
                })
            }),
            // NOTE(review): as with cursors, any lookup error falls back to
            // default metadata; confirm this is intentional.
            Err(_) => {
                Ok(SyncMetadata::new(peer_pubkey.clone(), clock))
            }
        }
    }

    /// Persists `metadata` as JSON under `metadata/<peer>`.
    pub async fn update_sync_metadata(&self, metadata: &SyncMetadata) -> Result<()> {
        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
        let pk_str = metadata.peer_pubkey.to_string();
        let metadata_path = path!("metadata", &pk_str as &str);
        let metadata_json = serde_json::to_string(metadata)?;
        sync_state.set_path(&metadata_path, metadata_json).await?;
        Ok(())
    }

    /// Appends `history_entry` to the log as JSON under
    /// `history/<sync_id>`; the random sync ID keeps entries unique.
    pub async fn add_sync_history(&self, history_entry: &SyncHistoryEntry) -> Result<()> {
        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
        let history_path = path!("history", history_entry.sync_id);
        let history_json = serde_json::to_string(history_entry)?;
        sync_state.set_path(&history_path, history_json).await?;
        Ok(())
    }

    /// Returns the history entries for `peer_pubkey`, newest first,
    /// optionally truncated to `limit` entries.
    ///
    /// Scans the whole `history` subtree and silently skips values that are
    /// not text or fail to deserialize.
    pub async fn get_sync_history(
        &self,
        peer_pubkey: &PublicKey,
        limit: Option<usize>,
    ) -> Result<Vec<SyncHistoryEntry>> {
        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
        let all_data = sync_state.get_all().await?;

        let mut history_entries = Vec::new();

        if let Some(Value::Doc(history_node)) = all_data.get("history") {
            for (_sync_id, value) in history_node.iter() {
                if let Value::Text(json_str) = value
                    && let Ok(history_entry) = serde_json::from_str::<SyncHistoryEntry>(json_str)
                    && history_entry.peer_pubkey == *peer_pubkey
                {
                    history_entries.push(history_entry);
                }
            }
        }

        // Descending lexicographic order on the RFC 3339 strings; this sorts
        // chronologically only while all timestamps share one normalized form
        // (same offset/precision) — TODO confirm the clock guarantees that.
        history_entries.sort_by(|a, b| b.started_at.cmp(&a.started_at));

        if let Some(limit) = limit {
            history_entries.truncate(limit);
        }

        Ok(history_entries)
    }

    /// Returns the deduplicated set of peers that have any sync state —
    /// the union of keys under `metadata` and `cursors`. Keys that do not
    /// parse as public keys are skipped. Order is unspecified (HashSet).
    pub async fn get_peers_with_sync_state(&self) -> Result<Vec<PublicKey>> {
        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
        let all_data = sync_state.get_all().await?;

        let mut peers = std::collections::HashSet::new();

        if let Some(Value::Doc(metadata_node)) = all_data.get("metadata") {
            for (peer_key, _) in metadata_node.iter() {
                if let Ok(pk) = PublicKey::from_prefixed_string(peer_key) {
                    peers.insert(pk);
                }
            }
        }

        if let Some(Value::Doc(cursors_node)) = all_data.get("cursors") {
            for (peer_key, _) in cursors_node.iter() {
                if let Ok(pk) = PublicKey::from_prefixed_string(peer_key) {
                    peers.insert(pk);
                }
            }
        }

        Ok(peers.into_iter().collect())
    }

    /// Deletes history entries older than `max_age_days` and returns how
    /// many were removed.
    ///
    /// The cutoff is computed from the clock's millisecond time and compared
    /// against `started_at` as RFC 3339 strings — like `get_sync_history`,
    /// this assumes a consistent normalized timestamp format.
    pub async fn cleanup_old_history(&self, max_age_days: u32, clock: &dyn Clock) -> Result<usize> {
        let sync_state = self.txn.get_store::<DocStore>("sync_state").await?;
        let all_data = sync_state.get_all().await?;

        let now_millis = clock.now_millis();
        let days_millis = max_age_days as u64 * 24 * 60 * 60 * 1000;
        // saturating_sub: a huge max_age_days clamps the cutoff to epoch 0
        // instead of underflowing.
        let cutoff_millis = now_millis.saturating_sub(days_millis);

        // unwrap_or_default: an out-of-range timestamp falls back to the
        // epoch, which makes the cutoff match nothing rather than panic.
        let cutoff_time =
            chrono::DateTime::from_timestamp_millis(cutoff_millis as i64).unwrap_or_default();
        let cutoff_str = cutoff_time.to_rfc3339();

        let mut cleaned_count = 0;

        // Collect keys first, then delete — avoids mutating while iterating
        // the snapshot.
        let mut keys_to_delete = Vec::new();
        if let Some(Value::Doc(history_node)) = all_data.get("history") {
            for (sync_id, value) in history_node.iter() {
                if let Value::Text(json_str) = value
                    && let Ok(history_entry) = serde_json::from_str::<SyncHistoryEntry>(json_str)
                    && history_entry.started_at < cutoff_str
                {
                    keys_to_delete.push(sync_id.to_string());
                }
            }
        }

        for sync_id in keys_to_delete {
            sync_state.delete(format!("history.{sync_id}")).await?;
            cleaned_count += 1;
        }

        Ok(cleaned_count)
    }
}
489
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Entry, Instance, backend::database::InMemory, crdt::Doc};

    /// Cursor lifecycle: fresh cursor has no history; one update records the
    /// entry, batch count, and lifetime total.
    #[test]
    fn test_sync_cursor() {
        use crate::clock::FixedClock;

        let clock = FixedClock::default();
        let peer_pubkey = PublicKey::random();
        let tree_id = Entry::root_builder()
            .build()
            .expect("Root entry should build successfully")
            .id()
            .clone();

        let mut cursor = SyncCursor::new(peer_pubkey.clone(), tree_id.clone(), &clock);
        assert_eq!(cursor.peer_pubkey, peer_pubkey);
        assert_eq!(cursor.tree_id, tree_id);
        assert!(!cursor.has_sync_history());

        // A second root entry serves as an arbitrary distinct entry ID.
        let entry_id = Entry::root_builder()
            .build()
            .expect("Root entry should build successfully")
            .id()
            .clone();
        cursor.update_sync(entry_id.clone(), 5, &clock);
        assert!(cursor.has_sync_history());
        assert_eq!(cursor.last_synced_entry.unwrap(), entry_id);
        assert_eq!(cursor.last_sync_count, 5);
        assert_eq!(cursor.total_synced_count, 5);
    }

    /// Metadata counters and success rate: 1 success + 1 failure => 0.5.
    #[test]
    fn test_sync_metadata() {
        use crate::clock::FixedClock;

        let clock = FixedClock::default();
        let peer_pubkey = PublicKey::random();
        let mut metadata = SyncMetadata::new(peer_pubkey.clone(), &clock);

        assert_eq!(metadata.peer_pubkey, peer_pubkey);
        assert_eq!(metadata.successful_sync_count, 0);
        assert_eq!(metadata.sync_success_rate(), 0.0);

        metadata.record_successful_sync(10, 1024, 100.0, &clock);
        assert_eq!(metadata.successful_sync_count, 1);
        assert_eq!(metadata.total_entries_synced, 10);
        // First sample: the running average equals the sample itself.
        assert_eq!(metadata.average_sync_duration_ms, 100.0);
        assert_eq!(metadata.sync_success_rate(), 1.0);

        metadata.record_failed_sync(&clock);
        assert_eq!(metadata.failed_sync_count, 1);
        assert_eq!(metadata.sync_success_rate(), 0.5);
    }

    /// End-to-end round trip through the store: write cursor, metadata, and
    /// a history entry in one transaction, commit, then read the history
    /// back in a second transaction.
    #[tokio::test]
    async fn test_sync_state_manager() {
        use crate::clock::FixedClock;
        use std::sync::Arc;

        // Fixed clock + in-memory backend keep the test deterministic.
        let clock = Arc::new(FixedClock::default());
        let backend = InMemory::new();
        let instance = Instance::open_with_clock(Box::new(backend), clock.clone())
            .await
            .expect("Failed to create test instance");
        instance.enable_sync().await.unwrap();

        instance.create_user("test", None).await.unwrap();
        let mut user = instance.login_user("test", None).await.unwrap();
        let key_id = user.add_private_key(None).await.unwrap();
        let user_tree = user.create_database(Doc::new(), &key_id).await.unwrap();
        let tree_id = user_tree.root_id().clone();

        let sync = instance.sync().unwrap();
        let sync_tree = &sync.sync_tree;
        let txn = sync_tree.new_transaction().await.unwrap();

        let state_manager = SyncStateManager::new(&txn);
        let peer_pubkey = PublicKey::random();

        // No stored cursor yet, so this returns the default (no history).
        let mut cursor = state_manager
            .get_sync_cursor(&peer_pubkey, &tree_id, clock.as_ref())
            .await
            .unwrap();
        assert!(!cursor.has_sync_history());

        let entry_id = Entry::root_builder()
            .build()
            .expect("Root entry should build successfully")
            .id()
            .clone();
        cursor.update_sync(entry_id, 3, clock.as_ref());
        state_manager.update_sync_cursor(&cursor).await.unwrap();

        let mut metadata = state_manager
            .get_sync_metadata(&peer_pubkey, clock.as_ref())
            .await
            .unwrap();
        metadata.record_successful_sync(3, 512, 50.0, clock.as_ref());
        state_manager.update_sync_metadata(&metadata).await.unwrap();

        let mut history_entry =
            SyncHistoryEntry::new(peer_pubkey.clone(), tree_id.clone(), clock.as_ref());
        history_entry.complete_success(3, 512, clock.as_ref());
        state_manager
            .add_sync_history(&history_entry)
            .await
            .unwrap();

        txn.commit().await.unwrap();

        // Fresh transaction: verifies the committed state is readable.
        let txn2 = sync_tree.new_transaction().await.unwrap();
        let state_manager2 = SyncStateManager::new(&txn2);
        let history = state_manager2
            .get_sync_history(&peer_pubkey, Some(10))
            .await
            .unwrap();

        assert_eq!(history.len(), 1, "Should have one history entry");
        assert!(
            history[0].success,
            "History entry should be marked as success"
        );
        assert_eq!(history[0].entries_count, 3, "Should have synced 3 entries");
        assert_eq!(
            history[0].bytes_transferred, 512,
            "Should have transferred 512 bytes"
        );
    }
}