Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica

eidetica/store/
table.rs

1use std::marker::PhantomData;
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
7use crate::{
8    Result, Store, Transaction,
9    crdt::{CRDT, Doc},
10    store::{Registered, errors::StoreError},
11};
12
13/// A Row-based Store
14///
15/// `Table` provides a record-oriented storage abstraction for entries in a subtree,
16/// similar to a database table with automatic primary key generation.
17///
18/// # Features
19/// - Automatically generates UUIDv4 primary keys for new records
20/// - Provides CRUD operations (Create, Read, Update, Delete) for record-based data
21/// - Supports searching across all records with a predicate function
22///
23/// # Type Parameters
24/// - `T`: The record type to be stored, which must be serializable, deserializable, and cloneable
25///
26/// This abstraction simplifies working with collections of similarly structured data
27/// by handling the details of:
28/// - Primary key generation and management
29/// - Serialization/deserialization of records
30/// - Storage within the underlying CRDT (Doc)
31pub struct Table<T>
32where
33    T: Serialize + for<'de> Deserialize<'de> + Clone,
34{
35    name: String,
36    txn: Transaction,
37    phantom: PhantomData<T>,
38}
39
40impl<T> Registered for Table<T>
41where
42    T: Serialize + for<'de> Deserialize<'de> + Clone,
43{
44    fn type_id() -> &'static str {
45        "table:v0"
46    }
47}
48
49#[async_trait]
50impl<T> Store for Table<T>
51where
52    T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
53{
54    type Data = Doc;
55
56    async fn new(txn: &Transaction, subtree_name: String) -> Result<Self> {
57        Ok(Self {
58            name: subtree_name,
59            txn: txn.clone(),
60            phantom: PhantomData,
61        })
62    }
63
64    fn name(&self) -> &str {
65        &self.name
66    }
67
68    fn transaction(&self) -> &Transaction {
69        &self.txn
70    }
71}
72
73impl<T> Table<T>
74where
75    T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
76{
77    /// Retrieves a row from the Table by its primary key.
78    ///
79    /// This method first checks for the record in the current transaction's
80    /// local changes, and if not found, retrieves it from the persistent state.
81    ///
82    /// # Arguments
83    /// * `key` - The primary key (UUID string) of the record to retrieve
84    ///
85    /// # Returns
86    /// * `Ok(T)` - The retrieved record if found
87    /// * `Err(Error::NotFound)` - If no record exists with the given key
88    ///
89    /// # Errors
90    /// Returns an error if:
91    /// * The record doesn't exist (`Error::NotFound`)
92    /// * There's a serialization/deserialization error
93    pub async fn get(&self, key: impl AsRef<str>) -> Result<T> {
94        let key = key.as_ref();
95
96        // Check local staged data from the transaction
97        if let Some(data) = self.local_data()? {
98            // If there's a tombstone in local data, the record is deleted
99            if data.is_tombstone(key) {
100                return Err(StoreError::KeyNotFound {
101                    store: self.name.clone(),
102                    key: key.to_string(),
103                }
104                .into());
105            }
106
107            // If there's a value in local data, return that
108            if let Some(map_value) = data.get(key)
109                && let Some(value) = map_value.as_text()
110            {
111                return serde_json::from_str(value).map_err(|e| {
112                    StoreError::DeserializationFailed {
113                        store: self.name.clone(),
114                        reason: format!("Failed to deserialize record for key '{key}': {e}"),
115                    }
116                    .into()
117                });
118            }
119        }
120
121        // Otherwise, get the full state from the backend
122        let data: Doc = self.txn.get_full_state(&self.name).await?;
123
124        // Get the value
125        match data.get(key).and_then(|v| v.as_text()) {
126            Some(value) => serde_json::from_str(value).map_err(|e| {
127                StoreError::DeserializationFailed {
128                    store: self.name.clone(),
129                    reason: format!("Failed to deserialize record for key '{key}': {e}"),
130                }
131                .into()
132            }),
133            None => Err(StoreError::KeyNotFound {
134                store: self.name.clone(),
135                key: key.to_string(),
136            }
137            .into()),
138        }
139    }
140
141    /// Inserts a new row into the Table and returns its generated primary key.
142    ///
143    /// This method:
144    /// 1. Generates a new UUIDv4 as the primary key
145    /// 2. Serializes the record
146    /// 3. Stores it in the local transaction
147    ///
148    /// # Arguments
149    /// * `row` - The record to insert
150    ///
151    /// # Returns
152    /// * `Ok(String)` - The generated UUID primary key as a string
153    ///
154    /// # Errors
155    /// Returns an error if there's a serialization error or the operation fails
156    pub async fn insert(&self, row: T) -> Result<String> {
157        // Generate a UUIDv4 for the primary key
158        let primary_key = Uuid::new_v4().to_string();
159
160        // Get current data from the transaction, or create new if not existing
161        let mut data = self.local_data()?.unwrap_or_default();
162
163        // Serialize the row
164        let serialized_row =
165            serde_json::to_string(&row).map_err(|e| StoreError::SerializationFailed {
166                store: self.name.clone(),
167                reason: format!("Failed to serialize record: {e}"),
168            })?;
169
170        // Update the data with the new row
171        data.set(primary_key.clone(), serialized_row);
172
173        // Serialize and update the transaction
174        let serialized_data =
175            serde_json::to_string(&data).map_err(|e| StoreError::SerializationFailed {
176                store: self.name.clone(),
177                reason: format!("Failed to serialize subtree data: {e}"),
178            })?;
179        self.txn
180            .update_subtree(&self.name, &serialized_data)
181            .await?;
182
183        // Return the primary key
184        Ok(primary_key)
185    }
186
187    /// Updates an existing row in the Table with a new value.
188    ///
189    /// This method completely replaces the existing record with the provided one.
190    /// If the record doesn't exist yet, it will be created with the given key.
191    ///
192    /// # Arguments
193    /// * `key` - The primary key of the record to update
194    /// * `row` - The new record value
195    ///
196    /// # Returns
197    /// * `Ok(())` - If the update was successful
198    ///
199    /// # Errors
200    /// Returns an error if there's a serialization error or the operation fails
201    pub async fn set(&self, key: impl AsRef<str>, row: T) -> Result<()> {
202        let key_str = key.as_ref();
203        // Get current data from the transaction, or create new if not existing
204        let mut data = self.local_data()?.unwrap_or_default();
205
206        // Serialize the row
207        let serialized_row =
208            serde_json::to_string(&row).map_err(|e| StoreError::SerializationFailed {
209                store: self.name.clone(),
210                reason: format!("Failed to serialize record for key '{key_str}': {e}"),
211            })?;
212
213        // Update the data
214        data.set(key_str, serialized_row);
215
216        // Serialize and update the transaction
217        let serialized_data =
218            serde_json::to_string(&data).map_err(|e| StoreError::SerializationFailed {
219                store: self.name.clone(),
220                reason: format!("Failed to serialize subtree data: {e}"),
221            })?;
222        self.txn.update_subtree(&self.name, &serialized_data).await
223    }
224
225    /// Deletes a row from the Table by its primary key.
226    ///
227    /// This method marks the record as deleted using CRDT tombstone semantics,
228    /// ensuring the deletion is properly synchronized across distributed nodes.
229    ///
230    /// # Arguments
231    /// * `key` - The primary key of the record to delete
232    ///
233    /// # Returns
234    /// * `Ok(true)` - If a record existed and was deleted
235    /// * `Ok(false)` - If no record existed with the given key
236    ///
237    /// # Errors
238    /// Returns an error if there's a serialization error or the operation fails
239    pub async fn delete(&self, key: impl AsRef<str>) -> Result<bool> {
240        let key_str = key.as_ref();
241
242        // Check if the record exists (checks both local and full state)
243        let exists = self.get(key_str).await.is_ok();
244
245        // If the record doesn't exist, return false early
246        if !exists {
247            return Ok(false);
248        }
249
250        // Get current data from the transaction, or create new if not existing
251        let mut data = self.local_data()?.unwrap_or_default();
252
253        // Remove the key (creates tombstone for CRDT semantics)
254        data.remove(key_str);
255
256        // Serialize and update the transaction
257        let serialized_data =
258            serde_json::to_string(&data).map_err(|e| StoreError::SerializationFailed {
259                store: self.name.clone(),
260                reason: format!("Failed to serialize subtree data: {e}"),
261            })?;
262        self.txn
263            .update_subtree(&self.name, &serialized_data)
264            .await?;
265
266        // Return true since we confirmed the record existed
267        Ok(true)
268    }
269
270    /// Searches for rows matching a predicate function.
271    ///
272    /// # Arguments
273    /// * `query` - A function that takes a reference to a record and returns a boolean
274    ///
275    /// # Returns
276    /// * `Ok(Vec<(String, T)>)` - A vector of (primary_key, record) pairs that match the predicate
277    ///
278    /// # Errors
279    /// Returns an error if there's a serialization error or the operation fails
280    pub async fn search(&self, query: impl Fn(&T) -> bool) -> Result<Vec<(String, T)>> {
281        // Get the full state combining local and backend data
282        let mut result = Vec::new();
283
284        // Get the full state from the backend
285        let mut data = self.txn.get_full_state::<Doc>(&self.name).await?;
286
287        // Merge with local staged data if any
288        if let Some(local) = self.local_data()? {
289            data = data.merge(&local)?;
290        }
291
292        // Iterate through all key-value pairs
293        for (key, map_value) in data.iter() {
294            // Skip non-text values
295            if let Some(value) = map_value.as_text() {
296                // Deserialize the row
297                let row: T =
298                    serde_json::from_str(value).map_err(|e| StoreError::DeserializationFailed {
299                        store: self.name.clone(),
300                        reason: format!(
301                            "Failed to deserialize record for key '{key}' during search: {e}"
302                        ),
303                    })?;
304
305                // Check if the row matches the query
306                if query(&row) {
307                    result.push((key.clone(), row));
308                }
309            }
310        }
311
312        Ok(result)
313    }
314}