Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/sync/
transport.rs

1//! Network transport management for the sync system.
2
3use std::sync::Arc;
4use tokio::sync::oneshot;
5use tracing::info;
6
7use super::{
8    DatabaseTicket, Sync, SyncError, TRANSPORT_STATE_STORE,
9    background::BackgroundSync,
10    background::SyncCommand,
11    peer_types::Address,
12    transports::{SyncTransport, TransportBuilder},
13};
14use crate::{
15    Result,
16    crdt::{Doc, doc::Value},
17    entry::ID,
18    store::DocStore,
19};
20
21impl Sync {
22    // === Network Transport Methods ===
23
24    /// Stop the running sync server (async version).
25    ///
26    /// Stops servers on all transports.
27    ///
28    /// # Returns
29    /// A Result indicating success or failure of server shutdown.
30    pub async fn stop_server(&self) -> Result<()> {
31        let (tx, rx) = oneshot::channel();
32
33        self.background_tx
34            .get()
35            .ok_or(SyncError::NoTransportEnabled)?
36            .send(SyncCommand::StopServer {
37                name: None, // Stop all transports
38                response: tx,
39            })
40            .await
41            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
42
43        rx.await
44            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
45    }
46
47    /// Start accepting incoming connections on all registered transports.
48    ///
49    /// Must be called each time the instance is created to accept inbound sync requests.
50    /// This starts servers on all transports that have been registered via `register_transport()`.
51    /// Each transport uses its pre-configured bind address.
52    ///
53    /// # Example
54    ///
55    /// ```rust,ignore
56    /// instance.enable_sync().await?;
57    /// let sync = instance.sync().unwrap();
58    ///
59    /// // Register transports with their configurations
60    /// sync.register_transport("http-local", HttpTransport::builder()
61    ///     .bind("127.0.0.1:8080")
62    /// ).await?;
63    /// sync.register_transport("p2p", IrohTransport::builder()).await?;
64    ///
65    /// // Start all servers
66    /// sync.accept_connections().await?;
67    /// ```
68    pub async fn accept_connections(&self) -> Result<()> {
69        // Ensure transports are enabled
70        if self.background_tx.get().is_none() {
71            return Err(SyncError::NoTransportEnabled.into());
72        }
73
74        // Start servers on all registered transports
75        // Each transport uses its pre-configured bind address
76        let (tx, rx) = oneshot::channel();
77
78        self.background_tx
79            .get()
80            .ok_or(SyncError::NoTransportEnabled)?
81            .send(SyncCommand::StartServer {
82                name: None, // Start all
83                response: tx,
84            })
85            .await
86            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
87
88        rx.await
89            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))??;
90
91        info!("Started servers on all registered transports");
92        Ok(())
93    }
94
95    /// Start accepting incoming connections on a specific named transport.
96    ///
97    /// Use this when you want fine-grained control over which transports
98    /// accept connections and when.
99    ///
100    /// # Arguments
101    /// * `name` - The name of the transport to start (as used in `register_transport`)
102    ///
103    /// # Example
104    ///
105    /// ```rust,ignore
106    /// // Register transports
107    /// sync.register_transport("http-local", HttpTransport::builder()
108    ///     .bind("127.0.0.1:8080")
109    /// ).await?;
110    /// sync.register_transport("p2p", IrohTransport::builder()).await?;
111    ///
112    /// // Only start HTTP server (P2P stays inactive)
113    /// sync.accept_connections_on("http-local").await?;
114    /// ```
115    pub async fn accept_connections_on(&self, name: impl Into<String>) -> Result<()> {
116        let name = name.into();
117
118        if self.background_tx.get().is_none() {
119            return Err(SyncError::NoTransportEnabled.into());
120        }
121
122        let (tx, rx) = oneshot::channel();
123
124        self.background_tx
125            .get()
126            .ok_or(SyncError::NoTransportEnabled)?
127            .send(SyncCommand::StartServer {
128                name: Some(name.clone()),
129                response: tx,
130            })
131            .await
132            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
133
134        rx.await
135            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))??;
136
137        info!(name = %name, "Started server on transport");
138        Ok(())
139    }
140
141    /// Add a named transport to the sync system.
142    ///
143    /// If a transport with the same name already exists, it will be replaced
144    /// (the old transport's server will be stopped if running).
145    ///
146    /// # Arguments
147    /// * `name` - Unique name for this transport instance (e.g., "http-local", "p2p")
148    /// * `transport` - The transport to add
149    pub async fn add_transport(
150        &self,
151        name: impl Into<String>,
152        transport: Box<dyn SyncTransport>,
153    ) -> Result<()> {
154        // Ensure background sync is running
155        self.start_background_sync()?;
156
157        let name = name.into();
158        let (tx, rx) = oneshot::channel();
159
160        self.background_tx
161            .get()
162            .ok_or(SyncError::NoTransportEnabled)?
163            .send(SyncCommand::AddTransport {
164                name,
165                transport,
166                response: tx,
167            })
168            .await
169            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
170
171        rx.await
172            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
173    }
174
175    /// Register a named transport instance with persisted state.
176    ///
177    /// This is the recommended way to add transports. The builder's `build()` method
178    /// receives persisted state for this named instance (may be empty on first run)
179    /// and can update it. The updated state is automatically saved.
180    ///
181    /// # Arguments
182    /// * `name` - Unique name for this transport instance (e.g., "http-local", "p2p")
183    /// * `builder` - The transport builder that creates the transport
184    ///
185    /// # Example
186    ///
187    /// ```ignore
188    /// use eidetica::sync::transports::http::HttpTransport;
189    /// use eidetica::sync::transports::iroh::IrohTransport;
190    ///
191    /// // Register an HTTP transport with bind address
192    /// sync.register_transport("http-local", HttpTransport::builder()
193    ///     .bind("127.0.0.1:8080")
194    /// ).await?;
195    ///
196    /// // Register an Iroh transport (generates and persists node ID on first run)
197    /// sync.register_transport("p2p", IrohTransport::builder()).await?;
198    ///
199    /// // Register multiple Iroh transports with different identities
200    /// sync.register_transport("p2p-work", IrohTransport::builder()).await?;
201    /// sync.register_transport("p2p-personal", IrohTransport::builder()).await?;
202    /// ```
203    pub async fn register_transport<B: TransportBuilder>(
204        &self,
205        name: impl Into<String>,
206        builder: B,
207    ) -> Result<()>
208    where
209        B::Transport: 'static,
210    {
211        let name = name.into();
212
213        // Load persisted state for this named instance
214        let persisted = self.load_transport_state(&name).await?;
215
216        // Build transport - may generate/update persisted state
217        let (transport, updated) = builder.build(persisted).await?;
218
219        // Save updated persisted state if changed
220        if let Some(state) = updated {
221            self.save_transport_state(&name, &state).await?;
222        }
223
224        // Add transport by name
225        self.add_transport(name, Box::new(transport)).await
226    }
227
228    /// Load persisted state for a named transport instance.
229    ///
230    /// Returns an empty Doc if no state exists for this transport.
231    async fn load_transport_state(&self, name: &str) -> Result<Doc> {
232        let tx = self.sync_tree.new_transaction().await?;
233        let store = tx.get_store::<DocStore>(TRANSPORT_STATE_STORE).await?;
234
235        match store.get(name).await {
236            Ok(Value::Doc(doc)) => Ok(doc),
237            Ok(_) => Ok(Doc::new()), // Unexpected value type
238            Err(e) if e.is_not_found() => Ok(Doc::new()),
239            Err(e) => Err(e),
240        }
241    }
242
243    /// Save persisted state for a named transport instance.
244    async fn save_transport_state(&self, name: &str, state: &Doc) -> Result<()> {
245        let name = name.to_string();
246        let state = state.clone();
247        self.sync_tree
248            .with_transaction(|tx| async move {
249                let store = tx.get_store::<DocStore>(TRANSPORT_STATE_STORE).await?;
250                store.set(&name, Value::Doc(state)).await
251            })
252            .await
253    }
254
255    /// Start the background sync engine.
256    ///
257    /// The engine starts with no transports registered. Use `add_transport()`
258    /// or `register_transport()` to add transports.
259    pub(crate) fn start_background_sync(&self) -> Result<()> {
260        if self.background_tx.get().is_some() {
261            return Ok(()); // Already enabled
262        }
263
264        let sync_tree_id = self.sync_tree.root_id().clone();
265        let instance = self.instance()?;
266
267        // Create the background sync and get command sender
268        let background_tx = BackgroundSync::start(instance, sync_tree_id, Arc::clone(&self.queue));
269
270        // Initialize the command channel (can only be done once)
271        self.background_tx
272            .set(background_tx)
273            .map_err(|_| SyncError::ServerAlreadyRunning {
274                address: "command channel already initialized".to_string(),
275            })?;
276
277        Ok(())
278    }
279
280    /// Get a server address if any transport is running a server.
281    ///
282    /// Returns the first available raw server address string (e.g.,
283    /// `127.0.0.1:8080`). To produce a shareable link, use
284    /// [`create_ticket`](Self::create_ticket) to generate a full ticket URL,
285    /// or construct an [`Address`] with [`Address::new`] for programmatic use.
286    ///
287    /// Use `get_server_address_for` for a specific transport.
288    ///
289    /// # Returns
290    /// The address of the first running server, or an error if no server is running.
291    pub async fn get_server_address(&self) -> Result<String> {
292        let addresses = self.get_all_server_addresses().await?;
293        addresses
294            .into_iter()
295            .next()
296            .map(|(_, addr)| addr)
297            .ok_or_else(|| SyncError::ServerNotRunning.into())
298    }
299
300    /// Get the server address for a specific transport.
301    ///
302    /// # Arguments
303    /// * `name` - The name of the transport
304    ///
305    /// # Returns
306    /// The address the server is bound to for that transport.
307    pub async fn get_server_address_for(&self, name: &str) -> Result<String> {
308        let (tx, rx) = oneshot::channel();
309
310        self.background_tx
311            .get()
312            .ok_or(SyncError::NoTransportEnabled)?
313            .send(SyncCommand::GetServerAddress {
314                name: name.to_string(),
315                response: tx,
316            })
317            .await
318            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
319
320        rx.await
321            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
322    }
323
324    /// Get all server addresses for running transports.
325    ///
326    /// # Returns
327    /// A vector of (transport_type, address) pairs for all running servers.
328    pub async fn get_all_server_addresses(&self) -> Result<Vec<(String, String)>> {
329        let (tx, rx) = oneshot::channel();
330
331        self.background_tx
332            .get()
333            .ok_or(SyncError::NoTransportEnabled)?
334            .send(SyncCommand::GetAllServerAddresses { response: tx })
335            .await
336            .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
337
338        rx.await
339            .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
340    }
341
342    /// Generate a ticket for a database using all running transports' addresses.
343    ///
344    /// The ticket contains the database ID and address hints from all running
345    /// transport servers.
346    ///
347    /// For the common case of producing a ticket as part of starting to share
348    /// a database, prefer [`User::share`](crate::user::User::share), which
349    /// atomically enables sync and builds the ticket in one call.
350    ///
351    /// # Arguments
352    /// * `database_id` - The ID of the database to create a ticket for
353    ///
354    /// # Returns
355    /// A `DatabaseTicket` containing the database ID and transport address hints.
356    pub async fn create_ticket(&self, database_id: &ID) -> Result<DatabaseTicket> {
357        let mut ticket = DatabaseTicket::new(database_id.clone());
358        let addresses = self.get_all_server_addresses().await?;
359        for (transport_type, address) in addresses {
360            ticket.add_address(Address::new(transport_type, address));
361        }
362        Ok(ticket)
363    }
364}