I'm using Dagster and Azure to download files from an SFTP server to our Blob Storage. This is our Azure resource:

from typing import Optional

from azure.storage.blob import BlobServiceClient, StorageStreamDownloader
from dagster import ConfigurableResource

# is_bitbucket_docker_run_environment and get_credential are defined
# elsewhere in our module.


class AzureBlobStorageResource(ConfigurableResource):
    storageacc: str

    def get_blob_service_client(self) -> Optional[BlobServiceClient]:
        try:
            if not is_bitbucket_docker_run_environment:
                # Create the BlobServiceClient object
                account_url = f"https://{self.storageacc}.blob.core.windows.net"
                credential = get_credential()
                return BlobServiceClient(account_url, credential=credential)
        except Exception:
            pass
        # Falls through to None in CI runs or on any error
        return None

    def download_file_obj(
        self, container_name: str, filename: str
    ) -> StorageStreamDownloader:
        blob_service_client = self.get_blob_service_client()
        return blob_service_client.get_blob_client(
            container_name, filename
        ).download_blob(encoding="utf8")
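
For reference, outside of Dagster the resource is used like this (container and file names here are just placeholders):

azurebs = AzureBlobStorageResource(storageacc="ourstorageacc")
downloader = azurebs.download_file_obj(
    container_name="incoming", filename="202311/12/data.csv"
)
content = downloader.readall()  # a str, because download_blob() was given encoding="utf8"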

Our Dagster asset:

import io
import uuid
from datetime import datetime, timezone
from typing import Tuple

import pandas as pd
from dagster import AssetOut, OpExecutionContext, multi_asset
from pandas import DataFrame as PandasDF

# partitions_def, SftpResource, INCOMING_REMOTE_DIR, BLOB_CONTAINER and
# BLOB_DIRECTORY are defined elsewhere in our module.


@multi_asset(
    outs={
        "data": AssetOut(
            key_prefix=["sftp"], io_manager_key="adls2_pickle_io_manager"
        ),
        "metadata": AssetOut(
            key_prefix=["sftp"], io_manager_key="adls2_pickle_io_manager"
        ),
    },
    partitions_def=partitions_def,
    compute_kind="pandas",
)
def source_files(
    context: OpExecutionContext,
    sftp: SftpResource,
    azurebs: AzureBlobStorageResource,
) -> Tuple[PandasDF, PandasDF]:

    start, end = context.partition_time_window
    context.log.info(f"Processing asset partition '{start}-{end}'")
    
    path = start.strftime("%Y%m/%d")
    context.log.info(path)
    files = sftp.sftp_list_files(f"{INCOMING_REMOTE_DIR}/{path}")
    if len(files) == 0:
        raise FileNotFoundError(f"No file found in path {path}")
    if len(files) > 1:
        context.log.warning(f"More than 1 file in path {path}; using the first")

    remote_file = f"{INCOMING_REMOTE_DIR}/{path}/{files[0]}"
    context.log.info(f"Downloading file to Blob Storage: {remote_file}")
    blob_file_path = f"/{path}/{files[0]}"
    sftp.from_sftp_to_blob(remote_file, blob_file_path)

    context.log.info(f"Ingesting file to deltalake: {blob_file_path}")

    downloaded_blob = azurebs.download_file_obj(
        container_name=BLOB_CONTAINER,
        filename=BLOB_DIRECTORY + blob_file_path,
    )

    shortid = str(uuid.uuid4())[:7]
    data = pd.read_csv(io.StringIO(downloaded_blob.readall()))
    data["startdatetime_utc"] = pd.to_datetime(data["startdatetime_utc"], utc=True)
    data["id"] = shortid
    context.log.info(f"dataframe shape: {data.shape}")
    context.log.info(
        f"min-max datetime: {data.startdatetime_utc.min()}-{data.startdatetime_utc.max()}"
    )
    context.add_output_metadata(
        metadata={"no_records": len(data)}, output_name="data"
    )
    context.add_output_metadata(
        metadata={"sum_value": data.forecast_raw_kw.sum()}, output_name="data"
    )

    filename = files[0]
    context.log.info(filename)
    metadata = pd.DataFrame({
        "id": [shortid], 
        "loaded_at": [datetime.now(timezone.utc)],
        "filename": [filename],
    })
        
    context.log.info(f"{len(forecasts)}")
    context.log.info(data.sample(1))
    
    context.log.info(metadata.sample(1))
    context.log.info(f"metadata dataframe shape: {metadata.shape}")
    context.add_output_metadata(
        metadata={"no_records": len(metadata)},
        output_name="metadata",
    )

    return data, metadata
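
For reference, the IO manager wiring in our Definitions looks roughly like this (config values here are placeholders, not our real ones):

from dagster import Definitions
from dagster_azure.adls2 import adls2_pickle_io_manager, adls2_resource

defs = Definitions(
    assets=[source_files],
    resources={
        # The pickle IO manager persists each asset output to ADLS2 by
        # calling create_file() on the target path -- the call that
        # raises PathConflict in the trace below.
        "adls2_pickle_io_manager": adls2_pickle_io_manager.configured(
            {"adls2_file_system": "our-file-system", "adls2_prefix": "dagster"}
        ),
        "adls2": adls2_resource.configured(
            {
                "storage_account": "ourstorageacc",
                "credential": {"key": {"env": "AZURE_STORAGE_KEY"}},
            }
        ),
        # ... plus our SftpResource and AzureBlobStorageResource
    },
)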

The pipeline normally runs fine, but one run failed with this error:

dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "data" of step "source_files":

  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 482, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event):
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 533, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output):
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 724, in _store_output
    for elt in iterate_with_context(
  File "/usr/local/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 445, in iterate_with_context
    return
  File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
azure.core.exceptions.ResourceExistsError: The specified path, or an element of the path, exists and its resource type is invalid for this operation.
RequestId:6b583d4d-501f-0098-135f-15cacf000000
Time:2023-11-12T11:55:00.4663911Z
ErrorCode:PathConflict

  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 443, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 714, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/usr/local/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 453, in handle_output
    self.dump_to_path(context=context, obj=obj, path=path)
  File "/usr/local/lib/python3.9/site-packages/dagster_azure/adls2/io_manager.py", line 104, in dump_to_path
    file = self.file_system_client.create_file(str(path))
  File "/usr/local/lib/python3.9/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_file_system_client.py", line 835, in create_file
    file_client.create_file(**kwargs)
  File "/usr/local/lib/python3.9/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_data_lake_file_client.py", line 225, in create_file
    return self._create('file', content_settings=content_settings, metadata=metadata, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_path_client.py", line 301, in _create
    process_storage_error(error)
  File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_deserialize.py", line 221, in process_storage_error
    exec("raise error from None")   # pylint: disable=exec-used # nosec
  File "<string>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_path_client.py", line 299, in _create
    return self._client.path.create(**options)
  File "/usr/local/lib/python3.9/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_generated/operations/_path_operations.py", line 1170, in create
    map_error(status_code=response.status_code, response=response, error_map=error_map)
  File "/usr/local/lib/python3.9/site-packages/azure/core/exceptions.py", line 165, in map_error
    raise error

Note that I cannot reproduce this issue; when I manually re-ran the pipeline, the file on Azure Blob Storage was overwritten normally. The error also isn't widely documented; the only reference I can find is here: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update?view=rest-storageservices-datalakestoragegen2-2019-12-12
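
From the REST docs, my best guess is that PathConflict means a path already exists with a different resource type (for example a directory where a file is being created). A minimal, untested sketch of how I would expect to trigger the same error manually (account, file-system, and path names are placeholders):

from azure.core.exceptions import ResourceExistsError
from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient

service = DataLakeServiceClient(
    account_url="https://ourstorageacc.dfs.core.windows.net",
    credential=DefaultAzureCredential(),
)
fs = service.get_file_system_client("our-file-system")

# Create a *directory* at an arbitrary path...
fs.create_directory("dagster/some/asset/path")

try:
    # ...then create a *file* at the exact same path, mirroring what
    # dagster-azure's dump_to_path() does via create_file().
    fs.create_file("dagster/some/asset/path")
except ResourceExistsError as err:
    print(err.error_code)  # expecting PathConflict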

My packages:

adlfs==2023.10.0
azure-storage-blob==12.18.3
dagster-azure==0.20.15
dagster==1.4.15
python==3.8