I have been assigned a requirement in my project: to implement a Quartz scheduler within a custom NiFi processor.
As a first step, I need to schedule the transfer of a FlowFile from its preceding processor to its designated processor at a scheduled time.
I will receive the scheduling instructions as a dynamic parameter and need to schedule the transfer accordingly using the Quartz library.
For now, at this basic stage of development, I am hardcoding the scheduling expression (a CRON expression).
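To make the expression format concrete: a Quartz CRON expression has six mandatory fields (seconds, minutes, hours, day of month, month, day of week) and an optional seventh (year). The sketch below only splits an expression and labels its fields. `CronFields` and `describe` are hypothetical names of my own, not part of Quartz, and the field values themselves are not validated.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not Quartz API): labels the fields of a Quartz-style CRON expression.
final class CronFields {

    // Field order of a Quartz CRON expression; the seventh field (year) is optional.
    private static final String[] NAMES = {
        "seconds", "minutes", "hours", "dayOfMonth", "month", "dayOfWeek", "year"
    };

    // Splits the expression on whitespace and maps each field name to its raw value.
    static Map<String, String> describe(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException("Expected 6 or 7 fields but got " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Prints one "name = value" line per field of the expression used in this post.
        describe("0/10 0/2 * ? * * *")
            .forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

For the expression I use later in this post, `0/10 0/2 * ? * * *`, the seconds field is `0/10` and the minutes field is `0/2`, which I read as every 10 seconds during every second minute.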
-> What I have tried
I have created a custom processor and added the Quartz library to it.
Then I created a class (ScheduleFlowFile) that implements org.quartz.Job and its execute method, which runs the logic at the scheduled time. Within this class I obtain references to the ProcessSession and ProcessContext of my custom NiFi processor through the JobDataMap, which I populate when defining the JobDetail to be scheduled.
package com.chellyvishal.scheduleQuartz;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class ScheduleFlowFile implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        // Objects placed in the JobDataMap by the processor's onTrigger method
        JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        ProcessSession session = (ProcessSession) dataMap.get("processSession");
        ProcessContext context = (ProcessContext) dataMap.get("processContext");
        Relationship success = (Relationship) dataMap.get("successRelationship");

        FlowFile getFlowFile = session.get();
        if (getFlowFile == null) {
            // Nothing is queued, so there is nothing to transfer
            return;
        }
        session.transfer(getFlowFile, success);
    }
}
- Inside MyProcessor, within the onTrigger method, I create the JobDetail and the trigger so that the job runs at the instructed schedule.
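import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobDetail;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;

public class MyProcessor extends AbstractProcessor {

    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles that have been successfully processed")
            .build();

    public static final Relationship FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that have failed processing")
            .build();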
.
.
.
.
.
.
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // TODO implement
        try {
            scheduler = schedulerFactory.getScheduler();
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }

        long currentTime = System.currentTimeMillis();
        String uniqueJobIdentifier = "sampleJob_" + currentTime;
        String cronExpression = "0/10 0/2 * ? * * *";

        JobDetail jobDetail = newJob(ScheduleFlowFile.class)
                .withIdentity(uniqueJobIdentifier, "newGroupFirst")
                .build();
        jobDetail.getJobDataMap().put("processSession", session);
        jobDetail.getJobDataMap().put("processContext", context);
        jobDetail.getJobDataMap().put("successRelationship", SUCCESS);
        // A simple trigger that fires once: with startNow() and a repeat count of 0
        // it fires immediately, so the 5-second interval is never used
        Trigger trigger = newTrigger()
                .withIdentity("newTrigger" + currentTime, "newGroupFirst")
                .startNow()
                .withSchedule(simpleSchedule()
                        .withIntervalInSeconds(5)
                        .withRepeatCount(0))
                .build();
        // A CRON trigger that schedules the job according to the CRON expression
        Trigger cronTrigger = TriggerBuilder.newTrigger()
                .withIdentity("newCRONTrigger" + currentTime, "newGroupFirst")
                .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
                .build();
        try {
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }
}
I am stuck at this point and do not know how to proceed.
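To make the behaviour I am after concrete, here is a minimal sketch that uses only the JDK. It is not NiFi or Quartz code: a `ScheduledExecutorService` stands in for the Quartz scheduler, a queue of strings stands in for the incoming FlowFiles, and a list stands in for the success relationship. The names `DelayedRelease` and `releaseAfter` are hypothetical.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the intended behaviour: hold items, release them at a scheduled time.
final class DelayedRelease {

    // Holds the incoming items, releases all of them once after delayMillis, and returns what was released.
    static List<String> releaseAfter(List<String> incoming, long delayMillis) {
        Queue<String> held = new ConcurrentLinkedQueue<>(incoming);   // stands in for the incoming queue
        List<String> released = new CopyOnWriteArrayList<>();         // stands in for the success relationship
        CountDownLatch done = new CountDownLatch(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // The scheduled task plays the role of the job: it drains whatever is held at fire time.
            scheduler.schedule(() -> {
                String item;
                while ((item = held.poll()) != null) {
                    released.add(item);
                }
                done.countDown();
            }, delayMillis, TimeUnit.MILLISECONDS);

            // Wait for the scheduled task, with a generous upper bound so the call cannot hang forever.
            if (!done.await(delayMillis + 5_000, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Scheduled release did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the scheduled release", e);
        } finally {
            scheduler.shutdownNow();
        }
        return released;
    }
}
```

Calling `DelayedRelease.releaseAfter(Arrays.asList("a", "b"), 50)` holds both items for about 50 ms and then returns them in arrival order. What I cannot work out is how to get the same hold-then-release behaviour with Quartz inside the NiFi processor.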
