eidetica/store/table.rs
1use std::marker::PhantomData;
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
7use crate::{
8 Result, Store, Transaction,
9 crdt::{CRDT, Doc},
10 store::{Registered, errors::StoreError},
11};
12
13/// A Row-based Store
14///
15/// `Table` provides a record-oriented storage abstraction for entries in a subtree,
16/// similar to a database table with automatic primary key generation.
17///
18/// # Features
19/// - Automatically generates UUIDv4 primary keys for new records
20/// - Provides CRUD operations (Create, Read, Update, Delete) for record-based data
21/// - Supports searching across all records with a predicate function
22///
23/// # Type Parameters
24/// - `T`: The record type to be stored, which must be serializable, deserializable, and cloneable
25///
26/// This abstraction simplifies working with collections of similarly structured data
27/// by handling the details of:
28/// - Primary key generation and management
29/// - Serialization/deserialization of records
30/// - Storage within the underlying CRDT (Doc)
31pub struct Table<T>
32where
33 T: Serialize + for<'de> Deserialize<'de> + Clone,
34{
35 name: String,
36 txn: Transaction,
37 phantom: PhantomData<T>,
38}
39
40impl<T> Registered for Table<T>
41where
42 T: Serialize + for<'de> Deserialize<'de> + Clone,
43{
44 fn type_id() -> &'static str {
45 "table:v0"
46 }
47}
48
49#[async_trait]
50impl<T> Store for Table<T>
51where
52 T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
53{
54 type Data = Doc;
55
56 async fn new(txn: &Transaction, subtree_name: String) -> Result<Self> {
57 Ok(Self {
58 name: subtree_name,
59 txn: txn.clone(),
60 phantom: PhantomData,
61 })
62 }
63
64 fn name(&self) -> &str {
65 &self.name
66 }
67
68 fn transaction(&self) -> &Transaction {
69 &self.txn
70 }
71}
72
73impl<T> Table<T>
74where
75 T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
76{
77 /// Retrieves a row from the Table by its primary key.
78 ///
79 /// This method first checks for the record in the current transaction's
80 /// local changes, and if not found, retrieves it from the persistent state.
81 ///
82 /// # Arguments
83 /// * `key` - The primary key (UUID string) of the record to retrieve
84 ///
85 /// # Returns
86 /// * `Ok(T)` - The retrieved record if found
87 /// * `Err(Error::NotFound)` - If no record exists with the given key
88 ///
89 /// # Errors
90 /// Returns an error if:
91 /// * The record doesn't exist (`Error::NotFound`)
92 /// * There's a serialization/deserialization error
93 pub async fn get(&self, key: impl AsRef<str>) -> Result<T> {
94 let key = key.as_ref();
95
96 // Check local staged data from the transaction
97 if let Some(data) = self.local_data()? {
98 // If there's a tombstone in local data, the record is deleted
99 if data.is_tombstone(key) {
100 return Err(StoreError::KeyNotFound {
101 store: self.name.clone(),
102 key: key.to_string(),
103 }
104 .into());
105 }
106
107 // If there's a value in local data, return that
108 if let Some(map_value) = data.get(key)
109 && let Some(value) = map_value.as_text()
110 {
111 return serde_json::from_str(value).map_err(|e| {
112 StoreError::DeserializationFailed {
113 store: self.name.clone(),
114 reason: format!("Failed to deserialize record for key '{key}': {e}"),
115 }
116 .into()
117 });
118 }
119 }
120
121 // Otherwise, get the full state from the backend
122 let data: Doc = self.txn.get_full_state(&self.name).await?;
123
124 // Get the value
125 match data.get(key).and_then(|v| v.as_text()) {
126 Some(value) => serde_json::from_str(value).map_err(|e| {
127 StoreError::DeserializationFailed {
128 store: self.name.clone(),
129 reason: format!("Failed to deserialize record for key '{key}': {e}"),
130 }
131 .into()
132 }),
133 None => Err(StoreError::KeyNotFound {
134 store: self.name.clone(),
135 key: key.to_string(),
136 }
137 .into()),
138 }
139 }
140
141 /// Inserts a new row into the Table and returns its generated primary key.
142 ///
143 /// This method:
144 /// 1. Generates a new UUIDv4 as the primary key
145 /// 2. Serializes the record
146 /// 3. Stores it in the local transaction
147 ///
148 /// # Arguments
149 /// * `row` - The record to insert
150 ///
151 /// # Returns
152 /// * `Ok(String)` - The generated UUID primary key as a string
153 ///
154 /// # Errors
155 /// Returns an error if there's a serialization error or the operation fails
156 pub async fn insert(&self, row: T) -> Result<String> {
157 // Generate a UUIDv4 for the primary key
158 let primary_key = Uuid::new_v4().to_string();
159
160 // Get current data from the transaction, or create new if not existing
161 let mut data = self.local_data()?.unwrap_or_default();
162
163 // Serialize the row
164 let serialized_row =
165 serde_json::to_string(&row).map_err(|e| StoreError::SerializationFailed {
166 store: self.name.clone(),
167 reason: format!("Failed to serialize record: {e}"),
168 })?;
169
170 // Update the data with the new row
171 data.set(primary_key.clone(), serialized_row);
172
173 // Serialize and update the transaction
174 let serialized_data =
175 serde_json::to_string(&data).map_err(|e| StoreError::SerializationFailed {
176 store: self.name.clone(),
177 reason: format!("Failed to serialize subtree data: {e}"),
178 })?;
179 self.txn
180 .update_subtree(&self.name, &serialized_data)
181 .await?;
182
183 // Return the primary key
184 Ok(primary_key)
185 }
186
187 /// Updates an existing row in the Table with a new value.
188 ///
189 /// This method completely replaces the existing record with the provided one.
190 /// If the record doesn't exist yet, it will be created with the given key.
191 ///
192 /// # Arguments
193 /// * `key` - The primary key of the record to update
194 /// * `row` - The new record value
195 ///
196 /// # Returns
197 /// * `Ok(())` - If the update was successful
198 ///
199 /// # Errors
200 /// Returns an error if there's a serialization error or the operation fails
201 pub async fn set(&self, key: impl AsRef<str>, row: T) -> Result<()> {
202 let key_str = key.as_ref();
203 // Get current data from the transaction, or create new if not existing
204 let mut data = self.local_data()?.unwrap_or_default();
205
206 // Serialize the row
207 let serialized_row =
208 serde_json::to_string(&row).map_err(|e| StoreError::SerializationFailed {
209 store: self.name.clone(),
210 reason: format!("Failed to serialize record for key '{key_str}': {e}"),
211 })?;
212
213 // Update the data
214 data.set(key_str, serialized_row);
215
216 // Serialize and update the transaction
217 let serialized_data =
218 serde_json::to_string(&data).map_err(|e| StoreError::SerializationFailed {
219 store: self.name.clone(),
220 reason: format!("Failed to serialize subtree data: {e}"),
221 })?;
222 self.txn.update_subtree(&self.name, &serialized_data).await
223 }
224
225 /// Deletes a row from the Table by its primary key.
226 ///
227 /// This method marks the record as deleted using CRDT tombstone semantics,
228 /// ensuring the deletion is properly synchronized across distributed nodes.
229 ///
230 /// # Arguments
231 /// * `key` - The primary key of the record to delete
232 ///
233 /// # Returns
234 /// * `Ok(true)` - If a record existed and was deleted
235 /// * `Ok(false)` - If no record existed with the given key
236 ///
237 /// # Errors
238 /// Returns an error if there's a serialization error or the operation fails
239 pub async fn delete(&self, key: impl AsRef<str>) -> Result<bool> {
240 let key_str = key.as_ref();
241
242 // Check if the record exists (checks both local and full state)
243 let exists = self.get(key_str).await.is_ok();
244
245 // If the record doesn't exist, return false early
246 if !exists {
247 return Ok(false);
248 }
249
250 // Get current data from the transaction, or create new if not existing
251 let mut data = self.local_data()?.unwrap_or_default();
252
253 // Remove the key (creates tombstone for CRDT semantics)
254 data.remove(key_str);
255
256 // Serialize and update the transaction
257 let serialized_data =
258 serde_json::to_string(&data).map_err(|e| StoreError::SerializationFailed {
259 store: self.name.clone(),
260 reason: format!("Failed to serialize subtree data: {e}"),
261 })?;
262 self.txn
263 .update_subtree(&self.name, &serialized_data)
264 .await?;
265
266 // Return true since we confirmed the record existed
267 Ok(true)
268 }
269
270 /// Searches for rows matching a predicate function.
271 ///
272 /// # Arguments
273 /// * `query` - A function that takes a reference to a record and returns a boolean
274 ///
275 /// # Returns
276 /// * `Ok(Vec<(String, T)>)` - A vector of (primary_key, record) pairs that match the predicate
277 ///
278 /// # Errors
279 /// Returns an error if there's a serialization error or the operation fails
280 pub async fn search(&self, query: impl Fn(&T) -> bool) -> Result<Vec<(String, T)>> {
281 // Get the full state combining local and backend data
282 let mut result = Vec::new();
283
284 // Get the full state from the backend
285 let mut data = self.txn.get_full_state::<Doc>(&self.name).await?;
286
287 // Merge with local staged data if any
288 if let Some(local) = self.local_data()? {
289 data = data.merge(&local)?;
290 }
291
292 // Iterate through all key-value pairs
293 for (key, map_value) in data.iter() {
294 // Skip non-text values
295 if let Some(value) = map_value.as_text() {
296 // Deserialize the row
297 let row: T =
298 serde_json::from_str(value).map_err(|e| StoreError::DeserializationFailed {
299 store: self.name.clone(),
300 reason: format!(
301 "Failed to deserialize record for key '{key}' during search: {e}"
302 ),
303 })?;
304
305 // Check if the row matches the query
306 if query(&row) {
307 result.push((key.clone(), row));
308 }
309 }
310 }
311
312 Ok(result)
313 }
314}