NiFi custom processor showing error "the local variable flowfile cannot be assigned"

I am trying to send the original flowfile as input to the next processor, but I end up getting this error. I am very new to NiFi and have some experience in Java.

public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile flowfile = session.get();
    if (flowfile == null) {
        return;
    }

    ArrayList<String> headData = new ArrayList<String>();
    try {
        session.read(flowfile, new InputStreamCallback() {

            final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
            String query = " CREATE TABLE MODEL (";

            @SuppressWarnings("deprecation")
            public void process(InputStream inputStream) throws IOException {
                try {
                    OPCPackage pkg = OPCPackage.open(inputStream);
                    XSSFWorkbook workbook = new XSSFWorkbook(pkg);
                    workbook.getAllNames();
                    String dateheader = "date";
                    XSSFSheet sheetName = workbook.getSheetAt(0);
                    // The first row holds the column headers
                    Row row = sheetName.getRow(0);
                    for (Cell cell : row) {
                        switch (cell.getCellType()) {
                        case NUMERIC:
                            if (HSSFDateUtil.isCellDateFormatted(cell)) {
                                DataFormatter dataFormatter = new DataFormatter();
                                headData.add(dataFormatter.formatCellValue(cell));
                                query += dataFormatter.formatCellValue(cell) + " " + "INT";
                            } else {
                                headData.add(String.valueOf(cell.getNumericCellValue()));
                            }
                            break;
                        case STRING:
                            headData.add(cell.getStringCellValue());
                            if (cell.getStringCellValue().toLowerCase().contains(dateheader))
                                query += cell.getStringCellValue() + " " + "TIMESTAMP,";
                            else
                                query += cell.getStringCellValue() + " " + "VARCHAR(50),";
                            break;
                        case BOOLEAN:
                            headData.add(String.valueOf(cell.getBooleanCellValue()));
                            break;
                        default:
                            headData.add("");
                            break;
                        }
                    }
                    // Drop the trailing comma and close the column list
                    query = query.substring(0, query.length() - 1);
                    query += ")";
                    workbook.close();
                    final Connection con = dbcpService.getConnection();
                    try {
                        java.sql.PreparedStatement preparedStatement = con.prepareStatement(query);
                        preparedStatement.execute();
                        con.commit();
                        session.transfer(flowfile, REL_SUCCESS);
                    } catch (SQLException e) {
                        e.printStackTrace();
                        session.transfer(flowfile, REL_FAILURE);
                    }
                } catch (InvalidFormatException ife) {
                    getLogger().error("Only .xlsx Excel files are supported", ife);
                    throw new UnsupportedOperationException("Only .xlsx OOXML files are supported", ife);
                }
            }

        });
    } catch (RuntimeException ex) {
        getLogger().error("Failed to process incoming Excel document. " + ex.getMessage(), ex);
        FlowFile failedFlowFile = session.putAttribute(flowfile, testxlsqlProcessor.class.getName() + ".error", ex.getMessage());
    }
    final StringBuilder stringBuilder = new StringBuilder();
    flowfile = session.write(flowfile, new StreamCallback() {
        public void process(InputStream in, OutputStream out) throws IOException {
            stringBuilder.append(IOUtils.copy(in, out));
        }
    });

    }
}

If I don't add the output stream step, I get an exception saying the transfer relationship is not specified.

There is 1 best solution below

You don't want to transfer the flow file from inside the InputStreamCallback; that should happen after you have finished reading from the flow file. If you aren't changing the content of the outgoing flow file, then you don't need the StreamCallback and IOUtils.copy() code at the end either; you can just transfer the original flow file. For the failure cases, you can throw an IOException wrapping the real exception in the InputStreamCallback, catch it outside, and then transfer the original flow file to failure. If no exception occurs, you can transfer the original flow file to success.
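
The control flow described above can be sketched roughly as follows. This is only a minimal, self-contained illustration, not NiFi code: FakeSession, the nested InputStreamCallback interface, and parseHeader are hypothetical stand-ins so the example compiles with the JDK alone, and the relationships are plain strings. In a real processor the same shape would use session.read(...) and session.transfer(flowfile, REL_SUCCESS) or REL_FAILURE. Because the flow file variable is never reassigned here, it also stays effectively final, which is what Java requires of a local variable referenced from an anonymous class or lambda.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class TransferAfterReadSketch {

    // Hypothetical stand-in with the same single-method shape as NiFi's InputStreamCallback.
    interface InputStreamCallback {
        void process(InputStream in) throws IOException;
    }

    // Hypothetical stand-in for the two ProcessSession operations used here; not the real class.
    static class FakeSession {
        final List<String> transfers = new ArrayList<>();

        void read(byte[] content, InputStreamCallback callback) {
            try (InputStream in = new ByteArrayInputStream(content)) {
                callback.process(in);
            } catch (IOException e) {
                // Assumption: callback I/O errors reach the caller as an unchecked exception.
                throw new RuntimeException(e);
            }
        }

        void transfer(byte[] content, String relationship) {
            transfers.add(relationship);
        }
    }

    // Do the work inside the callback, but decide the relationship outside it.
    static String onTrigger(FakeSession session, byte[] flowFileContent) {
        try {
            session.read(flowFileContent, in -> {
                try {
                    parseHeader(in);
                } catch (IllegalArgumentException e) {
                    // Wrap the real exception so it can leave the callback.
                    throw new IOException("Could not process content", e);
                }
            });
        } catch (RuntimeException ex) {
            // Reading failed: route the original, unmodified flow file to failure.
            session.transfer(flowFileContent, "failure");
            return "failure";
        }
        // Reading finished without an exception: route the original flow file to success.
        session.transfer(flowFileContent, "success");
        return "success";
    }

    // Hypothetical placeholder for the Excel parsing and SQL work in the question.
    static void parseHeader(InputStream in) throws IOException {
        if (in.read() == -1) {
            throw new IllegalArgumentException("empty content");
        }
    }

    public static void main(String[] args) {
        FakeSession session = new FakeSession();
        System.out.println(onTrigger(session, new byte[] {1, 2, 3})); // prints "success"
        System.out.println(onTrigger(session, new byte[0]));          // prints "failure"
    }
}
```

Each call ends in exactly one transfer, and no transfer happens while the content is still being read, so there is no need for the extra session.write(...) pass just to satisfy the "relationship not specified" check.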