Development Documentation (main branch) - For stable release docs, see docs.rs/eidetica
Skip to main content

eidetica/store/
table.rs

1use std::marker::PhantomData;
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
7use crate::{
8    Result, Store, Transaction,
9    crdt::{CRDT, Doc},
10    store::{Registered, errors::StoreError},
11};
12
13/// A Row-based Store
14///
15/// `Table` provides a record-oriented storage abstraction for entries in a subtree,
16/// similar to a database table with automatic primary key generation.
17///
18/// # Features
19/// - Automatically generates UUIDv4 primary keys for new records
20/// - Provides CRUD operations (Create, Read, Update, Delete) for record-based data
21/// - Supports searching across all records with a predicate function
22///
23/// # Type Parameters
24/// - `T`: The record type to be stored, which must be serializable, deserializable, and cloneable
25///
26/// This abstraction simplifies working with collections of similarly structured data
27/// by handling the details of:
28/// - Primary key generation and management
29/// - Serialization/deserialization of records
30/// - Storage within the underlying CRDT (Doc)
31pub struct Table<T>
32where
33    T: Serialize + for<'de> Deserialize<'de> + Clone,
34{
35    name: String,
36    txn: Transaction,
37    phantom: PhantomData<T>,
38}
39
40impl<T> Registered for Table<T>
41where
42    T: Serialize + for<'de> Deserialize<'de> + Clone,
43{
44    fn type_id() -> &'static str {
45        "table:v0"
46    }
47}
48
49#[async_trait]
50impl<T> Store for Table<T>
51where
52    T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
53{
54    type Data = Doc;
55
56    async fn load(txn: &Transaction, subtree_name: String) -> Result<Self> {
57        Ok(Self {
58            name: subtree_name,
59            txn: txn.clone(),
60            phantom: PhantomData,
61        })
62    }
63
64    fn name(&self) -> &str {
65        &self.name
66    }
67
68    fn transaction(&self) -> &Transaction {
69        &self.txn
70    }
71}
72
73impl<T> Table<T>
74where
75    T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync,
76{
77    /// Retrieves a row from the Table by its primary key.
78    ///
79    /// This method first checks for the record in the current transaction's
80    /// local changes, and if not found, retrieves it from the persistent state.
81    ///
82    /// # Arguments
83    /// * `key` - The primary key (UUID string) of the record to retrieve
84    ///
85    /// # Returns
86    /// * `Ok(T)` - The retrieved record if found
87    /// * `Err(Error::NotFound)` - If no record exists with the given key
88    ///
89    /// # Errors
90    /// Returns an error if:
91    /// * The record doesn't exist (`Error::NotFound`)
92    /// * There's a serialization/deserialization error
93    pub async fn get(&self, key: impl AsRef<str>) -> Result<T> {
94        let key = key.as_ref();
95
96        // Check local staged data from the transaction
97        if let Some(data) = self.local_data()? {
98            // If there's a tombstone in local data, the record is deleted
99            if data.is_tombstone(key) {
100                return Err(StoreError::KeyNotFound {
101                    store: self.name.clone(),
102                    key: key.to_string(),
103                }
104                .into());
105            }
106
107            // If there's a value in local data, return that
108            if let Some(map_value) = data.get(key)
109                && let Some(value) = map_value.as_text()
110            {
111                return serde_json::from_str(value).map_err(|e| {
112                    StoreError::DeserializationFailed {
113                        store: self.name.clone(),
114                        reason: format!("Failed to deserialize record for key '{key}': {e}"),
115                    }
116                    .into()
117                });
118            }
119        }
120
121        // Otherwise, get the full state from the backend
122        let data: Doc = self.txn.get_full_state(&self.name).await?;
123
124        // Get the value
125        match data.get(key).and_then(|v| v.as_text()) {
126            Some(value) => serde_json::from_str(value).map_err(|e| {
127                StoreError::DeserializationFailed {
128                    store: self.name.clone(),
129                    reason: format!("Failed to deserialize record for key '{key}': {e}"),
130                }
131                .into()
132            }),
133            None => Err(StoreError::KeyNotFound {
134                store: self.name.clone(),
135                key: key.to_string(),
136            }
137            .into()),
138        }
139    }
140
141    /// Inserts a new row into the Table and returns its generated primary key.
142    ///
143    /// This method:
144    /// 1. Generates a new UUIDv4 as the primary key
145    /// 2. Serializes the record
146    /// 3. Stores it in the local transaction
147    ///
148    /// # Arguments
149    /// * `row` - The record to insert
150    ///
151    /// # Returns
152    /// * `Ok(String)` - The generated UUID primary key as a string
153    ///
154    /// # Errors
155    /// Returns an error if there's a serialization error or the operation fails
156    pub async fn insert(&self, row: T) -> Result<String> {
157        // Generate a UUIDv4 for the primary key
158        let primary_key = Uuid::new_v4().to_string();
159
160        // Get current data from the transaction, or create new if not existing
161        let mut data = self.local_data()?.unwrap_or_default();
162
163        // Serialize the row
164        let serialized_row =
165            serde_json::to_string(&row).map_err(|e| StoreError::SerializationFailed {
166                store: self.name.clone(),
167                reason: format!("Failed to serialize record: {e}"),
168            })?;
169
170        // Update the data with the new row
171        data.set(primary_key.clone(), serialized_row);
172
173        // Serialize and update the transaction
174        let serialized_data =
175            serde_json::to_vec(&data).map_err(|e| StoreError::SerializationFailed {
176                store: self.name.clone(),
177                reason: format!("Failed to serialize subtree data: {e}"),
178            })?;
179        self.txn.update_subtree(&self.name, serialized_data).await?;
180
181        // Return the primary key
182        Ok(primary_key)
183    }
184
185    /// Updates an existing row in the Table with a new value.
186    ///
187    /// This method completely replaces the existing record with the provided one.
188    /// If the record doesn't exist yet, it will be created with the given key.
189    ///
190    /// # Arguments
191    /// * `key` - The primary key of the record to update
192    /// * `row` - The new record value
193    ///
194    /// # Returns
195    /// * `Ok(())` - If the update was successful
196    ///
197    /// # Errors
198    /// Returns an error if there's a serialization error or the operation fails
199    pub async fn set(&self, key: impl AsRef<str>, row: T) -> Result<()> {
200        let key_str = key.as_ref();
201        // Get current data from the transaction, or create new if not existing
202        let mut data = self.local_data()?.unwrap_or_default();
203
204        // Serialize the row
205        let serialized_row =
206            serde_json::to_string(&row).map_err(|e| StoreError::SerializationFailed {
207                store: self.name.clone(),
208                reason: format!("Failed to serialize record for key '{key_str}': {e}"),
209            })?;
210
211        // Update the data
212        data.set(key_str, serialized_row);
213
214        // Serialize and update the transaction
215        let serialized_data =
216            serde_json::to_vec(&data).map_err(|e| StoreError::SerializationFailed {
217                store: self.name.clone(),
218                reason: format!("Failed to serialize subtree data: {e}"),
219            })?;
220        self.txn.update_subtree(&self.name, serialized_data).await
221    }
222
223    /// Deletes a row from the Table by its primary key.
224    ///
225    /// This method marks the record as deleted using CRDT tombstone semantics,
226    /// ensuring the deletion is properly synchronized across distributed nodes.
227    ///
228    /// # Arguments
229    /// * `key` - The primary key of the record to delete
230    ///
231    /// # Returns
232    /// * `Ok(true)` - If a record existed and was deleted
233    /// * `Ok(false)` - If no record existed with the given key
234    ///
235    /// # Errors
236    /// Returns an error if there's a serialization error or the operation fails
237    pub async fn delete(&self, key: impl AsRef<str>) -> Result<bool> {
238        let key_str = key.as_ref();
239
240        // Check if the record exists (checks both local and full state)
241        let exists = self.get(key_str).await.is_ok();
242
243        // If the record doesn't exist, return false early
244        if !exists {
245            return Ok(false);
246        }
247
248        // Get current data from the transaction, or create new if not existing
249        let mut data = self.local_data()?.unwrap_or_default();
250
251        // Remove the key (creates tombstone for CRDT semantics)
252        data.remove(key_str);
253
254        // Serialize and update the transaction
255        let serialized_data =
256            serde_json::to_vec(&data).map_err(|e| StoreError::SerializationFailed {
257                store: self.name.clone(),
258                reason: format!("Failed to serialize subtree data: {e}"),
259            })?;
260        self.txn.update_subtree(&self.name, serialized_data).await?;
261
262        // Return true since we confirmed the record existed
263        Ok(true)
264    }
265
266    /// Searches for rows matching a predicate function.
267    ///
268    /// # Arguments
269    /// * `query` - A function that takes a reference to a record and returns a boolean
270    ///
271    /// # Returns
272    /// * `Ok(Vec<(String, T)>)` - A vector of (primary_key, record) pairs that match the predicate
273    ///
274    /// # Errors
275    /// Returns an error if there's a serialization error or the operation fails
276    pub async fn search(&self, query: impl Fn(&T) -> bool) -> Result<Vec<(String, T)>> {
277        // Get the full state combining local and backend data
278        let mut result = Vec::new();
279
280        // Get the full state from the backend
281        let mut data = self.txn.get_full_state::<Doc>(&self.name).await?;
282
283        // Merge with local staged data if any
284        if let Some(local) = self.local_data()? {
285            data = data.merge(&local)?;
286        }
287
288        // Iterate through all key-value pairs
289        for (key, map_value) in data.iter() {
290            // Skip non-text values
291            if let Some(value) = map_value.as_text() {
292                // Deserialize the row
293                let row: T =
294                    serde_json::from_str(value).map_err(|e| StoreError::DeserializationFailed {
295                        store: self.name.clone(),
296                        reason: format!(
297                            "Failed to deserialize record for key '{key}' during search: {e}"
298                        ),
299                    })?;
300
301                // Check if the row matches the query
302                if query(&row) {
303                    result.push((key.clone(), row));
304                }
305            }
306        }
307
308        Ok(result)
309    }
310}