How to create multiple flow files from one incoming flow file in NiFi using ExecuteScript with Python


Running locally, this works exactly how I want it to: it takes one incoming file with many different codes in positions 7-10 and outputs one file per unique code. For example, if records 1-5 have 1234 in positions 7-10, record 6 has 2345, and record 7 has 1234, then one file called 1234_file.txt would contain rows 1-5 and 7, and a second file 2345_file.txt would contain row 6 from the input file:

f = open("test_comp.txt", "r")
for x in f:
    comp = x[6:10]
    print(comp)
    n = open(comp+"_file.txt","a")
    n.write(x)
    n.close()
f.close()

In NiFi, I am trying this:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        f = open(inputStream, 'r')
        for x in f:
            comp = x[6:10]
            print("comp: ", comp)
            newFile = open(comp + "_file.txt", "a")
            newFile.write(x)


flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)
session.commit()

It seems to be getting the input and correctly storing comp as positions 7-10 as expected, but I'm not getting multiple flow files out (one for each unique string in x[6:10]), and the single flow file that does come out is zero bytes.

Any thoughts on what I'm missing??


There is 1 answer below.


You are writing directly to files on your file system rather than flowfiles which are objects within the NiFi ecosystem. I'd suggest reading the Apache NiFi Developer's Guide for context around those patterns and looking at some Python ExecuteScript examples to see the relevant Python code.

Rather than writing a single flowfile out, you'll need to create multiple flowfile objects, map the data to them, and then transfer all of them to the appropriate relationship; a sketch of that pattern is below.
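For illustration, here is a minimal Jython sketch of that pattern, assuming the whole flowfile fits comfortably in memory and that the code still sits in positions 7-10 of each line. The session calls it uses (session.read, session.create, session.write, session.putAttribute, session.transfer, session.remove) and the ExecuteScript bindings are standard NiFi API, but treat the script itself as a starting point rather than a drop-in solution:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback

class ReadContent(InputStreamCallback):
    # Reads the entire incoming flowfile content into a string
    def __init__(self):
        self.text = None
    def process(self, inputStream):
        self.text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

class WriteContent(OutputStreamCallback):
    # Writes a pre-built string as the content of an outgoing flowfile
    def __init__(self, text):
        self.text = text
    def process(self, outputStream):
        outputStream.write(bytearray(self.text.encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    reader = ReadContent()
    session.read(flowFile, reader)

    # Group the lines by the code in positions 7-10 (0-based slice 6:10)
    groups = {}
    for line in reader.text.splitlines(True):
        comp = line[6:10]
        groups.setdefault(comp, []).append(line)

    # Emit one child flowfile per unique code, named like the local script's output files
    for comp, lines in groups.items():
        child = session.create(flowFile)
        child = session.write(child, WriteContent(''.join(lines)))
        child = session.putAttribute(child, 'filename', comp + '_file.txt')
        session.transfer(child, REL_SUCCESS)

    # The original flowfile's data now lives in the children, so drop it
    session.remove(flowFile)

Note that the original flowfile is removed rather than transferred, since its content has been split across the children, and there is no session.commit() call because ExecuteScript commits the session for you.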

Is there a reason you need to use custom Python code to do this rather than the SplitRecord and/or PartitionRecord processors? I think PartitionRecord would solve the problem you've described very easily.
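For comparison, a PartitionRecord configuration for this would look roughly like the following. It assumes you can define a record reader and schema that parse your fixed-width lines and expose the 4-character code as a field (the field name "code" here is purely illustrative); each flowfile PartitionRecord emits then carries a code attribute holding the partition value, which you can use downstream (for example in UpdateAttribute) to set filename to ${code}_file.txt:

Record Reader        : a reader/schema that parses your fixed-width lines (assumed)
Record Writer        : a matching writer (assumed)
code (dynamic prop)  : /code    (a RecordPath pointing at the 4-character code field)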