Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/sync/
mod.rs

1//! Synchronization module for Eidetica database.
2//!
3//! # Quick Start
4//!
5//! ```rust,ignore
6//! // Enable sync
7//! instance.enable_sync().await?;
8//! let sync = instance.sync().unwrap();
9//!
10//! // Register transports with their configurations
11//! sync.register_transport("http", HttpTransport::builder()
12//!     .bind("127.0.0.1:8080")
13//! ).await?;
14//! sync.register_transport("p2p", IrohTransport::builder()).await?;
15//!
16//! // Start accepting incoming connections
17//! sync.accept_connections().await?;
18//!
19//! // Outbound sync works via the registered transports
20//! sync.sync_with_peer(&Address::http("peer:8080"), Some(&tree_id)).await?;
21//! ```
22//!
23//! # Architecture
24//!
25//! The sync system uses a Background Sync architecture with command-pattern communication:
26//!
27//! - **[`Sync`]**: Thread-safe frontend using `Arc<Sync>` with interior mutability
28//!   (`OnceLock`). Provides the public API and sends commands to the background.
29//! - **[`background::BackgroundSync`]**: Single background thread handling all sync operations
30//!   via a command loop. Owns the transport and retry queue.
31//! - **Write Callbacks**: Automatically trigger sync when entries are committed via
32//!   database write callbacks.
33//!
34//! # Connection Model
35//!
36//! The sync system separates **outbound** and **inbound** connection handling:
37//!
38//! - **Outbound** ([`Sync::sync_with_peer`]): Works after registering transports via
39//!   [`Sync::register_transport`]. Each transport can be configured via its builder.
40//! - **Inbound** ([`Sync::accept_connections`]): Must be explicitly called each time
41//!   the instance starts. Starts servers on all registered transports.
42//!
43//! This design provides security by default. Nodes don't accept incoming connections
44//! unless explicitly opted in.
45//!
46//! # Bootstrap Protocol
47//!
48//! The sync protocol detects whether a client needs bootstrap or incremental sync:
49//!
50//! - **Empty tips** → Full bootstrap (complete database transfer)
51//! - **Has tips** → Incremental sync (only missing entries)
52//!
53//! Use [`Sync::sync_with_peer`] which handles both cases automatically.
54//!
55//! # Duplicate Prevention
56//!
57//! Uses Merkle-DAG tip comparison instead of tracking individual sent entries:
58//!
59//! 1. Exchange tips with peer
60//! 2. Compare DAGs to find missing entries
61//! 3. Send only what peer doesn't have
62//! 4. Receive only what we're missing
63//!
64//! This approach requires no extra storage apart from tracking relevant Merkle-DAG tips.
65//!
66//! # Transport Layer
67//!
68//! Two transport implementations are available:
69//!
70//! - **HTTP** ([`transports::http::HttpTransport`]): REST API at `/api/v0`, JSON serialization
71//! - **Iroh P2P** ([`transports::iroh::IrohTransport`]): QUIC-based with NAT traversal
72//!
73//! Transports are registered via [`Sync::register_transport`] with their builders.
74//! State (like Iroh node identity) is automatically persisted per named instance.
75//! Both implement the [`transports::SyncTransport`] trait.
76//!
77//! # Peer and State Management
78//!
79//! Peers and sync relationships are stored in a dedicated sync database (`_sync`):
80//!
81//! - `peer_manager::PeerManager`: Handles peer registration and relationships
82//! - [`state::SyncStateManager`]: Tracks sync cursors, metadata, and history
83//!
84//! # Connection Behavior
85//!
86//! - **Lazy connections**: Established on-demand, not at peer registration
87//! - **Periodic sync**: Configurable interval (default 5 minutes)
88//! - **Retry queue**: Failed sends retried with exponential backoff
89
90use std::sync::{Arc, OnceLock};
91use std::time::SystemTime;
92use tokio::sync::mpsc;
93
94use crate::{
95    Database, Instance, Result, WeakInstance,
96    auth::{Permission, crypto::PublicKey},
97    crdt::{Doc, doc::Value},
98    entry::ID,
99    instance::backend::Backend,
100    store::{DocStore, Registry},
101};
102
103// Public submodules
104pub mod background;
105pub mod error;
106pub mod handler;
107mod handler_tree_ops;
108pub mod peer_manager;
109pub mod peer_types;
110pub mod protocol;
111pub mod state;
112pub mod ticket;
113pub mod transports;
114pub mod utils;
115
116// Private submodules
117mod bootstrap;
118mod bootstrap_request_manager;
119mod ops;
120mod peer;
121mod queue;
122mod transport;
123mod transport_manager;
124mod user;
125mod user_sync_manager;
126
127// Re-exports
128use background::SyncCommand;
129pub use bootstrap_request_manager::{BootstrapRequest, RequestStatus};
130pub use error::SyncError;
131pub use peer_types::{Address, ConnectionState, PeerId, PeerInfo, PeerStatus};
132use queue::SyncQueue;
133pub use ticket::DatabaseTicket;
134use transports::TransportConfig;
135
/// Name of the subtree inside the sync tree where key/value sync settings live.
const SETTINGS_SUBTREE: &str = "settings_map";

