Storing ranged timeseries data in Postgres


I need to store NetFlow data in PostgreSQL. This is data about network traffic. Each record contains the following:

  • Connection start time
  • Connection end time
  • Total data transferred
  • Source/destination IPs/ASNs
  • (There is a bunch more, but that is enough for the purpose of this question).
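
For concreteness, a minimal table holding those fields might look something like this (column names here are purely illustrative, not my actual schema):

create table traffic_flow (
    id                   bigserial primary key,
    start_time           timestamptz not null,
    end_time             timestamptz not null,
    bytes                bigint not null,
    source_address       inet not null,
    destination_address  inet not null,
    source_asn           integer,
    destination_asn      integer
    -- ...plus the rest of the NetFlow fields
);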

My question is this: How can I store this data so I can efficiently calculate data transfer rates for the past X days/hours? For example, I may want to draw a chart of all traffic to Netflix's ASN over the last 7 days, with hourly resolution.

The difference between the connection start & end times could be milliseconds, or could be over an hour.


My first pass at this would be to store the connection in a TSTZRANGE field with a GiST index. Then, to query the data for hourly traffic over the last 7 days (a rough sketch of such a query follows this list):

  1. Use a CTE to generate a sequence of hourly time buckets
  2. Look for any TSTZRANGEs which overlap each bucket
  3. Calculate the duration of the overlap
  4. Calculate the data rate for the record in bytes per second
  5. Do duration * bytes per second to get total data
  6. Group it all on the bucket, summing the total data values
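
Roughly, I imagine that query would look something like the sketch below (assuming the range lives in a period TSTZRANGE column alongside bytes and as_number columns; all names are illustrative):

-- GiST index so the && (overlaps) join can use an index
create index if not exists traffic_flow_period_idx on traffic_flow using gist (period);

with buckets as (
    -- 1. generate the hourly buckets for the last 7 days
    select gs as bucket_start,
           tstzrange(gs, gs + interval '1 hour') as bucket
    from generate_series(now() - interval '7 days', now(), interval '1 hour') as gs
)
select
    b.bucket_start,
    sum(
        -- 3. duration of the overlap between flow and bucket, in seconds...
        extract(epoch from upper(f.period * b.bucket) - lower(f.period * b.bucket))
        -- 4./5. ...multiplied by the flow's average bytes per second
        * f.bytes / nullif(extract(epoch from upper(f.period) - lower(f.period)), 0)
    ) as bytes_in_bucket
from buckets b
-- 2. find flows overlapping each bucket
join traffic_flow f on f.period && b.bucket
where f.as_number = 2906  -- the ASN being charted (2906 is just an example)
-- 6. group per bucket and sum
group by b.bucket_start
order by b.bucket_start;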

However, that sounds like a lot of heavy lifting. Can anyone think of a better option?

There are 2 answers below.

BEST ANSWER

After looking into this some more, I think the real answer is that there isn't an out-of-the-box way to achieve this in a performant manner, especially as the data volume scales up. Ultimately, it is just going to be slow to aggregate many thousands of rows, because that is simply a lot of data access.

Instead I have gone a different route. I am using a PostgreSQL trigger on the table which stores the raw flows (traffic_flow). Every time a record is inserted into traffic_flow, the trigger upserts the new data into separate aggregation tables for daily, hourly, minutely, and per-second data.


Here is my experimental implementation in case it is useful to someone. This could be improved to also handle updates and deletes.

create or replace function update_aggregated_traffic(NEW RECORD, table_name TEXT, interval_name text, store_customer BOOLEAN)
    returns void
    language plpgsql
as
$body$
declare
    aggregate_interval interval;
    customer_ip_ inet;
begin
    -- Update the data aggregated traffic data given the insertion of a new flow.
    -- A flow is the data about a single connection (start time, stop time, total
    -- bytes/packets). This function essentially rasterises that data into a
    -- series of aggregation buckets.

    -- interval_name should be day, hour, minute, or second
    -- turn the interval_name into an actual INTERVAL
    aggregate_interval = ('1 ' || interval_name)::INTERVAL;
    if store_customer then
        customer_ip_ = NEW.source_address;
    else
        customer_ip_ = '100.64.0.0'::INET;
    end if;

    -- We need to insert into a dynamically generated table name. There is
    -- no way to do this without writing the whole SQL statement as a string.
    -- Instead, let's use a trick. Create a temporary view, then insert into that.
    -- Postgres will proxy this insert into the desired table
    drop view if exists table_pointer;
    execute format('create temporary view table_pointer as select * from %s', table_name);

    -- We use a CTE to keep things readable, even though it is pretty long
    with aggregate_range AS (
        -- Create all the aggregate buckets spanned by the inserted flow
        SELECT generate_series(
            date_trunc(interval_name, lower(NEW.range)),
            date_trunc(interval_name, upper(NEW.range)),
            aggregate_interval
        ) as range_lower
    ),
    -- For each bucket, figure out its overlap with the provided flow data.
    -- Only the first and last buckets will have less than complete overlap,
    -- but we do the calculation for all buckets anyway
    with_overlaps AS (
        SELECT
            NEW.range * tstzrange(range_lower, range_lower + aggregate_interval) AS overlap,
            range_lower
        FROM
        aggregate_range
    ),
    -- Convert the overlap intervals into seconds (FLOAT)
    with_overlap_seconds AS (
        SELECT
            extract(epoch from (upper(overlap) - lower(overlap))) as overlap_seconds,
            range_lower
        FROM
            with_overlaps
    )
    -- Now we have enough information to do the inserts
    insert into table_pointer as traffic
        (timestamp, customer_ip, as_number, bytes, packets)
        select
            range_lower,
            customer_ip_,
            NEW.as_number,
            -- Scale the packets/bytes per second to be a total number of
            -- of packets/bytes
            round(NEW.bytes_per_second * overlap_seconds)::INT,
            round(NEW.packets_per_second * overlap_seconds)::INT
        from with_overlap_seconds
        -- We shouldn't have any 0-second overlaps, but let's just be sure
        where overlap_seconds > 0
        -- If there is already existing data, then increment the bytes/packets values
        on conflict (customer_ip, timestamp, as_number) DO UPDATE SET
            bytes = EXCLUDED.bytes + traffic.bytes,
            packets = EXCLUDED.packets + traffic.packets
    ;
end;
$body$;


create or replace function update_aggregated_traffic_hourly() returns trigger
    language plpgsql
as
$body$
begin
    -- Store aggregated data for different resolutions. For each we also store data
    -- without the customer information. This way we can efficiently see traffic data
    -- for the whole network
    PERFORM update_aggregated_traffic(NEW, 'traffic_perdaytraffic','day', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perdaytraffic','day', False);

    PERFORM update_aggregated_traffic(NEW, 'traffic_perhourtraffic','hour', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perhourtraffic','hour', False);

    PERFORM update_aggregated_traffic(NEW, 'traffic_perminutetraffic','minute', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perminutetraffic','minute', False);

    PERFORM update_aggregated_traffic(NEW, 'traffic_persecondtraffic','second', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_persecondtraffic','second', False);

    return NEW;
end;
$body$;

create trigger update_aggregated_traffic_hourly_trigger AFTER INSERT ON traffic_flow
    FOR EACH ROW EXECUTE PROCEDURE update_aggregated_traffic_hourly();
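
I haven't included the aggregation table definitions above. Each of them (daily/hourly/minutely/per-second) is shaped roughly like this, with a unique constraint matching the ON CONFLICT target (exact types may differ):

create table traffic_perhourtraffic (
    timestamp    timestamptz not null,
    customer_ip  inet not null,
    as_number    integer not null,
    bytes        bigint not null default 0,
    packets      bigint not null default 0,
    unique (customer_ip, timestamp, as_number)
);
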
SECOND ANSWER

A first draft:

WITH ts_bucket AS (
    SELECT
        LAG(gs, 1) OVER (ORDER BY gs) AS begin_period,
        gs AS end_period
    FROM
        generate_series('1/25/2021 0:00-8'::timestamptz, '1/26/2021 0:00-8'::timestamptz, '1 hour') AS gs
),
se AS (
    SELECT
        1000000 AS bytes,
        '01/25/2021 11:35-8'::timestamptz AS start_ts,
        '01/25/2021 12:45-08'::timestamptz AS end_ts
)
SELECT
    *,
    extract('epoch' FROM upper(tstzrange(begin_period, end_period, '[]') * tstzrange(start_ts, end_ts, '[]'))
                  - lower(tstzrange(begin_period, end_period, '[]') * tstzrange(start_ts, end_ts, '[]')))
        * bytes
        / extract('epoch' FROM end_ts - start_ts) AS data_transferred
FROM
    ts_bucket,
    se
WHERE
    begin_period IS NOT NULL
    AND tstzrange(se.start_ts, se.end_ts, '[]') && tstzrange(ts_bucket.begin_period, ts_bucket.end_period, '[]');

      begin_period      |       end_period       |  bytes  |        start_ts        |         end_ts         |  data_transferred
------------------------+------------------------+---------+------------------------+------------------------+--------------------
 2021-01-25 11:00:00-08 | 2021-01-25 12:00:00-08 | 1000000 | 2021-01-25 11:35:00-08 | 2021-01-25 12:45:00-08 | 357142.85714285716
 2021-01-25 12:00:00-08 | 2021-01-25 13:00:00-08 | 1000000 | 2021-01-25 11:35:00-08 | 2021-01-25 12:45:00-08 |  642857.1428571428

This is based on the connection start and end times being stored in separate fields and then turning them into ranges as necessary.
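
To get the hourly chart over 7 days from the question, the same overlap expression would be summed per bucket against the real flow table, along these lines (table and column names are assumed here):

WITH ts_bucket AS (
    SELECT
        gs AS begin_period,
        gs + interval '1 hour' AS end_period
    FROM
        generate_series(now() - interval '7 days', now(), '1 hour') AS gs
)
SELECT
    b.begin_period,
    sum(
        extract('epoch' FROM upper(tstzrange(b.begin_period, b.end_period) * tstzrange(f.start_ts, f.end_ts))
                      - lower(tstzrange(b.begin_period, b.end_period) * tstzrange(f.start_ts, f.end_ts)))
        * f.bytes
        / nullif(extract('epoch' FROM f.end_ts - f.start_ts), 0)
    ) AS data_transferred
FROM
    ts_bucket b
    JOIN traffic_flow f ON tstzrange(f.start_ts, f.end_ts) && tstzrange(b.begin_period, b.end_period)
GROUP BY
    b.begin_period
ORDER BY
    b.begin_period;

Using the default half-open '[)' range bounds here, rather than '[]', avoids counting a flow twice when it ends exactly on a bucket boundary.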