1use std::{sync::Arc, time::Duration};
8
9use tokio::{
10 sync::{mpsc, oneshot},
11 time::interval,
12};
13use tracing::{Instrument, debug, info, info_span, trace};
14
15use super::{
16 error::SyncError,
17 handler::SyncHandlerImpl,
18 peer_manager::PeerManager,
19 peer_types::{Address, PeerId, PeerStatus},
20 protocol::{SyncRequest, SyncResponse, SyncTreeRequest},
21 queue::SyncQueue,
22 transport_manager::TransportManager,
23};
24use crate::{
25 Database, Error, Instance, Result, WeakInstance,
26 auth::crypto::PublicKey,
27 database::DatabaseKey,
28 entry::{Entry, ID},
29 store::DocStore,
30};
31
32mod conn;
33
/// Commands processed by the background sync engine's run loop.
///
/// Variants carrying a `response` field are request/response: the engine
/// reports the outcome back over the provided `oneshot` sender. The
/// remaining variants are fire-and-forget.
#[allow(clippy::large_enum_variant)]
pub enum SyncCommand {
    /// Send a batch of entries to a peer (failures go to the retry queue).
    SendEntries { peer: PeerId, entries: Vec<Entry> },
    /// Run a full synchronization pass with one peer.
    SyncWithPeer { peer: PeerId },
    /// Ask the engine to stop.
    Shutdown,

    /// Register a transport under `name`.
    AddTransport {
        name: String,
        transport: Box<dyn super::transports::SyncTransport>,
        response: oneshot::Sender<Result<()>>,
    },

    /// Start the server for the named transport, or all transports when `None`.
    StartServer {
        name: Option<String>,
        response: oneshot::Sender<Result<()>>,
    },
    /// Stop the server for the named transport, or all transports when `None`.
    StopServer {
        name: Option<String>,
        response: oneshot::Sender<Result<()>>,
    },
    /// Fetch the listening address of one named transport's server.
    GetServerAddress {
        name: String,
        response: oneshot::Sender<Result<String>>,
    },
    /// Fetch name/address string pairs for all transports with servers.
    GetAllServerAddresses {
        response: oneshot::Sender<Result<Vec<(String, String)>>>,
    },

    /// Connect to `address`; replies with the remote peer's public key.
    ConnectToPeer {
        address: Address,
        response: oneshot::Sender<Result<PublicKey>>,
    },

    /// Send a raw sync request to `address` and forward the raw response.
    SendRequest {
        address: Address,
        request: Box<SyncRequest>,
        response: oneshot::Sender<Result<SyncResponse>>,
    },

    /// Flush the retry queue and the pending outbound queue, reporting
    /// how many sends still failed.
    Flush {
        response: oneshot::Sender<Result<()>>,
    },
}
95
/// Hand-written `Debug`: prints entry batches as a count, transports by
/// their reported type, and skips the `response` channels entirely.
impl std::fmt::Debug for SyncCommand {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SendEntries { peer, entries } => f
                .debug_struct("SendEntries")
                .field("peer", peer)
                // Only the count — the entries themselves can be large.
                .field("entries_count", &entries.len())
                .finish(),
            Self::SyncWithPeer { peer } => {
                f.debug_struct("SyncWithPeer").field("peer", peer).finish()
            }
            Self::Shutdown => write!(f, "Shutdown"),
            Self::AddTransport {
                name, transport, ..
            } => f
                .debug_struct("AddTransport")
                .field("name", name)
                .field("transport_type", &transport.transport_type())
                .finish(),
            Self::StartServer { name, .. } => {
                f.debug_struct("StartServer").field("name", name).finish()
            }
            Self::StopServer { name, .. } => {
                f.debug_struct("StopServer").field("name", name).finish()
            }
            Self::GetServerAddress { name, .. } => f
                .debug_struct("GetServerAddress")
                .field("name", name)
                .finish(),
            Self::GetAllServerAddresses { .. } => write!(f, "GetAllServerAddresses"),
            Self::ConnectToPeer { address, .. } => f
                .debug_struct("ConnectToPeer")
                .field("address", address)
                .finish(),
            Self::SendRequest {
                address, request, ..
            } => f
                .debug_struct("SendRequest")
                .field("address", address)
                .field("request", request)
                .finish(),
            Self::Flush { .. } => write!(f, "Flush"),
        }
    }
}
146
/// A failed outbound batch awaiting retry with exponential backoff.
#[derive(Debug, Clone)]
struct RetryEntry {
    /// Destination peer.
    peer: PeerId,
    /// Entries that failed to send.
    entries: Vec<Entry>,
    /// Send attempts made so far (starts at 1 when first enqueued).
    attempts: u32,
    /// Timestamp (ms) of the most recent attempt; combined with `attempts`
    /// to compute the backoff window in `process_retry_queue`.
    last_attempt_ms: u64,
}
156
/// Long-running engine that owns all sync I/O: transports/servers, the
/// shared outbound queue, and the retry queue. Driven by [`SyncCommand`]s
/// and internal timers in `run`.
pub struct BackgroundSync {
    /// Registered transports and their servers.
    pub(super) transport_manager: TransportManager,
    /// Weak handle so the engine does not keep the instance alive by itself.
    instance: WeakInstance,
    /// ID of the tree holding sync metadata (peer records, sync settings).
    pub(super) sync_tree_id: ID,

