https://aleics.me/posts/feed.xml

Storage for your search engine

2025-05-15

You can find the latest state of delta-search in the repository.

In the previous chapter, we've explored how to build our own database index using B-Trees and Roaring Bitmaps. With our implementation, we can apply custom filtering and sorting using only the indexed data for fast data retrieval.

In this chapter, we'll take a look at LMDB, an embedded transactional key-value store with full ACID semantics, and how to integrate it with our index implementation. Also, we'll implement the first steps towards our very own database, called delta-search.

LMDB

LMDB is a fast and lightweight transactional key-value store. Internally, it uses B+ tree, copy-on-write semantics, and multiversion concurrency control to achieve high performance and scalability. LMDB supports multi-threading and has great performance for read operations. It allows multiple readers at the time, and a single writer. However, the writer does not block any reader, making it an ideal candidate for storing data in a high-available database or search engine.

LMDB is overall a great candidate for the storage layer. Also, thanks to heed, a library built by meilisearch, we have great support for Rust 🦀 and it supports serde-compatible types.

Setup

The README of heed is a great place to get started. We can start building an EntityStorage instance, which contains all the logic related to the storage layer of our database. As we want to have separate entities stored in our database, each entity is stored in a separate entry.

First of all, we'll initialize the LMDB environment and the different internal databases:

struct EntityStorage {
    name: String,
    end: Env,
    indices: Database<Str, SerdeBincode<Index>>,
    data: Database<BEU64, SerdeBincode<DataItem>>
}

impl EntityStorage {
    fn init(name: &str) -> Result<Self, StorageError> {
        let file_name = format!("{}.mdb", name);
        let path = Path::new(DB_FOLDER).join(file_name);

        // Initialize the environment of our LMDB instance
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(100 * 1024 * 1024 * 1024) // 100 GB
                .max_dbs(4)
                .open(path)?
        };

        // Create databases for our data and indices, if none exists yet
        let data = env
            .create_database(&mut txn, Some(DATA_DB_NAME))
            .map_err(|_| StorageError::CreateDatabase(DATA_DB_NAME))?;

        let indices = env
            .create_database(&mut txn, Some(INDICES_DB_NAME))
            .map_err(|_| StorageError::CreateDatabase(INDICES_DB_NAME))?;

        Ok(EntityStorage {
            id: name.to_string(),
            env,
            indices,
            data,
        })
    }
}

As shown above, creating the first database in LMDB is very straight-forward. We only need to define the limits of our storage in disk, and the amount of databases that are needed.

Write

For identifying each data item, the DataItem structure includes an id property. Each entry in the database uses this property as a key.

type DataItemId = u64;

struct DataItem {
    id: DataItemId,
    fields: BTreeMap<String, FieldValue>
}

Then, adding an item is as easy as:

impl EntityStorage {
    fn add(&self, items: &[DataItem]) -> Result<(), StorageError> {
        // Start a transaction to store all the items at once
        let mut txn = self.env.write_txn()?;

        for item in items {
            self.data.put(&mut txn, &item.id, item)?;
        }

        // Commit transaction
        txn.commit()?;

        Ok(())
    }
}

However, when adding a new data entry, we also need to update our indices, so that they both reflect the same state:

impl EntityStorage {
    fn add(&self, items: &[DataItem]) -> Result<(), StorageError> {
        // Start a transaction to store all the items at once
        let mut txn = self.env.write_txn()?;

        let indices = self.indices.iter(&txn)?;

        // Iterate over each data item that needs to be added
        for item in items {
            // Read item ID and determine position
            let position = id_to_position(item.id);

            self.data.put(&mut txn, &item.id, item)?;

            // Iterate over field and update the index, if present
            for (field, value) in item.fields {
                let Some(index) = self.indices.get(txn, field)? else {
                    continue
                }

                index.put(value, position)?;
            }
        }

        // Commit transaction
        txn.commit()?;

        Ok(())
    }
}

This is a simplified version, as it does not cover certain edge cases and it's not well optimized. Check the final version in the repository.

Since we are using a single transaction, in case an error occurs during the multiple operations, we won't end up in a broken state.

In a similar way, deleting a data entry from our database needs to also refresh the index state:

