We have an Apache Flink POC application which works fine locally, but after we deploy it to Kinesis Data Analytics (KDA) it does not emit records into the sink.
Used technologies
Local
- Source: Kafka 2.7
  - 1 broker
  - 1 topic with 1 partition and replication factor 1
- Processing: Flink 1.12.1
- Sink: Managed ElasticSearch Service 7.9.1 (the same instance as in the AWS setup)
AWS
- Source: Amazon MSK Kafka 2.8
  - 3 brokers (but we are connecting to only one)
  - 1 topic with 1 partition and replication factor 3
- Processing: Amazon KDA Flink 1.11.1
  - Parallelism: 2
  - Parallelism per KPU: 2
- Sink: Managed ElasticSearch Service 7.9.1
Application logic
1. The FlinkKafkaConsumer reads messages in JSON format from the topic
2. The JSON messages are mapped to domain objects called Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment) {
    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER1_ADDRESS.amazonaws.com:9092");
    kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC", new SimpleStringSchema(), kafkaProperties);
    consumer.setStartFromEarliest(); // Just for repeatable testing

    return environment
        .addSource(consumer)
        .map(new MapJsonToTelemetry());
}
3. The Telemetry's timestamp is used as the event timestamp.
3.1. With forMonotonousTimestamps
4. Telemetry's StateIso is used for keyBy.
4.1. The two-letter ISO code of the US state
5. A 5-second tumbling window strategy is applied
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
    WatermarkStrategy<Telemetry> wmStrategy =
        WatermarkStrategy
            .<Telemetry>forMonotonousTimestamps()
            .withTimestampAssigner((event, timestamp) -> event.TimeStamp);

    return telemetries
        .assignTimestampsAndWatermarks(wmStrategy)
        .keyBy(t -> t.StateIso)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .process(new WindowCountFunction());
}
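To make the windowing step more concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a 5-second tumbling event-time window is derived from a timestamp and when it may fire. The class and method names are hypothetical. The rules reflect our understanding of Flink's behaviour (windows aligned to the epoch with zero offset, a monotonous-timestamp watermark trailing the highest seen timestamp by 1 ms, a window firing once the watermark reaches its last millisecond), so treat it as an illustration and not as Flink's implementation.

```java
final class TumblingWindowSketch {

    /** Start of the tumbling window containing a non-negative timestamp (offset 0 assumed). */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    /** Watermark assumed for monotonous timestamps: highest timestamp seen minus 1 ms. */
    static long monotonousWatermark(long maxTimestampSeen) {
        return maxTimestampSeen - 1;
    }

    /** A window may fire once the watermark reaches its last millisecond (end - 1). */
    static boolean canFire(long windowEndMillis, long watermark) {
        return watermark >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long ts = 1617809473305L; // an example epoch-millisecond event timestamp
        long end = windowEnd(ts, size);

        // prints: window = [1617809470000, 1617809475000)
        System.out.println("window = [" + windowStart(ts, size) + ", " + end + ")");

        // With no watermark at all the clock stays at Long.MIN_VALUE, so nothing fires.
        System.out.println("fires without watermark: " + canFire(end, Long.MIN_VALUE)); // false

        // An event at the window end pushes the watermark to end - 1, which is enough.
        System.out.println("fires after later event: " + canFire(end, monotonousWatermark(end))); // true
    }
}
```

The point of the sketch is the second case: records can keep arriving at the window operator, but as long as no watermark arrives, the window never emits anything.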
6. A custom ProcessWindowFunction is called to perform some basic aggregation.
6.1. We calculate a single StateAggregatedTelemetry
7. ElasticSearch is configured as the sink.
7.1. StateAggregatedTelemetry data are mapped into a HashMap and pushed into source.
7.2. All setBulkFlushXYZ methods are set to low values
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));

    ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        (ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element, ctx, indexer) -> {
            Map<String, Object> record = new HashMap<>();
            record.put("stateIso", element.StateIso);
            record.put("healthy", element.Flawless);
            record.put("unhealthy", element.Faulty);
            ...
            LOG.info("Telemetry has been added to the buffer");
            indexer.add(Requests.indexRequest()
                .index("INDEXPREFIX-" + from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
                .source(record, XContentType.JSON));
        }
    );

    // Using low values to make sure that the flush will happen
    esSinkBuilder.setBulkFlushMaxActions(25);
    esSinkBuilder.setBulkFlushInterval(1000);
    esSinkBuilder.setBulkFlushMaxSizeMb(1);
    esSinkBuilder.setBulkFlushBackoff(true);
    esSinkBuilder.setRestClientFactory(restClientBuilder -> {});

    LOG.info("Sink has been attached to the DataStream");
    telemetries.addSink(esSinkBuilder.build());
}
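For readers unfamiliar with the bulk flush settings, the following plain-Java sketch models how we understand the three thresholds to interact: the buffer is flushed when the action count, the buffered size, or the elapsed interval crosses its limit. It is a simplified model with hypothetical names (BulkFlushSketch, add, onTimer), not the connector's or the Elasticsearch client's actual code.

```java
final class BulkFlushSketch {
    private final int maxActions;
    private final long maxBytes;
    private final long intervalMillis;

    private int bufferedActions = 0;
    private long bufferedBytes = 0;
    private long lastFlushMillis;
    private int flushCount = 0;

    BulkFlushSketch(int maxActions, long maxBytes, long intervalMillis, long nowMillis) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Buffers one request and flushes if the count or size threshold is reached. */
    void add(long requestBytes, long nowMillis) {
        bufferedActions++;
        bufferedBytes += requestBytes;
        if (bufferedActions >= maxActions || bufferedBytes >= maxBytes) {
            flush(nowMillis);
        }
    }

    /** Called periodically; flushes a non-empty buffer once the interval has elapsed. */
    void onTimer(long nowMillis) {
        if (bufferedActions > 0 && nowMillis - lastFlushMillis >= intervalMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        bufferedActions = 0;
        bufferedBytes = 0;
        lastFlushMillis = nowMillis;
        flushCount++;
    }

    int flushCount() { return flushCount; }

    int bufferedActions() { return bufferedActions; }

    public static void main(String[] args) {
        // Same limits as in our sink: 25 actions, 1 MB, 1000 ms.
        BulkFlushSketch buffer = new BulkFlushSketch(25, 1024L * 1024L, 1000L, 0L);
        for (int i = 0; i < 25; i++) {
            buffer.add(4096L, 0L); // the 25th request triggers the count-based flush
        }
        buffer.add(4096L, 10L);    // one leftover request
        buffer.onTimer(1000L);     // the interval-based flush picks it up
        System.out.println("flushes: " + buffer.flushCount()); // prints: flushes: 2
    }
}
```

Whatever the limits are, a flush only happens for requests that actually reach the sink function, so low values cannot help if the upstream window never emits.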
Excluded things
- We managed to put Kafka, KDA and ElasticSearch under the same VPC and same subnets to avoid the need to sign each request
- From the logs we could see that Flink can reach the ES cluster.
Request
{
    "locationInformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)",
    "logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge",
    "message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}
Response
{
    "locationInformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)",
    "logger": "org.elasticsearch.client.RestClient",
    "message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]",
    "threadName": "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, WindowCountFunction) -> (Sink: Print to Std. Out, Sink: Unnamed, Sink: Print to Std. Out) (2/2)",
    "applicationARN": "arn:aws:kinesisanalytics:...",
    "applicationVersionId": "39",
    "messageSchemaVersion": "1",
    "messageType": "DEBUG"
}
- We could also verify that the messages had been read from the Kafka topic and sent for processing by looking at the Flink Dashboard
What we have tried without luck
- We implemented a RichParallelSourceFunction which emits 1_000_000 messages and then exits
  - This worked well in the local environment
  - The job finished in the AWS environment, but there was no data on the sink side
- We implemented another RichParallelSourceFunction which emits 100 messages every second
  - Basically we had two loops: an outer while(true) and an inner for
  - After the inner loop we called Thread.sleep(1000)
  - This worked perfectly fine in the local environment
  - But in AWS we could see that the checkpoint size grew continuously and no message appeared in ELK
- We have tried to run the KDA application with different parallelism settings
  - But there was no difference
- We also tried to use different watermarking strategies (forBoundedOutOfOrderness, withIdle, noWatermarks)
  - But there was no difference
- We have added logs to the ProcessWindowFunction and to the ElasticsearchSinkFunction
  - Whenever we ran the application from IDEA, these logs appeared on the console
  - Whenever we ran the application on KDA, there were no such logs in CloudWatch
  - The logs that were added to main do appear in the CloudWatch logs
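The two-loop test source described above can be sketched in plain Java as follows. This is not our actual RichParallelSourceFunction: the class name, the Consumer used in place of Flink's source context, the maxBatches bound and the volatile running flag are all assumptions added so the example terminates and runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ThrottledEmitterSketch {
    // In a Flink source, cancel() would flip a flag like this one.
    private volatile boolean running = true;

    /** Emits batchSize messages, pauses, and repeats until cancelled or maxBatches is reached. */
    void run(Consumer<String> collector, int batchSize, long pauseMillis, int maxBatches) {
        int batch = 0;
        while (running && batch < maxBatches) {       // outer loop (while(true) in our test source)
            for (int i = 0; i < batchSize; i++) {     // inner loop: one burst of messages
                collector.accept("message-" + batch + "-" + i);
            }
            batch++;
            try {
                Thread.sleep(pauseMillis);            // Thread.sleep(1000) in our test source
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // preserve the interrupt and stop emitting
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        new ThrottledEmitterSketch().run(received::add, 100, 10L, 3);
        System.out.println("emitted: " + received.size()); // prints: emitted: 300
    }
}
```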
We suppose that we don't see data on the sink side because the window processing logic is not triggered. That's why we don't see processing logs in CloudWatch.
Any help would be more than welcome!
Update #1
- We have tried to downgrade the Flink version from 1.12.1 to 1.11.1
  - There is no change
- We have tried processing time window instead of event time
  - It did not even work on the local environment
Update #2
The average message size is around 4kb. Here is an excerpt of a sample message:
{
    "affiliateCode": "...",
    "appVersion": "1.1.14229",
    "clientId": "guid",
    "clientIpAddr": "...",
    "clientOriginated": true,
    "connectionType": "Cable/DSL",
    "countryCode": "US",
    "design": "...",
    "device": "...",
    ...
    "deviceSerialNumber": "...",
    "dma": "UNKNOWN",
    "eventSource": "...",
    "firstRunTimestamp": 1609091112818,
    "friendlyDeviceName": "Comcast",
    "fullDevice": "Comcast ...",
    "geoInfo": {
        "continent": {
            "code": "NA",
            "geoname_id": 120
        },
        "country": {
            "geoname_id": 123,
            "iso_code": "US"
        },
        "location": {
            "accuracy_radius": 100,
            "latitude": 37.751,
            "longitude": -97.822,
            "time_zone": "America/Chicago"
        },
        "registered_country": {
            "geoname_id": 123,
            "iso_code": "US"
        }
    },
    "height": 720,
    "httpUserAgent": "Mozilla/...",
    "isLoggedIn": true,
    "launchCount": 19,
    "model": "...",
    "os": "Comcast...",
    "osVersion": "...",
    ...
    "platformTenantCode": "...",
    "productCode": "...",
    "requestOrigin": "https://....com",
    "serverTimeUtc": 1617809474787,
    "serviceCode": "...",
    "serviceOriginated": false,
    "sessionId": "guid",
    "sessionSequence": 2,
    "subtype": "...",
    "tEventId": "...",
    ...
    "tRegion": "us-east-1",
    "timeZoneOffset": 5,
    "timestamp": 1617809473305,
    "traits": {
        "isp": "Comcast Cable",
        "organization": "..."
    },
    "type": "...",
    "userId": "guid",
    "version": "v1",
    "width": 1280,
    "xb3traceId": "guid"
}
We are using ObjectMapper to parse only some of the fields of the JSON. Here is what the Telemetry class looks like:
public class Telemetry {
    public String AppVersion;
    public String CountryCode;
    public String ClientId;
    public String DeviceSerialNumber;
    public String EventSource;
    public String SessionId;
    public TelemetrySubTypes SubType; // enum
    public String TRegion;
    public Long TimeStamp;
    public TelemetryTypes Type; // enum
    public String StateIso;
    ...
}
Update #3
Source
Subtasks tab
ID | Bytes received | Records received | Bytes sent | Records sent | Status |
---|---|---|---|---|---|
0 | 0 B | 0 | 0 B | 0 | RUNNING |
1 | 0 B | 0 | 2.83 MB | 15,000 | RUNNING |
Watermarks tab
No Data
Window
Subtasks tab
ID | Bytes received | Records received | Bytes sent | Records sent | Status |
---|---|---|---|---|---|
0 | 1.80 MB | 9,501 | 0 B | 0 | RUNNING |
1 | 1.04 MB | 5,499 | 0 B | 0 | RUNNING |
Watermarks
SubTask | Watermark |
---|---|
1 | No Watermark |
2 | No Watermark |
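The numbers above (records received by the window, zero records sent, "No Watermark" everywhere) fit the way an operator's event-time clock is derived: it is the minimum of the latest watermarks received on its input channels, so it cannot advance while any input has not emitted one. The plain-Java sketch below illustrates that rule; the class and method names are hypothetical and Long.MIN_VALUE stands in for "No Watermark".

```java
final class CombinedWatermarkSketch {
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** The operator's event-time clock: the minimum of the latest watermark per input channel. */
    static long combinedWatermark(long[] latestPerInput) {
        long min = Long.MAX_VALUE;
        for (long watermark : latestPerInput) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    static String describe(long watermark) {
        return watermark == NO_WATERMARK ? "No Watermark" : Long.toString(watermark);
    }

    public static void main(String[] args) {
        // Two upstream source subtasks, neither has emitted a watermark yet.
        long[] inputs = {NO_WATERMARK, NO_WATERMARK};
        System.out.println(describe(combinedWatermark(inputs))); // prints: No Watermark

        // Only the subtask that actually reads the single partition advances.
        inputs[1] = 1617809473304L;
        System.out.println(describe(combinedWatermark(inputs))); // prints: No Watermark

        // Once every input has a watermark, the minimum moves forward.
        inputs[0] = 1617809474000L;
        System.out.println(describe(combinedWatermark(inputs))); // prints: 1617809473304
    }
}
```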
After a support session with the AWS folks, it turned out that we had missed setting the time characteristic on the streaming environment. In our KDA environment the TimeCharacteristic was IngestionTime, whereas Flink 1.12 defaults to EventTime. So, after we set EventTime explicitly, it started to generate watermarks like a charm.
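The change is along the lines of the sketch below. setStreamTimeCharacteristic and TimeCharacteristic.EventTime are Flink APIs (the setter is deprecated as of Flink 1.12, where EventTime became the default); the class name EventTimeSetup and the helper createEnvironment are hypothetical and only frame the single relevant call, which in our job sits where the streaming environment is created.

```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EventTimeSetup {

    /** Creates the streaming environment with event time enabled explicitly. */
    static StreamExecutionEnvironment createEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Needed on Flink 1.11.x (the KDA runtime we used); from 1.12 on, EventTime is the default.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        return env;
    }
}
```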