Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/sync/
mod.rs

1//! Synchronization module for Eidetica database.
2//!
3//! # Quick Start
4//!
5//! ```rust,ignore
6//! // Enable sync
7//! instance.enable_sync().await?;
8//! let sync = instance.sync().unwrap();
9//!
10//! // Register transports with their configurations
11//! sync.register_transport("http", HttpTransport::builder()
12//!     .bind("127.0.0.1:8080")
13//! ).await?;
14//! sync.register_transport("p2p", IrohTransport::builder()).await?;
15//!
16//! // Start accepting incoming connections
17//! sync.accept_connections().await?;
18//!
19//! // Outbound sync works via the registered transports
20//! sync.sync_with_peer(&Address::http("peer:8080"), Some(&tree_id)).await?;
21//! ```
22//!
23//! # Architecture
24//!
25//! The sync system uses a Background Sync architecture with command-pattern communication:
26//!
27//! - **[`Sync`]**: Thread-safe frontend using `Arc<Sync>` with interior mutability
28//!   (`OnceLock`). Provides the public API and sends commands to the background.
29//! - **[`background::BackgroundSync`]**: Single background thread handling all sync operations
30//!   via a command loop. Owns the transport and retry queue.
31//! - **Write Callbacks**: Automatically trigger sync when entries are committed via
32//!   database write callbacks.
33//!
34//! # Connection Model
35//!
36//! The sync system separates **outbound** and **inbound** connection handling:
37//!
38//! - **Outbound** ([`Sync::sync_with_peer`]): Works after registering transports via
39//!   [`Sync::register_transport`]. Each transport can be configured via its builder.
40//! - **Inbound** ([`Sync::accept_connections`]): Must be explicitly called each time
41//!   the instance starts. Starts servers on all registered transports.
42//!
43//! This design provides security by default. Nodes don't accept incoming connections
44//! unless explicitly opted in.
45//!
46//! # Bootstrap Protocol
47//!
48//! The sync protocol detects whether a client needs bootstrap or incremental sync:
49//!
50//! - **Empty tips** → Full bootstrap (complete database transfer)
51//! - **Has tips** → Incremental sync (only missing entries)
52//!
53//! Use [`Sync::sync_with_peer`] which handles both cases automatically.
54//!
55//! # Duplicate Prevention
56//!
57//! Uses Merkle-DAG tip comparison instead of tracking individual sent entries:
58//!
59//! 1. Exchange tips with peer
60//! 2. Compare DAGs to find missing entries
61//! 3. Send only what peer doesn't have
62//! 4. Receive only what we're missing
63//!
64//! This approach requires no extra storage apart from tracking relevant Merkle-DAG tips.
65//!
66//! # Transport Layer
67//!
68//! Two transport implementations are available:
69//!
70//! - **HTTP** ([`transports::http::HttpTransport`]): REST API at `/api/v0`, JSON serialization
71//! - **Iroh P2P** ([`transports::iroh::IrohTransport`]): QUIC-based with NAT traversal
72//!
73//! Transports are registered via [`Sync::register_transport`] with their builders.
74//! State (like Iroh node identity) is automatically persisted per named instance.
75//! Both implement the [`transports::SyncTransport`] trait.
76//!
77//! # Peer and State Management
78//!
79//! Peers and sync relationships are stored in a dedicated sync database (`_sync`):
80//!
81//! - `peer_manager::PeerManager`: Handles peer registration and relationships
82//! - [`state::SyncStateManager`]: Tracks sync cursors, metadata, and history
83//!
84//! # Connection Behavior
85//!
86//! - **Lazy connections**: Established on-demand, not at peer registration
87//! - **Periodic sync**: Configurable interval (default 5 minutes)
88//! - **Retry queue**: Failed sends retried with exponential backoff
89
90use std::sync::{Arc, OnceLock};
91use std::time::SystemTime;
92use tokio::sync::mpsc;
93
94use handle_trait::Handle;
95
96use crate::{
97    Database, Instance, Result, WeakInstance,
98    auth::{Permission, crypto::PublicKey},
99    crdt::{Doc, doc::Value},
100    database::DatabaseKey,
101    entry::ID,
102    instance::backend::Backend,
103    store::{DocStore, Registry},
104};
105
106// Public submodules
107pub mod background;
108pub mod error;
109pub mod handler;
110mod handler_tree_ops;
111pub mod peer_manager;
112pub mod peer_types;
113pub mod protocol;
114pub mod state;
115pub mod ticket;
116pub mod transports;
117pub mod utils;
118
119// Private submodules
120mod bootstrap;
121mod bootstrap_request_manager;
122mod ops;
123mod peer;
124mod queue;
125mod transport;
126mod transport_manager;
127mod user;
128mod user_sync_manager;
129
130// Re-exports
131use background::SyncCommand;
132pub use bootstrap_request_manager::{BootstrapRequest, RequestStatus};
133pub use error::SyncError;
134pub use peer_types::{Address, ConnectionState, PeerId, PeerInfo, PeerStatus};
135use queue::SyncQueue;
136pub use ticket::DatabaseTicket;
137use transports::TransportConfig;
138
/// Name of the sync-tree subtree that holds key/value sync settings.
///
/// Used by [`Sync::set_setting`] and [`Sync::get_setting`].
const SETTINGS_SUBTREE: &str = "settings_map";

