1use std::{sync::Arc, time::Duration};
8
9use tokio::{
10 sync::{mpsc, oneshot},
11 time::interval,
12};
13use tracing::{Instrument, debug, info, info_span, trace};
14
15use super::{
16 error::SyncError,
17 handler::SyncHandlerImpl,
18 peer_manager::PeerManager,
19 peer_types::{Address, PeerId, PeerStatus},
20 protocol::{SyncRequest, SyncResponse, SyncTreeRequest},
21 queue::SyncQueue,
22 transport_manager::TransportManager,
23};
24use crate::{
25 Database, Error, Instance, Result, WeakInstance,
26 auth::crypto::PublicKey,
27 entry::{Entry, ID},
28 store::DocStore,
29};
30
31mod conn;
32
33#[allow(clippy::large_enum_variant)]
35pub enum SyncCommand {
36 SendEntries { peer: PeerId, entries: Vec<Entry> },
38 SyncWithPeer { peer: PeerId },
40 Shutdown,
42
43 AddTransport {
46 name: String,
47 transport: Box<dyn super::transports::SyncTransport>,
48 response: oneshot::Sender<Result<()>>,
49 },
50
51 StartServer {
54 name: Option<String>,
56 response: oneshot::Sender<Result<()>>,
57 },
58 StopServer {
60 name: Option<String>,
62 response: oneshot::Sender<Result<()>>,
63 },
64 GetServerAddress {
66 name: String,
67 response: oneshot::Sender<Result<String>>,
68 },
69 GetAllServerAddresses {
71 response: oneshot::Sender<Result<Vec<(String, String)>>>,
72 },
73
74 ConnectToPeer {
77 address: Address,
78 response: oneshot::Sender<Result<PublicKey>>, },
80
81 SendRequest {
84 address: Address,
85 request: Box<SyncRequest>,
86 response: oneshot::Sender<Result<SyncResponse>>,
87 },
88
89 Flush {
91 response: oneshot::Sender<Result<()>>,
92 },
93}
94
95impl std::fmt::Debug for SyncCommand {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 match self {
103 Self::SendEntries { peer, entries } => f
104 .debug_struct("SendEntries")
105 .field("peer", peer)
106 .field("entries_count", &entries.len())
107 .finish(),
108 Self::SyncWithPeer { peer } => {
109 f.debug_struct("SyncWithPeer").field("peer", peer).finish()
110 }
111 Self::Shutdown => write!(f, "Shutdown"),
112 Self::AddTransport {
113 name, transport, ..
114 } => f
115 .debug_struct("AddTransport")
116 .field("name", name)
117 .field("transport_type", &transport.transport_type())
118 .finish(),
119 Self::StartServer { name, .. } => {
120 f.debug_struct("StartServer").field("name", name).finish()
121 }
122 Self::StopServer { name, .. } => {
123 f.debug_struct("StopServer").field("name", name).finish()
124 }
125 Self::GetServerAddress { name, .. } => f
126 .debug_struct("GetServerAddress")
127 .field("name", name)
128 .finish(),
129 Self::GetAllServerAddresses { .. } => write!(f, "GetAllServerAddresses"),
130 Self::ConnectToPeer { address, .. } => f
131 .debug_struct("ConnectToPeer")
132 .field("address", address)
133 .finish(),
134 Self::SendRequest {
135 address, request, ..
136 } => f
137 .debug_struct("SendRequest")
138 .field("address", address)
139 .field("request", request)
140 .finish(),
141 Self::Flush { .. } => write!(f, "Flush"),
142 }
143 }
144}
145
146#[derive(Debug, Clone)]
148struct RetryEntry {
149 peer: PeerId,
150 entries: Vec<Entry>,
151 attempts: u32,
152 last_attempt_ms: u64,
154}
155
156pub struct BackgroundSync {
158 pub(super) transport_manager: TransportManager,
160 instance: WeakInstance,
161 pub(super) sync_tree_id: ID,
162
163 queue: Arc<SyncQueue>,
165
166 retry_queue: Vec<RetryEntry>,
168
169 command_rx: mpsc::Receiver<SyncCommand>,
171}
172
173impl BackgroundSync {
174 pub fn start(
179 instance: Instance,
180 sync_tree_id: ID,
181 queue: Arc<SyncQueue>,
182 ) -> mpsc::Sender<SyncCommand> {
183 let (tx, rx) = mpsc::channel(100);
184
185 let background = Self {
186 transport_manager: TransportManager::new(),
187 instance: instance.downgrade(),
188 sync_tree_id,
189 queue,
190 retry_queue: Vec::new(),
191 command_rx: rx,
192 };
193
194 tokio::spawn(background.run());
197 tx
198 }
199
200 pub(super) fn instance(&self) -> Result<Instance> {
202 self.instance
203 .upgrade()
204 .ok_or_else(|| SyncError::InstanceDropped.into())
205 }
206
207 pub(super) async fn get_sync_tree(&self) -> Result<Database> {
209 let instance = self.instance()?;
211 let signing_key = instance.signing_key()?.clone();
212 Ok(Database::open(&instance, &self.sync_tree_id)
213 .await?
214 .with_key(signing_key))
215 }
216
217 async fn get_min_sync_interval(&self) -> Option<u64> {
220 let sync_tree = match self.get_sync_tree().await {
221 Ok(tree) => tree,
222 Err(_) => return None,
223 };
224
225 let txn = match sync_tree.new_transaction().await {
226 Ok(txn) => txn,
227 Err(_) => return None,
228 };
229
230 let user_mgr = super::user_sync_manager::UserSyncManager::new(&txn);
231
232 let database_users = match txn
234 .get_store::<DocStore>(super::user_sync_manager::DATABASE_USERS_SUBTREE)
235 .await
236 {
237 Ok(store) => store,
238 Err(_) => return None,
239 };
240
241 let all_dbs = match database_users.get_all().await {
242 Ok(doc) => doc,
243 Err(_) => return None,
244 };
245
246 let mut min_interval: Option<u64> = None;
248 for db_id_str in all_dbs.keys() {
249 if let Ok(db_id) = ID::parse(db_id_str)
250 && let Ok(Some(settings)) = user_mgr.get_combined_settings(&db_id).await
251 && let Some(interval) = settings.interval_seconds
252 {
253 min_interval = Some(match min_interval {
254 Some(current_min) => current_min.min(interval),
255 None => interval,
256 });
257 }
258 }
259
260 min_interval
261 }
262
263 async fn run(mut self) {
265 async move {
266 info!("Starting background sync engine");
267
268 let mut current_interval_secs = self.get_min_sync_interval().await.unwrap_or(300);
270 info!("Initial periodic sync interval: {} seconds", current_interval_secs);
271
272 let mut periodic_sync = interval(Duration::from_secs(current_interval_secs));
274 let mut queue_check = interval(Duration::from_secs(5)); let mut retry_check = interval(Duration::from_secs(30)); let mut connection_check = interval(Duration::from_secs(60)); let mut settings_check = interval(Duration::from_secs(60)); periodic_sync.tick().await;
281 queue_check.tick().await;
282 retry_check.tick().await;
283 connection_check.tick().await;
284 settings_check.tick().await;
285
286 loop {
287 tokio::select! {
288 Some(cmd) = self.command_rx.recv() => {
290 if let Err(e) = self.handle_command(cmd).await {
291 tracing::error!("Background sync command error: {e}");
293 }
294 }
295
296 _ = queue_check.tick() => {
298 self.process_queue().await;
299 }
300
301 _ = periodic_sync.tick() => {
303 self.periodic_sync_all_peers().await;
304 }
305
306 _ = retry_check.tick() => {
308 self.process_retry_queue().await;
309 }
310
311 _ = connection_check.tick() => {
313 self.check_peer_connections().await;
314 }
315
316 _ = settings_check.tick() => {
318 if let Some(new_interval) = self.get_min_sync_interval().await
319 && new_interval != current_interval_secs {
320 info!("Sync interval changed from {} to {} seconds", current_interval_secs, new_interval);
321 current_interval_secs = new_interval;
322 periodic_sync = interval(Duration::from_secs(new_interval));
324 periodic_sync.tick().await; }
326 }
327
328 else => {
330 info!("Background sync engine shutting down");
332 break;
333 }
334 }
335 }
336 }
337 .instrument(info_span!("background_sync"))
338 .await
339 }
340
341 async fn handle_command(&mut self, command: SyncCommand) -> Result<()> {
343 match command {
344 SyncCommand::SendEntries { peer, entries } => {
345 if let Err(e) = self.send_to_peer(&peer, entries.clone()).await {
346 let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
347 self.add_to_retry_queue(peer, entries, e, now_ms);
348 }
349 }
350
351 SyncCommand::SyncWithPeer { peer } => {
352 if let Err(e) = self.sync_with_peer(&peer).await {
353 tracing::error!("Failed to sync with peer {peer}: {e}");
355 }
356 }
357
358 SyncCommand::AddTransport {
359 name,
360 transport,
361 response,
362 } => {
363 if let Some(old) = self.transport_manager.get_mut(&name)
365 && old.is_server_running()
366 {
367 let _ = old.stop_server().await;
368 }
369 self.transport_manager.add(&name, transport);
370 tracing::debug!("Added transport: {}", name);
371 let _ = response.send(Ok(()));
372 }
373
374 SyncCommand::StartServer { name, response } => {
375 let result = self.start_server(name.as_deref()).await;
376 let _ = response.send(result);
377 }
378
379 SyncCommand::StopServer { name, response } => {
380 let result = self.stop_server(name.as_deref()).await;
381 let _ = response.send(result);
382 }
383
384 SyncCommand::GetServerAddress { name, response } => {
385 let result = self.transport_manager.get_server_address(&name);
386 let _ = response.send(result);
387 }
388
389 SyncCommand::GetAllServerAddresses { response } => {
390 let addresses = self.transport_manager.get_all_server_addresses();
391 let _ = response.send(Ok(addresses));
392 }
393
394 SyncCommand::ConnectToPeer { address, response } => {
395 let result = self.connect_to_peer(&address).await;
396 let _ = response.send(result);
397 }
398
399 SyncCommand::SendRequest {
400 address,
401 request,
402 response,
403 } => {
404 let result = self.send_sync_request(&address, &request).await;
405 let _ = response.send(result);
406 }
407
408 SyncCommand::Flush { response } => {
409 let retry_failures = self.flush_retry_queue().await;
412 let queue_failures = self.process_queue().await;
413
414 let result = match (retry_failures, queue_failures) {
416 (0, 0) => Ok(()),
417 (r, q) => Err(SyncError::Network(format!(
418 "Flush had failures: {r} from retry queue, {q} from new entries"
419 ))
420 .into()),
421 };
422 let _ = response.send(result);
423 }
424
425 SyncCommand::Shutdown => {
426 return Err(SyncError::Network("Shutdown requested".to_string()).into());
428 }
429 }
430 Ok(())
431 }
432
433 async fn send_to_peer(&self, peer: &PeerId, entries: Vec<Entry>) -> Result<()> {
450 let address = {
452 let sync_tree = self.get_sync_tree().await?;
453 let txn = sync_tree.new_transaction().await?;
454 let peer_info = PeerManager::new(&txn)
455 .get_peer_info(peer.public_key())
456 .await?
457 .ok_or_else(|| SyncError::PeerNotFound(peer.to_string()))?;
458
459 peer_info
460 .addresses
461 .first()
462 .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?
463 .clone()
464 }; let request = SyncRequest::SendEntries(entries);
467 let response = self
468 .transport_manager
469 .send_request(&address, &request)
470 .await?;
471
472 match response {
473 SyncResponse::Ack | SyncResponse::Count(_) => Ok(()),
474 SyncResponse::Error(msg) => Err(SyncError::SyncProtocolError(format!(
475 "Peer {peer} returned error: {msg}"
476 ))
477 .into()),
478 _ => Err(SyncError::UnexpectedResponse {
479 expected: "Ack or Count",
480 actual: format!("{response:?}"),
481 }
482 .into()),
483 }
484 }
485
486 fn add_to_retry_queue(&mut self, peer: PeerId, entries: Vec<Entry>, error: Error, now_ms: u64) {
488 tracing::warn!("Failed to send to {peer}: {error}. Adding to retry queue.");
490 self.retry_queue.push(RetryEntry {
491 peer,
492 entries,
493 attempts: 1,
494 last_attempt_ms: now_ms,
495 });
496 }
497
498 async fn process_queue(&mut self) -> usize {
504 let batches = self.queue.drain();
505 if batches.is_empty() {
506 return 0;
507 }
508
509 let instance = match self.instance() {
510 Ok(i) => i,
511 Err(e) => {
512 tracing::warn!("Failed to get instance for queue processing: {e}");
513 return batches.len(); }
515 };
516
517 let mut failures = 0;
518 for (peer, entry_ids) in batches {
519 let mut entries = Vec::with_capacity(entry_ids.len());
521 for (entry_id, _tree_id) in &entry_ids {
522 match instance.backend().get(entry_id).await {
523 Ok(entry) => entries.push(entry),
524 Err(e) => {
525 tracing::warn!("Failed to fetch entry {entry_id} for peer {peer}: {e}");
526 }
527 }
528 }
529
530 if entries.is_empty() {
531 continue;
532 }
533
534 if let Err(e) = self.send_to_peer(&peer, entries.clone()).await {
536 let now_ms = instance.clock().now_millis();
537 self.add_to_retry_queue(peer, entries, e, now_ms);
538 failures += 1;
539 }
540 }
541 failures
542 }
543
544 async fn process_retry_queue(&mut self) {
546 let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
547 let mut still_failed = Vec::new();
548
549 let retry_queue = std::mem::take(&mut self.retry_queue);
551
552 for mut entry in retry_queue {
554 let backoff_ms = 2u64.pow(entry.attempts.min(6)) * 1000;
556 let elapsed_ms = now_ms.saturating_sub(entry.last_attempt_ms);
557
558 if elapsed_ms >= backoff_ms {
559 if let Err(_e) = self.send_to_peer(&entry.peer, entry.entries.clone()).await {
561 entry.attempts += 1;
562 entry.last_attempt_ms = now_ms;
563
564 if entry.attempts < 10 {
565 still_failed.push(entry);
567 } else {
568 tracing::error!("Giving up on sending to {} after 10 attempts", entry.peer);
570 }
571 } else {
572 }
574 } else {
575 still_failed.push(entry);
577 }
578 }
579
580 self.retry_queue = still_failed;
581 }
582
583 async fn flush_retry_queue(&mut self) -> usize {
586 let mut still_failed = Vec::new();
587 let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
588
589 let retry_queue = std::mem::take(&mut self.retry_queue);
591
592 for mut retry_entry in retry_queue {
594 if let Err(_e) = self
595 .send_to_peer(&retry_entry.peer, retry_entry.entries.clone())
596 .await
597 {
598 retry_entry.attempts += 1;
599 retry_entry.last_attempt_ms = now_ms;
600
601 if retry_entry.attempts < 10 {
602 still_failed.push(retry_entry);
603 } else {
604 tracing::error!(
605 "Giving up on sending to {} after 10 attempts",
606 retry_entry.peer
607 );
608 }
609 }
610 }
611
612 let failed_count = still_failed.len();
613 self.retry_queue = still_failed;
614 failed_count
615 }
616
617 async fn periodic_sync_all_peers(&self) {
619 let peers = match self.get_sync_tree().await {
623 Ok(sync_tree) => match sync_tree.new_transaction().await {
624 Ok(txn) => match PeerManager::new(&txn).list_peers().await {
625 Ok(peers) => {
626 peers
628 }
629 Err(_) => {
630 return;
632 }
633 },
634 Err(_) => {
635 return;
637 }
638 },
639 Err(_) => {
640 return;
642 }
643 };
644
645 for peer_info in peers {
647 if peer_info.status == PeerStatus::Active
648 && let Err(e) = self.sync_with_peer(&peer_info.id).await
649 {
650 tracing::error!("Periodic sync failed with {}: {e}", peer_info.id);
652 }
653 }
654 }
655
656 async fn sync_with_peer(&self, peer_id: &PeerId) -> Result<()> {
658 async move {
659 info!(peer = %peer_id, "Starting peer synchronization");
660
661 let (address, sync_trees) = {
663 let sync_tree = self.get_sync_tree().await?;
664 let txn = sync_tree.new_transaction().await?;
665 let peer_manager = PeerManager::new(&txn);
666
667 let peer_info = peer_manager
668 .get_peer_info(peer_id.public_key())
669 .await?
670 .ok_or_else(|| SyncError::PeerNotFound(peer_id.to_string()))?;
671
672 let address = peer_info
673 .addresses
674 .first()
675 .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?
676 .clone();
677
678 let sync_trees = peer_manager.get_peer_trees(peer_id.public_key()).await?;
680
681 (address, sync_trees)
682 }; if sync_trees.is_empty() {
685 debug!(peer = %peer_id, "No trees configured for sync with peer");
686 return Ok(()); }
688
689 info!(peer = %peer_id, tree_count = sync_trees.len(), "Synchronizing trees with peer");
690
691 for tree_id in sync_trees {
692 if let Err(e) = self.sync_tree_with_peer(peer_id, &tree_id, &address).await {
693 tracing::error!("Failed to sync tree {tree_id} with peer {peer_id}: {e}");
695 }
696 }
697
698 info!(peer = %peer_id, "Completed peer synchronization");
699 Ok(())
700 }
701 .instrument(info_span!("sync_with_peer", peer = %peer_id))
702 .await
703 }
704
705 async fn sync_tree_with_peer(
731 &self,
732 peer_id: &PeerId,
733 tree_id: &ID,
734 address: &Address,
735 ) -> Result<()> {
736 async move {
737 trace!(peer = %peer_id, tree = %tree_id, "Starting unified tree synchronization");
738
739 let instance = self.instance()?;
741 let our_tips: Vec<ID> = instance
742 .backend()
743 .snapshot(tree_id)
744 .await
745 .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?
746 .into_tips();
747
748 let our_device_pubkey = Some(instance.id());
750
751 debug!(peer = %peer_id, tree = %tree_id, our_tips = our_tips.len(), "Sending sync tree request");
752
753 let request = SyncRequest::SyncTree(SyncTreeRequest {
755 tree_id: tree_id.clone(),
756 our_tips,
757 peer_pubkey: our_device_pubkey,
758 requesting_key: None, requesting_key_name: None,
760 requested_permission: None,
761 });
762
763 let response = self.transport_manager.send_request(address, &request).await?;
764
765 match response {
766 SyncResponse::Bootstrap(bootstrap_response) => {
767 info!(peer = %peer_id, tree = %tree_id, entry_count = bootstrap_response.all_entries.len() + 1, "Received bootstrap response");
768 self.handle_bootstrap_response(bootstrap_response).await?;
769 }
770 SyncResponse::Incremental(incremental_response) => {
771 debug!(peer = %peer_id, tree = %tree_id,
772 their_tips = incremental_response.their_tips.len(),
773 missing_count = incremental_response.missing_entries.len(),
774 "Received incremental sync response");
775 self.handle_incremental_response(incremental_response).await?;
776 }
777 SyncResponse::Error(msg) => {
778 return Err(SyncError::SyncProtocolError(format!("Sync error: {msg}")).into());
779 }
780 _ => {
781 return Err(SyncError::UnexpectedResponse {
782 expected: "Bootstrap or Incremental",
783 actual: format!("{response:?}"),
784 }.into());
785 }
786 }
787
788 trace!(peer = %peer_id, tree = %tree_id, "Completed unified tree synchronization");
789 Ok(())
790 }
791 .instrument(info_span!("sync_tree", peer = %peer_id, tree = %tree_id))
792 .await
793 }
794
795 async fn check_peer_connections(&mut self) {
797 }
801
802 async fn start_server(&mut self, name: Option<&str>) -> Result<()> {
804 let handler = Arc::new(SyncHandlerImpl::new(
806 self.instance()?,
807 self.sync_tree_id.clone(),
808 ));
809
810 match name {
811 Some(name) => {
812 self.transport_manager.start_server(name, handler).await?;
814 tracing::info!("Sync server started for transport {name}");
815 }
816 None => {
817 self.transport_manager.start_all_servers(handler).await?;
819 tracing::info!("Sync servers started for all transports");
820 }
821 }
822
823 Ok(())
824 }
825
826 async fn stop_server(&mut self, name: Option<&str>) -> Result<()> {
828 match name {
829 Some(name) => {
830 self.transport_manager.stop_server(name).await?;
831 tracing::info!("Sync server stopped for transport {name}");
832 }
833 None => {
834 self.transport_manager.stop_all_servers().await?;
835 tracing::info!("All sync servers stopped");
836 }
837 }
838 Ok(())
839 }
840}