/// Name of the subtree inside the sync tree that holds the transport registry.
const TRANSPORTS_SUBTREE: &str = "transports";

/// Name of the store that keeps persisted per-transport-instance state (identity etc.).
const TRANSPORT_STATE_STORE: &str = "transport_state";
144
145/// Authentication parameters for sync operations.
146#[derive(Debug, Clone)]
147pub struct AuthParams {
148    /// The public key making the request
149    pub requesting_key: PublicKey,
150    /// The name/ID of the requesting key
151    pub requesting_key_name: String,
152    /// The permission level being requested
153    pub requested_permission: Permission,
154}
155
156/// Information needed to register a peer for syncing.
157///
158/// This is used with [`Sync::register_sync_peer()`] to declare sync intent.
159#[derive(Debug, Clone)]
160pub struct SyncPeerInfo {
161    /// The peer's public key
162    pub peer_pubkey: PublicKey,
163    /// The tree/database to sync
164    pub tree_id: ID,
165    /// Initial address hints where the peer might be found
166    pub addresses: Vec<Address>,
167    /// Optional authentication parameters for bootstrap
168    pub auth: Option<AuthParams>,
169    /// Optional display name for the peer
170    pub display_name: Option<String>,
171}
172
173/// Handle for tracking sync status with a specific peer.
174///
175/// Returned by [`Sync::register_sync_peer()`].
176#[derive(Debug, Clone)]
177pub struct SyncHandle {
178    tree_id: ID,
179    peer_pubkey: PublicKey,
180    sync: Sync,
181}
182
183impl SyncHandle {
184    /// Get the current sync status.
185    pub async fn status(&self) -> Result<SyncStatus> {
186        self.sync
187            .get_sync_status(&self.tree_id, &self.peer_pubkey)
188            .await
189    }
190
191    /// Add another address hint for this peer.
192    pub async fn add_address(&self, address: Address) -> Result<()> {
193        self.sync.add_peer_address(&self.peer_pubkey, address).await
194    }
195
196    /// Block until initial sync completes (has local data).
197    ///
198    /// This is a convenience method for backwards compatibility.
199    /// The sync happens in the background, this just polls until data arrives.
200    pub async fn wait_for_initial_sync(&self) -> Result<()> {
201        loop {
202            let status = self.status().await?;
203            if status.has_local_data {
204                return Ok(());
205            }
206            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
207        }
208    }
209
210    /// Get the tree ID being synced.
211    pub fn tree_id(&self) -> &ID {
212        &self.tree_id
213    }
214
215    /// Get the peer public key.
216    pub fn peer_pubkey(&self) -> &PublicKey {
217        &self.peer_pubkey
218    }
219}
220
/// Snapshot of the sync state for one tree/peer pair.
#[derive(Debug, Clone)]
pub struct SyncStatus {
    /// `true` when data for the tree is present locally.
    pub has_local_data: bool,
    /// Time of the most recent successful sync, or `None` if there has been none.
    pub last_sync: Option<SystemTime>,
    /// Most recent error message, or `None` if no error is recorded.
    pub last_error: Option<String>,
}
231
232/// Synchronization manager for the database.
233///
234/// The Sync module is a thin frontend that communicates with a background
235/// sync engine thread via command channels. All actual sync operations, transport
236/// communication, and state management happen in the background thread.
237///
238/// ## Multi-Transport Support
239///
240/// Multiple transports can be enabled simultaneously (e.g., HTTP + Iroh P2P),
241/// allowing peers to be reachable via different networks. Requests are automatically
242/// routed to the appropriate transport based on address type.
243///
244/// ```rust,ignore
245/// // Enable both HTTP and Iroh transports
246/// sync.enable_http_transport().await?;
247/// sync.enable_iroh_transport().await?;
248///
249/// // Start servers on all transports
250/// sync.start_server("127.0.0.1:0").await?;
251///
252/// // Get all server addresses
253/// let addresses = sync.get_all_server_addresses().await?;
254/// ```
255#[derive(Debug)]
256pub struct Sync {
257    /// Communication channel to the background sync engine.
258    /// Initialized when the first transport is enabled via `enable_*_transport()` or `add_transport()`.
259    background_tx: OnceLock<mpsc::Sender<SyncCommand>>,
260    /// The instance for read operations and tree management
261    instance: WeakInstance,
262    /// The tree containing synchronization settings
263    sync_tree: Database,
264    /// Queue for entries pending synchronization
265    queue: Arc<SyncQueue>,
266}
267
268impl Clone for Sync {
269    fn clone(&self) -> Self {
270        let background_tx = OnceLock::new();
271        if let Some(tx) = self.background_tx.get() {
272            let _ = background_tx.set(tx.clone());
273        }
274        Self {
275            background_tx,
276            instance: self.instance.clone(),
277            sync_tree: self.sync_tree.clone(),
278            queue: Arc::clone(&self.queue),
279        }
280    }
281}
282
283impl Sync {
284    /// Create a new Sync instance with a dedicated settings tree.
285    ///
286    /// # Arguments
287    /// * `instance` - The database instance for tree operations
288    ///
289    /// # Returns
290    /// A new Sync instance with its own settings tree.
291    pub async fn new(instance: Instance) -> Result<Self> {
292        // Get device key from instance
293        let signing_key = instance.signing_key()?.clone();
294
295        let mut sync_settings = Doc::new();
296        sync_settings.set("name", "_sync");
297        sync_settings.set("type", "sync_settings");
298
299        let sync_tree = Database::create(&instance, signing_key, sync_settings).await?;
300
301        let sync = Self {
302            background_tx: OnceLock::new(),
303            instance: instance.downgrade(),
304            sync_tree,
305            queue: Arc::new(SyncQueue::new()),
306        };
307
308        // Initialize combined settings for all tracked users
309        sync.initialize_user_settings().await?;
310
311        Ok(sync)
312    }
313
314    /// Load an existing Sync instance from a sync tree root ID.
315    ///
316    /// # Arguments
317    /// * `instance` - The database instance
318    /// * `sync_tree_root_id` - The root ID of the existing sync tree
319    ///
320    /// # Returns
321    /// A Sync instance loaded from the existing tree.
322    pub async fn load(instance: Instance, sync_tree_root_id: &ID) -> Result<Self> {
323        let device_key = instance.signing_key()?.clone();
324
325        let sync_tree = Database::open(&instance, sync_tree_root_id)
326            .await?
327            .with_key(device_key);
328
329        let sync = Self {
330            background_tx: OnceLock::new(),
331            instance: instance.downgrade(),
332            sync_tree,
333            queue: Arc::new(SyncQueue::new()),
334        };
335
336        // Initialize combined settings for all tracked users
337        sync.initialize_user_settings().await?;
338
339        Ok(sync)
340    }
341
342    /// Get the root ID of the sync settings tree.
343    pub fn sync_tree_root_id(&self) -> &ID {
344        self.sync_tree.root_id()
345    }
346
347    /// Store a setting in the sync_settings subtree.
348    ///
349    /// # Arguments
350    /// * `key` - The setting key
351    /// * `value` - The setting value
352    pub async fn set_setting(
353        &self,
354        key: impl Into<String>,
355        value: impl Into<String>,
356    ) -> Result<()> {
357        let txn = self.sync_tree.new_transaction().await?;
358        let sync_settings = txn.get_store::<DocStore>(SETTINGS_SUBTREE).await?;
359        sync_settings.set(key, Value::Text(value.into())).await?;
360        txn.commit().await?;
361        Ok(())
362    }
363
364    /// Retrieve a setting from the settings_map subtree.
365    ///
366    /// # Arguments
367    /// * `key` - The setting key to retrieve
368    ///
369    /// # Returns
370    /// The setting value if found, None otherwise.
371    pub async fn get_setting(&self, key: impl AsRef<str>) -> Result<Option<String>> {
372        let sync_settings = self
373            .sync_tree
374            .get_store_viewer::<DocStore>(SETTINGS_SUBTREE)
375            .await?;
376        match sync_settings.get_string(key).await {
377            Ok(value) => Ok(Some(value)),
378            Err(e) if e.is_not_found() => Ok(None),
379            Err(e) => Err(e),
380        }
381    }
382
383    /// Load a transport configuration from the `_sync` database.
384    ///
385    /// Transport configurations are stored in the `transports` subtree,
386    /// keyed by their name. If no configuration exists for the transport,
387    /// returns the default configuration.
388    ///
389    /// # Type Parameters
390    /// * `T` - The transport configuration type implementing [`TransportConfig`]
391    ///
392    /// # Arguments
393    /// * `name` - The name of the transport instance (e.g., "iroh", "http")
394    ///
395    /// # Returns
396    /// The loaded configuration, or the default if not found.
397    ///
398    /// # Example
399    ///
400    /// ```ignore
401    /// use eidetica::sync::transports::iroh::IrohTransportConfig;
402    ///
403    /// let config: IrohTransportConfig = sync.load_transport_config("iroh")?;
404    /// ```
405    pub async fn load_transport_config<T: TransportConfig>(&self, name: &str) -> Result<T> {
406        let tx = self.sync_tree.new_transaction().await?;
407        let registry = Registry::new(&tx, TRANSPORTS_SUBTREE).await?;
408
409        match registry.get_entry(name).await {
410            Ok(entry) => {
411                // Verify the type matches
412                if entry.type_id != T::type_id() {
413                    return Err(SyncError::TransportTypeMismatch {
414                        name: name.to_string(),
415                        expected: T::type_id().to_string(),
416                        found: entry.type_id,
417                    }
418                    .into());
419                }
420                entry.config.get_json("data").map_err(|e| {
421                    SyncError::SerializationError(format!(
422                        "Failed to deserialize transport config '{name}': {e}"
423                    ))
424                    .into()
425                })
426            }
427            Err(e) if e.is_not_found() => Ok(T::default()),
428            Err(e) => Err(e),
429        }
430    }
431
432    /// Save a transport configuration to the `_sync` database.
433    ///
434    /// Transport configurations are stored in the `transports` subtree,
435    /// keyed by their name. This persists the configuration so it can
436    /// be loaded on subsequent startups.
437    ///
438    /// # Type Parameters
439    /// * `T` - The transport configuration type implementing [`TransportConfig`]
440    ///
441    /// # Arguments
442    /// * `name` - The name of the transport instance (e.g., "iroh", "http")
443    /// * `config` - The configuration to save
444    ///
445    /// # Example
446    ///
447    /// ```ignore
448    /// use eidetica::sync::transports::iroh::IrohTransportConfig;
449    ///
450    /// let mut config = IrohTransportConfig::default();
451    /// config.get_or_create_secret_key(); // Generate key
452    /// sync.save_transport_config("iroh", &config)?;
453    /// ```
454    pub async fn save_transport_config<T: TransportConfig>(
455        &self,
456        name: &str,
457        config: &T,
458    ) -> Result<()> {
459        let mut config_doc = crate::crdt::Doc::new();
460        config_doc.set_json("data", config).map_err(|e| {
461            SyncError::SerializationError(format!(
462                "Failed to serialize transport config '{name}': {e}"
463            ))
464        })?;
465        let tx = self.sync_tree.new_transaction().await?;
466        let registry = Registry::new(&tx, TRANSPORTS_SUBTREE).await?;
467        registry.set_entry(name, T::type_id(), config_doc).await?;
468        tx.commit().await?;
469        Ok(())
470    }
471
472    /// Get a reference to the Instance.
473    pub fn instance(&self) -> Result<Instance> {
474        self.instance
475            .upgrade()
476            .ok_or_else(|| SyncError::InstanceDropped.into())
477    }
478
479    /// Get a clone of the backend seam.
480    pub fn backend(&self) -> Result<Arc<dyn Backend>> {
481        Ok(self.instance()?.backend().clone())
482    }
483
484    /// Get the sync tree database.
485    pub fn sync_tree(&self) -> &Database {
486        &self.sync_tree
487    }
488
489    /// Get the device public key for this sync instance.
490    ///
491    /// # Returns
492    /// The device's public key.
493    pub fn get_device_pubkey(&self) -> Result<PublicKey> {
494        Ok(self.instance()?.id())
495    }
496}
497
498impl Sync {
499    // === Test Helpers ===
500}