Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/sync/
ops.rs

1//! Core sync operations for the sync system.
2
3use std::future::Future;
4use std::time::Duration;
5
6use tokio::sync::oneshot;
7use tracing::{debug, info, warn};
8
9use super::{
10    Address, DatabaseTicket, PeerId, Sync, SyncError,
11    background::SyncCommand,
12    peer_manager::PeerManager,
13    peer_types,
14    protocol::{self, SyncRequest, SyncResponse, SyncTreeRequest},
15    user_sync_manager::UserSyncManager,
16};
17use crate::{
18    Database, Entry, Instance, Result, auth::Permission, auth::crypto::PublicKey, entry::ID,
19    store::DocStore,
20};
21
22use super::utils::collect_ancestors_to_send;
23
24impl Sync {
25    // === Core Sync Methods ===
26
27    /// Synchronize a specific tree with a peer using bidirectional sync.
28    ///
29    /// This is the main synchronization method that implements tip exchange
30    /// and bidirectional entry transfer to keep trees in sync between peers.
31    /// It performs both pull (fetch missing entries) and push (send our entries).
32    ///
33    /// # Arguments
34    /// * `peer_pubkey` - The public key of the peer to sync with
35    /// * `tree_id` - The ID of the tree to synchronize
36    ///
37    /// # Returns
38    /// A Result indicating success or failure of the sync operation.
39    pub async fn sync_tree_with_peer(&self, peer_pubkey: &PublicKey, tree_id: &ID) -> Result<()> {
40        // Get peer information and address
41        let peer_info = self
42            .get_peer_info(peer_pubkey)
43            .await?
44            .ok_or_else(|| SyncError::PeerNotFound(peer_pubkey.to_string()))?;
45
46        let address = peer_info
47            .addresses
48            .first()
49            .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?;
50
51        // Get our current tips for this tree (empty if tree doesn't exist)
52        let backend = self.backend()?;
53        let our_tips = backend
54            .get_tips(tree_id)
55            .await
56            .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?;
57
58        // Get our device public key for automatic peer tracking
59        let our_device_pubkey = self.get_device_pubkey().ok();
60
61        // Send unified sync request
62        let request = SyncRequest::SyncTree(SyncTreeRequest {
63            tree_id: tree_id.clone(),
64            our_tips,
65            peer_pubkey: our_device_pubkey,
66            requesting_key: None, // TODO: Add auth support for direct sync
67            requesting_key_name: None,
68            requested_permission: None,
69        });
70
71        // Send request via background sync command
72        let (tx, rx) = oneshot::channel();
73        self.background_tx
74            .get()
75            .ok_or(SyncError::NoTransportEnabled)?
76            .send(SyncCommand::SendRequest {
77                address: address.clone(),
78                request: Box::new(request),
79                response: tx,
80            })
81            .await
82            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
83
84        let response = rx
85            .await
86            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
87            .map_err(|e| SyncError::Network(format!("Request failed: {e}")))?;
88
89        match response {
90            SyncResponse::Bootstrap(bootstrap_response) => {
91                self.handle_bootstrap_response(bootstrap_response).await?;
92            }
93            SyncResponse::Incremental(incremental_response) => {
94                self.handle_incremental_response(incremental_response, address)
95                    .await?;
96            }
97            SyncResponse::Error(msg) => {
98                return Err(SyncError::SyncProtocolError(format!("Sync error: {msg}")).into());
99            }
100            _ => {
101                return Err(SyncError::UnexpectedResponse {
102                    expected: "Bootstrap or Incremental",
103                    actual: format!("{response:?}"),
104                }
105                .into());
106            }
107        }
108
109        // Track tree/peer relationship for sync_on_commit to work
110        // This allows on_local_write() to find this peer when queueing entries
111        self.add_tree_sync(peer_pubkey, tree_id).await?;
112
113        Ok(())
114    }
115
116    /// Handle bootstrap response by storing root and all entries
117    pub(super) async fn handle_bootstrap_response(
118        &self,
119        response: protocol::BootstrapResponse,
120    ) -> Result<()> {
121        tracing::info!(tree_id = %response.tree_id, "Processing bootstrap response");
122
123        // Store root entry first
124
125        // Store the root entry
126        let backend = self.backend()?;
127        backend
128            .put_verified(response.root_entry.clone())
129            .await
130            .map_err(|e| SyncError::BackendError(format!("Failed to store root entry: {e}")))?;
131
132        // Store all other entries using existing method
133        self.store_received_entries(&response.tree_id, response.all_entries)
134            .await?;
135
136        tracing::info!(tree_id = %response.tree_id, "Bootstrap completed successfully");
137        Ok(())
138    }
139
140    /// Handle incremental response by storing missing entries and sending back what server is missing
141    pub(super) async fn handle_incremental_response(
142        &self,
143        response: protocol::IncrementalResponse,
144        peer_address: &peer_types::Address,
145    ) -> Result<()> {
146        tracing::debug!(tree_id = %response.tree_id, "Processing incremental response");
147
148        // Step 1: Store missing entries
149        self.store_received_entries(&response.tree_id, response.missing_entries)
150            .await?;
151
152        // Step 2: Check if server is missing entries from us
153        let backend = self.backend()?;
154        let our_tips = backend.get_tips(&response.tree_id).await?;
155        let their_tips = &response.their_tips;
156
157        // Find tips they don't have
158        let missing_tip_ids: Vec<_> = our_tips
159            .iter()
160            .filter(|tip_id| !their_tips.contains(tip_id))
161            .cloned()
162            .collect();
163
164        if !missing_tip_ids.is_empty() {
165            tracing::debug!(
166                tree_id = %response.tree_id,
167                missing_tips = missing_tip_ids.len(),
168                "Server is missing some of our entries, sending them back"
169            );
170
171            // Collect entries server is missing
172            let backend = self.backend()?;
173            let entries_for_server =
174                collect_ancestors_to_send(backend.as_backend_impl(), &missing_tip_ids, their_tips)
175                    .await?;
176
177            if !entries_for_server.is_empty() {
178                // Send these entries back to server
179                self.send_missing_entries_to_peer(
180                    peer_address,
181                    &response.tree_id,
182                    entries_for_server,
183                )
184                .await?;
185            }
186        }
187
188        tracing::debug!(tree_id = %response.tree_id, "Incremental sync completed");
189        Ok(())
190    }
191
192    /// Send entries that the server is missing back to complete bidirectional sync
193    async fn send_missing_entries_to_peer(
194        &self,
195        peer_address: &peer_types::Address,
196        tree_id: &ID,
197        entries: Vec<Entry>,
198    ) -> Result<()> {
199        if entries.is_empty() {
200            return Ok(());
201        }
202
203        tracing::debug!(
204            tree_id = %tree_id,
205            entry_count = entries.len(),
206            "Sending missing entries back to peer for bidirectional sync"
207        );
208
209        let request = protocol::SyncRequest::SendEntries(entries);
210
211        // Send via command channel
212        let (tx, rx) = tokio::sync::oneshot::channel();
213        self.background_tx
214            .get()
215            .ok_or(SyncError::NoTransportEnabled)?
216            .send(SyncCommand::SendRequest {
217                address: peer_address.clone(),
218                request: Box::new(request),
219                response: tx,
220            })
221            .await
222            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
223
224        // Wait for acknowledgment
225        let response = rx
226            .await
227            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
228            .map_err(|e| SyncError::Network(format!("Request failed: {e}")))?;
229
230        match response {
231            protocol::SyncResponse::Ack | protocol::SyncResponse::Count(_) => {
232                tracing::debug!(tree_id = %tree_id, "Server acknowledged receipt of missing entries");
233                Ok(())
234            }
235            protocol::SyncResponse::Error(e) => {
236                Err(SyncError::Network(format!("Server error receiving entries: {e}")).into())
237            }
238            _ => Err(SyncError::UnexpectedResponse {
239                expected: "Ack or Count",
240                actual: format!("{response:?}"),
241            }
242            .into()),
243        }
244    }
245
246    /// Validate and store received entries from a peer.
247    pub(super) async fn store_received_entries(
248        &self,
249        _tree_id: &ID,
250        entries: Vec<Entry>,
251    ) -> Result<()> {
252        for entry in entries {
253            // Basic validation: check that entry ID matches content
254            let calculated_id = entry.id();
255            if entry.id() != calculated_id {
256                return Err(SyncError::InvalidEntry(format!(
257                    "Entry ID {} doesn't match calculated ID {}",
258                    entry.id(),
259                    calculated_id
260                ))
261                .into());
262            }
263
264            // TODO: Add more validation (signatures, parent existence, etc.)
265
266            // Store the entry (marking it as verified for now)
267            let backend = self.backend()?;
268            backend
269                .put_verified(entry)
270                .await
271                .map_err(|e| SyncError::BackendError(format!("Failed to store entry: {e}")))?;
272        }
273
274        Ok(())
275    }
276
277    /// Send a batch of entries to a sync peer (async version).
278    ///
279    /// # Arguments
280    /// * `entries` - The entries to send
281    /// * `address` - The address of the peer to send to
282    ///
283    /// # Returns
284    /// A Result indicating whether the entries were successfully acknowledged.
285    pub async fn send_entries(
286        &self,
287        entries: impl AsRef<[Entry]>,
288        address: &Address,
289    ) -> Result<()> {
290        let entries_vec = entries.as_ref().to_vec();
291        let request = SyncRequest::SendEntries(entries_vec);
292        let response = self.send_request(&request, address).await?;
293
294        match response {
295            SyncResponse::Ack | SyncResponse::Count(_) => Ok(()),
296            SyncResponse::Error(msg) => Err(SyncError::SyncProtocolError(format!(
297                "Peer {} returned error: {}",
298                address.address, msg
299            ))
300            .into()),
301            _ => Err(SyncError::UnexpectedResponse {
302                expected: "Ack or Count",
303                actual: format!("{response:?}"),
304            }
305            .into()),
306        }
307    }
308
309    /// Send specific entries to a peer via the background sync engine.
310    ///
311    /// This method queues entries for direct transmission without duplicate filtering.
312    /// The caller is responsible for determining which entries should be sent.
313    ///
314    /// # Duplicate Prevention Architecture
315    ///
316    /// Eidetica uses **smart duplicate prevention** in the background sync engine:
317    /// - **Database sync** (`SyncWithPeer` command): Uses tip comparison for semantic filtering
318    /// - **Direct send** (this method): Trusts caller to provide appropriate entries
319    ///
320    /// For automatic duplicate prevention, use tree-based sync relationships instead
321    /// of calling this method directly.
322    ///
323    /// # Arguments
324    /// * `peer_id` - The peer ID to send to
325    /// * `entries` - The specific entries to send (no filtering applied)
326    ///
327    /// # Returns
328    /// A Result indicating whether the command was successfully queued for background processing.
329    pub async fn send_entries_to_peer(&self, peer_id: &PeerId, entries: Vec<Entry>) -> Result<()> {
330        self.background_tx
331            .get()
332            .ok_or(SyncError::NoTransportEnabled)?
333            .send(SyncCommand::SendEntries {
334                peer: peer_id.clone(),
335                entries,
336            })
337            .await
338            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
339        Ok(())
340    }
341
342    /// Queue an entry for sync to a peer (non-blocking, for use in callbacks).
343    ///
344    /// This method is designed for use in write callbacks where async operations
345    /// are not possible. It uses try_send to avoid blocking, and logs errors
346    /// rather than failing the callback.
347    ///
348    /// # Arguments
349    /// * `peer_pubkey` - The public key of the peer to sync with
350    /// * `entry_id` - The ID of the entry to queue
351    /// * `tree_id` - The tree ID where the entry belongs
352    ///
353    /// # Returns
354    /// Ok(()) if the entry was successfully queued.
355    /// Only returns Err if transport is not enabled.
356    pub fn queue_entry_for_sync(
357        &self,
358        peer_id: &PeerId,
359        entry_id: &ID,
360        tree_id: &ID,
361    ) -> Result<()> {
362        // Ensure background sync is running
363        if self.background_tx.get().is_none() {
364            return Err(SyncError::NoTransportEnabled.into());
365        }
366
367        // Add to queue - BackgroundSync will process and send
368        self.queue
369            .enqueue(peer_id, entry_id.clone(), tree_id.clone());
370
371        Ok(())
372    }
373
374    /// Handle local write events for automatic sync.
375    ///
376    /// This method is called by the Instance write callback system when entries
377    /// are committed locally. It looks up the combined sync settings for the database
378    /// and queues the entry for sync with all configured peers if sync is enabled.
379    ///
380    /// This is the core method that implements automatic sync-on-commit behavior.
381    ///
382    /// # Arguments
383    /// * `entry` - The newly committed entry
384    /// * `database` - The database where the entry was committed
385    /// * `_instance` - The instance (unused but required by callback signature)
386    ///
387    /// # Returns
388    /// Ok(()) on success, or an error if settings lookup fails
389    pub(crate) async fn on_local_write(
390        &self,
391        entry: &Entry,
392        database: &Database,
393        _instance: &Instance,
394    ) -> Result<()> {
395        // Early return if background sync not running
396        if self.background_tx.get().is_none() {
397            return Ok(());
398        }
399
400        // Look up combined settings for this database
401        let tx = self.sync_tree.new_transaction().await?;
402        let user_mgr = UserSyncManager::new(&tx);
403        let peer_mgr = PeerManager::new(&tx);
404
405        let combined_settings = match user_mgr.get_combined_settings(database.root_id()).await? {
406            Some(settings) => settings,
407            None => {
408                // No settings configured for this database - no sync needed
409                debug!(database_id = %database.root_id(), "No sync settings for database, skipping");
410                return Ok(());
411            }
412        };
413
414        // Check if sync is enabled and sync_on_commit is true
415        if !combined_settings.sync_enabled || !combined_settings.sync_on_commit {
416            debug!(
417                database_id = %database.root_id(),
418                sync_enabled = combined_settings.sync_enabled,
419                sync_on_commit = combined_settings.sync_on_commit,
420                "Sync not enabled for database"
421            );
422            return Ok(());
423        }
424
425        // Get list of peers for this database
426        let peers = peer_mgr.get_tree_peers(database.root_id()).await?;
427
428        if peers.is_empty() {
429            debug!(database_id = %database.root_id(), "No peers configured for database");
430            return Ok(());
431        }
432
433        // Queue entry for sync with each peer
434        let entry_id = entry.id();
435        let tree_id = database.root_id();
436
437        debug!(
438            database_id = %tree_id,
439            entry_id = %entry_id,
440            peer_count = peers.len(),
441            "Queueing entry for automatic sync"
442        );
443
444        for peer_id in peers {
445            self.queue_entry_for_sync(&peer_id, &entry_id, tree_id)?;
446        }
447
448        Ok(())
449    }
450
    /// Initialize combined settings for all users.
    ///
    /// This is called during Sync initialization. For new sync trees (just created),
    /// it scans the _users database to register all existing users. For existing
    /// sync trees (loaded), it updates combined settings for already-tracked users.
    pub(super) async fn initialize_user_settings(&self) -> Result<()> {
        use crate::store::Table;
        use crate::user::types::UserInfo;

        // Check if sync tree is freshly created (no users tracked yet)
        let user_tracking = self
            .sync_tree
            .get_store_viewer::<DocStore>(super::user_sync_manager::USER_TRACKING_SUBTREE)
            .await?;
        let all_tracked = user_tracking.get_all().await?;

        if all_tracked.keys().count() == 0 {
            // New sync tree - register all users from _users database.
            // The owning Instance must still be alive; the weak reference
            // fails with InstanceDropped otherwise.
            let instance = self.instance.upgrade().ok_or(SyncError::InstanceDropped)?;
            let users_db = instance.users_db().await?;
            let users_table = users_db
                .get_store_viewer::<Table<UserInfo>>("users")
                .await?;
            // search(|_| true) loads every user record in the table.
            let all_users = users_table.search(|_| true).await?;

            for (user_uuid, user_info) in all_users {
                self.sync_user(&user_uuid, &user_info.user_database_id)
                    .await?;
            }
        } else {
            // Existing sync tree - update settings for tracked users if changed
            let tx = self.sync_tree.new_transaction().await?;
            let user_mgr = UserSyncManager::new(&tx);

            for user_uuid in all_tracked.keys() {
                // Users without recorded state are skipped silently.
                if let Some((prefs_db_id, _tips)) =
                    user_mgr.get_tracked_user_state(user_uuid).await?
                {
                    self.sync_user(user_uuid, &prefs_db_id).await?;
                }
            }
        }

        Ok(())
    }
496
497    /// Send a sync request to a peer and get a response (async version).
498    ///
499    /// # Arguments
500    /// * `request` - The sync request to send
501    /// * `address` - The address of the peer
502    ///
503    /// # Returns
504    /// The sync response from the peer.
505    pub(super) async fn send_request(
506        &self,
507        request: &SyncRequest,
508        address: &Address,
509    ) -> Result<SyncResponse> {
510        let (tx, rx) = oneshot::channel();
511
512        self.background_tx
513            .get()
514            .ok_or(SyncError::NoTransportEnabled)?
515            .send(SyncCommand::SendRequest {
516                address: address.clone(),
517                request: Box::new(request.clone()),
518                response: tx,
519            })
520            .await
521            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
522
523        rx.await
524            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
525    }
526
527    /// Discover available trees from a peer (simplified API).
528    ///
529    /// This method connects to a peer and retrieves the list of trees they're willing to sync.
530    /// This is useful for discovering what can be synced before setting up sync relationships.
531    ///
532    /// # Arguments
533    /// * `address` - The transport address of the peer.
534    ///
535    /// # Returns
536    /// A vector of TreeInfo describing available trees, or an error.
537    pub async fn discover_peer_trees(&self, address: &Address) -> Result<Vec<protocol::TreeInfo>> {
538        // Connect and get handshake info
539        let _peer_pubkey = self.connect_to_peer(address).await?;
540
541        // The handshake already contains the tree list, but we need to get it again
542        // since connect_to_peer doesn't return it. For now, return empty list
543        // TODO: Enhance this to actually return the tree list from handshake
544
545        tracing::warn!(
546            "discover_peer_trees not fully implemented - handshake contains tree info but API needs enhancement"
547        );
548        Ok(vec![])
549    }
550
551    /// Sync with a peer at a given address.
552    ///
553    /// This is a blocking convenience method that:
554    /// 1. Connects to discover the peer's public key
555    /// 2. Registers the peer and performs immediate sync
556    /// 3. Returns after sync completes
557    ///
558    /// For new code, prefer using [`register_sync_peer()`](Self::register_sync_peer)
559    /// directly, which registers intent and lets background sync handle it.
560    ///
561    /// # Arguments
562    /// * `address` - The transport address of the peer.
563    /// * `tree_id` - Optional tree ID to sync (None = discover available trees)
564    ///
565    /// # Returns
566    /// Result indicating success or failure.
567    pub async fn sync_with_peer(&self, address: &Address, tree_id: Option<&ID>) -> Result<()> {
568        // Connect to peer if not already connected
569        let peer_pubkey = self.connect_to_peer(address).await?;
570
571        // Store the address for this peer (needed for sync_tree_with_peer)
572        self.add_peer_address(&peer_pubkey, address.clone()).await?;
573
574        if let Some(tree_id) = tree_id {
575            // Sync specific tree
576            self.sync_tree_with_peer(&peer_pubkey, tree_id).await?;
577        } else {
578            // TODO: Sync all available trees
579            tracing::warn!(
580                "Syncing all trees not yet implemented - need to enhance discover_peer_trees first"
581            );
582        }
583
584        Ok(())
585    }
586
587    /// Sync with a peer using a [`DatabaseTicket`].
588    ///
589    /// Attempts [`sync_with_peer`](Self::sync_with_peer) for every address
590    /// hint in the ticket concurrently. Each address may point to a different
591    /// peer, so connections are independent. Succeeds if at least one address
592    /// syncs successfully; returns the last error if all fail.
593    ///
594    /// # Arguments
595    /// * `ticket` - A ticket containing the database ID and address hints.
596    ///
597    /// # Errors
598    /// Returns [`SyncError::InvalidAddress`] if the ticket has no address hints.
599    /// Returns the last sync error if no address succeeded.
600    pub async fn sync_with_ticket(&self, ticket: &DatabaseTicket) -> Result<()> {
601        let database_id = ticket.database_id().clone();
602        self.try_addresses_concurrently(ticket.addresses(), |sync, addr| {
603            let db_id = database_id.clone();
604            async move { sync.sync_with_peer(&addr, Some(&db_id)).await }
605        })
606        .await
607    }
608
    /// Sync a specific tree with a peer, with optional authentication for bootstrap.
    ///
    /// This is a lower-level method that allows specifying authentication parameters
    /// for bootstrap scenarios where access needs to be requested.
    ///
    /// # Arguments
    /// * `peer_pubkey` - The public key of the peer to sync with
    /// * `tree_id` - The ID of the tree to sync
    /// * `requesting_key` - Optional public key requesting access (for bootstrap)
    /// * `requesting_key_name` - Optional name/ID of the requesting key
    /// * `requested_permission` - Optional permission level being requested
    ///
    /// # Returns
    /// A Result indicating success or failure.
    pub async fn sync_tree_with_peer_auth(
        &self,
        peer_pubkey: &PublicKey,
        tree_id: &ID,
        requesting_key: Option<&PublicKey>,
        requesting_key_name: Option<&str>,
        requested_permission: Option<Permission>,
    ) -> Result<()> {
        // Get peer information and use its first known address
        let peer_info = self
            .get_peer_info(peer_pubkey)
            .await?
            .ok_or_else(|| SyncError::PeerNotFound(peer_pubkey.to_string()))?;

        let address = peer_info
            .addresses
            .first()
            .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?;

        // Get our current tips for this tree (empty if tree doesn't exist)
        let backend = self.backend()?;
        let our_tips = backend
            .get_tips(tree_id)
            .await
            .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?;

        // Get our device public key for automatic peer tracking
        let our_device_pubkey = self.get_device_pubkey().ok();

        // Send unified sync request with auth parameters
        let request = SyncRequest::SyncTree(SyncTreeRequest {
            tree_id: tree_id.clone(),
            our_tips,
            peer_pubkey: our_device_pubkey,
            requesting_key: requesting_key.cloned(),
            requesting_key_name: requesting_key_name.map(|k| k.to_string()),
            requested_permission,
        });

        // Send request via background sync command
        let (tx, rx) = oneshot::channel();
        self.background_tx
            .get()
            .ok_or(SyncError::NoTransportEnabled)?
            .send(SyncCommand::SendRequest {
                address: address.clone(),
                request: Box::new(request),
                response: tx,
            })
            .await
            .map_err(|_| {
                SyncError::CommandSendError("Background sync command channel closed".to_string())
            })?;

        // Wait for response (outer error: channel dropped; inner: request failed)
        let response = rx
            .await
            .map_err(|_| {
                SyncError::CommandSendError("Background sync response channel closed".to_string())
            })?
            .map_err(|e| SyncError::Network(format!("Sync request failed: {e}")))?;

        // Handle the response (same logic as existing sync_tree_with_peer)
        match response {
            SyncResponse::Bootstrap(bootstrap_response) => {
                info!(peer = %peer_pubkey, tree = %tree_id, entry_count = bootstrap_response.all_entries.len() + 1, "Received bootstrap response");

                // Store the root entry
                let backend = self.backend()?;
                backend.put_verified(bootstrap_response.root_entry).await?;

                // Store all other entries
                // NOTE(review): non-root entries are stored with put_unverified
                // here, whereas handle_bootstrap_response stores them via
                // store_received_entries (put_verified) — confirm which
                // verification state is intended for bootstrap payloads.
                for entry in bootstrap_response.all_entries {
                    backend.put_unverified(entry).await?;
                }

                info!(peer = %peer_pubkey, tree = %tree_id, "Bootstrap sync completed successfully");
            }
            SyncResponse::Incremental(incremental_response) => {
                info!(peer = %peer_pubkey, tree = %tree_id, missing_count = incremental_response.missing_entries.len(), "Received incremental sync response");

                // Use the enhanced handler that supports bidirectional sync
                self.handle_incremental_response(incremental_response, address)
                    .await?;

                debug!(peer = %peer_pubkey, tree = %tree_id, "Incremental sync completed");
            }
            SyncResponse::BootstrapPending {
                request_id,
                message,
            } => {
                // The server requires manual approval before bootstrapping;
                // surface this as a dedicated error the caller can act on.
                info!(peer = %peer_pubkey, tree = %tree_id, request_id = %request_id, "Bootstrap request pending manual approval");
                return Err(SyncError::BootstrapPending {
                    request_id,
                    message,
                }
                .into());
            }
            SyncResponse::Error(err) => {
                return Err(SyncError::Network(format!("Peer returned error: {err}")).into());
            }
            _ => {
                return Err(SyncError::SyncProtocolError(
                    "Unexpected response type for sync tree request".to_string(),
                )
                .into());
            }
        }

        // Track tree/peer relationship for sync_on_commit to work
        // This allows on_local_write() to find this peer when queueing entries
        self.add_tree_sync(peer_pubkey, tree_id).await?;

        Ok(())
    }
738
739    // === Flush Operations ===
740
741    /// Process all queued entries and retry any failed sends.
742    ///
743    /// This method:
744    /// 1. Retries all entries in the retry queue (ignoring backoff timers)
745    /// 2. Processes all entries in the sync queue (batched by peer)
746    ///
747    /// When this method returns, all pending sync work has been attempted.
748    /// This is useful to eensuree that all pending pushes have completed.
749    ///
750    /// # Returns
751    /// `Ok(())` if all operations completed successfully, or an error
752    /// if the background sync engine is not running or sends failed.
753    pub async fn flush(&self) -> Result<()> {
754        let (tx, rx) = oneshot::channel();
755
756        self.background_tx
757            .get()
758            .ok_or(SyncError::NoTransportEnabled)?
759            .send(SyncCommand::Flush { response: tx })
760            .await
761            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
762
763        rx.await
764            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
765    }
766
    /// Timeout applied to each address attempt in
    /// [`try_addresses_concurrently`](Self::try_addresses_concurrently).
    const ADDRESS_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(30);

    /// Try an operation against multiple addresses concurrently, returning on
    /// the first success.
    ///
    /// Spawns one detached task per address via [`tokio::spawn`]. Returns as
    /// soon as any task succeeds. Remaining tasks are **not** cancelled — they
    /// continue running in the background so that additional peer connections
    /// can be established and registered for future syncs. Each task is subject
    /// to [`ADDRESS_ATTEMPT_TIMEOUT`](Self::ADDRESS_ATTEMPT_TIMEOUT).
    ///
    /// If all tasks fail the last error is returned. If `addresses` is empty
    /// an [`SyncError::InvalidAddress`] error is returned.
    pub(super) async fn try_addresses_concurrently<F, Fut>(
        &self,
        addresses: &[Address],
        f: F,
    ) -> Result<()>
    where
        F: Fn(Sync, Address) -> Fut,
        Fut: Future<Output = Result<()>> + Send + 'static,
    {
        if addresses.is_empty() {
            return Err(SyncError::InvalidAddress("Ticket has no address hints".into()).into());
        }

        // Capacity equals the number of attempts, so task sends never block.
        let (tx, mut rx) = tokio::sync::mpsc::channel(addresses.len());

        for addr in addresses {
            let tx = tx.clone();
            // Build the future before spawning so `f` may borrow from the
            // caller's environment; only the future itself must be 'static.
            let fut = f(self.clone(), addr.clone());
            let addr_info = addr.clone();
            // Detached spawn: the task keeps running even after we return.
            tokio::spawn(async move {
                let result = tokio::time::timeout(Self::ADDRESS_ATTEMPT_TIMEOUT, fut).await;
                let result = match result {
                    Ok(inner) => inner,
                    Err(_) => {
                        // timeout() elapsed — convert into a network error.
                        warn!(
                            address = ?addr_info,
                            "Address attempt timed out after {:?}",
                            Self::ADDRESS_ATTEMPT_TIMEOUT,
                        );
                        Err(SyncError::Network(format!(
                            "Address attempt timed out after {:?}",
                            Self::ADDRESS_ATTEMPT_TIMEOUT,
                        ))
                        .into())
                    }
                };
                match &result {
                    Ok(()) => debug!(address = ?addr_info, "Address attempt succeeded"),
                    Err(e) => debug!(address = ?addr_info, error = %e, "Address attempt failed"),
                }
                // Ignore send errors — the receiver is dropped on early success,
                // but the task still completes its work (peer registration, etc.).
                let _ = tx.send(result).await;
            });
        }
        // Drop our sender so the channel closes when all tasks finish.
        drop(tx);

        // First success wins; otherwise remember the most recent failure.
        let mut last_err = None;
        while let Some(result) = rx.recv().await {
            match result {
                Ok(()) => return Ok(()),
                Err(e) => last_err = Some(e),
            }
        }

        Err(last_err.expect("at least one task was spawned"))
    }
841}