eidetica/sync/transports/
http.rs1use std::{net::SocketAddr, sync::Arc};
7
8use async_trait::async_trait;
9use axum::{
10 Router,
11 extract::{ConnectInfo, Json as ExtractJson, State},
12 response::Json,
13 routing::post,
14};
15use serde::{Deserialize, Serialize};
16use tokio::sync::oneshot;
17
18use super::{SyncTransport, TransportBuilder, TransportConfig, shared::*};
19use crate::{
20 Result,
21 crdt::Doc,
22 store::Registered,
23 sync::{
24 error::SyncError,
25 handler::SyncHandler,
26 peer_types::Address,
27 protocol::{RequestContext, SyncRequest, SyncResponse},
28 },
29};
30
31#[derive(Debug, Clone, Serialize, Deserialize, Default)]
35pub struct HttpTransportConfig {
36 #[serde(default, skip_serializing_if = "Option::is_none")]
39 pub bind_address: Option<String>,
40}
41
42impl Registered for HttpTransportConfig {
43 fn type_id() -> &'static str {
44 "http:v0"
45 }
46}
47
48impl TransportConfig for HttpTransportConfig {}
49
50#[derive(Debug, Clone, Default)]
65pub struct HttpTransportBuilder {
66 bind_address: Option<String>,
67}
68
69impl HttpTransportBuilder {
70 pub fn new() -> Self {
72 Self::default()
73 }
74
75 pub fn bind(mut self, addr: impl Into<String>) -> Self {
87 self.bind_address = Some(addr.into());
88 self
89 }
90
91 pub fn build_sync(self) -> Result<HttpTransport> {
96 Ok(HttpTransport {
97 server_state: ServerState::new(),
98 bind_address: self.bind_address,
99 })
100 }
101}
102
103#[async_trait]
104impl TransportBuilder for HttpTransportBuilder {
105 type Transport = HttpTransport;
106
107 async fn build(self, _persisted: Doc) -> Result<(Self::Transport, Option<Doc>)> {
111 let transport = HttpTransport {
112 server_state: ServerState::new(),
113 bind_address: self.bind_address,
114 };
115 Ok((transport, None))
116 }
117}
118
119pub struct HttpTransport {
121 server_state: ServerState,
123 bind_address: Option<String>,
125}
126
127impl HttpTransport {
128 pub const TRANSPORT_TYPE: &'static str = "http";
130
131 pub fn new() -> Result<Self> {
133 Ok(Self {
134 server_state: ServerState::new(),
135 bind_address: None,
136 })
137 }
138
139 pub fn builder() -> HttpTransportBuilder {
141 HttpTransportBuilder::new()
142 }
143
144 fn create_router(handler: Arc<dyn SyncHandler>) -> Router {
146 Router::new()
147 .route("/api/v0", post(handle_sync_request))
148 .with_state(handler)
149 }
150}
151
152#[async_trait]
153impl SyncTransport for HttpTransport {
154 fn transport_type(&self) -> &'static str {
155 Self::TRANSPORT_TYPE
156 }
157
158 fn can_handle_address(&self, address: &Address) -> bool {
159 address.transport_type == Self::TRANSPORT_TYPE
160 }
161
162 async fn start_server(&mut self, handler: Arc<dyn SyncHandler>) -> Result<()> {
163 let Some(effective_addr) = self.bind_address.as_deref() else {
165 return Ok(());
166 };
167
168 if self.server_state.is_running() {
170 return Err(SyncError::ServerAlreadyRunning {
171 address: effective_addr.to_string(),
172 }
173 .into());
174 }
175
176 let socket_addr: SocketAddr =
177 effective_addr.parse().map_err(|e| SyncError::ServerBind {
178 address: effective_addr.to_string(),
179 reason: format!("Invalid address: {e}"),
180 })?;
181
182 let router = Self::create_router(handler);
183
184 let (ready_tx, ready_rx) = oneshot::channel();
186 let (shutdown_tx, shutdown_rx) = oneshot::channel();
187
188 let (addr_tx, addr_rx) = oneshot::channel::<SocketAddr>();
190
191 tokio::spawn(async move {
193 let listener = tokio::net::TcpListener::bind(socket_addr)
194 .await
195 .expect("Failed to bind address");
196
197 let actual_addr = listener.local_addr().expect("Failed to get local address");
199
200 let _ = addr_tx.send(actual_addr);
202
203 let _ = ready_tx.send(());
205
206 axum::serve(
209 listener,
210 router.into_make_service_with_connect_info::<SocketAddr>(),
211 )
212 .with_graceful_shutdown(async move {
213 let _ = shutdown_rx.await;
214 })
215 .await
216 .expect("Server failed");
217 });
218
219 let actual_addr = addr_rx.await.map_err(|_| SyncError::ServerBind {
221 address: effective_addr.to_string(),
222 reason: "Failed to get actual server address".to_string(),
223 })?;
224
225 wait_for_ready(ready_rx, effective_addr).await?;
227
228 self.server_state
230 .server_started(actual_addr.to_string(), shutdown_tx);
231
232 Ok(())
233 }
234
235 async fn stop_server(&mut self) -> Result<()> {
236 if !self.server_state.is_running() {
237 return Err(SyncError::ServerNotRunning.into());
238 }
239
240 self.server_state.stop_server();
242
243 Ok(())
244 }
245
246 async fn send_request(&self, address: &Address, request: &SyncRequest) -> Result<SyncResponse> {
247 if !self.can_handle_address(address) {
248 return Err(SyncError::UnsupportedTransport {
249 transport_type: address.transport_type.clone(),
250 }
251 .into());
252 }
253
254 let client = reqwest::Client::new();
255 let url = format!("http://{}/api/v0", address.address);
256
257 let response = client
258 .post(&url)
259 .json(&request) .send()
261 .await
262 .map_err(|e| SyncError::ConnectionFailed {
263 address: address.address.clone(),
264 reason: e.to_string(),
265 })?;
266
267 if !response.status().is_success() {
268 return Err(SyncError::Network(format!(
269 "Server returned error: {}",
270 response.status()
271 ))
272 .into());
273 }
274
275 let sync_response: SyncResponse = response
276 .json()
277 .await
278 .map_err(|e| SyncError::Network(format!("Failed to parse response: {e}")))?;
279
280 Ok(sync_response)
281 }
282
283 fn is_server_running(&self) -> bool {
284 self.server_state.is_running()
285 }
286
287 fn get_server_address(&self) -> Result<String> {
288 self.server_state.get_address().map_err(|e| e.into())
289 }
290}
291
292async fn handle_sync_request(
294 State(handler): State<Arc<dyn SyncHandler>>,
295 ConnectInfo(addr): ConnectInfo<SocketAddr>,
296 ExtractJson(request): ExtractJson<SyncRequest>,
297) -> Json<SyncResponse> {
298 let peer_pubkey = match &request {
300 SyncRequest::SyncTree(sync_tree_request) => sync_tree_request.peer_pubkey.clone(),
301 _ => None,
302 };
303
304 let context = RequestContext {
306 remote_address: Some(Address {
307 transport_type: HttpTransport::TRANSPORT_TYPE.to_string(),
308 address: addr.to_string(),
309 }),
310 peer_pubkey,
311 };
312
313 let response = handler.handle_request(&request, &context).await;
315
316 Json(response)
317}