eidetica/sync/transport.rs
1//! Network transport management for the sync system.
2
3use std::sync::Arc;
4use tokio::sync::oneshot;
5use tracing::info;
6
7use super::{
8 DatabaseTicket, Sync, SyncError, TRANSPORT_STATE_STORE,
9 background::BackgroundSync,
10 background::SyncCommand,
11 peer_types::Address,
12 transports::{SyncTransport, TransportBuilder},
13};
14use crate::{
15 Result,
16 crdt::{Doc, doc::Value},
17 entry::ID,
18 store::DocStore,
19};
20
21impl Sync {
22 // === Network Transport Methods ===
23
24 /// Stop the running sync server (async version).
25 ///
26 /// Stops servers on all transports.
27 ///
28 /// # Returns
29 /// A Result indicating success or failure of server shutdown.
30 pub async fn stop_server(&self) -> Result<()> {
31 let (tx, rx) = oneshot::channel();
32
33 self.background_tx
34 .get()
35 .ok_or(SyncError::NoTransportEnabled)?
36 .send(SyncCommand::StopServer {
37 name: None, // Stop all transports
38 response: tx,
39 })
40 .await
41 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
42
43 rx.await
44 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
45 }
46
47 /// Start accepting incoming connections on all registered transports.
48 ///
49 /// Must be called each time the instance is created to accept inbound sync requests.
50 /// This starts servers on all transports that have been registered via `register_transport()`.
51 /// Each transport uses its pre-configured bind address.
52 ///
53 /// # Example
54 ///
55 /// ```rust,ignore
56 /// instance.enable_sync().await?;
57 /// let sync = instance.sync().unwrap();
58 ///
59 /// // Register transports with their configurations
60 /// sync.register_transport("http-local", HttpTransport::builder()
61 /// .bind("127.0.0.1:8080")
62 /// ).await?;
63 /// sync.register_transport("p2p", IrohTransport::builder()).await?;
64 ///
65 /// // Start all servers
66 /// sync.accept_connections().await?;
67 /// ```
68 pub async fn accept_connections(&self) -> Result<()> {
69 // Ensure transports are enabled
70 if self.background_tx.get().is_none() {
71 return Err(SyncError::NoTransportEnabled.into());
72 }
73
74 // Start servers on all registered transports
75 // Each transport uses its pre-configured bind address
76 let (tx, rx) = oneshot::channel();
77
78 self.background_tx
79 .get()
80 .ok_or(SyncError::NoTransportEnabled)?
81 .send(SyncCommand::StartServer {
82 name: None, // Start all
83 response: tx,
84 })
85 .await
86 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
87
88 rx.await
89 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))??;
90
91 info!("Started servers on all registered transports");
92 Ok(())
93 }
94
95 /// Start accepting incoming connections on a specific named transport.
96 ///
97 /// Use this when you want fine-grained control over which transports
98 /// accept connections and when.
99 ///
100 /// # Arguments
101 /// * `name` - The name of the transport to start (as used in `register_transport`)
102 ///
103 /// # Example
104 ///
105 /// ```rust,ignore
106 /// // Register transports
107 /// sync.register_transport("http-local", HttpTransport::builder()
108 /// .bind("127.0.0.1:8080")
109 /// ).await?;
110 /// sync.register_transport("p2p", IrohTransport::builder()).await?;
111 ///
112 /// // Only start HTTP server (P2P stays inactive)
113 /// sync.accept_connections_on("http-local").await?;
114 /// ```
115 pub async fn accept_connections_on(&self, name: impl Into<String>) -> Result<()> {
116 let name = name.into();
117
118 if self.background_tx.get().is_none() {
119 return Err(SyncError::NoTransportEnabled.into());
120 }
121
122 let (tx, rx) = oneshot::channel();
123
124 self.background_tx
125 .get()
126 .ok_or(SyncError::NoTransportEnabled)?
127 .send(SyncCommand::StartServer {
128 name: Some(name.clone()),
129 response: tx,
130 })
131 .await
132 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
133
134 rx.await
135 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))??;
136
137 info!(name = %name, "Started server on transport");
138 Ok(())
139 }
140
141 /// Add a named transport to the sync system.
142 ///
143 /// If a transport with the same name already exists, it will be replaced
144 /// (the old transport's server will be stopped if running).
145 ///
146 /// # Arguments
147 /// * `name` - Unique name for this transport instance (e.g., "http-local", "p2p")
148 /// * `transport` - The transport to add
149 pub async fn add_transport(
150 &self,
151 name: impl Into<String>,
152 transport: Box<dyn SyncTransport>,
153 ) -> Result<()> {
154 // Ensure background sync is running
155 self.start_background_sync()?;
156
157 let name = name.into();
158 let (tx, rx) = oneshot::channel();
159
160 self.background_tx
161 .get()
162 .ok_or(SyncError::NoTransportEnabled)?
163 .send(SyncCommand::AddTransport {
164 name,
165 transport,
166 response: tx,
167 })
168 .await
169 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
170
171 rx.await
172 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
173 }
174
175 /// Register a named transport instance with persisted state.
176 ///
177 /// This is the recommended way to add transports. The builder's `build()` method
178 /// receives persisted state for this named instance (may be empty on first run)
179 /// and can update it. The updated state is automatically saved.
180 ///
181 /// # Arguments
182 /// * `name` - Unique name for this transport instance (e.g., "http-local", "p2p")
183 /// * `builder` - The transport builder that creates the transport
184 ///
185 /// # Example
186 ///
187 /// ```ignore
188 /// use eidetica::sync::transports::http::HttpTransport;
189 /// use eidetica::sync::transports::iroh::IrohTransport;
190 ///
191 /// // Register an HTTP transport with bind address
192 /// sync.register_transport("http-local", HttpTransport::builder()
193 /// .bind("127.0.0.1:8080")
194 /// ).await?;
195 ///
196 /// // Register an Iroh transport (generates and persists node ID on first run)
197 /// sync.register_transport("p2p", IrohTransport::builder()).await?;
198 ///
199 /// // Register multiple Iroh transports with different identities
200 /// sync.register_transport("p2p-work", IrohTransport::builder()).await?;
201 /// sync.register_transport("p2p-personal", IrohTransport::builder()).await?;
202 /// ```
203 pub async fn register_transport<B: TransportBuilder>(
204 &self,
205 name: impl Into<String>,
206 builder: B,
207 ) -> Result<()>
208 where
209 B::Transport: 'static,
210 {
211 let name = name.into();
212
213 // Load persisted state for this named instance
214 let persisted = self.load_transport_state(&name).await?;
215
216 // Build transport - may generate/update persisted state
217 let (transport, updated) = builder.build(persisted).await?;
218
219 // Save updated persisted state if changed
220 if let Some(state) = updated {
221 self.save_transport_state(&name, &state).await?;
222 }
223
224 // Add transport by name
225 self.add_transport(name, Box::new(transport)).await
226 }
227
228 /// Load persisted state for a named transport instance.
229 ///
230 /// Returns an empty Doc if no state exists for this transport.
231 async fn load_transport_state(&self, name: &str) -> Result<Doc> {
232 let tx = self.sync_tree.new_transaction().await?;
233 let store = tx.get_store::<DocStore>(TRANSPORT_STATE_STORE).await?;
234
235 match store.get(name).await {
236 Ok(Value::Doc(doc)) => Ok(doc),
237 Ok(_) => Ok(Doc::new()), // Unexpected value type
238 Err(e) if e.is_not_found() => Ok(Doc::new()),
239 Err(e) => Err(e),
240 }
241 }
242
243 /// Save persisted state for a named transport instance.
244 async fn save_transport_state(&self, name: &str, state: &Doc) -> Result<()> {
245 let tx = self.sync_tree.new_transaction().await?;
246 let store = tx.get_store::<DocStore>(TRANSPORT_STATE_STORE).await?;
247 store.set(name, Value::Doc(state.clone())).await?;
248 tx.commit().await?;
249 Ok(())
250 }
251
252 /// Start the background sync engine.
253 ///
254 /// The engine starts with no transports registered. Use `add_transport()`
255 /// or `register_transport()` to add transports.
256 pub(crate) fn start_background_sync(&self) -> Result<()> {
257 if self.background_tx.get().is_some() {
258 return Ok(()); // Already enabled
259 }
260
261 let sync_tree_id = self.sync_tree.root_id().clone();
262 let instance = self.instance()?;
263
264 // Create the background sync and get command sender
265 let background_tx = BackgroundSync::start(instance, sync_tree_id, Arc::clone(&self.queue));
266
267 // Initialize the command channel (can only be done once)
268 self.background_tx
269 .set(background_tx)
270 .map_err(|_| SyncError::ServerAlreadyRunning {
271 address: "command channel already initialized".to_string(),
272 })?;
273
274 Ok(())
275 }
276
277 /// Get a server address if any transport is running a server.
278 ///
279 /// Returns the first available raw server address string (e.g.,
280 /// `127.0.0.1:8080`). To produce a shareable link, use
281 /// [`create_ticket`](Self::create_ticket) to generate a full ticket URL,
282 /// or construct an [`Address`] with [`Address::new`] for programmatic use.
283 ///
284 /// Use `get_server_address_for` for a specific transport.
285 ///
286 /// # Returns
287 /// The address of the first running server, or an error if no server is running.
288 pub async fn get_server_address(&self) -> Result<String> {
289 let addresses = self.get_all_server_addresses().await?;
290 addresses
291 .into_iter()
292 .next()
293 .map(|(_, addr)| addr)
294 .ok_or_else(|| SyncError::ServerNotRunning.into())
295 }
296
297 /// Get the server address for a specific transport.
298 ///
299 /// # Arguments
300 /// * `name` - The name of the transport
301 ///
302 /// # Returns
303 /// The address the server is bound to for that transport.
304 pub async fn get_server_address_for(&self, name: &str) -> Result<String> {
305 let (tx, rx) = oneshot::channel();
306
307 self.background_tx
308 .get()
309 .ok_or(SyncError::NoTransportEnabled)?
310 .send(SyncCommand::GetServerAddress {
311 name: name.to_string(),
312 response: tx,
313 })
314 .await
315 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
316
317 rx.await
318 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
319 }
320
321 /// Get all server addresses for running transports.
322 ///
323 /// # Returns
324 /// A vector of (transport_type, address) pairs for all running servers.
325 pub async fn get_all_server_addresses(&self) -> Result<Vec<(String, String)>> {
326 let (tx, rx) = oneshot::channel();
327
328 self.background_tx
329 .get()
330 .ok_or(SyncError::NoTransportEnabled)?
331 .send(SyncCommand::GetAllServerAddresses { response: tx })
332 .await
333 .map_err(|e| SyncError::CommandSendError(e.to_string()))?;
334
335 rx.await
336 .map_err(|e| SyncError::Network(format!("Response channel error: {e}")))?
337 }
338
339 /// Generate a ticket for a database using all running transports' addresses.
340 ///
341 /// The ticket contains the database ID and address hints from all running
342 /// transport servers.
343 ///
344 /// # Arguments
345 /// * `database_id` - The ID of the database to create a ticket for
346 ///
347 /// # Returns
348 /// A `DatabaseTicket` containing the database ID and transport address hints.
349 pub async fn create_ticket(&self, database_id: &ID) -> Result<DatabaseTicket> {
350 let mut ticket = DatabaseTicket::new(database_id.clone());
351 let addresses = self.get_all_server_addresses().await?;
352 for (transport_type, address) in addresses {
353 ticket.add_address(Address::new(transport_type, address));
354 }
355 Ok(ticket)
356 }
357}