Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/sync/transports/
iroh.rs

1//! Iroh transport implementation for sync communication.
2//!
3//! This module provides peer-to-peer sync communication using
4//! Iroh's QUIC-based networking with hole punching and relay servers.
5
6use std::sync::Arc;
7use tokio::sync::Mutex;
8
9use async_trait::async_trait;
10use iroh::{
11    Endpoint, RelayMode, SecretKey,
12    endpoint::{Connection, RecvStream, SendStream},
13};
14use iroh_tickets::{Ticket, endpoint::EndpointTicket};
15use serde::{Deserialize, Serialize};
16#[allow(unused_imports)] // Used by write_all method on streams
17use tokio::io::AsyncWriteExt;
18use tokio::sync::oneshot;
19
20use super::{SyncTransport, TransportBuilder, TransportConfig, shared::*};
21use crate::{
22    Result,
23    crdt::Doc,
24    store::Registered,
25    sync::{
26        error::SyncError,
27        handler::SyncHandler,
28        peer_types::Address,
29        protocol::{RequestContext, SyncRequest, SyncResponse},
30    },
31};
32
/// ALPN protocol identifier for sync connections.
///
/// It is registered on the endpoint in `ensure_endpoint` and passed to
/// `Endpoint::connect` in `send_request`.
const SYNC_ALPN: &[u8] = b"eidetica/v0";
34
/// Serializable relay mode setting for transport configuration.
///
/// This is a simplified version of Iroh's `RelayMode` that can be
/// persisted to storage. Custom relay configurations are not supported
/// in the persisted config (use the builder API for custom relays).
///
/// Convert to the runtime type with `RelayMode::from(setting)`; each variant
/// maps to the `RelayMode` variant of the same name.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub enum RelayModeSetting {
    /// Use n0's production relay servers (recommended for most deployments).
    ///
    /// This is the variant returned by `RelayModeSetting::default()`.
    #[default]
    Default,
    /// Use n0's staging relay infrastructure (for testing)
    Staging,
    /// Disable relay servers entirely (local/direct connections only)
    Disabled,
}
50
51impl From<RelayModeSetting> for RelayMode {
52    fn from(setting: RelayModeSetting) -> Self {
53        match setting {
54            RelayModeSetting::Default => RelayMode::Default,
55            RelayModeSetting::Staging => RelayMode::Staging,
56            RelayModeSetting::Disabled => RelayMode::Disabled,
57        }
58    }
59}
60
61/// Persistable configuration for the Iroh transport.
62///
63/// This configuration is stored in the `_sync` database's `transport_configs`
64/// subtree and is automatically loaded when `enable_iroh_transport()` is called.
65///
66/// The most important field is `secret_key_hex`, which stores the node's
67/// cryptographic identity. When this is persisted, the node will have the
68/// same address across restarts.
69///
70/// # Example
71///
72/// ```ignore
73/// use eidetica::sync::transports::iroh::IrohTransportConfig;
74///
75/// // Create a default config (secret key will be generated on first use)
76/// let config = IrohTransportConfig::default();
77///
78/// // Or create with specific settings
79/// let config = IrohTransportConfig {
80///     relay_mode: RelayModeSetting::Disabled,
81///     ..Default::default()
82/// };
83/// ```
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct IrohTransportConfig {
86    /// Secret key bytes (hex encoded for JSON storage).
87    ///
88    /// When `None`, a new secret key will be generated on first use
89    /// and stored back to the config. Once set, this ensures the node
90    /// maintains the same identity (and thus address) across restarts.
91    #[serde(default, skip_serializing_if = "Option::is_none")]
92    pub secret_key_hex: Option<String>,
93
94    /// Relay mode setting for NAT traversal.
95    #[serde(default)]
96    pub relay_mode: RelayModeSetting,
97}
98
99impl Default for IrohTransportConfig {
100    fn default() -> Self {
101        Self {
102            secret_key_hex: None,
103            relay_mode: RelayModeSetting::Default,
104        }
105    }
106}
107
impl Registered for IrohTransportConfig {
    /// Returns the type identifier string for this config: `"iroh:v0"`.
    fn type_id() -> &'static str {
        "iroh:v0"
    }
}
113
// Empty impl: no `TransportConfig` items are overridden for the Iroh config.
impl TransportConfig for IrohTransportConfig {}
115
116impl IrohTransportConfig {
117    /// Get the secret key from config, or generate a new one.
118    ///
119    /// If a secret key is already stored in the config, it will be decoded
120    /// and returned. Otherwise, a new random secret key is generated,
121    /// stored in the config (as hex), and returned.
122    ///
123    /// This method mutates the config to store the newly generated key,
124    /// so the caller should persist the config after calling this.
125    pub fn get_or_create_secret_key(&mut self) -> SecretKey {
126        if let Some(hex) = &self.secret_key_hex {
127            let bytes = hex::decode(hex).expect("valid hex in stored secret key");
128            let bytes: [u8; 32] = bytes.try_into().expect("secret key should be 32 bytes");
129            SecretKey::from_bytes(&bytes)
130        } else {
131            // Generate new secret key
132            use rand::RngCore;
133            let mut secret_bytes = [0u8; 32];
134            rand::rngs::OsRng.fill_bytes(&mut secret_bytes);
135            let key = SecretKey::from_bytes(&secret_bytes);
136            self.secret_key_hex = Some(hex::encode(key.to_bytes()));
137            key
138        }
139    }
140
141    /// Check if a secret key has been set in this config.
142    pub fn has_secret_key(&self) -> bool {
143        self.secret_key_hex.is_some()
144    }
145}
146
/// Builder for configuring IrohTransport with different relay modes and options.
///
/// # Examples
///
/// ## Production deployment (default)
/// ```no_run
/// use eidetica::sync::transports::iroh::IrohTransport;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let transport = IrohTransport::builder()
///     .build()?;
/// // Uses n0's production relay servers by default
/// # Ok(())
/// # }
/// ```
///
/// ## Local testing without internet
/// ```no_run
/// use eidetica::sync::transports::iroh::IrohTransport;
/// use iroh::RelayMode;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let transport = IrohTransport::builder()
///     .relay_mode(RelayMode::Disabled)
///     .build()?;
/// // Direct P2P only, no relay servers
/// # Ok(())
/// # }
/// ```
///
/// ## Enterprise deployment with custom relay
/// ```no_run
/// use eidetica::sync::transports::iroh::IrohTransport;
/// use iroh::{RelayConfig, RelayMode, RelayMap, RelayUrl};
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let relay_url: RelayUrl = "https://relay.example.com".parse()?;
/// let relay_config: RelayConfig = relay_url.into();
/// let transport = IrohTransport::builder()
///     .relay_mode(RelayMode::Custom(RelayMap::from_iter([relay_config])))
///     .build()?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct IrohTransportBuilder {
    /// Relay mode copied into the transport's runtime configuration on build.
    relay_mode: RelayMode,
    /// Optional secret key for a stable node identity; `None` unless set
    /// through `secret_key()`.
    secret_key: Option<SecretKey>,
}
196
197impl IrohTransportBuilder {
198    /// Create a new builder with production defaults.
199    ///
200    /// By default, uses `RelayMode::Default` which connects to n0's
201    /// production relay infrastructure for NAT traversal.
202    pub fn new() -> Self {
203        Self {
204            relay_mode: RelayMode::Default, // Use n0's production relays by default
205            secret_key: None,
206        }
207    }
208
209    /// Set the relay mode for the transport.
210    ///
211    /// # Relay Modes
212    ///
213    /// - `RelayMode::Default` - Use n0's production relay servers (recommended)
214    /// - `RelayMode::Staging` - Use n0's staging infrastructure for testing
215    /// - `RelayMode::Disabled` - No relay servers, direct P2P only (local testing)
216    /// - `RelayMode::Custom(RelayMap)` - Use custom relay servers (enterprise deployments)
217    ///
218    /// # Example
219    ///
220    /// ```no_run
221    /// use eidetica::sync::transports::iroh::IrohTransport;
222    /// use iroh::RelayMode;
223    ///
224    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
225    /// let transport = IrohTransport::builder()
226    ///     .relay_mode(RelayMode::Disabled)
227    ///     .build()?;
228    /// # Ok(())
229    /// # }
230    /// ```
231    pub fn relay_mode(mut self, mode: RelayMode) -> Self {
232        self.relay_mode = mode;
233        self
234    }
235
236    /// Set the secret key for persistent node identity.
237    ///
238    /// When a secret key is provided, the node will have the same
239    /// cryptographic identity (and thus the same address) across restarts.
240    /// This is essential for maintaining stable peer connections.
241    ///
242    /// If not set, a random secret key will be generated on each startup,
243    /// resulting in a different node address each time.
244    ///
245    /// # Example
246    ///
247    /// ```no_run
248    /// use eidetica::sync::transports::iroh::IrohTransport;
249    /// use iroh::SecretKey;
250    /// use rand::RngCore;
251    ///
252    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
253    /// // Generate or load secret key from storage
254    /// let mut secret_bytes = [0u8; 32];
255    /// rand::rngs::OsRng.fill_bytes(&mut secret_bytes);
256    /// let secret_key = SecretKey::from_bytes(&secret_bytes);
257    ///
258    /// let transport = IrohTransport::builder()
259    ///     .secret_key(secret_key)
260    ///     .build()?;
261    /// # Ok(())
262    /// # }
263    /// ```
264    pub fn secret_key(mut self, key: SecretKey) -> Self {
265        self.secret_key = Some(key);
266        self
267    }
268
269    /// Build the IrohTransport with the configured options.
270    ///
271    /// Returns a configured `IrohTransport` ready to be used with
272    /// `SyncEngine::enable_iroh_transport_with_config()`.
273    pub fn build(self) -> Result<IrohTransport> {
274        Ok(IrohTransport {
275            endpoint: Arc::new(Mutex::new(None)),
276            server_state: ServerState::new(),
277            handler: None,
278            runtime_config: IrohRuntimeConfig {
279                relay_mode: self.relay_mode,
280                secret_key: self.secret_key,
281            },
282        })
283    }
284}
285
impl Default for IrohTransportBuilder {
    /// Equivalent to `IrohTransportBuilder::new()`.
    fn default() -> Self {
        Self::new()
    }
}
291
/// Key under which the hex-encoded secret key is stored in the persisted `Doc`.
const SECRET_KEY_FIELD: &str = "secret_key";
294
295#[async_trait]
296impl TransportBuilder for IrohTransportBuilder {
297    type Transport = IrohTransport;
298
299    /// Build the transport using persisted state for identity.
300    ///
301    /// The secret key (node identity) is loaded from the persisted Doc.
302    /// If no secret key exists, a new one is generated and the Doc is returned
303    /// for persistence. Any secret_key set via the builder is ignored -
304    /// persisted state takes precedence.
305    async fn build(self, mut persisted: Doc) -> Result<(Self::Transport, Option<Doc>)> {
306        use crate::crdt::doc::Value;
307
308        // Load or generate secret key from persisted state
309        let (secret_key, updated) = match persisted.get(SECRET_KEY_FIELD) {
310            Some(Value::Text(hex_str)) => {
311                // Decode existing secret key
312                let bytes = hex::decode(hex_str).map_err(|e| {
313                    SyncError::TransportInit(format!("Invalid secret key hex: {}", e))
314                })?;
315                let bytes: [u8; 32] = bytes.try_into().map_err(|_| {
316                    SyncError::TransportInit("Secret key must be 32 bytes".to_string())
317                })?;
318                (SecretKey::from_bytes(&bytes), None)
319            }
320            Some(_) => {
321                return Err(SyncError::TransportInit(
322                    "Secret key must be a text value".to_string(),
323                )
324                .into());
325            }
326            None => {
327                // Generate new secret key and store it
328                // FIXME: Use SecretKey::generate() here (and elsewhere)
329                // Waiting on a rand_core version mismatch to be resolved.
330                use rand::RngCore;
331                let mut secret_bytes = [0u8; 32];
332                rand::rngs::OsRng.fill_bytes(&mut secret_bytes);
333                let key = SecretKey::from_bytes(&secret_bytes);
334                persisted.set(SECRET_KEY_FIELD, hex::encode(key.to_bytes()));
335                (key, Some(persisted))
336            }
337        };
338
339        let transport = IrohTransport {
340            endpoint: Arc::new(Mutex::new(None)),
341            server_state: ServerState::new(),
342            handler: None,
343            runtime_config: IrohRuntimeConfig {
344                relay_mode: self.relay_mode,
345                secret_key: Some(secret_key),
346            },
347        };
348
349        Ok((transport, updated))
350    }
351}
352
/// Runtime configuration for IrohTransport (internal, not persisted).
///
/// This holds the actual runtime values used by the transport,
/// including the decoded secret key and relay mode.
#[derive(Debug, Clone)]
struct IrohRuntimeConfig {
    /// Relay mode passed to the endpoint builder when the endpoint is created.
    relay_mode: RelayMode,
    /// Secret key applied to the endpoint builder when `Some`; when `None`
    /// no key is set on the builder.
    secret_key: Option<SecretKey>,
}
362
/// Iroh transport implementation using QUIC peer-to-peer networking.
///
/// Provides NAT traversal and direct peer-to-peer connectivity using the Iroh
/// protocol. Supports both relay-assisted and direct connections.
///
/// # How It Works
///
/// 1. **Discovery**: Peers find each other via relay servers or direct addresses
/// 2. **Connection**: Attempts direct connection through NAT hole-punching
/// 3. **Fallback**: Uses relay servers if direct connection fails
/// 4. **Upgrade**: Automatically upgrades to direct connection when possible
///
/// # Server Addresses
///
/// Addresses use iroh's standard `EndpointTicket` format (postcard + base32-lower
/// with `endpoint` prefix) for both `get_server_address()` and `DatabaseTicket` URLs.
///
/// # Example
///
/// ```no_run
/// use eidetica::sync::transports::iroh::IrohTransport;
/// use iroh::RelayMode;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Create with defaults (production relay servers)
/// let transport = IrohTransport::new()?;
///
/// // Or use the builder for custom configuration
/// let transport = IrohTransport::builder()
///     .relay_mode(RelayMode::Staging)
///     .build()?;
/// # Ok(())
/// # }
/// ```
pub struct IrohTransport {
    /// The Iroh endpoint for P2P communication (lazily initialized).
    ///
    /// Starts as `None` and is filled in by the first call to
    /// `ensure_endpoint`; later calls reuse the stored endpoint.
    endpoint: Arc<Mutex<Option<Endpoint>>>,
    /// Shared server state management.
    server_state: ServerState,
    /// Handler for processing sync requests; set by `start_server`.
    handler: Option<Arc<dyn SyncHandler>>,
    /// Runtime configuration (relay mode, secret key, etc.)
    runtime_config: IrohRuntimeConfig,
}
407
408impl IrohTransport {
409    /// Transport type identifier for Iroh
410    pub const TRANSPORT_TYPE: &'static str = "iroh";
411
412    /// Create a new Iroh transport instance with production defaults.
413    ///
414    /// Uses `RelayMode::Default` which connects to n0's production relay
415    /// infrastructure. For custom configuration, use `IrohTransport::builder()`.
416    ///
417    /// The endpoint will be lazily initialized on first use.
418    ///
419    /// # Example
420    ///
421    /// ```no_run
422    /// use eidetica::sync::transports::iroh::IrohTransport;
423    ///
424    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
425    /// let transport = IrohTransport::new()?;
426    /// // Use with: sync.enable_iroh_transport_with_config(transport)?;
427    /// # Ok(())
428    /// # }
429    /// ```
430    pub fn new() -> Result<Self> {
431        IrohTransportBuilder::new().build()
432    }
433
434    /// Create a builder for configuring the transport.
435    ///
436    /// Allows customization of relay modes and other transport options.
437    ///
438    /// # Example
439    ///
440    /// ```no_run
441    /// use eidetica::sync::transports::iroh::IrohTransport;
442    /// use iroh::RelayMode;
443    ///
444    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
445    /// let transport = IrohTransport::builder()
446    ///     .relay_mode(RelayMode::Disabled)
447    ///     .build()?;
448    /// # Ok(())
449    /// # }
450    /// ```
451    pub fn builder() -> IrohTransportBuilder {
452        IrohTransportBuilder::new()
453    }
454
455    /// Initialize the Iroh endpoint if not already done.
456    async fn ensure_endpoint(&self) -> Result<Endpoint> {
457        let mut endpoint_lock = self.endpoint.lock().await;
458
459        if endpoint_lock.is_none() {
460            // Create a new Iroh endpoint with configured relay mode
461            let mut builder = Endpoint::builder()
462                .alpns(vec![SYNC_ALPN.to_vec()])
463                .relay_mode(self.runtime_config.relay_mode.clone());
464
465            // Use the provided secret key for persistent identity
466            if let Some(secret_key) = &self.runtime_config.secret_key {
467                builder = builder.secret_key(secret_key.clone());
468            }
469
470            let endpoint = builder.bind().await.map_err(|e| {
471                SyncError::TransportInit(format!("Failed to create Iroh endpoint: {e}"))
472            })?;
473
474            *endpoint_lock = Some(endpoint);
475        }
476
477        Ok(endpoint_lock.as_ref().unwrap().clone())
478    }
479
480    /// Start the server request handling loop.
481    async fn start_server_loop(
482        &self,
483        endpoint: Endpoint,
484        ready_tx: oneshot::Sender<()>,
485        shutdown_rx: oneshot::Receiver<()>,
486        handler: Arc<dyn SyncHandler>,
487    ) -> Result<()> {
488        let mut shutdown_rx = shutdown_rx;
489
490        // Signal that we're ready
491        let _ = ready_tx.send(());
492
493        // Accept incoming connections
494        tokio::spawn(async move {
495            loop {
496                tokio::select! {
497                    // Check for shutdown signal
498                    _ = &mut shutdown_rx => {
499                        break;
500                    }
501                    // Accept incoming connections
502                    connection_result = endpoint.accept() => {
503                        match connection_result {
504                            Some(connecting) => {
505                                let handler_clone = handler.clone();
506                                tokio::spawn(async move {
507                                    if let Ok(conn) = connecting.await {
508                                        Self::handle_connection(conn, handler_clone).await;
509                                    }
510                                });
511                            }
512                            None => break, // Endpoint closed
513                        }
514                    }
515                }
516            }
517            // Server loop has exited - the shutdown was triggered by stop_server()
518            // which already marked the server as stopped, so no additional cleanup needed here
519        });
520
521        Ok(())
522    }
523
524    /// Handle an incoming connection.
525    async fn handle_connection(conn: Connection, handler: Arc<dyn SyncHandler>) {
526        // Get the remote peer node ID for context
527        let remote_endpoint_id = conn.remote_id();
528        let remote_address = Address {
529            transport_type: Self::TRANSPORT_TYPE.to_string(),
530            address: remote_endpoint_id.to_string(),
531        };
532
533        // Accept incoming streams and process sequentially
534        // Note: We process streams sequentially because SyncHandler::handle_request
535        // returns non-Send futures (internal types use Rc/RefCell).
536        while let Ok((send_stream, recv_stream)) = conn.accept_bi().await {
537            Self::handle_stream(
538                send_stream,
539                recv_stream,
540                handler.clone(),
541                remote_address.clone(),
542            )
543            .await;
544        }
545    }
546
547    /// Handle an incoming bidirectional stream.
548    async fn handle_stream(
549        mut send_stream: SendStream,
550        mut recv_stream: RecvStream,
551        handler: Arc<dyn SyncHandler>,
552        remote_address: Address,
553    ) {
554        // Read the request with size limit (1MB)
555        let buffer: Vec<u8> = match recv_stream.read_to_end(1024 * 1024).await {
556            Ok(buffer) => buffer,
557            Err(e) => {
558                tracing::error!("Failed to read stream: {e}");
559                return;
560            }
561        };
562
563        // Deserialize the request using JsonHandler
564        let request: SyncRequest = match JsonHandler::deserialize_request(&buffer) {
565            Ok(req) => req,
566            Err(e) => {
567                tracing::error!("Failed to deserialize request: {e}");
568                return;
569            }
570        };
571
572        // Extract peer_pubkey from SyncTreeRequest if present
573        let peer_pubkey = match &request {
574            SyncRequest::SyncTree(sync_tree_request) => sync_tree_request.peer_pubkey.clone(),
575            _ => None,
576        };
577
578        // Create request context with remote address and peer pubkey
579        let context = RequestContext {
580            remote_address: Some(remote_address),
581            peer_pubkey,
582        };
583
584        // Handle the request using the SyncHandler
585        let response = handler.handle_request(&request, &context).await;
586
587        // Serialize and send response using JsonHandler
588        match JsonHandler::serialize_response(&response) {
589            Ok(response_bytes) => {
590                if let Err(e) = send_stream.write_all(&response_bytes).await {
591                    tracing::error!("Failed to write response: {e}");
592                    return;
593                }
594                if let Err(e) = send_stream.finish() {
595                    tracing::error!("Failed to finish stream: {e}");
596                }
597            }
598            Err(e) => {
599                tracing::error!("Failed to serialize response: {e}");
600            }
601        }
602    }
603}
604
605#[async_trait]
606impl SyncTransport for IrohTransport {
607    fn transport_type(&self) -> &'static str {
608        Self::TRANSPORT_TYPE
609    }
610
611    fn can_handle_address(&self, address: &Address) -> bool {
612        address.transport_type == Self::TRANSPORT_TYPE
613    }
614
615    async fn start_server(&mut self, handler: Arc<dyn SyncHandler>) -> Result<()> {
616        // Check if server is already running
617        if self.server_state.is_running() {
618            return Err(SyncError::ServerAlreadyRunning {
619                address: "iroh-endpoint".to_string(),
620            }
621            .into());
622        }
623
624        // Store the handler
625        self.handler = Some(handler);
626
627        // Ensure we have an endpoint and get EndpointAddr with direct addresses
628        let endpoint = self.ensure_endpoint().await?;
629        let endpoint_clone = endpoint.clone();
630
631        // Get the EndpointAddr with direct addresses
632        // Note: We don't wait for online() - direct addresses are available immediately
633        // after bind(), and relay connections happen asynchronously in the background.
634        let endpoint_addr = endpoint.addr();
635        let endpoint_addr_str = Ticket::serialize(&EndpointTicket::new(endpoint_addr));
636
637        // Create server coordination channels
638        let (ready_tx, ready_rx) = oneshot::channel();
639        let (shutdown_tx, shutdown_rx) = oneshot::channel();
640
641        // Start server loop
642        self.start_server_loop(
643            endpoint_clone,
644            ready_tx,
645            shutdown_rx,
646            self.handler.clone().unwrap(),
647        )
648        .await?;
649
650        // Wait for server to be ready using shared utility
651        wait_for_ready(ready_rx, "iroh-endpoint").await?;
652
653        // Start server state with EndpointAddr string and shutdown sender
654        self.server_state
655            .server_started(endpoint_addr_str, shutdown_tx);
656
657        Ok(())
658    }
659
660    async fn stop_server(&mut self) -> Result<()> {
661        if !self.server_state.is_running() {
662            return Err(SyncError::ServerNotRunning.into());
663        }
664
665        // Stop server using combined method
666        self.server_state.stop_server();
667
668        Ok(())
669    }
670
671    async fn send_request(&self, address: &Address, request: &SyncRequest) -> Result<SyncResponse> {
672        if !self.can_handle_address(address) {
673            return Err(SyncError::UnsupportedTransport {
674                transport_type: address.transport_type.clone(),
675            }
676            .into());
677        }
678
679        // Ensure we have an endpoint (lazy initialization)
680        let endpoint = self.ensure_endpoint().await?;
681
682        // Deserialize the EndpointTicket address
683        let endpoint_ticket =
684            <EndpointTicket as Ticket>::deserialize(&address.address).map_err(|e| {
685                SyncError::SerializationError(format!(
686                    "Failed to parse EndpointTicket '{}': {e}",
687                    address.address
688                ))
689            })?;
690        let endpoint_addr = endpoint_ticket.endpoint_addr().clone();
691
692        // Connect to the peer
693        let conn = endpoint
694            .connect(endpoint_addr, SYNC_ALPN)
695            .await
696            .map_err(|e| SyncError::ConnectionFailed {
697                address: address.address.clone(),
698                reason: e.to_string(),
699            })?;
700
701        // Open a bidirectional stream
702        let (mut send_stream, mut recv_stream) = conn
703            .open_bi()
704            .await
705            .map_err(|e| SyncError::Network(format!("Failed to open stream: {e}")))?;
706
707        // Serialize and send the request using JsonHandler
708        let request_bytes = JsonHandler::serialize_request(request)?;
709
710        send_stream
711            .write_all(&request_bytes)
712            .await
713            .map_err(|e| SyncError::Network(format!("Failed to write request: {e}")))?;
714
715        send_stream
716            .finish()
717            .map_err(|e| SyncError::Network(format!("Failed to finish send stream: {e}")))?;
718
719        // Read the response with size limit (1MB)
720        let response_bytes: Vec<u8> = recv_stream
721            .read_to_end(1024 * 1024)
722            .await
723            .map_err(|e| SyncError::Network(format!("Failed to read response: {e}")))?;
724
725        // Deserialize the response using JsonHandler
726        let response: SyncResponse = JsonHandler::deserialize_response(&response_bytes)?;
727
728        Ok(response)
729    }
730
731    fn is_server_running(&self) -> bool {
732        self.server_state.is_running()
733    }
734
735    fn get_server_address(&self) -> Result<String> {
736        self.server_state.get_address().map_err(|e| e.into())
737    }
738}
739
#[cfg(test)]
mod tests {
    use super::*;
    use iroh::{EndpointAddr, TransportAddr};

