AWS OpenSearch Ingestion Pipeline - Index a list of strings as separate documents


I'm building an AWS OpenSearch domain and an Ingestion Pipeline, and I'm ingesting source documents from Parquet files with the following schema (a single string column, query):

query
-----
keyword1 keyword2
keyword3 keyword4 keyword5

I'm trying to split the query field on whitespace and ingest each keyword as a separate document into the OpenSearch index.
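
In other words, for the first row I'd want the sink to receive one document per keyword, roughly like this (an illustrative sketch only, reusing my own field name):

{ "query": "keyword1" }
{ "query": "keyword2" }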

This is my current pipeline configuration:

version: "2"
my-pipeline:
  source:
---- removed -----
  processor:
    - split_string:
        entries:
          - source: "query"
            delimiter_regex: "\\s+"
  sink:
    - opensearch:
        hosts: <my host>
        aws: <aws details>
        index: index_1
        document_id_field: query

I get the following exception during ingestion:

Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)
 at [Source: UNKNOWN; byte offset: #UNKNOWN]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[jackson-databind-2.15.3.jar:2.15.3]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1752) ~[jackson-databind-2.15.3.jar:2.15.3]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1526) ~[jackson-databind-2.15.3.jar:2.15.3]
    at com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromArray(StdDeserializer.java:222) ~[jackson-databind-2.15.3.jar:2.15.3]
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:46) ~[jackson-databind-2.15.3.jar:2.15.3]
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11) ~[jackson-databind-2.15.3.jar:2.15.3]
    at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323) ~[jackson-databind-2.15.3.jar:2.15.3]
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4801) ~[jackson-databind-2.15.3.jar:2.15.3]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2974) ~[jackson-databind-2.15.3.jar:2.15.3]
    at com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:3438) ~[jackson-databind-2.15.3.jar:2.15.3]
    at org.opensearch.dataprepper.model.event.JacksonEvent.mapNodeToObject(JacksonEvent.java:209) ~[data-prepper-api-2.6.1.jar:?]
    at org.opensearch.dataprepper.model.event.JacksonEvent.get(JacksonEvent.java:199) ~[data-prepper-api-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.getDocument(OpenSearchSink.java:468) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doOutput(OpenSearchSink.java:374) ~[opensearch-2.6.1.jar:?]
    at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:67) ~[data-prepper-api-2.6.1.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.11.5.jar:1.11.5]
    at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:67) ~[data-prepper-api-2.6.1.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$5(Pipeline.java:349) ~[data-prepper-core-2.6.1.jar:?]
    ... 5 more

It looks like the split_string processor replaces query with a list of strings, which then can't be used as the document_id_field.
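
My reading of the error (an assumption, I haven't dumped the actual event) is that after the processor runs, the event looks something like this:

{ "query": ["keyword1", "keyword2"] }

The sink then tries to read query as a single string for document_id_field and hits the MismatchedInputException above.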

Question: How can I split the list of strings returned by the split_string processor into separate events, so that each keyword is ingested as its own document by my OpenSearch sink?
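
For illustration, the shape of configuration I'm after would be something like the snippet below. This assumes a processor that emits one event per split value; the split_event processor documented for newer Data Prepper releases looks like a candidate, but I haven't verified that it's available in the 2.6.1 runtime shown in the stack trace:

  processor:
    # assumed: emits one new event per whitespace-separated value of "query"
    # (split_event is documented for newer Data Prepper versions; availability in 2.6.1 not verified)
    - split_event:
        field: "query"
        delimiter_regex: "\\s+"

With something along these lines, document_id_field: query in the sink would presumably see a single string per event again.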