/// Name of the sync-tree subtree that holds the transport configuration registry.
///
/// Used by [`Sync::load_transport_config`] and [`Sync::save_transport_config`].
const TRANSPORTS_SUBTREE: &str = "transports";

/// Name of the store holding persisted per-transport-instance state (identity/state).
const TRANSPORT_STATE_STORE: &str = "transport_state";
147
148/// Authentication parameters for sync operations.
149#[derive(Debug, Clone)]
150pub struct AuthParams {
151    /// The public key making the request
152    pub requesting_key: PublicKey,
153    /// The name/ID of the requesting key
154    pub requesting_key_name: String,
155    /// The permission level being requested
156    pub requested_permission: Permission,
157}
158
159/// Information needed to register a peer for syncing.
160///
161/// This is used with [`Sync::register_sync_peer()`] to declare sync intent.
162#[derive(Debug, Clone)]
163pub struct SyncPeerInfo {
164    /// The peer's public key
165    pub peer_pubkey: PublicKey,
166    /// The tree/database to sync
167    pub tree_id: ID,
168    /// Initial address hints where the peer might be found
169    pub addresses: Vec<Address>,
170    /// Optional authentication parameters for bootstrap
171    pub auth: Option<AuthParams>,
172    /// Optional display name for the peer
173    pub display_name: Option<String>,
174}
175
176/// Handle for tracking sync status with a specific peer.
177///
178/// Returned by [`Sync::register_sync_peer()`].
179#[derive(Debug, Clone)]
180pub struct SyncHandle {
181    tree_id: ID,
182    peer_pubkey: PublicKey,
183    sync: Sync,
184}
185
186impl SyncHandle {
187    /// Get the current sync status.
188    pub async fn status(&self) -> Result<SyncStatus> {
189        self.sync
190            .get_sync_status(&self.tree_id, &self.peer_pubkey)
191            .await
192    }
193
194    /// Add another address hint for this peer.
195    pub async fn add_address(&self, address: Address) -> Result<()> {
196        self.sync.add_peer_address(&self.peer_pubkey, address).await
197    }
198
199    /// Block until initial sync completes (has local data).
200    ///
201    /// This is a convenience method for backwards compatibility.
202    /// The sync happens in the background, this just polls until data arrives.
203    pub async fn wait_for_initial_sync(&self) -> Result<()> {
204        loop {
205            let status = self.status().await?;
206            if status.has_local_data {
207                return Ok(());
208            }
209            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
210        }
211    }
212
213    /// Get the tree ID being synced.
214    pub fn tree_id(&self) -> &ID {
215        &self.tree_id
216    }
217
218    /// Get the peer public key.
219    pub fn peer_pubkey(&self) -> &PublicKey {
220        &self.peer_pubkey
221    }
222}
223
/// Snapshot of the sync state for one tree/peer pair.
#[derive(Clone, Debug)]
pub struct SyncStatus {
    /// `true` once this node holds local data for the tree.
    pub has_local_data: bool,
    /// Time of the most recent successful sync, or `None` if none has succeeded yet.
    pub last_sync: Option<SystemTime>,
    /// Most recent error message, or `None` if no error has been recorded.
    pub last_error: Option<String>,
}
234
235/// Synchronization manager for the database.
236///
237/// The Sync module is a thin frontend that communicates with a background
238/// sync engine thread via command channels. All actual sync operations, transport
239/// communication, and state management happen in the background thread.
240///
241/// ## Multi-Transport Support
242///
243/// Multiple transports can be enabled simultaneously (e.g., HTTP + Iroh P2P),
244/// allowing peers to be reachable via different networks. Requests are automatically
245/// routed to the appropriate transport based on address type.
246///
247/// ```rust,ignore
248/// // Enable both HTTP and Iroh transports
249/// sync.enable_http_transport().await?;
250/// sync.enable_iroh_transport().await?;
251///
252/// // Start servers on all transports
253/// sync.start_server("127.0.0.1:0").await?;
254///
255/// // Get all server addresses
256/// let addresses = sync.get_all_server_addresses().await?;
257/// ```
258#[derive(Debug)]
259pub struct Sync {
260    /// Communication channel to the background sync engine.
261    /// Initialized when the first transport is enabled via `enable_*_transport()` or `add_transport()`.
262    background_tx: OnceLock<mpsc::Sender<SyncCommand>>,
263    /// The instance for read operations and tree management
264    instance: WeakInstance,
265    /// The tree containing synchronization settings
266    sync_tree: Database,
267    /// Queue for entries pending synchronization
268    queue: Arc<SyncQueue>,
269}
270
271impl Clone for Sync {
272    fn clone(&self) -> Self {
273        let background_tx = OnceLock::new();
274        if let Some(tx) = self.background_tx.get() {
275            let _ = background_tx.set(tx.clone());
276        }
277        Self {
278            background_tx,
279            instance: self.instance.clone(),
280            sync_tree: self.sync_tree.clone(),
281            queue: Arc::clone(&self.queue),
282        }
283    }
284}
285
286impl Sync {
287    /// Create a new Sync instance with a dedicated settings tree.
288    ///
289    /// # Arguments
290    /// * `instance` - The database instance for tree operations
291    ///
292    /// # Returns
293    /// A new Sync instance with its own settings tree.
294    pub async fn new(instance: Instance) -> Result<Self> {
295        // Get device key from instance
296        let signing_key = instance.device_key().clone();
297
298        let mut sync_settings = Doc::new();
299        sync_settings.set("name", "_sync");
300        sync_settings.set("type", "sync_settings");
301
302        let sync_tree = Database::create(&instance, signing_key, sync_settings).await?;
303
304        let sync = Self {
305            background_tx: OnceLock::new(),
306            instance: instance.downgrade(),
307            sync_tree,
308            queue: Arc::new(SyncQueue::new()),
309        };
310
311        // Initialize combined settings for all tracked users
312        sync.initialize_user_settings().await?;
313
314        Ok(sync)
315    }
316
317    /// Load an existing Sync instance from a sync tree root ID.
318    ///
319    /// # Arguments
320    /// * `instance` - The database instance
321    /// * `sync_tree_root_id` - The root ID of the existing sync tree
322    ///
323    /// # Returns
324    /// A Sync instance loaded from the existing tree.
325    pub async fn load(instance: Instance, sync_tree_root_id: &ID) -> Result<Self> {
326        let device_key = instance.device_key().clone();
327
328        let sync_tree = Database::open(
329            instance.handle(),
330            sync_tree_root_id,
331            DatabaseKey::new(device_key),
332        )
333        .await?;
334
335        let sync = Self {
336            background_tx: OnceLock::new(),
337            instance: instance.downgrade(),
338            sync_tree,
339            queue: Arc::new(SyncQueue::new()),
340        };
341
342        // Initialize combined settings for all tracked users
343        sync.initialize_user_settings().await?;
344
345        Ok(sync)
346    }
347
348    /// Get the root ID of the sync settings tree.
349    pub fn sync_tree_root_id(&self) -> &ID {
350        self.sync_tree.root_id()
351    }
352
353    /// Store a setting in the sync_settings subtree.
354    ///
355    /// # Arguments
356    /// * `key` - The setting key
357    /// * `value` - The setting value
358    pub async fn set_setting(
359        &self,
360        key: impl Into<String>,
361        value: impl Into<String>,
362    ) -> Result<()> {
363        let txn = self.sync_tree.new_transaction().await?;
364        let sync_settings = txn.get_store::<DocStore>(SETTINGS_SUBTREE).await?;
365        sync_settings.set(key, Value::Text(value.into())).await?;
366        txn.commit().await?;
367        Ok(())
368    }
369
370    /// Retrieve a setting from the settings_map subtree.
371    ///
372    /// # Arguments
373    /// * `key` - The setting key to retrieve
374    ///
375    /// # Returns
376    /// The setting value if found, None otherwise.
377    pub async fn get_setting(&self, key: impl AsRef<str>) -> Result<Option<String>> {
378        let sync_settings = self
379            .sync_tree
380            .get_store_viewer::<DocStore>(SETTINGS_SUBTREE)
381            .await?;
382        match sync_settings.get_string(key).await {
383            Ok(value) => Ok(Some(value)),
384            Err(e) if e.is_not_found() => Ok(None),
385            Err(e) => Err(e),
386        }
387    }
388
389    /// Load a transport configuration from the `_sync` database.
390    ///
391    /// Transport configurations are stored in the `transports` subtree,
392    /// keyed by their name. If no configuration exists for the transport,
393    /// returns the default configuration.
394    ///
395    /// # Type Parameters
396    /// * `T` - The transport configuration type implementing [`TransportConfig`]
397    ///
398    /// # Arguments
399    /// * `name` - The name of the transport instance (e.g., "iroh", "http")
400    ///
401    /// # Returns
402    /// The loaded configuration, or the default if not found.
403    ///
404    /// # Example
405    ///
406    /// ```ignore
407    /// use eidetica::sync::transports::iroh::IrohTransportConfig;
408    ///
409    /// let config: IrohTransportConfig = sync.load_transport_config("iroh")?;
410    /// ```
411    pub async fn load_transport_config<T: TransportConfig>(&self, name: &str) -> Result<T> {
412        let tx = self.sync_tree.new_transaction().await?;
413        let registry = Registry::new(&tx, TRANSPORTS_SUBTREE).await?;
414
415        match registry.get_entry(name).await {
416            Ok(entry) => {
417                // Verify the type matches
418                if entry.type_id != T::type_id() {
419                    return Err(SyncError::TransportTypeMismatch {
420                        name: name.to_string(),
421                        expected: T::type_id().to_string(),
422                        found: entry.type_id,
423                    }
424                    .into());
425                }
426                entry.config.get_json("data").map_err(|e| {
427                    SyncError::SerializationError(format!(
428                        "Failed to deserialize transport config '{name}': {e}"
429                    ))
430                    .into()
431                })
432            }
433            Err(e) if e.is_not_found() => Ok(T::default()),
434            Err(e) => Err(e),
435        }
436    }
437
438    /// Save a transport configuration to the `_sync` database.
439    ///
440    /// Transport configurations are stored in the `transports` subtree,
441    /// keyed by their name. This persists the configuration so it can
442    /// be loaded on subsequent startups.
443    ///
444    /// # Type Parameters
445    /// * `T` - The transport configuration type implementing [`TransportConfig`]
446    ///
447    /// # Arguments
448    /// * `name` - The name of the transport instance (e.g., "iroh", "http")
449    /// * `config` - The configuration to save
450    ///
451    /// # Example
452    ///
453    /// ```ignore
454    /// use eidetica::sync::transports::iroh::IrohTransportConfig;
455    ///
456    /// let mut config = IrohTransportConfig::default();
457    /// config.get_or_create_secret_key(); // Generate key
458    /// sync.save_transport_config("iroh", &config)?;
459    /// ```
460    pub async fn save_transport_config<T: TransportConfig>(
461        &self,
462        name: &str,
463        config: &T,
464    ) -> Result<()> {
465        let mut config_doc = crate::crdt::Doc::new();
466        config_doc.set_json("data", config).map_err(|e| {
467            SyncError::SerializationError(format!(
468                "Failed to serialize transport config '{name}': {e}"
469            ))
470        })?;
471        let tx = self.sync_tree.new_transaction().await?;
472        let registry = Registry::new(&tx, TRANSPORTS_SUBTREE).await?;
473        registry.set_entry(name, T::type_id(), config_doc).await?;
474        tx.commit().await?;
475        Ok(())
476    }
477
478    /// Get a reference to the Instance.
479    pub fn instance(&self) -> Result<Instance> {
480        self.instance
481            .upgrade()
482            .ok_or_else(|| SyncError::InstanceDropped.into())
483    }
484
485    /// Get a reference to the backend.
486    pub fn backend(&self) -> Result<Backend> {
487        Ok(self.instance()?.backend().clone())
488    }
489
490    /// Get the sync tree database.
491    pub fn sync_tree(&self) -> &Database {
492        &self.sync_tree
493    }
494
495    /// Get the device public key for this sync instance.
496    ///
497    /// # Returns
498    /// The device's public key.
499    pub fn get_device_pubkey(&self) -> Result<PublicKey> {
500        Ok(self.instance()?.device_id())
501    }
502}
503
504impl Sync {
505    // === Test Helpers ===
506}