eidetica/store/table.rs
1use std::marker::PhantomData;
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
7use crate::{
8 Result, Store, Transaction,
9 crdt::{CRDT, Doc},
10 store::{Registered, errors::StoreError},
11};
12
13/// A Row-based Store
14///
15/// `Table` provides a record-oriented storage abstraction for entries in a subtree,
16/// similar to a database table with automatic primary key generation.
17///
18/// # Features
19/// - Automatically generates UUIDv4 primary keys for new records
20/// - Provides CRUD operations (Create, Read, Update, Delete) for record-based data
21/// - Supports searching across all records with a predicate function
22///
23/// # Type Parameters
24/// - `T`: The record type to be stored, which must be serializable, deserializable, and cloneable
25///
26/// This abstraction simplifies working with collections of similarly structured data
27/// by handling the details of:
28/// - Primary key generation and management
29/// - Serialization/deserialization of records
30/// - Storage within the underlying CRDT (Doc)
31pub struct Table<T>
32where
33 T: Serialize + for<'de> Deserialize<'de> + Clone,
34{
35 name: String,
36 txn: Transaction,
37 phantom: PhantomData<T>,
38}
39
40impl<T> Registered for Table<T>
41where
42 T: Serialize + for<'de> Deserialize<'de> + Clone,
43{
44 fn type_id() -> &'static str {
45 "table:v0"
46 }
47}
48
49#[async_trait]
50impl<T> Store for Table<T>
51where
52 T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
53{
54 type Data = Doc;
55
56 async fn load(txn: &Transaction, subtree_name: String) -> Result<Self> {
57 Ok(Self {
58 name: subtree_name,
59 txn: txn.clone(),
60 phantom: PhantomData,
61 })
62 }
63
64 fn name(&self) -> &str {
65 &self.name
66 }
67
68 fn transaction(&self) -> &Transaction {
69 &self.txn
70 }
71}
72
73impl<T> Table<T>
74where
75 T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
76{
77 /// Retrieves a row from the Table by its primary key.
78 ///
79 /// This method first checks for the record in the current transaction's
80 /// local changes, and if not found, retrieves it from the persistent state.
81 ///
82 /// # Arguments
83 /// * `key` - The primary key (UUID string) of the record to retrieve
84 ///
85 /// # Returns
86 /// * `Ok(T)` - The retrieved record if found
87 /// * `Err(Error::NotFound)` - If no record exists with the given key
88 ///
89 /// # Errors
90 /// Returns an error if:
91 /// * The record doesn't exist (`Error::NotFound`)
92 /// * There's a serialization/deserialization error
93 pub async fn get(&self, key: impl AsRef<str>) -> Result<T> {
94 let key = key.as_ref();
95
96 // Check local staged data from the transaction
97 if let Some(data) = self.local_data()? {
98 // If there's a tombstone in local data, the record is deleted
99 if data.is_tombstone(key) {
100 return Err(StoreError::KeyNotFound {
101 store: self.name.clone(),
102 key: key.to_string(),
103 }
104 .into());
105 }
106
107 // If there's a value in local data, return that
108 if let Some(map_value) = data.get(key)
109 && let Some(value) = map_value.as_text()
110 {
111 return serde_json::from_str(value).map_err(|e| {
112 StoreError::DeserializationFailed {
113 store: self.name.clone(),
114 reason: format!("Failed to deserialize record for key '{key}': {e}"),
115 }
116 .into()
117 });
118 }
119 }
120
121 // Otherwise, get the full state from the backend
122 let data: Doc = self.txn.get_full_state(&self.name).await?;
123
124 // Get the value
125 match data.get(key).and_then(|v| v.as_text()) {
126 Some(value) => serde_json::from_str(value).map_err(|e| {
127 StoreError::DeserializationFailed {
128 store: self.name.clone(),
129 reason: format!("Failed to deserialize record for key '{key}': {e}"),
130 }
131 .into()
132 }),
133 None => Err(StoreError::KeyNotFound {
134 store: self.name.clone(),
135 key: key.to_string(),
136 }
137 .into()),
138 }
139 }
140
141 /// Inserts a new row into the Table and returns its generated primary key.
142 ///
143 /// This method:
144 /// 1. Generates a new UUIDv4 as the primary key
145 /// 2. Serializes the record
146 /// 3. Stores it in the local transaction
147 ///
148 /// # Arguments
149 /// * `row` - The record to insert
150 ///
151 /// # Returns
152 /// * `Ok(String)` - The generated UUID primary key as a string
153 ///
154 /// # Errors
155 /// Returns an error if there's a serialization error or the operation fails
156 pub async fn insert(&self, row: T) -> Result<String> {
157 // Generate a UUIDv4 for the primary key
158 let primary_key = Uuid::new_v4().to_string();
159
160 // Get current data from the transaction, or create new if not existing
161 let mut data = self.local_data()?.unwrap_or_default();
162
163 // Serialize the row
164 let serialized_row =
165 serde_json::to_string(&row).map_err(|e| StoreError::SerializationFailed {
166 store: self.name.clone(),
167 reason: format!("Failed to serialize record: {e}"),
168 })?;
169
170 // Update the data with the new row
171 data.set(primary_key.clone(), serialized_row);
172
173 // Serialize and update the transaction
174 let serialized_data =
175 serde_json::to_vec(&data).map_err(|e| StoreError::SerializationFailed {
176 store: self.name.clone(),
177 reason: format!("Failed to serialize subtree data: {e}"),
178 })?;
179 self.txn.update_subtree(&self.name, serialized_data).await?;
180
181 // Return the primary key
182 Ok(primary_key)
183 }
184
185 /// Updates an existing row in the Table with a new value.
186 ///
187 /// This method completely replaces the existing record with the provided one.
188 /// If the record doesn't exist yet, it will be created with the given key.
189 ///
190 /// # Arguments
191 /// * `key` - The primary key of the record to update
192 /// * `row` - The new record value
193 ///
194 /// # Returns
195 /// * `Ok(())` - If the update was successful
196 ///
197 /// # Errors
198 /// Returns an error if there's a serialization error or the operation fails
199 pub async fn set(&self, key: impl AsRef<str>, row: T) -> Result<()> {
200 let key_str = key.as_ref();
201 // Get current data from the transaction, or create new if not existing
202 let mut data = self.local_data()?.unwrap_or_default();
203
204 // Serialize the row
205 let serialized_row =
206 serde_json::to_string(&row).map_err(|e| StoreError::SerializationFailed {
207 store: self.name.clone(),
208 reason: format!("Failed to serialize record for key '{key_str}': {e}"),
209 })?;
210
211 // Update the data
212 data.set(key_str, serialized_row);
213
214 // Serialize and update the transaction
215 let serialized_data =
216 serde_json::to_vec(&data).map_err(|e| StoreError::SerializationFailed {
217 store: self.name.clone(),
218 reason: format!("Failed to serialize subtree data: {e}"),
219 })?;
220 self.txn.update_subtree(&self.name, serialized_data).await
221 }
222
223 /// Deletes a row from the Table by its primary key.
224 ///
225 /// This method marks the record as deleted using CRDT tombstone semantics,
226 /// ensuring the deletion is properly synchronized across distributed nodes.
227 ///
228 /// # Arguments
229 /// * `key` - The primary key of the record to delete
230 ///
231 /// # Returns
232 /// * `Ok(true)` - If a record existed and was deleted
233 /// * `Ok(false)` - If no record existed with the given key
234 ///
235 /// # Errors
236 /// Returns an error if there's a serialization error or the operation fails
237 pub async fn delete(&self, key: impl AsRef<str>) -> Result<bool> {
238 let key_str = key.as_ref();
239
240 // Check if the record exists (checks both local and full state)
241 let exists = self.get(key_str).await.is_ok();
242
243 // If the record doesn't exist, return false early
244 if !exists {
245 return Ok(false);
246 }
247
248 // Get current data from the transaction, or create new if not existing
249 let mut data = self.local_data()?.unwrap_or_default();
250
251 // Remove the key (creates tombstone for CRDT semantics)
252 data.remove(key_str);
253
254 // Serialize and update the transaction
255 let serialized_data =
256 serde_json::to_vec(&data).map_err(|e| StoreError::SerializationFailed {
257 store: self.name.clone(),
258 reason: format!("Failed to serialize subtree data: {e}"),
259 })?;
260 self.txn.update_subtree(&self.name, serialized_data).await?;
261
262 // Return true since we confirmed the record existed
263 Ok(true)
264 }
265
266 /// Searches for rows matching a predicate function.
267 ///
268 /// # Arguments
269 /// * `query` - A function that takes a reference to a record and returns a boolean
270 ///
271 /// # Returns
272 /// * `Ok(Vec<(String, T)>)` - A vector of (primary_key, record) pairs that match the predicate
273 ///
274 /// # Errors
275 /// Returns an error if there's a serialization error or the operation fails
276 pub async fn search(&self, query: impl Fn(&T) -> bool) -> Result<Vec<(String, T)>> {
277 // Get the full state combining local and backend data
278 let mut result = Vec::new();
279
280 // Get the full state from the backend
281 let mut data = self.txn.get_full_state::<Doc>(&self.name).await?;
282
283 // Merge with local staged data if any
284 if let Some(local) = self.local_data()? {
285 data = data.merge(&local)?;
286 }
287
288 // Iterate through all key-value pairs
289 for (key, map_value) in data.iter() {
290 // Skip non-text values
291 if let Some(value) = map_value.as_text() {
292 // Deserialize the row
293 let row: T =
294 serde_json::from_str(value).map_err(|e| StoreError::DeserializationFailed {
295 store: self.name.clone(),
296 reason: format!(
297 "Failed to deserialize record for key '{key}' during search: {e}"
298 ),
299 })?;
300
301 // Check if the row matches the query
302 if query(&row) {
303 result.push((key.clone(), row));
304 }
305 }
306 }
307
308 Ok(result)
309 }
310}