Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/sync/
ops.rs

1//! Core sync operations for the sync system.
2
3use std::future::Future;
4use std::time::Duration;
5
6use tokio::sync::oneshot;
7use tracing::{debug, info, warn};
8
9use super::{
10    Address, DatabaseTicket, PeerId, Sync, SyncError,
11    background::SyncCommand,
12    peer_manager::PeerManager,
13    peer_types,
14    protocol::{self, SyncRequest, SyncResponse, SyncTreeRequest},
15    user_sync_manager::UserSyncManager,
16};
17use crate::{
18    Database, Entry, Result, auth::Permission, auth::crypto::PublicKey, entry::ID, store::DocStore,
19};
20
21use super::utils::collect_ancestors_to_send;
22
23impl Sync {
24    // === Core Sync Methods ===
25
26    /// Synchronize a specific tree with a peer using bidirectional sync.
27    ///
28    /// This is the main synchronization method that implements tip exchange
29    /// and bidirectional entry transfer to keep trees in sync between peers.
30    /// It performs both pull (fetch missing entries) and push (send our entries).
31    ///
32    /// # Arguments
33    /// * `peer_pubkey` - The public key of the peer to sync with
34    /// * `tree_id` - The ID of the tree to synchronize
35    ///
36    /// # Returns
37    /// A Result indicating success or failure of the sync operation.
38    pub async fn sync_tree_with_peer(&self, peer_pubkey: &PublicKey, tree_id: &ID) -> Result<()> {
39        // Get peer information and address
40        let peer_info = self
41            .get_peer_info(peer_pubkey)
42            .await?
43            .ok_or_else(|| SyncError::PeerNotFound(peer_pubkey.to_string()))?;
44
45        let address = peer_info
46            .addresses
47            .first()
48            .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?;
49
50        // Get our current tips for this tree (empty if tree doesn't exist)
51        let backend = self.backend()?;
52        let our_tips: Vec<ID> = backend
53            .snapshot(tree_id)
54            .await
55            .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?
56            .into_tips();
57
58        // Get our device public key for automatic peer tracking
59        let our_device_pubkey = self.get_device_pubkey().ok();
60
61        // Send unified sync request
62        let request = SyncRequest::SyncTree(SyncTreeRequest {
63            tree_id: tree_id.clone(),
64            our_tips,
65            peer_pubkey: our_device_pubkey,
66            requesting_key: None, // TODO: Add auth support for direct sync
67            requesting_key_name: None,
68            requested_permission: None,
69        });
70
71        // Send request via background sync command
72        let (tx, rx) = oneshot::channel();
73        self.background_tx
74            .get()
75            .ok_or(SyncError::NoTransportEnabled)?
76            .send(SyncCommand::SendRequest {
77                address: address.clone(),
78                request: Box::new(request),
79                response: tx,
80            })
81            .await
82            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
83
84        let response = rx
85            .await
86            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
87            .map_err(|e| SyncError::Network(format!("Request failed: {e}")))?;
88
89        match response {
90            SyncResponse::Bootstrap(bootstrap_response) => {
91                self.handle_bootstrap_response(bootstrap_response).await?;
92            }
93            SyncResponse::Incremental(incremental_response) => {
94                self.handle_incremental_response(incremental_response, address)
95                    .await?;
96            }
97            SyncResponse::Error(msg) => {
98                return Err(SyncError::SyncProtocolError(format!("Sync error: {msg}")).into());
99            }
100            _ => {
101                return Err(SyncError::UnexpectedResponse {
102                    expected: "Bootstrap or Incremental",
103                    actual: format!("{response:?}"),
104                }
105                .into());
106            }
107        }
108
109        // Track tree/peer relationship for sync_on_commit to work
110        // This allows on_local_write() to find this peer when queueing entries
111        self.add_tree_sync(peer_pubkey, tree_id).await?;
112
113        Ok(())
114    }
115
116    /// Handle bootstrap response by storing root and all entries
117    pub(super) async fn handle_bootstrap_response(
118        &self,
119        response: protocol::BootstrapResponse,
120    ) -> Result<()> {
121        tracing::info!(tree_id = %response.tree_id, "Processing bootstrap response");
122
123        // Integrity check: the root entry's content must hash to the declared
124        // tree_id. Rejects peers serving substituted content. A mismatch here
125        // also covers the cross-algorithm bootstrap case (e.g. a SHA-256 tree
126        // advertised to a BLAKE3-default node) — those are unsupported until
127        // the backend gains multi-CID-per-entry storage, so failing loudly is
128        // better than silently re-keying the DAG under the wrong algorithm.
129        let derived = response.root_entry.id();
130        if derived != response.tree_id {
131            return Err(SyncError::InvalidEntry(format!(
132                "root entry content hashes to {} but bootstrap response declares tree_id {}",
133                derived, response.tree_id
134            ))
135            .into());
136        }
137
138        // Combine root entry with all other entries into a single batch
139        let mut all_entries = Vec::with_capacity(1 + response.all_entries.len());
140        all_entries.push(response.root_entry);
141        all_entries.extend(response.all_entries);
142
143        // Store all entries and fire callbacks once
144        self.store_received_entries(&response.tree_id, all_entries)
145            .await?;
146
147        tracing::info!(tree_id = %response.tree_id, "Bootstrap completed successfully");
148        Ok(())
149    }
150
151    /// Handle incremental response by storing missing entries and sending back what server is missing
152    pub(super) async fn handle_incremental_response(
153        &self,
154        response: protocol::IncrementalResponse,
155        peer_address: &peer_types::Address,
156    ) -> Result<()> {
157        tracing::debug!(tree_id = %response.tree_id, "Processing incremental response");
158
159        // Step 1: Store missing entries
160        self.store_received_entries(&response.tree_id, response.missing_entries)
161            .await?;
162
163        // Step 2: Check if server is missing entries from us
164        let backend = self.backend()?;
165        let our_snapshot = backend.snapshot(&response.tree_id).await?;
166        let their_tips = &response.their_tips;
167
168        // Find tips they don't have
169        let missing_tip_ids: Vec<_> = our_snapshot
170            .tips()
171            .iter()
172            .filter(|tip_id| !their_tips.contains(tip_id))
173            .cloned()
174            .collect();
175
176        if !missing_tip_ids.is_empty() {
177            tracing::debug!(
178                tree_id = %response.tree_id,
179                missing_tips = missing_tip_ids.len(),
180                "Server is missing some of our entries, sending them back"
181            );
182
183            // Collect entries server is missing
184            let engine = self
185                .backend()?
186                .local_engine()
187                .expect("sync requires local backend");
188            let entries_for_server =
189                collect_ancestors_to_send(engine.as_ref(), &missing_tip_ids, their_tips).await?;
190
191            if !entries_for_server.is_empty() {
192                // Send these entries back to server
193                self.send_missing_entries_to_peer(
194                    peer_address,
195                    &response.tree_id,
196                    entries_for_server,
197                )
198                .await?;
199            }
200        }
201
202        tracing::debug!(tree_id = %response.tree_id, "Incremental sync completed");
203        Ok(())
204    }
205
206    /// Send entries that the server is missing back to complete bidirectional sync
207    async fn send_missing_entries_to_peer(
208        &self,
209        peer_address: &peer_types::Address,
210        tree_id: &ID,
211        entries: Vec<Entry>,
212    ) -> Result<()> {
213        if entries.is_empty() {
214            return Ok(());
215        }
216
217        tracing::debug!(
218            tree_id = %tree_id,
219            entry_count = entries.len(),
220            "Sending missing entries back to peer for bidirectional sync"
221        );
222
223        let request = protocol::SyncRequest::SendEntries(entries);
224
225        // Send via command channel
226        let (tx, rx) = tokio::sync::oneshot::channel();
227        self.background_tx
228            .get()
229            .ok_or(SyncError::NoTransportEnabled)?
230            .send(SyncCommand::SendRequest {
231                address: peer_address.clone(),
232                request: Box::new(request),
233                response: tx,
234            })
235            .await
236            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
237
238        // Wait for acknowledgment
239        let response = rx
240            .await
241            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
242            .map_err(|e| SyncError::Network(format!("Request failed: {e}")))?;
243
244        match response {
245            protocol::SyncResponse::Ack | protocol::SyncResponse::Count(_) => {
246                tracing::debug!(tree_id = %tree_id, "Server acknowledged receipt of missing entries");
247                Ok(())
248            }
249            protocol::SyncResponse::Error(e) => {
250                Err(SyncError::Network(format!("Server error receiving entries: {e}")).into())
251            }
252            _ => Err(SyncError::UnexpectedResponse {
253                expected: "Ack or Count",
254                actual: format!("{response:?}"),
255            }
256            .into()),
257        }
258    }
259
260    /// Validate and store received entries from a peer, firing remote write callbacks.
261    pub(super) async fn store_received_entries(
262        &self,
263        tree_id: &ID,
264        entries: Vec<Entry>,
265    ) -> Result<()> {
266        // These entries arrive without per-entry declared IDs — they were batched
267        // under a single tree_id by the sender. Content is stored under whatever
268        // ID our local `entry.id()` derives, so substitution attacks on individual
269        // entries would fail DAG connectivity checks via parent pointers rather
270        // than a per-entry hash check here. Root-level integrity is verified by
271        // the bootstrap handler against the declared tree_id.
272        //
273        // TODO: Add signature verification and parent-existence / DAG-connectivity
274        // checks before marking entries as verified.
275
276        // Store entries and fire callbacks via Instance::put_remote_entries.
277        // Stored Unverified: these arrived from a peer and have not been
278        // verified by this node.
279        let instance = self.instance()?;
280        instance
281            .put_remote_entries(tree_id, entries)
282            .await
283            .map_err(|e| SyncError::BackendError(format!("Failed to store entries: {e}")))?;
284
285        Ok(())
286    }
287
288    /// Send a batch of entries to a sync peer (async version).
289    ///
290    /// # Arguments
291    /// * `entries` - The entries to send
292    /// * `address` - The address of the peer to send to
293    ///
294    /// # Returns
295    /// A Result indicating whether the entries were successfully acknowledged.
296    pub async fn send_entries(
297        &self,
298        entries: impl AsRef<[Entry]>,
299        address: &Address,
300    ) -> Result<()> {
301        let entries_vec = entries.as_ref().to_vec();
302        let request = SyncRequest::SendEntries(entries_vec);
303        let response = self.send_request(&request, address).await?;
304
305        match response {
306            SyncResponse::Ack | SyncResponse::Count(_) => Ok(()),
307            SyncResponse::Error(msg) => Err(SyncError::SyncProtocolError(format!(
308                "Peer {} returned error: {}",
309                address.address, msg
310            ))
311            .into()),
312            _ => Err(SyncError::UnexpectedResponse {
313                expected: "Ack or Count",
314                actual: format!("{response:?}"),
315            }
316            .into()),
317        }
318    }
319
320    /// Send specific entries to a peer via the background sync engine.
321    ///
322    /// This method queues entries for direct transmission without duplicate filtering.
323    /// The caller is responsible for determining which entries should be sent.
324    ///
325    /// # Duplicate Prevention Architecture
326    ///
327    /// Eidetica uses **smart duplicate prevention** in the background sync engine:
328    /// - **Database sync** (`SyncWithPeer` command): Uses tip comparison for semantic filtering
329    /// - **Direct send** (this method): Trusts caller to provide appropriate entries
330    ///
331    /// For automatic duplicate prevention, use tree-based sync relationships instead
332    /// of calling this method directly.
333    ///
334    /// # Arguments
335    /// * `peer_id` - The peer ID to send to
336    /// * `entries` - The specific entries to send (no filtering applied)
337    ///
338    /// # Returns
339    /// A Result indicating whether the command was successfully queued for background processing.
340    pub async fn send_entries_to_peer(&self, peer_id: &PeerId, entries: Vec<Entry>) -> Result<()> {
341        self.background_tx
342            .get()
343            .ok_or(SyncError::NoTransportEnabled)?
344            .send(SyncCommand::SendEntries {
345                peer: peer_id.clone(),
346                entries,
347            })
348            .await
349            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
350        Ok(())
351    }
352
353    /// Queue an entry for sync to a peer (non-blocking, for use in callbacks).
354    ///
355    /// This method is designed for use in write callbacks where async operations
356    /// are not possible. It uses try_send to avoid blocking, and logs errors
357    /// rather than failing the callback.
358    ///
359    /// # Arguments
360    /// * `peer_pubkey` - The public key of the peer to sync with
361    /// * `entry_id` - The ID of the entry to queue
362    /// * `tree_id` - The tree ID where the entry belongs
363    ///
364    /// # Returns
365    /// Ok(()) if the entry was successfully queued.
366    /// Only returns Err if transport is not enabled.
367    pub fn queue_entry_for_sync(
368        &self,
369        peer_id: &PeerId,
370        entry_id: &ID,
371        tree_id: &ID,
372    ) -> Result<()> {
373        // Ensure background sync is running
374        if self.background_tx.get().is_none() {
375            return Err(SyncError::NoTransportEnabled.into());
376        }
377
378        // Add to queue - BackgroundSync will process and send
379        self.queue
380            .enqueue(peer_id, entry_id.clone(), tree_id.clone());
381
382        Ok(())
383    }
384
385    /// Handle local write events for automatic sync.
386    ///
387    /// This method is called by the Instance write callback system when entries
388    /// are committed locally. It looks up the combined sync settings for the database
389    /// and queues the entry for sync with all configured peers if sync is enabled.
390    ///
391    /// This is the core method that implements automatic sync-on-commit behavior.
392    ///
393    /// # Arguments
394    /// * `event` - The write event containing the newly committed entries
395    /// * `database` - The database where the entries were committed
396    ///
397    /// # Returns
398    /// Ok(()) on success, or an error if settings lookup fails
399    pub(crate) async fn on_local_write(
400        &self,
401        event: &crate::instance::WriteEvent,
402        database: &Database,
403    ) -> Result<()> {
404        // Early return if background sync not running
405        if self.background_tx.get().is_none() {
406            return Ok(());
407        }
408
409        // Look up combined settings for this database
410        let tx = self.sync_tree.new_transaction().await?;
411        let user_mgr = UserSyncManager::new(&tx);
412        let peer_mgr = PeerManager::new(&tx);
413
414        let combined_settings = match user_mgr.get_combined_settings(database.root_id()).await? {
415            Some(settings) => settings,
416            None => {
417                // No settings configured for this database - no sync needed
418                debug!(database_id = %database.root_id(), "No sync settings for database, skipping");
419                return Ok(());
420            }
421        };
422
423        // Check if sync is enabled and sync_on_commit is true
424        if !combined_settings.sync_enabled || !combined_settings.sync_on_commit {
425            debug!(
426                database_id = %database.root_id(),
427                sync_enabled = combined_settings.sync_enabled,
428                sync_on_commit = combined_settings.sync_on_commit,
429                "Sync not enabled for database"
430            );
431            return Ok(());
432        }
433
434        // Get list of peers for this database
435        let peers = peer_mgr.get_tree_peers(database.root_id()).await?;
436
437        if peers.is_empty() {
438            debug!(database_id = %database.root_id(), "No peers configured for database");
439            return Ok(());
440        }
441
442        // Queue each entry for sync with each peer
443        let tree_id = database.root_id();
444
445        for entry in event.entries() {
446            let entry_id = entry.id();
447            debug!(
448                database_id = %tree_id,
449                entry_id = %entry_id,
450                peer_count = peers.len(),
451                "Queueing entry for automatic sync"
452            );
453
454            for peer_id in &peers {
455                self.queue_entry_for_sync(peer_id, &entry_id, tree_id)?;
456            }
457        }
458
459        Ok(())
460    }
461
462    /// Initialize combined settings for all users.
463    ///
464    /// This is called during Sync initialization. For new sync trees (just created),
465    /// it scans the _users database to register all existing users. For existing
466    /// sync trees (loaded), it updates combined settings for already-tracked users.
467    pub(super) async fn initialize_user_settings(&self) -> Result<()> {
468        use crate::store::Table;
469        use crate::user::types::UserInfo;
470
471        // Check if sync tree is freshly created (no users tracked yet)
472        let user_tracking = self
473            .sync_tree
474            .get_store_viewer::<DocStore>(super::user_sync_manager::USER_TRACKING_SUBTREE)
475            .await?;
476        let all_tracked = user_tracking.get_all().await?;
477
478        if all_tracked.keys().count() == 0 {
479            // New sync tree - register all users from _users database
480            let instance = self.instance.upgrade().ok_or(SyncError::InstanceDropped)?;
481            let users_db = instance.users_db().await?;
482            let users_table = users_db
483                .get_store_viewer::<Table<UserInfo>>("users")
484                .await?;
485            let all_users = users_table.search(|_| true).await?;
486
487            for (user_uuid, user_info) in all_users {
488                self.sync_user(&user_uuid, &user_info.user_database_id)
489                    .await?;
490            }
491        } else {
492            // Existing sync tree - update settings for tracked users if changed
493            let tx = self.sync_tree.new_transaction().await?;
494            let user_mgr = UserSyncManager::new(&tx);
495
496            for user_uuid in all_tracked.keys() {
497                if let Some((prefs_db_id, _tips)) =
498                    user_mgr.get_tracked_user_state(user_uuid).await?
499                {
500                    self.sync_user(user_uuid, &prefs_db_id).await?;
501                }
502            }
503        }
504
505        Ok(())
506    }
507
508    /// Send a sync request to a peer and get a response (async version).
509    ///
510    /// # Arguments
511    /// * `request` - The sync request to send
512    /// * `address` - The address of the peer
513    ///
514    /// # Returns
515    /// The sync response from the peer.
516    pub(super) async fn send_request(
517        &self,
518        request: &SyncRequest,
519        address: &Address,
520    ) -> Result<SyncResponse> {
521        let (tx, rx) = oneshot::channel();
522
523        self.background_tx
524            .get()
525            .ok_or(SyncError::NoTransportEnabled)?
526            .send(SyncCommand::SendRequest {
527                address: address.clone(),
528                request: Box::new(request.clone()),
529                response: tx,
530            })
531            .await
532            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
533
534        rx.await
535            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
536    }
537
538    /// Discover available trees from a peer (simplified API).
539    ///
540    /// This method connects to a peer and retrieves the list of trees they're willing to sync.
541    /// This is useful for discovering what can be synced before setting up sync relationships.
542    ///
543    /// # Arguments
544    /// * `address` - The transport address of the peer.
545    ///
546    /// # Returns
547    /// A vector of TreeInfo describing available trees, or an error.
548    pub async fn discover_peer_trees(&self, address: &Address) -> Result<Vec<protocol::TreeInfo>> {
549        // Connect and get handshake info
550        let _peer_pubkey = self.connect_to_peer(address).await?;
551
552        // The handshake already contains the tree list, but we need to get it again
553        // since connect_to_peer doesn't return it. For now, return empty list
554        // TODO: Enhance this to actually return the tree list from handshake
555
556        tracing::warn!(
557            "discover_peer_trees not fully implemented - handshake contains tree info but API needs enhancement"
558        );
559        Ok(vec![])
560    }
561
562    /// Sync with a peer at a given address.
563    ///
564    /// This is a blocking convenience method that:
565    /// 1. Connects to discover the peer's public key
566    /// 2. Registers the peer and performs immediate sync
567    /// 3. Returns after sync completes
568    ///
569    /// For new code, prefer using [`register_sync_peer()`](Self::register_sync_peer)
570    /// directly, which registers intent and lets background sync handle it.
571    ///
572    /// # Arguments
573    /// * `address` - The transport address of the peer.
574    /// * `tree_id` - Optional tree ID to sync (None = discover available trees)
575    ///
576    /// # Returns
577    /// Result indicating success or failure.
578    pub async fn sync_with_peer(&self, address: &Address, tree_id: Option<&ID>) -> Result<()> {
579        // Connect to peer if not already connected
580        let peer_pubkey = self.connect_to_peer(address).await?;
581
582        // Store the address for this peer (needed for sync_tree_with_peer)
583        self.add_peer_address(&peer_pubkey, address.clone()).await?;
584
585        if let Some(tree_id) = tree_id {
586            // Sync specific tree
587            self.sync_tree_with_peer(&peer_pubkey, tree_id).await?;
588        } else {
589            // TODO: Sync all available trees
590            tracing::warn!(
591                "Syncing all trees not yet implemented - need to enhance discover_peer_trees first"
592            );
593        }
594
595        Ok(())
596    }
597
598    /// Sync with a peer using a [`DatabaseTicket`].
599    ///
600    /// Attempts [`sync_with_peer`](Self::sync_with_peer) for every address
601    /// hint in the ticket concurrently. Each address may point to a different
602    /// peer, so connections are independent. Succeeds if at least one address
603    /// syncs successfully; returns the last error if all fail.
604    ///
605    /// # Arguments
606    /// * `ticket` - A ticket containing the database ID and address hints.
607    ///
608    /// # Errors
609    /// Returns [`SyncError::InvalidAddress`] if the ticket has no address hints.
610    /// Returns the last sync error if no address succeeded.
611    pub async fn sync_with_ticket(&self, ticket: &DatabaseTicket) -> Result<()> {
612        let database_id = ticket.database_id().clone();
613        self.try_addresses_concurrently(ticket.addresses(), |sync, addr| {
614            let db_id = database_id.clone();
615            async move { sync.sync_with_peer(&addr, Some(&db_id)).await }
616        })
617        .await
618    }
619
620    /// Sync a specific tree with a peer, with optional authentication for bootstrap.
621    ///
622    /// This is a lower-level method that allows specifying authentication parameters
623    /// for bootstrap scenarios where access needs to be requested.
624    ///
625    /// # Arguments
626    /// * `peer_pubkey` - The public key of the peer to sync with
627    /// * `tree_id` - The ID of the tree to sync
628    /// * `requesting_key` - Optional public key requesting access (for bootstrap)
629    /// * `requesting_key_name` - Optional name/ID of the requesting key
630    /// * `requested_permission` - Optional permission level being requested
631    ///
632    /// # Returns
633    /// A Result indicating success or failure.
634    pub async fn sync_tree_with_peer_auth(
635        &self,
636        peer_pubkey: &PublicKey,
637        tree_id: &ID,
638        requesting_key: Option<&PublicKey>,
639        requesting_key_name: Option<&str>,
640        requested_permission: Option<Permission>,
641    ) -> Result<()> {
642        // Get peer information and address
643        let peer_info = self
644            .get_peer_info(peer_pubkey)
645            .await?
646            .ok_or_else(|| SyncError::PeerNotFound(peer_pubkey.to_string()))?;
647
648        let address = peer_info
649            .addresses
650            .first()
651            .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?;
652
653        // Get our current tips for this tree (empty if tree doesn't exist)
654        let backend = self.backend()?;
655        let our_tips: Vec<ID> = backend
656            .snapshot(tree_id)
657            .await
658            .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?
659            .into_tips();
660
661        // Get our device public key for automatic peer tracking
662        let our_device_pubkey = self.get_device_pubkey().ok();
663
664        // Send unified sync request with auth parameters
665        let request = SyncRequest::SyncTree(SyncTreeRequest {
666            tree_id: tree_id.clone(),
667            our_tips,
668            peer_pubkey: our_device_pubkey,
669            requesting_key: requesting_key.cloned(),
670            requesting_key_name: requesting_key_name.map(|k| k.to_string()),
671            requested_permission,
672        });
673
674        // Send request via background sync command
675        let (tx, rx) = oneshot::channel();
676        self.background_tx
677            .get()
678            .ok_or(SyncError::NoTransportEnabled)?
679            .send(SyncCommand::SendRequest {
680                address: address.clone(),
681                request: Box::new(request),
682                response: tx,
683            })
684            .await
685            .map_err(|_| {
686                SyncError::CommandSendError("Background sync command channel closed".to_string())
687            })?;
688
689        // Wait for response
690        let response = rx
691            .await
692            .map_err(|_| {
693                SyncError::CommandSendError("Background sync response channel closed".to_string())
694            })?
695            .map_err(|e| SyncError::Network(format!("Sync request failed: {e}")))?;
696
697        // Handle the response (same logic as existing sync_tree_with_peer)
698        match response {
699            SyncResponse::Bootstrap(bootstrap_response) => {
700                info!(peer = %peer_pubkey, tree = %tree_id, entry_count = bootstrap_response.all_entries.len() + 1, "Received bootstrap response");
701
702                // Store root + all entries as a single batch with callback dispatch
703                let mut all_entries = Vec::with_capacity(1 + bootstrap_response.all_entries.len());
704                all_entries.push(bootstrap_response.root_entry);
705                all_entries.extend(bootstrap_response.all_entries);
706
707                // Bootstrap entries come from a peer; stored Unverified.
708                let instance = self.instance()?;
709                instance.put_remote_entries(tree_id, all_entries).await?;
710
711                info!(peer = %peer_pubkey, tree = %tree_id, "Bootstrap sync completed successfully");
712            }
713            SyncResponse::Incremental(incremental_response) => {
714                info!(peer = %peer_pubkey, tree = %tree_id, missing_count = incremental_response.missing_entries.len(), "Received incremental sync response");
715
716                // Use the enhanced handler that supports bidirectional sync
717                self.handle_incremental_response(incremental_response, address)
718                    .await?;
719
720                debug!(peer = %peer_pubkey, tree = %tree_id, "Incremental sync completed");
721            }
722            SyncResponse::BootstrapPending {
723                request_id,
724                message,
725            } => {
726                info!(peer = %peer_pubkey, tree = %tree_id, request_id = %request_id, "Bootstrap request pending manual approval");
727                return Err(SyncError::BootstrapPending {
728                    request_id,
729                    message,
730                }
731                .into());
732            }
733            SyncResponse::Error(err) => {
734                return Err(SyncError::Network(format!("Peer returned error: {err}")).into());
735            }
736            _ => {
737                return Err(SyncError::SyncProtocolError(
738                    "Unexpected response type for sync tree request".to_string(),
739                )
740                .into());
741            }
742        }
743
744        // Track tree/peer relationship for sync_on_commit to work
745        // This allows on_local_write() to find this peer when queueing entries
746        self.add_tree_sync(peer_pubkey, tree_id).await?;
747
748        Ok(())
749    }
750
751    // === Flush Operations ===
752
753    /// Process all queued entries and retry any failed sends.
754    ///
755    /// This method:
756    /// 1. Retries all entries in the retry queue (ignoring backoff timers)
757    /// 2. Processes all entries in the sync queue (batched by peer)
758    ///
759    /// When this method returns, all pending sync work has been attempted.
760    /// This is useful to eensuree that all pending pushes have completed.
761    ///
762    /// # Returns
763    /// `Ok(())` if all operations completed successfully, or an error
764    /// if the background sync engine is not running or sends failed.
765    pub async fn flush(&self) -> Result<()> {
766        let (tx, rx) = oneshot::channel();
767
768        self.background_tx
769            .get()
770            .ok_or(SyncError::NoTransportEnabled)?
771            .send(SyncCommand::Flush { response: tx })
772            .await
773            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
774
775        rx.await
776            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
777    }
778
779    /// Timeout applied to each address attempt in
780    /// [`try_addresses_concurrently`](Self::try_addresses_concurrently).
781    const ADDRESS_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(30);
782
783    /// Try an operation against multiple addresses concurrently, returning on
784    /// the first success.
785    ///
786    /// Spawns one detached task per address via [`tokio::spawn`]. Returns as
787    /// soon as any task succeeds. Remaining tasks are **not** cancelled — they
788    /// continue running in the background so that additional peer connections
789    /// can be established and registered for future syncs. Each task is subject
790    /// to [`ADDRESS_ATTEMPT_TIMEOUT`](Self::ADDRESS_ATTEMPT_TIMEOUT).
791    ///
792    /// If all tasks fail the last error is returned. If `addresses` is empty
793    /// an [`SyncError::InvalidAddress`] error is returned.
794    pub(super) async fn try_addresses_concurrently<F, Fut>(
795        &self,
796        addresses: &[Address],
797        f: F,
798    ) -> Result<()>
799    where
800        F: Fn(Sync, Address) -> Fut,
801        Fut: Future<Output = Result<()>> + Send + 'static,
802    {
803        if addresses.is_empty() {
804            return Err(SyncError::InvalidAddress("Ticket has no address hints".into()).into());
805        }
806
807        let (tx, mut rx) = tokio::sync::mpsc::channel(addresses.len());
808
809        for addr in addresses {
810            let tx = tx.clone();
811            let fut = f(self.clone(), addr.clone());
812            let addr_info = addr.clone();
813            // Detached spawn: the task keeps running even after we return.
814            tokio::spawn(async move {
815                let result = tokio::time::timeout(Self::ADDRESS_ATTEMPT_TIMEOUT, fut).await;
816                let result = match result {
817                    Ok(inner) => inner,
818                    Err(_) => {
819                        warn!(
820                            address = ?addr_info,
821                            "Address attempt timed out after {:?}",
822                            Self::ADDRESS_ATTEMPT_TIMEOUT,
823                        );
824                        Err(SyncError::Network(format!(
825                            "Address attempt timed out after {:?}",
826                            Self::ADDRESS_ATTEMPT_TIMEOUT,
827                        ))
828                        .into())
829                    }
830                };
831                match &result {
832                    Ok(()) => debug!(address = ?addr_info, "Address attempt succeeded"),
833                    Err(e) => debug!(address = ?addr_info, error = %e, "Address attempt failed"),
834                }
835                // Ignore send errors — the receiver is dropped on early success,
836                // but the task still completes its work (peer registration, etc.).
837                let _ = tx.send(result).await;
838            });
839        }
840        // Drop our sender so the channel closes when all tasks finish.
841        drop(tx);
842
843        let mut last_err = None;
844        while let Some(result) = rx.recv().await {
845            match result {
846                Ok(()) => return Ok(()),
847                Err(e) => last_err = Some(e),
848            }
849        }
850
851        Err(last_err.expect("at least one task was spawned"))
852    }
853}