Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/sync/
transport.rs

1//! Network transport management for the sync system.
2
3use std::sync::Arc;
4use tokio::sync::oneshot;
5use tracing::info;
6
7use super::{
8    DatabaseTicket, Sync, SyncError, TRANSPORT_STATE_STORE,
9    background::BackgroundSync,
10    background::SyncCommand,
11    peer_types::Address,
12    transports::{SyncTransport, TransportBuilder},
13};
14use crate::{
15    Result,
16    crdt::{Doc, doc::Value},
17    entry::ID,
18    store::DocStore,
19};
20
21impl Sync {
22    // === Network Transport Methods ===
23
24    /// Stop the running sync server (async version).
25    ///
26    /// Stops servers on all transports.
27    ///
28    /// # Returns
29    /// A Result indicating success or failure of server shutdown.
30    pub async fn stop_server(&self) -> Result<()> {
31        let (tx, rx) = oneshot::channel();
32
33        self.background_tx
34            .get()
35            .ok_or(SyncError::NoTransportEnabled)?
36            .send(SyncCommand::StopServer {
37                name: None, // Stop all transports
38                response: tx,
39            })
40            .await
41            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
42
43        rx.await
44            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
45    }
46
47    /// Start accepting incoming connections on all registered transports.
48    ///
49    /// Must be called each time the instance is created to accept inbound sync requests.
50    /// This starts servers on all transports that have been registered via `register_transport()`.
51    /// Each transport uses its pre-configured bind address.
52    ///
53    /// # Example
54    ///
55    /// ```rust,ignore
56    /// instance.enable_sync().await?;
57    /// let sync = instance.sync().unwrap();
58    ///
59    /// // Register transports with their configurations
60    /// sync.register_transport("http-local", HttpTransport::builder()
61    ///     .bind("127.0.0.1:8080")
62    /// ).await?;
63    /// sync.register_transport("p2p", IrohTransport::builder()).await?;
64    ///
65    /// // Start all servers
66    /// sync.accept_connections().await?;
67    /// ```
68    pub async fn accept_connections(&self) -> Result<()> {
69        // Ensure transports are enabled
70        if self.background_tx.get().is_none() {
71            return Err(SyncError::NoTransportEnabled.into());
72        }
73
74        // Start servers on all registered transports
75        // Each transport uses its pre-configured bind address
76        let (tx, rx) = oneshot::channel();
77
78        self.background_tx
79            .get()
80            .ok_or(SyncError::NoTransportEnabled)?
81            .send(SyncCommand::StartServer {
82                name: None, // Start all
83                response: tx,
84            })
85            .await
86            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
87
88        rx.await
89            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))??;
90
91        info!("Started servers on all registered transports");
92        Ok(())
93    }
94
95    /// Start accepting incoming connections on a specific named transport.
96    ///
97    /// Use this when you want fine-grained control over which transports
98    /// accept connections and when.
99    ///
100    /// # Arguments
101    /// * `name` - The name of the transport to start (as used in `register_transport`)
102    ///
103    /// # Example
104    ///
105    /// ```rust,ignore
106    /// // Register transports
107    /// sync.register_transport("http-local", HttpTransport::builder()
108    ///     .bind("127.0.0.1:8080")
109    /// ).await?;
110    /// sync.register_transport("p2p", IrohTransport::builder()).await?;
111    ///
112    /// // Only start HTTP server (P2P stays inactive)
113    /// sync.accept_connections_on("http-local").await?;
114    /// ```
115    pub async fn accept_connections_on(&self, name: impl Into<String>) -> Result<()> {
116        let name = name.into();
117
118        if self.background_tx.get().is_none() {
119            return Err(SyncError::NoTransportEnabled.into());
120        }
121
122        let (tx, rx) = oneshot::channel();
123
124        self.background_tx
125            .get()
126            .ok_or(SyncError::NoTransportEnabled)?
127            .send(SyncCommand::StartServer {
128                name: Some(name.clone()),
129                response: tx,
130            })
131            .await
132            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
133
134        rx.await
135            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))??;
136
137        info!(name = %name, "Started server on transport");
138        Ok(())
139    }
140
141    /// Add a named transport to the sync system.
142    ///
143    /// If a transport with the same name already exists, it will be replaced
144    /// (the old transport's server will be stopped if running).
145    ///
146    /// # Arguments
147    /// * `name` - Unique name for this transport instance (e.g., "http-local", "p2p")
148    /// * `transport` - The transport to add
149    pub async fn add_transport(
150        &self,
151        name: impl Into<String>,
152        transport: Box<dyn SyncTransport>,
153    ) -> Result<()> {
154        // Ensure background sync is running
155        self.start_background_sync()?;
156
157        let name = name.into();
158        let (tx, rx) = oneshot::channel();
159
160        self.background_tx
161            .get()
162            .ok_or(SyncError::NoTransportEnabled)?
163            .send(SyncCommand::AddTransport {
164                name,
165                transport,
166                response: tx,
167            })
168            .await
169            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
170
171        rx.await
172            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
173    }
174
175    /// Register a named transport instance with persisted state.
176    ///
177    /// This is the recommended way to add transports. The builder's `build()` method
178    /// receives persisted state for this named instance (may be empty on first run)
179    /// and can update it. The updated state is automatically saved.
180    ///
181    /// # Arguments
182    /// * `name` - Unique name for this transport instance (e.g., "http-local", "p2p")
183    /// * `builder` - The transport builder that creates the transport
184    ///
185    /// # Example
186    ///
187    /// ```ignore
188    /// use eidetica::sync::transports::http::HttpTransport;
189    /// use eidetica::sync::transports::iroh::IrohTransport;
190    ///
191    /// // Register an HTTP transport with bind address
192    /// sync.register_transport("http-local", HttpTransport::builder()
193    ///     .bind("127.0.0.1:8080")
194    /// ).await?;
195    ///
196    /// // Register an Iroh transport (generates and persists node ID on first run)
197    /// sync.register_transport("p2p", IrohTransport::builder()).await?;
198    ///
199    /// // Register multiple Iroh transports with different identities
200    /// sync.register_transport("p2p-work", IrohTransport::builder()).await?;
201    /// sync.register_transport("p2p-personal", IrohTransport::builder()).await?;
202    /// ```
203    pub async fn register_transport<B: TransportBuilder>(
204        &self,
205        name: impl Into<String>,
206        builder: B,
207    ) -> Result<()>
208    where
209        B::Transport: 'static,
210    {
211        let name = name.into();
212
213        // Load persisted state for this named instance
214        let persisted = self.load_transport_state(&name).await?;
215
216        // Build transport - may generate/update persisted state
217        let (transport, updated) = builder.build(persisted).await?;
218
219        // Save updated persisted state if changed
220        if let Some(state) = updated {
221            self.save_transport_state(&name, &state).await?;
222        }
223
224        // Add transport by name
225        self.add_transport(name, Box::new(transport)).await
226    }
227
228    /// Load persisted state for a named transport instance.
229    ///
230    /// Returns an empty Doc if no state exists for this transport.
231    async fn load_transport_state(&self, name: &str) -> Result<Doc> {
232        let tx = self.sync_tree.new_transaction().await?;
233        let store = tx.get_store::<DocStore>(TRANSPORT_STATE_STORE).await?;
234
235        match store.get(name).await {
236            Ok(Value::Doc(doc)) => Ok(doc),
237            Ok(_) => Ok(Doc::new()), // Unexpected value type
238            Err(e) if e.is_not_found() => Ok(Doc::new()),
239            Err(e) => Err(e),
240        }
241    }
242
243    /// Save persisted state for a named transport instance.
244    async fn save_transport_state(&self, name: &str, state: &Doc) -> Result<()> {
245        let tx = self.sync_tree.new_transaction().await?;
246        let store = tx.get_store::<DocStore>(TRANSPORT_STATE_STORE).await?;
247        store.set(name, Value::Doc(state.clone())).await?;
248        tx.commit().await?;
249        Ok(())
250    }
251
252    /// Start the background sync engine.
253    ///
254    /// The engine starts with no transports registered. Use `add_transport()`
255    /// or `register_transport()` to add transports.
256    pub(crate) fn start_background_sync(&self) -> Result<()> {
257        if self.background_tx.get().is_some() {
258            return Ok(()); // Already enabled
259        }
260
261        let sync_tree_id = self.sync_tree.root_id().clone();
262        let instance = self.instance()?;
263
264        // Create the background sync and get command sender
265        let background_tx = BackgroundSync::start(instance, sync_tree_id, Arc::clone(&self.queue));
266
267        // Initialize the command channel (can only be done once)
268        self.background_tx
269            .set(background_tx)
270            .map_err(|_| SyncError::ServerAlreadyRunning {
271                address: "command channel already initialized".to_string(),
272            })?;
273
274        Ok(())
275    }
276
277    /// Get a server address if any transport is running a server.
278    ///
279    /// Returns the first available raw server address string (e.g.,
280    /// `127.0.0.1:8080`). To produce a shareable link, use
281    /// [`create_ticket`](Self::create_ticket) to generate a full ticket URL,
282    /// or construct an [`Address`] with [`Address::new`] for programmatic use.
283    ///
284    /// Use `get_server_address_for` for a specific transport.
285    ///
286    /// # Returns
287    /// The address of the first running server, or an error if no server is running.
288    pub async fn get_server_address(&self) -> Result<String> {
289        let addresses = self.get_all_server_addresses().await?;
290        addresses
291            .into_iter()
292            .next()
293            .map(|(_, addr)| addr)
294            .ok_or_else(|| SyncError::ServerNotRunning.into())
295    }
296
297    /// Get the server address for a specific transport.
298    ///
299    /// # Arguments
300    /// * `name` - The name of the transport
301    ///
302    /// # Returns
303    /// The address the server is bound to for that transport.
304    pub async fn get_server_address_for(&self, name: &str) -> Result<String> {
305        let (tx, rx) = oneshot::channel();
306
307        self.background_tx
308            .get()
309            .ok_or(SyncError::NoTransportEnabled)?
310            .send(SyncCommand::GetServerAddress {
311                name: name.to_string(),
312                response: tx,
313            })
314            .await
315            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
316
317        rx.await
318            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
319    }
320
321    /// Get all server addresses for running transports.
322    ///
323    /// # Returns
324    /// A vector of (transport_type, address) pairs for all running servers.
325    pub async fn get_all_server_addresses(&self) -> Result<Vec<(String, String)>> {
326        let (tx, rx) = oneshot::channel();
327
328        self.background_tx
329            .get()
330            .ok_or(SyncError::NoTransportEnabled)?
331            .send(SyncCommand::GetAllServerAddresses { response: tx })
332            .await
333            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
334
335        rx.await
336            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
337    }
338
339    /// Generate a ticket for a database using all running transports' addresses.
340    ///
341    /// The ticket contains the database ID and address hints from all running
342    /// transport servers.
343    ///
344    /// # Arguments
345    /// * `database_id` - The ID of the database to create a ticket for
346    ///
347    /// # Returns
348    /// A `DatabaseTicket` containing the database ID and transport address hints.
349    pub async fn create_ticket(&self, database_id: &ID) -> Result<DatabaseTicket> {
350        let mut ticket = DatabaseTicket::new(database_id.clone());
351        let addresses = self.get_all_server_addresses().await?;
352        for (transport_type, address) in addresses {
353            ticket.add_address(Address::new(transport_type, address));
354        }
355        Ok(ticket)
356    }
357}