I am trying to write simple Apache Flink MongoDB connector code that reads and writes JSON data in MongoDB. First, here is the MongoDB sink code:
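import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;
import org.bson.Document;
import com.mongodb.client.model.InsertOneModel;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("Hello", 1));
data.add(new Tuple2<>("Hi", 2));
data.add(new Tuple2<>("Hey", 3));
DataStream<Tuple2<String, Integer>> stream = env.fromCollection(data);
MongoSink<Tuple2<String, Integer>> sink = MongoSink.<Tuple2<String, Integer>>builder()
        .setUri("mongodb://127.0.0.1:27017")
        .setDatabase("test_db")
        .setCollection("test_coll")
        .setBatchSize(1000)
        .setBatchIntervalMs(1000)
        .setMaxRetries(3)
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setSerializationSchema(
                (input, context) -> {
                    // one document per tuple, e.g. {"Hello": 1}
                    Document doc = new Document(input.f0, input.f1);
                    return new InsertOneModel<>(BsonDocument.parse(doc.toJson()));
                })
        .build();
stream.sinkTo(sink);
env.execute("MongoDB-Sink"); // nothing runs until the job is executed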
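As a side note on the serialization step: the `Document` → JSON string → `BsonDocument.parse` round trip can be replaced by building the `BsonDocument` directly, which also makes the stored BSON type explicit. This is a minimal sketch that assumes only the `org.bson` library (shipped with the MongoDB Java driver) is on the classpath. `SinkDocumentSketch` and `toBson` are hypothetical names.

```java
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonType;

public class SinkDocumentSketch {
    // Builds {key: value} directly as a BsonDocument, skipping the
    // Document -> JSON string -> BsonDocument.parse round trip.
    public static BsonDocument toBson(String key, int value) {
        return new BsonDocument(key, new BsonInt32(value));
    }

    public static void main(String[] args) {
        BsonDocument doc = toBson("Hello", 1);
        System.out.println(doc.toJson());
        // The stored BSON type is explicit: a 32-bit integer.
        BsonType type = doc.get("Hello").getBsonType();
        System.out.println(type); // INT32
    }
}
```

Inside the sink's lambda this would read `return new InsertOneModel<>(new BsonDocument(input.f0, new BsonInt32(input.f1)));`. A Java `Integer` ends up as BSON INT32 with either version. Using `new BsonInt64(input.f1)` instead would store INT64.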
This sink code inserts the tuples into MongoDB as JSON documents successfully. A generated document looks like this:
{
"_id": {
"$oid": "65f67f3b9779060fd2390d0e"
},
"Hello": 1
}
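One way to check what the source's deserializer actually receives is to parse the stored document shown above with the same BSON library and inspect its key order and value types. This is a small sketch that assumes `org.bson` is on the classpath. The class name is hypothetical.

```java
import org.bson.BsonDocument;

public class StoredDocumentSketch {
    // The document as it comes back from test_coll (copied from the output above).
    public static final String STORED_JSON =
            "{\"_id\": {\"$oid\": \"65f67f3b9779060fd2390d0e\"}, \"Hello\": 1}";

    public static void main(String[] args) {
        BsonDocument doc = BsonDocument.parse(STORED_JSON);
        // The stored document starts with _id, so the first key is "_id", not "Hello".
        System.out.println(doc.getFirstKey());                         // _id
        System.out.println(doc.get(doc.getFirstKey()).getBsonType());  // OBJECT_ID
        System.out.println(doc.get("Hello").getBsonType());            // INT32
    }
}
```

This matches the OBJECT_ID in the exception reported below: `getFirstKey()` returns `_id`, whose value is an ObjectId. Even `getInt64("Hello")` would fail, because that value is INT32 and `getInt64` does not convert between numeric types.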
However, the MongoDB source code below fails with an error:
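import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.bson.BsonDocument;
// env is the same StreamExecutionEnvironment as in the sink code
MongoSource<Tuple2<String, Integer>> source = MongoSource.<Tuple2<String, Integer>>builder()
        .setUri("mongodb://127.0.0.1:27017")
        .setDatabase("test_db")
        .setCollection("test_coll")
        .setDeserializationSchema(new MongoDeserializationSchema<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> deserialize(BsonDocument document) {
                String key = document.getFirstKey();
                Integer value = document.getInt64(key).intValue(); // this line throws the error
                return new Tuple2<String, Integer>(key, value);
            }
            @Override
            public TypeInformation<Tuple2<String, Integer>> getProducedType() {
                return Types.TUPLE(Types.STRING, Types.INT);
            }
        })
        .build();
DataStream<Tuple2<String, Integer>> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB-Source");
ds.print();
env.execute("MongoDB-Source");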
The error message is:
Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type INT64 is of unexpected type OBJECT_ID
at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
at org.bson.BsonValue.asInt64(BsonValue.java:105)
at org.bson.BsonDocument.getInt64(BsonDocument.java:203)
at com.aaa.test.FlinkMongoTest$1.deserialize(FlinkMongoTest.java:63)
at com.aaa.test.FlinkMongoTest$1.deserialize(FlinkMongoTest.java:1)
at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:58)
at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54)
at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160)
I expected the value type of the inserted JSON data to be INT64, but the value returned is of type OBJECT_ID, and that mismatch causes the error. How can I read the integer value of the MongoDB document instead of the OBJECT_ID? Any reply would be appreciated.
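One possible fix, sketched under assumptions: the exception shows that `getFirstKey()` returned a field holding an ObjectId, which is the `_id` field at the start of the stored document. The sink also stored `1` as INT32, so `getInt64` would fail on `Hello` as well. The sketch below skips `_id` and reads the value through `asNumber()`, which accepts INT32, INT64 and DOUBLE. It assumes only `org.bson` is on the classpath. `FirstUserFieldSketch` and `firstUserField` are hypothetical names, and it does not rely on any projection option of the Flink connector.

```java
import java.util.AbstractMap;
import java.util.Map;
import org.bson.BsonDocument;
import org.bson.BsonValue;

public class FirstUserFieldSketch {
    // Hypothetical helper: returns the first field other than "_id",
    // with its numeric value converted to int.
    public static Map.Entry<String, Integer> firstUserField(BsonDocument document) {
        for (Map.Entry<String, BsonValue> entry : document.entrySet()) {
            if ("_id".equals(entry.getKey())) {
                continue; // skip the ObjectId stored with every document
            }
            BsonValue value = entry.getValue();
            if (!value.isNumber()) {
                throw new IllegalArgumentException(
                        "Field '" + entry.getKey() + "' is " + value.getBsonType() + ", not a number");
            }
            // asNumber().intValue() works for INT32, INT64 and DOUBLE
            // (an INT64 or DOUBLE may be truncated or overflow).
            return new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), value.asNumber().intValue());
        }
        throw new IllegalArgumentException("Document has no field besides _id: " + document.toJson());
    }

    public static void main(String[] args) {
        BsonDocument doc = BsonDocument.parse(
                "{\"_id\": {\"$oid\": \"65f67f3b9779060fd2390d0e\"}, \"Hello\": 1}");
        Map.Entry<String, Integer> field = firstUserField(doc);
        System.out.println(field.getKey() + " = " + field.getValue()); // Hello = 1
    }
}
```

Inside the `MongoDeserializationSchema`, `deserialize` could then become `Map.Entry<String, Integer> field = FirstUserFieldSketch.firstUserField(document); return Tuple2.of(field.getKey(), field.getValue());`. If the field name is known in advance, `document.getInt32("Hello").getValue()` reads it directly.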