How to Issue Delete Requests in Elasticsearch Using PyFlink's ElasticsearchSink


I'm working with PyFlink and trying to perform delete operations on my Elasticsearch index using the built-in ElasticsearchSink class. I've successfully used this sink for adding new documents and updating existing ones in Elasticsearch, but I'm struggling to understand how to structure my input data to execute a delete request.

Below is the relevant section of my Flink job:

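from pathlib import Path

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.elasticsearch import (
    Elasticsearch7SinkBuilder,
    ElasticsearchEmitter,
)


# ElasticsearchExampleConfig is defined elsewhere in the job (not shown here).
def main(config: ElasticsearchExampleConfig) -> None: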
    """Sink an example message into Elasticsearch and delete it."""
    env = StreamExecutionEnvironment.get_execution_environment()

    # Add JARs to the classpath
    jars = [path.absolute().as_uri() for path in Path("./jars").glob("*.jar")]
    env.add_jars(*jars)

    # Set up the Elasticsearch sink
    elasticsearch_sink = (
        Elasticsearch7SinkBuilder()
        .set_bulk_flush_max_actions(1)
        .set_emitter(ElasticsearchEmitter.static_index("test-index", "id"))
        .set_hosts([config["elasticsearch_endpoint"]])
        .set_connection_username(config["elasticsearch_username"])
        .set_connection_password(config["elasticsearch_password"])
        .build()
    )
    
    # Set up a sequence of messages to trigger the delete action.
    stream = [
        # This message creates the document
        {
            "id": "example-message",
            "hello": "world"
        },
        # This message should delete the document
        {
            "id": "example-message",
            # What should the message body look like?
        }
    ]

    env.from_collection(stream, Types.MAP(Types.STRING(), Types.STRING())).sink_to(
        elasticsearch_sink,
    ).name(
        "Elasticsearch Sink",
    )

    env.execute("Elasticsearch Example")

I understand that the ElasticsearchSink can handle different types of requests, but I am not sure how to format the second message in the stream to trigger a delete operation for the document with the ID example-message.
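For concreteness, the operation I'm after corresponds to a `delete` action in Elasticsearch's bulk API. Below is a minimal sketch in plain Python (no Flink involved) of the payload such an action carries for my document. The helper name `bulk_delete_body` is made up for illustration; whether the sink can be made to emit this from PyFlink is exactly what I don't know.

```python
import json


def bulk_delete_body(index: str, doc_id: str) -> str:
    """Build the NDJSON body for one delete action in the Elasticsearch _bulk API."""
    # A delete action is a single metadata line; unlike index or update,
    # it is not followed by a document source line.
    action = {"delete": {"_index": index, "_id": doc_id}}
    # The bulk API expects newline-delimited JSON terminated by a newline.
    return json.dumps(action) + "\n"


body = bulk_delete_body("test-index", "example-message")
print(body, end="")
```

This only illustrates the target request; it says nothing about how the sink's emitter maps an incoming record to a request type.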

Is there a specific structure or flag that needs to be set in the message to indicate a delete operation? Any insights or examples on how to achieve this would be greatly appreciated.
