I'm using Dagster and Azure to download files from an SFTP server to our Blob Storage. This is our Azure resource:
from dagster import ConfigurableResource
from azure.storage.blob import BlobServiceClient


class AzureBlobStorageResource(ConfigurableResource):
    storageacc: str

    def get_blob_service_client(self):
        # Note: returns None both in the Bitbucket Docker environment and
        # when client construction fails, since the except swallows everything.
        try:
            if is_bitbucket_docker_run_environment is False:
                # Create the BlobServiceClient object
                account_url = f"https://{self.storageacc}.blob.core.windows.net"
                credential = get_credential()
                blob_service_client = BlobServiceClient(
                    account_url, credential=credential
                )
                return blob_service_client
        except Exception:
            return None

    def download_file_obj(self, container_name: str, filename: str):
        # Returns a StorageStreamDownloader; readall() yields str because
        # of the encoding="utf8" argument.
        blob_service_client = self.get_blob_service_client()
        return blob_service_client.get_blob_client(
            container_name, filename
        ).download_blob(encoding="utf8")
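For illustration, calling the resource directly looks like this; the account, container, and blob names here are placeholders, not our real ones:

# Placeholder names for illustration only.
azurebs = AzureBlobStorageResource(storageacc="mystorageacct")
downloader = azurebs.download_file_obj(
    container_name="raw", filename="landing/202311/12/data.csv"
)
csv_text = downloader.readall()  # str, thanks to encoding="utf8"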
Our Dagster asset:
import io
import uuid
from datetime import datetime, timezone
from typing import Tuple

import pandas as pd
from dagster import AssetOut, OpExecutionContext, multi_asset


@multi_asset(
    outs={
        "data": AssetOut(
            key_prefix=["stfp"], io_manager_key="adls2_pickle_io_manager"
        ),
        "metadata": AssetOut(
            key_prefix=["stfp"], io_manager_key="adls2_pickle_io_manager"
        ),
    },
    partitions_def=partitions_def,
    compute_kind="pandas",
)
def source_files(
    context: OpExecutionContext,
    stfp: SftpResource,
    azurebs: AzureBlobStorageResource,
) -> Tuple[PandasDF, PandasDF]:
    start, end = context.partition_time_window
    context.log.info(f"Processing asset partition '{start}-{end}'")
    path = start.strftime("%Y%m/%d")
    context.log.info(path)
    # List the files for this partition's date path on the SFTP server
    files = stfp.sftp_list_files(f"{INCOMING_REMOTE_DIR}/{path}")
    if len(files) > 1:
        context.log.debug(f"More than 1 file in path {path}")
    elif len(files) == 0:
        context.log.debug(f"No file found in path {path}")
    else:
        remote_file = f"{INCOMING_REMOTE_DIR}/{path}/{files[0]}"
        context.log.info(f"Downloading file to Blob Storage: {remote_file}")
        blob_file_path = f"/{path}/{files[0]}"
        stfp.from_sftp_to_blob(remote_file, blob_file_path)
    context.log.info(f"Ingesting file to deltalake: {blob_file_path}")
    downloaded_blob = azurebs.download_file_obj(
        container_name=BLOB_CONTAINER,
        filename=BLOB_DIRECTORY + blob_file_path,
    )
    shortid = str(uuid.uuid4())[:7]
    data = pd.read_csv(io.StringIO(downloaded_blob.readall()))
    data["startdatetime_utc"] = pd.to_datetime(data["startdatetime_utc"], utc=True)
    data["id"] = shortid
    context.log.info(f"dataframe shape: {data.shape}")
    context.log.info(
        f"min-max datetime: {data.startdatetime_utc.min()}-{data.startdatetime_utc.max()}"
    )
    context.add_output_metadata(
        metadata={
            "no_records": len(data),
            "sum_value": data.forecast_raw_kw.sum(),
        },
        output_name="data",
    )
    filename = files[0]
    context.log.info(filename)
    metadata = pd.DataFrame({
        "id": [shortid],
        "loaded_at": [datetime.now(timezone.utc)],
        "filename": [filename],
    })
    context.log.info(f"{len(data)}")
    context.log.info(data.sample(1))
    context.log.info(metadata.sample(1))
    context.log.info(f"metadata dataframe shape: {metadata.shape}")
    context.add_output_metadata(
        metadata={"no_records": len(metadata)},
        output_name="metadata",
    )
    return data, metadata
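Here `partitions_def` is a daily partitions definition along these lines (the start date below is a placeholder, not our real backfill start):

from dagster import DailyPartitionsDefinition

# Placeholder start date for illustration only.
partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")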
The pipeline normally runs fine, but then one run failed with this error:
dagster._core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "data" of step "source_files":
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 482, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event):
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 533, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output):
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 724, in _store_output
for elt in iterate_with_context(
File "/usr/local/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 445, in iterate_with_context
return
File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
azure.core.exceptions.ResourceExistsError: The specified path, or an element of the path, exists and its resource type is invalid for this operation.
RequestId:6b583d4d-501f-0098-135f-15cacf000000
Time:2023-11-12T11:55:00.4663911Z
ErrorCode:PathConflict
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
yield
File "/usr/local/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 443, in iterate_with_context
next_output = next(iterator)
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/execute_step.py", line 714, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/usr/local/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 453, in handle_output
self.dump_to_path(context=context, obj=obj, path=path)
File "/usr/local/lib/python3.9/site-packages/dagster_azure/adls2/io_manager.py", line 104, in dump_to_path
file = self.file_system_client.create_file(str(path))
File "/usr/local/lib/python3.9/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_file_system_client.py", line 835, in create_file
file_client.create_file(**kwargs)
File "/usr/local/lib/python3.9/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_data_lake_file_client.py", line 225, in create_file
return self._create('file', content_settings=content_settings, metadata=metadata, **kwargs)
File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_path_client.py", line 301, in _create
process_storage_error(error)
File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_deserialize.py", line 221, in process_storage_error
exec("raise error from None") # pylint: disable=exec-used # nosec
File "<string>", line 1, in <module>
File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_path_client.py", line 299, in _create
return self._client.path.create(**options)
File "/usr/local/lib/python3.9/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/azure/storage/filedatalake/_generated/operations/_path_operations.py", line 1170, in create
map_error(status_code=response.status_code, response=response, error_map=error_map)
File "/usr/local/lib/python3.9/site-packages/azure/core/exceptions.py", line 165, in map_error
raise error
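As far as I understand ErrorCode:PathConflict, it means create_file was called on a path that already exists as a different resource type (e.g. a directory). A minimal sketch of the kind of call that should trigger it in isolation; the account and filesystem names are made up, not from our pipeline:

from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient

# Made-up names for illustration only.
service = DataLakeServiceClient(
    account_url="https://mystorageacct.dfs.core.windows.net",
    credential=DefaultAzureCredential(),
)
fs = service.get_file_system_client("dagster")

# If "storage/foo" already exists as a *directory*, creating a *file* at
# the same path raises ResourceExistsError with ErrorCode:PathConflict.
fs.create_directory("storage/foo")
fs.create_file("storage/foo")  # azure.core.exceptions.ResourceExistsError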
Note that I cannot reproduce the issue: when I manually reran the pipeline, the file on Azure Blob Storage was overwritten normally. The error also doesn't seem to be widely documented; the only reference I can find is here: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update?view=rest-storageservices-datalakestoragegen2-2019-12-12
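Since the failure looks transient, a step-level retry might ride it out; a sketch, with arbitrary retry numbers:

from dagster import Backoff, RetryPolicy

# Arbitrary settings, just to ride out a transient storage-side conflict;
# passed to the asset decorator via retry_policy=transient_retry.
transient_retry = RetryPolicy(
    max_retries=3,
    delay=10,  # seconds
    backoff=Backoff.EXPONENTIAL,
)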
My packages:
adlfs==2023.10.0
azure-storage-blob==12.18.3
dagster-azure==0.20.15
dagster==1.4.15
python==3.9