Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/sync/
utils.rs

1//! Sync-specific utility functions.
2//!
3//! This module contains DAG traversal algorithms and other utilities
4//! specific to the synchronization protocol.
5
6use std::collections::{HashMap, HashSet, VecDeque};
7
8use crate::{
9    Result,
10    backend::BackendImpl,
11    entry::{Entry, ID},
12    sync::error::SyncError,
13};
14
15/// Collect all missing ancestors for given entry IDs using DAG traversal.
16///
17/// This function performs a breadth-first traversal of the DAG to find
18/// all entries that are referenced but not present locally. It's used
19/// when receiving tips from a peer to determine what entries need to be fetched.
20///
21/// # Algorithm Complexity
22/// * **Time**: O(V + E) where V is the number of visited entries and E is the number of parent relationships
23/// * **Space**: O(V) for the visited set and queue storage
24///
25/// # Arguments
26/// * `backend` - Database backend to check for entry existence
27/// * `entry_ids` - Starting entry IDs to traverse from
28///
29/// # Returns
30/// Vec of entry IDs that are missing from the local database
31///
32/// # Example
33/// ```rust,ignore
34/// use eidetica::sync::utils::collect_missing_ancestors;
35/// use eidetica::backend::database::InMemory;
36///
37/// let backend = InMemory::new();
38/// let missing_ids = vec![ID::from("tip1"), ID::from("tip2")];
39/// let missing = collect_missing_ancestors(&backend, &missing_ids).await?;
40/// // Returns IDs of entries that need to be fetched from peer
41/// ```
42pub async fn collect_missing_ancestors(
43    backend: &dyn BackendImpl,
44    entry_ids: &[ID],
45) -> Result<Vec<ID>> {
46    let mut missing = Vec::new();
47    let mut visited = HashSet::new();
48    let mut queue = VecDeque::new();
49
50    // Start with the given entry IDs
51    for id in entry_ids {
52        queue.push_back(id.clone());
53    }
54
55    while let Some(entry_id) = queue.pop_front() {
56        if visited.contains(&entry_id) {
57            continue;
58        }
59        visited.insert(entry_id.clone());
60
61        match backend.get(&entry_id).await {
62            Ok(entry) => {
63                // We have this entry, but we need to check its parents
64                if let Ok(parents) = entry.parents() {
65                    for parent_id in parents {
66                        if !visited.contains(&parent_id) {
67                            queue.push_back(parent_id);
68                        }
69                    }
70                }
71            }
72            Err(e) if e.is_not_found() => {
73                // We don't have this entry - mark as missing
74                missing.push(entry_id);
75                // Note: We can't traverse parents of missing entries
76                // The peer will need to send parent info separately
77            }
78            Err(e) => {
79                return Err(SyncError::BackendError(format!(
80                    "Failed to check for entry {entry_id}: {e}"
81                ))
82                .into());
83            }
84        }
85    }
86
87    Ok(missing)
88}
89
90/// Collect ancestors that need to be sent with the given entries.
91///
92/// This function performs DAG traversal to find all entries that need to be
93/// sent along with the given entry IDs, excluding entries that the peer
94/// already has (based on their tips). The algorithm ensures that all necessary
95/// parent entries are included for proper DAG reconstruction on the peer.
96///
97/// # Algorithm Complexity
98/// * **Time**: O(V + E) where V is the number of visited entries and E is the number of parent relationships
99/// * **Space**: O(V) for the visited set, queue storage, and result entries
100///
101/// # Arguments
102/// * `backend` - Database backend to retrieve entries
103/// * `entry_ids` - Starting entry IDs to collect ancestors for
104/// * `their_tips` - Entry IDs that the peer already has
105///
106/// # Returns
107/// Vec of entries that need to be sent (including ancestors)
108///
109/// # Example
110/// ```rust,ignore
111/// use eidetica::sync::utils::collect_ancestors_to_send;
112/// use eidetica::backend::database::InMemory;
113///
114/// let backend = InMemory::new();
115/// let our_tips = vec![ID::from("tip1"), ID::from("tip2")];
116/// let their_tips = vec![ID::from("common_ancestor")];
117/// let to_send = collect_ancestors_to_send(&backend, &our_tips, &their_tips).await?;
118/// // Returns entries that peer needs, excluding what they already have
119/// ```
120pub async fn collect_ancestors_to_send(
121    backend: &dyn BackendImpl,
122    entry_ids: &[ID],
123    their_tips: &[ID],
124) -> Result<Vec<Entry>> {
125    let mut entries_to_send = HashMap::new();
126    let mut visited = HashSet::new();
127    let mut queue = VecDeque::new();
128    let their_tips_set: HashSet<&ID> = their_tips.iter().collect();
129
130    // Start with the given entry IDs
131    for id in entry_ids {
132        queue.push_back(id.clone());
133    }
134
135    while let Some(entry_id) = queue.pop_front() {
136        if visited.contains(&entry_id) || their_tips_set.contains(&entry_id) {
137            continue; // Skip already visited or entries peer already has
138        }
139        visited.insert(entry_id.clone());
140
141        match backend.get(&entry_id).await {
142            Ok(entry) => {
143                entries_to_send.insert(entry_id.clone(), entry.clone());
144
145                // Add parents to queue if peer might not have them
146                if let Ok(parents) = entry.parents() {
147                    for parent_id in parents {
148                        if !their_tips_set.contains(&parent_id) && !visited.contains(&parent_id) {
149                            queue.push_back(parent_id);
150                        }
151                    }
152                }
153            }
154            Err(e) => {
155                return Err(SyncError::BackendError(format!(
156                    "Failed to get entry {entry_id} to send: {e}"
157                ))
158                .into());
159            }
160        }
161    }
162
163    // Return entries without height sorting for now
164    // Height-based ordering should be done by the sync_tree_with_peer method
165    // when it has the tree context needed for height calculation
166    let entries: Vec<Entry> = entries_to_send.into_values().collect();
167
168    Ok(entries)
169}
170
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{Entry, backend::database::InMemory};

    /// Fresh, empty in-memory backend for a single test.
    fn create_test_backend() -> Arc<InMemory> {
        Arc::new(InMemory::new())
    }

    /// Build a root entry with no parents.
    fn build_root_entry() -> Entry {
        Entry::root_builder()
            .build()
            .expect("Root entry should build successfully")
    }

    // ---- collect_missing_ancestors ----------------------------------------

    #[tokio::test]
    async fn test_collect_missing_ancestors_empty() {
        let store = create_test_backend();

        let missing = collect_missing_ancestors(store.as_ref(), &[])
            .await
            .unwrap();

        assert!(missing.is_empty());
    }

    #[tokio::test]
    async fn test_collect_missing_ancestors_not_found() {
        let store = create_test_backend();
        let absent_id = ID::from("missing123");

        let missing = collect_missing_ancestors(store.as_ref(), std::slice::from_ref(&absent_id))
            .await
            .unwrap();

        assert_eq!(missing, vec![absent_id]);
    }

    #[tokio::test]
    async fn test_collect_missing_ancestors_present() {
        let store = create_test_backend();
        let root = build_root_entry();
        let root_id = root.id();
        store.put_verified(root).await.unwrap();

        let missing = collect_missing_ancestors(store.as_ref(), &[root_id])
            .await
            .unwrap();

        // Entry exists, so nothing missing
        assert!(missing.is_empty());
    }

    // ---- collect_ancestors_to_send ----------------------------------------

    #[tokio::test]
    async fn test_collect_ancestors_to_send_empty() {
        let store = create_test_backend();

        let outgoing = collect_ancestors_to_send(store.as_ref(), &[], &[])
            .await
            .unwrap();

        assert!(outgoing.is_empty());
    }

    #[tokio::test]
    async fn test_collect_ancestors_to_send_single_entry() {
        let store = create_test_backend();
        let root = build_root_entry();
        let root_id = root.id();
        store.put_verified(root.clone()).await.unwrap();

        let outgoing = collect_ancestors_to_send(store.as_ref(), &[root_id], &[])
            .await
            .unwrap();

        assert_eq!(outgoing.len(), 1);
        assert_eq!(outgoing[0].id(), root.id());
    }

    #[tokio::test]
    async fn test_collect_ancestors_to_send_peer_already_has() {
        let store = create_test_backend();
        let root = build_root_entry();
        let root_id = root.id();
        store.put_verified(root).await.unwrap();

        // The peer reports this very entry as one of its tips.
        let known = std::slice::from_ref(&root_id);
        let outgoing = collect_ancestors_to_send(store.as_ref(), known, known)
            .await
            .unwrap();

        assert!(outgoing.is_empty());
    }
}