Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/sync/background/
mod.rs

1//! Background sync engine implementation.
2//!
3//! This module provides the BackgroundSync struct that handles all sync operations
4//! in a single background thread, removing circular dependency issues and providing
5//! automatic retry, periodic sync, and reconnection handling.
6
7use std::{sync::Arc, time::Duration};
8
9use tokio::{
10    sync::{mpsc, oneshot},
11    time::interval,
12};
13use tracing::{Instrument, debug, info, info_span, trace};
14
15use super::{
16    error::SyncError,
17    handler::SyncHandlerImpl,
18    peer_manager::PeerManager,
19    peer_types::{Address, PeerId, PeerStatus},
20    protocol::{SyncRequest, SyncResponse, SyncTreeRequest},
21    queue::SyncQueue,
22    transport_manager::TransportManager,
23};
24use crate::{
25    Database, Error, Instance, Result, WeakInstance,
26    auth::crypto::PublicKey,
27    database::DatabaseKey,
28    entry::{Entry, ID},
29    store::DocStore,
30};
31
32mod conn;
33
/// Commands that can be sent to the background sync engine
///
/// Commands that expect a reply carry a `oneshot::Sender`; the engine ignores
/// send failures on those channels (the requester may have stopped waiting).
// NOTE(review): large_enum_variant is allowed here — presumably because
// commands are short-lived and moved once into the channel; confirm.
#[allow(clippy::large_enum_variant)]
pub enum SyncCommand {
    /// Send entries to a specific peer
    SendEntries { peer: PeerId, entries: Vec<Entry> },
    /// Trigger immediate sync with a peer
    SyncWithPeer { peer: PeerId },
    /// Shutdown the background engine
    Shutdown,

    // Transport management
    /// Add a named transport to the transport manager
    AddTransport {
        name: String,
        transport: Box<dyn super::transports::SyncTransport>,
        response: oneshot::Sender<Result<()>>,
    },

    // Server management commands
    /// Start the sync server on specified or all transports
    StartServer {
        /// Transport name to start, or None for all transports
        name: Option<String>,
        response: oneshot::Sender<Result<()>>,
    },
    /// Stop the sync server on specified or all transports
    StopServer {
        /// Transport name to stop, or None for all transports
        name: Option<String>,
        response: oneshot::Sender<Result<()>>,
    },
    /// Get the server's listening address for a specific transport
    GetServerAddress {
        name: String,
        response: oneshot::Sender<Result<String>>,
    },
    /// Get all server addresses for running servers
    GetAllServerAddresses {
        response: oneshot::Sender<Result<Vec<(String, String)>>>,
    },

    // Peer connection
    /// Connect to a peer and perform handshake
    ConnectToPeer {
        address: Address,
        response: oneshot::Sender<Result<PublicKey>>, // Returns peer pubkey
    },

    // Request/Response operations
    /// Send a sync request and get response
    SendRequest {
        address: Address,
        request: Box<SyncRequest>,
        response: oneshot::Sender<Result<SyncResponse>>,
    },

