cosmos Change feed listener in java

26 Views Asked by At

I am implementing change feed listener in java. I don't want to use function App for it. Implementation is almost completed but it is not running and throwing error:

leaseClient: content response on write setting must be enabled

I created lease container manually and it has all the access.

@Component
public class ChangeFeedListener {

    private CosmosAsyncContainer sourceContainer;
    private CosmosAsyncContainer targetContainer;
    private CosmosAsyncContainer leaseContainer;

    @Autowired
    public ChangeFeedListener(CosmosAsyncDatabase cosmosAsyncDatabase) {
        this.sourceContainer = cosmosAsyncDatabase.getContainer("sourceContainer");
        this.targetContainer = cosmosAsyncDatabase.getContainer("targetContainer");
        this.leaseContainer = cosmosAsyncDatabase.getContainer("leaseContainer");
    }

    @EventListener({ApplicationStartedEvent.class})
    public void initialize() {

        // Create change feed processor options
        ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
        // Create change feed processor
        ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
                .hostName(UUID.randomUUID().toString())
                .feedContainer(sourceContainer)
                .handleChanges((List<JsonNode> docs) -> {
                    // Process each changed document
                    for (JsonNode document : docs) {
                        System.out.println("Processing document: " + document);
                        // Write the changed document to the target container
                        targetContainer.createItem(document).block();
                        System.out.println("Document written to target container.");
                    }
                })
                .leaseContainer(leaseContainer)
                .options(changeFeedProcessorOptions)
                .buildChangeFeedProcessor();

        // Start change feed processor
        changeFeedProcessor.start();
    }
}

Can someone please help me get this code up and running. Thanks

1

There are 1 best solutions below

0
Matias Quaranta On

Reference: https://github.com/Azure/azure-sdk-for-java/blob/0ad1e12ce687547139be4a7da5b06fbc232aa5a4/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java#L149

This means that your client needs to enable content content on write operations, this is on the CosmosClientBuilder in Java:

 CosmosAsyncClient client = new CosmosClientBuilder()
         .endpoint(serviceEndpoint)
         .key(key)
         // other settings...
         // This one below
         .contentResponseOnWriteEnabled(true)
         .buildAsyncClient();

Spring

Seems you are using Spring (decorators), might want to look at this full example too: https://github.com/Azure/cloud-scale-data-for-devs-guide/blob/main/demos/cosmos-db/demo/src/main/java/com/example/demo/service/ChangeFeedService.java

EDIT: Adding Spring reference