How to continuously save large tick-by-tick streaming data locally without overloading the RAM?


I want to save all the tick-by-tick data from a broker locally without overloading the RAM. The streaming starts on Sunday and ends in the first hours of Saturday. At any time during the week I want to be able to load the file from the hard disk and analyze the updates received so far.

Sometimes there can be more than 1,000 or even 10,000 ticks per second. I want to subscribe to 40 instruments (EURUSD, for example).

So what is the best way to do this in Python?

This is what I have tried:

First attempt, a pandas DataFrame:

df = pd.DataFrame(columns=['Time_sent', 'Time_received', 'Instrument', 'Bid', 'Ofr'])

Then, on every update, I concatenate the new row:

df = pd.concat([df, pd.DataFrame([[datetime.fromtimestamp(int(update.get_time()) / 1000),
                                   datetime.now(),
                                   update.get_instrument(),
                                   update.get_bid_value(),
                                   update.get_ofr_value()]], columns=df.columns)],
               ignore_index=True)

And finally, I write the ticks to disk:

df.to_pickle('ticks.pkl')

But this has two disadvantages: it overloads the RAM unnecessarily, and on every update it rewrites the whole ticks.pkl file, which takes a long time once the file is large and makes it hard to access whenever I want.

Second attempt, JSON:

I define a function:

def append_record(record):
    with open('ticks.json', 'a') as f:
        json.dump(record, f)
        f.write(os.linesep)

And then, on every update, I append the record locally:

append_record({'Time_sent': update.get_time(),
               'Time_received': datetime.now().timestamp() * 1000,
               'Instrument': update.get_instrument(),
               'Bid': update.get_bid_value(),
               'Ofr': update.get_ofr_value()})

But this also has two disadvantages: the file (ticks.json) is much larger than ticks.pkl, and I don't think it is the best or most up-to-date Python approach.
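It can at least be read back in one call for the mid-week analysis, without a custom parser; a rough sketch with pandas (assuming, as above, that get_time() returned epoch milliseconds):

import pandas as pd

# Read the newline-delimited JSON written by append_record above;
#   lines=True treats each line as a separate JSON record.
df = pd.read_json('ticks.json', lines=True)

# Convert the millisecond epoch timestamps back to datetimes for analysis.
df['Time_sent'] = pd.to_datetime(df['Time_sent'], unit='ms')
df['Time_received'] = pd.to_datetime(df['Time_received'], unit='ms')
print(df.head())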

Any advice? Maybe I should use an SQL database (SQLite) or HDF5? What is the best way to do what I want? In any case, I should not have to reload the saved file, append to it, and save it again. It should be something smarter.
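For the HDF5 option I imagine something like pandas' appendable HDFStore; a rough sketch (it needs PyTables installed, and the buffered values below are only placeholders):

import pandas as pd

# A small buffer of recent updates, collected as dicts (placeholder values).
buffer = [{'Time_sent': 1710673577000, 'Time_received': 1710673577003,
           'Instrument': 'EURUSD', 'Bid': 1.0886, 'Ofr': 1.0887}]

with pd.HDFStore('ticks.h5') as store:
    # Each call appends the chunk to the on-disk table without rewriting the file.
    store.append('ticks', pd.DataFrame(buffer), min_itemsize={'Instrument': 12})

# Any time during the week, read everything (or a subset) back for analysis.
df = pd.read_hdf('ticks.h5', 'ticks')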


There are 2 solutions below.

Answer from Techwizard:

A database setup would be ideal for this scenario: SQL, SQLite, etc. However, if database storage is not viable, then a different file format such as CSV may be better. A CSV file can be opened in append-only mode, which saves the entire file from being rewritten on every tick, and it can be imported into popular spreadsheet software for viewing.

For simplicity, CSV is the easiest to implement and use. For scalability, however, SQLite is a far better choice, allowing more complex querying and optimised storage and retrieval for large data sets.
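A minimal sketch of the CSV approach, assuming the same update object as in the question (field names taken from the question's code):

import csv
from datetime import datetime

def append_tick(update, path='ticks.csv'):
    # 'a' mode appends to the end of the file, so nothing already
    #   on disk is rewritten when a new tick arrives.
    with open(path, 'a', newline='') as f:
        csv.writer(f).writerow([
            update.get_time(),                  # time sent by the broker
            datetime.now().timestamp() * 1000,  # time received, in ms
            update.get_instrument(),
            update.get_bid_value(),
            update.get_ofr_value(),
        ])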

Answer from BitsAreNumbersToo:

As mentioned in the comments by @AKX, this is a good problem for sqlite, which is conveniently built into Python.

The concerns you mention are memory and performance. During testing, the solution I propose here used 22.5 MB of memory and had a maximum throughput in the neighborhood of 150K records/second, which should meet your needs. If it does not, you will likely have difficulty solving this problem in Python at all.

To demonstrate the solution, I mocked up a class that matches the signature I could infer from your example code, and created a function that returns those tick updates. It should be a drop-in replacement for your actual function call, or require only minimal adjustment.

import numpy as np
import time
import datetime
import sqlite3


class InstrumentUpdate:
    # This is a placeholder class that has a similar signature
    #   to the one from the question and should be a drop-in
    #   replacement
    def __init__(self, instrument):
        self.instrument: str = instrument
        self.bid_value: float = np.random.random()
        self.ofr_value: float = np.random.random()
        self.time: datetime.datetime = datetime.datetime.now()
    def get_instrument(self) -> str:
        return self.instrument
    def get_bid_value(self) -> float:
        return self.bid_value
    def get_ofr_value(self) -> float:
        return self.ofr_value
    def get_time(self) -> datetime.datetime:
        return self.time
    def __repr__(self):
        outstr = ''
        outstr += (
            f'<InstrumentUpdate '
            f'instrument="{self.instrument}" '
            f'bid={self.bid_value:4.3f} '
            f'ofr={self.ofr_value:4.3f} '
            f'time={self.time}>'
        )
        return outstr


class SEnergeiakosRecorder:
    # The custom data recording class

    # A name for the table in the linked database
    table_name = 'UpdateTable'

    def __init__(self, outpath: str):
        # Create a new sqlite database connection
        #   this also creates a new one if necessary
        self.conn = sqlite3.connect(outpath)
        # Create a cursor to that database
        self.cur = self.conn.cursor()

        # If the table is not already in the database,
        #   add it with the correct fields
        tables = self.cur.execute('SELECT name FROM sqlite_master')
        if (SEnergeiakosRecorder.table_name,) not in tables.fetchall():
            self.cur.execute(
                f'CREATE TABLE '
                f'{SEnergeiakosRecorder.table_name}'
                f'(instrument, bid, ofr, time_req, time_rec)'
            )
    def append_record(self, record: InstrumentUpdate):
        # This function allows you to append one record at a time
        #   This is much slower than appending hundreds or thousands at a time
        self.cur.execute(
            f'INSERT INTO {SEnergeiakosRecorder.table_name} '
            f'VALUES(?, ?, ?, ?, ?)',
            (
                record.get_instrument(),
                record.get_bid_value(),
                record.get_ofr_value(),
                record.get_time(),
                datetime.datetime.now()
            )
        )
        # Commit the change to update the database
        self.conn.commit()
    def append_records(self, records: list[InstrumentUpdate]):
        # This function allows you to append many records at a time
        #   This can be much faster than iterating over each one
        self.cur.executemany(
            f'INSERT INTO {SEnergeiakosRecorder.table_name} '
            f'VALUES(?, ?, ?, ?, ?)',
            [
                (
                    record.get_instrument(),
                    record.get_bid_value(),
                    record.get_ofr_value(),
                    record.get_time(),
                    datetime.datetime.now()
                ) for record in records
            ]
        )
        # Commit the change to update the database
        self.conn.commit()
    def get_records(self):
        return self.cur.execute(
            f'SELECT * FROM {SEnergeiakosRecorder.table_name}'
        ).fetchall()


def get_instrument_update(instrument):
    # A dummy function that returns an object matching the one
    #   shown in the example
    # This function would be replaced with whatever your real data
    #   source is
    update = InstrumentUpdate(instrument)
    return update

def _gt(s: float = 0.0) -> float:
    # Convenience function for getting time elapsed
    return time.perf_counter() - s

def main_test(per_loop_sleep: float | None = None, total_updates: int = int(1e4)):
    # The fname of the sqlite database
    dbname = 'test.sqlite3'
    # Some instrument names to iterate over to simulate a variety of data
    instruments = ['abc', 'cde', 'efg', 'ghi', 'ijk']
    # Link into the database (or create it if it doesn't exist)
    ser = SEnergeiakosRecorder(dbname)
    # Record how many ticks we get
    ticks = 0
    # Record the start time for profiling speed
    start_time = _gt()
    # Create a list that updates to the database will be temporarily stored in
    updates = []
    # Wrap in a try-except block so that we can
    #   ctrl-c out if necessary without breaking anything
    try:
        while True:
            # Sleep to simulate lower update rates
            if per_loop_sleep:
                time.sleep(per_loop_sleep)
            # Get an update for each instrument using the dummy function
            for instrument in instruments:
                update = get_instrument_update(instrument)
                # Add the collected updates to the database buffer
                updates.append(update)
            # By only appending every so often, we greatly decrease
            #   the database IO overhead
            if len(updates) >= 5000:
                ser.append_records(updates)
                updates = []
            ticks += len(instruments)
            # If we got enough, stop
            if ticks >= total_updates:
                break

    except KeyboardInterrupt:
        print('exited prematurely')
        pass
    # If there is anything left in the buffer, add it
    if updates:
        ser.append_records(updates)
    # Print run stats
    print(f'Total updates: {total_updates: 7} - Delay: {per_loop_sleep or 0.0:6.4f}')
    print(f'Run time: {_gt(start_time):.1f} sec')
    print(f'Run rate: {ticks / _gt(start_time):.0f} records / sec')
    print()

def get_data_test():
    # Link into the same database
    ser = SEnergeiakosRecorder('test.sqlite3')
    # print it somewhat formatted to see what's in the table
    print('  ' + '\n  '.join([str(x) for x in ser.get_records()]))
    print()

if __name__ == '__main__':
    # Verify we can add data
    main_test(total_updates=1)
    # Verify we can get the data back
    get_data_test()

    # Test near 10K updates / sec (this is about the minimum sleep anyways)
    main_test(0.0001, int(1e4))
    # Test max update rate
    main_test(None, int(1e6))

When I run that on my computer I get the following output (the datetime stamps are all the same because the library doesn't update as fast as the loop runs, but they do change over time if it runs long enough):

Total updates:       1 - Delay: 0.0000
Run time: 0.0 sec
Run rate: 1037 records / sec

  ('abc', 0.3580169831137039, 0.1706733746466279, '2024-03-17 11:06:17.783886', '2024-03-17 11:06:17.783886')
  ('cde', 0.4308290581033847, 0.5299851442933575, '2024-03-17 11:06:17.783886', '2024-03-17 11:06:17.783886')
  ('efg', 0.5717282162497984, 0.3937017895708135, '2024-03-17 11:06:17.783886', '2024-03-17 11:06:17.783886')
  ('ghi', 0.10451352945842662, 0.8847603628697307, '2024-03-17 11:06:17.783886', '2024-03-17 11:06:17.783886')
  ('ijk', 0.5424993028653913, 0.8936337862898509, '2024-03-17 11:06:17.783886', '2024-03-17 11:06:17.783886')

Total updates:   10000 - Delay: 0.0001
Run time: 1.1 sec
Run rate: 9513 records / sec

Total updates:  1000000 - Delay: 0.0000
Run time: 6.2 sec
Run rate: 162452 records / sec

Tested with Python 3.11.4 on Windows 10 with an i9-10900K and an SSD. An older Python version or a slower processor or disk will likely reduce performance somewhat.

You can also adjust this with relatively little effort to start a new sqlite database each day, so that you aren't building one huge database and the daily files are a little easier to work with later, or make any number of other small changes.
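For example, a rough sketch of the daily-rotation idea (the date-based filename scheme is just illustrative):

import datetime

def recorder_for_today(basename: str = 'ticks') -> SEnergeiakosRecorder:
    # Build a per-day database name such as 'ticks_2024-03-17.sqlite3'
    #   and open (or create) a recorder pointing at it.
    today = datetime.date.today().isoformat()
    return SEnergeiakosRecorder(f'{basename}_{today}.sqlite3')

# In the main loop, check the date occasionally and swap recorders
#   when the day rolls over:
#   if datetime.date.today() != current_day:
#       ser = recorder_for_today()
#       current_day = datetime.date.today()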

Let me know if you have any questions.