Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/sync/transports/
http.rs

1//! HTTP transport implementation for sync communication.
2//!
3//! This module provides HTTP-based sync communication using a single
4//! JSON endpoint (/api/v0) with axum for the server and reqwest for the client.
5
6use std::{net::SocketAddr, sync::Arc};
7
8use async_trait::async_trait;
9use axum::{
10    Router,
11    extract::{ConnectInfo, Json as ExtractJson, State},
12    response::Json,
13    routing::post,
14};
15use serde::{Deserialize, Serialize};
16use tokio::sync::oneshot;
17
18use super::{SyncTransport, TransportBuilder, TransportConfig, shared::*};
19use crate::{
20    Result,
21    crdt::Doc,
22    store::Registered,
23    sync::{
24        error::SyncError,
25        handler::SyncHandler,
26        peer_types::Address,
27        protocol::{RequestContext, SyncRequest, SyncResponse},
28    },
29};
30
31/// Persistable configuration for the HTTP transport.
32///
33/// Stores HTTP-specific configuration such as the bind address for listening.
34#[derive(Debug, Clone, Serialize, Deserialize, Default)]
35pub struct HttpTransportConfig {
36    /// Bind address for HTTP server (e.g., "127.0.0.1:8080").
37    /// If None, HTTP server won't be started by accept_connections().
38    #[serde(default, skip_serializing_if = "Option::is_none")]
39    pub bind_address: Option<String>,
40}
41
42impl Registered for HttpTransportConfig {
43    fn type_id() -> &'static str {
44        "http:v0"
45    }
46}
47
48impl TransportConfig for HttpTransportConfig {}
49
50/// Builder for configuring HTTP transport.
51///
52/// # Example
53///
54/// ```ignore
55/// use eidetica::sync::transports::http::HttpTransport;
56///
57/// // Create transport with specific bind address
58/// let builder = HttpTransport::builder()
59///     .bind("127.0.0.1:8080");
60///
61/// // Register with sync system
62/// sync.register_transport("http-local", builder).await?;
63/// ```
64#[derive(Debug, Clone, Default)]
65pub struct HttpTransportBuilder {
66    bind_address: Option<String>,
67}
68
69impl HttpTransportBuilder {
70    /// Create a new builder with default settings.
71    pub fn new() -> Self {
72        Self::default()
73    }
74
75    /// Set the bind address for the HTTP server.
76    ///
77    /// # Arguments
78    /// * `addr` - The address to bind to (e.g., "127.0.0.1:8080" or "0.0.0.0:80")
79    ///
80    /// # Example
81    ///
82    /// ```ignore
83    /// let builder = HttpTransport::builder()
84    ///     .bind("0.0.0.0:8080");
85    /// ```
86    pub fn bind(mut self, addr: impl Into<String>) -> Self {
87        self.bind_address = Some(addr.into());
88        self
89    }
90
91    /// Build the transport synchronously (for backwards compatibility).
92    ///
93    /// Note: The bind address is stored but the server is not started.
94    /// Call `start_server()` on the transport to actually bind.
95    pub fn build_sync(self) -> Result<HttpTransport> {
96        Ok(HttpTransport {
97            server_state: ServerState::new(),
98            bind_address: self.bind_address,
99        })
100    }
101}
102
103#[async_trait]
104impl TransportBuilder for HttpTransportBuilder {
105    type Transport = HttpTransport;
106
107    /// Build the HTTP transport.
108    ///
109    /// HTTP transport doesn't require persisted state for identity.
110    async fn build(self, _persisted: Doc) -> Result<(Self::Transport, Option<Doc>)> {
111        let transport = HttpTransport {
112            server_state: ServerState::new(),
113            bind_address: self.bind_address,
114        };
115        Ok((transport, None))
116    }
117}
118
119/// HTTP transport implementation using axum and reqwest.
120pub struct HttpTransport {
121    /// Shared server state management.
122    server_state: ServerState,
123    /// Configured bind address (used when start_server is called with empty addr)
124    bind_address: Option<String>,
125}
126
127impl HttpTransport {
128    /// Transport type identifier for HTTP
129    pub const TRANSPORT_TYPE: &'static str = "http";
130
131    /// Create a new HTTP transport instance.
132    pub fn new() -> Result<Self> {
133        Ok(Self {
134            server_state: ServerState::new(),
135            bind_address: None,
136        })
137    }
138
139    /// Create a builder for configuring the transport.
140    pub fn builder() -> HttpTransportBuilder {
141        HttpTransportBuilder::new()
142    }
143
144    /// Create the axum router with single JSON endpoint and handler state.
145    fn create_router(handler: Arc<dyn SyncHandler>) -> Router {
146        Router::new()
147            .route("/api/v0", post(handle_sync_request))
148            .with_state(handler)
149    }
150}
151
152#[async_trait]
153impl SyncTransport for HttpTransport {
154    fn transport_type(&self) -> &'static str {
155        Self::TRANSPORT_TYPE
156    }
157
158    fn can_handle_address(&self, address: &Address) -> bool {
159        address.transport_type == Self::TRANSPORT_TYPE
160    }
161
162    async fn start_server(&mut self, handler: Arc<dyn SyncHandler>) -> Result<()> {
163        // No bind address configured = client-only transport, nothing to start
164        let Some(effective_addr) = self.bind_address.as_deref() else {
165            return Ok(());
166        };
167
168        // Check if server is already running
169        if self.server_state.is_running() {
170            return Err(SyncError::ServerAlreadyRunning {
171                address: effective_addr.to_string(),
172            }
173            .into());
174        }
175
176        let socket_addr: SocketAddr =
177            effective_addr.parse().map_err(|e| SyncError::ServerBind {
178                address: effective_addr.to_string(),
179                reason: format!("Invalid address: {e}"),
180            })?;
181
182        let router = Self::create_router(handler);
183
184        // Create server coordination channels
185        let (ready_tx, ready_rx) = oneshot::channel();
186        let (shutdown_tx, shutdown_rx) = oneshot::channel();
187
188        // Create a channel to get the actual bound address back
189        let (addr_tx, addr_rx) = oneshot::channel::<SocketAddr>();
190
191        // Spawn server task
192        tokio::spawn(async move {
193            let listener = tokio::net::TcpListener::bind(socket_addr)
194                .await
195                .expect("Failed to bind address");
196
197            // Get the actual bound address (important for port 0)
198            let actual_addr = listener.local_addr().expect("Failed to get local address");
199
200            // Send the actual address back
201            let _ = addr_tx.send(actual_addr);
202
203            // Signal that server is ready
204            let _ = ready_tx.send(());
205
206            // Run server with graceful shutdown
207            // Convert router to service with ConnectInfo support
208            axum::serve(
209                listener,
210                router.into_make_service_with_connect_info::<SocketAddr>(),
211            )
212            .with_graceful_shutdown(async move {
213                let _ = shutdown_rx.await;
214            })
215            .await
216            .expect("Server failed");
217        });
218
219        // Get the actual bound address
220        let actual_addr = addr_rx.await.map_err(|_| SyncError::ServerBind {
221            address: effective_addr.to_string(),
222            reason: "Failed to get actual server address".to_string(),
223        })?;
224
225        // Wait for server to be ready
226        wait_for_ready(ready_rx, effective_addr).await?;
227
228        // Start server state with address and shutdown sender
229        self.server_state
230            .server_started(actual_addr.to_string(), shutdown_tx);
231
232        Ok(())
233    }
234
235    async fn stop_server(&mut self) -> Result<()> {
236        if !self.server_state.is_running() {
237            return Err(SyncError::ServerNotRunning.into());
238        }
239
240        // Stop server using combined method
241        self.server_state.stop_server();
242
243        Ok(())
244    }
245
246    async fn send_request(&self, address: &Address, request: &SyncRequest) -> Result<SyncResponse> {
247        if !self.can_handle_address(address) {
248            return Err(SyncError::UnsupportedTransport {
249                transport_type: address.transport_type.clone(),
250            }
251            .into());
252        }
253
254        let client = reqwest::Client::new();
255        let url = format!("http://{}/api/v0", address.address);
256
257        let response = client
258            .post(&url)
259            .json(&request) // Send SyncRequest as JSON body
260            .send()
261            .await
262            .map_err(|e| SyncError::ConnectionFailed {
263                address: address.address.clone(),
264                reason: e.to_string(),
265            })?;
266
267        if !response.status().is_success() {
268            return Err(SyncError::Network(format!(
269                "Server returned error: {}",
270                response.status()
271            ))
272            .into());
273        }
274
275        let sync_response: SyncResponse = response
276            .json()
277            .await
278            .map_err(|e| SyncError::Network(format!("Failed to parse response: {e}")))?;
279
280        Ok(sync_response)
281    }
282
283    fn is_server_running(&self) -> bool {
284        self.server_state.is_running()
285    }
286
287    fn get_server_address(&self) -> Result<String> {
288        self.server_state.get_address().map_err(|e| e.into())
289    }
290}
291
292/// Handler for the /api/v0 endpoint - accepts JSON SyncRequest and returns JSON SyncResponse.
293async fn handle_sync_request(
294    State(handler): State<Arc<dyn SyncHandler>>,
295    ConnectInfo(addr): ConnectInfo<SocketAddr>,
296    ExtractJson(request): ExtractJson<SyncRequest>,
297) -> Json<SyncResponse> {
298    // Extract peer_pubkey from SyncTreeRequest if present
299    let peer_pubkey = match &request {
300        SyncRequest::SyncTree(sync_tree_request) => sync_tree_request.peer_pubkey.clone(),
301        _ => None,
302    };
303
304    // Create request context with remote address and peer pubkey
305    let context = RequestContext {
306        remote_address: Some(Address {
307            transport_type: HttpTransport::TRANSPORT_TYPE.to_string(),
308            address: addr.to_string(),
309        }),
310        peer_pubkey,
311    };
312
313    // Call handler directly (Transaction is now Send since it uses Arc<Mutex>)
314    let response = handler.handle_request(&request, &context).await;
315
316    Json(response)
317}