Streaming Large Dataset back to ExecuteStreamCommand Processor


I am currently parsing a large COBOL data file with the JRecord library in Java and converting it to JSON. The Java program is executed by an ExecuteStreamCommand processor in NiFi, and its standard output becomes the content of the outgoing flow file. Since the COBOL data file contains 1M+ records, I would like to process it in batches and stream the data back to the ExecuteStreamCommand flow file in chunks. I have tried calling OutputStream.flush() after iterating through 10,000 records to emit a partial result set, but the results are not being returned to the NiFi flow file. Below is the sample code. What changes do I need to make to stream data back to the ExecuteStreamCommand flow file in chunks of 10,000 records at a time?

  
        // Imports needed by this excerpt (copyBookFile, dataFile and my FieldNames
        // helper class are defined elsewhere):
        import java.io.BufferedOutputStream;
        import java.io.OutputStream;
        import net.sf.JRecord.JRecordInterface1;
        import net.sf.JRecord.Common.Constants;
        import net.sf.JRecord.Details.AbstractLine;
        import net.sf.JRecord.External.CopybookLoader;
        import net.sf.JRecord.IO.AbstractLineReader;
        import net.sf.JRecord.def.IO.builders.ICobolIOBuilder;
        import org.json.JSONArray;
        import org.json.JSONObject;

        public static void main(String[] args) {
            final int BATCH_SIZE = 10000;
            AbstractLine line;
            int lineNum = 0;
            JSONArray dafieldArray = new JSONArray();
            JSONArray dafldspcArray = new JSONArray();

            try {
                ICobolIOBuilder iob = JRecordInterface1.COBOL
                        .newIOBuilder(copyBookFile)
                        //.setFont("") // Think about specifying an encoding !!!
                        .setFileOrganization(Constants.IO_BIN_TEXT)
                        .setSplitCopybook(CopybookLoader.SPLIT_01_LEVEL)
                        .setRecordSelection(
                                "DAFIELD",
                                net.sf.JRecord.ExternalRecordSelection.ExternalFieldSelection
                                        .newFieldSelection(false, "RRC-TAPE-RECORD-ID", "=", "03"))
                        .setRecordSelection(
                                "DAFLDSPC",
                                net.sf.JRecord.ExternalRecordSelection.ExternalFieldSelection
                                        .newFieldSelection(false, "RRC-TAPE-RECORD-ID", "=", "04"));

                FieldNames.RecordDafield rDafield = FieldNames.RECORD_DAFIELD;
                FieldNames.RecordDafldspc rDafldspc = FieldNames.RECORD_DAFLDSPC;

                // System.out is what ExecuteStreamCommand captures as the
                // outgoing flow-file content.
                OutputStream out = new BufferedOutputStream(System.out);

                AbstractLineReader reader = iob.newReader(dataFile);
                try {
                    while ((line = reader.read()) != null) {
                        lineNum += 1;

                        if ("03".equals(line.getFieldValue(rDafield.rrcTapeRecordId).asString().trim())) {
                            JSONObject dafield = new JSONObject();
                            dafield.put("rrcTapeRecordId", line.getFieldValue(rDafield.rrcTapeRecordId).asString().trim());
                            dafield.put("daFieldNumber", line.getFieldValue(rDafield.daFieldNumber).asString().trim());
                            dafield.put("daFieldApplicationWellCode", line.getFieldValue(rDafield.daFieldApplicationWellCode).asString().trim());
                            dafield.put("da1995FieldApplWellCode", line.getFieldValue(rDafield.da1995FieldApplWellCode).asString().trim());
                            dafield.put("daFieldRule38Flag", line.getFieldValue(rDafield.daFieldRule38Flag).asString().trim());
                            dafield.put("rrcTapeFiller", line.getFieldValue(rDafield.rrcTapeFiller).asString().trim());
                            dafieldArray.put(dafield);
                        }

                        if ("04".equals(line.getFieldValue(rDafldspc.rrcTapeRecordId).asString().trim())) {
                            JSONObject dafldspc = new JSONObject();
                            dafldspc.put("rrcTapeRecordId", line.getFieldValue(rDafldspc.rrcTapeRecordId).asString().trim());
                            dafldspc.put("daFieldDistrict", line.getFieldValue(rDafldspc.daFieldDistrict).asString().trim());
                            dafldspc.put("daFieldLeaseName", line.getFieldValue(rDafldspc.daFieldLeaseName).asString().trim());
                            dafldspc.put("daFieldTotalDepth", line.getFieldValue(rDafldspc.daFieldTotalDepth).asString().trim());
                            dafldspc.put("daFieldWellNumber", line.getFieldValue(rDafldspc.daFieldWellNumber).asString().trim());
                            dafldspc.put("daFieldAcres", line.getFieldValue(rDafldspc.daFieldAcres).asString().trim());
                            dafldspc.put("rrcTapeFiller", line.getFieldValue(rDafldspc.rrcTapeFiller).asString().trim());
                            dafldspcArray.put(dafldspc);
                        }

                        // Emit a partial result every BATCH_SIZE records, then
                        // start fresh arrays for the next batch.
                        if (lineNum % BATCH_SIZE == 0) {
                            writeBatch(out, dafieldArray, dafldspcArray);
                            dafieldArray = new JSONArray();
                            dafldspcArray = new JSONArray();
                        }
                    }

                    // Write whatever is left over after the last full batch.
                    if (dafieldArray.length() > 0 || dafldspcArray.length() > 0) {
                        writeBatch(out, dafieldArray, dafldspcArray);
                    }
                } finally {
                    reader.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Writes one {"DAFIELD":[...],"DAFLDSPC":[...]} object per line and
        // flushes, intending NiFi to receive the chunk immediately.
        private static void writeBatch(OutputStream out, JSONArray dafieldArray,
                JSONArray dafldspcArray) throws java.io.IOException {
            JSONObject result = new JSONObject();
            result.put("DAFIELD", dafieldArray);
            result.put("DAFLDSPC", dafldspcArray);
            out.write(result.toString().getBytes());
            out.write('\n');
            out.flush();
        }
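To make the question concrete, this is the batching pattern I am trying to achieve, stripped of the JRecord and org.json specifics. This is a minimal sketch, not my real code: `BatchStreamSketch`, `writeInBatches`, and the batch size of 3 are made-up names and values for illustration, and the hand-built JSON is only a stand-in for the real serialization.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class BatchStreamSketch {
    static final int BATCH_SIZE = 3; // 10,000 in the real job

    // Accumulates records, writes one JSON array per full batch, and flushes
    // after each write so the consumer can see partial output immediately.
    // Returns the number of batches written.
    static int writeInBatches(Iterable<String> records, OutputStream rawOut) throws IOException {
        OutputStream out = new BufferedOutputStream(rawOut);
        StringBuilder batch = new StringBuilder();
        int count = 0, batches = 0;
        for (String r : records) {
            // Naive JSON array building; real code would use a JSON library.
            batch.append(batch.length() == 0 ? "[\"" : "\",\"").append(r);
            count++;
            if (count % BATCH_SIZE == 0) {
                batch.append("\"]");
                out.write(batch.toString().getBytes());
                out.write('\n');
                out.flush();          // push this chunk downstream now
                batches++;
                batch.setLength(0);   // start the next batch fresh
            }
        }
        if (batch.length() > 0) {     // remainder after the last full batch
            batch.append("\"]");
            out.write(batch.toString().getBytes());
            out.write('\n');
            out.flush();
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) throws IOException {
        // prints ["r1","r2","r3"] then ["r4","r5"], each on its own line
        writeInBatches(java.util.List.of("r1", "r2", "r3", "r4", "r5"), System.out);
    }
}
```

Each flushed chunk is a complete JSON document on its own line, so a downstream consumer can parse chunks independently as they arrive.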