NiFi - upload binary.zip to SQL Server as varbinary


I am trying to upload a binary.zip file to SQL Server as the content of a varbinary column.

Target Table:

CREATE TABLE myTable ( zip_file varbinary(MAX) );
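For reference, once an insert succeeds, a quick sanity check is to compare the stored byte count against the source file's size; a small T-SQL sketch (using the zip_file column name from the PutSQL statement below):

```sql
-- DATALENGTH returns the number of bytes stored in the varbinary column
SELECT DATALENGTH(zip_file) AS stored_bytes
FROM myTable;
```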

My NIFI Flow is very simple:

  -> GetFile:
         filter: binary.zip

  -> UpdateAttribute:
         sql.args.1.type   = -3    # varbinary, per the JDBC Types enumeration
         sql.args.1.value  = ???   # I don't know what to put here (I've tried everything!)
         sql.args.1.format = ???   # Is it required? I tried 'hex'

  -> PutSQL:
         SQL Statement = INSERT INTO myTable (zip_file) VALUES (?);

What should I put in sql.args.1.value?

I think it should be the flowfile payload, but would that work as the bound parameter of the INSERT in PutSQL? So far it hasn't!
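For context: PutSQL's documentation indicates that for binary JDBC types the sql.args.N.format property accepts 'ascii', 'base64' or 'hex', i.e. the attribute value would have to be the file's bytes as an encoded string. A minimal Python sketch of that encoding (the function name is mine, and the payload is a made-up stand-in for real zip bytes); note that NiFi attributes are held in memory, so this route is impractical for large zips:

```python
import base64
import binascii

def encode_for_putsql(data: bytes, fmt: str = "hex") -> str:
    """Encode raw bytes as the string PutSQL would expect in a binary
    sql.args.N.value, given sql.args.N.format of 'hex' or 'base64'."""
    if fmt == "hex":
        return binascii.hexlify(data).decode("ascii")
    if fmt == "base64":
        return base64.b64encode(data).decode("ascii")
    raise ValueError("unsupported format: " + fmt)

# Hypothetical payload standing in for the content of binary.zip
payload = b"PK\x03\x04example"
print(encode_for_putsql(payload, "hex"))     # -> 504b03046578616d706c65
print(encode_for_putsql(payload, "base64"))  # -> UEsDBGV4YW1wbGU=
```

Because of the attribute-size concern, streaming the content directly from the flowfile (as in the script solutions below) is the more practical route.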

Thanks!

SOLUTION UPDATE:

Based on https://issues.apache.org/jira/browse/NIFI-8052 (note that I'm passing some values as flowfile attributes):

import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql

def flowFile = session.get()
if (!flowFile) return

def lookup        = context.controllerServiceLookup
def dbServiceName = flowFile.getAttribute('DatabaseConnectionPoolName')
def tableName     = flowFile.getAttribute('table_name')
def fieldName     = flowFile.getAttribute('field_name')

// Look up the DBCP controller service by its display name
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == dbServiceName
}

def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def sql  = new Sql(conn)

try {
    // Stream the flowfile content straight into the varbinary parameter
    flowFile.read { rawIn ->
        def params = [rawIn]
        // Plain concatenation on purpose: groovy.sql.Sql treats GString
        // expressions as bind parameters, which would break the table name
        sql.executeInsert "INSERT INTO " + tableName + " (date, " + fieldName + ") VALUES (CAST(GETDATE() AS Date), ?)", params
    }
} finally {
    conn?.close()
}

session.transfer(flowFile, REL_SUCCESS)
session.commit()

There is 1 answer below.

BEST ANSWER

Maybe there is a NiFi-native way to insert a blob; however, you could use ExecuteGroovyScript instead of UpdateAttribute and PutSQL.

Add a SQL.mydb property at the processor level and link it to the required DBCP pool.

Use the following script body:

def ff=session.get()
if(!ff)return

def statement = "INSERT INTO myTable (zip_file) VALUES (:p_zip_file)"
def params = [
  p_zip_file: SQL.mydb.BLOB(ff.read())    //cast flow file content as BLOB sql type
]
SQL.mydb.executeInsert(params, statement) //committed automatically on flow file success

//transfer to success without changes
REL_SUCCESS << ff

Inside the script, SQL.mydb is a reference to a groovy.sql.Sql object.