1use tracing::{debug, info};
7
8use super::{
9 error::SyncError,
10 peer_types::{Address, ConnectionState, PeerInfo, PeerStatus},
11};
12use crate::{
13 Error, Result, Transaction, auth::crypto::PublicKey, crdt::doc::path, store::DocStore,
14 sync::PeerId,
15};
16
/// Name of the store holding one record per peer, keyed by the peer's public-key string.
pub(super) const PEERS_SUBTREE: &str = "peers";

/// Name of the store holding one record per tree, each with a `peer_pubkeys` list.
pub(super) const TREES_SUBTREE: &str = "trees";

/// Peer bookkeeping operations executed through a single borrowed [`Transaction`].
///
/// Each method opens the [`DocStore`] it needs (`PEERS_SUBTREE` and/or `TREES_SUBTREE`)
/// from the transaction and reads or writes individual fields by path.
pub(super) struct PeerManager<'a> {
    /// Transaction used to open stores and to obtain RFC 3339 timestamps.
    txn: &'a Transaction,
}
28
29impl<'a> PeerManager<'a> {
30 pub(super) fn new(txn: &'a Transaction) -> Self {
32 Self { txn }
33 }
34
35 pub(super) async fn register_peer(
44 &self,
45 pubkey: &PublicKey,
46 display_name: Option<&str>,
47 ) -> Result<()> {
48 let pk_str = pubkey.to_string();
49 let now = self.txn.now_rfc3339()?;
50 let peer_info = PeerInfo::new_at(pubkey, display_name, now);
51 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
52
53 if peers.contains_path(path!(&pk_str)).await {
55 debug!(peer = %pk_str, "Peer already registered, skipping");
56 return Err(Error::Sync(SyncError::PeerAlreadyExists(pk_str)));
57 }
58
59 debug!(peer = %pk_str, display_name = ?display_name, "Registering new peer");
60
61 peers
63 .set_path(path!(&pk_str, "pubkey"), peer_info.id.to_string())
64 .await?;
65 if let Some(name) = &peer_info.display_name {
66 peers
67 .set_path(path!(&pk_str, "display_name"), name.clone())
68 .await?;
69 }
70 peers
71 .set_path(path!(&pk_str, "first_seen"), peer_info.first_seen.clone())
72 .await?;
73 peers
74 .set_path(path!(&pk_str, "last_seen"), peer_info.last_seen.clone())
75 .await?;
76 peers
77 .set_path(
78 path!(&pk_str, "status"),
79 match peer_info.status {
80 PeerStatus::Active => "active".to_string(),
81 PeerStatus::Inactive => "inactive".to_string(),
82 PeerStatus::Blocked => "blocked".to_string(),
83 },
84 )
85 .await?;
86
87 if !peer_info.addresses.is_empty() {
89 let addresses_json = serde_json::to_string(&peer_info.addresses).unwrap_or_default();
90 peers
91 .set_path(path!(&pk_str, "addresses"), addresses_json)
92 .await?;
93 }
94
95 info!(peer = %pk_str, display_name = ?display_name, "Successfully registered new peer");
96 Ok(())
97 }
98
99 pub(super) async fn update_peer_info(
108 &self,
109 pubkey: &PublicKey,
110 peer_info: PeerInfo,
111 ) -> Result<()> {
112 let pk_str = pubkey.to_string();
113 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
114
115 if !peers.contains_path_str(&pk_str).await {
117 return Err(Error::Sync(SyncError::PeerNotFound(pk_str)));
118 }
119
120 peers
122 .set_path(path!(&pk_str, "pubkey"), peer_info.id.to_string())
123 .await?;
124
125 if let Some(name) = &peer_info.display_name {
126 peers
127 .set_path(path!(&pk_str, "display_name"), name.clone())
128 .await?;
129 }
130
131 peers
132 .set_path(path!(&pk_str, "first_seen"), peer_info.first_seen.clone())
133 .await?;
134
135 peers
136 .set_path(path!(&pk_str, "last_seen"), peer_info.last_seen.clone())
137 .await?;
138
139 let status_str = match peer_info.status {
141 PeerStatus::Active => "active",
142 PeerStatus::Inactive => "inactive",
143 PeerStatus::Blocked => "blocked",
144 };
145 peers
146 .set_path(path!(&pk_str, "status"), status_str.to_string())
147 .await?;
148
149 let connection_state_str = match &peer_info.connection_state {
151 ConnectionState::Disconnected => "disconnected",
152 ConnectionState::Connecting => "connecting",
153 ConnectionState::Connected => "connected",
154 ConnectionState::Failed(msg) => &format!("failed:{msg}"),
155 };
156 peers
157 .set_path(
158 path!(&pk_str, "connection_state"),
159 connection_state_str.to_string(),
160 )
161 .await?;
162
163 if let Some(last_sync) = &peer_info.last_successful_sync {
165 peers
166 .set_path(path!(&pk_str, "last_successful_sync"), last_sync.clone())
167 .await?;
168 }
169
170 peers
171 .set_path(
172 path!(&pk_str, "connection_attempts"),
173 peer_info.connection_attempts as i64,
174 )
175 .await?;
176
177 if let Some(error) = &peer_info.last_error {
178 peers
179 .set_path(path!(&pk_str, "last_error"), error.clone())
180 .await?;
181 }
182
183 if !peer_info.addresses.is_empty() {
185 let addresses_json = serde_json::to_string(&peer_info.addresses).unwrap_or_default();
186 peers
187 .set_path(path!(&pk_str, "addresses"), addresses_json)
188 .await?;
189 }
190
191 debug!(peer = %pk_str, "Successfully updated peer information");
192 Ok(())
193 }
194
195 pub(super) async fn update_peer_status(
204 &self,
205 pubkey: &PublicKey,
206 status: PeerStatus,
207 ) -> Result<()> {
208 let pk_str = pubkey.to_string();
209 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
210
211 if !peers.contains_path_str(&pk_str).await {
213 return Err(Error::Sync(SyncError::PeerNotFound(pk_str)));
214 }
215
216 let status_str = match status {
218 PeerStatus::Active => "active",
219 PeerStatus::Inactive => "inactive",
220 PeerStatus::Blocked => "blocked",
221 };
222 peers
223 .set_path(path!(&pk_str, "status"), status_str.to_string())
224 .await?;
225
226 let now = self.txn.now_rfc3339()?;
228 peers.set_path(path!(&pk_str, "last_seen"), now).await?;
229
230 Ok(())
231 }
232
233 pub(super) async fn get_peer_info(&self, pubkey: &PublicKey) -> Result<Option<PeerInfo>> {
241 self.get_peer_info_str(&pubkey.to_string()).await
242 }
243
244 async fn get_peer_info_str(&self, pk_str: &str) -> Result<Option<PeerInfo>> {
246 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
247
248 if !peers.contains_path_str(pk_str).await {
250 return Ok(None);
251 }
252
253 let peer_pubkey = peers
255 .get_path_as::<String>(path!(pk_str, "pubkey"))
256 .await
257 .map_err(|_| {
258 Error::Sync(SyncError::SerializationError(
259 "Missing pubkey field".to_string(),
260 ))
261 })?;
262
263 let display_name = peers
264 .get_path_as::<String>(path!(pk_str, "display_name"))
265 .await
266 .ok();
267
268 let first_seen = peers
269 .get_path_as::<String>(path!(pk_str, "first_seen"))
270 .await
271 .map_err(|_| {
272 Error::Sync(SyncError::SerializationError(
273 "Missing first_seen field".to_string(),
274 ))
275 })?;
276
277 let last_seen = peers
278 .get_path_as::<String>(path!(pk_str, "last_seen"))
279 .await
280 .map_err(|_| {
281 Error::Sync(SyncError::SerializationError(
282 "Missing last_seen field".to_string(),
283 ))
284 })?;
285
286 let status_str = peers
287 .get_path_as::<String>(path!(pk_str, "status"))
288 .await
289 .unwrap_or_else(|_| "active".to_string());
290 let status = match status_str.as_str() {
291 "active" => PeerStatus::Active,
292 "inactive" => PeerStatus::Inactive,
293 "blocked" => PeerStatus::Blocked,
294 _ => PeerStatus::Active, };
296
297 let connection_state_str = peers
299 .get_path_as::<String>(path!(pk_str, "connection_state"))
300 .await
301 .unwrap_or_else(|_| "disconnected".to_string());
302 let connection_state = match connection_state_str.as_str() {
303 "disconnected" => ConnectionState::Disconnected,
304 "connecting" => ConnectionState::Connecting,
305 "connected" => ConnectionState::Connected,
306 s if s.starts_with("failed:") => {
307 ConnectionState::Failed(s.strip_prefix("failed:").unwrap_or("").to_string())
308 }
309 _ => ConnectionState::Disconnected,
310 };
311
312 let last_successful_sync = peers
313 .get_path_as::<String>(path!(pk_str, "last_successful_sync"))
314 .await
315 .ok();
316
317 let connection_attempts = peers
318 .get_path_as::<i64>(path!(pk_str, "connection_attempts"))
319 .await
320 .map(|v| v as u32)
321 .unwrap_or(0);
322
323 let last_error = peers
324 .get_path_as::<String>(path!(pk_str, "last_error"))
325 .await
326 .ok();
327
328 let mut peer_info = PeerInfo {
329 id: PeerId::new(PublicKey::from_prefixed_string(&peer_pubkey)?),
330 display_name,
331 first_seen,
332 last_seen,
333 status,
334 addresses: Vec::new(),
335 connection_state,
336 last_successful_sync,
337 connection_attempts,
338 last_error,
339 };
340
341 if let Ok(addresses_json) = peers
343 .get_path_as::<String>(path!(pk_str, "addresses"))
344 .await
345 && let Ok(addresses) = serde_json::from_str(&addresses_json)
346 {
347 peer_info.addresses = addresses;
348 }
349
350 if peer_info.status != PeerStatus::Blocked {
352 Ok(Some(peer_info))
353 } else {
354 Ok(None)
355 }
356 }
357
358 pub(super) async fn list_peers(&self) -> Result<Vec<PeerInfo>> {
363 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
364 let all_peers = peers.get_all().await?;
365 let mut peer_list = Vec::new();
366
367 for pubkey_str in all_peers.keys() {
369 if let Some(peer_info) = self.get_peer_info_str(pubkey_str).await? {
371 peer_list.push(peer_info);
372 }
373 }
374
375 Ok(peer_list)
376 }
377
378 pub(super) async fn remove_peer(&self, pubkey: &PublicKey) -> Result<()> {
388 let pk_str = pubkey.to_string();
389 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
390
391 if peers.contains_path_str(&pk_str).await {
393 peers
394 .set_path(path!(&pk_str, "status"), "blocked".to_string())
395 .await?;
396 }
397
398 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
400 let all_keys = trees.get_all().await?.keys().cloned().collect::<Vec<_>>();
401 for tree_id in all_keys {
402 let peer_list_path = path!(&tree_id, "peer_pubkeys");
403 if let Ok(peer_list_json) = trees.get_path_as::<String>(&peer_list_path).await
404 && let Ok(mut peer_pubkeys) = serde_json::from_str::<Vec<String>>(&peer_list_json)
405 {
406 let initial_len = peer_pubkeys.len();
407 peer_pubkeys.retain(|p| p.as_str() != pk_str);
408
409 if peer_pubkeys.len() != initial_len {
410 if peer_pubkeys.is_empty() {
412 trees.delete(&tree_id).await?;
413 } else {
414 let updated_json = serde_json::to_string(&peer_pubkeys).unwrap_or_default();
415 trees.set_path(&peer_list_path, updated_json).await?;
416 }
417 }
418 }
419 }
420
421 Ok(())
422 }
423
424 pub(super) async fn add_tree_sync(
435 &self,
436 peer_pubkey: &PublicKey,
437 tree_root_id: impl AsRef<str>,
438 ) -> Result<()> {
439 let pk_str = peer_pubkey.to_string();
440
441 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
443 if !peers.contains_path_str(&pk_str).await {
444 return Err(Error::Sync(SyncError::PeerNotFound(pk_str)));
445 }
446
447 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
448
449 let peer_list_path = path!(tree_root_id.as_ref(), "peer_pubkeys");
451 let peer_list_result = trees.get_path_as::<String>(&peer_list_path).await;
452 let mut peer_pubkeys: Vec<String> = peer_list_result
453 .ok()
454 .and_then(|json| serde_json::from_str(&json).ok())
455 .unwrap_or_else(Vec::new);
456
457 if !peer_pubkeys.contains(&pk_str) {
459 peer_pubkeys.push(pk_str.clone());
460
461 let peer_list_json = serde_json::to_string(&peer_pubkeys).unwrap_or_default();
463 trees.set_path(&peer_list_path, peer_list_json).await?;
464
465 trees
467 .set_path(
468 path!(tree_root_id.as_ref(), "tree_id"),
469 tree_root_id.as_ref().to_string(),
470 )
471 .await?;
472 } else {
473 debug!(peer = %pk_str, tree = %tree_root_id.as_ref(), "Peer already syncing with tree");
474 }
475
476 Ok(())
477 }
478
479 pub(super) async fn remove_tree_sync(
488 &self,
489 peer_pubkey: &PublicKey,
490 tree_root_id: impl AsRef<str>,
491 ) -> Result<()> {
492 let pk_str = peer_pubkey.to_string();
493 info!(peer = %pk_str, tree = %tree_root_id.as_ref(), "Removing tree sync relationship");
494 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
495
496 let peer_list_path = path!(tree_root_id.as_ref(), "peer_pubkeys");
498 if let Ok(peer_list_json) = trees.get_path_as::<String>(&peer_list_path).await
499 && let Ok(mut peer_pubkeys) = serde_json::from_str::<Vec<String>>(&peer_list_json)
500 {
501 let initial_len = peer_pubkeys.len();
503 peer_pubkeys.retain(|p| p.as_str() != pk_str);
504
505 if peer_pubkeys.len() != initial_len {
506 if peer_pubkeys.is_empty() {
508 trees.delete(tree_root_id.as_ref()).await?;
510 } else {
511 let updated_json = serde_json::to_string(&peer_pubkeys).unwrap_or_default();
513 trees.set_path(&peer_list_path, updated_json).await?;
514 }
515 }
516 }
517
518 Ok(())
519 }
520
521 pub(super) async fn get_peer_trees(&self, peer_pubkey: &PublicKey) -> Result<Vec<String>> {
529 let pk_str = peer_pubkey.to_string();
530 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
531 let all_trees = trees.get_all().await?;
532 let mut synced_trees = Vec::new();
533
534 for tree_id in all_trees.keys() {
535 let peer_list_path = path!(tree_id, "peer_pubkeys");
536 if let Ok(peer_list_json) = trees.get_path_as::<String>(&peer_list_path).await
537 && let Ok(peer_pubkeys) = serde_json::from_str::<Vec<String>>(&peer_list_json)
538 && peer_pubkeys.contains(&pk_str)
539 {
540 synced_trees.push(tree_id.clone());
541 }
542 }
543
544 Ok(synced_trees)
545 }
546
547 pub(super) async fn get_tree_peers(
555 &self,
556 tree_root_id: impl AsRef<str>,
557 ) -> Result<Vec<PeerId>> {
558 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
559 let peer_list_path = path!(tree_root_id.as_ref(), "peer_pubkeys");
560 let peer_list_result = trees.get_path_as::<String>(&peer_list_path).await;
561 let string_vec: Vec<String> = peer_list_result
562 .ok()
563 .and_then(|json| serde_json::from_str(&json).ok())
564 .unwrap_or_else(Vec::new);
565 Ok(string_vec
566 .into_iter()
567 .filter_map(|s| PublicKey::from_prefixed_string(&s).ok())
568 .map(PeerId::new)
569 .collect())
570 }
571
572 pub(super) async fn is_tree_synced_with_peer(
581 &self,
582 peer_pubkey: &PublicKey,
583 tree_root_id: impl AsRef<str>,
584 ) -> Result<bool> {
585 let pk_str = peer_pubkey.to_string();
586 let trees = self.txn.get_store::<DocStore>(TREES_SUBTREE).await?;
587 let peer_list_path = path!(tree_root_id.as_ref(), "peer_pubkeys");
588 match trees.get_path_as::<String>(&peer_list_path).await {
589 Ok(peer_list_json) => {
590 if let Ok(peer_pubkeys) = serde_json::from_str::<Vec<String>>(&peer_list_json) {
591 Ok(peer_pubkeys.contains(&pk_str))
592 } else {
593 Ok(false)
594 }
595 }
596 Err(_) => Ok(false),
597 }
598 }
599
600 pub(super) async fn add_address(
611 &self,
612 peer_pubkey: &PublicKey,
613 address: Address,
614 ) -> Result<()> {
615 let pk_str = peer_pubkey.to_string();
616 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
617
618 if !peers.contains_path_str(&pk_str).await {
620 return Err(Error::Sync(SyncError::PeerNotFound(pk_str)));
621 }
622
623 let addresses_result = peers
625 .get_path_as::<String>(path!(&pk_str, "addresses"))
626 .await;
627 let mut all_addresses: Vec<Address> = addresses_result
628 .ok()
629 .and_then(|json| serde_json::from_str(&json).ok())
630 .unwrap_or_else(Vec::new);
631
632 if !all_addresses.contains(&address) {
634 all_addresses.push(address);
635
636 let addresses_json = serde_json::to_string(&all_addresses).unwrap_or_default();
638 peers
639 .set_path(path!(&pk_str, "addresses"), addresses_json)
640 .await?;
641
642 let now = self.txn.now_rfc3339()?;
644 peers.set_path(path!(&pk_str, "last_seen"), now).await?;
645 }
646
647 Ok(())
648 }
649
650 pub(super) async fn remove_address(
659 &self,
660 peer_pubkey: &PublicKey,
661 address: &Address,
662 ) -> Result<bool> {
663 let pk_str = peer_pubkey.to_string();
664 let peers = self.txn.get_store::<DocStore>(PEERS_SUBTREE).await?;
665
666 if !peers.contains_path_str(&pk_str).await {
668 return Err(Error::Sync(SyncError::PeerNotFound(pk_str)));
669 }
670
671 let addresses_result = peers
673 .get_path_as::<String>(path!(&pk_str, "addresses"))
674 .await;
675 let mut all_addresses: Vec<Address> = addresses_result
676 .ok()
677 .and_then(|json| serde_json::from_str(&json).ok())
678 .unwrap_or_else(Vec::new);
679
680 let initial_len = all_addresses.len();
682 all_addresses.retain(|a| a != address);
683
684 if all_addresses.len() != initial_len {
685 let addresses_json = serde_json::to_string(&all_addresses).unwrap_or_default();
687 peers
688 .set_path(path!(&pk_str, "addresses"), addresses_json)
689 .await?;
690
691 let now = self.txn.now_rfc3339()?;
693 peers.set_path(path!(&pk_str, "last_seen"), now).await?;
694
695 Ok(true)
696 } else {
697 Ok(false)
698 }
699 }
700
701 pub(super) async fn get_addresses(
710 &self,
711 peer_pubkey: &PublicKey,
712 transport_type: Option<&str>,
713 ) -> Result<Vec<Address>> {
714 if let Some(peer_info) = self.get_peer_info(peer_pubkey).await? {
715 match transport_type {
716 Some(transport) => Ok(peer_info
717 .get_addresses(transport)
718 .into_iter()
719 .cloned()
720 .collect()),
721 None => Ok(peer_info.addresses),
722 }
723 } else {
724 Ok(Vec::new())
725 }
726 }
727}