eidetica/sync/mod.rs
1//! Synchronization module for Eidetica database.
2//!
3//! # Quick Start
4//!
5//! ```rust,ignore
6//! // Enable sync
7//! instance.enable_sync().await?;
8//! let sync = instance.sync().unwrap();
9//!
10//! // Register transports with their configurations
11//! sync.register_transport("http", HttpTransport::builder()
12//! .bind("127.0.0.1:8080")
13//! ).await?;
14//! sync.register_transport("p2p", IrohTransport::builder()).await?;
15//!
16//! // Start accepting incoming connections
17//! sync.accept_connections().await?;
18//!
19//! // Outbound sync works via the registered transports
20//! sync.sync_with_peer(&Address::http("peer:8080"), Some(&tree_id)).await?;
21//! ```
22//!
23//! # Architecture
24//!
25//! The sync system uses a Background Sync architecture with command-pattern communication:
26//!
27//! - **[`Sync`]**: Thread-safe frontend using `Arc<Sync>` with interior mutability
28//! (`OnceLock`). Provides the public API and sends commands to the background.
29//! - **[`background::BackgroundSync`]**: Single background thread handling all sync operations
30//! via a command loop. Owns the transport and retry queue.
31//! - **Write Callbacks**: Automatically trigger sync when entries are committed via
32//! database write callbacks.
33//!
34//! # Connection Model
35//!
36//! The sync system separates **outbound** and **inbound** connection handling:
37//!
38//! - **Outbound** ([`Sync::sync_with_peer`]): Works after registering transports via
39//! [`Sync::register_transport`]. Each transport can be configured via its builder.
40//! - **Inbound** ([`Sync::accept_connections`]): Must be explicitly called each time
41//! the instance starts. Starts servers on all registered transports.
42//!
//! This design is secure by default: nodes don't accept incoming connections
//! unless they explicitly opt in.
45//!
46//! # Bootstrap Protocol
47//!
48//! The sync protocol detects whether a client needs bootstrap or incremental sync:
49//!
50//! - **Empty tips** → Full bootstrap (complete database transfer)
51//! - **Has tips** → Incremental sync (only missing entries)
52//!
53//! Use [`Sync::sync_with_peer`] which handles both cases automatically.
54//!
55//! # Duplicate Prevention
56//!
57//! Uses Merkle-DAG tip comparison instead of tracking individual sent entries:
58//!
59//! 1. Exchange tips with peer
60//! 2. Compare DAGs to find missing entries
61//! 3. Send only what peer doesn't have
62//! 4. Receive only what we're missing
63//!
64//! This approach requires no extra storage apart from tracking relevant Merkle-DAG tips.
65//!
66//! # Transport Layer
67//!
68//! Two transport implementations are available:
69//!
70//! - **HTTP** ([`transports::http::HttpTransport`]): REST API at `/api/v0`, JSON serialization
71//! - **Iroh P2P** ([`transports::iroh::IrohTransport`]): QUIC-based with NAT traversal
72//!
73//! Transports are registered via [`Sync::register_transport`] with their builders.
//! State (like Iroh node identity) is automatically persisted per named transport instance.
75//! Both implement the [`transports::SyncTransport`] trait.
76//!
77//! # Peer and State Management
78//!
79//! Peers and sync relationships are stored in a dedicated sync database (`_sync`):
80//!
81//! - `peer_manager::PeerManager`: Handles peer registration and relationships
82//! - [`state::SyncStateManager`]: Tracks sync cursors, metadata, and history
83//!
84//! # Connection Behavior
85//!
86//! - **Lazy connections**: Established on-demand, not at peer registration
87//! - **Periodic sync**: Configurable interval (default 5 minutes)
88//! - **Retry queue**: Failed sends retried with exponential backoff
89
90use std::sync::{Arc, OnceLock};
91use std::time::SystemTime;
92use tokio::sync::mpsc;
93
94use handle_trait::Handle;
95
96use crate::{
97 Database, Instance, Result, WeakInstance,
98 auth::{Permission, crypto::PublicKey},
99 crdt::{Doc, doc::Value},
100 database::DatabaseKey,
101 entry::ID,
102 instance::backend::Backend,
103 store::{DocStore, Registry},
104};
105
106// Public submodules
107pub mod background;
108pub mod error;
109pub mod handler;
110mod handler_tree_ops;
111pub mod peer_manager;
112pub mod peer_types;
113pub mod protocol;
114pub mod state;
115pub mod ticket;
116pub mod transports;
117pub mod utils;
118
119// Private submodules
120mod bootstrap;
121mod bootstrap_request_manager;
122mod ops;
123mod peer;
124mod queue;
125mod transport;
126mod transport_manager;
127mod user;
128mod user_sync_manager;
129
130// Re-exports
131use background::SyncCommand;
132pub use bootstrap_request_manager::{BootstrapRequest, RequestStatus};
133pub use error::SyncError;
134pub use peer_types::{Address, ConnectionState, PeerId, PeerInfo, PeerStatus};
135use queue::SyncQueue;
136pub use ticket::DatabaseTicket;
137use transports::TransportConfig;
138
/// Subtree of the `_sync` database that stores key/value sync settings (private).
const SETTINGS_SUBTREE: &str = "settings_map";