    /// Outbound entry queue shared with producers; drained in `process_queue`.
    queue: Arc<SyncQueue>,

    /// Batches that failed to send, awaiting backoff-based retry.
    retry_queue: Vec<RetryEntry>,

    /// Receiving end of the command channel created in `start`.
    command_rx: mpsc::Receiver<SyncCommand>,
}
173
174impl BackgroundSync {
175 pub fn start(
180 instance: Instance,
181 sync_tree_id: ID,
182 queue: Arc<SyncQueue>,
183 ) -> mpsc::Sender<SyncCommand> {
184 let (tx, rx) = mpsc::channel(100);
185
186 let background = Self {
187 transport_manager: TransportManager::new(),
188 instance: instance.downgrade(),
189 sync_tree_id,
190 queue,
191 retry_queue: Vec::new(),
192 command_rx: rx,
193 };
194
195 tokio::spawn(background.run());
198 tx
199 }
200
201 pub(super) fn instance(&self) -> Result<Instance> {
203 self.instance
204 .upgrade()
205 .ok_or_else(|| SyncError::InstanceDropped.into())
206 }
207
208 pub(super) async fn get_sync_tree(&self) -> Result<Database> {
210 let instance = self.instance()?;
212 let signing_key = instance.device_key().clone();
213
214 Database::open(instance, &self.sync_tree_id, DatabaseKey::new(signing_key)).await
215 }
216
217 async fn get_min_sync_interval(&self) -> Option<u64> {
220 let sync_tree = match self.get_sync_tree().await {
221 Ok(tree) => tree,
222 Err(_) => return None,
223 };
224
225 let txn = match sync_tree.new_transaction().await {
226 Ok(txn) => txn,
227 Err(_) => return None,
228 };
229
230 let user_mgr = super::user_sync_manager::UserSyncManager::new(&txn);
231
232 let database_users = match txn
234 .get_store::<DocStore>(super::user_sync_manager::DATABASE_USERS_SUBTREE)
235 .await
236 {
237 Ok(store) => store,
238 Err(_) => return None,
239 };
240
241 let all_dbs = match database_users.get_all().await {
242 Ok(doc) => doc,
243 Err(_) => return None,
244 };
245
246 let mut min_interval: Option<u64> = None;
248 for db_id_str in all_dbs.keys() {
249 if let Ok(db_id) = ID::parse(db_id_str)
250 && let Ok(Some(settings)) = user_mgr.get_combined_settings(&db_id).await
251 && let Some(interval) = settings.interval_seconds
252 {
253 min_interval = Some(match min_interval {
254 Some(current_min) => current_min.min(interval),
255 None => interval,
256 });
257 }
258 }
259
260 min_interval
261 }
262
263 async fn run(mut self) {
265 async move {
266 info!("Starting background sync engine");
267
268 let mut current_interval_secs = self.get_min_sync_interval().await.unwrap_or(300);
270 info!("Initial periodic sync interval: {} seconds", current_interval_secs);
271
272 let mut periodic_sync = interval(Duration::from_secs(current_interval_secs));
274 let mut queue_check = interval(Duration::from_secs(5)); let mut retry_check = interval(Duration::from_secs(30)); let mut connection_check = interval(Duration::from_secs(60)); let mut settings_check = interval(Duration::from_secs(60)); periodic_sync.tick().await;
281 queue_check.tick().await;
282 retry_check.tick().await;
283 connection_check.tick().await;
284 settings_check.tick().await;
285
286 loop {
287 tokio::select! {
288 Some(cmd) = self.command_rx.recv() => {
290 if let Err(e) = self.handle_command(cmd).await {
291 tracing::error!("Background sync command error: {e}");
293 }
294 }
295
296 _ = queue_check.tick() => {
298 self.process_queue().await;
299 }
300
301 _ = periodic_sync.tick() => {
303 self.periodic_sync_all_peers().await;
304 }
305
306 _ = retry_check.tick() => {
308 self.process_retry_queue().await;
309 }
310
311 _ = connection_check.tick() => {
313 self.check_peer_connections().await;
314 }
315
316 _ = settings_check.tick() => {
318 if let Some(new_interval) = self.get_min_sync_interval().await
319 && new_interval != current_interval_secs {
320 info!("Sync interval changed from {} to {} seconds", current_interval_secs, new_interval);
321 current_interval_secs = new_interval;
322 periodic_sync = interval(Duration::from_secs(new_interval));
324 periodic_sync.tick().await; }
326 }
327
328 else => {
330 info!("Background sync engine shutting down");
332 break;
333 }
334 }
335 }
336 }
337 .instrument(info_span!("background_sync"))
338 .await
339 }
340
    /// Dispatch a single [`SyncCommand`].
    ///
    /// Returns `Ok(())` for everything except `Shutdown`, which is surfaced
    /// as an `Err` so the caller (`run`) can react to it. Per-command
    /// failures on fire-and-forget variants are handled internally (retry
    /// queue / logging); request/response variants report their outcome
    /// over their `oneshot` channel, ignoring a dropped receiver.
    async fn handle_command(&mut self, command: SyncCommand) -> Result<()> {
        match command {
            SyncCommand::SendEntries { peer, entries } => {
                // Clone so the batch can be re-queued for retry on failure.
                if let Err(e) = self.send_to_peer(&peer, entries.clone()).await {
                    let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
                    self.add_to_retry_queue(peer, entries, e, now_ms);
                }
            }

            SyncCommand::SyncWithPeer { peer } => {
                if let Err(e) = self.sync_with_peer(&peer).await {
                    tracing::error!("Failed to sync with peer {peer}: {e}");
                }
            }

            SyncCommand::AddTransport {
                name,
                transport,
                response,
            } => {
                // Replacing a transport that still has a running server:
                // stop the old server first (best-effort).
                if let Some(old) = self.transport_manager.get_mut(&name)
                    && old.is_server_running()
                {
                    let _ = old.stop_server().await;
                }
                self.transport_manager.add(&name, transport);
                tracing::debug!("Added transport: {}", name);
                let _ = response.send(Ok(()));
            }

            SyncCommand::StartServer { name, response } => {
                let result = self.start_server(name.as_deref()).await;
                let _ = response.send(result);
            }

            SyncCommand::StopServer { name, response } => {
                let result = self.stop_server(name.as_deref()).await;
                let _ = response.send(result);
            }

            SyncCommand::GetServerAddress { name, response } => {
                let result = self.transport_manager.get_server_address(&name);
                let _ = response.send(result);
            }

            SyncCommand::GetAllServerAddresses { response } => {
                let addresses = self.transport_manager.get_all_server_addresses();
                let _ = response.send(Ok(addresses));
            }

            SyncCommand::ConnectToPeer { address, response } => {
                let result = self.connect_to_peer(&address).await;
                let _ = response.send(result);
            }

            SyncCommand::SendRequest {
                address,
                request,
                response,
            } => {
                let result = self.send_sync_request(&address, &request).await;
                let _ = response.send(result);
            }

            SyncCommand::Flush { response } => {
                // Retry queue first, then any newly queued entries; report
                // success only if both passes had zero failures.
                let retry_failures = self.flush_retry_queue().await;
                let queue_failures = self.process_queue().await;

                let result = match (retry_failures, queue_failures) {
                    (0, 0) => Ok(()),
                    (r, q) => Err(SyncError::Network(format!(
                        "Flush had failures: {r} from retry queue, {q} from new entries"
                    ))
                    .into()),
                };
                let _ = response.send(result);
            }

            SyncCommand::Shutdown => {
                // Signal shutdown by returning an error to the run loop.
                return Err(SyncError::Network("Shutdown requested".to_string()).into());
            }
        }
        Ok(())
    }
432
433 async fn send_to_peer(&self, peer: &PeerId, entries: Vec<Entry>) -> Result<()> {
450 let address = {
452 let sync_tree = self.get_sync_tree().await?;
453 let txn = sync_tree.new_transaction().await?;
454 let peer_info = PeerManager::new(&txn)
455 .get_peer_info(peer.public_key())
456 .await?
457 .ok_or_else(|| SyncError::PeerNotFound(peer.to_string()))?;
458
459 peer_info
460 .addresses
461 .first()
462 .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?
463 .clone()
464 }; let request = SyncRequest::SendEntries(entries);
467 let response = self
468 .transport_manager
469 .send_request(&address, &request)
470 .await?;
471
472 match response {
473 SyncResponse::Ack | SyncResponse::Count(_) => Ok(()),
474 SyncResponse::Error(msg) => Err(SyncError::SyncProtocolError(format!(
475 "Peer {peer} returned error: {msg}"
476 ))
477 .into()),
478 _ => Err(SyncError::UnexpectedResponse {
479 expected: "Ack or Count",
480 actual: format!("{response:?}"),
481 }
482 .into()),
483 }
484 }
485
486 fn add_to_retry_queue(&mut self, peer: PeerId, entries: Vec<Entry>, error: Error, now_ms: u64) {
488 tracing::warn!("Failed to send to {peer}: {error}. Adding to retry queue.");
490 self.retry_queue.push(RetryEntry {
491 peer,
492 entries,
493 attempts: 1,
494 last_attempt_ms: now_ms,
495 });
496 }
497
    /// Drain the shared outbound queue and send each per-peer batch,
    /// returning the number of batches that failed (they are moved to the
    /// retry queue rather than lost).
    async fn process_queue(&mut self) -> usize {
        let batches = self.queue.drain();
        if batches.is_empty() {
            return 0;
        }

        let instance = match self.instance() {
            Ok(i) => i,
            Err(e) => {
                tracing::warn!("Failed to get instance for queue processing: {e}");
                // NOTE(review): the drained batches are dropped here, not
                // re-queued — every batch is counted as a failure. Confirm
                // this is acceptable once the instance is gone.
                return batches.len();
            }
        };

        let mut failures = 0;
        for (peer, entry_ids) in batches {
            // Re-fetch the full entries from the backend; the queue only
            // stores (entry id, tree id) pairs.
            let mut entries = Vec::with_capacity(entry_ids.len());
            for (entry_id, _tree_id) in &entry_ids {
                match instance.backend().get(entry_id).await {
                    Ok(entry) => entries.push(entry),
                    Err(e) => {
                        // Missing entries are skipped; the rest of the
                        // batch is still sent.
                        tracing::warn!("Failed to fetch entry {entry_id} for peer {peer}: {e}");
                    }
                }
            }

            if entries.is_empty() {
                continue;
            }

            // Clone so the batch can be handed to the retry queue on failure.
            if let Err(e) = self.send_to_peer(&peer, entries.clone()).await {
                let now_ms = instance.clock().now_millis();
                self.add_to_retry_queue(peer, entries, e, now_ms);
                failures += 1;
            }
        }
        failures
    }
543
544 async fn process_retry_queue(&mut self) {
546 let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
547 let mut still_failed = Vec::new();
548
549 let retry_queue = std::mem::take(&mut self.retry_queue);
551
552 for mut entry in retry_queue {
554 let backoff_ms = 2u64.pow(entry.attempts.min(6)) * 1000;
556 let elapsed_ms = now_ms.saturating_sub(entry.last_attempt_ms);
557
558 if elapsed_ms >= backoff_ms {
559 if let Err(_e) = self.send_to_peer(&entry.peer, entry.entries.clone()).await {
561 entry.attempts += 1;
562 entry.last_attempt_ms = now_ms;
563
564 if entry.attempts < 10 {
565 still_failed.push(entry);
567 } else {
568 tracing::error!("Giving up on sending to {} after 10 attempts", entry.peer);
570 }
571 } else {
572 }
574 } else {
575 still_failed.push(entry);
577 }
578 }
579
580 self.retry_queue = still_failed;
581 }
582
583 async fn flush_retry_queue(&mut self) -> usize {
586 let mut still_failed = Vec::new();
587 let now_ms = self.instance().map(|i| i.clock().now_millis()).unwrap_or(0);
588
589 let retry_queue = std::mem::take(&mut self.retry_queue);
591
592 for mut retry_entry in retry_queue {
594 if let Err(_e) = self
595 .send_to_peer(&retry_entry.peer, retry_entry.entries.clone())
596 .await
597 {
598 retry_entry.attempts += 1;
599 retry_entry.last_attempt_ms = now_ms;
600
601 if retry_entry.attempts < 10 {
602 still_failed.push(retry_entry);
603 } else {
604 tracing::error!(
605 "Giving up on sending to {} after 10 attempts",
606 retry_entry.peer
607 );
608 }
609 }
610 }
611
612 let failed_count = still_failed.len();
613 self.retry_queue = still_failed;
614 failed_count
615 }
616
617 async fn periodic_sync_all_peers(&self) {
619 let peers = match self.get_sync_tree().await {
623 Ok(sync_tree) => match sync_tree.new_transaction().await {
624 Ok(txn) => match PeerManager::new(&txn).list_peers().await {
625 Ok(peers) => {
626 peers
628 }
629 Err(_) => {
630 return;
632 }
633 },
634 Err(_) => {
635 return;
637 }
638 },
639 Err(_) => {
640 return;
642 }
643 };
644
645 for peer_info in peers {
647 if peer_info.status == PeerStatus::Active
648 && let Err(e) = self.sync_with_peer(&peer_info.id).await
649 {
650 tracing::error!("Periodic sync failed with {}: {e}", peer_info.id);
652 }
653 }
654 }
655
    /// Synchronize every tree configured for `peer_id`, one at a time.
    ///
    /// Resolves the peer's first registered address and its configured tree
    /// list from the sync tree, then calls `sync_tree_with_peer` per tree.
    /// Per-tree failures are logged and do not abort the remaining trees.
    async fn sync_with_peer(&self, peer_id: &PeerId) -> Result<()> {
        async move {
            info!(peer = %peer_id, "Starting peer synchronization");

            // Scope the metadata lookup so the transaction is dropped
            // before the per-tree sync work starts.
            let (address, sync_trees) = {
                let sync_tree = self.get_sync_tree().await?;
                let txn = sync_tree.new_transaction().await?;
                let peer_manager = PeerManager::new(&txn);

                let peer_info = peer_manager
                    .get_peer_info(peer_id.public_key())
                    .await?
                    .ok_or_else(|| SyncError::PeerNotFound(peer_id.to_string()))?;

                let address = peer_info
                    .addresses
                    .first()
                    .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?
                    .clone();

                let sync_trees = peer_manager.get_peer_trees(peer_id.public_key()).await?;

                (address, sync_trees)
            };

            if sync_trees.is_empty() {
                debug!(peer = %peer_id, "No trees configured for sync with peer");
                return Ok(());
            }

            info!(peer = %peer_id, tree_count = sync_trees.len(), "Synchronizing trees with peer");

            for tree_id_str in sync_trees {
                let tree_id = ID::from(tree_id_str.as_str());
                // Keep going on failure so one bad tree doesn't block the rest.
                if let Err(e) = self.sync_tree_with_peer(peer_id, &tree_id, &address).await {
                    tracing::error!("Failed to sync tree {tree_id} with peer {peer_id}: {e}");
                }
            }

            info!(peer = %peer_id, "Completed peer synchronization");
            Ok(())
        }
        .instrument(info_span!("sync_with_peer", peer = %peer_id))
        .await
    }
706
    /// Synchronize a single tree with a peer at a known address.
    ///
    /// Sends our current tips in a `SyncTree` request; the peer answers
    /// with either a `Bootstrap` (full transfer) or `Incremental` response,
    /// each handled by its dedicated handler. Any other response kind is an
    /// error.
    async fn sync_tree_with_peer(
        &self,
        peer_id: &PeerId,
        tree_id: &ID,
        address: &Address,
    ) -> Result<()> {
        async move {
            trace!(peer = %peer_id, tree = %tree_id, "Starting unified tree synchronization");

            let instance = self.instance()?;
            // Our current tips tell the peer what we already have.
            let our_tips = instance
                .backend()
                .get_tips(tree_id)
                .await
                .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?;

            let our_device_pubkey = Some(instance.device_id());

            debug!(peer = %peer_id, tree = %tree_id, our_tips = our_tips.len(), "Sending sync tree request");

            // The requesting_* fields stay `None`: this path is plain sync,
            // not a permission/bootstrap request.
            let request = SyncRequest::SyncTree(SyncTreeRequest {
                tree_id: tree_id.clone(),
                our_tips,
                peer_pubkey: our_device_pubkey,
                requesting_key: None,
                requesting_key_name: None,
                requested_permission: None,
            });

            let response = self.transport_manager.send_request(address, &request).await?;

            match response {
                SyncResponse::Bootstrap(bootstrap_response) => {
                    info!(peer = %peer_id, tree = %tree_id, entry_count = bootstrap_response.all_entries.len() + 1, "Received bootstrap response");
                    self.handle_bootstrap_response(bootstrap_response).await?;
                }
                SyncResponse::Incremental(incremental_response) => {
                    debug!(peer = %peer_id, tree = %tree_id,
                        their_tips = incremental_response.their_tips.len(),
                        missing_count = incremental_response.missing_entries.len(),
                        "Received incremental sync response");
                    self.handle_incremental_response(incremental_response).await?;
                }
                SyncResponse::Error(msg) => {
                    return Err(SyncError::SyncProtocolError(format!("Sync error: {msg}")).into());
                }
                _ => {
                    return Err(SyncError::UnexpectedResponse {
                        expected: "Bootstrap or Incremental",
                        actual: format!("{response:?}"),
                    }.into());
                }
            }

            trace!(peer = %peer_id, tree = %tree_id, "Completed unified tree synchronization");
            Ok(())
        }
        .instrument(info_span!("sync_tree", peer = %peer_id, tree = %tree_id))
        .await
    }
795
    /// Periodic peer-connection health check.
    ///
    /// Currently a no-op placeholder; invoked from the run loop's
    /// `connection_check` interval.
    async fn check_peer_connections(&mut self) {
    }
802
803 async fn start_server(&mut self, name: Option<&str>) -> Result<()> {
805 let handler = Arc::new(SyncHandlerImpl::new(
807 self.instance()?,
808 self.sync_tree_id.clone(),
809 ));
810
811 match name {
812 Some(name) => {
813 self.transport_manager.start_server(name, handler).await?;
815 tracing::info!("Sync server started for transport {name}");
816 }
817 None => {
818 self.transport_manager.start_all_servers(handler).await?;
820 tracing::info!("Sync servers started for all transports");
821 }
822 }
823
824 Ok(())
825 }
826
827 async fn stop_server(&mut self, name: Option<&str>) -> Result<()> {
829 match name {
830 Some(name) => {
831 self.transport_manager.stop_server(name).await?;
832 tracing::info!("Sync server stopped for transport {name}");
833 }
834 None => {
835 self.transport_manager.stop_all_servers().await?;
836 tracing::info!("All sync servers stopped");
837 }
838 }
839 Ok(())
840 }
841}