eidetica/sync/peer.rs
1//! Peer management, sync relationships, and address handling for the sync system.
2
3use tokio::sync::oneshot;
4use tracing::info;
5
6use super::{
7 Address, ConnectionState, PeerId, PeerInfo, PeerStatus, Sync, SyncError, SyncHandle,
8 SyncPeerInfo, SyncStatus, background::SyncCommand, peer_manager::PeerManager,
9};
10use crate::{Result, auth::crypto::PublicKey, entry::ID};
11
12impl Sync {
13 // === Peer Management Methods ===
14
15 /// Register a new remote peer in the sync network.
16 ///
17 /// # Arguments
18 /// * `pubkey` - The peer's public key
19 /// * `display_name` - Optional human-readable name for the peer
20 ///
21 /// # Returns
22 /// A Result indicating success or an error.
23 pub async fn register_peer(
24 &self,
25 pubkey: &PublicKey,
26 display_name: Option<&str>,
27 ) -> Result<()> {
28 // Store in sync tree via PeerManager
29 let txn = self.sync_tree.new_transaction().await?;
30 PeerManager::new(&txn)
31 .register_peer(pubkey, display_name)
32 .await?;
33 txn.commit().await?;
34
35 // Background sync will read peer info directly from sync tree when needed
36 Ok(())
37 }
38
39 /// Update the status of a registered peer.
40 ///
41 /// # Arguments
42 /// * `pubkey` - The peer's public key
43 /// * `status` - The new status for the peer
44 ///
45 /// # Returns
46 /// A Result indicating success or an error.
47 pub async fn update_peer_status(&self, pubkey: &PublicKey, status: PeerStatus) -> Result<()> {
48 let txn = self.sync_tree.new_transaction().await?;
49 PeerManager::new(&txn)
50 .update_peer_status(pubkey, status)
51 .await?;
52 txn.commit().await?;
53 Ok(())
54 }
55
56 /// Get information about a registered peer.
57 ///
58 /// # Arguments
59 /// * `pubkey` - The peer's public key
60 ///
61 /// # Returns
62 /// The peer information if found, None otherwise.
63 pub async fn get_peer_info(&self, pubkey: &PublicKey) -> Result<Option<PeerInfo>> {
64 let txn = self.sync_tree.new_transaction().await?;
65 PeerManager::new(&txn).get_peer_info(pubkey).await
66 // No commit - just reading
67 }
68
69 /// List all registered peers.
70 ///
71 /// # Returns
72 /// A vector of all registered peer information.
73 pub async fn list_peers(&self) -> Result<Vec<PeerInfo>> {
74 let txn = self.sync_tree.new_transaction().await?;
75 PeerManager::new(&txn).list_peers().await
76 // No commit - just reading
77 }
78
79 /// Remove a peer from the sync network.
80 ///
81 /// This removes the peer entry and all associated sync relationships and transport info.
82 ///
83 /// # Arguments
84 /// * `pubkey` - The peer's public key
85 ///
86 /// # Returns
87 /// A Result indicating success or an error.
88 pub async fn remove_peer(&self, pubkey: &PublicKey) -> Result<()> {
89 let txn = self.sync_tree.new_transaction().await?;
90 PeerManager::new(&txn).remove_peer(pubkey).await?;
91 txn.commit().await?;
92 Ok(())
93 }
94
95 // === Declarative Sync API ===
96
97 /// Register a peer for syncing (declarative API).
98 ///
99 /// This is the recommended way to set up syncing. It immediately registers
100 /// the peer and tree/peer relationship, then the background sync engine
101 /// handles the actual data synchronization.
102 ///
103 /// # Arguments
104 /// * `info` - Information about the peer and sync configuration
105 ///
106 /// # Returns
107 /// A handle for tracking sync status and adding more address hints.
108 ///
109 /// # Example
110 /// ```no_run
111 /// # use eidetica::*;
112 /// # use eidetica::sync::{SyncPeerInfo, Address, AuthParams};
113 /// # use eidetica::auth::PublicKey;
114 /// # async fn example(sync: sync::Sync, peer_pubkey: PublicKey, tree_id: entry::ID) -> Result<()> {
115 /// // Register peer for syncing
116 /// let handle = sync.register_sync_peer(SyncPeerInfo {
117 /// peer_pubkey,
118 /// tree_id,
119 /// addresses: vec![Address {
120 /// transport_type: "http".to_string(),
121 /// address: "http://localhost:8080".to_string(),
122 /// }],
123 /// auth: None,
124 /// display_name: Some("My Peer".to_string()),
125 /// }).await?;
126 ///
127 /// // Optionally wait for initial sync
128 /// handle.wait_for_initial_sync().await?;
129 ///
130 /// // Check status anytime
131 /// let status = handle.status().await?;
132 /// println!("Has local data: {}", status.has_local_data);
133 /// # Ok(())
134 /// # }
135 /// ```
136 pub async fn register_sync_peer(&self, info: SyncPeerInfo) -> Result<SyncHandle> {
137 let txn = self.sync_tree.new_transaction().await?;
138 let peer_mgr = PeerManager::new(&txn);
139
140 // Register peer if it doesn't exist
141 if peer_mgr.get_peer_info(&info.peer_pubkey).await?.is_none() {
142 peer_mgr
143 .register_peer(&info.peer_pubkey, info.display_name.as_deref())
144 .await?;
145 }
146
147 // Add all address hints
148 for addr in &info.addresses {
149 peer_mgr
150 .add_address(&info.peer_pubkey, addr.clone())
151 .await?;
152 }
153
154 // Register the tree/peer relationship
155 peer_mgr
156 .add_tree_sync(&info.peer_pubkey, &info.tree_id)
157 .await?;
158
159 // TODO: Store auth params if provided for bootstrap
160 // For now, auth is passed during the actual sync handshake via on_local_write callback
161
162 txn.commit().await?;
163
164 info!(
165 peer = %info.peer_pubkey,
166 tree = %info.tree_id,
167 address_count = info.addresses.len(),
168 "Registered peer for syncing"
169 );
170
171 Ok(SyncHandle {
172 tree_id: info.tree_id,
173 peer_pubkey: info.peer_pubkey,
174 sync: self.clone(),
175 })
176 }
177
178 /// Get the current sync status for a tree/peer pair.
179 ///
180 /// # Arguments
181 /// * `tree_id` - The tree to check
182 /// * `peer_pubkey` - The peer public key
183 ///
184 /// # Returns
185 /// Current sync status including whether we have local data.
186 pub async fn get_sync_status(
187 &self,
188 tree_id: &ID,
189 _peer_pubkey: &PublicKey,
190 ) -> Result<SyncStatus> {
191 // Check if we have local data for this tree
192 let backend = self.backend()?;
193 let our_tips = backend.get_tips(tree_id).await.unwrap_or_default();
194
195 // TODO: Track last_sync time and last_error in sync tree
196 // For now, just report if we have data
197 Ok(SyncStatus {
198 has_local_data: !our_tips.is_empty(),
199 last_sync: None,
200 last_error: None,
201 })
202 }
203
204 // === Database Sync Relationship Methods ===
205
206 /// Add a tree to the sync relationship with a peer.
207 ///
208 /// # Arguments
209 /// * `peer_pubkey` - The peer's public key
210 /// * `tree_root_id` - The root ID of the tree to sync
211 ///
212 /// # Returns
213 /// A Result indicating success or an error.
214 pub async fn add_tree_sync(
215 &self,
216 peer_pubkey: &PublicKey,
217 tree_root_id: impl AsRef<str>,
218 ) -> Result<()> {
219 let txn = self.sync_tree.new_transaction().await?;
220 PeerManager::new(&txn)
221 .add_tree_sync(peer_pubkey, tree_root_id.as_ref())
222 .await?;
223 txn.commit().await?;
224 Ok(())
225 }
226
227 /// Remove a tree from the sync relationship with a peer.
228 ///
229 /// # Arguments
230 /// * `peer_pubkey` - The peer's public key
231 /// * `tree_root_id` - The root ID of the tree to stop syncing
232 ///
233 /// # Returns
234 /// A Result indicating success or an error.
235 pub async fn remove_tree_sync(
236 &self,
237 peer_pubkey: &PublicKey,
238 tree_root_id: impl AsRef<str>,
239 ) -> Result<()> {
240 let txn = self.sync_tree.new_transaction().await?;
241 PeerManager::new(&txn)
242 .remove_tree_sync(peer_pubkey, tree_root_id.as_ref())
243 .await?;
244 txn.commit().await?;
245 Ok(())
246 }
247
248 /// Get the list of trees synced with a peer.
249 ///
250 /// # Arguments
251 /// * `peer_pubkey` - The peer's public key
252 ///
253 /// # Returns
254 /// A vector of tree root IDs synced with this peer.
255 pub async fn get_peer_trees(&self, peer_pubkey: &PublicKey) -> Result<Vec<String>> {
256 let txn = self.sync_tree.new_transaction().await?;
257 PeerManager::new(&txn).get_peer_trees(peer_pubkey).await
258 // No commit - just reading
259 }
260
261 /// Get all peers that sync a specific tree.
262 ///
263 /// # Arguments
264 /// * `tree_root_id` - The root ID of the tree
265 ///
266 /// # Returns
267 /// A vector of peer IDs that sync this tree.
268 pub async fn get_tree_peers(&self, tree_root_id: impl AsRef<str>) -> Result<Vec<PeerId>> {
269 let txn = self.sync_tree.new_transaction().await?;
270 PeerManager::new(&txn)
271 .get_tree_peers(tree_root_id.as_ref())
272 .await
273 // No commit - just reading
274 }
275
276 /// Connect to a remote peer and perform handshake.
277 ///
278 /// This method initiates a connection to a peer, performs the handshake protocol,
279 /// and automatically registers the peer if successful.
280 ///
281 /// # Arguments
282 /// * `address` - The address of the peer to connect to
283 ///
284 /// # Returns
285 /// A Result containing the peer's public key if successful.
286 pub async fn connect_to_peer(&self, address: &Address) -> Result<PublicKey> {
287 let (tx, rx) = oneshot::channel();
288
289 self.background_tx
290 .get()
291 .ok_or(SyncError::NoTransportEnabled)?
292 .send(SyncCommand::ConnectToPeer {
293 address: address.clone(),
294 response: tx,
295 })
296 .await
297 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
298
299 rx.await
300 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
301 }
302
303 /// Update the connection state of a peer.
304 ///
305 /// # Arguments
306 /// * `pubkey` - The peer's public key
307 /// * `state` - The new connection state
308 ///
309 /// # Returns
310 /// A Result indicating success or an error.
311 pub async fn update_peer_connection_state(
312 &self,
313 pubkey: &PublicKey,
314 state: ConnectionState,
315 ) -> Result<()> {
316 let txn = self.sync_tree.new_transaction().await?;
317 let peer_manager = PeerManager::new(&txn);
318
319 // Get current peer info
320 let mut peer_info = match peer_manager.get_peer_info(pubkey).await? {
321 Some(info) => info,
322 None => return Err(SyncError::PeerNotFound(pubkey.to_string()).into()),
323 };
324
325 // Update connection state
326 peer_info.connection_state = state;
327 peer_info.touch_at(txn.now_rfc3339()?);
328
329 // Save updated peer info
330 peer_manager.update_peer_info(pubkey, peer_info).await?;
331 txn.commit().await?;
332 Ok(())
333 }
334
335 /// Check if a tree is synced with a specific peer.
336 ///
337 /// # Arguments
338 /// * `peer_pubkey` - The peer's public key
339 /// * `tree_root_id` - The root ID of the tree
340 ///
341 /// # Returns
342 /// True if the tree is synced with the peer, false otherwise.
343 pub async fn is_tree_synced_with_peer(
344 &self,
345 peer_pubkey: &PublicKey,
346 tree_root_id: impl AsRef<str>,
347 ) -> Result<bool> {
348 let txn = self.sync_tree.new_transaction().await?;
349 PeerManager::new(&txn)
350 .is_tree_synced_with_peer(peer_pubkey, tree_root_id.as_ref())
351 .await
352 // No commit - just reading
353 }
354
355 // === Address Management Methods ===
356
357 /// Add an address to a peer.
358 ///
359 /// # Arguments
360 /// * `peer_pubkey` - The peer's public key
361 /// * `address` - The address to add
362 ///
363 /// # Returns
364 /// A Result indicating success or an error.
365 pub async fn add_peer_address(&self, peer_pubkey: &PublicKey, address: Address) -> Result<()> {
366 // Update sync tree via PeerManager
367 let txn = self.sync_tree.new_transaction().await?;
368 PeerManager::new(&txn)
369 .add_address(peer_pubkey, address)
370 .await?;
371 txn.commit().await?;
372
373 // Background sync will read updated peer info directly from sync tree when needed
374 Ok(())
375 }
376
377 /// Remove a specific address from a peer.
378 ///
379 /// # Arguments
380 /// * `peer_pubkey` - The peer's public key
381 /// * `address` - The address to remove
382 ///
383 /// # Returns
384 /// A Result indicating success or an error (true if removed, false if not found).
385 pub async fn remove_peer_address(
386 &self,
387 peer_pubkey: &PublicKey,
388 address: &Address,
389 ) -> Result<bool> {
390 let txn = self.sync_tree.new_transaction().await?;
391 let result = PeerManager::new(&txn)
392 .remove_address(peer_pubkey, address)
393 .await?;
394 txn.commit().await?;
395 Ok(result)
396 }
397
398 /// Get addresses for a peer, optionally filtered by transport type.
399 ///
400 /// # Arguments
401 /// * `peer_pubkey` - The peer's public key
402 /// * `transport_type` - Optional transport type filter
403 ///
404 /// # Returns
405 /// A vector of addresses matching the criteria.
406 pub async fn get_peer_addresses(
407 &self,
408 peer_pubkey: &PublicKey,
409 transport_type: Option<&str>,
410 ) -> Result<Vec<Address>> {
411 let txn = self.sync_tree.new_transaction().await?;
412 PeerManager::new(&txn)
413 .get_addresses(peer_pubkey, transport_type)
414 .await
415 // No commit - just reading
416 }
417}