eidetica/sync/utils.rs
1//! Sync-specific utility functions.
2//!
3//! This module contains DAG traversal algorithms and other utilities
4//! specific to the synchronization protocol.
5
6use std::collections::{HashMap, HashSet, VecDeque};
7
8use crate::{
9 Result,
10 backend::BackendImpl,
11 entry::{Entry, ID},
12 sync::error::SyncError,
13};
14
15/// Collect all missing ancestors for given entry IDs using DAG traversal.
16///
17/// This function performs a breadth-first traversal of the DAG to find
18/// all entries that are referenced but not present locally. It's used
19/// when receiving tips from a peer to determine what entries need to be fetched.
20///
21/// # Algorithm Complexity
22/// * **Time**: O(V + E) where V is the number of visited entries and E is the number of parent relationships
23/// * **Space**: O(V) for the visited set and queue storage
24///
25/// # Arguments
26/// * `backend` - Database backend to check for entry existence
27/// * `entry_ids` - Starting entry IDs to traverse from
28///
29/// # Returns
30/// Vec of entry IDs that are missing from the local database
31///
32/// # Example
33/// ```rust,ignore
34/// use eidetica::sync::utils::collect_missing_ancestors;
35/// use eidetica::backend::database::InMemory;
36///
37/// let backend = InMemory::new();
38/// let missing_ids = vec![ID::from("tip1"), ID::from("tip2")];
39/// let missing = collect_missing_ancestors(&backend, &missing_ids).await?;
40/// // Returns IDs of entries that need to be fetched from peer
41/// ```
42pub async fn collect_missing_ancestors(
43 backend: &dyn BackendImpl,
44 entry_ids: &[ID],
45) -> Result<Vec<ID>> {
46 let mut missing = Vec::new();
47 let mut visited = HashSet::new();
48 let mut queue = VecDeque::new();
49
50 // Start with the given entry IDs
51 for id in entry_ids {
52 queue.push_back(id.clone());
53 }
54
55 while let Some(entry_id) = queue.pop_front() {
56 if visited.contains(&entry_id) {
57 continue;
58 }
59 visited.insert(entry_id.clone());
60
61 match backend.get(&entry_id).await {
62 Ok(entry) => {
63 // We have this entry, but we need to check its parents
64 if let Ok(parents) = entry.parents() {
65 for parent_id in parents {
66 if !visited.contains(&parent_id) {
67 queue.push_back(parent_id);
68 }
69 }
70 }
71 }
72 Err(e) if e.is_not_found() => {
73 // We don't have this entry - mark as missing
74 missing.push(entry_id);
75 // Note: We can't traverse parents of missing entries
76 // The peer will need to send parent info separately
77 }
78 Err(e) => {
79 return Err(SyncError::BackendError(format!(
80 "Failed to check for entry {entry_id}: {e}"
81 ))
82 .into());
83 }
84 }
85 }
86
87 Ok(missing)
88}
89
90/// Collect ancestors that need to be sent with the given entries.
91///
92/// This function performs DAG traversal to find all entries that need to be
93/// sent along with the given entry IDs, excluding entries that the peer
94/// already has (based on their tips). The algorithm ensures that all necessary
95/// parent entries are included for proper DAG reconstruction on the peer.
96///
97/// # Algorithm Complexity
98/// * **Time**: O(V + E) where V is the number of visited entries and E is the number of parent relationships
99/// * **Space**: O(V) for the visited set, queue storage, and result entries
100///
101/// # Arguments
102/// * `backend` - Database backend to retrieve entries
103/// * `entry_ids` - Starting entry IDs to collect ancestors for
104/// * `their_tips` - Entry IDs that the peer already has
105///
106/// # Returns
107/// Vec of entries that need to be sent (including ancestors)
108///
109/// # Example
110/// ```rust,ignore
111/// use eidetica::sync::utils::collect_ancestors_to_send;
112/// use eidetica::backend::database::InMemory;
113///
114/// let backend = InMemory::new();
115/// let our_tips = vec![ID::from("tip1"), ID::from("tip2")];
116/// let their_tips = vec![ID::from("common_ancestor")];
117/// let to_send = collect_ancestors_to_send(&backend, &our_tips, &their_tips).await?;
118/// // Returns entries that peer needs, excluding what they already have
119/// ```
120pub async fn collect_ancestors_to_send(
121 backend: &dyn BackendImpl,
122 entry_ids: &[ID],
123 their_tips: &[ID],
124) -> Result<Vec<Entry>> {
125 let mut entries_to_send = HashMap::new();
126 let mut visited = HashSet::new();
127 let mut queue = VecDeque::new();
128 let their_tips_set: HashSet<&ID> = their_tips.iter().collect();
129
130 // Start with the given entry IDs
131 for id in entry_ids {
132 queue.push_back(id.clone());
133 }
134
135 while let Some(entry_id) = queue.pop_front() {
136 if visited.contains(&entry_id) || their_tips_set.contains(&entry_id) {
137 continue; // Skip already visited or entries peer already has
138 }
139 visited.insert(entry_id.clone());
140
141 match backend.get(&entry_id).await {
142 Ok(entry) => {
143 entries_to_send.insert(entry_id.clone(), entry.clone());
144
145 // Add parents to queue if peer might not have them
146 if let Ok(parents) = entry.parents() {
147 for parent_id in parents {
148 if !their_tips_set.contains(&parent_id) && !visited.contains(&parent_id) {
149 queue.push_back(parent_id);
150 }
151 }
152 }
153 }
154 Err(e) => {
155 return Err(SyncError::BackendError(format!(
156 "Failed to get entry {entry_id} to send: {e}"
157 ))
158 .into());
159 }
160 }
161 }
162
163 // Return entries without height sorting for now
164 // Height-based ordering should be done by the sync_tree_with_peer method
165 // when it has the tree context needed for height calculation
166 let entries: Vec<Entry> = entries_to_send.into_values().collect();
167
168 Ok(entries)
169}
170
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{Entry, backend::database::InMemory};

    /// Creates an empty in-memory backend for a single test.
    fn create_test_backend() -> Arc<InMemory> {
        Arc::new(InMemory::new())
    }

    /// Builds a root entry, stores it in `backend`, and returns its ID.
    async fn store_root_entry(backend: &Arc<InMemory>) -> ID {
        let root = Entry::root_builder()
            .build()
            .expect("Root entry should build successfully");
        let root_id = root.id();
        backend
            .put_verified(root)
            .await
            .expect("storing the root entry should succeed");
        root_id
    }

    #[tokio::test]
    async fn test_collect_missing_ancestors_empty() {
        let backend = create_test_backend();
        let missing = collect_missing_ancestors(&*backend, &[]).await.unwrap();
        assert!(missing.is_empty());
    }

    #[tokio::test]
    async fn test_collect_missing_ancestors_not_found() {
        let backend = create_test_backend();
        let unknown = ID::from("missing123");

        let missing = collect_missing_ancestors(&*backend, std::slice::from_ref(&unknown))
            .await
            .unwrap();
        assert_eq!(missing, vec![unknown]);
    }

    #[tokio::test]
    async fn test_collect_missing_ancestors_present() {
        let backend = create_test_backend();
        let stored_id = store_root_entry(&backend).await;

        let missing = collect_missing_ancestors(&*backend, &[stored_id])
            .await
            .unwrap();
        // The entry is stored locally, so nothing needs fetching.
        assert!(missing.is_empty());
    }

    #[tokio::test]
    async fn test_collect_ancestors_to_send_empty() {
        let backend = create_test_backend();
        let to_send = collect_ancestors_to_send(&*backend, &[], &[])
            .await
            .unwrap();
        assert!(to_send.is_empty());
    }

    #[tokio::test]
    async fn test_collect_ancestors_to_send_single_entry() {
        let backend = create_test_backend();
        let stored_id = store_root_entry(&backend).await;

        let to_send =
            collect_ancestors_to_send(&*backend, std::slice::from_ref(&stored_id), &[])
                .await
                .unwrap();
        assert_eq!(to_send.len(), 1);
        assert_eq!(to_send[0].id(), stored_id);
    }

    #[tokio::test]
    async fn test_collect_ancestors_to_send_peer_already_has() {
        let backend = create_test_backend();
        let stored_id = store_root_entry(&backend).await;

        // The peer reports this very entry as one of its tips.
        let tips = std::slice::from_ref(&stored_id);
        let to_send = collect_ancestors_to_send(&*backend, tips, tips)
            .await
            .unwrap();
        assert!(to_send.is_empty());
    }
}