I have been trying to understand how to batch messages in Benthos, but I'm a bit confused about how to do it.
Take this example:
```yaml
input:
  generate:
    interval: ""
    count: 40
    mapping: |
      root = count("test")

pipeline:
  processors:
    - log:
        level: INFO
        message: 'Test! ${! (this) } ${! (this % 2 == 0) } ${! batch_size() }'
    - group_by_value:
        value: ${! (this % 2 == 0) }
    - archive:
        format: tar
    - compress:
        algorithm: gzip

output:
  file:
    path: test/${! (this % 2 == 0) }.tar.gz
    codec: all-bytes
```
My expectation was two files in `test/`, one called `true.tar.gz` and another called `false.tar.gz`, with 20 elements each (the even and the odd numbers). Instead, I get a single file containing only the last message. From the logs I can see that it is not actually batching the messages based on that condition.
I thought `group_by_value` would create two "streams" (batches) of messages that would be handled separately by the `archive` processor and the output, but it doesn't seem to behave like that.
Could you please help me understand how it works?
Additionally, I want to limit the size of each of these streams to a certain number, so that the number of entries in each TAR file is capped.
Thanks!!
EDIT 1
The following config works closer to what I expect, but this way I have to "know" how many items I want to batch before I can group them. Can't I just "accumulate" messages based on the `group_by_value` condition and batch them based on a count later?
```yaml
input:
  broker:
    inputs:
      - generate:
          interval: ""
          count: 40
          mapping: |
            root = count("test")
    batching:
      count: 40

pipeline:
  processors:
    - group_by_value:
        value: ${! (this % 2 == 0) }
    - log:
        level: INFO
        message: 'Test! ${! (this) } ${! (this % 2 == 0) } ${! batch_size() }'
    - bloblang: |
        meta name = (this) % 2 == 0
    - archive:
        format: tar
        path: ${! (this) }

output:
  file:
    path: test/${! (meta("name")) }.tar
    codec: all-bytes
```
As you already noticed, `group_by_value` operates on message batches, which is why your first example produces a single file as output. Without a batching policy, the `generate` input emits each message as a batch of one, so there is nothing to group. In fact, it produces a file for each message, but since the file name is identical, each new file ends up overwriting the previous one.

From your edit, I'm not sure I get what you're trying to achieve. The batch policy documentation explains that `byte_size`, `count` and `period` are the available conditions for composing batches. When any of those is met, a batch is flushed, so you don't necessarily have to rely on a specific count. For convenience, the `batching` policy also has a `processors` field, which allows you to define an optional list of processors to apply to each batch before it is flushed.

The windowed processing documentation might also be of interest, since it explains how the `system_window` buffer can be used to chop a stream of messages into tumbling or sliding windows of fixed temporal size. It also has a section on grouping.

Update 22.02.2022: Here's an example of how to perform output batching based on some key, as requested in the comments:
Sample output:
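Separately from the update, and going back to the batching policy conditions mentioned earlier: a minimal sketch of dropping the fixed `count` from the EDIT 1 config is to flush on `period` instead, so you no longer need to know the number of messages in advance. This is not the example from the update, only the EDIT 1 config with `count: 40` swapped for `period: 1s` (an arbitrary window, chosen here as an assumption) and `root = this` made explicit in the mapping.

```yaml
input:
  broker:
    inputs:
      - generate:
          interval: ""
          count: 40
          mapping: |
            root = count("test")
    batching:
      # Flush whatever has accumulated once per second, regardless of how
      # many messages that is (1s is an arbitrary choice for this sketch).
      period: 1s

pipeline:
  processors:
    # Split each flushed batch into one batch per key (even vs. odd).
    - group_by_value:
        value: ${! (this % 2 == 0) }
    # Keep the content and store the key so the output path can use it.
    - bloblang: |
        root = this
        meta name = this % 2 == 0
    # Merge each group into a single tar archive message.
    - archive:
        format: tar
        path: ${! (this) }

output:
  file:
    path: test/${! meta("name") }.tar
    codec: all-bytes
```

Adding `count` (or `byte_size`) next to `period` would cap the size of each flushed batch, and therefore the number of entries per TAR. Since `all-bytes` replaces an existing file, the output `path` would then need to be unique per flush to avoid overwriting earlier archives.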