Use Kinesis Analytics for analyzing events and related missing events, separated in time?


I have a stream of events for various devices that can either be "connected" or "disconnected".

Each event has the following structure:

  • timestamp
  • device_id
  • event ("connected" or "disconnected")

I want to trigger an action as soon as a device has been disconnected and has not reconnected within a device-specific, configurable time period (e.g. 1 hour). The action should fire only once per "disconnected" event.

Can this be done using AWS Kinesis Analytics, and if so, what would the query look like? If not, can it be done with some other tool, or do I have to build it myself?

Answer:

This is possible with Drools Kinesis Analytics (a managed service on Amazon):

Types:

package com.test;

import java.util.Set;

import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable;

declare DeviceConfig
    @DynamoDBTable(tableName="DeviceConfig")

    deviceId: int @DynamoDBHashKey(attributeName="device_id");
    timeoutMillis: int @DynamoDBAttribute(attributeName="timeout_millis");
end

declare DeviceEvent
    @role( event )
    // attributes
    deviceId: int;
    timestamp: java.util.Date;
    event: String;
end

declare DisconnectAlert
    deviceId: int;
end

Rules:

package com.test;

// set up a dynamic timer using the device's configured timeout
rule "disconnect timer"
    timer( expr: $timeout )
when
    $event : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
    DeviceConfig(deviceId == $event.deviceId, $timeout : timeoutMillis) from entry-point configs
then
    insertLogical(new DisconnectAlert($event.getDeviceId()));
end

// discard a later event that repeats the same state for the same device
rule "remove dups"
when
    $event : DeviceEvent( $id : deviceId, $state : event ) from entry-point events
    $dup : DeviceEvent(this != $event, deviceId == $event.deviceId, event == $state, this after $event) from entry-point events
then
    delete($dup);
end

// on connect event remove "disconnected" state
rule "connect device"
when
    $disconnected : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
    DeviceEvent(deviceId == $disconnected.deviceId, event == "connected", this after $disconnected) from entry-point events
then
    delete($disconnected);
end

// cleanup "connected" state to free up memory (not needed any more)
rule "delete connected state"
when
    $connected : DeviceEvent(event == "connected") from entry-point events
then
    delete($connected);
end

Note that there are two types of inputs:

  • DeviceConfig, which is mostly static device configuration, located in DynamoDB.
  • DeviceEvent, which is a Kinesis Stream of device events.
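
To make the intended behaviour of the rules easier to check outside a rule engine, here is a minimal plain-Java sketch of the same logic. It is not Drools code and not part of the managed service: the class DisconnectMonitor and its methods configure, onEvent and advanceTo are hypothetical names introduced only for illustration. It assumes events arrive in timestamp order and that time is advanced explicitly by the caller instead of by an engine timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical plain-Java model of the disconnect-timeout rules (illustration only). */
class DisconnectMonitor {
    // Per-device timeout in milliseconds; stands in for the DeviceConfig input.
    private final Map<Integer, Long> timeoutMillis = new HashMap<>();
    // Deadline (in millis) of the pending "disconnected" event, keyed by device ID.
    private final Map<Integer, Long> pendingDeadline = new TreeMap<>();
    // Devices that were already alerted and have not reconnected since.
    private final Set<Integer> alerted = new HashSet<>();

    void configure(int deviceId, long timeout) {
        timeoutMillis.put(deviceId, timeout);
    }

    /** Feeds one event; events are assumed to arrive in timestamp order. */
    void onEvent(int deviceId, long timestamp, String event) {
        if ("connected".equals(event)) {
            // A reconnect cancels any pending timer and clears the alerted state.
            pendingDeadline.remove(deviceId);
            alerted.remove(deviceId);
        } else if ("disconnected".equals(event)) {
            Long timeout = timeoutMillis.get(deviceId);
            boolean alreadyDisconnected =
                pendingDeadline.containsKey(deviceId) || alerted.contains(deviceId);
            // Repeated "disconnected" events are ignored, like the "remove dups" rule.
            if (timeout != null && !alreadyDisconnected) {
                pendingDeadline.put(deviceId, timestamp + timeout);
            }
        }
    }

    /** Returns the devices whose deadline has passed by 'now', each at most once. */
    List<Integer> advanceTo(long now) {
        List<Integer> alerts = new ArrayList<>();
        Iterator<Map.Entry<Integer, Long>> it = pendingDeadline.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> entry = it.next();
            if (entry.getValue() <= now) {
                alerts.add(entry.getKey());
                alerted.add(entry.getKey());
                it.remove();
            }
        }
        return alerts;
    }
}
```

For example, after configure(1, 3600000) and a "disconnected" event for device 1 at time 0, advanceTo returns an empty list for any time before 3600000 and a list containing device 1 once that time is reached, unless a "connected" event for device 1 arrived in between. In the Drools version the engine's timer plays the role of advanceTo.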