impl EntityStorage {
    fn remove(&self, ids: &[DataItemId]) -> Result<(), StorageError> {
        let mut txn = self.env.write_txn()?;
        let mut positions_to_delete = Vec::with_capacity(ids.len());

        for id in ids {
            // Remove item from data and ID to position mapping
            let present = self.data.delete(&mut txn, id)?;
            if !present {
                continue;
            }

            // Categorize the item's position to be removed
            positions_to_delete.push(id_to_position(*id));
        }

        // Iterate over the indices and modify their value so that
        // the item's position is removed from the index.
        let mut entries = self.indices.iter_mut(&mut txn)?;

        while let Some(entry) = entries.next() {
            let (key, mut index) = entry
                .map(|(key, value)| (key.to_string(), value))?;

            for position in &positions_to_delete {
                index.remove_item(*position);
            }

            // It's unsafe to keep a reference of a value from this database
            // while modifying it
            unsafe { entries.put_current(&key, &index)? };
        }

        // Drop the mutable iterator so the mutable reference to the database
        // is dropped from here on.
        drop(entries);

        txn.commit()?;

        Ok(())
    }
}

It turned out to be a bit long, but at the end is the same principle: adapt the data and the associated index.

Read

Reading data is quite simple. Given a data ID, read that entry from the data database. To reduce the amount of read transactions needed, we'd implement our read operation for a list of data IDs. Thus, when querying our data results we'd need a single transaction.

fn read_multiple<'a, T>(
    &self,
    ids: T,
) -> Result<Vec<DataItem>, StorageError>
where
    T: Iterator<Item = &'a DataItemId>,
{
    let txn = self.env.read_txn().unwrap();

    let mut data = Vec::new();

    for id in ids {
        let Some(mut item) = self.data.get(&txn, id)? else {
            continue;
        };

        data.push(item);
    }

    Ok(data)
}

Querying

In the previous chapter, we've explored how to implement different inequality operators for our Index. We can then implement our own filter and sorting operators.

For the first iteration, we won't use any SQL query for our query expression. Instead, we'll use actual types.

Filter

A query execution is composed by a filter expression, sorting, pagination, among others. The most important feature of a database is to filter for the data the user is interested in. Thus, it needs to run fast and reliable. A filter could be expressed using types as follows:

/// A composite filter allows to combine multiple filter expressions using
/// logical conjunction.
enum CompositeFilter {
    And(Vec<CompositeFilter>),
    Or(Vec<CompositeFilter>),
    Not(Box<CompositeFilter>),
    Single(Filter),
}

/// A single filter expression with a `name` identifying the field to match
/// the filter against, and the filter operation.
struct Filter {
    name: String,
    operation: FilterOperation,
}

/// A filter operation collects all the available filter operations.
enum FilterOperation {
    Eq(FieldValue),
    Between(FieldValue, FieldValue),
    GreaterThan(FieldValue),
    GreaterOrEqual(FieldValue),
    LessThan(FieldValue),
    LessThanOrEqual(FieldValue),
}

Then, given a CompositeFilter and a set of indices, we could match the filter expression against the index. The result is a bitmap defining which elements in our data set are a positive hit, and which aren't.

struct EntityIndices {
    /// Indices available associated by data's field name
    field_indices: HashMap<String, Index>,

    /// Bitmap including all items' positions
    all: RoaringBitmap,
}

fn execute_filter(
    indices: EntityIndices,
    filter: &CompositeFilter
) -> RoaringBitmap {
    match filter {
        CompositeFilter::And(filters) => {
            let mut result: Option<RoaringBitmap> = None;

            // Iterate over all the filters and aggregate the result using AND
            for filter in filters {
                let inner = execute_filter(filter);
                let next = if let Some(current) = result {
                    current.and(inner)
                } else {
                    inner
                };

                result = Some(next);
            }

            result.unwrap_or_else(RoaringBitmap::new)
        }
        CompositeFilter::Or(filters) => {
            // Same as with AND above, but using OR
        }
        CompositeFilter::Not(filter) => {
            let result = execute_filter(filter);

            // Negating the filter result against all the elements
            &indices.all - result.hits
        }
        CompositeFilter::Single(filter) => {
            let Some(index) = indices.field_indices.get(&filter.name) else {
                return RoaringBitmap::new();
            };

            // Apply the filter against the index
            index.filter(&filter.operation)
        }
    }
}

Matching the filter operation against the index is implemented similar to what was discussed in the previous chapter.

Sort

Our Index implementation is built on top of a B-Tree, meaning it can be used to sort the filter hits, so we get as a result a collection of sorted IDs identifying each data item.

fn execute_sort(
    indices: EntityIndices,
    filter_result: &RoaringBitmap,
    sort: &Option<Sort>
) -> Vec<u32> {
    // If no sort is available or the sort criteria does not have any index,
    // the filter hits won't be sorted
    let Some(index) = sort.map(|sort| indices.field_indices.get(sort.by)) else {
        return filter_result.hits.iter().map(position_to_id).collect()
    };

    index.sort(items, &sort.direction)
}

The index sort implementation would be similar to the one presented in the previous chapter.

Execution

