eidetica/sync/transport.rs
1//! Network transport management for the sync system.
2
3use std::sync::Arc;
4use tokio::sync::oneshot;
5use tracing::info;
6
7use super::{
8 DatabaseTicket, Sync, SyncError, TRANSPORT_STATE_STORE,
9 background::BackgroundSync,
10 background::SyncCommand,
11 peer_types::Address,
12 transports::{SyncTransport, TransportBuilder},
13};
14use crate::{
15 Result,
16 crdt::{Doc, doc::Value},
17 entry::ID,
18 store::DocStore,
19};
20
21impl Sync {
22 // === Network Transport Methods ===
23
24 /// Stop the running sync server (async version).
25 ///
26 /// Stops servers on all transports.
27 ///
28 /// # Returns
29 /// A Result indicating success or failure of server shutdown.
30 pub async fn stop_server(&self) -> Result<()> {
31 let (tx, rx) = oneshot::channel();
32
33 self.background_tx
34 .get()
35 .ok_or(SyncError::NoTransportEnabled)?
36 .send(SyncCommand::StopServer {
37 name: None, // Stop all transports
38 response: tx,
39 })
40 .await
41 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
42
43 rx.await
44 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
45 }
46
47 /// Start accepting incoming connections on all registered transports.
48 ///
49 /// Must be called each time the instance is created to accept inbound sync requests.
50 /// This starts servers on all transports that have been registered via `register_transport()`.
51 /// Each transport uses its pre-configured bind address.
52 ///
53 /// # Example
54 ///
55 /// ```rust,ignore
56 /// instance.enable_sync().await?;
57 /// let sync = instance.sync().unwrap();
58 ///
59 /// // Register transports with their configurations
60 /// sync.register_transport("http-local", HttpTransport::builder()
61 /// .bind("127.0.0.1:8080")
62 /// ).await?;
63 /// sync.register_transport("p2p", IrohTransport::builder()).await?;
64 ///
65 /// // Start all servers
66 /// sync.accept_connections().await?;
67 /// ```
68 pub async fn accept_connections(&self) -> Result<()> {
69 // Ensure transports are enabled
70 if self.background_tx.get().is_none() {
71 return Err(SyncError::NoTransportEnabled.into());
72 }
73
74 // Start servers on all registered transports
75 // Each transport uses its pre-configured bind address
76 let (tx, rx) = oneshot::channel();
77
78 self.background_tx
79 .get()
80 .ok_or(SyncError::NoTransportEnabled)?
81 .send(SyncCommand::StartServer {
82 name: None, // Start all
83 response: tx,
84 })
85 .await
86 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
87
88 rx.await
89 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))??;
90
91 info!("Started servers on all registered transports");
92 Ok(())
93 }
94
95 /// Start accepting incoming connections on a specific named transport.
96 ///
97 /// Use this when you want fine-grained control over which transports
98 /// accept connections and when.
99 ///
100 /// # Arguments
101 /// * `name` - The name of the transport to start (as used in `register_transport`)
102 ///
103 /// # Example
104 ///
105 /// ```rust,ignore
106 /// // Register transports
107 /// sync.register_transport("http-local", HttpTransport::builder()
108 /// .bind("127.0.0.1:8080")
109 /// ).await?;
110 /// sync.register_transport("p2p", IrohTransport::builder()).await?;
111 ///
112 /// // Only start HTTP server (P2P stays inactive)
113 /// sync.accept_connections_on("http-local").await?;
114 /// ```
115 pub async fn accept_connections_on(&self, name: impl Into<String>) -> Result<()> {
116 let name = name.into();
117
118 if self.background_tx.get().is_none() {
119 return Err(SyncError::NoTransportEnabled.into());
120 }
121
122 let (tx, rx) = oneshot::channel();
123
124 self.background_tx
125 .get()
126 .ok_or(SyncError::NoTransportEnabled)?
127 .send(SyncCommand::StartServer {
128 name: Some(name.clone()),
129 response: tx,
130 })
131 .await
132 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
133
134 rx.await
135 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))??;
136
137 info!(name = %name, "Started server on transport");
138 Ok(())
139 }
140
141 /// Add a named transport to the sync system.
142 ///
143 /// If a transport with the same name already exists, it will be replaced
144 /// (the old transport's server will be stopped if running).
145 ///
146 /// # Arguments
147 /// * `name` - Unique name for this transport instance (e.g., "http-local", "p2p")
148 /// * `transport` - The transport to add
149 pub async fn add_transport(
150 &self,
151 name: impl Into<String>,
152 transport: Box<dyn SyncTransport>,
153 ) -> Result<()> {
154 // Ensure background sync is running
155 self.start_background_sync()?;
156
157 let name = name.into();
158 let (tx, rx) = oneshot::channel();
159
160 self.background_tx
161 .get()
162 .ok_or(SyncError::NoTransportEnabled)?
163 .send(SyncCommand::AddTransport {
164 name,
165 transport,
166 response: tx,
167 })
168 .await
169 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
170
171 rx.await
172 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
173 }
174
175 /// Register a named transport instance with persisted state.
176 ///
177 /// This is the recommended way to add transports. The builder's `build()` method
178 /// receives persisted state for this named instance (may be empty on first run)
179 /// and can update it. The updated state is automatically saved.
180 ///
181 /// # Arguments
182 /// * `name` - Unique name for this transport instance (e.g., "http-local", "p2p")
183 /// * `builder` - The transport builder that creates the transport
184 ///
185 /// # Example
186 ///
187 /// ```ignore
188 /// use eidetica::sync::transports::http::HttpTransport;
189 /// use eidetica::sync::transports::iroh::IrohTransport;
190 ///
191 /// // Register an HTTP transport with bind address
192 /// sync.register_transport("http-local", HttpTransport::builder()
193 /// .bind("127.0.0.1:8080")
194 /// ).await?;
195 ///
196 /// // Register an Iroh transport (generates and persists node ID on first run)
197 /// sync.register_transport("p2p", IrohTransport::builder()).await?;
198 ///
199 /// // Register multiple Iroh transports with different identities
200 /// sync.register_transport("p2p-work", IrohTransport::builder()).await?;
201 /// sync.register_transport("p2p-personal", IrohTransport::builder()).await?;
202 /// ```
203 pub async fn register_transport<B: TransportBuilder>(
204 &self,
205 name: impl Into<String>,
206 builder: B,
207 ) -> Result<()>
208 where
209 B::Transport: 'static,
210 {
211 let name = name.into();
212
213 // Load persisted state for this named instance
214 let persisted = self.load_transport_state(&name).await?;
215
216 // Build transport - may generate/update persisted state
217 let (transport, updated) = builder.build(persisted).await?;
218
219 // Save updated persisted state if changed
220 if let Some(state) = updated {
221 self.save_transport_state(&name, &state).await?;
222 }
223
224 // Add transport by name
225 self.add_transport(name, Box::new(transport)).await
226 }
227
228 /// Load persisted state for a named transport instance.
229 ///
230 /// Returns an empty Doc if no state exists for this transport.
231 async fn load_transport_state(&self, name: &str) -> Result<Doc> {
232 let tx = self.sync_tree.new_transaction().await?;
233 let store = tx.get_store::<DocStore>(TRANSPORT_STATE_STORE).await?;
234
235 match store.get(name).await {
236 Ok(Value::Doc(doc)) => Ok(doc),
237 Ok(_) => Ok(Doc::new()), // Unexpected value type
238 Err(e) if e.is_not_found() => Ok(Doc::new()),
239 Err(e) => Err(e),
240 }
241 }
242
243 /// Save persisted state for a named transport instance.
244 async fn save_transport_state(&self, name: &str, state: &Doc) -> Result<()> {
245 let name = name.to_string();
246 let state = state.clone();
247 self.sync_tree
248 .with_transaction(|tx| async move {
249 let store = tx.get_store::<DocStore>(TRANSPORT_STATE_STORE).await?;
250 store.set(&name, Value::Doc(state)).await
251 })
252 .await
253 }
254
255 /// Start the background sync engine.
256 ///
257 /// The engine starts with no transports registered. Use `add_transport()`
258 /// or `register_transport()` to add transports.
259 pub(crate) fn start_background_sync(&self) -> Result<()> {
260 if self.background_tx.get().is_some() {
261 return Ok(()); // Already enabled
262 }
263
264 let sync_tree_id = self.sync_tree.root_id().clone();
265 let instance = self.instance()?;
266
267 // Create the background sync and get command sender
268 let background_tx = BackgroundSync::start(instance, sync_tree_id, Arc::clone(&self.queue));
269
270 // Initialize the command channel (can only be done once)
271 self.background_tx
272 .set(background_tx)
273 .map_err(|_| SyncError::ServerAlreadyRunning {
274 address: "command channel already initialized".to_string(),
275 })?;
276
277 Ok(())
278 }
279
280 /// Get a server address if any transport is running a server.
281 ///
282 /// Returns the first available raw server address string (e.g.,
283 /// `127.0.0.1:8080`). To produce a shareable link, use
284 /// [`create_ticket`](Self::create_ticket) to generate a full ticket URL,
285 /// or construct an [`Address`] with [`Address::new`] for programmatic use.
286 ///
287 /// Use `get_server_address_for` for a specific transport.
288 ///
289 /// # Returns
290 /// The address of the first running server, or an error if no server is running.
291 pub async fn get_server_address(&self) -> Result<String> {
292 let addresses = self.get_all_server_addresses().await?;
293 addresses
294 .into_iter()
295 .next()
296 .map(|(_, addr)| addr)
297 .ok_or_else(|| SyncError::ServerNotRunning.into())
298 }
299
300 /// Get the server address for a specific transport.
301 ///
302 /// # Arguments
303 /// * `name` - The name of the transport
304 ///
305 /// # Returns
306 /// The address the server is bound to for that transport.
307 pub async fn get_server_address_for(&self, name: &str) -> Result<String> {
308 let (tx, rx) = oneshot::channel();
309
310 self.background_tx
311 .get()
312 .ok_or(SyncError::NoTransportEnabled)?
313 .send(SyncCommand::GetServerAddress {
314 name: name.to_string(),
315 response: tx,
316 })
317 .await
318 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
319
320 rx.await
321 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
322 }
323
324 /// Get all server addresses for running transports.
325 ///
326 /// # Returns
327 /// A vector of (transport_type, address) pairs for all running servers.
328 pub async fn get_all_server_addresses(&self) -> Result<Vec<(String, String)>> {
329 let (tx, rx) = oneshot::channel();
330
331 self.background_tx
332 .get()
333 .ok_or(SyncError::NoTransportEnabled)?
334 .send(SyncCommand::GetAllServerAddresses { response: tx })
335 .await
336 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
337
338 rx.await
339 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
340 }
341
342 /// Generate a ticket for a database using all running transports' addresses.
343 ///
344 /// The ticket contains the database ID and address hints from all running
345 /// transport servers.
346 ///
347 /// For the common case of producing a ticket as part of starting to share
348 /// a database, prefer [`User::share`](crate::user::User::share), which
349 /// atomically enables sync and builds the ticket in one call.
350 ///
351 /// # Arguments
352 /// * `database_id` - The ID of the database to create a ticket for
353 ///
354 /// # Returns
355 /// A `DatabaseTicket` containing the database ID and transport address hints.
356 pub async fn create_ticket(&self, database_id: &ID) -> Result<DatabaseTicket> {
357 let mut ticket = DatabaseTicket::new(database_id.clone());
358 let addresses = self.get_all_server_addresses().await?;
359 for (transport_type, address) in addresses {
360 ticket.add_address(Address::new(transport_type, address));
361 }
362 Ok(ticket)
363 }
364}