Apache Beam - Convert empty Dataframe back to PCollection

198 Views Asked by At

I'm experimenting with Apache Beam and struggling to correctly handle DataFrame-to-PCollection conversion. I've tested an implementation of a left anti-join with the following logic:

def left_anti_join(
    left: DeferredDataFrame,
    right: DeferredDataFrame,
    left_index: str,
    right_index: str,
) -> DeferredDataFrame:
    """Return the rows of ``left`` whose key has no match in ``right``.

    Args:
        left: Frame whose unmatched rows are returned.
        right: Frame whose keys are used to exclude rows of ``left``.
        left_index: Name of the key column in ``left``.
        right_index: Name of the key column in ``right``.

    Returns:
        The rows of ``left`` for which the value in ``left_index`` does not
        occur in ``right[right_index]``. Only the columns of ``left`` are
        returned.
    """
    # Project ``right`` down to its key before merging. Otherwise every
    # non-key column of ``right`` ends up in the result (as all-NaN columns,
    # or as ``_x``/``_y``-suffixed columns when names collide with ``left``),
    # which is not what an anti-join should return.
    right_keys = right[[right_index]].set_index(right_index)

    # Left join against the key index; ``indicator=True`` adds a ``_merge``
    # column telling where each row came from.
    # NOTE(review): ``right_index=True`` is kept from the original, presumably
    # so Beam can treat the merge as an index-aligned join -- confirm.
    left_joined = left.merge(
        right_keys,
        left_on=left_index,
        right_index=True,
        how="left",
        indicator=True,
    )

    # Rows tagged "left_only" had no partner in ``right``.
    unmatched = left_joined[left_joined["_merge"] == "left_only"]
    return unmatched.drop("_merge", axis=1)

After executing this function, I turn its output into a PCollection with:

# Rows of df1 whose "id" value does not appear in df2's "id" column.
data = left_anti_join(df1,df2,"id", "id")
# Convert the deferred DataFrame back into a PCollection (presumably
# apache_beam.dataframe.convert.to_pcollection -- confirm the import).
to_pcollection(data)

Everything works fine until the deferred DataFrame returned by my "left_anti_join" function is empty.

If that happens, I keep getting the following error:

/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/opcounters.py:226: RuntimeWarning: invalid value encountered in long_scalars
  mean_element_size = self.producer_batch_converter.estimate_byte_size(
Traceback (most recent call last):
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1417, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 623, in invoke_process
    self.output_handler.handle_process_outputs(
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1600, in handle_process_outputs
    self._write_batch_to_tag(tag, windowed_batch, watermark_estimator)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1704, in _write_batch_to_tag
    self.main_receivers.receive_batch(windowed_batch)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 365, in receive_batch
    self.update_counters_batch(windowed_batch)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 206, in update_counters_batch
    self.opcounter.update_from_batch(windowed_batch)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/opcounters.py", line 228, in update_from_batch
    self.mean_byte_counter.update_n(mean_element_size, batch_length)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/utils/counters.py", line 222, in update_n
    self._fast_add_input_n(value, n)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/transforms/cy_combiners.py", line 213, in add_input_n
    element = int(element)
ValueError: cannot convert float NaN to integer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/mydir/beam_jobs/test_job.py", line 163, in <module>
    with beam.Pipeline(options=beam_options) as p:
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 199, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 442, in run_stages
    bundle_results = self._execute_bundle(
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _execute_bundle
    self._run_bundle(
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 999, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1309, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
    response = self.worker.do_instruction(request)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
    return getattr(self, request_type)(
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1009, in process_bundle
    op.finish()
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 943, in finish
    self.dofn_runner.finish()
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1479, in finish
    self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1460, in _invoke_bundle_method
    self._reraise_augmented(exn)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1491, in _reraise_augmented
    raise exn
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1458, in _invoke_bundle_method
    bundle_method()
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 566, in invoke_finish_bundle
    self.output_handler.finish_bundle_outputs(
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1745, in finish_bundle_outputs
    self.main_receivers.receive(windowed_value)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 240, in receive
    self.consumer.process(windowed_value)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 908, in process
    delayed_applications = self.dofn_runner.process(o)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1419, in process
    self._reraise_augmented(exn)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1491, in _reraise_augmented
    raise exn
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1417, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 837, in invoke_process
    self._invoke_process_per_window(
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 981, in _invoke_process_per_window
    self.output_handler.handle_process_outputs(
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1581, in handle_process_outputs
    self._write_value_to_tag(tag, windowed_value, watermark_estimator)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1696, in _write_value_to_tag
    self.tagged_receivers[tag].receive(windowed_value)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 240, in receive
    self.consumer.process(windowed_value)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 908, in process
    delayed_applications = self.dofn_runner.process(o)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1419, in process
    self._reraise_augmented(exn)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1507, in _reraise_augmented
    raise new_exn.with_traceback(tb)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1417, in process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 623, in invoke_process
    self.output_handler.handle_process_outputs(
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1600, in handle_process_outputs
    self._write_batch_to_tag(tag, windowed_batch, watermark_estimator)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1704, in _write_batch_to_tag
    self.main_receivers.receive_batch(windowed_batch)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 365, in receive_batch
    self.update_counters_batch(windowed_batch)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 206, in update_counters_batch
    self.opcounter.update_from_batch(windowed_batch)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/opcounters.py", line 228, in update_from_batch
    self.mean_byte_counter.update_n(mean_element_size, batch_length)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/utils/counters.py", line 222, in update_n
    self._fast_add_input_n(value, n)
  File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/transforms/cy_combiners.py", line 213, in add_input_n
    element = int(element)
ValueError: cannot convert float NaN to integer [while running 'Unbatch 'drop_DataFrame_6039969152'']

What's the correct way of handling such cases of empty DataFrames? I've tried counting the number of elements before transforming it into a PCollection, but that feels inefficient.

0

There are 0 best solutions below