Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/sync/background/
mod.rs

1//! Background sync engine implementation.
2//!
3//! This module provides the BackgroundSync struct that handles all sync operations
4//! in a single background thread, removing circular dependency issues and providing
5//! automatic retry, periodic sync, and reconnection handling.
6
7use std::{sync::Arc, time::Duration};
8
9use tokio::{
10    sync::{mpsc, oneshot},
11    time::interval,
12};
13use tracing::{Instrument, debug, info, info_span, trace};
14
15use super::{
16    error::SyncError,
17    handler::SyncHandlerImpl,
18    peer_manager::PeerManager,
19    peer_types::{Address, PeerId, PeerStatus},
20    protocol::{SyncRequest, SyncResponse, SyncTreeRequest},
21    queue::SyncQueue,
22    transport_manager::TransportManager,
23};
24use crate::{
25    Database, Error, Instance, Result, WeakInstance,
26    auth::crypto::PublicKey,
27    entry::{Entry, ID},
28    store::DocStore,
29};
30
31mod conn;
32
33/// Commands that can be sent to the background sync engine
34#[allow(clippy::large_enum_variant)]
35pub enum SyncCommand {
36    /// Send entries to a specific peer
37    SendEntries { peer: PeerId, entries: Vec<Entry> },
38    /// Trigger immediate sync with a peer
39    SyncWithPeer { peer: PeerId },
40    /// Shutdown the background engine
41    Shutdown,
42
43    // Transport management
44    /// Add a named transport to the transport manager
45    AddTransport {
46        name: String,
47        transport: Box<dyn super::transports::SyncTransport>,
48        response: oneshot::Sender<Result<()>>,
49    },
50
51    // Server management commands
52    /// Start the sync server on specified or all transports
53    StartServer {
54        /// Transport name to start, or None for all transports
55        name: Option<String>,
56        response: oneshot::Sender<Result<()>>,
57    },
58    /// Stop the sync server on specified or all transports
59    StopServer {
60        /// Transport name to stop, or None for all transports
61        name: Option<String>,
62        response: oneshot::Sender<Result<()>>,
63    },
64    /// Get the server's listening address for a specific transport
65    GetServerAddress {
66        name: String,
67        response: oneshot::Sender<Result<String>>,
68    },
69    /// Get all server addresses for running servers
70    GetAllServerAddresses {
71        response: oneshot::Sender<Result<Vec<(String, String)>>>,
72    },
73
74    // Peer connection
75    /// Connect to a peer and perform handshake
76    ConnectToPeer {
77        address: Address,
78        response: oneshot::Sender<Result<PublicKey>>, // Returns peer pubkey
79    },
80
81    // Request/Response operations
82    /// Send a sync request and get response
83    SendRequest {
84        address: Address,
85        request: Box<SyncRequest>,
86        response: oneshot::Sender<Result<SyncResponse>>,
87    },
88
89    /// Flush: process all queued entries and retry queue, then respond.
90    Flush {
91        response: oneshot::Sender<Result<()>>,
92    },
93}
94
95// Manual Debug impl required because:
96// - `Box<dyn SyncTransport>` doesn't implement Debug (trait object)
97// - `oneshot::Sender` doesn't implement Debug (channel internals)
98// - Transports may contain secrets (e.g., Iroh's cryptographic keys)
99// This impl provides safe, useful debug output for logging.
100impl std::fmt::Debug for SyncCommand {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        match self {
103            Self::SendEntries { peer, entries } => f
104                .debug_struct("SendEntries")
105                .field("peer", peer)
106                .field("entries_count", &entries.len())
107                .finish(),
108            Self::SyncWithPeer { peer } => {
109                f.debug_struct("SyncWithPeer").field("peer", peer).finish()
110            }
111            Self::Shutdown => write!(f, "Shutdown"),
112            Self::AddTransport {
113                name, transport, ..
114            } => f
115                .debug_struct("AddTransport")
116                .field("name", name)
117                .field("transport_type", &transport.transport_type())
118                .finish(),
119            Self::StartServer { name, .. } => {
120                f.debug_struct("StartServer").field("name", name).finish()
121            }
122            Self::StopServer { name, .. } => {
123                f.debug_struct("StopServer").field("name", name).finish()
124            }
125            Self::GetServerAddress { name, .. } => f
126                .debug_struct("GetServerAddress")
127                .field("name", name)
128                .finish(),
129            Self::GetAllServerAddresses { .. } => write!(f, "GetAllServerAddresses"),
130            Self::ConnectToPeer { address, .. } => f
131                .debug_struct("ConnectToPeer")
132                .field("address", address)
133                .finish(),
134            Self::SendRequest {
135                address, request, ..
136            } => f
137                .debug_struct("SendRequest")
138                .field("address", address)
139                .field("request", request)
140                .finish(),
141            Self::Flush { .. } => write!(f, "Flush"),
142        }
143    }
144}
145
146/// Entry in the retry queue for failed sends
147#[derive(Debug, Clone)]
148struct RetryEntry {
149    peer: PeerId,
150    entries: Vec<Entry>,
151    attempts: u32,
152    /// Timestamp of last attempt in milliseconds since Unix epoch
153    last_attempt_ms: u64,
154}
155
156/// Background sync engine that owns all sync state and handles operations
157pub struct BackgroundSync {
158    // Core components - owns everything
159    pub(super) transport_manager: TransportManager,
160    instance: WeakInstance,
161    pub(super) sync_tree_id: ID,
162
163    // Queue for entries pending synchronization (shared with Sync frontend)
164    queue: Arc<SyncQueue>,
165
166    // Retry queue for failed sends
167    retry_queue: Vec<RetryEntry>,
168
169    // Communication
170    command_rx: mpsc::Receiver<SyncCommand>,
171}
172
173impl BackgroundSync {
174    /// Start the background sync engine and return a command sender.
175    ///
176    /// The engine starts with no transports registered. Use `AddTransport`
177    /// commands to add transports after starting.
178    pub fn start(
179        instance: Instance,
180        sync_tree_id: ID,
181        queue: Arc<SyncQueue>,
182    ) -> mpsc::Sender<SyncCommand> {
183        let (tx, rx) = mpsc::channel(100);
184
185        let background = Self {
186            transport_manager: TransportManager::new(),
187            instance: instance.downgrade(),
188            sync_tree_id,
189            queue,
190            retry_queue: Vec::new(),
191            command_rx: rx,
192        };
193
194        // Spawn background sync as a regular tokio task
195        // (Transaction is now Send since it uses Arc<Mutex>)
196        tokio::spawn(background.run());
197        tx
198    }
199
200    /// Upgrade the weak instance reference to a strong reference.
201    pub(super) fn instance(&self) -> Result<Instance> {
202        self.instance
203            .upgrade()
204            .ok_or_else(|| SyncError::InstanceDropped.into())
205    }
206
207    /// Get the sync tree for accessing peer data
208    pub(super) async fn get_sync_tree(&self) -> Result<Database> {
209        // Load sync tree with the device key
210        let instance = self.instance()?;
211        let signing_key = instance.signing_key()?.clone();
212        Ok(Database::open(&instance, &self.sync_tree_id)
213            .await?
214            .with_key(signing_key))
215    }
216
217    /// Get the minimum sync interval from all tracked databases
218    /// Returns None if no databases are tracked or no intervals are set
219    async fn get_min_sync_interval(&self) -> Option<u64> {
220        let sync_tree = match self.get_sync_tree().await {
221            Ok(tree) => tree,
222            Err(_) => return None,
223        };
224
225        let txn = match sync_tree.new_transaction().await {
226            Ok(txn) => txn,
227            Err(_) => return None,
228        };
229
230        let user_mgr = super::user_sync_manager::UserSyncManager::new(&txn);
231
232        // Get all tracked database IDs from the DATABASE_USERS_SUBTREE
233        let database_users = match txn
234            .get_store::<DocStore>(super::user_sync_manager::DATABASE_USERS_SUBTREE)
235            .await
236        {
237            Ok(store) => store,
238            Err(_) => return None,
239        };
240
241        let all_dbs = match database_users.get_all().await {
242            Ok(doc) => doc,
243            Err(_) => return None,
244        };
245
246        // Find the minimum interval across all databases
247        let mut min_interval: Option<u64> = None;
248        for db_id_str in all_dbs.keys() {
249            if let Ok(db_id) = ID::parse(db_id_str)
250                && let Ok(Some(settings)) = user_mgr.get_combined_settings(&db_id).await
251                && let Some(interval) = settings.interval_seconds
252            {
253                min_interval = Some(match min_interval {
254                    Some(current_min) => current_min.min(interval),
255                    None => interval,
256                });
257            }
258        }
259
260        min_interval
261    }
262
263    /// Main event loop that handles all sync operations
264    async fn run(mut self) {
265        async move {
266            info!("Starting background sync engine");
267
268            // Get initial sync interval from settings (default to 300 seconds if none set)
269            let mut current_interval_secs = self.get_min_sync_interval().await.unwrap_or(300);
270            info!("Initial periodic sync interval: {} seconds", current_interval_secs);
271
272            // Set up timers
273            let mut periodic_sync = interval(Duration::from_secs(current_interval_secs));
274            let mut queue_check = interval(Duration::from_secs(5)); // 5 seconds - batches local writes
275            let mut retry_check = interval(Duration::from_secs(30)); // 30 seconds
276            let mut connection_check = interval(Duration::from_secs(60)); // 1 minute
277            let mut settings_check = interval(Duration::from_secs(60)); // Check for settings changes every minute
278
279            // Skip initial tick to avoid immediate execution
280            periodic_sync.tick().await;
281            queue_check.tick().await;
282            retry_check.tick().await;
283            connection_check.tick().await;
284            settings_check.tick().await;
285
286            loop {
287                tokio::select! {
288                    // Handle commands from frontend
289                    Some(cmd) = self.command_rx.recv() => {
290                        if let Err(e) = self.handle_command(cmd).await {
291                            // Log errors but continue running - background sync should be resilient
292                            tracing::error!("Background sync command error: {e}");
293                        }
294                    }
295
296                    // Drain sync queue (batched entries)
297                    _ = queue_check.tick() => {
298                        self.process_queue().await;
299                    }
300
301                    // Periodic sync with all peers
302                    _ = periodic_sync.tick() => {
303                        self.periodic_sync_all_peers().await;
304                    }
305
306                    // Process retry queue
307                    _ = retry_check.tick() => {
308                        self.process_retry_queue().await;
309                    }
310
311                    // Check and reconnect disconnected peers
312                    _ = connection_check.tick() => {
313                        self.check_peer_connections().await;
314                    }
315
316                    // Check if sync interval settings have changed
317                    _ = settings_check.tick() => {
318                        if let Some(new_interval) = self.get_min_sync_interval().await
319                            && new_interval != current_interval_secs {
320                                info!("Sync interval changed from {} to {} seconds", current_interval_secs, new_interval);
321                                current_interval_secs = new_interval;
322                                // Recreate the periodic sync timer with new interval
323                                periodic_sync = interval(Duration::from_secs(new_interval));
324                                periodic_sync.tick().await; // Skip initial tick
325                            }
326                    }
327
328                    // Channel closed, shutdown
329                    else => {
330                        // Normal shutdown when channel closes
331                        info!("Background sync engine shutting down");
332                        break;
333                    }
334                }
335            }
336        }
337        .instrument(info_span!("background_sync"))
338        .await
339    }
340
341    /// Handle a single command from the frontend
342    async fn handle_command(&mut self, command: SyncCommand) -> Result<()> {
343        match command {
344            SyncCommand::SendEntries { peer, entries } => {
345                if let Err(e) = self.send_to_peer(&peer, entries.clone()).await {
346                    let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
347                    self.add_to_retry_queue(peer, entries, e, now_ms);
348                }
349            }
350
351            SyncCommand::SyncWithPeer { peer } => {
352                if let Err(e) = self.sync_with_peer(&peer).await {
353                    // Log sync failure but don't crash the background engine
354                    tracing::error!("Failed to sync with peer {peer}: {e}");
355                }
356            }
357
358            SyncCommand::AddTransport {
359                name,
360                transport,
361                response,
362            } => {
363                // Stop server on existing transport if running
364                if let Some(old) = self.transport_manager.get_mut(&name)
365                    && old.is_server_running()
366                {
367                    let _ = old.stop_server().await;
368                }
369                self.transport_manager.add(&name, transport);
370                tracing::debug!("Added transport: {}", name);
371                let _ = response.send(Ok(()));
372            }
373
374            SyncCommand::StartServer { name, response } => {
375                let result = self.start_server(name.as_deref()).await;
376                let _ = response.send(result);
377            }
378
379            SyncCommand::StopServer { name, response } => {
380                let result = self.stop_server(name.as_deref()).await;
381                let _ = response.send(result);
382            }
383
384            SyncCommand::GetServerAddress { name, response } => {
385                let result = self.transport_manager.get_server_address(&name);
386                let _ = response.send(result);
387            }
388
389            SyncCommand::GetAllServerAddresses { response } => {
390                let addresses = self.transport_manager.get_all_server_addresses();
391                let _ = response.send(Ok(addresses));
392            }
393
394            SyncCommand::ConnectToPeer { address, response } => {
395                let result = self.connect_to_peer(&address).await;
396                let _ = response.send(result);
397            }
398
399            SyncCommand::SendRequest {
400                address,
401                request,
402                response,
403            } => {
404                let result = self.send_sync_request(&address, &request).await;
405                let _ = response.send(result);
406            }
407
408            SyncCommand::Flush { response } => {
409                // Process retry queue first (old failures), then main queue (new entries)
410                // This avoids double-trying entries that fail in process_queue
411                let retry_failures = self.flush_retry_queue().await;
412                let queue_failures = self.process_queue().await;
413
414                // Report error if any failures occurred
415                let result = match (retry_failures, queue_failures) {
416                    (0, 0) => Ok(()),
417                    (r, q) => Err(SyncError::Network(format!(
418                        "Flush had failures: {r} from retry queue, {q} from new entries"
419                    ))
420                    .into()),
421                };
422                let _ = response.send(result);
423            }
424
425            SyncCommand::Shutdown => {
426                // Shutdown command received - exit cleanly
427                return Err(SyncError::Network("Shutdown requested".to_string()).into());
428            }
429        }
430        Ok(())
431    }
432
433    /// Send specific entries to a peer without duplicate filtering.
434    ///
435    /// This method performs direct entry transmission and is used by:
436    /// - `SendEntries` commands from the frontend (caller handles filtering)
437    /// - `sync_tree_with_peer()` after smart duplicate prevention analysis
438    ///
439    /// # Design Note
440    ///
441    /// This method does NOT perform duplicate prevention - that responsibility
442    /// lies with the caller. The background sync's smart duplicate prevention
443    /// happens in `sync_tree_with_peer()` via tip comparison, while direct
444    /// `SendEntries` commands trust the caller to send appropriate entries.
445    ///
446    /// # Error Handling
447    ///
448    /// Failed sends are automatically added to the retry queue with exponential backoff.
449    async fn send_to_peer(&self, peer: &PeerId, entries: Vec<Entry>) -> Result<()> {
450        // Get peer address from sync tree (extract and drop transaction before await)
451        let address = {
452            let sync_tree = self.get_sync_tree().await?;
453            let txn = sync_tree.new_transaction().await?;
454            let peer_info = PeerManager::new(&txn)
455                .get_peer_info(peer.public_key())
456                .await?
457                .ok_or_else(|| SyncError::PeerNotFound(peer.to_string()))?;
458
459            peer_info
460                .addresses
461                .first()
462                .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?
463                .clone()
464        }; // Transaction is dropped here
465
466        let request = SyncRequest::SendEntries(entries);
467        let response = self
468            .transport_manager
469            .send_request(&address, &request)
470            .await?;
471
472        match response {
473            SyncResponse::Ack | SyncResponse::Count(_) => Ok(()),
474            SyncResponse::Error(msg) => Err(SyncError::SyncProtocolError(format!(
475                "Peer {peer} returned error: {msg}"
476            ))
477            .into()),
478            _ => Err(SyncError::UnexpectedResponse {
479                expected: "Ack or Count",
480                actual: format!("{response:?}"),
481            }
482            .into()),
483        }
484    }
485
486    /// Add failed send to retry queue
487    fn add_to_retry_queue(&mut self, peer: PeerId, entries: Vec<Entry>, error: Error, now_ms: u64) {
488        // Log send failure and add to retry queue
489        tracing::warn!("Failed to send to {peer}: {error}. Adding to retry queue.");
490        self.retry_queue.push(RetryEntry {
491            peer,
492            entries,
493            attempts: 1,
494            last_attempt_ms: now_ms,
495        });
496    }
497
498    /// Process entries from the sync queue, batching by peer.
499    ///
500    /// Drains the queue and sends entries to each peer. Failed sends
501    /// are added to the retry queue with exponential backoff.
502    /// Returns the number of peers that failed to receive entries.
503    async fn process_queue(&mut self) -> usize {
504        let batches = self.queue.drain();
505        if batches.is_empty() {
506            return 0;
507        }
508
509        let instance = match self.instance() {
510            Ok(i) => i,
511            Err(e) => {
512                tracing::warn!("Failed to get instance for queue processing: {e}");
513                return batches.len(); // All batches failed
514            }
515        };
516
517        let mut failures = 0;
518        for (peer, entry_ids) in batches {
519            // Fetch entries from backend
520            let mut entries = Vec::with_capacity(entry_ids.len());
521            for (entry_id, _tree_id) in &entry_ids {
522                match instance.backend().get(entry_id).await {
523                    Ok(entry) => entries.push(entry),
524                    Err(e) => {
525                        tracing::warn!("Failed to fetch entry {entry_id} for peer {peer}: {e}");
526                    }
527                }
528            }
529
530            if entries.is_empty() {
531                continue;
532            }
533
534            // Send batched entries to peer
535            if let Err(e) = self.send_to_peer(&peer, entries.clone()).await {
536                let now_ms = instance.clock().now_millis();
537                self.add_to_retry_queue(peer, entries, e, now_ms);
538                failures += 1;
539            }
540        }
541        failures
542    }
543
544    /// Process retry queue with exponential backoff
545    async fn process_retry_queue(&mut self) {
546        let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
547        let mut still_failed = Vec::new();
548
549        // Take the retry queue to avoid borrowing issues
550        let retry_queue = std::mem::take(&mut self.retry_queue);
551
552        // Process entries that are ready for retry
553        for mut entry in retry_queue {
554            // Backoff in milliseconds: 2^attempts * 1000ms, max 64 seconds
555            let backoff_ms = 2u64.pow(entry.attempts.min(6)) * 1000;
556            let elapsed_ms = now_ms.saturating_sub(entry.last_attempt_ms);
557
558            if elapsed_ms >= backoff_ms {
559                // Try sending again
560                if let Err(_e) = self.send_to_peer(&entry.peer, entry.entries.clone()).await {
561                    entry.attempts += 1;
562                    entry.last_attempt_ms = now_ms;
563
564                    if entry.attempts < 10 {
565                        // Max 10 attempts
566                        still_failed.push(entry);
567                    } else {
568                        // Max retries exceeded - give up on this batch
569                        tracing::error!("Giving up on sending to {} after 10 attempts", entry.peer);
570                    }
571                } else {
572                    // Successfully retried after failure
573                }
574            } else {
575                // Not ready for retry yet
576                still_failed.push(entry);
577            }
578        }
579
580        self.retry_queue = still_failed;
581    }
582
583    /// Flush retry queue immediately, ignoring backoff timers.
584    /// Returns the number of entries that still failed after retry.
585    async fn flush_retry_queue(&mut self) -> usize {
586        let mut still_failed = Vec::new();
587        let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
588
589        // Take the retry queue to process
590        let retry_queue = std::mem::take(&mut self.retry_queue);
591
592        // Try sending each entry immediately (ignore backoff)
593        for mut retry_entry in retry_queue {
594            if let Err(_e) = self
595                .send_to_peer(&retry_entry.peer, retry_entry.entries.clone())
596                .await
597            {
598                retry_entry.attempts += 1;
599                retry_entry.last_attempt_ms = now_ms;
600
601                if retry_entry.attempts < 10 {
602                    still_failed.push(retry_entry);
603                } else {
604                    tracing::error!(
605                        "Giving up on sending to {} after 10 attempts",
606                        retry_entry.peer
607                    );
608                }
609            }
610        }
611
612        let failed_count = still_failed.len();
613        self.retry_queue = still_failed;
614        failed_count
615    }
616
617    /// Perform periodic sync with all active peers
618    async fn periodic_sync_all_peers(&self) {
619        // Periodic sync triggered
620
621        // Get all peers from sync tree
622        let peers = match self.get_sync_tree().await {
623            Ok(sync_tree) => match sync_tree.new_transaction().await {
624                Ok(txn) => match PeerManager::new(&txn).list_peers().await {
625                    Ok(peers) => {
626                        // Extract peer list and drop the operation before awaiting
627                        peers
628                    }
629                    Err(_) => {
630                        // Skip sync if we can't list peers
631                        return;
632                    }
633                },
634                Err(_) => {
635                    // Skip sync if we can't create transaction
636                    return;
637                }
638            },
639            Err(_) => {
640                // Skip sync if we can't get sync tree
641                return;
642            }
643        };
644
645        // Now sync with peers (transaction is dropped, so no Send issues)
646        for peer_info in peers {
647            if peer_info.status == PeerStatus::Active
648                && let Err(e) = self.sync_with_peer(&peer_info.id).await
649            {
650                // Log individual peer sync failure but continue with others
651                tracing::error!("Periodic sync failed with {}: {e}", peer_info.id);
652            }
653        }
654    }
655
656    /// Sync with a specific peer (bidirectional)
657    async fn sync_with_peer(&self, peer_id: &PeerId) -> Result<()> {
658        async move {
659            info!(peer = %peer_id, "Starting peer synchronization");
660
661            // Get peer info and tree list from sync tree (extract and drop transaction before await)
662            let (address, sync_trees) = {
663                let sync_tree = self.get_sync_tree().await?;
664                let txn = sync_tree.new_transaction().await?;
665                let peer_manager = PeerManager::new(&txn);
666
667                let peer_info = peer_manager
668                    .get_peer_info(peer_id.public_key())
669                    .await?
670                    .ok_or_else(|| SyncError::PeerNotFound(peer_id.to_string()))?;
671
672                let address = peer_info
673                    .addresses
674                    .first()
675                    .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?
676                    .clone();
677
678                // Find all trees that sync with this peer from sync tree
679                let sync_trees = peer_manager.get_peer_trees(peer_id.public_key()).await?;
680
681                (address, sync_trees)
682            }; // Transaction is dropped here
683
684            if sync_trees.is_empty() {
685                debug!(peer = %peer_id, "No trees configured for sync with peer");
686                return Ok(()); // No trees to sync
687            }
688
689            info!(peer = %peer_id, tree_count = sync_trees.len(), "Synchronizing trees with peer");
690
691            for tree_id in sync_trees {
692                if let Err(e) = self.sync_tree_with_peer(peer_id, &tree_id, &address).await {
693                    // Log tree sync failure but continue with other trees
694                    tracing::error!("Failed to sync tree {tree_id} with peer {peer_id}: {e}");
695                }
696            }
697
698            info!(peer = %peer_id, "Completed peer synchronization");
699            Ok(())
700        }
701        .instrument(info_span!("sync_with_peer", peer = %peer_id))
702        .await
703    }
704
705    /// Sync a specific tree with a peer using smart duplicate prevention.
706    ///
707    /// This method implements Eidetica's core synchronization algorithm based on
708    /// Merkle-CRDT tip comparison. It eliminates duplicate sends by understanding
709    /// the semantic state of both peers' trees.
710    ///
711    /// # Algorithm
712    ///
713    /// 1. **Tip Exchange**: Get local tips and request peer's tips
714    /// 2. **Gap Analysis**: Compare tips to identify missing entries on both sides
715    /// 3. **Smart Transfer**: Only send/receive entries that are genuinely missing
716    /// 4. **DAG Completion**: Include all necessary ancestor entries
717    ///
718    /// # Benefits
719    ///
720    /// - **No duplicates**: Tips comparison guarantees no redundant network transfers
721    /// - **Complete data**: DAG traversal ensures all dependencies are satisfied
722    /// - **Bidirectional**: Both peers sync simultaneously for efficiency
723    /// - **Self-correcting**: Any missed entries are caught in subsequent syncs
724    ///
725    /// # Performance
726    ///
727    /// - **O(tip_count)** network requests for discovery
728    /// - **O(missing_entries)** data transfer (optimal)
729    /// - **Stateless**: No persistent tracking of individual sends needed
730    async fn sync_tree_with_peer(
731        &self,
732        peer_id: &PeerId,
733        tree_id: &ID,
734        address: &Address,
735    ) -> Result<()> {
736        async move {
737            trace!(peer = %peer_id, tree = %tree_id, "Starting unified tree synchronization");
738
739            // Get our tips for this tree (empty if tree doesn't exist)
740            let instance = self.instance()?;
741            let our_tips: Vec<ID> = instance
742                .backend()
743                .snapshot(tree_id)
744                .await
745                .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?
746                .into_tips();
747
748            // Get our device public key for automatic peer tracking
749            let our_device_pubkey = Some(instance.id());
750
751            debug!(peer = %peer_id, tree = %tree_id, our_tips = our_tips.len(), "Sending sync tree request");
752
753            // Send unified sync request
754            let request = SyncRequest::SyncTree(SyncTreeRequest {
755                tree_id: tree_id.clone(),
756                our_tips,
757                peer_pubkey: our_device_pubkey,
758                requesting_key: None, // TODO: Add auth support for background sync
759                requesting_key_name: None,
760                requested_permission: None,
761            });
762
763            let response = self.transport_manager.send_request(address, &request).await?;
764
765            match response {
766                SyncResponse::Bootstrap(bootstrap_response) => {
767                    info!(peer = %peer_id, tree = %tree_id, entry_count = bootstrap_response.all_entries.len() + 1, "Received bootstrap response");
768                    self.handle_bootstrap_response(bootstrap_response).await?;
769                }
770                SyncResponse::Incremental(incremental_response) => {
771                    debug!(peer = %peer_id, tree = %tree_id,
772                           their_tips = incremental_response.their_tips.len(),
773                           missing_count = incremental_response.missing_entries.len(),
774                           "Received incremental sync response");
775                    self.handle_incremental_response(incremental_response).await?;
776                }
777                SyncResponse::Error(msg) => {
778                    return Err(SyncError::SyncProtocolError(format!("Sync error: {msg}")).into());
779                }
780                _ => {
781                    return Err(SyncError::UnexpectedResponse {
782                        expected: "Bootstrap or Incremental",
783                        actual: format!("{response:?}"),
784                    }.into());
785                }
786            }
787
788            trace!(peer = %peer_id, tree = %tree_id, "Completed unified tree synchronization");
789            Ok(())
790        }
791        .instrument(info_span!("sync_tree", peer = %peer_id, tree = %tree_id))
792        .await
793    }
794
795    /// Check peer connections and attempt reconnection
796    async fn check_peer_connections(&mut self) {
797        // For now, this is a placeholder
798        // In the future, we could implement connection health checks
799        // and automatic reconnection logic here
800    }
801
802    /// Start the sync server on specified or all transports
803    async fn start_server(&mut self, name: Option<&str>) -> Result<()> {
804        // Create a sync handler with instance access and sync tree ID
805        let handler = Arc::new(SyncHandlerImpl::new(
806            self.instance()?,
807            self.sync_tree_id.clone(),
808        ));
809
810        match name {
811            Some(name) => {
812                // Start server on specific transport
813                self.transport_manager.start_server(name, handler).await?;
814                tracing::info!("Sync server started for transport {name}");
815            }
816            None => {
817                // Start servers on all transports
818                self.transport_manager.start_all_servers(handler).await?;
819                tracing::info!("Sync servers started for all transports");
820            }
821        }
822
823        Ok(())
824    }
825
826    /// Stop the sync server on specified or all transports
827    async fn stop_server(&mut self, name: Option<&str>) -> Result<()> {
828        match name {
829            Some(name) => {
830                self.transport_manager.stop_server(name).await?;
831                tracing::info!("Sync server stopped for transport {name}");
832            }
833            None => {
834                self.transport_manager.stop_all_servers().await?;
835                tracing::info!("All sync servers stopped");
836            }
837        }
838        Ok(())
839    }
840}