Triggered FetchFolder in NiFi?

I'm using NiFi to orchestrate the processing of large binary files using a proprietary processing tool (which runs external to NiFi).

NiFi drops the source files on disk and I call the external tool (using an ExecuteScript processor). The tool loads the binary file and proceeds to generate a lot of smaller files.

When the external tool is completely finished, I need to "pick up" the directory of smaller (generated) files and continue to process via NiFi. I need to wait because the [output directory], [number of files], and [time required to process] are dynamic.

The problem:

  1. GetFile (to grab a directory) doesn't have an upstream connection, so I can't trigger it upon completion of processing.
  2. A ListFile + FetchFile combo doesn't work because ListFile doesn't have an upstream connection either, so, again, I can't trigger it upon completion of processing.

... so what processor(s) can I use to, upon completion of the binary processing, grab the directory of new files and bring them into NiFi land?

There are 2 best solutions below

Best answer:

Somewhat in line with @Bryan Bende's answer, I ended up using an ExecuteScript processor to build my own "ListFile" processor that accepts an upstream connection:

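import groovy.io.FileType

// The incoming FlowFile is the trigger; its 'fetchDirectory' attribute names the directory to list
def flowFile = session.get()
if (!flowFile) return

def fetchDirectory = flowFile.getAttribute('fetchDirectory')
// Guard against a missing attribute or a path that is not a directory
def dir = fetchDirectory ? new File(fetchDirectory) : null
def listOfFiles = []
if (dir?.isDirectory()) {
   dir.eachFileRecurse(FileType.FILES) { file ->
      listOfFiles << file
   }
}

// Emit one FlowFile per file found, carrying its location as attributes
def flowFiles = [] as List<FlowFile>
listOfFiles.each { file ->
   def newFlowFile = session.create()
   // putAttribute returns the updated FlowFile, so keep the returned reference
   newFlowFile = session.putAttribute(newFlowFile, 'path', file.path)
   newFlowFile = session.putAttribute(newFlowFile, 'filename', file.name)
   flowFiles << newFlowFile
}
// The trigger FlowFile has done its job; drop it and pass on the listing
session.remove(flowFile)
session.transfer(flowFiles, REL_SUCCESS)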

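So, when the external tool completes, I route its FlowFile to the script processor above, whose output I then pipe to a FetchFile processor. Since the script stores the full file path in the path attribute, FetchFile's File to Fetch property has to reference that attribute (for example ${path}) rather than its default of ${absolute.path}/${filename}.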

Other answer:

I'm going to assume your external tool has a way to notify NiFi when it is done, since you would need that even if GetFile or ListFile supported incoming flow files.

So how about a two-step process:

The external tool writes to directory-1 and, when done, calls a REST API provided by a HandleHttpRequest processor. That request then goes to an ExecuteScript processor that runs "mv directory-1 directory-2".

The ListFile processor is always watching directory-2, but never sees anything until the move command above executes.
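
As a rough illustration of the move step, here is a minimal Groovy sketch that could be adapted for the ExecuteScript processor. The helper name publishDirectory and the demo paths are hypothetical (not from the answer), and it assumes both directories sit on the same filesystem so the move can be atomic:

```groovy
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption

// Hypothetical helper: moves a finished output directory into the directory
// that ListFile watches, keeping the finished directory's own name.
Path publishDirectory(Path finished, Path watched) {
    Files.createDirectories(watched)
    Path target = watched.resolve(finished.fileName)
    // ATOMIC_MOVE makes the whole directory appear in one step; it throws
    // AtomicMoveNotSupportedException if the paths are on different filesystems
    return Files.move(finished, target, StandardCopyOption.ATOMIC_MOVE)
}

// Demo with throwaway directories (illustrative names only)
def work = Files.createTempDirectory('nifi-demo')
def finished = Files.createDirectories(work.resolve('directory-1/run-001'))
Files.write(finished.resolve('part-0.bin'), [1, 2, 3] as byte[])
def moved = publishDirectory(finished, work.resolve('directory-2'))
println "Published to ${moved}"
```

Inside ExecuteScript, the two paths would presumably come from FlowFile attributes or processor properties instead of a temp directory. Moving the directory in a single rename, rather than copying file by file, is what keeps ListFile from picking up a half-written set of files.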