Apache NiFi FlowFile to Elasticsearch Bulk API


I'm trying to prepare FlowFiles for a versioned bulk call to Elasticsearch using the standard InvokeHTTP processor, since I couldn't get it to work with the PutElasticsearch* processors.

The Elasticsearch bulk endpoint accepts data in NDJSON format, which looks like this:

{"index": {"_index":"foo","_id":"bar", "version":1, "version_type":"external"}}\n
{"id":"bar", "version":1, "field1":"field1_value", "field2":"field2_value"}\n
{"index": {"_index":"foo","_id":"bizz", "version":2, "version_type":"external"}}\n
{"id":"bizz", "version":2, "field1":"field3_value", "field2":"field4_value"}\n
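A minimal Python sketch of how such a body can be assembled, assuming each document already carries its own id and version fields as in the example (build_bulk_body is a hypothetical name, not part of any library). It writes one action line followed by one source line per document, and terminates every line, including the last one, with a newline, which the bulk API requires.

```python
import json
from typing import Iterable, Mapping


def build_bulk_body(index: str, docs: Iterable[Mapping]) -> str:
    """Build an NDJSON _bulk body indexing each doc with external versioning.

    Assumes every doc has its own "id" and "version" fields.
    """
    lines = []
    for doc in docs:
        action = {
            "index": {
                "_index": index,
                "_id": doc["id"],
                "version": doc["version"],
                "version_type": "external",
            }
        }
        # Action metadata line, then the document source line.
        lines.append(json.dumps(action))
        lines.append(json.dumps(dict(doc)))
    # Every line, including the final one, must end with "\n".
    return "".join(line + "\n" for line in lines)
```

Called with the two documents above and index "foo", it produces four lines in the same order as the example.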

My current processor flow looks like this: [screenshot: Change to elasticsearch bulk format flow]

I use an ExtractText processor to extract id and version into attributes, and the whole FlowFile content into the message.body attribute with the simple regex .*
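To make the extraction step concrete, here is a rough Python imitation of it using regular expressions. The patterns ID_PATTERN and VERSION_PATTERN are hypothetical, since the question does not show the actual ExtractText properties, and only Python's re semantics are modeled, not NiFi's. Note that .* without DOTALL stops at the first newline, so in this sketch the whole body is captured only when the JSON sits on a single line.

```python
import re

# Hypothetical patterns standing in for the ExtractText dynamic properties;
# the real regexes used for id and version are not shown in the question.
ID_PATTERN = re.compile(r'"id"\s*:\s*"([^"]*)"')
VERSION_PATTERN = re.compile(r'"version"\s*:\s*(\d+)')
# Like the question's ".*": without DOTALL, "." does not match "\n",
# so this captures only the first line of the content.
BODY_PATTERN = re.compile(r".*")


def extract_attributes(content: str) -> dict:
    """Return the attribute map an ExtractText-like step might produce."""
    attrs = {}
    for name, pattern in (("id", ID_PATTERN), ("version", VERSION_PATTERN)):
        match = pattern.search(content)
        if match:
            # Keep the first capture group; attribute values are strings.
            attrs[name] = match.group(1)
    # ".*" always matches (possibly the empty string), so group(0) is safe.
    attrs["message.body"] = BODY_PATTERN.match(content).group(0)
    return attrs
```

For the first source document in the example, this yields id "bar", version "1" (as a string) and the full line as message.body.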

With the content in an attribute, I then use a ConvertRecord processor with Record Reader set to JsonTreeReader and Record Writer set to FreeFormTextRecordSetWriter, whose Text property is:

{"index": {"_index":"foo","_id":"${id}", "version":${version}, "version_type":"external"}}
${message.body}
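One way to sanity-check a template like this outside NiFi is to substitute the placeholders by hand and confirm that every resulting line parses as JSON. The sketch below is only an approximation: render and check_ndjson are hypothetical helpers that replace ${name} placeholders with attribute values, and they do not implement the NiFi Expression Language.

```python
import json
import re

# Matches ${name}; the name may contain dots, e.g. ${message.body}.
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")

TEMPLATE = (
    '{"index": {"_index":"foo","_id":"${id}", "version":${version}, '
    '"version_type":"external"}}\n'
    "${message.body}\n"
)


def render(template: str, attrs: dict) -> str:
    # Replace each ${name} with its attribute value; unknown names raise KeyError.
    return PLACEHOLDER.sub(lambda m: str(attrs[m.group(1)]), template)


def check_ndjson(text: str) -> list:
    # The body must end with "\n" and every non-empty line must be valid JSON.
    if not text.endswith("\n"):
        raise ValueError("bulk body must end with a newline")
    return [json.loads(line) for line in text.splitlines() if line]
```

Leaving version unquoted keeps it a JSON number after substitution, and a template with an unbalanced ${ or an extra closing brace fails this check instead of silently producing an invalid line.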

With the FlowFiles converted, I merge them with a MergeContent processor and send them to Elasticsearch with an InvokeHTTP processor, with the URL set to ${elastic.url}/foo/_bulk and Content-Type set to application/x-ndjson.
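The merge-and-send step can be approximated in Python with only the standard library. This is a sketch under assumptions: merge_fragments and build_bulk_request are hypothetical names, http://localhost:9200 stands in for ${elastic.url}, and the request is only built, not sent. The one detail it encodes deliberately is that each merged fragment must end with a newline; otherwise the last line of one FlowFile and the first line of the next end up on the same line, and the body is no longer valid NDJSON.

```python
import urllib.request
from typing import Iterable


def merge_fragments(fragments: Iterable[str]) -> bytes:
    """Concatenate per-document NDJSON fragments into one bulk body.

    Each non-empty fragment is forced to end with a newline so that pairs
    from different FlowFiles do not run together on one line.
    """
    parts = [f if f.endswith("\n") else f + "\n" for f in fragments if f]
    return "".join(parts).encode("utf-8")


def build_bulk_request(elastic_url: str, index: str, body: bytes) -> urllib.request.Request:
    """Prepare (but do not send) a POST to <elastic_url>/<index>/_bulk."""
    return urllib.request.Request(
        f"{elastic_url.rstrip('/')}/{index}/_bulk",
        data=body,
        headers={"Content-Type": "application/x-ndjson"},
        method="POST",
    )
```

Passing the returned request to urllib.request.urlopen would perform the actual POST.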

Is there a better way to do this? I suspect my solution is not the best one, but I'm new to NiFi and couldn't find another approach.