    /// Flush: process all queued entries and retry queue, then respond.
    Flush {
        response: oneshot::Sender<Result<()>>,
    },
}
95
96// Manual Debug impl required because:
97// - `Box<dyn SyncTransport>` doesn't implement Debug (trait object)
98// - `oneshot::Sender` doesn't implement Debug (channel internals)
99// - Transports may contain secrets (e.g., Iroh's cryptographic keys)
100// This impl provides safe, useful debug output for logging.
101impl std::fmt::Debug for SyncCommand {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        match self {
104            Self::SendEntries { peer, entries } => f
105                .debug_struct("SendEntries")
106                .field("peer", peer)
107                .field("entries_count", &entries.len())
108                .finish(),
109            Self::SyncWithPeer { peer } => {
110                f.debug_struct("SyncWithPeer").field("peer", peer).finish()
111            }
112            Self::Shutdown => write!(f, "Shutdown"),
113            Self::AddTransport {
114                name, transport, ..
115            } => f
116                .debug_struct("AddTransport")
117                .field("name", name)
118                .field("transport_type", &transport.transport_type())
119                .finish(),
120            Self::StartServer { name, .. } => {
121                f.debug_struct("StartServer").field("name", name).finish()
122            }
123            Self::StopServer { name, .. } => {
124                f.debug_struct("StopServer").field("name", name).finish()
125            }
126            Self::GetServerAddress { name, .. } => f
127                .debug_struct("GetServerAddress")
128                .field("name", name)
129                .finish(),
130            Self::GetAllServerAddresses { .. } => write!(f, "GetAllServerAddresses"),
131            Self::ConnectToPeer { address, .. } => f
132                .debug_struct("ConnectToPeer")
133                .field("address", address)
134                .finish(),
135            Self::SendRequest {
136                address, request, ..
137            } => f
138                .debug_struct("SendRequest")
139                .field("address", address)
140                .field("request", request)
141                .finish(),
142            Self::Flush { .. } => write!(f, "Flush"),
143        }
144    }
145}
146
/// Entry in the retry queue for failed sends
#[derive(Debug, Clone)]
struct RetryEntry {
    /// Destination peer for this batch.
    peer: PeerId,
    /// Entries that failed to send and are awaiting retry.
    entries: Vec<Entry>,
    /// Number of send attempts made so far; batches are dropped after 10.
    attempts: u32,
    /// Timestamp of last attempt in milliseconds since Unix epoch
    last_attempt_ms: u64,
}
156
/// Background sync engine that owns all sync state and handles operations
pub struct BackgroundSync {
    // Core components - owns everything
    /// Named transports used to reach peers and host sync servers.
    pub(super) transport_manager: TransportManager,
    /// Weak handle to the owning Instance; upgraded per operation so the
    /// background task does not keep the instance alive on its own.
    instance: WeakInstance,
    /// ID of the sync metadata tree (peer records and per-database settings).
    pub(super) sync_tree_id: ID,

    // Queue for entries pending synchronization (shared with Sync frontend)
    queue: Arc<SyncQueue>,

    // Retry queue for failed sends
    // Processed with exponential backoff by `process_retry_queue`.
    retry_queue: Vec<RetryEntry>,

