I'm experimenting with Apache Beam and struggling to correctly handle DataFrame-to-PCollection conversion. I've implemented a left anti-join with the following logic:
def left_anti_join(
    left: DeferredDataFrame,
    right: DeferredDataFrame,
    left_index: str,
    right_index: str,
) -> DeferredDataFrame:
    """Return the rows of ``left`` whose key has no match in ``right``.

    Implements a left anti-join. ``left`` is left-merged against the key
    values of ``right`` with ``indicator=True``, and only the rows flagged
    ``"left_only"`` are kept.

    Args:
        left: Frame whose rows are filtered.
        right: Frame supplying the key values to exclude.
        left_index: Name of the key column in ``left``.
        right_index: Name of the key column in ``right``.

    Returns:
        A frame with only the columns of ``left``: no columns taken from
        ``right`` and no ``_merge`` indicator. It contains the rows whose
        ``left_index`` value was not matched in ``right[right_index]``.
    """
    # Only the key column of ``right`` is needed, turned into an index with
    # no remaining columns. Merging the whole ``right`` frame would append
    # each of its other columns (all NaN for unmatched rows) to the result.
    # It could also rename clashing ``left`` columns with ``_x``/``_y``
    # suffixes.
    right_keys = right[[right_index]].set_index(right_index)
    joined = left.merge(
        right_keys,
        left_on=left_index,
        right_index=True,
        how="left",
        indicator=True,
    )
    # Rows of ``left`` with no partner in ``right`` are tagged "left_only".
    # Duplicate keys in ``right`` only multiply matched ("both") rows, which
    # are discarded here, so they do not change the result.
    anti_joined = joined[joined["_merge"] == "left_only"]
    # NOTE(review): when this result is empty, the downstream
    # "cannot convert float NaN to integer" error is raised inside Beam's
    # batch byte-size counters (opcounters.update_from_batch), not here.
    # That presumably needs a Beam-side fix or upgrade; confirm.
    return anti_joined.drop("_merge", axis=1)
After calling this function, I convert its output into a PCollection with:
# Anti-join df1 against df2 on their "id" columns, then convert the
# resulting deferred frame into a PCollection.
data = left_anti_join(df1, df2, left_index="id", right_index="id")
to_pcollection(data)
Everything works fine until the DeferredDataFrame returned by my `left_anti_join` function is empty.
When that happens, I get the following error:
/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/opcounters.py:226: RuntimeWarning: invalid value encountered in long_scalars
mean_element_size = self.producer_batch_converter.estimate_byte_size(
Traceback (most recent call last):
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1417, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 623, in invoke_process
self.output_handler.handle_process_outputs(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1600, in handle_process_outputs
self._write_batch_to_tag(tag, windowed_batch, watermark_estimator)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1704, in _write_batch_to_tag
self.main_receivers.receive_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 365, in receive_batch
self.update_counters_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 206, in update_counters_batch
self.opcounter.update_from_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/opcounters.py", line 228, in update_from_batch
self.mean_byte_counter.update_n(mean_element_size, batch_length)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/utils/counters.py", line 222, in update_n
self._fast_add_input_n(value, n)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/transforms/cy_combiners.py", line 213, in add_input_n
element = int(element)
ValueError: cannot convert float NaN to integer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/mydir/beam_jobs/test_job.py", line 163, in <module>
with beam.Pipeline(options=beam_options) as p:
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/pipeline.py", line 597, in __exit__
self.result = self.run()
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/pipeline.py", line 574, in run
return self.runner.run_pipeline(self, self._options)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 199, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 442, in run_stages
bundle_results = self._execute_bundle(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _execute_bundle
self._run_bundle(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 999, in _run_bundle
result, splits = bundle_manager.process_bundle(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1309, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
response = self.worker.do_instruction(request)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
return getattr(self, request_type)(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1009, in process_bundle
op.finish()
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 943, in finish
self.dofn_runner.finish()
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1479, in finish
self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1460, in _invoke_bundle_method
self._reraise_augmented(exn)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1491, in _reraise_augmented
raise exn
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1458, in _invoke_bundle_method
bundle_method()
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 566, in invoke_finish_bundle
self.output_handler.finish_bundle_outputs(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1745, in finish_bundle_outputs
self.main_receivers.receive(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 240, in receive
self.consumer.process(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 908, in process
delayed_applications = self.dofn_runner.process(o)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1419, in process
self._reraise_augmented(exn)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1491, in _reraise_augmented
raise exn
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1417, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 837, in invoke_process
self._invoke_process_per_window(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 981, in _invoke_process_per_window
self.output_handler.handle_process_outputs(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1581, in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1696, in _write_value_to_tag
self.tagged_receivers[tag].receive(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 240, in receive
self.consumer.process(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 908, in process
delayed_applications = self.dofn_runner.process(o)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1419, in process
self._reraise_augmented(exn)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1507, in _reraise_augmented
raise new_exn.with_traceback(tb)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1417, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 623, in invoke_process
self.output_handler.handle_process_outputs(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1600, in handle_process_outputs
self._write_batch_to_tag(tag, windowed_batch, watermark_estimator)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1704, in _write_batch_to_tag
self.main_receivers.receive_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 365, in receive_batch
self.update_counters_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 206, in update_counters_batch
self.opcounter.update_from_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/opcounters.py", line 228, in update_from_batch
self.mean_byte_counter.update_n(mean_element_size, batch_length)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/utils/counters.py", line 222, in update_n
self._fast_add_input_n(value, n)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/transforms/cy_combiners.py", line 213, in add_input_n
element = int(element)
ValueError: cannot convert float NaN to integer [while running 'Unbatch 'drop_DataFrame_6039969152'']
What's the correct way to handle such cases of empty DataFrames? I've tried counting the number of elements before converting to a PCollection, but that feels inefficient.