Hash Join and Sort Merge exception in Apache Flink

Cluster Infra:

We have a standalone Flink cluster with 4 nodes, each with 16 CPU cores and 32 GB of physical memory, of which 16 GB is set as Flink managed memory and the rest is left for UDFs and the Java heap. Per slot we have therefore assigned 1 core and 1 GB of RAM.

Scenario Description:

We are trying to join two DataSets, A and B, where DataSet A is a tuple of <String, ArrayList> and DataSet B is a custom POJO; the join key for both datasets is a String.

The size of neither dataset is guaranteed: at one point in time A can be the larger one, while at another point B can be larger. It is also quite possible that one dataset contains many duplicate entries for the same key.

For example:
Dataset A has information of <String, LocationClass>, size = 51 MB
Dataset B may have information of size = 171 MB
Join key: location, e.g. Mumbai, NewYork, etc.

Hence, to perform this join we chose the join hint REPARTITION_HASH_FIRST. This strategy works fine sometimes, and sometimes it throws the exception below:

java.lang.RuntimeException: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident.
Probable cause: Too many duplicate keys.

So we tried REPARTITION_HASH_SECOND instead, but the result is the same.
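
For reference, the join is wired up roughly like the sketch below; the class, key positions and field names are placeholders rather than our exact code:

    import java.util.ArrayList;

    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class JoinWithHint {

        // Placeholder for the custom POJO on the B side.
        public static class BRecord {
            public String location;   // join key, e.g. "Mumbai"
            public BRecord() {}       // Flink POJOs need a no-arg constructor
        }

        public static DataSet<Tuple2<Tuple2<String, ArrayList<String>>, BRecord>> joinAB(
                DataSet<Tuple2<String, ArrayList<String>>> datasetA,
                DataSet<BRecord> datasetB) {

            // REPARTITION_HASH_FIRST builds the hash table from datasetA;
            // REPARTITION_HASH_SECOND would build it from datasetB instead.
            return datasetA
                    .join(datasetB, JoinHint.REPARTITION_HASH_FIRST)
                    .where(0)              // String key of the Tuple2
                    .equalTo("location");  // String key field of the POJO
        }
    }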

As far as I understand, Flink internally builds a hash table from the side named in the hint (First or Second) and streams the other side against that hash table. Since one of the keys has so much data that it cannot fit into Flink's managed memory while the hash table is being built, the join fails with the "too many duplicate keys" exception.

As a second step we tried REPARTITION_SORT_MERGE, and we got the exception below:

java.lang.Exception: Caused an error obtaining the sort input. The record exceeds maximum size of sort buffer.

Can anyone please suggest whether we need to increase the Flink managed memory to 2 GB, or even more than that? Or should we opt for a different strategy to handle this problem?
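
For context, the knob we would be tuning is the managed-memory setting in flink-conf.yaml; the values below are only illustrative, and the exact keys depend on the Flink version:

    # Flink <= 1.9 (legacy) keys
    taskmanager.memory.size: 2048      # fixed managed memory per TaskManager, in MB
    # or, alternatively
    taskmanager.memory.fraction: 0.7   # fraction of free heap used as managed memory

    # Flink 1.10+ keys
    taskmanager.memory.managed.size: 2gb
    # or
    taskmanager.memory.managed.fraction: 0.4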

Best Answer:

It seems pretty clear to me that your problem is a duplicate key group that is too big.

Also, the duplicate group might be present on both sides, yielding O(n^2) output pairs for that group, n being the maximum duplicate group size. For example, if both sides contain 10,000 rows for the key "Mumbai", the join produces 10,000 × 10,000 = 100 million pairs for that key alone.

I'd advise deduplicating both sides beforehand if that's possible, using something like DeduplicateKeepLastRowFunction for example. Or build finer-grained keys with additional data from your rows.
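
The question uses the DataSet API (DeduplicateKeepLastRowFunction belongs to the Table/SQL runtime), so a rough DataSet-style equivalent is to collapse each key group to a single record before the join. A minimal sketch, with a placeholder POJO and field names:

    import org.apache.flink.api.java.DataSet;

    public class DedupBeforeJoin {

        // Placeholder POJO standing in for the custom type in the question.
        public static class LocationRecord {
            public String location;   // join key, e.g. "Mumbai"
            public long timestamp;    // decides which duplicate survives
            public LocationRecord() {}
        }

        // Keep only the latest record per location so that every key occurs
        // once on this side before the join (the "keep last row" idea).
        public static DataSet<LocationRecord> deduplicateKeepLast(DataSet<LocationRecord> input) {
            return input
                    .groupBy("location")
                    .reduce((r1, r2) -> r1.timestamp >= r2.timestamp ? r1 : r2);
        }
    }

Applying the same idea to the other side, or switching to a finer composite key (e.g. location plus one more field), keeps each duplicate group small enough to stay memory resident.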