Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/sync/
peer.rs

1//! Peer management, sync relationships, and address handling for the sync system.
2
3use tokio::sync::oneshot;
4use tracing::info;
5
6use super::{
7    Address, ConnectionState, PeerId, PeerInfo, PeerStatus, Sync, SyncError, SyncHandle,
8    SyncPeerInfo, SyncStatus, background::SyncCommand, peer_manager::PeerManager,
9};
10use crate::{Result, auth::crypto::PublicKey, entry::ID};
11
12impl Sync {
13    // === Peer Management Methods ===
14
15    /// Register a new remote peer in the sync network.
16    ///
17    /// # Arguments
18    /// * `pubkey` - The peer's public key
19    /// * `display_name` - Optional human-readable name for the peer
20    ///
21    /// # Returns
22    /// A Result indicating success or an error.
23    pub async fn register_peer(
24        &self,
25        pubkey: &PublicKey,
26        display_name: Option<&str>,
27    ) -> Result<()> {
28        // Store in sync tree via PeerManager
29        let txn = self.sync_tree.new_transaction().await?;
30        PeerManager::new(&txn)
31            .register_peer(pubkey, display_name)
32            .await?;
33        txn.commit().await?;
34
35        // Background sync will read peer info directly from sync tree when needed
36        Ok(())
37    }
38
39    /// Update the status of a registered peer.
40    ///
41    /// # Arguments
42    /// * `pubkey` - The peer's public key
43    /// * `status` - The new status for the peer
44    ///
45    /// # Returns
46    /// A Result indicating success or an error.
47    pub async fn update_peer_status(&self, pubkey: &PublicKey, status: PeerStatus) -> Result<()> {
48        let txn = self.sync_tree.new_transaction().await?;
49        PeerManager::new(&txn)
50            .update_peer_status(pubkey, status)
51            .await?;
52        txn.commit().await?;
53        Ok(())
54    }
55
56    /// Get information about a registered peer.
57    ///
58    /// # Arguments
59    /// * `pubkey` - The peer's public key
60    ///
61    /// # Returns
62    /// The peer information if found, None otherwise.
63    pub async fn get_peer_info(&self, pubkey: &PublicKey) -> Result<Option<PeerInfo>> {
64        let txn = self.sync_tree.new_transaction().await?;
65        PeerManager::new(&txn).get_peer_info(pubkey).await
66        // No commit - just reading
67    }
68
69    /// List all registered peers.
70    ///
71    /// # Returns
72    /// A vector of all registered peer information.
73    pub async fn list_peers(&self) -> Result<Vec<PeerInfo>> {
74        let txn = self.sync_tree.new_transaction().await?;
75        PeerManager::new(&txn).list_peers().await
76        // No commit - just reading
77    }
78
79    /// Remove a peer from the sync network.
80    ///
81    /// This removes the peer entry and all associated sync relationships and transport info.
82    ///
83    /// # Arguments
84    /// * `pubkey` - The peer's public key
85    ///
86    /// # Returns
87    /// A Result indicating success or an error.
88    pub async fn remove_peer(&self, pubkey: &PublicKey) -> Result<()> {
89        let txn = self.sync_tree.new_transaction().await?;
90        PeerManager::new(&txn).remove_peer(pubkey).await?;
91        txn.commit().await?;
92        Ok(())
93    }
94
95    // === Declarative Sync API ===
96
97    /// Register a peer for syncing (declarative API).
98    ///
99    /// This is the recommended way to set up syncing. It immediately registers
100    /// the peer and tree/peer relationship, then the background sync engine
101    /// handles the actual data synchronization.
102    ///
103    /// # Arguments
104    /// * `info` - Information about the peer and sync configuration
105    ///
106    /// # Returns
107    /// A handle for tracking sync status and adding more address hints.
108    ///
109    /// # Example
110    /// ```no_run
111    /// # use eidetica::*;
112    /// # use eidetica::sync::{SyncPeerInfo, Address, AuthParams};
113    /// # use eidetica::auth::PublicKey;
114    /// # async fn example(sync: sync::Sync, peer_pubkey: PublicKey, tree_id: entry::ID) -> Result<()> {
115    /// // Register peer for syncing
116    /// let handle = sync.register_sync_peer(SyncPeerInfo {
117    ///     peer_pubkey,
118    ///     tree_id,
119    ///     addresses: vec![Address {
120    ///         transport_type: "http".to_string(),
121    ///         address: "http://localhost:8080".to_string(),
122    ///     }],
123    ///     auth: None,
124    ///     display_name: Some("My Peer".to_string()),
125    /// }).await?;
126    ///
127    /// // Optionally wait for initial sync
128    /// handle.wait_for_initial_sync().await?;
129    ///
130    /// // Check status anytime
131    /// let status = handle.status().await?;
132    /// println!("Has local data: {}", status.has_local_data);
133    /// # Ok(())
134    /// # }
135    /// ```
136    pub async fn register_sync_peer(&self, info: SyncPeerInfo) -> Result<SyncHandle> {
137        let txn = self.sync_tree.new_transaction().await?;
138        let peer_mgr = PeerManager::new(&txn);
139
140        // Register peer if it doesn't exist
141        if peer_mgr.get_peer_info(&info.peer_pubkey).await?.is_none() {
142            peer_mgr
143                .register_peer(&info.peer_pubkey, info.display_name.as_deref())
144                .await?;
145        }
146
147        // Add all address hints
148        for addr in &info.addresses {
149            peer_mgr
150                .add_address(&info.peer_pubkey, addr.clone())
151                .await?;
152        }
153
154        // Register the tree/peer relationship
155        peer_mgr
156            .add_tree_sync(&info.peer_pubkey, &info.tree_id)
157            .await?;
158
159        // TODO: Store auth params if provided for bootstrap
160        // For now, auth is passed during the actual sync handshake via on_local_write callback
161
162        txn.commit().await?;
163
164        info!(
165            peer = %info.peer_pubkey,
166            tree = %info.tree_id,
167            address_count = info.addresses.len(),
168            "Registered peer for syncing"
169        );
170
171        Ok(SyncHandle {
172            tree_id: info.tree_id,
173            peer_pubkey: info.peer_pubkey,
174            sync: self.clone(),
175        })
176    }
177
178    /// Get the current sync status for a tree/peer pair.
179    ///
180    /// # Arguments
181    /// * `tree_id` - The tree to check
182    /// * `peer_pubkey` - The peer public key
183    ///
184    /// # Returns
185    /// Current sync status including whether we have local data.
186    pub async fn get_sync_status(
187        &self,
188        tree_id: &ID,
189        _peer_pubkey: &PublicKey,
190    ) -> Result<SyncStatus> {
191        // Check if we have local data for this tree
192        let backend = self.backend()?;
193        let our_tips = backend.get_tips(tree_id).await.unwrap_or_default();
194
195        // TODO: Track last_sync time and last_error in sync tree
196        // For now, just report if we have data
197        Ok(SyncStatus {
198            has_local_data: !our_tips.is_empty(),
199            last_sync: None,
200            last_error: None,
201        })
202    }
203
204    // === Database Sync Relationship Methods ===
205
206    /// Add a tree to the sync relationship with a peer.
207    ///
208    /// # Arguments
209    /// * `peer_pubkey` - The peer's public key
210    /// * `tree_root_id` - The root ID of the tree to sync
211    ///
212    /// # Returns
213    /// A Result indicating success or an error.
214    pub async fn add_tree_sync(
215        &self,
216        peer_pubkey: &PublicKey,
217        tree_root_id: impl AsRef<str>,
218    ) -> Result<()> {
219        let txn = self.sync_tree.new_transaction().await?;
220        PeerManager::new(&txn)
221            .add_tree_sync(peer_pubkey, tree_root_id.as_ref())
222            .await?;
223        txn.commit().await?;
224        Ok(())
225    }
226
227    /// Remove a tree from the sync relationship with a peer.
228    ///
229    /// # Arguments
230    /// * `peer_pubkey` - The peer's public key
231    /// * `tree_root_id` - The root ID of the tree to stop syncing
232    ///
233    /// # Returns
234    /// A Result indicating success or an error.
235    pub async fn remove_tree_sync(
236        &self,
237        peer_pubkey: &PublicKey,
238        tree_root_id: impl AsRef<str>,
239    ) -> Result<()> {
240        let txn = self.sync_tree.new_transaction().await?;
241        PeerManager::new(&txn)
242            .remove_tree_sync(peer_pubkey, tree_root_id.as_ref())
243            .await?;
244        txn.commit().await?;
245        Ok(())
246    }
247
248    /// Get the list of trees synced with a peer.
249    ///
250    /// # Arguments
251    /// * `peer_pubkey` - The peer's public key
252    ///
253    /// # Returns
254    /// A vector of tree root IDs synced with this peer.
255    pub async fn get_peer_trees(&self, peer_pubkey: &PublicKey) -> Result<Vec<String>> {
256        let txn = self.sync_tree.new_transaction().await?;
257        PeerManager::new(&txn).get_peer_trees(peer_pubkey).await
258        // No commit - just reading
259    }
260
261    /// Get all peers that sync a specific tree.
262    ///
263    /// # Arguments
264    /// * `tree_root_id` - The root ID of the tree
265    ///
266    /// # Returns
267    /// A vector of peer IDs that sync this tree.
268    pub async fn get_tree_peers(&self, tree_root_id: impl AsRef<str>) -> Result<Vec<PeerId>> {
269        let txn = self.sync_tree.new_transaction().await?;
270        PeerManager::new(&txn)
271            .get_tree_peers(tree_root_id.as_ref())
272            .await
273        // No commit - just reading
274    }
275
276    /// Connect to a remote peer and perform handshake.
277    ///
278    /// This method initiates a connection to a peer, performs the handshake protocol,
279    /// and automatically registers the peer if successful.
280    ///
281    /// # Arguments
282    /// * `address` - The address of the peer to connect to
283    ///
284    /// # Returns
285    /// A Result containing the peer's public key if successful.
286    pub async fn connect_to_peer(&self, address: &Address) -> Result<PublicKey> {
287        let (tx, rx) = oneshot::channel();
288
289        self.background_tx
290            .get()
291            .ok_or(SyncError::NoTransportEnabled)?
292            .send(SyncCommand::ConnectToPeer {
293                address: address.clone(),
294                response: tx,
295            })
296            .await
297            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
298
299        rx.await
300            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
301    }
302
303    /// Update the connection state of a peer.
304    ///
305    /// # Arguments
306    /// * `pubkey` - The peer's public key
307    /// * `state` - The new connection state
308    ///
309    /// # Returns
310    /// A Result indicating success or an error.
311    pub async fn update_peer_connection_state(
312        &self,
313        pubkey: &PublicKey,
314        state: ConnectionState,
315    ) -> Result<()> {
316        let txn = self.sync_tree.new_transaction().await?;
317        let peer_manager = PeerManager::new(&txn);
318
319        // Get current peer info
320        let mut peer_info = match peer_manager.get_peer_info(pubkey).await? {
321            Some(info) => info,
322            None => return Err(SyncError::PeerNotFound(pubkey.to_string()).into()),
323        };
324
325        // Update connection state
326        peer_info.connection_state = state;
327        peer_info.touch_at(txn.now_rfc3339()?);
328
329        // Save updated peer info
330        peer_manager.update_peer_info(pubkey, peer_info).await?;
331        txn.commit().await?;
332        Ok(())
333    }
334
335    /// Check if a tree is synced with a specific peer.
336    ///
337    /// # Arguments
338    /// * `peer_pubkey` - The peer's public key
339    /// * `tree_root_id` - The root ID of the tree
340    ///
341    /// # Returns
342    /// True if the tree is synced with the peer, false otherwise.
343    pub async fn is_tree_synced_with_peer(
344        &self,
345        peer_pubkey: &PublicKey,
346        tree_root_id: impl AsRef<str>,
347    ) -> Result<bool> {
348        let txn = self.sync_tree.new_transaction().await?;
349        PeerManager::new(&txn)
350            .is_tree_synced_with_peer(peer_pubkey, tree_root_id.as_ref())
351            .await
352        // No commit - just reading
353    }
354
355    // === Address Management Methods ===
356
357    /// Add an address to a peer.
358    ///
359    /// # Arguments
360    /// * `peer_pubkey` - The peer's public key
361    /// * `address` - The address to add
362    ///
363    /// # Returns
364    /// A Result indicating success or an error.
365    pub async fn add_peer_address(&self, peer_pubkey: &PublicKey, address: Address) -> Result<()> {
366        // Update sync tree via PeerManager
367        let txn = self.sync_tree.new_transaction().await?;
368        PeerManager::new(&txn)
369            .add_address(peer_pubkey, address)
370            .await?;
371        txn.commit().await?;
372
373        // Background sync will read updated peer info directly from sync tree when needed
374        Ok(())
375    }
376
377    /// Remove a specific address from a peer.
378    ///
379    /// # Arguments
380    /// * `peer_pubkey` - The peer's public key
381    /// * `address` - The address to remove
382    ///
383    /// # Returns
384    /// A Result indicating success or an error (true if removed, false if not found).
385    pub async fn remove_peer_address(
386        &self,
387        peer_pubkey: &PublicKey,
388        address: &Address,
389    ) -> Result<bool> {
390        let txn = self.sync_tree.new_transaction().await?;
391        let result = PeerManager::new(&txn)
392            .remove_address(peer_pubkey, address)
393            .await?;
394        txn.commit().await?;
395        Ok(result)
396    }
397
398    /// Get addresses for a peer, optionally filtered by transport type.
399    ///
400    /// # Arguments
401    /// * `peer_pubkey` - The peer's public key
402    /// * `transport_type` - Optional transport type filter
403    ///
404    /// # Returns
405    /// A vector of addresses matching the criteria.
406    pub async fn get_peer_addresses(
407        &self,
408        peer_pubkey: &PublicKey,
409        transport_type: Option<&str>,
410    ) -> Result<Vec<Address>> {
411        let txn = self.sync_tree.new_transaction().await?;
412        PeerManager::new(&txn)
413            .get_addresses(peer_pubkey, transport_type)
414            .await
415        // No commit - just reading
416    }
417}