    // Communication
    /// Receiving side of the command channel created in `start()`.
    command_rx: mpsc::Receiver<SyncCommand>,
}
173
174impl BackgroundSync {
175    /// Start the background sync engine and return a command sender.
176    ///
177    /// The engine starts with no transports registered. Use `AddTransport`
178    /// commands to add transports after starting.
179    pub fn start(
180        instance: Instance,
181        sync_tree_id: ID,
182        queue: Arc<SyncQueue>,
183    ) -> mpsc::Sender<SyncCommand> {
184        let (tx, rx) = mpsc::channel(100);
185
186        let background = Self {
187            transport_manager: TransportManager::new(),
188            instance: instance.downgrade(),
189            sync_tree_id,
190            queue,
191            retry_queue: Vec::new(),
192            command_rx: rx,
193        };
194
195        // Spawn background sync as a regular tokio task
196        // (Transaction is now Send since it uses Arc<Mutex>)
197        tokio::spawn(background.run());
198        tx
199    }
200
201    /// Upgrade the weak instance reference to a strong reference.
202    pub(super) fn instance(&self) -> Result<Instance> {
203        self.instance
204            .upgrade()
205            .ok_or_else(|| SyncError::InstanceDropped.into())
206    }
207
208    /// Get the sync tree for accessing peer data
209    pub(super) async fn get_sync_tree(&self) -> Result<Database> {
210        // Load sync tree with the device key
211        let instance = self.instance()?;
212        let signing_key = instance.device_key().clone();
213
214        Database::open(instance, &self.sync_tree_id, DatabaseKey::new(signing_key)).await
215    }
216
217    /// Get the minimum sync interval from all tracked databases
218    /// Returns None if no databases are tracked or no intervals are set
219    async fn get_min_sync_interval(&self) -> Option<u64> {
220        let sync_tree = match self.get_sync_tree().await {
221            Ok(tree) => tree,
222            Err(_) => return None,
223        };
224
225        let txn = match sync_tree.new_transaction().await {
226            Ok(txn) => txn,
227            Err(_) => return None,
228        };
229
230        let user_mgr = super::user_sync_manager::UserSyncManager::new(&txn);
231
232        // Get all tracked database IDs from the DATABASE_USERS_SUBTREE
233        let database_users = match txn
234            .get_store::<DocStore>(super::user_sync_manager::DATABASE_USERS_SUBTREE)
235            .await
236        {
237            Ok(store) => store,
238            Err(_) => return None,
239        };
240
241        let all_dbs = match database_users.get_all().await {
242            Ok(doc) => doc,
243            Err(_) => return None,
244        };
245
246        // Find the minimum interval across all databases
247        let mut min_interval: Option<u64> = None;
248        for db_id_str in all_dbs.keys() {
249            if let Ok(db_id) = ID::parse(db_id_str)
250                && let Ok(Some(settings)) = user_mgr.get_combined_settings(&db_id).await
251                && let Some(interval) = settings.interval_seconds
252            {
253                min_interval = Some(match min_interval {
254                    Some(current_min) => current_min.min(interval),
255                    None => interval,
256                });
257            }
258        }
259
260        min_interval
261    }
262
263    /// Main event loop that handles all sync operations
264    async fn run(mut self) {
265        async move {
266            info!("Starting background sync engine");
267
268            // Get initial sync interval from settings (default to 300 seconds if none set)
269            let mut current_interval_secs = self.get_min_sync_interval().await.unwrap_or(300);
270            info!("Initial periodic sync interval: {} seconds", current_interval_secs);
271
272            // Set up timers
273            let mut periodic_sync = interval(Duration::from_secs(current_interval_secs));
274            let mut queue_check = interval(Duration::from_secs(5)); // 5 seconds - batches local writes
275            let mut retry_check = interval(Duration::from_secs(30)); // 30 seconds
276            let mut connection_check = interval(Duration::from_secs(60)); // 1 minute
277            let mut settings_check = interval(Duration::from_secs(60)); // Check for settings changes every minute
278
279            // Skip initial tick to avoid immediate execution
280            periodic_sync.tick().await;
281            queue_check.tick().await;
282            retry_check.tick().await;
283            connection_check.tick().await;
284            settings_check.tick().await;
285
286            loop {
287                tokio::select! {
288                    // Handle commands from frontend
289                    Some(cmd) = self.command_rx.recv() => {
290                        if let Err(e) = self.handle_command(cmd).await {
291                            // Log errors but continue running - background sync should be resilient
292                            tracing::error!("Background sync command error: {e}");
293                        }
294                    }
295
296                    // Drain sync queue (batched entries)
297                    _ = queue_check.tick() => {
298                        self.process_queue().await;
299                    }
300
301                    // Periodic sync with all peers
302                    _ = periodic_sync.tick() => {
303                        self.periodic_sync_all_peers().await;
304                    }
305
306                    // Process retry queue
307                    _ = retry_check.tick() => {
308                        self.process_retry_queue().await;
309                    }
310
311                    // Check and reconnect disconnected peers
312                    _ = connection_check.tick() => {
313                        self.check_peer_connections().await;
314                    }
315
316                    // Check if sync interval settings have changed
317                    _ = settings_check.tick() => {
318                        if let Some(new_interval) = self.get_min_sync_interval().await
319                            && new_interval != current_interval_secs {
320                                info!("Sync interval changed from {} to {} seconds", current_interval_secs, new_interval);
321                                current_interval_secs = new_interval;
322                                // Recreate the periodic sync timer with new interval
323                                periodic_sync = interval(Duration::from_secs(new_interval));
324                                periodic_sync.tick().await; // Skip initial tick
325                            }
326                    }
327
328                    // Channel closed, shutdown
329                    else => {
330                        // Normal shutdown when channel closes
331                        info!("Background sync engine shutting down");
332                        break;
333                    }
334                }
335            }
336        }
337        .instrument(info_span!("background_sync"))
338        .await
339    }
340
341    /// Handle a single command from the frontend
342    async fn handle_command(&mut self, command: SyncCommand) -> Result<()> {
343        match command {
344            SyncCommand::SendEntries { peer, entries } => {
345                if let Err(e) = self.send_to_peer(&peer, entries.clone()).await {
346                    let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
347                    self.add_to_retry_queue(peer, entries, e, now_ms);
348                }
349            }
350
351            SyncCommand::SyncWithPeer { peer } => {
352                if let Err(e) = self.sync_with_peer(&peer).await {
353                    // Log sync failure but don't crash the background engine
354                    tracing::error!("Failed to sync with peer {peer}: {e}");
355                }
356            }
357
358            SyncCommand::AddTransport {
359                name,
360                transport,
361                response,
362            } => {
363                // Stop server on existing transport if running
364                if let Some(old) = self.transport_manager.get_mut(&name)
365                    && old.is_server_running()
366                {
367                    let _ = old.stop_server().await;
368                }
369                self.transport_manager.add(&name, transport);
370                tracing::debug!("Added transport: {}", name);
371                let _ = response.send(Ok(()));
372            }
373
374            SyncCommand::StartServer { name, response } => {
375                let result = self.start_server(name.as_deref()).await;
376                let _ = response.send(result);
377            }
378
379            SyncCommand::StopServer { name, response } => {
380                let result = self.stop_server(name.as_deref()).await;
381                let _ = response.send(result);
382            }
383
384            SyncCommand::GetServerAddress { name, response } => {
385                let result = self.transport_manager.get_server_address(&name);
386                let _ = response.send(result);
387            }
388
389            SyncCommand::GetAllServerAddresses { response } => {
390                let addresses = self.transport_manager.get_all_server_addresses();
391                let _ = response.send(Ok(addresses));
392            }
393
394            SyncCommand::ConnectToPeer { address, response } => {
395                let result = self.connect_to_peer(&address).await;
396                let _ = response.send(result);
397            }
398
399            SyncCommand::SendRequest {
400                address,
401                request,
402                response,
403            } => {
404                let result = self.send_sync_request(&address, &request).await;
405                let _ = response.send(result);
406            }
407
408            SyncCommand::Flush { response } => {
409                // Process retry queue first (old failures), then main queue (new entries)
410                // This avoids double-trying entries that fail in process_queue
411                let retry_failures = self.flush_retry_queue().await;
412                let queue_failures = self.process_queue().await;
413
414                // Report error if any failures occurred
415                let result = match (retry_failures, queue_failures) {
416                    (0, 0) => Ok(()),
417                    (r, q) => Err(SyncError::Network(format!(
418                        "Flush had failures: {r} from retry queue, {q} from new entries"
419                    ))
420                    .into()),
421                };
422                let _ = response.send(result);
423            }
424
425            SyncCommand::Shutdown => {
426                // Shutdown command received - exit cleanly
427                return Err(SyncError::Network("Shutdown requested".to_string()).into());
428            }
429        }
430        Ok(())
431    }
432
433    /// Send specific entries to a peer without duplicate filtering.
434    ///
435    /// This method performs direct entry transmission and is used by:
436    /// - `SendEntries` commands from the frontend (caller handles filtering)
437    /// - `sync_tree_with_peer()` after smart duplicate prevention analysis
438    ///
439    /// # Design Note
440    ///
441    /// This method does NOT perform duplicate prevention - that responsibility
442    /// lies with the caller. The background sync's smart duplicate prevention
443    /// happens in `sync_tree_with_peer()` via tip comparison, while direct
444    /// `SendEntries` commands trust the caller to send appropriate entries.
445    ///
446    /// # Error Handling
447    ///
448    /// Failed sends are automatically added to the retry queue with exponential backoff.
449    async fn send_to_peer(&self, peer: &PeerId, entries: Vec<Entry>) -> Result<()> {
450        // Get peer address from sync tree (extract and drop transaction before await)
451        let address = {
452            let sync_tree = self.get_sync_tree().await?;
453            let txn = sync_tree.new_transaction().await?;
454            let peer_info = PeerManager::new(&txn)
455                .get_peer_info(peer.public_key())
456                .await?
457                .ok_or_else(|| SyncError::PeerNotFound(peer.to_string()))?;
458
459            peer_info
460                .addresses
461                .first()
462                .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?
463                .clone()
464        }; // Transaction is dropped here
465
466        let request = SyncRequest::SendEntries(entries);
467        let response = self
468            .transport_manager
469            .send_request(&address, &request)
470            .await?;
471
472        match response {
473            SyncResponse::Ack | SyncResponse::Count(_) => Ok(()),
474            SyncResponse::Error(msg) => Err(SyncError::SyncProtocolError(format!(
475                "Peer {peer} returned error: {msg}"
476            ))
477            .into()),
478            _ => Err(SyncError::UnexpectedResponse {
479                expected: "Ack or Count",
480                actual: format!("{response:?}"),
481            }
482            .into()),
483        }
484    }
485
486    /// Add failed send to retry queue
487    fn add_to_retry_queue(&mut self, peer: PeerId, entries: Vec<Entry>, error: Error, now_ms: u64) {
488        // Log send failure and add to retry queue
489        tracing::warn!("Failed to send to {peer}: {error}. Adding to retry queue.");
490        self.retry_queue.push(RetryEntry {
491            peer,
492            entries,
493            attempts: 1,
494            last_attempt_ms: now_ms,
495        });
496    }
497
498    /// Process entries from the sync queue, batching by peer.
499    ///
500    /// Drains the queue and sends entries to each peer. Failed sends
501    /// are added to the retry queue with exponential backoff.
502    /// Returns the number of peers that failed to receive entries.
503    async fn process_queue(&mut self) -> usize {
504        let batches = self.queue.drain();
505        if batches.is_empty() {
506            return 0;
507        }
508
509        let instance = match self.instance() {
510            Ok(i) => i,
511            Err(e) => {
512                tracing::warn!("Failed to get instance for queue processing: {e}");
513                return batches.len(); // All batches failed
514            }
515        };
516
517        let mut failures = 0;
518        for (peer, entry_ids) in batches {
519            // Fetch entries from backend
520            let mut entries = Vec::with_capacity(entry_ids.len());
521            for (entry_id, _tree_id) in &entry_ids {
522                match instance.backend().get(entry_id).await {
523                    Ok(entry) => entries.push(entry),
524                    Err(e) => {
525                        tracing::warn!("Failed to fetch entry {entry_id} for peer {peer}: {e}");
526                    }
527                }
528            }
529
530            if entries.is_empty() {
531                continue;
532            }
533
534            // Send batched entries to peer
535            if let Err(e) = self.send_to_peer(&peer, entries.clone()).await {
536                let now_ms = instance.clock().now_millis();
537                self.add_to_retry_queue(peer, entries, e, now_ms);
538                failures += 1;
539            }
540        }
541        failures
542    }
543
544    /// Process retry queue with exponential backoff
545    async fn process_retry_queue(&mut self) {
546        let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
547        let mut still_failed = Vec::new();
548
549        // Take the retry queue to avoid borrowing issues
550        let retry_queue = std::mem::take(&mut self.retry_queue);
551
552        // Process entries that are ready for retry
553        for mut entry in retry_queue {
554            // Backoff in milliseconds: 2^attempts * 1000ms, max 64 seconds
555            let backoff_ms = 2u64.pow(entry.attempts.min(6)) * 1000;
556            let elapsed_ms = now_ms.saturating_sub(entry.last_attempt_ms);
557
558            if elapsed_ms >= backoff_ms {
559                // Try sending again
560                if let Err(_e) = self.send_to_peer(&entry.peer, entry.entries.clone()).await {
561                    entry.attempts += 1;
562                    entry.last_attempt_ms = now_ms;
563
564                    if entry.attempts < 10 {
565                        // Max 10 attempts
566                        still_failed.push(entry);
567                    } else {
568                        // Max retries exceeded - give up on this batch
569                        tracing::error!("Giving up on sending to {} after 10 attempts", entry.peer);
570                    }
571                } else {
572                    // Successfully retried after failure
573                }
574            } else {
575                // Not ready for retry yet
576                still_failed.push(entry);
577            }
578        }
579
580        self.retry_queue = still_failed;
581    }
582
583    /// Flush retry queue immediately, ignoring backoff timers.
584    /// Returns the number of entries that still failed after retry.
585    async fn flush_retry_queue(&mut self) -> usize {
586        let mut still_failed = Vec::new();
587        let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
588
589        // Take the retry queue to process
590        let retry_queue = std::mem::take(&mut self.retry_queue);
591
592        // Try sending each entry immediately (ignore backoff)
593        for mut retry_entry in retry_queue {
594            if let Err(_e) = self
595                .send_to_peer(&retry_entry.peer, retry_entry.entries.clone())
596                .await
597            {
598                retry_entry.attempts += 1;
599                retry_entry.last_attempt_ms = now_ms;
600
601                if retry_entry.attempts < 10 {
602                    still_failed.push(retry_entry);
603                } else {
604                    tracing::error!(
605                        "Giving up on sending to {} after 10 attempts",
606                        retry_entry.peer
607                    );
608                }
609            }
610        }
611
612        let failed_count = still_failed.len();
613        self.retry_queue = still_failed;
614        failed_count
615    }
616
617    /// Perform periodic sync with all active peers
618    async fn periodic_sync_all_peers(&self) {
619        // Periodic sync triggered
620
621        // Get all peers from sync tree
622        let peers = match self.get_sync_tree().await {
623            Ok(sync_tree) => match sync_tree.new_transaction().await {
624                Ok(txn) => match PeerManager::new(&txn).list_peers().await {
625                    Ok(peers) => {
626                        // Extract peer list and drop the operation before awaiting
627                        peers
628                    }
629                    Err(_) => {
630                        // Skip sync if we can't list peers
631                        return;
632                    }
633                },
634                Err(_) => {
635                    // Skip sync if we can't create transaction
636                    return;
637                }
638            },
639            Err(_) => {
640                // Skip sync if we can't get sync tree
641                return;
642            }
643        };
644
645        // Now sync with peers (transaction is dropped, so no Send issues)
646        for peer_info in peers {
647            if peer_info.status == PeerStatus::Active
648                && let Err(e) = self.sync_with_peer(&peer_info.id).await
649            {
650                // Log individual peer sync failure but continue with others
651                tracing::error!("Periodic sync failed with {}: {e}", peer_info.id);
652            }
653        }
654    }
655
656    /// Sync with a specific peer (bidirectional)
657    async fn sync_with_peer(&self, peer_id: &PeerId) -> Result<()> {
658        async move {
659            info!(peer = %peer_id, "Starting peer synchronization");
660
661            // Get peer info and tree list from sync tree (extract and drop transaction before await)
662            let (address, sync_trees) = {
663                let sync_tree = self.get_sync_tree().await?;
664                let txn = sync_tree.new_transaction().await?;
665                let peer_manager = PeerManager::new(&txn);
666
667                let peer_info = peer_manager
668                    .get_peer_info(peer_id.public_key())
669                    .await?
670                    .ok_or_else(|| SyncError::PeerNotFound(peer_id.to_string()))?;
671
672                let address = peer_info
673                    .addresses
674                    .first()
675                    .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?
676                    .clone();
677
678                // Find all trees that sync with this peer from sync tree
679                let sync_trees = peer_manager.get_peer_trees(peer_id.public_key()).await?;
680
681                (address, sync_trees)
682            }; // Transaction is dropped here
683
684            if sync_trees.is_empty() {
685                debug!(peer = %peer_id, "No trees configured for sync with peer");
686                return Ok(()); // No trees to sync
687            }
688
689            info!(peer = %peer_id, tree_count = sync_trees.len(), "Synchronizing trees with peer");
690
691            for tree_id_str in sync_trees {
692                // Convert string ID to entry ID
693                let tree_id = ID::from(tree_id_str.as_str());
694                if let Err(e) = self.sync_tree_with_peer(peer_id, &tree_id, &address).await {
695                    // Log tree sync failure but continue with other trees
696                    tracing::error!("Failed to sync tree {tree_id} with peer {peer_id}: {e}");
697                }
698            }
699
700            info!(peer = %peer_id, "Completed peer synchronization");
701            Ok(())
702        }
703        .instrument(info_span!("sync_with_peer", peer = %peer_id))
704        .await
705    }
706
    /// Sync a specific tree with a peer using smart duplicate prevention.
    ///
    /// This method implements Eidetica's core synchronization algorithm based on
    /// Merkle-CRDT tip comparison. It eliminates duplicate sends by understanding
    /// the semantic state of both peers' trees.
    ///
    /// # Algorithm
    ///
    /// 1. **Tip Exchange**: Get local tips and request peer's tips
    /// 2. **Gap Analysis**: Compare tips to identify missing entries on both sides
    /// 3. **Smart Transfer**: Only send/receive entries that are genuinely missing
    /// 4. **DAG Completion**: Include all necessary ancestor entries
    ///
    /// # Benefits
    ///
    /// - **No duplicates**: Tips comparison guarantees no redundant network transfers
    /// - **Complete data**: DAG traversal ensures all dependencies are satisfied
    /// - **Bidirectional**: Both peers sync simultaneously for efficiency
    /// - **Self-correcting**: Any missed entries are caught in subsequent syncs
    ///
    /// # Performance
    ///
    /// - **O(tip_count)** network requests for discovery
    /// - **O(missing_entries)** data transfer (optimal)
    /// - **Stateless**: No persistent tracking of individual sends needed
    async fn sync_tree_with_peer(
        &self,
        peer_id: &PeerId,
        tree_id: &ID,
        address: &Address,
    ) -> Result<()> {
        async move {
            trace!(peer = %peer_id, tree = %tree_id, "Starting unified tree synchronization");

            // Get our tips for this tree (empty if tree doesn't exist)
            let instance = self.instance()?;
            let our_tips = instance
                .backend()
                .get_tips(tree_id)
                .await
                .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?;

            // Get our device public key for automatic peer tracking
            let our_device_pubkey = Some(instance.device_id());

            debug!(peer = %peer_id, tree = %tree_id, our_tips = our_tips.len(), "Sending sync tree request");

            // Send unified sync request; the peer decides from our tips whether
            // we need a full bootstrap or only an incremental delta.
            let request = SyncRequest::SyncTree(SyncTreeRequest {
                tree_id: tree_id.clone(),
                our_tips,
                peer_pubkey: our_device_pubkey,
                requesting_key: None, // TODO: Add auth support for background sync
                requesting_key_name: None,
                requested_permission: None,
            });

            let response = self.transport_manager.send_request(address, &request).await?;

            match response {
                // Full tree transfer: we had no (or unusable) common history.
                SyncResponse::Bootstrap(bootstrap_response) => {
                    info!(peer = %peer_id, tree = %tree_id, entry_count = bootstrap_response.all_entries.len() + 1, "Received bootstrap response");
                    self.handle_bootstrap_response(bootstrap_response).await?;
                }
                // Normal case: peer returned its tips plus only the entries we miss.
                SyncResponse::Incremental(incremental_response) => {
                    debug!(peer = %peer_id, tree = %tree_id,
                           their_tips = incremental_response.their_tips.len(),
                           missing_count = incremental_response.missing_entries.len(),
                           "Received incremental sync response");
                    self.handle_incremental_response(incremental_response).await?;
                }
                SyncResponse::Error(msg) => {
                    return Err(SyncError::SyncProtocolError(format!("Sync error: {msg}")).into());
                }
                _ => {
                    return Err(SyncError::UnexpectedResponse {
                        expected: "Bootstrap or Incremental",
                        actual: format!("{response:?}"),
                    }.into());
                }
            }

            trace!(peer = %peer_id, tree = %tree_id, "Completed unified tree synchronization");
            Ok(())
        }
        .instrument(info_span!("sync_tree", peer = %peer_id, tree = %tree_id))
        .await
    }
795
    /// Check peer connections and attempt reconnection
    ///
    /// Currently a no-op placeholder, invoked every minute from the run loop's
    /// `connection_check` timer.
    async fn check_peer_connections(&mut self) {
        // For now, this is a placeholder
        // In the future, we could implement connection health checks
        // and automatic reconnection logic here
    }
802
803    /// Start the sync server on specified or all transports
804    async fn start_server(&mut self, name: Option<&str>) -> Result<()> {
805        // Create a sync handler with instance access and sync tree ID
806        let handler = Arc::new(SyncHandlerImpl::new(
807            self.instance()?,
808            self.sync_tree_id.clone(),
809        ));
810
811        match name {
812            Some(name) => {
813                // Start server on specific transport
814                self.transport_manager.start_server(name, handler).await?;
815                tracing::info!("Sync server started for transport {name}");
816            }
817            None => {
818                // Start servers on all transports
819                self.transport_manager.start_all_servers(handler).await?;
820                tracing::info!("Sync servers started for all transports");
821            }
822        }
823
824        Ok(())
825    }
826
827    /// Stop the sync server on specified or all transports
828    async fn stop_server(&mut self, name: Option<&str>) -> Result<()> {
829        match name {
830            Some(name) => {
831                self.transport_manager.stop_server(name).await?;
832                tracing::info!("Sync server stopped for transport {name}");
833            }
834            None => {
835                self.transport_manager.stop_all_servers().await?;
836                tracing::info!("All sync servers stopped");
837            }
838        }
839        Ok(())
840    }
841}