How to assign windows dynamically after process() operator


I'm trying to build a streaming process like the one below:

  1. Collect sensor events from a source.
  2. Collect rule events defined for a device (the device that streams the sensor events).
  3. Rules may define different window information for aggregation, per device.
  4. When a rule with window info arrives on the stream for a device, create a (tumbling) window.
  5. If a new rule arrives without window info, remove the window and process without a window function.

I used this as a reference: https://techblog.king.com/rbea-scalable-real-time-analytics-king/

My streaming code is as follows:

mappedDataSource 
    .connect(mappedRuleStream) 
    .keyBy(..deviceId..) 
    .process(new RuleProcessorFunction()) 
    .windowAll(new CustomTimeWindowing()) 
    .apply(new AllWindowFunction<ProcessedEvent, Object, TimeWindow>() { 

      @Override 
      public void apply(TimeWindow window, Iterable<ProcessedEvent> values, Collector<Object> out) throws Exception { 
        System.out.println("hello"); 
      } 
    });

RuleProcessorFunction is:

public class RuleProcessorFunction extends CoProcessFunction<SensorEvent, RuleEvent, ProcessedEvent> { 

  private transient ValueState<Tuple2<SensorEvent, RuleEvent>> state; 

  @Override 
  public void processElement1(SensorEvent value, Context ctx, Collector<ProcessedEvent> out) throws Exception { 
    System.out.println("process element device id : " + value.deviceId); 
    System.out.println("process element solution id : " + value.solutionId); 
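    // Keep the rule already stored for this key; overwriting it with null would lose it
    Tuple2<SensorEvent, RuleEvent> current = state.value();
    RuleEvent rule = (current == null) ? null : current.f1;
    state.update(Tuple2.of(value, rule));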

    // execute if there is a defined rule on incoming event 
  } 

  @Override 
  public void processElement2(RuleEvent value, Context ctx, Collector<ProcessedEvent> out) throws Exception { 
    System.out.println("rule stream element solId :" + value.solutionId + " devId : " + value.deviceId); 
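    // Store the rule in keyed state. state.value() is null until the first update for this key,
    // so build a new tuple and write it back with update() instead of mutating the returned one
    Tuple2<SensorEvent, RuleEvent> current = state.value();
    state.update(Tuple2.of(current == null ? null : current.f0, value));
    // The processed event carries the window information; window assignment happens downstream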
    ProcessedEvent processedEvent = new ProcessedEvent(); 
    processedEvent.deviceId = value.deviceId; 
    processedEvent.solutionId = value.solutionId; 
    processedEvent.windowInfo = value.window; 
    processedEvent.ruleId = value.ruleId; 

    out.collect(processedEvent); 

  } 

  @Override 
  public void open(Configuration parameters) throws Exception { 
    ValueStateDescriptor<Tuple2<SensorEvent, RuleEvent>> stateDescriptor = 
            new ValueStateDescriptor<>("processor", TypeInformation.of(new TypeHint<Tuple2<SensorEvent, RuleEvent>>() { 
            })); 
    state = getRuntimeContext().getState(stateDescriptor); 

  } 

  @Override 
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<ProcessedEvent> out) throws Exception { 
    // rule triggers 
  } 
} 

CustomTimeWindowing (the custom window assigner) is:

public class CustomTimeWindowing extends TumblingEventTimeWindows { 
  public CustomTimeWindowing() { 
    super(1, 0); 
  } 

  @Override 
  public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { 
    System.out.println("creating window : "); 
    ProcessedEvent processedEvent = (ProcessedEvent) element; 
    int windowInfo = processedEvent.windowInfo; 
    System.out.println("creating window  rule : " + processedEvent.ruleId); 
    long size = windowInfo * 1000L; // long arithmetic avoids int overflow
    System.out.println("window info in milliseconds: " + size); 
    long start = timestamp - (timestamp % size); 
    long end = start + size; 
    return Collections.singletonList(new TimeWindow(start, end)); 
  } 
} 
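
The alignment arithmetic in assignWindows can be checked in isolation. Below is a minimal plain-Java sketch with no Flink dependency. The class `WindowBounds` and its methods are hypothetical helpers, not part of Flink, and the sketch assumes non-negative timestamps and that windowInfo is a window length in seconds.

```java
// Hypothetical helper, not part of Flink: reproduces the tumbling-window
// alignment used in assignWindows so it can be checked on its own.
final class WindowBounds {

    private WindowBounds() { }

    // Converts a window length in seconds to milliseconds using long
    // arithmetic, so large values do not overflow an int.
    static long sizeMillis(int windowSeconds) {
        if (windowSeconds <= 0) {
            throw new IllegalArgumentException(
                    "window length must be positive: " + windowSeconds);
        }
        return windowSeconds * 1000L;
    }

    // Inclusive start of the tumbling window containing the timestamp.
    // Assumes timestampMillis >= 0.
    static long start(long timestampMillis, int windowSeconds) {
        long size = sizeMillis(windowSeconds);
        return timestampMillis - (timestampMillis % size);
    }

    // Exclusive end of the same window.
    static long end(long timestampMillis, int windowSeconds) {
        return start(timestampMillis, windowSeconds) + sizeMillis(windowSeconds);
    }
}
```

The sketch rejects non-positive lengths because, if a rule without window info is encoded as windowInfo == 0 (an assumption about the encoding), the modulo in assignWindows would divide by zero.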

When a RuleEvent arrives, I attach the window metadata to a ProcessedEvent and emit it through the collector so the stream keeps flowing. But if I do the same in processElement1 for each SensorEvent, the window assigner is called again for every element and the window changes. I want the assigner to take effect only when new or changed window info arrives.

What is the correct way to build this kind of structure: managing windows manually, or using a custom window assigner like this one?
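
To make the "manual" option concrete, this is roughly the per-device logic it would involve, written as a plain-Java sketch without Flink. The class `ManualWindow` and its methods are hypothetical; in a real job the fields would live in keyed state and the flush would presumably be driven by timers. The sketch assumes in-order, non-negative timestamps and treats a window length of 0 as "no window".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-device buffer, not a Flink API: it mimics what keyed
// state would hold if tumbling windows were managed by hand.
final class ManualWindow {
    private long sizeMillis = 0;     // 0 means "no window": pass events through
    private long windowEnd = -1;     // exclusive end of the open window, -1 if none
    private final List<Double> buffer = new ArrayList<>();

    // Called when a rule arrives: flush what is buffered, then switch window size.
    List<Double> onRule(int windowSeconds) {
        List<Double> flushed = flush();
        sizeMillis = Math.max(0, windowSeconds) * 1000L;
        return flushed;
    }

    // Called for each sensor reading; returns the values to emit downstream.
    List<Double> onEvent(long timestampMillis, double value) {
        List<Double> out = new ArrayList<>();
        if (sizeMillis == 0) {
            out.add(value);                  // no window configured: emit immediately
            return out;
        }
        if (windowEnd >= 0 && timestampMillis >= windowEnd) {
            out.addAll(flush());             // the open window is complete
        }
        if (windowEnd < 0) {
            // Open the aligned window that contains this timestamp.
            long start = timestampMillis - (timestampMillis % sizeMillis);
            windowEnd = start + sizeMillis;
        }
        buffer.add(value);
        return out;
    }

    // Returns the buffered values and closes the open window.
    private List<Double> flush() {
        List<Double> out = new ArrayList<>(buffer);
        buffer.clear();
        windowEnd = -1;
        return out;
    }
}
```

With this shape, the window only changes inside onRule, so ordinary sensor events never re-trigger window selection. Late or out-of-order events are not handled here.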

Another reference: Differences between working with states and windows(time) in Flink streaming
