eidetica/sync/mod.rs
1//! Synchronization module for Eidetica database.
2//!
3//! # Quick Start
4//!
5//! ```rust,ignore
6//! // Enable sync
7//! instance.enable_sync().await?;
8//! let sync = instance.sync().unwrap();
9//!
10//! // Register transports with their configurations
11//! sync.register_transport("http", HttpTransport::builder()
12//! .bind("127.0.0.1:8080")
13//! ).await?;
14//! sync.register_transport("p2p", IrohTransport::builder()).await?;
15//!
16//! // Start accepting incoming connections
17//! sync.accept_connections().await?;
18//!
19//! // Outbound sync works via the registered transports
20//! sync.sync_with_peer(&Address::http("peer:8080"), Some(&tree_id)).await?;
21//! ```
22//!
23//! # Architecture
24//!
25//! The sync system uses a Background Sync architecture with command-pattern communication:
26//!
27//! - **[`Sync`]**: Thread-safe frontend using `Arc<Sync>` with interior mutability
28//! (`OnceLock`). Provides the public API and sends commands to the background.
29//! - **[`background::BackgroundSync`]**: Single background thread handling all sync operations
30//! via a command loop. Owns the transport and retry queue.
31//! - **Write Callbacks**: Automatically trigger sync when entries are committed via
32//! database write callbacks.
33//!
34//! # Connection Model
35//!
36//! The sync system separates **outbound** and **inbound** connection handling:
37//!
38//! - **Outbound** ([`Sync::sync_with_peer`]): Works after registering transports via
39//! [`Sync::register_transport`]. Each transport can be configured via its builder.
40//! - **Inbound** ([`Sync::accept_connections`]): Must be explicitly called each time
41//! the instance starts. Starts servers on all registered transports.
42//!
43//! This design provides security by default. Nodes don't accept incoming connections
44//! unless explicitly opted in.
45//!
46//! # Bootstrap Protocol
47//!
48//! The sync protocol detects whether a client needs bootstrap or incremental sync:
49//!
50//! - **Empty tips** → Full bootstrap (complete database transfer)
51//! - **Has tips** → Incremental sync (only missing entries)
52//!
53//! Use [`Sync::sync_with_peer`] which handles both cases automatically.
54//!
55//! # Duplicate Prevention
56//!
57//! Uses Merkle-DAG tip comparison instead of tracking individual sent entries:
58//!
59//! 1. Exchange tips with peer
60//! 2. Compare DAGs to find missing entries
61//! 3. Send only what peer doesn't have
62//! 4. Receive only what we're missing
63//!
64//! This approach requires no extra storage apart from tracking relevant Merkle-DAG tips.
65//!
66//! # Transport Layer
67//!
68//! Two transport implementations are available:
69//!
70//! - **HTTP** ([`transports::http::HttpTransport`]): REST API at `/api/v0`, JSON serialization
71//! - **Iroh P2P** ([`transports::iroh::IrohTransport`]): QUIC-based with NAT traversal
72//!
73//! Transports are registered via [`Sync::register_transport`] with their builders.
74//! State (like Iroh node identity) is automatically persisted per named instance.
75//! Both implement the [`transports::SyncTransport`] trait.
76//!
77//! # Peer and State Management
78//!
79//! Peers and sync relationships are stored in a dedicated sync database (`_sync`):
80//!
81//! - `peer_manager::PeerManager`: Handles peer registration and relationships
82//! - [`state::SyncStateManager`]: Tracks sync cursors, metadata, and history
83//!
84//! # Connection Behavior
85//!
86//! - **Lazy connections**: Established on-demand, not at peer registration
87//! - **Periodic sync**: Configurable interval (default 5 minutes)
88//! - **Retry queue**: Failed sends retried with exponential backoff
89
90use std::sync::{Arc, OnceLock};
91use std::time::SystemTime;
92use tokio::sync::mpsc;
93
94use crate::{
95 Database, Instance, Result, WeakInstance,
96 auth::{Permission, crypto::PublicKey},
97 crdt::{Doc, doc::Value},
98 entry::ID,
99 instance::backend::Backend,
100 store::{DocStore, Registry},
101};
102
103// Public submodules
104pub mod background;
105pub mod error;
106pub mod handler;
107mod handler_tree_ops;
108pub mod peer_manager;
109pub mod peer_types;
110pub mod protocol;
111pub mod state;
112pub mod ticket;
113pub mod transports;
114pub mod utils;
115
116// Private submodules
117mod bootstrap;
118mod bootstrap_request_manager;
119mod ops;
120mod peer;
121mod queue;
122mod transport;
123mod transport_manager;
124mod user;
125mod user_sync_manager;
126
127// Re-exports
128use background::SyncCommand;
129pub use bootstrap_request_manager::{BootstrapRequest, RequestStatus};
130pub use error::SyncError;
131pub use peer_types::{Address, ConnectionState, PeerId, PeerInfo, PeerStatus};
132use queue::SyncQueue;
133pub use ticket::DatabaseTicket;
134use transports::TransportConfig;
135
136/// Private constant for the sync settings subtree name
137const SETTINGS_SUBTREE: &str = "settings_map";
138
139/// Private constant for the transports registry subtree name
140const TRANSPORTS_SUBTREE: &str = "transports";
141
142/// Private constant for the transport state store name (persisted identity/state per transport instance)
143const TRANSPORT_STATE_STORE: &str = "transport_state";
144
145/// Authentication parameters for sync operations.
146#[derive(Debug, Clone)]
147pub struct AuthParams {
148 /// The public key making the request
149 pub requesting_key: PublicKey,
150 /// The name/ID of the requesting key
151 pub requesting_key_name: String,
152 /// The permission level being requested
153 pub requested_permission: Permission,
154}
155
156/// Information needed to register a peer for syncing.
157///
158/// This is used with [`Sync::register_sync_peer()`] to declare sync intent.
159#[derive(Debug, Clone)]
160pub struct SyncPeerInfo {
161 /// The peer's public key
162 pub peer_pubkey: PublicKey,
163 /// The tree/database to sync
164 pub tree_id: ID,
165 /// Initial address hints where the peer might be found
166 pub addresses: Vec<Address>,
167 /// Optional authentication parameters for bootstrap
168 pub auth: Option<AuthParams>,
169 /// Optional display name for the peer
170 pub display_name: Option<String>,
171}
172
173/// Handle for tracking sync status with a specific peer.
174///
175/// Returned by [`Sync::register_sync_peer()`].
176#[derive(Debug, Clone)]
177pub struct SyncHandle {
178 tree_id: ID,
179 peer_pubkey: PublicKey,
180 sync: Sync,
181}
182
183impl SyncHandle {
184 /// Get the current sync status.
185 pub async fn status(&self) -> Result<SyncStatus> {
186 self.sync
187 .get_sync_status(&self.tree_id, &self.peer_pubkey)
188 .await
189 }
190
191 /// Add another address hint for this peer.
192 pub async fn add_address(&self, address: Address) -> Result<()> {
193 self.sync.add_peer_address(&self.peer_pubkey, address).await
194 }
195
196 /// Block until initial sync completes (has local data).
197 ///
198 /// This is a convenience method for backwards compatibility.
199 /// The sync happens in the background, this just polls until data arrives.
200 pub async fn wait_for_initial_sync(&self) -> Result<()> {
201 loop {
202 let status = self.status().await?;
203 if status.has_local_data {
204 return Ok(());
205 }
206 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
207 }
208 }
209
210 /// Get the tree ID being synced.
211 pub fn tree_id(&self) -> &ID {
212 &self.tree_id
213 }
214
215 /// Get the peer public key.
216 pub fn peer_pubkey(&self) -> &PublicKey {
217 &self.peer_pubkey
218 }
219}
220
221/// Current sync status for a tree/peer pair.
222#[derive(Debug, Clone)]
223pub struct SyncStatus {
224 /// Whether we have local data for this tree
225 pub has_local_data: bool,
226 /// Last time sync succeeded (if ever)
227 pub last_sync: Option<SystemTime>,
228 /// Last error encountered (if any)
229 pub last_error: Option<String>,
230}
231
232/// Synchronization manager for the database.
233///
234/// The Sync module is a thin frontend that communicates with a background
235/// sync engine thread via command channels. All actual sync operations, transport
236/// communication, and state management happen in the background thread.
237///
238/// ## Multi-Transport Support
239///
240/// Multiple transports can be enabled simultaneously (e.g., HTTP + Iroh P2P),
241/// allowing peers to be reachable via different networks. Requests are automatically
242/// routed to the appropriate transport based on address type.
243///
244/// ```rust,ignore
245/// // Enable both HTTP and Iroh transports
246/// sync.enable_http_transport().await?;
247/// sync.enable_iroh_transport().await?;
248///
249/// // Start servers on all transports
250/// sync.start_server("127.0.0.1:0").await?;
251///
252/// // Get all server addresses
253/// let addresses = sync.get_all_server_addresses().await?;
254/// ```
255#[derive(Debug)]
256pub struct Sync {
257 /// Communication channel to the background sync engine.
258 /// Initialized when the first transport is enabled via `enable_*_transport()` or `add_transport()`.
259 background_tx: OnceLock<mpsc::Sender<SyncCommand>>,
260 /// The instance for read operations and tree management
261 instance: WeakInstance,
262 /// The tree containing synchronization settings
263 sync_tree: Database,
264 /// Queue for entries pending synchronization
265 queue: Arc<SyncQueue>,
266}
267
268impl Clone for Sync {
269 fn clone(&self) -> Self {
270 let background_tx = OnceLock::new();
271 if let Some(tx) = self.background_tx.get() {
272 let _ = background_tx.set(tx.clone());
273 }
274 Self {
275 background_tx,
276 instance: self.instance.clone(),
277 sync_tree: self.sync_tree.clone(),
278 queue: Arc::clone(&self.queue),
279 }
280 }
281}
282
283impl Sync {
284 /// Create a new Sync instance with a dedicated settings tree.
285 ///
286 /// # Arguments
287 /// * `instance` - The database instance for tree operations
288 ///
289 /// # Returns
290 /// A new Sync instance with its own settings tree.
291 pub async fn new(instance: Instance) -> Result<Self> {
292 // Get device key from instance
293 let signing_key = instance.signing_key()?.clone();
294
295 let mut sync_settings = Doc::new();
296 sync_settings.set("name", "_sync");
297 sync_settings.set("type", "sync_settings");
298
299 let sync_tree = Database::create(&instance, signing_key, sync_settings).await?;
300
301 let sync = Self {
302 background_tx: OnceLock::new(),
303 instance: instance.downgrade(),
304 sync_tree,
305 queue: Arc::new(SyncQueue::new()),
306 };
307
308 // Initialize combined settings for all tracked users
309 sync.initialize_user_settings().await?;
310
311 Ok(sync)
312 }
313
314 /// Load an existing Sync instance from a sync tree root ID.
315 ///
316 /// # Arguments
317 /// * `instance` - The database instance
318 /// * `sync_tree_root_id` - The root ID of the existing sync tree
319 ///
320 /// # Returns
321 /// A Sync instance loaded from the existing tree.
322 pub async fn load(instance: Instance, sync_tree_root_id: &ID) -> Result<Self> {
323 let device_key = instance.signing_key()?.clone();
324
325 let sync_tree = Database::open(&instance, sync_tree_root_id)
326 .await?
327 .with_key(device_key);
328
329 let sync = Self {
330 background_tx: OnceLock::new(),
331 instance: instance.downgrade(),
332 sync_tree,
333 queue: Arc::new(SyncQueue::new()),
334 };
335
336 // Initialize combined settings for all tracked users
337 sync.initialize_user_settings().await?;
338
339 Ok(sync)
340 }
341
342 /// Get the root ID of the sync settings tree.
343 pub fn sync_tree_root_id(&self) -> &ID {
344 self.sync_tree.root_id()
345 }
346
347 /// Store a setting in the sync_settings subtree.
348 ///
349 /// # Arguments
350 /// * `key` - The setting key
351 /// * `value` - The setting value
352 pub async fn set_setting(
353 &self,
354 key: impl Into<String>,
355 value: impl Into<String>,
356 ) -> Result<()> {
357 let txn = self.sync_tree.new_transaction().await?;
358 let sync_settings = txn.get_store::<DocStore>(SETTINGS_SUBTREE).await?;
359 sync_settings.set(key, Value::Text(value.into())).await?;
360 txn.commit().await?;
361 Ok(())
362 }
363
364 /// Retrieve a setting from the settings_map subtree.
365 ///
366 /// # Arguments
367 /// * `key` - The setting key to retrieve
368 ///
369 /// # Returns
370 /// The setting value if found, None otherwise.
371 pub async fn get_setting(&self, key: impl AsRef<str>) -> Result<Option<String>> {
372 let sync_settings = self
373 .sync_tree
374 .get_store_viewer::<DocStore>(SETTINGS_SUBTREE)
375 .await?;
376 match sync_settings.get_string(key).await {
377 Ok(value) => Ok(Some(value)),
378 Err(e) if e.is_not_found() => Ok(None),
379 Err(e) => Err(e),
380 }
381 }
382
383 /// Load a transport configuration from the `_sync` database.
384 ///
385 /// Transport configurations are stored in the `transports` subtree,
386 /// keyed by their name. If no configuration exists for the transport,
387 /// returns the default configuration.
388 ///
389 /// # Type Parameters
390 /// * `T` - The transport configuration type implementing [`TransportConfig`]
391 ///
392 /// # Arguments
393 /// * `name` - The name of the transport instance (e.g., "iroh", "http")
394 ///
395 /// # Returns
396 /// The loaded configuration, or the default if not found.
397 ///
398 /// # Example
399 ///
400 /// ```ignore
401 /// use eidetica::sync::transports::iroh::IrohTransportConfig;
402 ///
403 /// let config: IrohTransportConfig = sync.load_transport_config("iroh")?;
404 /// ```
405 pub async fn load_transport_config<T: TransportConfig>(&self, name: &str) -> Result<T> {
406 let tx = self.sync_tree.new_transaction().await?;
407 let registry = Registry::new(&tx, TRANSPORTS_SUBTREE).await?;
408
409 match registry.get_entry(name).await {
410 Ok(entry) => {
411 // Verify the type matches
412 if entry.type_id != T::type_id() {
413 return Err(SyncError::TransportTypeMismatch {
414 name: name.to_string(),
415 expected: T::type_id().to_string(),
416 found: entry.type_id,
417 }
418 .into());
419 }
420 entry.config.get_json("data").map_err(|e| {
421 SyncError::SerializationError(format!(
422 "Failed to deserialize transport config '{name}': {e}"
423 ))
424 .into()
425 })
426 }
427 Err(e) if e.is_not_found() => Ok(T::default()),
428 Err(e) => Err(e),
429 }
430 }
431
432 /// Save a transport configuration to the `_sync` database.
433 ///
434 /// Transport configurations are stored in the `transports` subtree,
435 /// keyed by their name. This persists the configuration so it can
436 /// be loaded on subsequent startups.
437 ///
438 /// # Type Parameters
439 /// * `T` - The transport configuration type implementing [`TransportConfig`]
440 ///
441 /// # Arguments
442 /// * `name` - The name of the transport instance (e.g., "iroh", "http")
443 /// * `config` - The configuration to save
444 ///
445 /// # Example
446 ///
447 /// ```ignore
448 /// use eidetica::sync::transports::iroh::IrohTransportConfig;
449 ///
450 /// let mut config = IrohTransportConfig::default();
451 /// config.get_or_create_secret_key(); // Generate key
452 /// sync.save_transport_config("iroh", &config)?;
453 /// ```
454 pub async fn save_transport_config<T: TransportConfig>(
455 &self,
456 name: &str,
457 config: &T,
458 ) -> Result<()> {
459 let mut config_doc = crate::crdt::Doc::new();
460 config_doc.set_json("data", config).map_err(|e| {
461 SyncError::SerializationError(format!(
462 "Failed to serialize transport config '{name}': {e}"
463 ))
464 })?;
465 let tx = self.sync_tree.new_transaction().await?;
466 let registry = Registry::new(&tx, TRANSPORTS_SUBTREE).await?;
467 registry.set_entry(name, T::type_id(), config_doc).await?;
468 tx.commit().await?;
469 Ok(())
470 }
471
472 /// Get a reference to the Instance.
473 pub fn instance(&self) -> Result<Instance> {
474 self.instance
475 .upgrade()
476 .ok_or_else(|| SyncError::InstanceDropped.into())
477 }
478
479 /// Get a clone of the backend seam.
480 pub fn backend(&self) -> Result<Arc<dyn Backend>> {
481 Ok(self.instance()?.backend().clone())
482 }
483
484 /// Get the sync tree database.
485 pub fn sync_tree(&self) -> &Database {
486 &self.sync_tree
487 }
488
489 /// Get the device public key for this sync instance.
490 ///
491 /// # Returns
492 /// The device's public key.
493 pub fn get_device_pubkey(&self) -> Result<PublicKey> {
494 Ok(self.instance()?.id())
495 }
496}
497
498impl Sync {
499 // === Test Helpers ===
500}