Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/sync/
peer.rs

1//! Peer management, sync relationships, and address handling for the sync system.
2
3use tokio::sync::oneshot;
4use tracing::info;
5
6use super::{
7    Address, ConnectionState, PeerId, PeerInfo, PeerStatus, Sync, SyncError, SyncHandle,
8    SyncPeerInfo, SyncStatus, background::SyncCommand, peer_manager::PeerManager,
9};
10use crate::{Result, auth::crypto::PublicKey, entry::ID};
11
12impl Sync {
13    // === Peer Management Methods ===
14
15    /// Register a new remote peer in the sync network.
16    ///
17    /// # Arguments
18    /// * `pubkey` - The peer's public key
19    /// * `display_name` - Optional human-readable name for the peer
20    ///
21    /// # Returns
22    /// A Result indicating success or an error.
23    pub async fn register_peer(
24        &self,
25        pubkey: &PublicKey,
26        display_name: Option<&str>,
27    ) -> Result<()> {
28        // Store in sync tree via PeerManager
29        let txn = self.sync_tree.new_transaction().await?;
30        PeerManager::new(&txn)
31            .register_peer(pubkey, display_name)
32            .await?;
33        txn.commit().await?;
34
35        // Background sync will read peer info directly from sync tree when needed
36        Ok(())
37    }
38
39    /// Update the status of a registered peer.
40    ///
41    /// # Arguments
42    /// * `pubkey` - The peer's public key
43    /// * `status` - The new status for the peer
44    ///
45    /// # Returns
46    /// A Result indicating success or an error.
47    pub async fn update_peer_status(&self, pubkey: &PublicKey, status: PeerStatus) -> Result<()> {
48        let txn = self.sync_tree.new_transaction().await?;
49        PeerManager::new(&txn)
50            .update_peer_status(pubkey, status)
51            .await?;
52        txn.commit().await?;
53        Ok(())
54    }
55
56    /// Get information about a registered peer.
57    ///
58    /// # Arguments
59    /// * `pubkey` - The peer's public key
60    ///
61    /// # Returns
62    /// The peer information if found, None otherwise.
63    pub async fn get_peer_info(&self, pubkey: &PublicKey) -> Result<Option<PeerInfo>> {
64        let txn = self.sync_tree.new_transaction().await?;
65        PeerManager::new(&txn).get_peer_info(pubkey).await
66        // No commit - just reading
67    }
68
69    /// List all registered peers.
70    ///
71    /// # Returns
72    /// A vector of all registered peer information.
73    pub async fn list_peers(&self) -> Result<Vec<PeerInfo>> {
74        let txn = self.sync_tree.new_transaction().await?;
75        PeerManager::new(&txn).list_peers().await
76        // No commit - just reading
77    }
78
79    /// Remove a peer from the sync network.
80    ///
81    /// This removes the peer entry and all associated sync relationships and transport info.
82    ///
83    /// # Arguments
84    /// * `pubkey` - The peer's public key
85    ///
86    /// # Returns
87    /// A Result indicating success or an error.
88    pub async fn remove_peer(&self, pubkey: &PublicKey) -> Result<()> {
89        let txn = self.sync_tree.new_transaction().await?;
90        PeerManager::new(&txn).remove_peer(pubkey).await?;
91        txn.commit().await?;
92        Ok(())
93    }
94
95    // === Declarative Sync API ===
96
97    /// Register a peer for syncing (declarative API).
98    ///
99    /// This is the recommended way to set up syncing. It immediately registers
100    /// the peer and tree/peer relationship, then the background sync engine
101    /// handles the actual data synchronization.
102    ///
103    /// # Arguments
104    /// * `info` - Information about the peer and sync configuration
105    ///
106    /// # Returns
107    /// A handle for tracking sync status and adding more address hints.
108    ///
109    /// # Example
110    /// ```no_run
111    /// # use eidetica::*;
112    /// # use eidetica::sync::{SyncPeerInfo, Address, AuthParams};
113    /// # use eidetica::auth::PublicKey;
114    /// # async fn example(sync: sync::Sync, peer_pubkey: PublicKey, tree_id: entry::ID) -> Result<()> {
115    /// // Register peer for syncing
116    /// let handle = sync.register_sync_peer(SyncPeerInfo {
117    ///     peer_pubkey,
118    ///     tree_id,
119    ///     addresses: vec![Address {
120    ///         transport_type: "http".to_string(),
121    ///         address: "http://localhost:8080".to_string(),
122    ///     }],
123    ///     auth: None,
124    ///     display_name: Some("My Peer".to_string()),
125    /// }).await?;
126    ///
127    /// // Optionally wait for initial sync
128    /// handle.wait_for_initial_sync().await?;
129    ///
130    /// // Check status anytime
131    /// let status = handle.status().await?;
132    /// println!("Has local data: {}", status.has_local_data);
133    /// # Ok(())
134    /// # }
135    /// ```
136    pub async fn register_sync_peer(&self, info: SyncPeerInfo) -> Result<SyncHandle> {
137        let txn = self.sync_tree.new_transaction().await?;
138        let peer_mgr = PeerManager::new(&txn);
139
140        // Register peer if it doesn't exist
141        if peer_mgr.get_peer_info(&info.peer_pubkey).await?.is_none() {
142            peer_mgr
143                .register_peer(&info.peer_pubkey, info.display_name.as_deref())
144                .await?;
145        }
146
147        // Add all address hints
148        for addr in &info.addresses {
149            peer_mgr
150                .add_address(&info.peer_pubkey, addr.clone())
151                .await?;
152        }
153
154        // Register the tree/peer relationship
155        peer_mgr
156            .add_tree_sync(&info.peer_pubkey, &info.tree_id)
157            .await?;
158
159        // TODO: Store auth params if provided for bootstrap
160        // For now, auth is passed during the actual sync handshake via on_local_write callback
161
162        txn.commit().await?;
163
164        info!(
165            peer = %info.peer_pubkey,
166            tree = %info.tree_id,
167            address_count = info.addresses.len(),
168            "Registered peer for syncing"
169        );
170
171        Ok(SyncHandle {
172            tree_id: info.tree_id,
173            peer_pubkey: info.peer_pubkey,
174            sync: self.clone(),
175        })
176    }
177
178    /// Get the current sync status for a tree/peer pair.
179    ///
180    /// # Arguments
181    /// * `tree_id` - The tree to check
182    /// * `peer_pubkey` - The peer public key
183    ///
184    /// # Returns
185    /// Current sync status including whether we have local data.
186    pub async fn get_sync_status(
187        &self,
188        tree_id: &ID,
189        _peer_pubkey: &PublicKey,
190    ) -> Result<SyncStatus> {
191        // Check if we have local data for this tree
192        let backend = self.backend()?;
193        let our_snapshot = backend.snapshot(tree_id).await.unwrap_or_default();
194
195        // TODO: Track last_sync time and last_error in sync tree
196        // For now, just report if we have data
197        Ok(SyncStatus {
198            has_local_data: !our_snapshot.is_empty(),
199            last_sync: None,
200            last_error: None,
201        })
202    }
203
204    // === Database Sync Relationship Methods ===
205
206    /// Add a tree to the sync relationship with a peer.
207    ///
208    /// # Arguments
209    /// * `peer_pubkey` - The peer's public key
210    /// * `tree_root_id` - The root ID of the tree to sync
211    ///
212    /// # Returns
213    /// A Result indicating success or an error.
214    pub async fn add_tree_sync(&self, peer_pubkey: &PublicKey, tree_root_id: &ID) -> Result<()> {
215        let txn = self.sync_tree.new_transaction().await?;
216        PeerManager::new(&txn)
217            .add_tree_sync(peer_pubkey, tree_root_id)
218            .await?;
219        txn.commit().await?;
220        Ok(())
221    }
222
223    /// Remove a tree from the sync relationship with a peer.
224    ///
225    /// # Arguments
226    /// * `peer_pubkey` - The peer's public key
227    /// * `tree_root_id` - The root ID of the tree to stop syncing
228    ///
229    /// # Returns
230    /// A Result indicating success or an error.
231    pub async fn remove_tree_sync(&self, peer_pubkey: &PublicKey, tree_root_id: &ID) -> Result<()> {
232        let txn = self.sync_tree.new_transaction().await?;
233        PeerManager::new(&txn)
234            .remove_tree_sync(peer_pubkey, tree_root_id)
235            .await?;
236        txn.commit().await?;
237        Ok(())
238    }
239
240    /// Get the list of trees synced with a peer.
241    ///
242    /// # Arguments
243    /// * `peer_pubkey` - The peer's public key
244    ///
245    /// # Returns
246    /// A vector of tree root IDs synced with this peer.
247    pub async fn get_peer_trees(&self, peer_pubkey: &PublicKey) -> Result<Vec<ID>> {
248        let txn = self.sync_tree.new_transaction().await?;
249        PeerManager::new(&txn).get_peer_trees(peer_pubkey).await
250        // No commit - just reading
251    }
252
253    /// Get all peers that sync a specific tree.
254    ///
255    /// # Arguments
256    /// * `tree_root_id` - The root ID of the tree
257    ///
258    /// # Returns
259    /// A vector of peer IDs that sync this tree.
260    pub async fn get_tree_peers(&self, tree_root_id: &ID) -> Result<Vec<PeerId>> {
261        let txn = self.sync_tree.new_transaction().await?;
262        PeerManager::new(&txn).get_tree_peers(tree_root_id).await
263        // No commit - just reading
264    }
265
266    /// Connect to a remote peer and perform handshake.
267    ///
268    /// This method initiates a connection to a peer, performs the handshake protocol,
269    /// and automatically registers the peer if successful.
270    ///
271    /// # Arguments
272    /// * `address` - The address of the peer to connect to
273    ///
274    /// # Returns
275    /// A Result containing the peer's public key if successful.
276    pub async fn connect_to_peer(&self, address: &Address) -> Result<PublicKey> {
277        let (tx, rx) = oneshot::channel();
278
279        self.background_tx
280            .get()
281            .ok_or(SyncError::NoTransportEnabled)?
282            .send(SyncCommand::ConnectToPeer {
283                address: address.clone(),
284                response: tx,
285            })
286            .await
287            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
288
289        rx.await
290            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
291    }
292
293    /// Update the connection state of a peer.
294    ///
295    /// # Arguments
296    /// * `pubkey` - The peer's public key
297    /// * `state` - The new connection state
298    ///
299    /// # Returns
300    /// A Result indicating success or an error.
301    pub async fn update_peer_connection_state(
302        &self,
303        pubkey: &PublicKey,
304        state: ConnectionState,
305    ) -> Result<()> {
306        let txn = self.sync_tree.new_transaction().await?;
307        let peer_manager = PeerManager::new(&txn);
308
309        // Get current peer info
310        let mut peer_info = match peer_manager.get_peer_info(pubkey).await? {
311            Some(info) => info,
312            None => return Err(SyncError::PeerNotFound(pubkey.to_string()).into()),
313        };
314
315        // Update connection state
316        peer_info.connection_state = state;
317        peer_info.touch_at(txn.now_rfc3339()?);
318
319        // Save updated peer info
320        peer_manager.update_peer_info(pubkey, peer_info).await?;
321        txn.commit().await?;
322        Ok(())
323    }
324
325    /// Check if a tree is synced with a specific peer.
326    ///
327    /// # Arguments
328    /// * `peer_pubkey` - The peer's public key
329    /// * `tree_root_id` - The root ID of the tree
330    ///
331    /// # Returns
332    /// True if the tree is synced with the peer, false otherwise.
333    pub async fn is_tree_synced_with_peer(
334        &self,
335        peer_pubkey: &PublicKey,
336        tree_root_id: &ID,
337    ) -> Result<bool> {
338        let txn = self.sync_tree.new_transaction().await?;
339        PeerManager::new(&txn)
340            .is_tree_synced_with_peer(peer_pubkey, tree_root_id)
341            .await
342        // No commit - just reading
343    }
344
345    // === Address Management Methods ===
346
347    /// Add an address to a peer.
348    ///
349    /// # Arguments
350    /// * `peer_pubkey` - The peer's public key
351    /// * `address` - The address to add
352    ///
353    /// # Returns
354    /// A Result indicating success or an error.
355    pub async fn add_peer_address(&self, peer_pubkey: &PublicKey, address: Address) -> Result<()> {
356        // Update sync tree via PeerManager
357        let txn = self.sync_tree.new_transaction().await?;
358        PeerManager::new(&txn)
359            .add_address(peer_pubkey, address)
360            .await?;
361        txn.commit().await?;
362
363        // Background sync will read updated peer info directly from sync tree when needed
364        Ok(())
365    }
366
367    /// Remove a specific address from a peer.
368    ///
369    /// # Arguments
370    /// * `peer_pubkey` - The peer's public key
371    /// * `address` - The address to remove
372    ///
373    /// # Returns
374    /// A Result indicating success or an error (true if removed, false if not found).
375    pub async fn remove_peer_address(
376        &self,
377        peer_pubkey: &PublicKey,
378        address: &Address,
379    ) -> Result<bool> {
380        let txn = self.sync_tree.new_transaction().await?;
381        let result = PeerManager::new(&txn)
382            .remove_address(peer_pubkey, address)
383            .await?;
384        txn.commit().await?;
385        Ok(result)
386    }
387
388    /// Get addresses for a peer, optionally filtered by transport type.
389    ///
390    /// # Arguments
391    /// * `peer_pubkey` - The peer's public key
392    /// * `transport_type` - Optional transport type filter
393    ///
394    /// # Returns
395    /// A vector of addresses matching the criteria.
396    pub async fn get_peer_addresses(
397        &self,
398        peer_pubkey: &PublicKey,
399        transport_type: Option<&str>,
400    ) -> Result<Vec<Address>> {
401        let txn = self.sync_tree.new_transaction().await?;
402        PeerManager::new(&txn)
403            .get_addresses(peer_pubkey, transport_type)
404            .await
405        // No commit - just reading
406    }
407}