I'm working with PyFlink and trying to perform delete operations on my Elasticsearch index using the built-in ElasticsearchSink class. I've successfully used this sink to add new documents and update existing ones in Elasticsearch, but I'm struggling to understand how to structure my input data so that it executes a delete request.
Below is the relevant section of my Flink job:
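```python
from pathlib import Path
from typing import TypedDict

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.elasticsearch import (
    Elasticsearch7SinkBuilder,
    ElasticsearchEmitter,
)


class ElasticsearchExampleConfig(TypedDict):
    # Minimal stand-in for my config type: only the keys main() reads.
    elasticsearch_endpoint: str
    elasticsearch_username: str
    elasticsearch_password: str


def main(config: ElasticsearchExampleConfig) -> None:
    """Sink an example message into Elasticsearch and delete it."""
    env = StreamExecutionEnvironment.get_execution_environment()

    # Add the connector JARs to the classpath
    jars = [path.absolute().as_uri() for path in Path("./jars").glob("*.jar")]
    env.add_jars(*jars)

    # Set up the Elasticsearch sink; the "id" field is used as the document ID
    elasticsearch_sink = (
        Elasticsearch7SinkBuilder()
        .set_bulk_flush_max_actions(1)  # flush after every buffered action
        .set_emitter(ElasticsearchEmitter.static_index("test-index", "id"))
        .set_hosts([config["elasticsearch_endpoint"]])
        .set_connection_username(config["elasticsearch_username"])
        .set_connection_password(config["elasticsearch_password"])
        .build()
    )

    # Set up a sequence of messages to trigger the delete action
    stream = [
        # This message creates the document
        {
            "id": "example-message",
            "hello": "world",
        },
        # This message should delete the document
        {
            "id": "example-message",
            # What should the message body look like?
        },
    ]

    env.from_collection(stream, Types.MAP(Types.STRING(), Types.STRING())).sink_to(
        elasticsearch_sink,
    ).name(
        "Elasticsearch Sink",
    )

    env.execute("Elasticsearch Example")
```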
I understand that the ElasticsearchSink can handle different types of requests, but I'm not sure how to format the second message in the stream so that it triggers a delete operation for the document with the ID example-message.
Is there a specific structure or flag that needs to be set in the message to indicate a delete operation? Any insights or examples on how to achieve this would be greatly appreciated.
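For comparison, here is a minimal sketch in plain Python (no Flink involved) of the Elasticsearch bulk API actions I'd expect my two messages to turn into. The op field and the to_bulk_lines helper are hypothetical names I made up for illustration; I'm not aware of the built-in ElasticsearchEmitter.static_index emitter reading any such field, which is exactly the gap I'm asking about.

```python
import json
from typing import Dict, List


def to_bulk_lines(message: Dict[str, str], index: str, id_field: str = "id") -> List[str]:
    """Translate one message into Elasticsearch bulk API NDJSON lines.

    Hypothetical convention: an optional "op" field selects the action
    ("index" by default, or "delete"). This is NOT read by the PyFlink sink.
    """
    doc = dict(message)
    op = doc.pop("op", "index")  # hypothetical marker field, removed from the source
    action = {op: {"_index": index, "_id": doc[id_field]}}
    if op == "delete":
        # A bulk delete action is a single line with no document source.
        return [json.dumps(action)]
    if op == "index":
        # An index action is followed by the document source line.
        return [json.dumps(action), json.dumps(doc)]
    raise ValueError(f"unsupported op: {op!r}")


if __name__ == "__main__":
    stream = [
        {"id": "example-message", "hello": "world"},
        {"id": "example-message", "op": "delete"},
    ]
    for message in stream:
        for line in to_bulk_lines(message, "test-index"):
            print(line)
```

The point of the sketch is that a delete only needs the index and the document ID, with no body, so I'd hoped the second message could carry just the ID plus some kind of marker.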