eidetica/sync/peer.rs
1//! Peer management, sync relationships, and address handling for the sync system.
2
3use tokio::sync::oneshot;
4use tracing::info;
5
6use super::{
7 Address, ConnectionState, PeerId, PeerInfo, PeerStatus, Sync, SyncError, SyncHandle,
8 SyncPeerInfo, SyncStatus, background::SyncCommand, peer_manager::PeerManager,
9};
10use crate::{Result, auth::crypto::PublicKey, entry::ID};
11
12impl Sync {
13 // === Peer Management Methods ===
14
15 /// Register a new remote peer in the sync network.
16 ///
17 /// # Arguments
18 /// * `pubkey` - The peer's public key
19 /// * `display_name` - Optional human-readable name for the peer
20 ///
21 /// # Returns
22 /// A Result indicating success or an error.
23 pub async fn register_peer(
24 &self,
25 pubkey: &PublicKey,
26 display_name: Option<&str>,
27 ) -> Result<()> {
28 // Store in sync tree via PeerManager
29 let txn = self.sync_tree.new_transaction().await?;
30 PeerManager::new(&txn)
31 .register_peer(pubkey, display_name)
32 .await?;
33 txn.commit().await?;
34
35 // Background sync will read peer info directly from sync tree when needed
36 Ok(())
37 }
38
39 /// Update the status of a registered peer.
40 ///
41 /// # Arguments
42 /// * `pubkey` - The peer's public key
43 /// * `status` - The new status for the peer
44 ///
45 /// # Returns
46 /// A Result indicating success or an error.
47 pub async fn update_peer_status(&self, pubkey: &PublicKey, status: PeerStatus) -> Result<()> {
48 let txn = self.sync_tree.new_transaction().await?;
49 PeerManager::new(&txn)
50 .update_peer_status(pubkey, status)
51 .await?;
52 txn.commit().await?;
53 Ok(())
54 }
55
56 /// Get information about a registered peer.
57 ///
58 /// # Arguments
59 /// * `pubkey` - The peer's public key
60 ///
61 /// # Returns
62 /// The peer information if found, None otherwise.
63 pub async fn get_peer_info(&self, pubkey: &PublicKey) -> Result<Option<PeerInfo>> {
64 let txn = self.sync_tree.new_transaction().await?;
65 PeerManager::new(&txn).get_peer_info(pubkey).await
66 // No commit - just reading
67 }
68
69 /// List all registered peers.
70 ///
71 /// # Returns
72 /// A vector of all registered peer information.
73 pub async fn list_peers(&self) -> Result<Vec<PeerInfo>> {
74 let txn = self.sync_tree.new_transaction().await?;
75 PeerManager::new(&txn).list_peers().await
76 // No commit - just reading
77 }
78
79 /// Remove a peer from the sync network.
80 ///
81 /// This removes the peer entry and all associated sync relationships and transport info.
82 ///
83 /// # Arguments
84 /// * `pubkey` - The peer's public key
85 ///
86 /// # Returns
87 /// A Result indicating success or an error.
88 pub async fn remove_peer(&self, pubkey: &PublicKey) -> Result<()> {
89 let txn = self.sync_tree.new_transaction().await?;
90 PeerManager::new(&txn).remove_peer(pubkey).await?;
91 txn.commit().await?;
92 Ok(())
93 }
94
95 // === Declarative Sync API ===
96
97 /// Register a peer for syncing (declarative API).
98 ///
99 /// This is the recommended way to set up syncing. It immediately registers
100 /// the peer and tree/peer relationship, then the background sync engine
101 /// handles the actual data synchronization.
102 ///
103 /// # Arguments
104 /// * `info` - Information about the peer and sync configuration
105 ///
106 /// # Returns
107 /// A handle for tracking sync status and adding more address hints.
108 ///
109 /// # Example
110 /// ```no_run
111 /// # use eidetica::*;
112 /// # use eidetica::sync::{SyncPeerInfo, Address, AuthParams};
113 /// # use eidetica::auth::PublicKey;
114 /// # async fn example(sync: sync::Sync, peer_pubkey: PublicKey, tree_id: entry::ID) -> Result<()> {
115 /// // Register peer for syncing
116 /// let handle = sync.register_sync_peer(SyncPeerInfo {
117 /// peer_pubkey,
118 /// tree_id,
119 /// addresses: vec![Address {
120 /// transport_type: "http".to_string(),
121 /// address: "http://localhost:8080".to_string(),
122 /// }],
123 /// auth: None,
124 /// display_name: Some("My Peer".to_string()),
125 /// }).await?;
126 ///
127 /// // Optionally wait for initial sync
128 /// handle.wait_for_initial_sync().await?;
129 ///
130 /// // Check status anytime
131 /// let status = handle.status().await?;
132 /// println!("Has local data: {}", status.has_local_data);
133 /// # Ok(())
134 /// # }
135 /// ```
136 pub async fn register_sync_peer(&self, info: SyncPeerInfo) -> Result<SyncHandle> {
137 let txn = self.sync_tree.new_transaction().await?;
138 let peer_mgr = PeerManager::new(&txn);
139
140 // Register peer if it doesn't exist
141 if peer_mgr.get_peer_info(&info.peer_pubkey).await?.is_none() {
142 peer_mgr
143 .register_peer(&info.peer_pubkey, info.display_name.as_deref())
144 .await?;
145 }
146
147 // Add all address hints
148 for addr in &info.addresses {
149 peer_mgr
150 .add_address(&info.peer_pubkey, addr.clone())
151 .await?;
152 }
153
154 // Register the tree/peer relationship
155 peer_mgr
156 .add_tree_sync(&info.peer_pubkey, &info.tree_id)
157 .await?;
158
159 // TODO: Store auth params if provided for bootstrap
160 // For now, auth is passed during the actual sync handshake via on_local_write callback
161
162 txn.commit().await?;
163
164 info!(
165 peer = %info.peer_pubkey,
166 tree = %info.tree_id,
167 address_count = info.addresses.len(),
168 "Registered peer for syncing"
169 );
170
171 Ok(SyncHandle {
172 tree_id: info.tree_id,
173 peer_pubkey: info.peer_pubkey,
174 sync: self.clone(),
175 })
176 }
177
178 /// Get the current sync status for a tree/peer pair.
179 ///
180 /// # Arguments
181 /// * `tree_id` - The tree to check
182 /// * `peer_pubkey` - The peer public key
183 ///
184 /// # Returns
185 /// Current sync status including whether we have local data.
186 pub async fn get_sync_status(
187 &self,
188 tree_id: &ID,
189 _peer_pubkey: &PublicKey,
190 ) -> Result<SyncStatus> {
191 // Check if we have local data for this tree
192 let backend = self.backend()?;
193 let our_snapshot = backend.snapshot(tree_id).await.unwrap_or_default();
194
195 // TODO: Track last_sync time and last_error in sync tree
196 // For now, just report if we have data
197 Ok(SyncStatus {
198 has_local_data: !our_snapshot.is_empty(),
199 last_sync: None,
200 last_error: None,
201 })
202 }
203
204 // === Database Sync Relationship Methods ===
205
206 /// Add a tree to the sync relationship with a peer.
207 ///
208 /// # Arguments
209 /// * `peer_pubkey` - The peer's public key
210 /// * `tree_root_id` - The root ID of the tree to sync
211 ///
212 /// # Returns
213 /// A Result indicating success or an error.
214 pub async fn add_tree_sync(&self, peer_pubkey: &PublicKey, tree_root_id: &ID) -> Result<()> {
215 let txn = self.sync_tree.new_transaction().await?;
216 PeerManager::new(&txn)
217 .add_tree_sync(peer_pubkey, tree_root_id)
218 .await?;
219 txn.commit().await?;
220 Ok(())
221 }
222
223 /// Remove a tree from the sync relationship with a peer.
224 ///
225 /// # Arguments
226 /// * `peer_pubkey` - The peer's public key
227 /// * `tree_root_id` - The root ID of the tree to stop syncing
228 ///
229 /// # Returns
230 /// A Result indicating success or an error.
231 pub async fn remove_tree_sync(&self, peer_pubkey: &PublicKey, tree_root_id: &ID) -> Result<()> {
232 let txn = self.sync_tree.new_transaction().await?;
233 PeerManager::new(&txn)
234 .remove_tree_sync(peer_pubkey, tree_root_id)
235 .await?;
236 txn.commit().await?;
237 Ok(())
238 }
239
240 /// Get the list of trees synced with a peer.
241 ///
242 /// # Arguments
243 /// * `peer_pubkey` - The peer's public key
244 ///
245 /// # Returns
246 /// A vector of tree root IDs synced with this peer.
247 pub async fn get_peer_trees(&self, peer_pubkey: &PublicKey) -> Result<Vec<ID>> {
248 let txn = self.sync_tree.new_transaction().await?;
249 PeerManager::new(&txn).get_peer_trees(peer_pubkey).await
250 // No commit - just reading
251 }
252
253 /// Get all peers that sync a specific tree.
254 ///
255 /// # Arguments
256 /// * `tree_root_id` - The root ID of the tree
257 ///
258 /// # Returns
259 /// A vector of peer IDs that sync this tree.
260 pub async fn get_tree_peers(&self, tree_root_id: &ID) -> Result<Vec<PeerId>> {
261 let txn = self.sync_tree.new_transaction().await?;
262 PeerManager::new(&txn).get_tree_peers(tree_root_id).await
263 // No commit - just reading
264 }
265
266 /// Connect to a remote peer and perform handshake.
267 ///
268 /// This method initiates a connection to a peer, performs the handshake protocol,
269 /// and automatically registers the peer if successful.
270 ///
271 /// # Arguments
272 /// * `address` - The address of the peer to connect to
273 ///
274 /// # Returns
275 /// A Result containing the peer's public key if successful.
276 pub async fn connect_to_peer(&self, address: &Address) -> Result<PublicKey> {
277 let (tx, rx) = oneshot::channel();
278
279 self.background_tx
280 .get()
281 .ok_or(SyncError::NoTransportEnabled)?
282 .send(SyncCommand::ConnectToPeer {
283 address: address.clone(),
284 response: tx,
285 })
286 .await
287 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
288
289 rx.await
290 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
291 }
292
293 /// Update the connection state of a peer.
294 ///
295 /// # Arguments
296 /// * `pubkey` - The peer's public key
297 /// * `state` - The new connection state
298 ///
299 /// # Returns
300 /// A Result indicating success or an error.
301 pub async fn update_peer_connection_state(
302 &self,
303 pubkey: &PublicKey,
304 state: ConnectionState,
305 ) -> Result<()> {
306 let txn = self.sync_tree.new_transaction().await?;
307 let peer_manager = PeerManager::new(&txn);
308
309 // Get current peer info
310 let mut peer_info = match peer_manager.get_peer_info(pubkey).await? {
311 Some(info) => info,
312 None => return Err(SyncError::PeerNotFound(pubkey.to_string()).into()),
313 };
314
315 // Update connection state
316 peer_info.connection_state = state;
317 peer_info.touch_at(txn.now_rfc3339()?);
318
319 // Save updated peer info
320 peer_manager.update_peer_info(pubkey, peer_info).await?;
321 txn.commit().await?;
322 Ok(())
323 }
324
325 /// Check if a tree is synced with a specific peer.
326 ///
327 /// # Arguments
328 /// * `peer_pubkey` - The peer's public key
329 /// * `tree_root_id` - The root ID of the tree
330 ///
331 /// # Returns
332 /// True if the tree is synced with the peer, false otherwise.
333 pub async fn is_tree_synced_with_peer(
334 &self,
335 peer_pubkey: &PublicKey,
336 tree_root_id: &ID,
337 ) -> Result<bool> {
338 let txn = self.sync_tree.new_transaction().await?;
339 PeerManager::new(&txn)
340 .is_tree_synced_with_peer(peer_pubkey, tree_root_id)
341 .await
342 // No commit - just reading
343 }
344
345 // === Address Management Methods ===
346
347 /// Add an address to a peer.
348 ///
349 /// # Arguments
350 /// * `peer_pubkey` - The peer's public key
351 /// * `address` - The address to add
352 ///
353 /// # Returns
354 /// A Result indicating success or an error.
355 pub async fn add_peer_address(&self, peer_pubkey: &PublicKey, address: Address) -> Result<()> {
356 // Update sync tree via PeerManager
357 let txn = self.sync_tree.new_transaction().await?;
358 PeerManager::new(&txn)
359 .add_address(peer_pubkey, address)
360 .await?;
361 txn.commit().await?;
362
363 // Background sync will read updated peer info directly from sync tree when needed
364 Ok(())
365 }
366
367 /// Remove a specific address from a peer.
368 ///
369 /// # Arguments
370 /// * `peer_pubkey` - The peer's public key
371 /// * `address` - The address to remove
372 ///
373 /// # Returns
374 /// A Result indicating success or an error (true if removed, false if not found).
375 pub async fn remove_peer_address(
376 &self,
377 peer_pubkey: &PublicKey,
378 address: &Address,
379 ) -> Result<bool> {
380 let txn = self.sync_tree.new_transaction().await?;
381 let result = PeerManager::new(&txn)
382 .remove_address(peer_pubkey, address)
383 .await?;
384 txn.commit().await?;
385 Ok(result)
386 }
387
388 /// Get addresses for a peer, optionally filtered by transport type.
389 ///
390 /// # Arguments
391 /// * `peer_pubkey` - The peer's public key
392 /// * `transport_type` - Optional transport type filter
393 ///
394 /// # Returns
395 /// A vector of addresses matching the criteria.
396 pub async fn get_peer_addresses(
397 &self,
398 peer_pubkey: &PublicKey,
399 transport_type: Option<&str>,
400 ) -> Result<Vec<Address>> {
401 let txn = self.sync_tree.new_transaction().await?;
402 PeerManager::new(&txn)
403 .get_addresses(peer_pubkey, transport_type)
404 .await
405 // No commit - just reading
406 }
407}