I am using Hazelcast Jet to capture database changes. I have a pipeline that captures database changes and updates an IMap:
    Pipeline pipeline(StreamSource<ChangeRecord> theDataBase, IMap<Long, String> myMap) {
        var pipeline = Pipeline.create();
        pipeline.readFrom(theDataBase)
                .withoutTimestamps()
                .writeTo(CdcSinks.map(myMap,
                        record -> Long.parseLong(Objects.requireNonNull(record.key()).toMap().get("id").toString()),
                        record -> record.value().toJson()));
        return pipeline;
    }
I would also like to synchronize in the reverse direction, i.e. issue an update to the database whenever the map changes.
I tried this by adding a second pipeline:
    Pipeline pipelineReversed(IMap<Long, String> myMap) {
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.mapJournal(myMap, START_FROM_CURRENT))
                .withoutTimestamps()
                .writeTo(Sinks.jdbc("%some update query%", () -> {
                    BaseDataSource dataSource = new PGXADataSource();
                    dataSource.setUrl("jdbc:postgresql://localhost:5432/my_db");
                    dataSource.setUser("postgres");
                    dataSource.setPassword("postgres");
                    dataSource.setDatabaseName("my_db");
                    return dataSource;
                }, (stmt, record) -> {
                    // fill query params and execute
                }));
        return p;
    }
However, this causes recursion: the map and the database update each other endlessly. Can I avoid the recursion with this approach? Or are there other tools or best practices for this?
You should be using a MapStoreAdapter. I also question whether you need Jet at all to fetch database changes, but then again I don't know the architecture of your system. You can read more here: https://docs.hazelcast.com/hazelcast/5.3/mapstore/working-with-external-data
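To illustrate the idea behind a map store, here is a minimal plain-Java sketch of write-through and read-through. It deliberately does not use the Hazelcast API: `Store`, `FakeDatabase` and `WriteThroughMap` are hypothetical names invented for this example, and the "database" is just an in-memory map. The point is only that the map itself forwards each write to the database and loads missing entries from it, so there is no second pipeline that could echo changes back.

```java
import java.util.HashMap;
import java.util.Map;

public class WriteThroughSketch {

    /** Hypothetical stand-in for the store/load callbacks of a map store. */
    interface Store<K, V> {
        void store(K key, V value);
        V load(K key);
    }

    /** In-memory fake "database" that counts how many writes it receives. */
    static final class FakeDatabase implements Store<Long, String> {
        final Map<Long, String> rows = new HashMap<>();
        int writes = 0;

        @Override
        public void store(Long key, String value) {
            rows.put(key, value);
            writes++;
        }

        @Override
        public String load(Long key) {
            return rows.get(key);
        }
    }

    /** Map wrapper: put() writes through to the store, get() reads through on a miss. */
    static final class WriteThroughMap<K, V> {
        private final Map<K, V> cache = new HashMap<>();
        private final Store<K, V> store;

        WriteThroughMap(Store<K, V> store) {
            this.store = store;
        }

        void put(K key, V value) {
            store.store(key, value); // write to the database first
            cache.put(key, value);   // then update the in-memory copy
        }

        V get(K key) {
            V value = cache.get(key);
            if (value == null) {
                // Cache miss: load from the database. Nothing is written back.
                value = store.load(key);
                if (value != null) {
                    cache.put(key, value);
                }
            }
            return value;
        }
    }

    public static void main(String[] args) {
        FakeDatabase db = new FakeDatabase();
        WriteThroughMap<Long, String> map = new WriteThroughMap<>(db);

        map.put(1L, "{\"id\":1}");           // one write reaches the database
        db.rows.put(2L, "{\"id\":2}");       // simulate a row changed outside the map
        String loaded = map.get(2L);         // read-through, no write-back

        System.out.println("loaded=" + loaded + ", database writes=" + db.writes);
    }
}
```

In Hazelcast the equivalent callbacks would live in a class extending MapStoreAdapter and be registered in the map configuration; the page linked above covers the actual setup.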