How to use RocksDb for many concurrent writes and reads without the database losing data between launches of the hosting desktop app?


I'm building a product, Zet Universe, that enables users to work with data coming from different sources in one place. It's a desktop app for Windows with multiple plugins that obtain data from external data sources, and it has a built-in extensible pipeline that runs multiple readers and writers for processing and enhancing incoming data. The data consists of JSON structures that represent entities at different points in time. Essentially, the system records every change, including user activities and activities from external sources, as a separate copy of the affected entity. So data is never deleted; a deleted entity is simply marked as deleted. On load, the app obtains the entire list of keys and builds a spatio-temporal index out of them. The app then runs a spatio-temporal query over that index to obtain only those items that belong to the current moment in time.

The problem is that it's hard to find a way to safely store and retrieve such data.

Context: Over almost the last decade I've gone through multiple iterations of approaching this problem. I started with SQL Compact (too limited storage capacity), then used SQL Server (too slow, too heavy), then used the file system (anti-virus software hates this pattern of multiple writers and readers), then different key-value databases (and even Compound Files, which internally are like FAT). In 2016 I moved to Redis and have used it since, but it has a problem: once the app is closed, redis-server.exe takes a long time to shut down.

Now I'm trying to move back to an embedded database and use RocksDb. However, it loses some of the data between app launches. I use the WAL, concurrent writes, etc., but the app still loses data between launches.

I do realize that maybe RocksDb isn't the right embedded tool, but I'm at a loss as to what to use otherwise.

I've used DbOptions to enable the WAL and concurrent writes and to increase parallelism.

Update:

There's a StorageContext class instance that gets an instance of IStorageProvider that encapsulates work with RocksDb in this case.

When RocksDb's IStorageProvider implementation is instantiated, it creates a RW instance of RocksDb DB:

        // basic options
        var options = new DbOptions()
            .SetCreateIfMissing(true)
            .SetWalDir(this.StoreName) // using WAL
            .SetWalRecoveryMode(Recovery.AbsoluteConsistency) // setting recovery mode to Absolute Consistency
            .SetAllowConcurrentMemtableWrite(true) // concurrent writers
            .SetEnableWriteThreadAdaptiveYield(true) // required for concurrent writers, see http://smalldatum.blogspot.com/2016/02/concurrent-inserts-and-rocksdb-memtable.html
            .IncreaseParallelism(cpus / 2) // only half of all available threads
            .OptimizeLevelStyleCompaction(0);

        // opening db
        _readWriteRocksDbInstance = RocksDb.Open(options, this.StoreName);

Then RocksDb's IStorageProvider implementation tries to read all keys:

                    using (var it = _readWriteRocksDbInstance.NewIterator())
                    {
                        it.SeekToFirst();

                        Log.Info("Iterating all keys..");
                        while (it.Valid())
                        {
                            String key = it.StringKey();

                            ProcessKey(key);

                            it.Next();
                            // done
                        }
                    }

After that, a spatio-temporal index (Octree) is created outside of RocksDb IStorageProvider implementation, the system makes a query into it, and then passes the keys to RocksDb's IStorageProvider implementation.

Originally, I ran a check for each key in StorageRecordExists(string storageKey), but it failed every single time, so in the end I stopped using this check:

            var result = false;

            var charSpan = storageKey.AsSpan();
            var byteSpan = MemoryMarshal.Cast<char, byte>(charSpan);

            try
            {
                result =_readWriteRocksDbInstance.HasKey(byteSpan);
            }
            catch (Exception ex)
            {
                Log.Exception(ex);
            }

            return result;
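
One possible reason this check never succeeds, offered as a sketch rather than a confirmed diagnosis: MemoryMarshal.Cast<char, byte> reinterprets the string's UTF-16 code units as raw bytes, whereas the string overloads of Put and Get are assumed here to encode keys as UTF-8. If that assumption holds, the bytes passed to HasKey never match the stored key. The snippet below uses only the .NET base class library; the key "entity:42" and the KeyEncodingDemo class are hypothetical.

```csharp
using System;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;

public static class KeyEncodingDemo
{
    // Reinterprets the UTF-16 chars of the string as raw bytes,
    // which is what StorageRecordExists does before calling HasKey.
    public static byte[] Utf16Bytes(string key) =>
        MemoryMarshal.Cast<char, byte>(key.AsSpan()).ToArray();

    // Encodes the key as UTF-8. Assumption: this is the encoding
    // the string-based Put/Get overloads apply to keys.
    public static byte[] Utf8Bytes(string key) =>
        Encoding.UTF8.GetBytes(key);

    public static void Main()
    {
        const string key = "entity:42"; // hypothetical key, 9 ASCII characters

        byte[] utf16 = Utf16Bytes(key);
        byte[] utf8 = Utf8Bytes(key);

        Console.WriteLine(utf16.Length);              // 18
        Console.WriteLine(utf8.Length);               // 9
        Console.WriteLine(utf16.SequenceEqual(utf8)); // False
    }
}
```

If the mismatch is indeed the cause, passing Encoding.UTF8.GetBytes(storageKey) to HasKey would look up the same bytes that Put stored.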

Then it tries to retrieve it:

            //if (StorageRecordExists(storageKey))
            //{
                try
                {
                    sJson = _readWriteRocksDbInstance.Get(storageKey);
                    // Get returns null for a missing key instead of throwing
                    result = sJson != null;
                }
                catch (Exception ex)
                {
                    Log.Exception(ex);
                }
            //}
            return result;

Data is put back this way:

        private bool InternalSaveStorageRecord(string recordName, string sJson)
        {
            bool result = false;

            try
            {
                _readWriteRocksDbInstance.Put(recordName, sJson);
                result = true; // report success only when Put did not throw
            }
            catch (Exception ex)
            {
                Log.Exception(ex);
            }

            return result;
        }

Finally, when the app is being closed by the user, the system is doing this:

It saves data:

        private void InternalSave()
        {
            var options = new FlushOptions();
            
            _readWriteRocksDbInstance.Flush(options);
        }

And then it disposes the RocksDb `db`:

                        // disposing
                        this._readWriteRocksDbInstance.Dispose();

Oh, and I checked, there are no writes done to the DB when the system is being shut down. I correctly stop the pipeline, stop UX interaction with the Model, and there are no calls to the Storage during the final saving and disposing phase.
