eidetica/sync/ops.rs
1//! Core sync operations for the sync system.
2
3use std::future::Future;
4use std::time::Duration;
5
6use tokio::sync::oneshot;
7use tracing::{debug, info, warn};
8
9use super::{
10 Address, DatabaseTicket, PeerId, Sync, SyncError,
11 background::SyncCommand,
12 peer_manager::PeerManager,
13 peer_types,
14 protocol::{self, SyncRequest, SyncResponse, SyncTreeRequest},
15 user_sync_manager::UserSyncManager,
16};
17use crate::{
18 Database, Entry, Result, auth::Permission, auth::crypto::PublicKey, entry::ID, store::DocStore,
19};
20
21use super::utils::collect_ancestors_to_send;
22
23impl Sync {
24 // === Core Sync Methods ===
25
26 /// Synchronize a specific tree with a peer using bidirectional sync.
27 ///
28 /// This is the main synchronization method that implements tip exchange
29 /// and bidirectional entry transfer to keep trees in sync between peers.
30 /// It performs both pull (fetch missing entries) and push (send our entries).
31 ///
32 /// # Arguments
33 /// * `peer_pubkey` - The public key of the peer to sync with
34 /// * `tree_id` - The ID of the tree to synchronize
35 ///
36 /// # Returns
37 /// A Result indicating success or failure of the sync operation.
38 pub async fn sync_tree_with_peer(&self, peer_pubkey: &PublicKey, tree_id: &ID) -> Result<()> {
39 // Get peer information and address
40 let peer_info = self
41 .get_peer_info(peer_pubkey)
42 .await?
43 .ok_or_else(|| SyncError::PeerNotFound(peer_pubkey.to_string()))?;
44
45 let address = peer_info
46 .addresses
47 .first()
48 .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?;
49
50 // Get our current tips for this tree (empty if tree doesn't exist)
51 let backend = self.backend()?;
52 let our_tips: Vec<ID> = backend
53 .snapshot(tree_id)
54 .await
55 .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?
56 .into_tips();
57
58 // Get our device public key for automatic peer tracking
59 let our_device_pubkey = self.get_device_pubkey().ok();
60
61 // Send unified sync request
62 let request = SyncRequest::SyncTree(SyncTreeRequest {
63 tree_id: tree_id.clone(),
64 our_tips,
65 peer_pubkey: our_device_pubkey,
66 requesting_key: None, // TODO: Add auth support for direct sync
67 requesting_key_name: None,
68 requested_permission: None,
69 });
70
71 // Send request via background sync command
72 let (tx, rx) = oneshot::channel();
73 self.background_tx
74 .get()
75 .ok_or(SyncError::NoTransportEnabled)?
76 .send(SyncCommand::SendRequest {
77 address: address.clone(),
78 request: Box::new(request),
79 response: tx,
80 })
81 .await
82 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
83
84 let response = rx
85 .await
86 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
87 .map_err(|e| SyncError::Network(format!("Request failed: {e}")))?;
88
89 match response {
90 SyncResponse::Bootstrap(bootstrap_response) => {
91 self.handle_bootstrap_response(bootstrap_response).await?;
92 }
93 SyncResponse::Incremental(incremental_response) => {
94 self.handle_incremental_response(incremental_response, address)
95 .await?;
96 }
97 SyncResponse::Error(msg) => {
98 return Err(SyncError::SyncProtocolError(format!("Sync error: {msg}")).into());
99 }
100 _ => {
101 return Err(SyncError::UnexpectedResponse {
102 expected: "Bootstrap or Incremental",
103 actual: format!("{response:?}"),
104 }
105 .into());
106 }
107 }
108
109 // Track tree/peer relationship for sync_on_commit to work
110 // This allows on_local_write() to find this peer when queueing entries
111 self.add_tree_sync(peer_pubkey, tree_id).await?;
112
113 Ok(())
114 }
115
116 /// Handle bootstrap response by storing root and all entries
117 pub(super) async fn handle_bootstrap_response(
118 &self,
119 response: protocol::BootstrapResponse,
120 ) -> Result<()> {
121 tracing::info!(tree_id = %response.tree_id, "Processing bootstrap response");
122
123 // Integrity check: the root entry's content must hash to the declared
124 // tree_id. Rejects peers serving substituted content. A mismatch here
125 // also covers the cross-algorithm bootstrap case (e.g. a SHA-256 tree
126 // advertised to a BLAKE3-default node) — those are unsupported until
127 // the backend gains multi-CID-per-entry storage, so failing loudly is
128 // better than silently re-keying the DAG under the wrong algorithm.
129 let derived = response.root_entry.id();
130 if derived != response.tree_id {
131 return Err(SyncError::InvalidEntry(format!(
132 "root entry content hashes to {} but bootstrap response declares tree_id {}",
133 derived, response.tree_id
134 ))
135 .into());
136 }
137
138 // Combine root entry with all other entries into a single batch
139 let mut all_entries = Vec::with_capacity(1 + response.all_entries.len());
140 all_entries.push(response.root_entry);
141 all_entries.extend(response.all_entries);
142
143 // Store all entries and fire callbacks once
144 self.store_received_entries(&response.tree_id, all_entries)
145 .await?;
146
147 tracing::info!(tree_id = %response.tree_id, "Bootstrap completed successfully");
148 Ok(())
149 }
150
151 /// Handle incremental response by storing missing entries and sending back what server is missing
152 pub(super) async fn handle_incremental_response(
153 &self,
154 response: protocol::IncrementalResponse,
155 peer_address: &peer_types::Address,
156 ) -> Result<()> {
157 tracing::debug!(tree_id = %response.tree_id, "Processing incremental response");
158
159 // Step 1: Store missing entries
160 self.store_received_entries(&response.tree_id, response.missing_entries)
161 .await?;
162
163 // Step 2: Check if server is missing entries from us
164 let backend = self.backend()?;
165 let our_snapshot = backend.snapshot(&response.tree_id).await?;
166 let their_tips = &response.their_tips;
167
168 // Find tips they don't have
169 let missing_tip_ids: Vec<_> = our_snapshot
170 .tips()
171 .iter()
172 .filter(|tip_id| !their_tips.contains(tip_id))
173 .cloned()
174 .collect();
175
176 if !missing_tip_ids.is_empty() {
177 tracing::debug!(
178 tree_id = %response.tree_id,
179 missing_tips = missing_tip_ids.len(),
180 "Server is missing some of our entries, sending them back"
181 );
182
183 // Collect entries server is missing
184 let engine = self
185 .backend()?
186 .local_engine()
187 .expect("sync requires local backend");
188 let entries_for_server =
189 collect_ancestors_to_send(engine.as_ref(), &missing_tip_ids, their_tips).await?;
190
191 if !entries_for_server.is_empty() {
192 // Send these entries back to server
193 self.send_missing_entries_to_peer(
194 peer_address,
195 &response.tree_id,
196 entries_for_server,
197 )
198 .await?;
199 }
200 }
201
202 tracing::debug!(tree_id = %response.tree_id, "Incremental sync completed");
203 Ok(())
204 }
205
206 /// Send entries that the server is missing back to complete bidirectional sync
207 async fn send_missing_entries_to_peer(
208 &self,
209 peer_address: &peer_types::Address,
210 tree_id: &ID,
211 entries: Vec<Entry>,
212 ) -> Result<()> {
213 if entries.is_empty() {
214 return Ok(());
215 }
216
217 tracing::debug!(
218 tree_id = %tree_id,
219 entry_count = entries.len(),
220 "Sending missing entries back to peer for bidirectional sync"
221 );
222
223 let request = protocol::SyncRequest::SendEntries(entries);
224
225 // Send via command channel
226 let (tx, rx) = tokio::sync::oneshot::channel();
227 self.background_tx
228 .get()
229 .ok_or(SyncError::NoTransportEnabled)?
230 .send(SyncCommand::SendRequest {
231 address: peer_address.clone(),
232 request: Box::new(request),
233 response: tx,
234 })
235 .await
236 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
237
238 // Wait for acknowledgment
239 let response = rx
240 .await
241 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
242 .map_err(|e| SyncError::Network(format!("Request failed: {e}")))?;
243
244 match response {
245 protocol::SyncResponse::Ack | protocol::SyncResponse::Count(_) => {
246 tracing::debug!(tree_id = %tree_id, "Server acknowledged receipt of missing entries");
247 Ok(())
248 }
249 protocol::SyncResponse::Error(e) => {
250 Err(SyncError::Network(format!("Server error receiving entries: {e}")).into())
251 }
252 _ => Err(SyncError::UnexpectedResponse {
253 expected: "Ack or Count",
254 actual: format!("{response:?}"),
255 }
256 .into()),
257 }
258 }
259
260 /// Validate and store received entries from a peer, firing remote write callbacks.
261 pub(super) async fn store_received_entries(
262 &self,
263 tree_id: &ID,
264 entries: Vec<Entry>,
265 ) -> Result<()> {
266 // These entries arrive without per-entry declared IDs — they were batched
267 // under a single tree_id by the sender. Content is stored under whatever
268 // ID our local `entry.id()` derives, so substitution attacks on individual
269 // entries would fail DAG connectivity checks via parent pointers rather
270 // than a per-entry hash check here. Root-level integrity is verified by
271 // the bootstrap handler against the declared tree_id.
272 //
273 // TODO: Add signature verification and parent-existence / DAG-connectivity
274 // checks before marking entries as verified.
275
276 // Store entries and fire callbacks via Instance::put_remote_entries.
277 // Stored Unverified: these arrived from a peer and have not been
278 // verified by this node.
279 let instance = self.instance()?;
280 instance
281 .put_remote_entries(tree_id, entries)
282 .await
283 .map_err(|e| SyncError::BackendError(format!("Failed to store entries: {e}")))?;
284
285 Ok(())
286 }
287
288 /// Send a batch of entries to a sync peer (async version).
289 ///
290 /// # Arguments
291 /// * `entries` - The entries to send
292 /// * `address` - The address of the peer to send to
293 ///
294 /// # Returns
295 /// A Result indicating whether the entries were successfully acknowledged.
296 pub async fn send_entries(
297 &self,
298 entries: impl AsRef<[Entry]>,
299 address: &Address,
300 ) -> Result<()> {
301 let entries_vec = entries.as_ref().to_vec();
302 let request = SyncRequest::SendEntries(entries_vec);
303 let response = self.send_request(&request, address).await?;
304
305 match response {
306 SyncResponse::Ack | SyncResponse::Count(_) => Ok(()),
307 SyncResponse::Error(msg) => Err(SyncError::SyncProtocolError(format!(
308 "Peer {} returned error: {}",
309 address.address, msg
310 ))
311 .into()),
312 _ => Err(SyncError::UnexpectedResponse {
313 expected: "Ack or Count",
314 actual: format!("{response:?}"),
315 }
316 .into()),
317 }
318 }
319
320 /// Send specific entries to a peer via the background sync engine.
321 ///
322 /// This method queues entries for direct transmission without duplicate filtering.
323 /// The caller is responsible for determining which entries should be sent.
324 ///
325 /// # Duplicate Prevention Architecture
326 ///
327 /// Eidetica uses **smart duplicate prevention** in the background sync engine:
328 /// - **Database sync** (`SyncWithPeer` command): Uses tip comparison for semantic filtering
329 /// - **Direct send** (this method): Trusts caller to provide appropriate entries
330 ///
331 /// For automatic duplicate prevention, use tree-based sync relationships instead
332 /// of calling this method directly.
333 ///
334 /// # Arguments
335 /// * `peer_id` - The peer ID to send to
336 /// * `entries` - The specific entries to send (no filtering applied)
337 ///
338 /// # Returns
339 /// A Result indicating whether the command was successfully queued for background processing.
340 pub async fn send_entries_to_peer(&self, peer_id: &PeerId, entries: Vec<Entry>) -> Result<()> {
341 self.background_tx
342 .get()
343 .ok_or(SyncError::NoTransportEnabled)?
344 .send(SyncCommand::SendEntries {
345 peer: peer_id.clone(),
346 entries,
347 })
348 .await
349 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
350 Ok(())
351 }
352
353 /// Queue an entry for sync to a peer (non-blocking, for use in callbacks).
354 ///
355 /// This method is designed for use in write callbacks where async operations
356 /// are not possible. It uses try_send to avoid blocking, and logs errors
357 /// rather than failing the callback.
358 ///
359 /// # Arguments
360 /// * `peer_pubkey` - The public key of the peer to sync with
361 /// * `entry_id` - The ID of the entry to queue
362 /// * `tree_id` - The tree ID where the entry belongs
363 ///
364 /// # Returns
365 /// Ok(()) if the entry was successfully queued.
366 /// Only returns Err if transport is not enabled.
367 pub fn queue_entry_for_sync(
368 &self,
369 peer_id: &PeerId,
370 entry_id: &ID,
371 tree_id: &ID,
372 ) -> Result<()> {
373 // Ensure background sync is running
374 if self.background_tx.get().is_none() {
375 return Err(SyncError::NoTransportEnabled.into());
376 }
377
378 // Add to queue - BackgroundSync will process and send
379 self.queue
380 .enqueue(peer_id, entry_id.clone(), tree_id.clone());
381
382 Ok(())
383 }
384
385 /// Handle local write events for automatic sync.
386 ///
387 /// This method is called by the Instance write callback system when entries
388 /// are committed locally. It looks up the combined sync settings for the database
389 /// and queues the entry for sync with all configured peers if sync is enabled.
390 ///
391 /// This is the core method that implements automatic sync-on-commit behavior.
392 ///
393 /// # Arguments
394 /// * `event` - The write event containing the newly committed entries
395 /// * `database` - The database where the entries were committed
396 ///
397 /// # Returns
398 /// Ok(()) on success, or an error if settings lookup fails
399 pub(crate) async fn on_local_write(
400 &self,
401 event: &crate::instance::WriteEvent,
402 database: &Database,
403 ) -> Result<()> {
404 // Early return if background sync not running
405 if self.background_tx.get().is_none() {
406 return Ok(());
407 }
408
409 // Look up combined settings for this database
410 let tx = self.sync_tree.new_transaction().await?;
411 let user_mgr = UserSyncManager::new(&tx);
412 let peer_mgr = PeerManager::new(&tx);
413
414 let combined_settings = match user_mgr.get_combined_settings(database.root_id()).await? {
415 Some(settings) => settings,
416 None => {
417 // No settings configured for this database - no sync needed
418 debug!(database_id = %database.root_id(), "No sync settings for database, skipping");
419 return Ok(());
420 }
421 };
422
423 // Check if sync is enabled and sync_on_commit is true
424 if !combined_settings.sync_enabled || !combined_settings.sync_on_commit {
425 debug!(
426 database_id = %database.root_id(),
427 sync_enabled = combined_settings.sync_enabled,
428 sync_on_commit = combined_settings.sync_on_commit,
429 "Sync not enabled for database"
430 );
431 return Ok(());
432 }
433
434 // Get list of peers for this database
435 let peers = peer_mgr.get_tree_peers(database.root_id()).await?;
436
437 if peers.is_empty() {
438 debug!(database_id = %database.root_id(), "No peers configured for database");
439 return Ok(());
440 }
441
442 // Queue each entry for sync with each peer
443 let tree_id = database.root_id();
444
445 for entry in event.entries() {
446 let entry_id = entry.id();
447 debug!(
448 database_id = %tree_id,
449 entry_id = %entry_id,
450 peer_count = peers.len(),
451 "Queueing entry for automatic sync"
452 );
453
454 for peer_id in &peers {
455 self.queue_entry_for_sync(peer_id, &entry_id, tree_id)?;
456 }
457 }
458
459 Ok(())
460 }
461
462 /// Initialize combined settings for all users.
463 ///
464 /// This is called during Sync initialization. For new sync trees (just created),
465 /// it scans the _users database to register all existing users. For existing
466 /// sync trees (loaded), it updates combined settings for already-tracked users.
467 pub(super) async fn initialize_user_settings(&self) -> Result<()> {
468 use crate::store::Table;
469 use crate::user::types::UserInfo;
470
471 // Check if sync tree is freshly created (no users tracked yet)
472 let user_tracking = self
473 .sync_tree
474 .get_store_viewer::<DocStore>(super::user_sync_manager::USER_TRACKING_SUBTREE)
475 .await?;
476 let all_tracked = user_tracking.get_all().await?;
477
478 if all_tracked.keys().count() == 0 {
479 // New sync tree - register all users from _users database
480 let instance = self.instance.upgrade().ok_or(SyncError::InstanceDropped)?;
481 let users_db = instance.users_db().await?;
482 let users_table = users_db
483 .get_store_viewer::<Table<UserInfo>>("users")
484 .await?;
485 let all_users = users_table.search(|_| true).await?;
486
487 for (user_uuid, user_info) in all_users {
488 self.sync_user(&user_uuid, &user_info.user_database_id)
489 .await?;
490 }
491 } else {
492 // Existing sync tree - update settings for tracked users if changed
493 let tx = self.sync_tree.new_transaction().await?;
494 let user_mgr = UserSyncManager::new(&tx);
495
496 for user_uuid in all_tracked.keys() {
497 if let Some((prefs_db_id, _tips)) =
498 user_mgr.get_tracked_user_state(user_uuid).await?
499 {
500 self.sync_user(user_uuid, &prefs_db_id).await?;
501 }
502 }
503 }
504
505 Ok(())
506 }
507
508 /// Send a sync request to a peer and get a response (async version).
509 ///
510 /// # Arguments
511 /// * `request` - The sync request to send
512 /// * `address` - The address of the peer
513 ///
514 /// # Returns
515 /// The sync response from the peer.
516 pub(super) async fn send_request(
517 &self,
518 request: &SyncRequest,
519 address: &Address,
520 ) -> Result<SyncResponse> {
521 let (tx, rx) = oneshot::channel();
522
523 self.background_tx
524 .get()
525 .ok_or(SyncError::NoTransportEnabled)?
526 .send(SyncCommand::SendRequest {
527 address: address.clone(),
528 request: Box::new(request.clone()),
529 response: tx,
530 })
531 .await
532 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
533
534 rx.await
535 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
536 }
537
538 /// Discover available trees from a peer (simplified API).
539 ///
540 /// This method connects to a peer and retrieves the list of trees they're willing to sync.
541 /// This is useful for discovering what can be synced before setting up sync relationships.
542 ///
543 /// # Arguments
544 /// * `address` - The transport address of the peer.
545 ///
546 /// # Returns
547 /// A vector of TreeInfo describing available trees, or an error.
548 pub async fn discover_peer_trees(&self, address: &Address) -> Result<Vec<protocol::TreeInfo>> {
549 // Connect and get handshake info
550 let _peer_pubkey = self.connect_to_peer(address).await?;
551
552 // The handshake already contains the tree list, but we need to get it again
553 // since connect_to_peer doesn't return it. For now, return empty list
554 // TODO: Enhance this to actually return the tree list from handshake
555
556 tracing::warn!(
557 "discover_peer_trees not fully implemented - handshake contains tree info but API needs enhancement"
558 );
559 Ok(vec![])
560 }
561
562 /// Sync with a peer at a given address.
563 ///
564 /// This is a blocking convenience method that:
565 /// 1. Connects to discover the peer's public key
566 /// 2. Registers the peer and performs immediate sync
567 /// 3. Returns after sync completes
568 ///
569 /// For new code, prefer using [`register_sync_peer()`](Self::register_sync_peer)
570 /// directly, which registers intent and lets background sync handle it.
571 ///
572 /// # Arguments
573 /// * `address` - The transport address of the peer.
574 /// * `tree_id` - Optional tree ID to sync (None = discover available trees)
575 ///
576 /// # Returns
577 /// Result indicating success or failure.
578 pub async fn sync_with_peer(&self, address: &Address, tree_id: Option<&ID>) -> Result<()> {
579 // Connect to peer if not already connected
580 let peer_pubkey = self.connect_to_peer(address).await?;
581
582 // Store the address for this peer (needed for sync_tree_with_peer)
583 self.add_peer_address(&peer_pubkey, address.clone()).await?;
584
585 if let Some(tree_id) = tree_id {
586 // Sync specific tree
587 self.sync_tree_with_peer(&peer_pubkey, tree_id).await?;
588 } else {
589 // TODO: Sync all available trees
590 tracing::warn!(
591 "Syncing all trees not yet implemented - need to enhance discover_peer_trees first"
592 );
593 }
594
595 Ok(())
596 }
597
598 /// Sync with a peer using a [`DatabaseTicket`].
599 ///
600 /// Attempts [`sync_with_peer`](Self::sync_with_peer) for every address
601 /// hint in the ticket concurrently. Each address may point to a different
602 /// peer, so connections are independent. Succeeds if at least one address
603 /// syncs successfully; returns the last error if all fail.
604 ///
605 /// # Arguments
606 /// * `ticket` - A ticket containing the database ID and address hints.
607 ///
608 /// # Errors
609 /// Returns [`SyncError::InvalidAddress`] if the ticket has no address hints.
610 /// Returns the last sync error if no address succeeded.
611 pub async fn sync_with_ticket(&self, ticket: &DatabaseTicket) -> Result<()> {
612 let database_id = ticket.database_id().clone();
613 self.try_addresses_concurrently(ticket.addresses(), |sync, addr| {
614 let db_id = database_id.clone();
615 async move { sync.sync_with_peer(&addr, Some(&db_id)).await }
616 })
617 .await
618 }
619
620 /// Sync a specific tree with a peer, with optional authentication for bootstrap.
621 ///
622 /// This is a lower-level method that allows specifying authentication parameters
623 /// for bootstrap scenarios where access needs to be requested.
624 ///
625 /// # Arguments
626 /// * `peer_pubkey` - The public key of the peer to sync with
627 /// * `tree_id` - The ID of the tree to sync
628 /// * `requesting_key` - Optional public key requesting access (for bootstrap)
629 /// * `requesting_key_name` - Optional name/ID of the requesting key
630 /// * `requested_permission` - Optional permission level being requested
631 ///
632 /// # Returns
633 /// A Result indicating success or failure.
634 pub async fn sync_tree_with_peer_auth(
635 &self,
636 peer_pubkey: &PublicKey,
637 tree_id: &ID,
638 requesting_key: Option<&PublicKey>,
639 requesting_key_name: Option<&str>,
640 requested_permission: Option<Permission>,
641 ) -> Result<()> {
642 // Get peer information and address
643 let peer_info = self
644 .get_peer_info(peer_pubkey)
645 .await?
646 .ok_or_else(|| SyncError::PeerNotFound(peer_pubkey.to_string()))?;
647
648 let address = peer_info
649 .addresses
650 .first()
651 .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?;
652
653 // Get our current tips for this tree (empty if tree doesn't exist)
654 let backend = self.backend()?;
655 let our_tips: Vec<ID> = backend
656 .snapshot(tree_id)
657 .await
658 .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?
659 .into_tips();
660
661 // Get our device public key for automatic peer tracking
662 let our_device_pubkey = self.get_device_pubkey().ok();
663
664 // Send unified sync request with auth parameters
665 let request = SyncRequest::SyncTree(SyncTreeRequest {
666 tree_id: tree_id.clone(),
667 our_tips,
668 peer_pubkey: our_device_pubkey,
669 requesting_key: requesting_key.cloned(),
670 requesting_key_name: requesting_key_name.map(|k| k.to_string()),
671 requested_permission,
672 });
673
674 // Send request via background sync command
675 let (tx, rx) = oneshot::channel();
676 self.background_tx
677 .get()
678 .ok_or(SyncError::NoTransportEnabled)?
679 .send(SyncCommand::SendRequest {
680 address: address.clone(),
681 request: Box::new(request),
682 response: tx,
683 })
684 .await
685 .map_err(|_| {
686 SyncError::CommandSendError("Background sync command channel closed".to_string())
687 })?;
688
689 // Wait for response
690 let response = rx
691 .await
692 .map_err(|_| {
693 SyncError::CommandSendError("Background sync response channel closed".to_string())
694 })?
695 .map_err(|e| SyncError::Network(format!("Sync request failed: {e}")))?;
696
697 // Handle the response (same logic as existing sync_tree_with_peer)
698 match response {
699 SyncResponse::Bootstrap(bootstrap_response) => {
700 info!(peer = %peer_pubkey, tree = %tree_id, entry_count = bootstrap_response.all_entries.len() + 1, "Received bootstrap response");
701
702 // Store root + all entries as a single batch with callback dispatch
703 let mut all_entries = Vec::with_capacity(1 + bootstrap_response.all_entries.len());
704 all_entries.push(bootstrap_response.root_entry);
705 all_entries.extend(bootstrap_response.all_entries);
706
707 // Bootstrap entries come from a peer; stored Unverified.
708 let instance = self.instance()?;
709 instance.put_remote_entries(tree_id, all_entries).await?;
710
711 info!(peer = %peer_pubkey, tree = %tree_id, "Bootstrap sync completed successfully");
712 }
713 SyncResponse::Incremental(incremental_response) => {
714 info!(peer = %peer_pubkey, tree = %tree_id, missing_count = incremental_response.missing_entries.len(), "Received incremental sync response");
715
716 // Use the enhanced handler that supports bidirectional sync
717 self.handle_incremental_response(incremental_response, address)
718 .await?;
719
720 debug!(peer = %peer_pubkey, tree = %tree_id, "Incremental sync completed");
721 }
722 SyncResponse::BootstrapPending {
723 request_id,
724 message,
725 } => {
726 info!(peer = %peer_pubkey, tree = %tree_id, request_id = %request_id, "Bootstrap request pending manual approval");
727 return Err(SyncError::BootstrapPending {
728 request_id,
729 message,
730 }
731 .into());
732 }
733 SyncResponse::Error(err) => {
734 return Err(SyncError::Network(format!("Peer returned error: {err}")).into());
735 }
736 _ => {
737 return Err(SyncError::SyncProtocolError(
738 "Unexpected response type for sync tree request".to_string(),
739 )
740 .into());
741 }
742 }
743
744 // Track tree/peer relationship for sync_on_commit to work
745 // This allows on_local_write() to find this peer when queueing entries
746 self.add_tree_sync(peer_pubkey, tree_id).await?;
747
748 Ok(())
749 }
750
751 // === Flush Operations ===
752
753 /// Process all queued entries and retry any failed sends.
754 ///
755 /// This method:
756 /// 1. Retries all entries in the retry queue (ignoring backoff timers)
757 /// 2. Processes all entries in the sync queue (batched by peer)
758 ///
759 /// When this method returns, all pending sync work has been attempted.
760 /// This is useful to eensuree that all pending pushes have completed.
761 ///
762 /// # Returns
763 /// `Ok(())` if all operations completed successfully, or an error
764 /// if the background sync engine is not running or sends failed.
765 pub async fn flush(&self) -> Result<()> {
766 let (tx, rx) = oneshot::channel();
767
768 self.background_tx
769 .get()
770 .ok_or(SyncError::NoTransportEnabled)?
771 .send(SyncCommand::Flush { response: tx })
772 .await
773 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
774
775 rx.await
776 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
777 }
778
779 /// Timeout applied to each address attempt in
780 /// [`try_addresses_concurrently`](Self::try_addresses_concurrently).
781 const ADDRESS_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(30);
782
783 /// Try an operation against multiple addresses concurrently, returning on
784 /// the first success.
785 ///
786 /// Spawns one detached task per address via [`tokio::spawn`]. Returns as
787 /// soon as any task succeeds. Remaining tasks are **not** cancelled — they
788 /// continue running in the background so that additional peer connections
789 /// can be established and registered for future syncs. Each task is subject
790 /// to [`ADDRESS_ATTEMPT_TIMEOUT`](Self::ADDRESS_ATTEMPT_TIMEOUT).
791 ///
792 /// If all tasks fail the last error is returned. If `addresses` is empty
793 /// an [`SyncError::InvalidAddress`] error is returned.
794 pub(super) async fn try_addresses_concurrently<F, Fut>(
795 &self,
796 addresses: &[Address],
797 f: F,
798 ) -> Result<()>
799 where
800 F: Fn(Sync, Address) -> Fut,
801 Fut: Future<Output = Result<()>> + Send + 'static,
802 {
803 if addresses.is_empty() {
804 return Err(SyncError::InvalidAddress("Ticket has no address hints".into()).into());
805 }
806
807 let (tx, mut rx) = tokio::sync::mpsc::channel(addresses.len());
808
809 for addr in addresses {
810 let tx = tx.clone();
811 let fut = f(self.clone(), addr.clone());
812 let addr_info = addr.clone();
813 // Detached spawn: the task keeps running even after we return.
814 tokio::spawn(async move {
815 let result = tokio::time::timeout(Self::ADDRESS_ATTEMPT_TIMEOUT, fut).await;
816 let result = match result {
817 Ok(inner) => inner,
818 Err(_) => {
819 warn!(
820 address = ?addr_info,
821 "Address attempt timed out after {:?}",
822 Self::ADDRESS_ATTEMPT_TIMEOUT,
823 );
824 Err(SyncError::Network(format!(
825 "Address attempt timed out after {:?}",
826 Self::ADDRESS_ATTEMPT_TIMEOUT,
827 ))
828 .into())
829 }
830 };
831 match &result {
832 Ok(()) => debug!(address = ?addr_info, "Address attempt succeeded"),
833 Err(e) => debug!(address = ?addr_info, error = %e, "Address attempt failed"),
834 }
835 // Ignore send errors — the receiver is dropped on early success,
836 // but the task still completes its work (peer registration, etc.).
837 let _ = tx.send(result).await;
838 });
839 }
840 // Drop our sender so the channel closes when all tasks finish.
841 drop(tx);
842
843 let mut last_err = None;
844 while let Some(result) = rx.recv().await {
845 match result {
846 Ok(()) => return Ok(()),
847 Err(e) => last_err = Some(e),
848 }
849 }
850
851 Err(last_err.expect("at least one task was spawned"))
852 }
853}