Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/service/
client.rs

1//! Remote connection client for the Eidetica service.
2//!
3//! `RemoteConnection` connects to an Eidetica service server and forwards
4//! storage operations as RPC calls. It backs the `RemoteBackend` implementation
5//! of the `Backend` seam and is not itself a `BackendImpl`.
6//!
7//! Authentication uses the client-side-signing flow described in the Service
8//! Architecture doc § Security Model: `RemoteConnection::trusted_login` drives
9//! the daemon's `TrustedLoginUser` / `TrustedLoginProve` challenge-response,
10//! decrypts the user's root signing key in-process, and signs the challenge
11//! locally. The daemon never sees the password or the plaintext signing key.
12//! After login, subsequent backend operations travel inside the `Authenticated`
13//! envelope and are dispatched against the user's identity; the daemon gates
14//! each one per-tree against the target database's auth settings.
15
16use std::collections::HashSet;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19
20use lru::LruCache;
21use tokio::io::{ReadHalf, WriteHalf};
22use tokio::net::UnixStream;
23use tokio::sync::Mutex;
24
25use crate::auth::crypto::PrivateKey;
26use crate::auth::crypto::{PublicKey, create_challenge_response};
27use crate::auth::types::SigKey;
28use crate::backend::InstanceMetadata;
29use crate::entry::{Entry, ID};
30use crate::service::error::service_error_to_eidetica_error;
31use crate::service::protocol::{
32    AuthenticatedDbRequest, DatabaseOp, Handshake, HandshakeAck, MergeState, PROTOCOL_VERSION,
33    ReadScope, ServiceRequest, ServiceResponse, TransactionContext, WireCrdtValue, read_frame,
34    write_frame,
35};
36use crate::user::UserError;
37use crate::user::crypto::{decrypt_private_key, derive_encryption_key};
38use crate::user::types::{KeyStorage, UserInfo};
39
40/// Default cap on the client-side CRDT-state LRU. Matches `MAX_FRAME_SIZE`
41/// (64 MiB) so a single oversized cached blob can still ride the wire.
42const CLIENT_CACHE_CAPACITY_BYTES: usize = 64 * 1024 * 1024;
43
44/// Process-lifetime LRU of materialized CRDT states for this connection.
45///
46/// Tier 1 of a two-level cache: local hits short-circuit any wire activity;
47/// misses fall through to `GetCachedCrdtState` against the daemon. Cleared
48/// on connection drop — durability across the daemon's lifetime is the
49/// unified [`crate::backend::CacheScope`]-keyed cache in the daemon's
50/// `BackendImpl`, not this one.
51///
52/// Keys are `(root_id, key, store_name)`; values are opaque bytes (cipher-
53/// or plaintext depending on the store, decided by the Transaction's
54/// `encryptors` map). The cache itself is byte-blind.
55struct ClientCrdtCache {
56    lru: LruCache<(ID, ID, String), Vec<u8>>,
57    current_bytes: usize,
58    capacity_bytes: usize,
59}
60
61impl ClientCrdtCache {
62    fn new(capacity_bytes: usize) -> Self {
63        Self {
64            lru: LruCache::unbounded(),
65            current_bytes: 0,
66            capacity_bytes,
67        }
68    }
69
70    fn get(&mut self, root_id: &ID, key: &ID, store: &str) -> Option<Vec<u8>> {
71        // `LruCache::get` promotes the entry to most-recently-used.
72        self.lru
73            .get(&(root_id.clone(), key.clone(), store.to_string()))
74            .cloned()
75    }
76
77    fn put(&mut self, root_id: ID, key: ID, store: String, blob: Vec<u8>) {
78        let blob_size = blob.len();
79        let cache_key = (root_id, key, store);
80        if let Some(prev) = self.lru.put(cache_key.clone(), blob) {
81            self.current_bytes = self.current_bytes.saturating_sub(prev.len());
82        }
83        self.current_bytes = self.current_bytes.saturating_add(blob_size);
84        // Evict LRU until under cap. Soft cap: a single oversized blob is
85        // allowed to exceed the limit alone rather than thrashing.
86        while self.current_bytes > self.capacity_bytes {
87            let Some((k, v)) = self.lru.pop_lru() else {
88                break;
89            };
90            if k == cache_key {
91                self.lru.put(k, v);
92                break;
93            }
94            self.current_bytes = self.current_bytes.saturating_sub(v.len());
95        }
96    }
97}
98
99/// Per-connection session state, populated by `trusted_login` on success.
100///
101/// Holds only the public key the daemon verified during challenge-response.
102/// The plaintext signing key is intentionally **not** stored here — it lives
103/// in the `User::key_manager` session that owns this connection. The
104/// daemon-side `ConnectionState::Authenticated` is what carries the
105/// `user_uuid` (for chunk 6's cache scoping); the client doesn't need it.
106#[derive(Clone, Debug)]
107struct SessionState {
108    session_pubkey: PublicKey,
109}
110
111/// Internal state for a remote connection, wrapped in Arc for Clone.
112struct RemoteConnectionInner {
113    stream: Mutex<(ReadHalf<UnixStream>, WriteHalf<UnixStream>)>,
114    /// Set on successful `trusted_login`; read by `backend_request` to populate
115    /// the `Authenticated` envelope's identity field. `RwLock` because reads
116    /// are far more frequent than the one-shot login write.
117    ///
118    /// Accessed poison-tolerantly via [`RemoteConnectionInner::session_read`]
119    /// and [`RemoteConnectionInner::session_write`]: a panic in one task
120    /// while holding the guard must not promote itself to a permanent connection
121    /// outage. The worst observable case is a half-written session field, which
122    /// the caller already treats as "unauthenticated" (`session_identity`
123    /// returns `None` and the per-tree gate rejects the op).
124    session: RwLock<Option<SessionState>>,
125    /// Pubkeys this client has already proven possession of on this
126    /// connection (via `SessionKeyChallenge`/`SessionKeyRegister`), plus the
127    /// login pubkey added in `trusted_login`. Lets `register_session_key`
128    /// short-circuit when the key has already been registered, avoiding
129    /// per-request wire chatter for the common case where a single per-DB
130    /// key is reused across many ops.
131    registered_keys: Mutex<HashSet<PublicKey>>,
132    /// Process-lifetime CRDT-state LRU shared across every `Database` handle
133    /// (every `RemoteBackend`) on this connection. Tier 1 of the
134    /// two-level cache; tier 2 is the daemon's unified scope-keyed cache
135    /// (lives in `BackendImpl`), reached via `GetCachedCrdtState` /
136    /// `CacheCrdtState` RPCs.
137    ///
138    /// Accessed poison-tolerantly via [`Self::crdt_cache_lock`]: same
139    /// rationale as `session` — a panic in one task must not strand the
140    /// rest of the connection, since cache state is rebuildable.
141    crdt_cache: std::sync::Mutex<ClientCrdtCache>,
142}
143
144impl RemoteConnectionInner {
145    /// Acquire a read guard on `session`, tolerating poisoning.
146    ///
147    /// See the field-level doc on [`Self::session`] for the recovery rationale.
148    fn session_read(&self) -> std::sync::RwLockReadGuard<'_, Option<SessionState>> {
149        self.session
150            .read()
151            .unwrap_or_else(|poisoned| poisoned.into_inner())
152    }
153
154    /// Acquire a write guard on `session`, tolerating poisoning.
155    fn session_write(&self) -> std::sync::RwLockWriteGuard<'_, Option<SessionState>> {
156        self.session
157            .write()
158            .unwrap_or_else(|poisoned| poisoned.into_inner())
159    }
160
161    /// Acquire the CRDT cache lock, tolerating poisoning. See the field-level
162    /// doc on [`Self::crdt_cache`] for the recovery rationale.
163    fn crdt_cache_lock(&self) -> std::sync::MutexGuard<'_, ClientCrdtCache> {
164        self.crdt_cache
165            .lock()
166            .unwrap_or_else(|poisoned| poisoned.into_inner())
167    }
168}
169
170/// A connection to a remote Eidetica service server over a Unix domain socket.
171///
172/// `RemoteConnection` backs the `RemoteBackend` implementation of the `Backend`
173/// seam. It provides the storage operations as inherent methods, plus additional
174/// coordination methods like `notify_entry_written`.
175///
176/// Cloning is cheap (Arc-backed).
177#[derive(Clone)]
178pub struct RemoteConnection {
179    inner: Arc<RemoteConnectionInner>,
180}
181
182impl std::fmt::Debug for RemoteConnection {
183    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184        f.debug_struct("RemoteConnection").finish_non_exhaustive()
185    }
186}
187
188impl RemoteConnection {
189    /// Connect to a service server at the given socket path.
190    ///
191    /// Performs the protocol handshake and returns a connection ready for use.
192    pub async fn connect(path: impl AsRef<Path>) -> crate::Result<Self> {
193        let stream = UnixStream::connect(path.as_ref()).await?;
194        let (mut reader, mut writer) = tokio::io::split(stream);
195
196        // Send handshake
197        let handshake = Handshake {
198            protocol_version: PROTOCOL_VERSION,
199        };
200        write_frame(&mut writer, &handshake).await?;
201
202        // Read ack
203        let ack: HandshakeAck = read_frame(&mut reader).await?.ok_or_else(|| {
204            crate::Error::Io(std::io::Error::new(
205                std::io::ErrorKind::ConnectionAborted,
206                "Server closed connection during handshake",
207            ))
208        })?;
209
210        if ack.protocol_version != PROTOCOL_VERSION {
211            return Err(crate::Error::Io(std::io::Error::new(
212                std::io::ErrorKind::InvalidData,
213                format!(
214                    "Protocol version mismatch: client={}, server={}",
215                    PROTOCOL_VERSION, ack.protocol_version
216                ),
217            )));
218        }
219
220        Ok(Self {
221            inner: Arc::new(RemoteConnectionInner {
222                stream: Mutex::new((reader, writer)),
223                session: RwLock::new(None),
224                registered_keys: Mutex::new(HashSet::new()),
225                crdt_cache: std::sync::Mutex::new(ClientCrdtCache::new(
226                    CLIENT_CACHE_CAPACITY_BYTES,
227                )),
228            }),
229        })
230    }
231
232    /// Look up a cached materialized CRDT state in the connection-shared
233    /// process-lifetime LRU. Promotes the entry to most-recently-used.
234    pub(crate) fn cache_get(&self, root_id: &ID, key: &ID, store: &str) -> Option<Vec<u8>> {
235        self.inner.crdt_cache_lock().get(root_id, key, store)
236    }
237
238    /// Insert a materialized CRDT state into the connection-shared LRU.
239    /// Triggers byte-bounded eviction if over capacity.
240    pub(crate) fn cache_put(&self, root_id: ID, key: ID, store: String, blob: Vec<u8>) {
241        self.inner.crdt_cache_lock().put(root_id, key, store, blob);
242    }
243
244    /// Send a request and read the response.
245    async fn request(&self, req: ServiceRequest) -> crate::Result<ServiceResponse> {
246        let mut stream = self.inner.stream.lock().await;
247        let (ref mut reader, ref mut writer) = *stream;
248        write_frame(writer, &req).await?;
249        let resp: ServiceResponse = read_frame(reader).await?.ok_or_else(|| {
250            crate::Error::Io(std::io::Error::new(
251                std::io::ErrorKind::ConnectionAborted,
252                "Server closed connection unexpectedly",
253            ))
254        })?;
255        Ok(resp)
256    }
257
258    /// Send a request and convert error responses to `crate::Error`.
259    pub(crate) async fn request_ok(&self, req: ServiceRequest) -> crate::Result<ServiceResponse> {
260        let resp = self.request(req).await?;
261        match resp {
262            ServiceResponse::Error(e) => Err(service_error_to_eidetica_error(e)),
263            other => Ok(other),
264        }
265    }
266
267    /// Wrap a `DatabaseOp` in the `AuthenticatedDb` envelope and send it.
268    ///
269    /// `(root_id, identity)` scope and `request_ok` error conversion, carrying
270    /// a `DatabaseOp` in an `AuthenticatedDbRequest`.
271    async fn db_request(
272        &self,
273        root_id: ID,
274        identity: SigKey,
275        op: DatabaseOp,
276    ) -> crate::Result<ServiceResponse> {
277        self.request_ok(ServiceRequest::AuthenticatedDb(Box::new(
278            AuthenticatedDbRequest {
279                root_id,
280                identity,
281                op,
282            },
283        )))
284        .await
285    }
286
287    /// Authenticate this connection as `username` by completing the
288    /// `TrustedLogin*` handshake against the daemon.
289    ///
290    /// Flow: send `TrustedLoginUser` → receive challenge + the user's full
291    /// `UserInfo` (encrypted credentials, user-database id, status) → derive
292    /// the password-encryption key locally (Argon2id) and decrypt the root
293    /// signing key in-process (or take it raw for passwordless users) → sign
294    /// the challenge → send `TrustedLoginProve` → expect `TrustedLoginOk`.
295    ///
296    /// The daemon never sees the password or the plaintext signing key; the
297    /// trust model for shipping the encrypted blob over the socket is captured
298    /// in the Service Architecture doc § Trusted login threat model.
299    ///
300    /// On success the connection's server-side state is `Authenticated`. The
301    /// caller receives the user's record and the decrypted root key so it can
302    /// build the `User` session without a second wire read of `_users` —
303    /// reads through the wire always travel as the authenticated user, which
304    /// with the per-tree gate means a fresh user without permissions on
305    /// `_users` would not be able to re-fetch it.
306    pub(crate) async fn trusted_login(
307        &self,
308        username: &str,
309        password: Option<&str>,
310    ) -> crate::Result<(String, UserInfo, PrivateKey)> {
311        // Step 1: name the user, receive challenge + user record.
312        let resp = self
313            .request_ok(ServiceRequest::TrustedLoginUser {
314                username: username.to_string(),
315            })
316            .await?;
317        let (challenge, user_uuid, user_info) = match resp {
318            ServiceResponse::TrustedLoginChallenge {
319                challenge,
320                user_uuid,
321                user_info,
322            } => (challenge, user_uuid, user_info),
323            other => return Err(unexpected_response("TrustedLoginChallenge", &other)),
324        };
325
326        // Step 2: decrypt the root signing key locally. Cross-check that the
327        // caller's password/no-password matches the credential's salt/no-salt;
328        // a mismatch is the same UX-level error as a wrong password.
329        let credentials = &user_info.credentials;
330        let is_passwordless = credentials.password_salt.is_none();
331        let signing_key = match (&credentials.root_key, password, is_passwordless) {
332            (KeyStorage::Unencrypted { key }, None, true) => key.clone(),
333            (
334                KeyStorage::Encrypted {
335                    ciphertext, nonce, ..
336                },
337                Some(pwd),
338                false,
339            ) => {
340                let salt = credentials.password_salt.as_deref().ok_or_else(|| {
341                    UserError::PasswordRequired {
342                        operation: "decrypt root key for remote login".to_string(),
343                    }
344                })?;
345                let kek = derive_encryption_key(pwd, salt)?;
346                decrypt_private_key(ciphertext, nonce, &kek)?
347            }
348            _ => return Err(UserError::InvalidPassword.into()),
349        };
350
351        // Step 3: sign the challenge and send the proof.
352        let signature = create_challenge_response(&challenge, &signing_key);
353        let resp = self
354            .request_ok(ServiceRequest::TrustedLoginProve { signature })
355            .await?;
356        match resp {
357            ServiceResponse::TrustedLoginOk => {
358                // Stash the verified session pubkey so subsequent
359                // `backend_request` calls can populate the `Authenticated`
360                // envelope's identity field.
361                *self.inner.session_write() = Some(SessionState {
362                    session_pubkey: credentials.root_key_id.clone(),
363                });
364                // The login pubkey is in the server-side session keyset by
365                // construction (the server seeds it there in
366                // `handle_trusted_login_prove`). Mirror that here so
367                // `register_session_key` short-circuits without a wire
368                // round-trip when called for the login key.
369                self.inner
370                    .registered_keys
371                    .lock()
372                    .await
373                    .insert(credentials.root_key_id.clone());
374                Ok((user_uuid, user_info, signing_key))
375            }
376            other => Err(unexpected_response("TrustedLoginOk", &other)),
377        }
378    }
379
380    /// Prove possession of `signing_key` and add its public key to the
381    /// connection's session keyset.
382    ///
383    /// Used by every `Database` handle whose `RemoteBackend` carries a
384    /// per-database identity (e.g. `Database::create` on a connected
385    /// instance, or `user.open_database_with_key` over the wire): the daemon
386    /// gates reads against the *acting* pubkey from the identity hint, and
387    /// the acting pubkey must be in the keyset, so we register the per-DB
388    /// key before the first read.
389    ///
390    /// Idempotent and cheap on repeated calls: a successful registration
391    /// caches the pubkey in `registered_keys`, and a follow-up call with the
392    /// same key returns `Ok(())` without touching the wire. The login pubkey
393    /// is seeded into the cache by `trusted_login`.
394    ///
395    /// Cryptographically a two-step proof of possession:
396    /// 1. `SessionKeyChallenge { pubkey }` → server returns a single-use,
397    ///    pubkey-bound random challenge.
398    /// 2. Client signs the challenge with `signing_key`; `SessionKeyRegister
399    ///    { pubkey, signature }` → server verifies and inserts the pubkey
400    ///    into its `session_keyset`.
401    pub(crate) async fn register_session_key(&self, signing_key: &PrivateKey) -> crate::Result<()> {
402        let pubkey = signing_key.public_key();
403        {
404            let cache = self.inner.registered_keys.lock().await;
405            if cache.contains(&pubkey) {
406                return Ok(());
407            }
408        }
409        // Step 1: ask for a challenge bound to this pubkey.
410        let resp = self
411            .request_ok(ServiceRequest::SessionKeyChallenge {
412                pubkey: pubkey.clone(),
413            })
414            .await?;
415        let challenge = match resp {
416            ServiceResponse::SessionKeyChallenge { challenge } => challenge,
417            other => return Err(unexpected_response("SessionKeyChallenge", &other)),
418        };
419        // Step 2: sign and submit. The daemon verifies and joins the pubkey
420        // into the connection's keyset on Ok.
421        let signature = create_challenge_response(&challenge, signing_key);
422        let resp = self
423            .request_ok(ServiceRequest::SessionKeyRegister {
424                pubkey: pubkey.clone(),
425                signature,
426            })
427            .await?;
428        Self::expect_ok(resp)?;
429        self.inner.registered_keys.lock().await.insert(pubkey);
430        Ok(())
431    }
432
433    // === Response extraction helpers ===
434
435    fn expect_ok(resp: ServiceResponse) -> crate::Result<()> {
436        match resp {
437            ServiceResponse::Ok => Ok(()),
438            other => Err(unexpected_response("Ok", &other)),
439        }
440    }
441
442    // === Instance-level operations ===
443
444    /// Build a `SigKey` from the session pubkey, when logged in.
445    pub fn session_identity(&self) -> Option<SigKey> {
446        self.inner
447            .session_read()
448            .as_ref()
449            .map(|s| SigKey::from_pubkey(&s.session_pubkey))
450    }
451
452    pub async fn get_instance_metadata(&self) -> crate::Result<Option<InstanceMetadata>> {
453        let resp = self.request_ok(ServiceRequest::GetInstanceMetadata).await?;
454        match resp {
455            ServiceResponse::InstanceMetadata(meta) => Ok(meta),
456            other => Err(unexpected_response("InstanceMetadata", &other)),
457        }
458    }
459
460    pub async fn set_instance_metadata(&self, metadata: &InstanceMetadata) -> crate::Result<()> {
461        // Gated server-side as Admin on `_databases`, not on `root_id`, so the
462        // scope's `root_id` is unused for this op (default is fine).
463        let identity = self.session_identity().unwrap_or_default();
464        let resp = self
465            .db_request(
466                ID::default(),
467                identity,
468                DatabaseOp::SetInstanceMetadata {
469                    metadata: Box::new(metadata.clone()),
470                },
471            )
472            .await?;
473        Self::expect_ok(resp)
474    }
475
476    // === Database operations (DatabaseOp via AuthenticatedDb envelope) ===
477
478    /// Acquire a [`TransactionContext`] for the given stores and scope.
479    ///
480    /// The returned context includes main-tree parents with heights,
481    /// per-store subtree parents, `_settings` tips, and the merged
482    /// `_settings` value — everything needed to build and sign an entry
483    /// locally without further round-trips.
484    pub async fn begin_transaction(
485        &self,
486        root_id: ID,
487        identity: SigKey,
488        stores: Vec<String>,
489        scope: ReadScope,
490    ) -> crate::Result<TransactionContext> {
491        let resp = self
492            .db_request(
493                root_id,
494                identity,
495                DatabaseOp::BeginTransaction { stores, scope },
496            )
497            .await?;
498        match resp {
499            ServiceResponse::TransactionContext(ctx) => Ok(ctx),
500            other => Err(unexpected_response("TransactionContext", &other)),
501        }
502    }
503
504    /// Fetch the server-materialized merged state of an unencrypted store.
505    pub async fn get_store_state(
506        &self,
507        root_id: ID,
508        identity: SigKey,
509        store: String,
510    ) -> crate::Result<WireCrdtValue> {
511        let resp = self
512            .db_request(root_id, identity, DatabaseOp::GetStoreState { store })
513            .await?;
514        match resp {
515            ServiceResponse::CrdtValue(v) => Ok(v),
516            other => Err(unexpected_response("CrdtValue", &other)),
517        }
518    }
519
520    /// Fetch ordered, verified, opaque store entries reachable from `tips`.
521    ///
522    /// Universal primitive — works for encrypted stores (client decrypts
523    /// locally) as well as unencrypted ones.
524    pub async fn get_store_entries(
525        &self,
526        root_id: ID,
527        identity: SigKey,
528        store: String,
529        tips: Vec<ID>,
530        scope: ReadScope,
531    ) -> crate::Result<Vec<Entry>> {
532        let resp = self
533            .db_request(
534                root_id,
535                identity,
536                DatabaseOp::GetStoreEntries { store, tips, scope },
537            )
538            .await?;
539        match resp {
540            ServiceResponse::Entries(entries) => Ok(entries),
541            other => Err(unexpected_response("Entries", &other)),
542        }
543    }
544
545    /// Fetch the database's Verified-frontier tips.
546    pub async fn get_verified_tips(&self, root_id: ID, identity: SigKey) -> crate::Result<Vec<ID>> {
547        let resp = self
548            .db_request(root_id, identity, DatabaseOp::GetVerifiedTips)
549            .await?;
550        match resp {
551            ServiceResponse::Ids(ids) => Ok(ids),
552            other => Err(unexpected_response("Ids", &other)),
553        }
554    }
555
556    /// Submit a client-signed entry to the server.
557    ///
558    /// The server stores the entry as `Unverified` and runs its own
559    /// verification pass — it never trusts a submitted entry's claimed
560    /// validity.
561    pub async fn submit_signed_entry(
562        &self,
563        root_id: ID,
564        identity: SigKey,
565        entry: Entry,
566    ) -> crate::Result<()> {
567        let resp = self
568            .db_request(
569                root_id,
570                identity,
571                DatabaseOp::SubmitSignedEntry {
572                    entry: Box::new(entry),
573                },
574            )
575            .await?;
576        match resp {
577            ServiceResponse::Ok => Ok(()),
578            other => Err(unexpected_response("Ok", &other)),
579        }
580    }
581
582    /// Fetch a single database entry by id.
583    ///
584    /// Gated post-fetch by the entry's owning tree, so the caller must hold
585    /// at least `Read` on the database the entry belongs to.
586    pub async fn db_get_entry(
587        &self,
588        root_id: ID,
589        identity: SigKey,
590        id: ID,
591    ) -> crate::Result<Entry> {
592        let resp = self
593            .db_request(root_id, identity, DatabaseOp::GetEntry { id })
594            .await?;
595        match resp {
596            ServiceResponse::Entry(entry) => Ok(entry),
597            other => Err(unexpected_response("Entry", &other)),
598        }
599    }
600
601    /// Subtree tips reachable from given main-tree entries.
602    pub async fn store_snapshot_at(
603        &self,
604        root_id: ID,
605        identity: SigKey,
606        store: String,
607        up_to: Vec<ID>,
608    ) -> crate::Result<Vec<ID>> {
609        let resp = self
610            .db_request(
611                root_id,
612                identity,
613                DatabaseOp::GetStoreTipsUpToEntries { store, up_to },
614            )
615            .await?;
616        match resp {
617            ServiceResponse::Ids(ids) => Ok(ids),
618            other => Err(unexpected_response("Ids", &other)),
619        }
620    }
621
622    /// Compute merge state: lowest common ancestor + path to tip entries.
623    pub async fn compute_merge_state(
624        &self,
625        root_id: ID,
626        identity: SigKey,
627        store: String,
628        entry_ids: Vec<ID>,
629    ) -> crate::Result<MergeState> {
630        let resp = self
631            .db_request(
632                root_id,
633                identity,
634                DatabaseOp::ComputeMergeState { store, entry_ids },
635            )
636            .await?;
637        match resp {
638            ServiceResponse::MergeState(state) => Ok(state),
639            other => Err(unexpected_response("MergeState", &other)),
640        }
641    }
642
643    /// Tier 2 cache read: ask the daemon for a previously-stashed CRDT
644    /// state blob. `None` on miss; the caller falls back to a full
645    /// recompute from store entries.
646    pub async fn get_cached_crdt_state_remote(
647        &self,
648        root_id: ID,
649        identity: SigKey,
650        store: String,
651        key: ID,
652    ) -> crate::Result<Option<Vec<u8>>> {
653        let resp = self
654            .db_request(
655                root_id,
656                identity,
657                DatabaseOp::GetCachedCrdtState { store, key },
658            )
659            .await?;
660        match resp {
661            ServiceResponse::CachedCrdtState(blob) => Ok(blob),
662            other => Err(unexpected_response("CachedCrdtState", &other)),
663        }
664    }
665
666    /// Tier 2 cache write: stash a client-computed CRDT state blob in the
667    /// daemon's unified cache, scoped to the session user
668    /// ([`crate::backend::CacheScope::User`]). Per-user trust; the daemon
669    /// stores opaque bytes verbatim.
670    pub async fn cache_crdt_state_remote(
671        &self,
672        root_id: ID,
673        identity: SigKey,
674        store: String,
675        key: ID,
676        blob: Vec<u8>,
677    ) -> crate::Result<()> {
678        let resp = self
679            .db_request(
680                root_id,
681                identity,
682                DatabaseOp::CacheCrdtState { store, key, blob },
683            )
684            .await?;
685        match resp {
686            ServiceResponse::Ok => Ok(()),
687            other => Err(unexpected_response("Ok", &other)),
688        }
689    }
690}
691
692fn unexpected_response(expected: &str, actual: &ServiceResponse) -> crate::Error {
693    crate::Error::Io(std::io::Error::new(
694        std::io::ErrorKind::InvalidData,
695        format!("Expected {expected} response, got {actual:?}"),
696    ))
697}
698
699#[cfg(test)]
700mod tests {
701    use super::*;
702
703    fn eid(s: &str) -> ID {
704        ID::from_bytes(s)
705    }
706
707    fn root() -> ID {
708        eid("root")
709    }
710
711    #[test]
712    fn client_cache_round_trip() {
713        let mut c = ClientCrdtCache::new(1024);
714        c.put(root(), eid("e1"), "store1".into(), b"hello".to_vec());
715        assert_eq!(
716            c.get(&root(), &eid("e1"), "store1"),
717            Some(b"hello".to_vec())
718        );
719    }
720
721    #[test]
722    fn client_cache_evicts_under_byte_pressure() {
723        let mut c = ClientCrdtCache::new(100);
724        c.put(root(), eid("e1"), "s".into(), vec![1u8; 50]);
725        c.put(root(), eid("e2"), "s".into(), vec![2u8; 50]);
726        assert_eq!(c.current_bytes, 100);
727        c.put(root(), eid("e3"), "s".into(), vec![3u8; 50]);
728        assert!(
729            c.get(&root(), &eid("e1"), "s").is_none(),
730            "least-recently-used entry must be evicted"
731        );
732        assert_eq!(c.get(&root(), &eid("e2"), "s"), Some(vec![2u8; 50]));
733        assert_eq!(c.get(&root(), &eid("e3"), "s"), Some(vec![3u8; 50]));
734    }
735
736    #[test]
737    fn client_cache_get_promotes_to_most_recent() {
738        let mut c = ClientCrdtCache::new(100);
739        c.put(root(), eid("e1"), "s".into(), vec![1u8; 50]);
740        c.put(root(), eid("e2"), "s".into(), vec![2u8; 50]);
741        let _ = c.get(&root(), &eid("e1"), "s"); // promote e1
742        c.put(root(), eid("e3"), "s".into(), vec![3u8; 50]);
743        assert!(
744            c.get(&root(), &eid("e1"), "s").is_some(),
745            "promoted entry must survive eviction"
746        );
747        assert!(
748            c.get(&root(), &eid("e2"), "s").is_none(),
749            "older un-touched entry must be evicted"
750        );
751    }
752
753    #[test]
754    fn client_cache_replaces_in_place() {
755        let mut c = ClientCrdtCache::new(1024);
756        c.put(root(), eid("e1"), "s".into(), b"v1".to_vec());
757        c.put(root(), eid("e1"), "s".into(), b"v2-different-len".to_vec());
758        assert_eq!(
759            c.get(&root(), &eid("e1"), "s"),
760            Some(b"v2-different-len".to_vec())
761        );
762        // current_bytes should reflect only the replacement, not the sum.
763        assert_eq!(c.current_bytes, b"v2-different-len".len());
764    }
765}