1use std::future::Future;
4use std::time::Duration;
5
6use tokio::sync::oneshot;
7use tracing::{debug, info, warn};
8
9use super::{
10 Address, DatabaseTicket, PeerId, Sync, SyncError,
11 background::SyncCommand,
12 peer_manager::PeerManager,
13 peer_types,
14 protocol::{self, SyncRequest, SyncResponse, SyncTreeRequest},
15 user_sync_manager::UserSyncManager,
16};
17use crate::{
18 Database, Entry, Instance, Result, auth::Permission, auth::crypto::PublicKey, entry::ID,
19 store::DocStore,
20};
21
22use super::utils::collect_ancestors_to_send;
23
24impl Sync {
25 pub async fn sync_tree_with_peer(&self, peer_pubkey: &PublicKey, tree_id: &ID) -> Result<()> {
40 let peer_info = self
42 .get_peer_info(peer_pubkey)
43 .await?
44 .ok_or_else(|| SyncError::PeerNotFound(peer_pubkey.to_string()))?;
45
46 let address = peer_info
47 .addresses
48 .first()
49 .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?;
50
51 let backend = self.backend()?;
53 let our_tips = backend
54 .get_tips(tree_id)
55 .await
56 .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?;
57
58 let our_device_pubkey = self.get_device_pubkey().ok();
60
61 let request = SyncRequest::SyncTree(SyncTreeRequest {
63 tree_id: tree_id.clone(),
64 our_tips,
65 peer_pubkey: our_device_pubkey,
66 requesting_key: None, requesting_key_name: None,
68 requested_permission: None,
69 });
70
71 let (tx, rx) = oneshot::channel();
73 self.background_tx
74 .get()
75 .ok_or(SyncError::NoTransportEnabled)?
76 .send(SyncCommand::SendRequest {
77 address: address.clone(),
78 request: Box::new(request),
79 response: tx,
80 })
81 .await
82 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
83
84 let response = rx
85 .await
86 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
87 .map_err(|e| SyncError::Network(format!("Request failed: {e}")))?;
88
89 match response {
90 SyncResponse::Bootstrap(bootstrap_response) => {
91 self.handle_bootstrap_response(bootstrap_response).await?;
92 }
93 SyncResponse::Incremental(incremental_response) => {
94 self.handle_incremental_response(incremental_response, address)
95 .await?;
96 }
97 SyncResponse::Error(msg) => {
98 return Err(SyncError::SyncProtocolError(format!("Sync error: {msg}")).into());
99 }
100 _ => {
101 return Err(SyncError::UnexpectedResponse {
102 expected: "Bootstrap or Incremental",
103 actual: format!("{response:?}"),
104 }
105 .into());
106 }
107 }
108
109 self.add_tree_sync(peer_pubkey, tree_id).await?;
112
113 Ok(())
114 }
115
116 pub(super) async fn handle_bootstrap_response(
118 &self,
119 response: protocol::BootstrapResponse,
120 ) -> Result<()> {
121 tracing::info!(tree_id = %response.tree_id, "Processing bootstrap response");
122
123 let backend = self.backend()?;
127 backend
128 .put_verified(response.root_entry.clone())
129 .await
130 .map_err(|e| SyncError::BackendError(format!("Failed to store root entry: {e}")))?;
131
132 self.store_received_entries(&response.tree_id, response.all_entries)
134 .await?;
135
136 tracing::info!(tree_id = %response.tree_id, "Bootstrap completed successfully");
137 Ok(())
138 }
139
140 pub(super) async fn handle_incremental_response(
142 &self,
143 response: protocol::IncrementalResponse,
144 peer_address: &peer_types::Address,
145 ) -> Result<()> {
146 tracing::debug!(tree_id = %response.tree_id, "Processing incremental response");
147
148 self.store_received_entries(&response.tree_id, response.missing_entries)
150 .await?;
151
152 let backend = self.backend()?;
154 let our_tips = backend.get_tips(&response.tree_id).await?;
155 let their_tips = &response.their_tips;
156
157 let missing_tip_ids: Vec<_> = our_tips
159 .iter()
160 .filter(|tip_id| !their_tips.contains(tip_id))
161 .cloned()
162 .collect();
163
164 if !missing_tip_ids.is_empty() {
165 tracing::debug!(
166 tree_id = %response.tree_id,
167 missing_tips = missing_tip_ids.len(),
168 "Server is missing some of our entries, sending them back"
169 );
170
171 let backend = self.backend()?;
173 let entries_for_server =
174 collect_ancestors_to_send(backend.as_backend_impl(), &missing_tip_ids, their_tips)
175 .await?;
176
177 if !entries_for_server.is_empty() {
178 self.send_missing_entries_to_peer(
180 peer_address,
181 &response.tree_id,
182 entries_for_server,
183 )
184 .await?;
185 }
186 }
187
188 tracing::debug!(tree_id = %response.tree_id, "Incremental sync completed");
189 Ok(())
190 }
191
192 async fn send_missing_entries_to_peer(
194 &self,
195 peer_address: &peer_types::Address,
196 tree_id: &ID,
197 entries: Vec<Entry>,
198 ) -> Result<()> {
199 if entries.is_empty() {
200 return Ok(());
201 }
202
203 tracing::debug!(
204 tree_id = %tree_id,
205 entry_count = entries.len(),
206 "Sending missing entries back to peer for bidirectional sync"
207 );
208
209 let request = protocol::SyncRequest::SendEntries(entries);
210
211 let (tx, rx) = tokio::sync::oneshot::channel();
213 self.background_tx
214 .get()
215 .ok_or(SyncError::NoTransportEnabled)?
216 .send(SyncCommand::SendRequest {
217 address: peer_address.clone(),
218 request: Box::new(request),
219 response: tx,
220 })
221 .await
222 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
223
224 let response = rx
226 .await
227 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
228 .map_err(|e| SyncError::Network(format!("Request failed: {e}")))?;
229
230 match response {
231 protocol::SyncResponse::Ack | protocol::SyncResponse::Count(_) => {
232 tracing::debug!(tree_id = %tree_id, "Server acknowledged receipt of missing entries");
233 Ok(())
234 }
235 protocol::SyncResponse::Error(e) => {
236 Err(SyncError::Network(format!("Server error receiving entries: {e}")).into())
237 }
238 _ => Err(SyncError::UnexpectedResponse {
239 expected: "Ack or Count",
240 actual: format!("{response:?}"),
241 }
242 .into()),
243 }
244 }
245
246 pub(super) async fn store_received_entries(
248 &self,
249 _tree_id: &ID,
250 entries: Vec<Entry>,
251 ) -> Result<()> {
252 for entry in entries {
253 let calculated_id = entry.id();
255 if entry.id() != calculated_id {
256 return Err(SyncError::InvalidEntry(format!(
257 "Entry ID {} doesn't match calculated ID {}",
258 entry.id(),
259 calculated_id
260 ))
261 .into());
262 }
263
264 let backend = self.backend()?;
268 backend
269 .put_verified(entry)
270 .await
271 .map_err(|e| SyncError::BackendError(format!("Failed to store entry: {e}")))?;
272 }
273
274 Ok(())
275 }
276
277 pub async fn send_entries(
286 &self,
287 entries: impl AsRef<[Entry]>,
288 address: &Address,
289 ) -> Result<()> {
290 let entries_vec = entries.as_ref().to_vec();
291 let request = SyncRequest::SendEntries(entries_vec);
292 let response = self.send_request(&request, address).await?;
293
294 match response {
295 SyncResponse::Ack | SyncResponse::Count(_) => Ok(()),
296 SyncResponse::Error(msg) => Err(SyncError::SyncProtocolError(format!(
297 "Peer {} returned error: {}",
298 address.address, msg
299 ))
300 .into()),
301 _ => Err(SyncError::UnexpectedResponse {
302 expected: "Ack or Count",
303 actual: format!("{response:?}"),
304 }
305 .into()),
306 }
307 }
308
309 pub async fn send_entries_to_peer(&self, peer_id: &PeerId, entries: Vec<Entry>) -> Result<()> {
330 self.background_tx
331 .get()
332 .ok_or(SyncError::NoTransportEnabled)?
333 .send(SyncCommand::SendEntries {
334 peer: peer_id.clone(),
335 entries,
336 })
337 .await
338 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
339 Ok(())
340 }
341
342 pub fn queue_entry_for_sync(
357 &self,
358 peer_id: &PeerId,
359 entry_id: &ID,
360 tree_id: &ID,
361 ) -> Result<()> {
362 if self.background_tx.get().is_none() {
364 return Err(SyncError::NoTransportEnabled.into());
365 }
366
367 self.queue
369 .enqueue(peer_id, entry_id.clone(), tree_id.clone());
370
371 Ok(())
372 }
373
374 pub(crate) async fn on_local_write(
390 &self,
391 entry: &Entry,
392 database: &Database,
393 _instance: &Instance,
394 ) -> Result<()> {
395 if self.background_tx.get().is_none() {
397 return Ok(());
398 }
399
400 let tx = self.sync_tree.new_transaction().await?;
402 let user_mgr = UserSyncManager::new(&tx);
403 let peer_mgr = PeerManager::new(&tx);
404
405 let combined_settings = match user_mgr.get_combined_settings(database.root_id()).await? {
406 Some(settings) => settings,
407 None => {
408 debug!(database_id = %database.root_id(), "No sync settings for database, skipping");
410 return Ok(());
411 }
412 };
413
414 if !combined_settings.sync_enabled || !combined_settings.sync_on_commit {
416 debug!(
417 database_id = %database.root_id(),
418 sync_enabled = combined_settings.sync_enabled,
419 sync_on_commit = combined_settings.sync_on_commit,
420 "Sync not enabled for database"
421 );
422 return Ok(());
423 }
424
425 let peers = peer_mgr.get_tree_peers(database.root_id()).await?;
427
428 if peers.is_empty() {
429 debug!(database_id = %database.root_id(), "No peers configured for database");
430 return Ok(());
431 }
432
433 let entry_id = entry.id();
435 let tree_id = database.root_id();
436
437 debug!(
438 database_id = %tree_id,
439 entry_id = %entry_id,
440 peer_count = peers.len(),
441 "Queueing entry for automatic sync"
442 );
443
444 for peer_id in peers {
445 self.queue_entry_for_sync(&peer_id, &entry_id, tree_id)?;
446 }
447
448 Ok(())
449 }
450
451 pub(super) async fn initialize_user_settings(&self) -> Result<()> {
457 use crate::store::Table;
458 use crate::user::types::UserInfo;
459
460 let user_tracking = self
462 .sync_tree
463 .get_store_viewer::<DocStore>(super::user_sync_manager::USER_TRACKING_SUBTREE)
464 .await?;
465 let all_tracked = user_tracking.get_all().await?;
466
467 if all_tracked.keys().count() == 0 {
468 let instance = self.instance.upgrade().ok_or(SyncError::InstanceDropped)?;
470 let users_db = instance.users_db().await?;
471 let users_table = users_db
472 .get_store_viewer::<Table<UserInfo>>("users")
473 .await?;
474 let all_users = users_table.search(|_| true).await?;
475
476 for (user_uuid, user_info) in all_users {
477 self.sync_user(&user_uuid, &user_info.user_database_id)
478 .await?;
479 }
480 } else {
481 let tx = self.sync_tree.new_transaction().await?;
483 let user_mgr = UserSyncManager::new(&tx);
484
485 for user_uuid in all_tracked.keys() {
486 if let Some((prefs_db_id, _tips)) =
487 user_mgr.get_tracked_user_state(user_uuid).await?
488 {
489 self.sync_user(user_uuid, &prefs_db_id).await?;
490 }
491 }
492 }
493
494 Ok(())
495 }
496
497 pub(super) async fn send_request(
506 &self,
507 request: &SyncRequest,
508 address: &Address,
509 ) -> Result<SyncResponse> {
510 let (tx, rx) = oneshot::channel();
511
512 self.background_tx
513 .get()
514 .ok_or(SyncError::NoTransportEnabled)?
515 .send(SyncCommand::SendRequest {
516 address: address.clone(),
517 request: Box::new(request.clone()),
518 response: tx,
519 })
520 .await
521 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
522
523 rx.await
524 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
525 }
526
527 pub async fn discover_peer_trees(&self, address: &Address) -> Result<Vec<protocol::TreeInfo>> {
538 let _peer_pubkey = self.connect_to_peer(address).await?;
540
541 tracing::warn!(
546 "discover_peer_trees not fully implemented - handshake contains tree info but API needs enhancement"
547 );
548 Ok(vec![])
549 }
550
551 pub async fn sync_with_peer(&self, address: &Address, tree_id: Option<&ID>) -> Result<()> {
568 let peer_pubkey = self.connect_to_peer(address).await?;
570
571 self.add_peer_address(&peer_pubkey, address.clone()).await?;
573
574 if let Some(tree_id) = tree_id {
575 self.sync_tree_with_peer(&peer_pubkey, tree_id).await?;
577 } else {
578 tracing::warn!(
580 "Syncing all trees not yet implemented - need to enhance discover_peer_trees first"
581 );
582 }
583
584 Ok(())
585 }
586
587 pub async fn sync_with_ticket(&self, ticket: &DatabaseTicket) -> Result<()> {
601 let database_id = ticket.database_id().clone();
602 self.try_addresses_concurrently(ticket.addresses(), |sync, addr| {
603 let db_id = database_id.clone();
604 async move { sync.sync_with_peer(&addr, Some(&db_id)).await }
605 })
606 .await
607 }
608
609 pub async fn sync_tree_with_peer_auth(
624 &self,
625 peer_pubkey: &PublicKey,
626 tree_id: &ID,
627 requesting_key: Option<&PublicKey>,
628 requesting_key_name: Option<&str>,
629 requested_permission: Option<Permission>,
630 ) -> Result<()> {
631 let peer_info = self
633 .get_peer_info(peer_pubkey)
634 .await?
635 .ok_or_else(|| SyncError::PeerNotFound(peer_pubkey.to_string()))?;
636
637 let address = peer_info
638 .addresses
639 .first()
640 .ok_or_else(|| SyncError::Network("No addresses found for peer".to_string()))?;
641
642 let backend = self.backend()?;
644 let our_tips = backend
645 .get_tips(tree_id)
646 .await
647 .map_err(|e| SyncError::BackendError(format!("Failed to get local tips: {e}")))?;
648
649 let our_device_pubkey = self.get_device_pubkey().ok();
651
652 let request = SyncRequest::SyncTree(SyncTreeRequest {
654 tree_id: tree_id.clone(),
655 our_tips,
656 peer_pubkey: our_device_pubkey,
657 requesting_key: requesting_key.cloned(),
658 requesting_key_name: requesting_key_name.map(|k| k.to_string()),
659 requested_permission,
660 });
661
662 let (tx, rx) = oneshot::channel();
664 self.background_tx
665 .get()
666 .ok_or(SyncError::NoTransportEnabled)?
667 .send(SyncCommand::SendRequest {
668 address: address.clone(),
669 request: Box::new(request),
670 response: tx,
671 })
672 .await
673 .map_err(|_| {
674 SyncError::CommandSendError("Background sync command channel closed".to_string())
675 })?;
676
677 let response = rx
679 .await
680 .map_err(|_| {
681 SyncError::CommandSendError("Background sync response channel closed".to_string())
682 })?
683 .map_err(|e| SyncError::Network(format!("Sync request failed: {e}")))?;
684
685 match response {
687 SyncResponse::Bootstrap(bootstrap_response) => {
688 info!(peer = %peer_pubkey, tree = %tree_id, entry_count = bootstrap_response.all_entries.len() + 1, "Received bootstrap response");
689
690 let backend = self.backend()?;
692 backend.put_verified(bootstrap_response.root_entry).await?;
693
694 for entry in bootstrap_response.all_entries {
696 backend.put_unverified(entry).await?;
697 }
698
699 info!(peer = %peer_pubkey, tree = %tree_id, "Bootstrap sync completed successfully");
700 }
701 SyncResponse::Incremental(incremental_response) => {
702 info!(peer = %peer_pubkey, tree = %tree_id, missing_count = incremental_response.missing_entries.len(), "Received incremental sync response");
703
704 self.handle_incremental_response(incremental_response, address)
706 .await?;
707
708 debug!(peer = %peer_pubkey, tree = %tree_id, "Incremental sync completed");
709 }
710 SyncResponse::BootstrapPending {
711 request_id,
712 message,
713 } => {
714 info!(peer = %peer_pubkey, tree = %tree_id, request_id = %request_id, "Bootstrap request pending manual approval");
715 return Err(SyncError::BootstrapPending {
716 request_id,
717 message,
718 }
719 .into());
720 }
721 SyncResponse::Error(err) => {
722 return Err(SyncError::Network(format!("Peer returned error: {err}")).into());
723 }
724 _ => {
725 return Err(SyncError::SyncProtocolError(
726 "Unexpected response type for sync tree request".to_string(),
727 )
728 .into());
729 }
730 }
731
732 self.add_tree_sync(peer_pubkey, tree_id).await?;
735
736 Ok(())
737 }
738
739 pub async fn flush(&self) -> Result<()> {
754 let (tx, rx) = oneshot::channel();
755
756 self.background_tx
757 .get()
758 .ok_or(SyncError::NoTransportEnabled)?
759 .send(SyncCommand::Flush { response: tx })
760 .await
761 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
762
763 rx.await
764 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
765 }
766
767 const ADDRESS_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(30);
770
771 pub(super) async fn try_addresses_concurrently<F, Fut>(
783 &self,
784 addresses: &[Address],
785 f: F,
786 ) -> Result<()>
787 where
788 F: Fn(Sync, Address) -> Fut,
789 Fut: Future<Output = Result<()>> + Send + 'static,
790 {
791 if addresses.is_empty() {
792 return Err(SyncError::InvalidAddress("Ticket has no address hints".into()).into());
793 }
794
795 let (tx, mut rx) = tokio::sync::mpsc::channel(addresses.len());
796
797 for addr in addresses {
798 let tx = tx.clone();
799 let fut = f(self.clone(), addr.clone());
800 let addr_info = addr.clone();
801 tokio::spawn(async move {
803 let result = tokio::time::timeout(Self::ADDRESS_ATTEMPT_TIMEOUT, fut).await;
804 let result = match result {
805 Ok(inner) => inner,
806 Err(_) => {
807 warn!(
808 address = ?addr_info,
809 "Address attempt timed out after {:?}",
810 Self::ADDRESS_ATTEMPT_TIMEOUT,
811 );
812 Err(SyncError::Network(format!(
813 "Address attempt timed out after {:?}",
814 Self::ADDRESS_ATTEMPT_TIMEOUT,
815 ))
816 .into())
817 }
818 };
819 match &result {
820 Ok(()) => debug!(address = ?addr_info, "Address attempt succeeded"),
821 Err(e) => debug!(address = ?addr_info, error = %e, "Address attempt failed"),
822 }
823 let _ = tx.send(result).await;
826 });
827 }
828 drop(tx);
830
831 let mut last_err = None;
832 while let Some(result) = rx.recv().await {
833 match result {
834 Ok(()) => return Ok(()),
835 Err(e) => last_err = Some(e),
836 }
837 }
838
839 Err(last_err.expect("at least one task was spawned"))
840 }
841}