1use std::collections::HashSet;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
20use lru::LruCache;
21use tokio::io::{ReadHalf, WriteHalf};
22use tokio::net::UnixStream;
23use tokio::sync::Mutex;
24
25use crate::auth::crypto::PrivateKey;
26use crate::auth::crypto::{PublicKey, create_challenge_response};
27use crate::auth::types::SigKey;
28use crate::backend::InstanceMetadata;
29use crate::entry::{Entry, ID};
30use crate::service::error::service_error_to_eidetica_error;
31use crate::service::protocol::{
32 AuthenticatedDbRequest, DatabaseOp, Handshake, HandshakeAck, MergeState, PROTOCOL_VERSION,
33 ReadScope, ServiceRequest, ServiceResponse, TransactionContext, WireCrdtValue, read_frame,
34 write_frame,
35};
36use crate::user::UserError;
37use crate::user::crypto::{decrypt_private_key, derive_encryption_key};
38use crate::user::types::{KeyStorage, UserInfo};
39
/// Byte budget (64 MiB) handed to the client-side CRDT blob cache when a
/// connection is created.
const CLIENT_CACHE_CAPACITY_BYTES: usize = 64 << 20;
43
44struct ClientCrdtCache {
56 lru: LruCache<(ID, ID, String), Vec<u8>>,
57 current_bytes: usize,
58 capacity_bytes: usize,
59}
60
61impl ClientCrdtCache {
62 fn new(capacity_bytes: usize) -> Self {
63 Self {
64 lru: LruCache::unbounded(),
65 current_bytes: 0,
66 capacity_bytes,
67 }
68 }
69
70 fn get(&mut self, root_id: &ID, key: &ID, store: &str) -> Option<Vec<u8>> {
71 self.lru
73 .get(&(root_id.clone(), key.clone(), store.to_string()))
74 .cloned()
75 }
76
77 fn put(&mut self, root_id: ID, key: ID, store: String, blob: Vec<u8>) {
78 let blob_size = blob.len();
79 let cache_key = (root_id, key, store);
80 if let Some(prev) = self.lru.put(cache_key.clone(), blob) {
81 self.current_bytes = self.current_bytes.saturating_sub(prev.len());
82 }
83 self.current_bytes = self.current_bytes.saturating_add(blob_size);
84 while self.current_bytes > self.capacity_bytes {
87 let Some((k, v)) = self.lru.pop_lru() else {
88 break;
89 };
90 if k == cache_key {
91 self.lru.put(k, v);
92 break;
93 }
94 self.current_bytes = self.current_bytes.saturating_sub(v.len());
95 }
96 }
97}
98
99#[derive(Clone, Debug)]
107struct SessionState {
108 session_pubkey: PublicKey,
109}
110
111struct RemoteConnectionInner {
113 stream: Mutex<(ReadHalf<UnixStream>, WriteHalf<UnixStream>)>,
114 session: RwLock<Option<SessionState>>,
125 registered_keys: Mutex<HashSet<PublicKey>>,
132 crdt_cache: std::sync::Mutex<ClientCrdtCache>,
142}
143
144impl RemoteConnectionInner {
145 fn session_read(&self) -> std::sync::RwLockReadGuard<'_, Option<SessionState>> {
149 self.session
150 .read()
151 .unwrap_or_else(|poisoned| poisoned.into_inner())
152 }
153
154 fn session_write(&self) -> std::sync::RwLockWriteGuard<'_, Option<SessionState>> {
156 self.session
157 .write()
158 .unwrap_or_else(|poisoned| poisoned.into_inner())
159 }
160
161 fn crdt_cache_lock(&self) -> std::sync::MutexGuard<'_, ClientCrdtCache> {
164 self.crdt_cache
165 .lock()
166 .unwrap_or_else(|poisoned| poisoned.into_inner())
167 }
168}
169
170#[derive(Clone)]
178pub struct RemoteConnection {
179 inner: Arc<RemoteConnectionInner>,
180}
181
182impl std::fmt::Debug for RemoteConnection {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 f.debug_struct("RemoteConnection").finish_non_exhaustive()
185 }
186}
187
188impl RemoteConnection {
189 pub async fn connect(path: impl AsRef<Path>) -> crate::Result<Self> {
193 let stream = UnixStream::connect(path.as_ref()).await?;
194 let (mut reader, mut writer) = tokio::io::split(stream);
195
196 let handshake = Handshake {
198 protocol_version: PROTOCOL_VERSION,
199 };
200 write_frame(&mut writer, &handshake).await?;
201
202 let ack: HandshakeAck = read_frame(&mut reader).await?.ok_or_else(|| {
204 crate::Error::Io(std::io::Error::new(
205 std::io::ErrorKind::ConnectionAborted,
206 "Server closed connection during handshake",
207 ))
208 })?;
209
210 if ack.protocol_version != PROTOCOL_VERSION {
211 return Err(crate::Error::Io(std::io::Error::new(
212 std::io::ErrorKind::InvalidData,
213 format!(
214 "Protocol version mismatch: client={}, server={}",
215 PROTOCOL_VERSION, ack.protocol_version
216 ),
217 )));
218 }
219
220 Ok(Self {
221 inner: Arc::new(RemoteConnectionInner {
222 stream: Mutex::new((reader, writer)),
223 session: RwLock::new(None),
224 registered_keys: Mutex::new(HashSet::new()),
225 crdt_cache: std::sync::Mutex::new(ClientCrdtCache::new(
226 CLIENT_CACHE_CAPACITY_BYTES,
227 )),
228 }),
229 })
230 }
231
232 pub(crate) fn cache_get(&self, root_id: &ID, key: &ID, store: &str) -> Option<Vec<u8>> {
235 self.inner.crdt_cache_lock().get(root_id, key, store)
236 }
237
238 pub(crate) fn cache_put(&self, root_id: ID, key: ID, store: String, blob: Vec<u8>) {
241 self.inner.crdt_cache_lock().put(root_id, key, store, blob);
242 }
243
244 async fn request(&self, req: ServiceRequest) -> crate::Result<ServiceResponse> {
246 let mut stream = self.inner.stream.lock().await;
247 let (ref mut reader, ref mut writer) = *stream;
248 write_frame(writer, &req).await?;
249 let resp: ServiceResponse = read_frame(reader).await?.ok_or_else(|| {
250 crate::Error::Io(std::io::Error::new(
251 std::io::ErrorKind::ConnectionAborted,
252 "Server closed connection unexpectedly",
253 ))
254 })?;
255 Ok(resp)
256 }
257
258 pub(crate) async fn request_ok(&self, req: ServiceRequest) -> crate::Result<ServiceResponse> {
260 let resp = self.request(req).await?;
261 match resp {
262 ServiceResponse::Error(e) => Err(service_error_to_eidetica_error(e)),
263 other => Ok(other),
264 }
265 }
266
267 async fn db_request(
272 &self,
273 root_id: ID,
274 identity: SigKey,
275 op: DatabaseOp,
276 ) -> crate::Result<ServiceResponse> {
277 self.request_ok(ServiceRequest::AuthenticatedDb(Box::new(
278 AuthenticatedDbRequest {
279 root_id,
280 identity,
281 op,
282 },
283 )))
284 .await
285 }
286
287 pub(crate) async fn trusted_login(
307 &self,
308 username: &str,
309 password: Option<&str>,
310 ) -> crate::Result<(String, UserInfo, PrivateKey)> {
311 let resp = self
313 .request_ok(ServiceRequest::TrustedLoginUser {
314 username: username.to_string(),
315 })
316 .await?;
317 let (challenge, user_uuid, user_info) = match resp {
318 ServiceResponse::TrustedLoginChallenge {
319 challenge,
320 user_uuid,
321 user_info,
322 } => (challenge, user_uuid, user_info),
323 other => return Err(unexpected_response("TrustedLoginChallenge", &other)),
324 };
325
326 let credentials = &user_info.credentials;
330 let is_passwordless = credentials.password_salt.is_none();
331 let signing_key = match (&credentials.root_key, password, is_passwordless) {
332 (KeyStorage::Unencrypted { key }, None, true) => key.clone(),
333 (
334 KeyStorage::Encrypted {
335 ciphertext, nonce, ..
336 },
337 Some(pwd),
338 false,
339 ) => {
340 let salt = credentials.password_salt.as_deref().ok_or_else(|| {
341 UserError::PasswordRequired {
342 operation: "decrypt root key for remote login".to_string(),
343 }
344 })?;
345 let kek = derive_encryption_key(pwd, salt)?;
346 decrypt_private_key(ciphertext, nonce, &kek)?
347 }
348 _ => return Err(UserError::InvalidPassword.into()),
349 };
350
351 let signature = create_challenge_response(&challenge, &signing_key);
353 let resp = self
354 .request_ok(ServiceRequest::TrustedLoginProve { signature })
355 .await?;
356 match resp {
357 ServiceResponse::TrustedLoginOk => {
358 *self.inner.session_write() = Some(SessionState {
362 session_pubkey: credentials.root_key_id.clone(),
363 });
364 self.inner
370 .registered_keys
371 .lock()
372 .await
373 .insert(credentials.root_key_id.clone());
374 Ok((user_uuid, user_info, signing_key))
375 }
376 other => Err(unexpected_response("TrustedLoginOk", &other)),
377 }
378 }
379
380 pub(crate) async fn register_session_key(&self, signing_key: &PrivateKey) -> crate::Result<()> {
402 let pubkey = signing_key.public_key();
403 {
404 let cache = self.inner.registered_keys.lock().await;
405 if cache.contains(&pubkey) {
406 return Ok(());
407 }
408 }
409 let resp = self
411 .request_ok(ServiceRequest::SessionKeyChallenge {
412 pubkey: pubkey.clone(),
413 })
414 .await?;
415 let challenge = match resp {
416 ServiceResponse::SessionKeyChallenge { challenge } => challenge,
417 other => return Err(unexpected_response("SessionKeyChallenge", &other)),
418 };
419 let signature = create_challenge_response(&challenge, signing_key);
422 let resp = self
423 .request_ok(ServiceRequest::SessionKeyRegister {
424 pubkey: pubkey.clone(),
425 signature,
426 })
427 .await?;
428 Self::expect_ok(resp)?;
429 self.inner.registered_keys.lock().await.insert(pubkey);
430 Ok(())
431 }
432
433 fn expect_ok(resp: ServiceResponse) -> crate::Result<()> {
436 match resp {
437 ServiceResponse::Ok => Ok(()),
438 other => Err(unexpected_response("Ok", &other)),
439 }
440 }
441
442 pub fn session_identity(&self) -> Option<SigKey> {
446 self.inner
447 .session_read()
448 .as_ref()
449 .map(|s| SigKey::from_pubkey(&s.session_pubkey))
450 }
451
452 pub async fn get_instance_metadata(&self) -> crate::Result<Option<InstanceMetadata>> {
453 let resp = self.request_ok(ServiceRequest::GetInstanceMetadata).await?;
454 match resp {
455 ServiceResponse::InstanceMetadata(meta) => Ok(meta),
456 other => Err(unexpected_response("InstanceMetadata", &other)),
457 }
458 }
459
460 pub async fn set_instance_metadata(&self, metadata: &InstanceMetadata) -> crate::Result<()> {
461 let identity = self.session_identity().unwrap_or_default();
464 let resp = self
465 .db_request(
466 ID::default(),
467 identity,
468 DatabaseOp::SetInstanceMetadata {
469 metadata: Box::new(metadata.clone()),
470 },
471 )
472 .await?;
473 Self::expect_ok(resp)
474 }
475
476 pub async fn begin_transaction(
485 &self,
486 root_id: ID,
487 identity: SigKey,
488 stores: Vec<String>,
489 scope: ReadScope,
490 ) -> crate::Result<TransactionContext> {
491 let resp = self
492 .db_request(
493 root_id,
494 identity,
495 DatabaseOp::BeginTransaction { stores, scope },
496 )
497 .await?;
498 match resp {
499 ServiceResponse::TransactionContext(ctx) => Ok(ctx),
500 other => Err(unexpected_response("TransactionContext", &other)),
501 }
502 }
503
504 pub async fn get_store_state(
506 &self,
507 root_id: ID,
508 identity: SigKey,
509 store: String,
510 ) -> crate::Result<WireCrdtValue> {
511 let resp = self
512 .db_request(root_id, identity, DatabaseOp::GetStoreState { store })
513 .await?;
514 match resp {
515 ServiceResponse::CrdtValue(v) => Ok(v),
516 other => Err(unexpected_response("CrdtValue", &other)),
517 }
518 }
519
520 pub async fn get_store_entries(
525 &self,
526 root_id: ID,
527 identity: SigKey,
528 store: String,
529 tips: Vec<ID>,
530 scope: ReadScope,
531 ) -> crate::Result<Vec<Entry>> {
532 let resp = self
533 .db_request(
534 root_id,
535 identity,
536 DatabaseOp::GetStoreEntries { store, tips, scope },
537 )
538 .await?;
539 match resp {
540 ServiceResponse::Entries(entries) => Ok(entries),
541 other => Err(unexpected_response("Entries", &other)),
542 }
543 }
544
545 pub async fn get_verified_tips(&self, root_id: ID, identity: SigKey) -> crate::Result<Vec<ID>> {
547 let resp = self
548 .db_request(root_id, identity, DatabaseOp::GetVerifiedTips)
549 .await?;
550 match resp {
551 ServiceResponse::Ids(ids) => Ok(ids),
552 other => Err(unexpected_response("Ids", &other)),
553 }
554 }
555
556 pub async fn submit_signed_entry(
562 &self,
563 root_id: ID,
564 identity: SigKey,
565 entry: Entry,
566 ) -> crate::Result<()> {
567 let resp = self
568 .db_request(
569 root_id,
570 identity,
571 DatabaseOp::SubmitSignedEntry {
572 entry: Box::new(entry),
573 },
574 )
575 .await?;
576 match resp {
577 ServiceResponse::Ok => Ok(()),
578 other => Err(unexpected_response("Ok", &other)),
579 }
580 }
581
582 pub async fn db_get_entry(
587 &self,
588 root_id: ID,
589 identity: SigKey,
590 id: ID,
591 ) -> crate::Result<Entry> {
592 let resp = self
593 .db_request(root_id, identity, DatabaseOp::GetEntry { id })
594 .await?;
595 match resp {
596 ServiceResponse::Entry(entry) => Ok(entry),
597 other => Err(unexpected_response("Entry", &other)),
598 }
599 }
600
601 pub async fn store_snapshot_at(
603 &self,
604 root_id: ID,
605 identity: SigKey,
606 store: String,
607 up_to: Vec<ID>,
608 ) -> crate::Result<Vec<ID>> {
609 let resp = self
610 .db_request(
611 root_id,
612 identity,
613 DatabaseOp::GetStoreTipsUpToEntries { store, up_to },
614 )
615 .await?;
616 match resp {
617 ServiceResponse::Ids(ids) => Ok(ids),
618 other => Err(unexpected_response("Ids", &other)),
619 }
620 }
621
622 pub async fn compute_merge_state(
624 &self,
625 root_id: ID,
626 identity: SigKey,
627 store: String,
628 entry_ids: Vec<ID>,
629 ) -> crate::Result<MergeState> {
630 let resp = self
631 .db_request(
632 root_id,
633 identity,
634 DatabaseOp::ComputeMergeState { store, entry_ids },
635 )
636 .await?;
637 match resp {
638 ServiceResponse::MergeState(state) => Ok(state),
639 other => Err(unexpected_response("MergeState", &other)),
640 }
641 }
642
643 pub async fn get_cached_crdt_state_remote(
647 &self,
648 root_id: ID,
649 identity: SigKey,
650 store: String,
651 key: ID,
652 ) -> crate::Result<Option<Vec<u8>>> {
653 let resp = self
654 .db_request(
655 root_id,
656 identity,
657 DatabaseOp::GetCachedCrdtState { store, key },
658 )
659 .await?;
660 match resp {
661 ServiceResponse::CachedCrdtState(blob) => Ok(blob),
662 other => Err(unexpected_response("CachedCrdtState", &other)),
663 }
664 }
665
666 pub async fn cache_crdt_state_remote(
671 &self,
672 root_id: ID,
673 identity: SigKey,
674 store: String,
675 key: ID,
676 blob: Vec<u8>,
677 ) -> crate::Result<()> {
678 let resp = self
679 .db_request(
680 root_id,
681 identity,
682 DatabaseOp::CacheCrdtState { store, key, blob },
683 )
684 .await?;
685 match resp {
686 ServiceResponse::Ok => Ok(()),
687 other => Err(unexpected_response("Ok", &other)),
688 }
689 }
690}
691
692fn unexpected_response(expected: &str, actual: &ServiceResponse) -> crate::Error {
693 crate::Error::Io(std::io::Error::new(
694 std::io::ErrorKind::InvalidData,
695 format!("Expected {expected} response, got {actual:?}"),
696 ))
697}
698
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `ID` from a short label.
    fn eid(s: &str) -> ID {
        ID::from_bytes(s)
    }

    /// Root ID shared by all cache tests.
    fn root() -> ID {
        eid("root")
    }

    /// A stored blob can be read back unchanged.
    #[test]
    fn client_cache_round_trip() {
        let mut cache = ClientCrdtCache::new(1024);
        cache.put(root(), eid("e1"), "store1".into(), b"hello".to_vec());

        let fetched = cache.get(&root(), &eid("e1"), "store1");
        assert_eq!(fetched, Some(b"hello".to_vec()));
    }

    /// Exceeding the byte budget evicts the least recently used entry.
    #[test]
    fn client_cache_evicts_under_byte_pressure() {
        let mut cache = ClientCrdtCache::new(100);
        cache.put(root(), eid("e1"), "s".into(), vec![1u8; 50]);
        cache.put(root(), eid("e2"), "s".into(), vec![2u8; 50]);
        assert_eq!(cache.current_bytes, 100);

        // A third 50-byte blob pushes the total to 150, over the budget.
        cache.put(root(), eid("e3"), "s".into(), vec![3u8; 50]);

        assert!(
            cache.get(&root(), &eid("e1"), "s").is_none(),
            "least-recently-used entry must be evicted"
        );
        assert_eq!(cache.get(&root(), &eid("e2"), "s"), Some(vec![2u8; 50]));
        assert_eq!(cache.get(&root(), &eid("e3"), "s"), Some(vec![3u8; 50]));
    }

    /// Reading an entry protects it from the next eviction.
    #[test]
    fn client_cache_get_promotes_to_most_recent() {
        let mut cache = ClientCrdtCache::new(100);
        cache.put(root(), eid("e1"), "s".into(), vec![1u8; 50]);
        cache.put(root(), eid("e2"), "s".into(), vec![2u8; 50]);

        // Touch e1 so that e2 becomes the eviction candidate.
        let _ = cache.get(&root(), &eid("e1"), "s");
        cache.put(root(), eid("e3"), "s".into(), vec![3u8; 50]);

        assert!(
            cache.get(&root(), &eid("e1"), "s").is_some(),
            "promoted entry must survive eviction"
        );
        assert!(
            cache.get(&root(), &eid("e2"), "s").is_none(),
            "older un-touched entry must be evicted"
        );
    }

    /// Overwriting a key replaces the blob and re-accounts its size.
    #[test]
    fn client_cache_replaces_in_place() {
        let replacement = b"v2-different-len".to_vec();

        let mut cache = ClientCrdtCache::new(1024);
        cache.put(root(), eid("e1"), "s".into(), b"v1".to_vec());
        cache.put(root(), eid("e1"), "s".into(), replacement.clone());

        assert_eq!(
            cache.get(&root(), &eid("e1"), "s"),
            Some(b"v2-different-len".to_vec())
        );
        assert_eq!(cache.current_bytes, replacement.len());
    }
}