Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/instance/backend/
remote.rs

1//! [`RemoteBackend`]: the seam backed by a service connection.
2
3use async_trait::async_trait;
4
5use super::Backend;
6use crate::{
7    Result,
8    auth::SigKey,
9    backend::{InstanceMetadata, VerificationStatus},
10    entry::{Entry, ID},
11    instance::WriteSource,
12    service::{client::RemoteConnection, protocol::ReadScope},
13    snapshot::Snapshot,
14};
15
16/// A [`Backend`] that translates every storage operation to a wire RPC over a
17/// shared [`RemoteConnection`].
18///
19/// The only per-handle state is the acting identity: `None` means "use the
20/// connection's current session identity" (the instance-level backend), and
21/// `Some(k)` means "act as `k`" (a `Database` handle opened with key `k`).
22/// Every clone shares the same socket and session — additional keys are
23/// proof-of-possession registered into the connection's keyset by the handle
24/// constructors, not by holding a separate connection.
25///
26/// Tree-scoped methods use the `tree` argument the caller already supplies
27/// (`Transaction` passes the owning database's root), so no root is bound here.
28/// `get` derives its gating tree server-side from the fetched entry, so it
29/// passes `ID::default()` as the (waved-through) request root.
30///
31/// CRDT-state caching is two-tiered: a connection-scoped process-lifetime LRU
32/// (tier 1) backed by the daemon's unified scope-keyed cache (tier 2) reached
33/// via `GetCachedCrdtState` / `CacheCrdtState` RPCs.
34#[derive(Debug, Clone)]
35pub struct RemoteBackend {
36    conn: RemoteConnection,
37    identity: Option<SigKey>,
38}
39
40impl RemoteBackend {
41    pub fn new(conn: RemoteConnection, identity: Option<SigKey>) -> Self {
42        Self { conn, identity }
43    }
44
45    /// The acting identity for authenticated RPCs: the bound per-handle
46    /// identity, else the connection's current session identity.
47    fn identity(&self) -> SigKey {
48        self.identity
49            .clone()
50            .or_else(|| self.conn.session_identity())
51            .unwrap_or_default()
52    }
53}
54
55#[async_trait]
56impl Backend for RemoteBackend {
57    async fn get(&self, id: &ID) -> Result<Entry> {
58        // `ID::default()` is never a real database, so the pre-dispatch gate
59        // waves it through; the server then gates post-fetch against the
60        // fetched entry's owning tree using our identity.
61        self.conn
62            .db_get_entry(ID::default(), self.identity(), id.clone())
63            .await
64    }
65
66    async fn snapshot(&self, tree: &ID) -> Result<Snapshot> {
67        match self
68            .conn
69            .get_verified_tips(tree.clone(), self.identity())
70            .await
71        {
72            Ok(tips) => Ok(Snapshot::new(tips)),
73            Err(e) if e.is_not_found() => Ok(Snapshot::EMPTY),
74            Err(e) => Err(e),
75        }
76    }
77
78    async fn store_snapshot(&self, tree: &ID, store: &str) -> Result<Snapshot> {
79        let tree_tips = match self
80            .conn
81            .get_verified_tips(tree.clone(), self.identity())
82            .await
83        {
84            Ok(tips) => tips,
85            Err(e) if e.is_not_found() => return Ok(Snapshot::EMPTY),
86            Err(e) => return Err(e),
87        };
88        if tree_tips.is_empty() {
89            return Ok(Snapshot::EMPTY);
90        }
91        match self
92            .conn
93            .store_snapshot_at(tree.clone(), self.identity(), store.to_string(), tree_tips)
94            .await
95        {
96            Ok(tips) => Ok(Snapshot::new(tips)),
97            Err(e) if e.is_not_found() => Ok(Snapshot::EMPTY),
98            Err(e) => Err(e),
99        }
100    }
101
102    async fn store_snapshot_at(
103        &self,
104        tree: &ID,
105        store: &str,
106        main_snapshot: &Snapshot,
107    ) -> Result<Snapshot> {
108        match self
109            .conn
110            .store_snapshot_at(
111                tree.clone(),
112                self.identity(),
113                store.to_string(),
114                main_snapshot.tips().to_vec(),
115            )
116            .await
117        {
118            Ok(tips) => Ok(Snapshot::new(tips)),
119            Err(e) if e.is_not_found() => Ok(Snapshot::EMPTY),
120            Err(e) => Err(e),
121        }
122    }
123
124    async fn store_at(&self, tree: &ID, store: &str, snapshot: &Snapshot) -> Result<Vec<Entry>> {
125        self.conn
126            .get_store_entries(
127                tree.clone(),
128                self.identity(),
129                store.to_string(),
130                snapshot.tips().to_vec(),
131                ReadScope::Verified,
132            )
133            .await
134    }
135
136    async fn find_merge_base(&self, tree: &ID, store: &str, entry_ids: &[ID]) -> Result<ID> {
137        let state = self
138            .conn
139            .compute_merge_state(
140                tree.clone(),
141                self.identity(),
142                store.to_string(),
143                entry_ids.to_vec(),
144            )
145            .await?;
146        Ok(state.merge_base)
147    }
148
149    async fn get_path_from_to(
150        &self,
151        tree: &ID,
152        store: &str,
153        _from_id: &ID,
154        to_ids: &[ID],
155    ) -> Result<Vec<ID>> {
156        // The server fuses LCA + path against `to_ids` in one round-trip, so a
157        // separately-supplied `from_id` LCA isn't replayed.
158        let state = self
159            .conn
160            .compute_merge_state(
161                tree.clone(),
162                self.identity(),
163                store.to_string(),
164                to_ids.to_vec(),
165            )
166            .await?;
167        Ok(state.path)
168    }
169
170    async fn get_cached_crdt_state(
171        &self,
172        tree: &ID,
173        entry_id: &ID,
174        store: &str,
175    ) -> Result<Option<Vec<u8>>> {
176        // Tier 1: connection-shared process-lifetime LRU.
177        if let Some(blob) = self.conn.cache_get(tree, entry_id, store) {
178            return Ok(Some(blob));
179        }
180        // Tier 2: daemon-side unified cache, durable across sessions.
181        let blob = self
182            .conn
183            .get_cached_crdt_state_remote(
184                tree.clone(),
185                self.identity(),
186                store.to_string(),
187                entry_id.clone(),
188            )
189            .await?;
190        if let Some(b) = &blob {
191            self.conn
192                .cache_put(tree.clone(), entry_id.clone(), store.to_string(), b.clone());
193        }
194        Ok(blob)
195    }
196
197    async fn cache_crdt_state(
198        &self,
199        tree: &ID,
200        entry_id: &ID,
201        store: &str,
202        state: Vec<u8>,
203    ) -> Result<()> {
204        // Tier 1: stash locally first so a same-session re-read hits even if
205        // the tier-2 write later fails.
206        self.conn.cache_put(
207            tree.clone(),
208            entry_id.clone(),
209            store.to_string(),
210            state.clone(),
211        );
212        // Tier 2: propagate to the daemon. Awaited so wire errors surface.
213        self.conn
214            .cache_crdt_state_remote(
215                tree.clone(),
216                self.identity(),
217                store.to_string(),
218                entry_id.clone(),
219                state,
220            )
221            .await
222    }
223
224    async fn put(&self, entry: Entry) -> Result<()> {
225        let tree_root = entry.root().unwrap_or_else(|| entry.id());
226        self.conn
227            .submit_signed_entry(tree_root, self.identity(), entry)
228            .await
229    }
230
231    async fn write_entry(
232        &self,
233        _verification: VerificationStatus,
234        entry: Entry,
235        _source: WriteSource,
236    ) -> Result<()> {
237        // The server stores the submitted entry `Unverified` and runs its own
238        // verification pass; a client-asserted status is never trusted.
239        let tree_root = entry.root().unwrap_or_else(|| entry.id());
240        self.conn
241            .submit_signed_entry(tree_root, self.identity(), entry)
242            .await
243    }
244
245    async fn get_instance_metadata(&self) -> Result<Option<InstanceMetadata>> {
246        self.conn.get_instance_metadata().await
247    }
248
249    async fn set_instance_metadata(&self, metadata: &InstanceMetadata) -> Result<()> {
250        self.conn.set_instance_metadata(metadata).await
251    }
252
253    fn remote_connection(&self) -> Option<RemoteConnection> {
254        Some(self.conn.clone())
255    }
256}