1use tracing::{debug, info};
7
8use super::{
9 error::SyncError,
10 peer_types::{Address, ConnectionState, PeerInfo, PeerStatus},
11};
12use crate::{
13 Error, Result, Transaction, auth::crypto::PublicKey, crdt::doc::path, entry::ID,
14 store::DocStore, sync::PeerId,
15};
16
17pub(super) const PEERS_SUBTREE: &str = "peers"; pub(super) const TREES_SUBTREE: &str = "trees"; pub(super) struct PeerManager<'a> {
26 txn: &'a Transaction,
27}
28
29impl<'a> PeerManager<'a> {
30 pub(super) fn new(txn: &'a Transaction) -> Self {
32 Self { txn }
33 }
34
35 pub(super) async fn register_peer(
44 &self,
45 pubkey: &PublicKey,
46 display_name: Option<&str>,
47 ) -> Result<()> {
48 let pk_str = pubkey.to_string();
49 let now = self.txn.now_rfc3339()?;
50 let peer_info = PeerInfo::new_at(pubkey, display_name, now);
51 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
52
53 if peers.contains_path(path!(&pk_str)).await {
55 debug!(peer = %pk_str, "Peer already registered, skipping");
56 return Err(Error::Sync(Box::new(SyncError::PeerAlreadyExists(pk_str))));
57 }
58
59 debug!(peer = %pk_str, display_name = ?display_name, "Registering new peer");
60
61 peers
63 .set_path(path!(&pk_str, "pubkey"), peer_info.id.to_string())
64 .await?;
65 if let Some(name) = &peer_info.display_name {
66 peers
67 .set_path(path!(&pk_str, "display_name"), name.clone())
68 .await?;
69 }
70 peers
71 .set_path(path!(&pk_str, "first_seen"), peer_info.first_seen.clone())
72 .await?;
73 peers
74 .set_path(path!(&pk_str, "last_seen"), peer_info.last_seen.clone())
75 .await?;
76 peers
77 .set_path(
78 path!(&pk_str, "status"),
79 match peer_info.status {
80 PeerStatus::Active => "active".to_string(),
81 PeerStatus::Inactive => "inactive".to_string(),
82 PeerStatus::Blocked => "blocked".to_string(),
83 },
84 )
85 .await?;
86
87 if !peer_info.addresses.is_empty() {
89 let addresses_json = serde_json::to_string(&peer_info.addresses).unwrap_or_default();
90 peers
91 .set_path(path!(&pk_str, "addresses"), addresses_json)
92 .await?;
93 }
94
95 info!(peer = %pk_str, display_name = ?display_name, "Successfully registered new peer");
96 Ok(())
97 }
98
99 pub(super) async fn update_peer_info(
108 &self,
109 pubkey: &PublicKey,
110 peer_info: PeerInfo,
111 ) -> Result<()> {
112 let pk_str = pubkey.to_string();
113 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
114
115 if !peers.contains_path_str(&pk_str).await {
117 return Err(Error::Sync(Box::new(SyncError::PeerNotFound(pk_str))));
118 }
119
120 peers
122 .set_path(path!(&pk_str, "pubkey"), peer_info.id.to_string())
123 .await?;
124
125 if let Some(name) = &peer_info.display_name {
126 peers
127 .set_path(path!(&pk_str, "display_name"), name.clone())
128 .await?;
129 }
130
131 peers
132 .set_path(path!(&pk_str, "first_seen"), peer_info.first_seen.clone())
133 .await?;
134
135 peers
136 .set_path(path!(&pk_str, "last_seen"), peer_info.last_seen.clone())
137 .await?;
138
139 let status_str = match peer_info.status {
141 PeerStatus::Active => "active",
142 PeerStatus::Inactive => "inactive",
143 PeerStatus::Blocked => "blocked",
144 };
145 peers
146 .set_path(path!(&pk_str, "status"), status_str.to_string())
147 .await?;
148
149 let connection_state_str = match &peer_info.connection_state {
151 ConnectionState::Disconnected => "disconnected",
152 ConnectionState::Connecting => "connecting",
153 ConnectionState::Connected => "connected",
154 ConnectionState::Failed(msg) => &format!("failed:{msg}"),
155 };
156 peers
157 .set_path(
158 path!(&pk_str, "connection_state"),
159 connection_state_str.to_string(),
160 )
161 .await?;
162
163 if let Some(last_sync) = &peer_info.last_successful_sync {
165 peers
166 .set_path(path!(&pk_str, "last_successful_sync"), last_sync.clone())
167 .await?;
168 }
169
170 peers
171 .set_path(
172 path!(&pk_str, "connection_attempts"),
173 peer_info.connection_attempts as i64,
174 )
175 .await?;
176
177 if let Some(error) = &peer_info.last_error {
178 peers
179 .set_path(path!(&pk_str, "last_error"), error.clone())
180 .await?;
181 }
182
183 if !peer_info.addresses.is_empty() {
185 let addresses_json = serde_json::to_string(&peer_info.addresses).unwrap_or_default();
186 peers
187 .set_path(path!(&pk_str, "addresses"), addresses_json)
188 .await?;
189 }
190
191 debug!(peer = %pk_str, "Successfully updated peer information");
192 Ok(())
193 }
194
195 pub(super) async fn update_peer_status(
204 &self,
205 pubkey: &PublicKey,
206 status: PeerStatus,
207 ) -> Result<()> {
208 let pk_str = pubkey.to_string();
209 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
210
211 if !peers.contains_path_str(&pk_str).await {
213 return Err(Error::Sync(Box::new(SyncError::PeerNotFound(pk_str))));
214 }
215
216 let status_str = match status {
218 PeerStatus::Active => "active",
219 PeerStatus::Inactive => "inactive",
220 PeerStatus::Blocked => "blocked",
221 };
222 peers
223 .set_path(path!(&pk_str, "status"), status_str.to_string())
224 .await?;
225
226 let now = self.txn.now_rfc3339()?;
228 peers.set_path(path!(&pk_str, "last_seen"), now).await?;
229
230 Ok(())
231 }
232
233 pub(super) async fn get_peer_info(&self, pubkey: &PublicKey) -> Result<Option<PeerInfo>> {
241 self.get_peer_info_str(&pubkey.to_string()).await
242 }
243
244 async fn get_peer_info_str(&self, pk_str: &str) -> Result<Option<PeerInfo>> {
246 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
247
248 if !peers.contains_path_str(pk_str).await {
250 return Ok(None);
251 }
252
253 let peer_pubkey = peers
255 .get_path_as::<String>(path!(pk_str, "pubkey"))
256 .await
257 .map_err(|_| {
258 Error::Sync(Box::new(SyncError::SerializationError(
259 "Missing pubkey field".to_string(),
260 )))
261 })?;
262
263 let display_name = peers
264 .get_path_as::<String>(path!(pk_str, "display_name"))
265 .await
266 .ok();
267
268 let first_seen = peers
269 .get_path_as::<String>(path!(pk_str, "first_seen"))
270 .await
271 .map_err(|_| {
272 Error::Sync(Box::new(SyncError::SerializationError(
273 "Missing first_seen field".to_string(),
274 )))
275 })?;
276
277 let last_seen = peers
278 .get_path_as::<String>(path!(pk_str, "last_seen"))
279 .await
280 .map_err(|_| {
281 Error::Sync(Box::new(SyncError::SerializationError(
282 "Missing last_seen field".to_string(),
283 )))
284 })?;
285
286 let status_str = peers
287 .get_path_as::<String>(path!(pk_str, "status"))
288 .await
289 .unwrap_or_else(|_| "active".to_string());
290 let status = match status_str.as_str() {
291 "active" => PeerStatus::Active,
292 "inactive" => PeerStatus::Inactive,
293 "blocked" => PeerStatus::Blocked,
294 _ => PeerStatus::Active, };
296
297 let connection_state_str = peers
299 .get_path_as::<String>(path!(pk_str, "connection_state"))
300 .await
301 .unwrap_or_else(|_| "disconnected".to_string());
302 let connection_state = match connection_state_str.as_str() {
303 "disconnected" => ConnectionState::Disconnected,
304 "connecting" => ConnectionState::Connecting,
305 "connected" => ConnectionState::Connected,
306 s if s.starts_with("failed:") => {
307 ConnectionState::Failed(s.strip_prefix("failed:").unwrap_or("").to_string())
308 }
309 _ => ConnectionState::Disconnected,
310 };
311
312 let last_successful_sync = peers
313 .get_path_as::<String>(path!(pk_str, "last_successful_sync"))
314 .await
315 .ok();
316
317 let connection_attempts = peers
318 .get_path_as::<i64>(path!(pk_str, "connection_attempts"))
319 .await
320 .map(|v| v as u32)
321 .unwrap_or(0);
322
323 let last_error = peers
324 .get_path_as::<String>(path!(pk_str, "last_error"))
325 .await
326 .ok();
327
328 let mut peer_info = PeerInfo {
329 id: PeerId::new(PublicKey::from_prefixed_string(&peer_pubkey)?),
330 display_name,
331 first_seen,
332 last_seen,
333 status,
334 addresses: Vec::new(),
335 connection_state,
336 last_successful_sync,
337 connection_attempts,
338 last_error,
339 };
340
341 if let Ok(addresses_json) = peers
343 .get_path_as::<String>(path!(pk_str, "addresses"))
344 .await
345 && let Ok(addresses) = serde_json::from_str(&addresses_json)
346 {
347 peer_info.addresses = addresses;
348 }
349
350 if peer_info.status != PeerStatus::Blocked {
352 Ok(Some(peer_info))
353 } else {
354 Ok(None)
355 }
356 }
357
358 pub(super) async fn list_peers(&self) -> Result<Vec<PeerInfo>> {
363 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
364 let all_peers = peers.get_all().await?;
365 let mut peer_list = Vec::new();
366
367 for pubkey_str in all_peers.keys() {
369 if let Some(peer_info) = self.get_peer_info_str(pubkey_str).await? {
371 peer_list.push(peer_info);
372 }
373 }
374
375 Ok(peer_list)
376 }
377
378 pub(super) async fn remove_peer(&self, pubkey: &PublicKey) -> Result<()> {
388 let pk_str = pubkey.to_string();
389 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
390
391 if peers.contains_path_str(&pk_str).await {
393 peers
394 .set_path(path!(&pk_str, "status"), "blocked".to_string())
395 .await?;
396 }
397
398 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
400 let all_keys = trees.get_all().await?.keys().cloned().collect::<Vec<_>>();
401 for tree_id in all_keys {
402 let peer_list_path = path!(&tree_id, "peer_pubkeys");
403 if let Ok(peer_list_json) = trees.get_path_as::<String>(&peer_list_path).await
404 && let Ok(mut peer_pubkeys) = serde_json::from_str::<Vec<String>>(&peer_list_json)
405 {
406 let initial_len = peer_pubkeys.len();
407 peer_pubkeys.retain(|p| p.as_str() != pk_str);
408
409 if peer_pubkeys.len() != initial_len {
410 if peer_pubkeys.is_empty() {
412 trees.delete(&tree_id).await?;
413 } else {
414 let updated_json = serde_json::to_string(&peer_pubkeys).unwrap_or_default();
415 trees.set_path(&peer_list_path, updated_json).await?;
416 }
417 }
418 }
419 }
420
421 Ok(())
422 }
423
424 pub(super) async fn add_tree_sync(
435 &self,
436 peer_pubkey: &PublicKey,
437 tree_root_id: &ID,
438 ) -> Result<()> {
439 let pk_str = peer_pubkey.to_string();
440 let tree_root_str = tree_root_id.to_string();
441
442 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
444 if !peers.contains_path_str(&pk_str).await {
445 return Err(Error::Sync(Box::new(SyncError::PeerNotFound(pk_str))));
446 }
447
448 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
449
450 let peer_list_path = path!(&tree_root_str, "peer_pubkeys");
452 let peer_list_result = trees.get_path_as::<String>(&peer_list_path).await;
453 let mut peer_pubkeys: Vec<String> = peer_list_result
454 .ok()
455 .and_then(|json| serde_json::from_str(&json).ok())
456 .unwrap_or_else(Vec::new);
457
458 if !peer_pubkeys.contains(&pk_str) {
460 peer_pubkeys.push(pk_str.clone());
461
462 let peer_list_json = serde_json::to_string(&peer_pubkeys).unwrap_or_default();
464 trees.set_path(&peer_list_path, peer_list_json).await?;
465
466 trees
468 .set_path(path!(&tree_root_str, "tree_id"), tree_root_str.clone())
469 .await?;
470 } else {
471 debug!(peer = %pk_str, tree = %tree_root_str, "Peer already syncing with tree");
472 }
473
474 Ok(())
475 }
476
477 pub(super) async fn remove_tree_sync(
486 &self,
487 peer_pubkey: &PublicKey,
488 tree_root_id: &ID,
489 ) -> Result<()> {
490 let pk_str = peer_pubkey.to_string();
491 let tree_root_str = tree_root_id.to_string();
492 info!(peer = %pk_str, tree = %tree_root_str, "Removing tree sync relationship");
493 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
494
495 let peer_list_path = path!(&tree_root_str, "peer_pubkeys");
497 if let Ok(peer_list_json) = trees.get_path_as::<String>(&peer_list_path).await
498 && let Ok(mut peer_pubkeys) = serde_json::from_str::<Vec<String>>(&peer_list_json)
499 {
500 let initial_len = peer_pubkeys.len();
502 peer_pubkeys.retain(|p| p.as_str() != pk_str);
503
504 if peer_pubkeys.len() != initial_len {
505 if peer_pubkeys.is_empty() {
507 trees.delete(&tree_root_str).await?;
509 } else {
510 let updated_json = serde_json::to_string(&peer_pubkeys).unwrap_or_default();
512 trees.set_path(&peer_list_path, updated_json).await?;
513 }
514 }
515 }
516
517 Ok(())
518 }
519
520 pub(super) async fn get_peer_trees(&self, peer_pubkey: &PublicKey) -> Result<Vec<ID>> {
528 let pk_str = peer_pubkey.to_string();
529 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
530 let all_trees = trees.get_all().await?;
531 let mut synced_trees = Vec::new();
532
533 for tree_id_str in all_trees.keys() {
534 let peer_list_path = path!(tree_id_str, "peer_pubkeys");
535 if let Ok(peer_list_json) = trees.get_path_as::<String>(&peer_list_path).await
536 && let Ok(peer_pubkeys) = serde_json::from_str::<Vec<String>>(&peer_list_json)
537 && peer_pubkeys.contains(&pk_str)
538 && let Ok(id) = ID::parse(tree_id_str)
539 {
540 synced_trees.push(id);
541 }
542 }
543
544 Ok(synced_trees)
545 }
546
547 pub(super) async fn get_tree_peers(&self, tree_root_id: &ID) -> Result<Vec<PeerId>> {
555 let tree_root_str = tree_root_id.to_string();
556 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
557 let peer_list_path = path!(&tree_root_str, "peer_pubkeys");
558 let peer_list_result = trees.get_path_as::<String>(&peer_list_path).await;
559 let string_vec: Vec<String> = peer_list_result
560 .ok()
561 .and_then(|json| serde_json::from_str(&json).ok())
562 .unwrap_or_else(Vec::new);
563 Ok(string_vec
564 .into_iter()
565 .filter_map(|s| PublicKey::from_prefixed_string(&s).ok())
566 .map(PeerId::new)
567 .collect())
568 }
569
570 pub(super) async fn is_tree_synced_with_peer(
579 &self,
580 peer_pubkey: &PublicKey,
581 tree_root_id: &ID,
582 ) -> Result<bool> {
583 let pk_str = peer_pubkey.to_string();
584 let tree_root_str = tree_root_id.to_string();
585 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
586 let peer_list_path = path!(&tree_root_str, "peer_pubkeys");
587 match trees.get_path_as::<String>(&peer_list_path).await {
588 Ok(peer_list_json) => {
589 if let Ok(peer_pubkeys) = serde_json::from_str::<Vec<String>>(&peer_list_json) {
590 Ok(peer_pubkeys.contains(&pk_str))
591 } else {
592 Ok(false)
593 }
594 }
595 Err(_) => Ok(false),
596 }
597 }
598
599 pub(super) async fn add_address(
610 &self,
611 peer_pubkey: &PublicKey,
612 address: Address,
613 ) -> Result<()> {
614 let pk_str = peer_pubkey.to_string();
615 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
616
617 if !peers.contains_path_str(&pk_str).await {
619 return Err(Error::Sync(Box::new(SyncError::PeerNotFound(pk_str))));
620 }
621
622 let addresses_result = peers
624 .get_path_as::<String>(path!(&pk_str, "addresses"))
625 .await;
626 let mut all_addresses: Vec<Address> = addresses_result
627 .ok()
628 .and_then(|json| serde_json::from_str(&json).ok())
629 .unwrap_or_else(Vec::new);
630
631 if !all_addresses.contains(&address) {
633 all_addresses.push(address);
634
635 let addresses_json = serde_json::to_string(&all_addresses).unwrap_or_default();
637 peers
638 .set_path(path!(&pk_str, "addresses"), addresses_json)
639 .await?;
640
641 let now = self.txn.now_rfc3339()?;
643 peers.set_path(path!(&pk_str, "last_seen"), now).await?;
644 }
645
646 Ok(())
647 }
648
649 pub(super) async fn remove_address(
658 &self,
659 peer_pubkey: &PublicKey,
660 address: &Address,
661 ) -> Result<bool> {
662 let pk_str = peer_pubkey.to_string();
663 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
664
665 if !peers.contains_path_str(&pk_str).await {
667 return Err(Error::Sync(Box::new(SyncError::PeerNotFound(pk_str))));
668 }
669
670 let addresses_result = peers
672 .get_path_as::<String>(path!(&pk_str, "addresses"))
673 .await;
674 let mut all_addresses: Vec<Address> = addresses_result
675 .ok()
676 .and_then(|json| serde_json::from_str(&json).ok())
677 .unwrap_or_else(Vec::new);
678
679 let initial_len = all_addresses.len();
681 all_addresses.retain(|a| a != address);
682
683 if all_addresses.len() != initial_len {
684 let addresses_json = serde_json::to_string(&all_addresses).unwrap_or_default();
686 peers
687 .set_path(path!(&pk_str, "addresses"), addresses_json)
688 .await?;
689
690 let now = self.txn.now_rfc3339()?;
692 peers.set_path(path!(&pk_str, "last_seen"), now).await?;
693
694 Ok(true)
695 } else {
696 Ok(false)
697 }
698 }
699
700 pub(super) async fn get_addresses(
709 &self,
710 peer_pubkey: &PublicKey,
711 transport_type: Option<&str>,
712 ) -> Result<Vec<Address>> {
713 if let Some(peer_info) = self.get_peer_info(peer_pubkey).await? {
714 match transport_type {
715 Some(transport) => Ok(peer_info
716 .get_addresses(transport)
717 .into_iter()
718 .cloned()
719 .collect()),
720 None => Ok(peer_info.addresses),
721 }
722 } else {
723 Ok(Vec::new())
724 }
725 }
726}