Understanding the Siddhi snapshot concept

I have a query and an execution plan. I want to take a snapshot of the running plan so that I can restore it on the receiver side and resume execution there.

  1. What format should be sent to the receiver?
  2. How do I restore it on the receiver side?

The following is some code I took from the Siddhi repository.

    SiddhiManager siddhiManager = new SiddhiManager();
    String query =
            "define stream inStream(meta_roomNumber int, meta_temperature double); " +
                    "from inStream#window(10)[meta_temperature > 50]\n" +
                    "select * " +
                    "insert into outStream;";

    ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(query);

    executionPlanRuntime.start();
    SiddhiContext siddhicontext = new SiddhiContext();

    // NOTE: `context` is never declared in this snippet
    context.setSiddhiContext(siddhicontext);
    context.setSnapshotService(new SnapshotService(context));
    executionPlanRuntime.snapshot();

There is 1 answer below.

You can use a PersistenceStore to persist the state (snapshot) of the execution plan and restore it later. The PersistenceTestCase in the Siddhi repository shows how it is used; the relevant part looks like this:

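    // Register a persistence store on the manager first
    // (PersistenceTestCase uses the in-memory implementation)
    siddhiManager.setPersistenceStore(new InMemoryPersistenceStore());

    // Create executionPlanRuntime
    ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);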

    // Register Callbacks, InputHandlers
    executionPlanRuntime.addCallback("query1", queryCallback);
    stream1 = executionPlanRuntime.getInputHandler("Stream1");

    // Start executionPlanRuntime
    executionPlanRuntime.start();

    // Send events
    stream1.send(new Object[]{"WSO2", 25.6f, 100});
    Thread.sleep(100);
    stream1.send(new Object[]{"GOOG", 47.6f, 100});
    Thread.sleep(100);

    // Persist the state
    executionPlanRuntime.persist();

    // Shutdown the running execution plan
    executionPlanRuntime.shutdown();

    // Create new executionPlanRuntime
    executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);

    // Register Callbacks, InputHandlers
    executionPlanRuntime.addCallback("query1", queryCallback);
    stream1 = executionPlanRuntime.getInputHandler("Stream1");

    // Start executionPlanRuntime
    executionPlanRuntime.start();

    // Restore to previously persisted state
    executionPlanRuntime.restoreLastRevision();
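
On the question of what actually travels to the receiver: as far as I can tell from the Siddhi 3.x API, the persisted state is an opaque byte array that the store keeps per execution plan under a revision key, and `restoreLastRevision()` simply loads the newest one. The sketch below is not Siddhi code. It is a plain-Java toy model of that idea, and every name in it (`SnapshotStoreSketch`, `save`, `loadLast`, `toBytes`, `fromBytes`, the numeric revisions) is hypothetical and chosen only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: a hypothetical stand-in for a persistence store,
// NOT the Siddhi PersistenceStore interface.
class SnapshotStoreSketch {
    // plan id -> (revision -> snapshot bytes), revisions kept in ascending order
    private final Map<String, TreeMap<Long, byte[]>> revisions = new HashMap<>();

    // Store one snapshot under the given plan id and revision number.
    public void save(String planId, long revision, byte[] snapshot) {
        revisions.computeIfAbsent(planId, k -> new TreeMap<>())
                 .put(revision, snapshot.clone());
    }

    // Return the snapshot with the highest revision, or null if none exists.
    public byte[] loadLast(String planId) {
        TreeMap<Long, byte[]> byRevision = revisions.get(planId);
        if (byRevision == null || byRevision.isEmpty()) {
            return null;
        }
        return byRevision.lastEntry().getValue().clone();
    }

    // Serialize some in-memory state (e.g. window contents) to bytes.
    public static byte[] toBytes(Serializable state) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(state);
        }
        return buffer.toByteArray();
    }

    // Rebuild the state object from previously serialized bytes.
    public static Object fromBytes(byte[] snapshot)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SnapshotStoreSketch store = new SnapshotStoreSketch();

        // "Sender" side: persist the state twice as events arrive.
        ArrayList<Double> window = new ArrayList<>(Arrays.asList(55.0));
        store.save("plan1", 1L, toBytes(window));
        window.add(61.5);
        store.save("plan1", 2L, toBytes(window));

        // "Receiver" side: load the newest revision and rebuild the state.
        Object restored = fromBytes(store.loadLast("plan1"));
        System.out.println(restored); // prints [55.0, 61.5]
    }
}
```

The point of the model is that the receiver needs two things: the same execution plan definition, so it can build an equivalent runtime, and access to the stored bytes. With a shared persistence store (a database or file system rather than an in-memory one), the second part is handled by the store itself.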