/// Subtree of the `_sync` database that holds the transport configuration registry (private).
const TRANSPORTS_SUBTREE: &str = "transports";

/// Store holding persisted identity/state for each transport instance (private).
const TRANSPORT_STATE_STORE: &str = "transport_state";
147
148/// Authentication parameters for sync operations.
149#[derive(Debug, Clone)]
150pub struct AuthParams {
151 /// The public key making the request
152 pub requesting_key: PublicKey,
153 /// The name/ID of the requesting key
154 pub requesting_key_name: String,
155 /// The permission level being requested
156 pub requested_permission: Permission,
157}
158
159/// Information needed to register a peer for syncing.
160///
161/// This is used with [`Sync::register_sync_peer()`] to declare sync intent.
162#[derive(Debug, Clone)]
163pub struct SyncPeerInfo {
164 /// The peer's public key
165 pub peer_pubkey: PublicKey,
166 /// The tree/database to sync
167 pub tree_id: ID,
168 /// Initial address hints where the peer might be found
169 pub addresses: Vec<Address>,
170 /// Optional authentication parameters for bootstrap
171 pub auth: Option<AuthParams>,
172 /// Optional display name for the peer
173 pub display_name: Option<String>,
174}
175
176/// Handle for tracking sync status with a specific peer.
177///
178/// Returned by [`Sync::register_sync_peer()`].
179#[derive(Debug, Clone)]
180pub struct SyncHandle {
181 tree_id: ID,
182 peer_pubkey: PublicKey,
183 sync: Sync,
184}
185
186impl SyncHandle {
187 /// Get the current sync status.
188 pub async fn status(&self) -> Result<SyncStatus> {
189 self.sync
190 .get_sync_status(&self.tree_id, &self.peer_pubkey)
191 .await
192 }
193
194 /// Add another address hint for this peer.
195 pub async fn add_address(&self, address: Address) -> Result<()> {
196 self.sync.add_peer_address(&self.peer_pubkey, address).await
197 }
198
199 /// Block until initial sync completes (has local data).
200 ///
201 /// This is a convenience method for backwards compatibility.
202 /// The sync happens in the background, this just polls until data arrives.
203 pub async fn wait_for_initial_sync(&self) -> Result<()> {
204 loop {
205 let status = self.status().await?;
206 if status.has_local_data {
207 return Ok(());
208 }
209 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
210 }
211 }
212
213 /// Get the tree ID being synced.
214 pub fn tree_id(&self) -> &ID {
215 &self.tree_id
216 }
217
218 /// Get the peer public key.
219 pub fn peer_pubkey(&self) -> &PublicKey {
220 &self.peer_pubkey
221 }
222}
223
/// Snapshot of the sync state for one tree/peer pair.
///
/// Produced by [`SyncHandle::status`].
#[derive(Clone, Debug)]
pub struct SyncStatus {
    /// `true` once data for this tree is stored locally.
    pub has_local_data: bool,
    /// Time of the most recent successful sync, or `None` if none has succeeded yet.
    pub last_sync: Option<SystemTime>,
    /// Most recent error message, or `None` if no error has been recorded.
    pub last_error: Option<String>,
}
234
235/// Synchronization manager for the database.
236///
237/// The Sync module is a thin frontend that communicates with a background
238/// sync engine thread via command channels. All actual sync operations, transport
239/// communication, and state management happen in the background thread.
240///
241/// ## Multi-Transport Support
242///
243/// Multiple transports can be enabled simultaneously (e.g., HTTP + Iroh P2P),
244/// allowing peers to be reachable via different networks. Requests are automatically
245/// routed to the appropriate transport based on address type.
246///
247/// ```rust,ignore
248/// // Enable both HTTP and Iroh transports
249/// sync.enable_http_transport().await?;
250/// sync.enable_iroh_transport().await?;
251///
252/// // Start servers on all transports
253/// sync.start_server("127.0.0.1:0").await?;
254///
255/// // Get all server addresses
256/// let addresses = sync.get_all_server_addresses().await?;
257/// ```
258#[derive(Debug)]
259pub struct Sync {
260 /// Communication channel to the background sync engine.
261 /// Initialized when the first transport is enabled via `enable_*_transport()` or `add_transport()`.
262 background_tx: OnceLock<mpsc::Sender<SyncCommand>>,
263 /// The instance for read operations and tree management
264 instance: WeakInstance,
265 /// The tree containing synchronization settings
266 sync_tree: Database,
267 /// Queue for entries pending synchronization
268 queue: Arc<SyncQueue>,
269}
270
271impl Clone for Sync {
272 fn clone(&self) -> Self {
273 let background_tx = OnceLock::new();
274 if let Some(tx) = self.background_tx.get() {
275 let _ = background_tx.set(tx.clone());
276 }
277 Self {
278 background_tx,
279 instance: self.instance.clone(),
280 sync_tree: self.sync_tree.clone(),
281 queue: Arc::clone(&self.queue),
282 }
283 }
284}
285
286impl Sync {
287 /// Create a new Sync instance with a dedicated settings tree.
288 ///
289 /// # Arguments
290 /// * `instance` - The database instance for tree operations
291 ///
292 /// # Returns
293 /// A new Sync instance with its own settings tree.
294 pub async fn new(instance: Instance) -> Result<Self> {
295 // Get device key from instance
296 let signing_key = instance.device_key().clone();
297
298 let mut sync_settings = Doc::new();
299 sync_settings.set("name", "_sync");
300 sync_settings.set("type", "sync_settings");
301
302 let sync_tree = Database::create(&instance, signing_key, sync_settings).await?;
303
304 let sync = Self {
305 background_tx: OnceLock::new(),
306 instance: instance.downgrade(),
307 sync_tree,
308 queue: Arc::new(SyncQueue::new()),
309 };
310
311 // Initialize combined settings for all tracked users
312 sync.initialize_user_settings().await?;
313
314 Ok(sync)
315 }
316
317 /// Load an existing Sync instance from a sync tree root ID.
318 ///
319 /// # Arguments
320 /// * `instance` - The database instance
321 /// * `sync_tree_root_id` - The root ID of the existing sync tree
322 ///
323 /// # Returns
324 /// A Sync instance loaded from the existing tree.
325 pub async fn load(instance: Instance, sync_tree_root_id: &ID) -> Result<Self> {
326 let device_key = instance.device_key().clone();
327
328 let sync_tree = Database::open(
329 instance.handle(),
330 sync_tree_root_id,
331 DatabaseKey::new(device_key),
332 )
333 .await?;
334
335 let sync = Self {
336 background_tx: OnceLock::new(),
337 instance: instance.downgrade(),
338 sync_tree,
339 queue: Arc::new(SyncQueue::new()),
340 };
341
342 // Initialize combined settings for all tracked users
343 sync.initialize_user_settings().await?;
344
345 Ok(sync)
346 }
347
348 /// Get the root ID of the sync settings tree.
349 pub fn sync_tree_root_id(&self) -> &ID {
350 self.sync_tree.root_id()
351 }
352
353 /// Store a setting in the sync_settings subtree.
354 ///
355 /// # Arguments
356 /// * `key` - The setting key
357 /// * `value` - The setting value
358 pub async fn set_setting(
359 &self,
360 key: impl Into<String>,
361 value: impl Into<String>,
362 ) -> Result<()> {
363 let txn = self.sync_tree.new_transaction().await?;
364 let sync_settings = txn.get_store::<DocStore>(SETTINGS_SUBTREE).await?;
365 sync_settings.set(key, Value::Text(value.into())).await?;
366 txn.commit().await?;
367 Ok(())
368 }
369
370 /// Retrieve a setting from the settings_map subtree.
371 ///
372 /// # Arguments
373 /// * `key` - The setting key to retrieve
374 ///
375 /// # Returns
376 /// The setting value if found, None otherwise.
377 pub async fn get_setting(&self, key: impl AsRef<str>) -> Result<Option<String>> {
378 let sync_settings = self
379 .sync_tree
380 .get_store_viewer::<DocStore>(SETTINGS_SUBTREE)
381 .await?;
382 match sync_settings.get_string(key).await {
383 Ok(value) => Ok(Some(value)),
384 Err(e) if e.is_not_found() => Ok(None),
385 Err(e) => Err(e),
386 }
387 }
388
389 /// Load a transport configuration from the `_sync` database.
390 ///
391 /// Transport configurations are stored in the `transports` subtree,
392 /// keyed by their name. If no configuration exists for the transport,
393 /// returns the default configuration.
394 ///
395 /// # Type Parameters
396 /// * `T` - The transport configuration type implementing [`TransportConfig`]
397 ///
398 /// # Arguments
399 /// * `name` - The name of the transport instance (e.g., "iroh", "http")
400 ///
401 /// # Returns
402 /// The loaded configuration, or the default if not found.
403 ///
404 /// # Example
405 ///
406 /// ```ignore
407 /// use eidetica::sync::transports::iroh::IrohTransportConfig;
408 ///
409 /// let config: IrohTransportConfig = sync.load_transport_config("iroh")?;
410 /// ```
411 pub async fn load_transport_config<T: TransportConfig>(&self, name: &str) -> Result<T> {
412 let tx = self.sync_tree.new_transaction().await?;
413 let registry = Registry::new(&tx, TRANSPORTS_SUBTREE).await?;
414
415 match registry.get_entry(name).await {
416 Ok(entry) => {
417 // Verify the type matches
418 if entry.type_id != T::type_id() {
419 return Err(SyncError::TransportTypeMismatch {
420 name: name.to_string(),
421 expected: T::type_id().to_string(),
422 found: entry.type_id,
423 }
424 .into());
425 }
426 entry.config.get_json("data").map_err(|e| {
427 SyncError::SerializationError(format!(
428 "Failed to deserialize transport config '{name}': {e}"
429 ))
430 .into()
431 })
432 }
433 Err(e) if e.is_not_found() => Ok(T::default()),
434 Err(e) => Err(e),
435 }
436 }
437
438 /// Save a transport configuration to the `_sync` database.
439 ///
440 /// Transport configurations are stored in the `transports` subtree,
441 /// keyed by their name. This persists the configuration so it can
442 /// be loaded on subsequent startups.
443 ///
444 /// # Type Parameters
445 /// * `T` - The transport configuration type implementing [`TransportConfig`]
446 ///
447 /// # Arguments
448 /// * `name` - The name of the transport instance (e.g., "iroh", "http")
449 /// * `config` - The configuration to save
450 ///
451 /// # Example
452 ///
453 /// ```ignore
454 /// use eidetica::sync::transports::iroh::IrohTransportConfig;
455 ///
456 /// let mut config = IrohTransportConfig::default();
457 /// config.get_or_create_secret_key(); // Generate key
458 /// sync.save_transport_config("iroh", &config)?;
459 /// ```
460 pub async fn save_transport_config<T: TransportConfig>(
461 &self,
462 name: &str,
463 config: &T,
464 ) -> Result<()> {
465 let mut config_doc = crate::crdt::Doc::new();
466 config_doc.set_json("data", config).map_err(|e| {
467 SyncError::SerializationError(format!(
468 "Failed to serialize transport config '{name}': {e}"
469 ))
470 })?;
471 let tx = self.sync_tree.new_transaction().await?;
472 let registry = Registry::new(&tx, TRANSPORTS_SUBTREE).await?;
473 registry.set_entry(name, T::type_id(), config_doc).await?;
474 tx.commit().await?;
475 Ok(())
476 }
477
478 /// Get a reference to the Instance.
479 pub fn instance(&self) -> Result<Instance> {
480 self.instance
481 .upgrade()
482 .ok_or_else(|| SyncError::InstanceDropped.into())
483 }
484
485 /// Get a reference to the backend.
486 pub fn backend(&self) -> Result<Backend> {
487 Ok(self.instance()?.backend().clone())
488 }
489
490 /// Get the sync tree database.
491 pub fn sync_tree(&self) -> &Database {
492 &self.sync_tree
493 }
494
495 /// Get the device public key for this sync instance.
496 ///
497 /// # Returns
498 /// The device's public key.
499 pub fn get_device_pubkey(&self) -> Result<PublicKey> {
500 Ok(self.instance()?.device_id())
501 }
502}
503
// === Test Helpers ===
// Test-only helper methods for `Sync` belong in this block. It is gated on
// `cfg(test)` so it never contributes to non-test builds; it is currently empty.
#[cfg(test)]
impl Sync {}