    /// An `EndpointAddr` encoded as an `EndpointTicket` string must decode
    /// back to the same endpoint id and IP addresses.
    #[test]
    fn endpoint_ticket_round_trip() {
        // Fixed key bytes keep the test deterministic.
        let public_key = SecretKey::from_bytes(&[1u8; 32]).public();
        let transport_addrs: Vec<TransportAddr> = ["127.0.0.1:1234", "192.168.1.1:5678"]
            .iter()
            .map(|addr| TransportAddr::Ip(addr.parse().unwrap()))
            .collect();
        let endpoint_addr = EndpointAddr::from_parts(public_key, transport_addrs);

        // Encode
        let ticket = EndpointTicket::new(endpoint_addr.clone());
        let ticket_str = Ticket::serialize(&ticket);
        assert!(
            ticket_str.starts_with("endpoint"),
            "EndpointTicket should start with 'endpoint' prefix: {ticket_str}"
        );

        // Decode
        let decoded = <EndpointTicket as Ticket>::deserialize(&ticket_str).unwrap();
        let round_tripped = decoded.endpoint_addr();

        assert_eq!(endpoint_addr.id, round_tripped.id);
        let expected_ips = endpoint_addr.ip_addrs().collect::<Vec<_>>();
        let actual_ips = round_tripped.ip_addrs().collect::<Vec<_>>();
        assert_eq!(expected_ips, actual_ips);
    }
}