I have setup a kinesis data stream to capture RDS changes and adding data to this stream. Data is feeding to this stream as i can see from stream dashboard. But I'm unable to create consumer, I tried KCL sample code for node but it just keep running not printing anything..
import boto3
import json
import random
import calendar
import time
from datetime import datetime
import psycopg2
from psycopg2.extras import LogicalReplicationConnection
my_stream_name = 'Foo'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
my_connection = psycopg2.connect(
"dbname='postgres' host='mypgdb.abcdefghijk.us-east-1.rds.amazonaws.com' user='repluser' password='replpass'" ,
connection_factory = LogicalReplicationConnection)
cur = my_connection.cursor()
cur.drop_replication_slot('wal2json_test_slot')
cur.create_replication_slot('wal2json_test_slot', output_plugin = 'wal2json')
cur.start_replication(slot_name = 'wal2json_test_slot', options = {'pretty-print' : 1}, decode= True)
def consume(msg):
kinesis_client.put_record(StreamName=my_stream_name, Data=json.dumps(msg.payload), PartitionKey="default")
print (msg.payload)
cur.consume_stream(consume)
https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample/consumer
Below is my kcl implementation as provided above
'use strict';
var util = require('util');
var kcl = require('aws-kcl');
function recordProcessor() {
var shardId;
return {
initialize: function(props, completeCallback) {
shardId = initializeInput.shardId;
console.log('recordProcessor');
completeCallback();
},
processRecords: function(processRecordsInput, completeCallback) {
if (!processRecordsInput || !processRecordsInput.records) {
completeCallback();
return;
}
var records = processRecordsInput.records;
var record, data, sequenceNumber, partitionKey;
for (var i = 0 ; i < records.length ; ++i) {
record = records[i];
data = new Buffer(record.data, 'base64').toString();
sequenceNumber = record.sequenceNumber;
partitionKey = record.partitionKey;
console.log(util.format('ShardID: %s, Record: %s, SeqenceNumber: %s, PartitionKey:%s', shardId, data, sequenceNumber, partitionKey));
}
if (!sequenceNumber) {
completeCallback();
return;
}
// If checkpointing, completeCallback should only be called once checkpoint is complete.
processRecordsInput.checkpointer.checkpoint(sequenceNumber, function(err, sequenceNumber) {
console.log(util.format('Checkpoint successful. ShardID: %s, SeqenceNumber: %s', shardId, sequenceNumber));
completeCallback();
});
},
leaseLost: function(leaseLostInput, completeCallback) {
console.log(util.format('Lease was lost for ShardId: %s', shardId));
completeCallback();
},
shardEnded: function(shardEndedInput, completeCallback) {
console.log(util.format('ShardId: %s has ended. Will checkpoint now.', shardId));
shardEndedInput.checkpointer.checkpoint(function(err) {
completeCallback();
});
},
shutdownRequested: function(shutdownRequestedInput, completeCallback) {
shutdownRequestedInput.checkpointer.checkpoint(function (err) {
completeCallback();
});
}
};
}
kcl(recordProcessor()).run();
Run app with java bootstraper ../../../bin/kcl-bootstrap --java /usr/bin/java -e -p ./sample.properties