And so, the complete query execution based on a filter expression, a sort criteria and a pagination limit can be implemented as follows:

struct QueryExecution {
    filter: Option<CompositeFilter>,
    sort: Option<Sort>,
    pagination: Option<Pagination>,
}

impl QueryExecution {
    fn run(self, storage: &EntityStorage) -> Vec<DataItem> {
        // Read indices from the LMDB storage
        let indices = storage.read_current_indices();

        // Apply filter given the indices
        let filter_result = if let Some(filter) = self.filter.as_ref() {
            execute_filter(indices, filter)
        } else {
            // If no filter is defined, all the data items are hits
            indices.all.clone()
        };

        // Sort filter results into a vector of data IDs
        let sorted_ids = sort(indices, filter_result, &self.sort);

        // Apply pagination using an offset approach
        let pagination = self
            .pagination
            .unwrap_or(Pagination::new(0, sorted_ids.len()));

        let paginated_ids = sorted_ids
            .iter()
            .skip(pagination.start)
            .take(pagination.size);

        // Read from the database the data of the paginated result
        storage.read_multiple(paginated_ids, &indices.indices)
    }
}

Benchmarks

After our implementation is fully implemented, we've made sure it works as expected by writing unit and integration tests, we can measure how fast running queries against our own database engine is. For that, we'll generate dumb data and store it in LMDB before running the benchmark. Afterwards, we'll define different query scenarios and so we can analyze which steps are more expensive.

const COUNT: usize = 100000;
const PAGE_SIZE: usize = 500;

lazy_static! {
    static ref PAGINATION: Pagination = Pagination::new(0, PAGE_SIZE);
    static ref NAME: String = "players_bench".to_string();
    static ref PLAYERS: Vec<DataItem> = create_random_players(COUNT as u64);
    static ref ENGINE: Engine = Engine::with_entities(
        vec![create_players_storage(&NAME, PLAYERS.to_vec())]
    );
}

#[bench]
fn bench_filter_numeric_eq(b: &mut Bencher) {
    b.iter(move || {
        // Filter all players with score = 10.0
        let query = QueryExecution::new()
            .with_filter(
                CompositeFilter::eq("score", FieldValue::dec(10.0))
            )
            .with_pagination(*PAGINATION);

        ENGINE.query(&NAME, query).unwrap();
    });
}

#[bench]
fn bench_filter_numeric_between(b: &mut Bencher) {
    b.iter(move || {
        // Filter all players with score >= 0.0 AND score <= 100.0
        let query = QueryExecution::new()
            .with_filter(
                CompositeFilter::between(
                    "score",
                    FieldValue::dec(0.0),
                    FieldValue::dec(100.0)
                )
            )
            .with_pagination(*PAGINATION);

        ENGINE.query(&NAME, query).unwrap();
    });
}

#[bench]
fn bench_filter_or(b: &mut Bencher) {
    b.iter(move || {
        // Filter all players with
        // sport = "Basketball" OR (score >= 0.0 AND score <= 100.0)
        let query = QueryExecution::new()
            .with_filter(
                CompositeFilter::or(vec![
                    CompositeFilter::eq(
                        "sport",
                        FieldValue::String(Sport::Basketball.as_string())
                    ),
                    CompositeFilter::between(
                        "score",
                        FieldValue::dec(0.0),
                        FieldValue::dec(100.0)
                    ),
                ])
            )
            .with_pagination(*PAGINATION);

        ENGINE.query(&NAME, query).unwrap();
    });
}

#[bench]
fn bench_sort(b: &mut Bencher) {
    b.iter(move || {
        // Sort all players by score
        let sort = Sort::new("score").with_direction(SortDirection::DESC);
        let query = QueryExecution::new()
            .with_sort(sort)
            .with_pagination(*PAGINATION);

        ENGINE.query(&NAME, query).unwrap();
    });
}

The results in my computer are as follows:

running 7 tests
test bench_filter_numeric_between  ... bench:   6,308,875.00 ns/iter (+/- 162,387.71)
test bench_filter_numeric_eq       ... bench:   6,297,158.40 ns/iter (+/- 497,664.54)
test bench_filter_or               ... bench:   6,344,937.50 ns/iter (+/- 355,226.66)
test bench_sort                    ... bench:  10,367,891.70 ns/iter (+/- 615,670.41)

So, it takes 5-10ms to fetch the first page of 500 elements from a total of 100k data items. For starters, it's quite a good result!

You can use the wonderful flamegraph to analyze the performance and see for yourself which parts of the implementation contribute to the overall performance.

Conclusions

In this chapter, we've taken a look at how to implement a simple but efficient storage solution using LMDB and heed. We've seen how to store read and store data, as well as, querying results by defining custom filters and sorting criteria.