Initializing an empty Zarr store for parallel writing


I want to select a subset area from the ERA5 Zarr dataset and write it to my own S3 bucket, doing the writes in parallel.

I first read the data:

import xarray, fsspec, boto3, dask
import dask.array as da
from dask.distributed import Client, LocalCluster
from datetime import datetime

endpoint_url='xx'
target_bucket = 'era5.ecmwf-gcp.surface-areas.zarr'
s3_key = 'single-level-reanalysis-subset.zarr'
full_path = 's3://{}/{}'.format(target_bucket, s3_key)

access_key = 'xx'
access_secret = 'xx'

start = datetime.strptime('2015-01-01', '%Y-%m-%d')
end = datetime.strptime('2015-01-10', '%Y-%m-%d')

# Local cluster; processes=False runs the workers as threads in this process
cluster = LocalCluster(processes=False, n_workers=16, threads_per_worker=2)

# open dataset
data_gs = xarray.open_zarr(
    'gs://gcp-public-data-arco-era5/co/single-level-reanalysis.zarr/',
    chunks={'time': 48},
    consolidated=True,
)
# select time slice
data = data_gs.sel(time=slice(start, end))

# select area
lat_chunk = [50, 55]
lon_chunk = [10, 15]
condw_lat = da.logical_and(data.latitude >= min(lat_chunk), data.latitude <= max(lat_chunk))
condw_lon = da.logical_and(data.longitude >= min(lon_chunk), data.longitude <= max(lon_chunk))
condw = da.logical_and(condw_lat, condw_lon).compute()
data = data.where(condw, drop=True)

print('Original data after selection:')
print(data)

Output:

<xarray.Dataset>
Dimensions:              (time: 217, values: 212)
Coordinates:
    depthBelowLandLayer  float64 100.0
    entireAtmosphere     float64 0.0
    latitude             (values) float64 54.94 54.94 54.94 ... 50.16 50.16
    longitude            (values) float64 10.31 10.78 11.25 ... 14.17 14.58 15.0
    number               int64 0
    step                 timedelta64[ns] 00:00:00
    surface              float64 0.0
  * time                 (time) datetime64[ns] 2015-01-01 ... 2015-01-10
    valid_time           (time) datetime64[ns] dask.array<chunksize=(24,), meta=np.ndarray>
Dimensions without coordinates: values
Data variables: (12/38)
    cape                 (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
    d2m                  (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
    hcc                  (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
    istl1                (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
    istl2                (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
    istl3                (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
    ...                   ...
    tsn                  (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
    u10                  (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
    u100                 (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
    v10                  (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
    v100                 (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
    z                    (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
Attributes:
    Conventions:               CF-1.7
    GRIB_centre:               ecmf
    GRIB_centreDescription:    European Centre for Medium-Range Weather Forec...
    GRIB_edition:              1
    GRIB_subCentre:            0
    history:                   2022-09-23T18:56 GRIB to CDM+CF via cfgrib-0.9...
    institution:               European Centre for Medium-Range Weather Forec...
    pangeo-forge:inputs_hash:  5f4378143e9f42402424280b63472752da3aa79179b53b...
    pangeo-forge:recipe_hash:  0c3415923e347ce9dac9dc5c6d209525f4d45d799bd25b...
    pangeo-forge:version:      0.9.1

Then I initialise an empty Zarr store:

# Initialise empty data to zarr
s3_target = fsspec.get_mapper(
    "s3://{}/{}".format(target_bucket, s3_key),
    key=access_key,
    secret=access_secret,
    client_kwargs={'endpoint_url': endpoint_url},
)

empty = xarray.zeros_like(data)
empty = empty.chunk({'time': 64, 'values': empty.sizes['values']})
store = empty.to_zarr(s3_target, mode='w', compute=False, consolidated=True)

with Client(cluster) as client:
    client.compute(store).result()  # wait for the write to finish before the client closes

# Open the empty dataset back from storage
data2 = xarray.open_dataset(
    full_path,
    engine='zarr',
    backend_kwargs={
        'storage_options': {
            'endpoint_url': endpoint_url,
            'key': access_key,
            'secret': access_secret,
        }
    },
)

print('Empty read from zarr:')
print(data2)

Output:

<xarray.Dataset>
Dimensions:              (time: 217, values: 212)
Coordinates:
    depthBelowLandLayer  float64 ...
    entireAtmosphere     float64 ...
    latitude             (values) float64 ...
    longitude            (values) float64 ...
    number               int64 ...
    step                 timedelta64[ns] ...
    surface              float64 ...
  * time                 (time) datetime64[ns] 2015-01-01 ... 2015-01-10
    valid_time           (time) datetime64[ns] ...
Dimensions without coordinates: values
Data variables: (12/38)
    cape                 (time, values) float32 ...
    d2m                  (time, values) float32 ...
    hcc                  (time, values) float32 ...
    istl1                (time, values) float32 ...
    istl2                (time, values) float32 ...
    istl3                (time, values) float32 ...
    ...                   ...
    tsn                  (time, values) float32 ...
    u10                  (time, values) float32 ...
    u100                 (time, values) float32 ...
    v10                  (time, values) float32 ...
    v100                 (time, values) float32 ...
    z                    (time, values) float32 ...
Attributes:
    Conventions:               CF-1.7
    GRIB_centre:               ecmf
    GRIB_centreDescription:    European Centre for Medium-Range Weather Forec...
    GRIB_edition:              1
    GRIB_subCentre:            0
    history:                   2022-09-23T18:56 GRIB to CDM+CF via cfgrib-0.9...
    institution:               European Centre for Medium-Range Weather Forec...
    pangeo-forge:inputs_hash:  5f4378143e9f42402424280b63472752da3aa79179b53b...
    pangeo-forge:recipe_hash:  0c3415923e347ce9dac9dc5c6d209525f4d45d799bd25b...
    pangeo-forge:version:      0.9.1
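
To sanity-check the initialised store, I believe it can also be inspected directly with zarr; a minimal sketch, assuming the consolidated metadata was written as requested:

import zarr

# Open only the consolidated metadata (cheap) and check one variable
root = zarr.open_consolidated(s3_target)
print(root['cape'].shape)   # expect (217, 212)
print(root['cape'].chunks)  # expect (64, 212), matching the chunking above

So far everything looks as expected.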

Then I try to copy some data into the empty Zarr store (this is the part to be parallelised):

# Copy data, save metadata and create dask graphs
start = datetime.strptime('2015-01-01', '%Y-%m-%d')
end = datetime.strptime('2015-01-02', '%Y-%m-%d')

# select time slice
data = data_gs.sel(time=slice(start, end))

# select area
lat_chunk = [50, 55]
lon_chunk = [10, 15]
condw_lat = da.logical_and(data.latitude >= min(lat_chunk), data.latitude <= max(lat_chunk))
condw_lon = da.logical_and(data.longitude >= min(lon_chunk), data.longitude <= max(lon_chunk))
condw = da.logical_and(condw_lat, condw_lon).compute()
data = data.where(condw, drop=True)

copied = xarray.Dataset(coords=data.coords, attrs=data.attrs)
copied = copied.assign(data)
copied = copied.chunk({'time': 64, 'values': 'auto'})

region = {'time': slice(start.toordinal(), end.toordinal()),
          'values': slice(None, None)}
# variables that have no dimension in `region` must be dropped for a region write
d_dropped = data.drop_vars(['entireAtmosphere', 'number', 'step', 'surface', 'depthBelowLandLayer'])
store = d_dropped.to_zarr(s3_target, mode='a', compute=False, consolidated=True, region=region)

with Client(cluster) as client:
    client.compute(store).result()  # wait for the region write to finish

But then I get a ValueError:

Traceback (most recent call last):
  File "/app/min-example.py", line 88, in <module>
    store = d_dropped.to_zarr(s3_target, mode='a', compute=False, consolidated=True, region=region)
  File "/usr/local/lib/python3.10/dist-packages/xarray/core/dataset.py", line 2105, in to_zarr
    return to_zarr(  # type: ignore[call-overload,misc]
  File "/usr/local/lib/python3.10/dist-packages/xarray/backends/api.py", line 1654, in to_zarr
    dump_to_store(dataset, zstore, writer, encoding=encoding)
  File "/usr/local/lib/python3.10/dist-packages/xarray/backends/api.py", line 1263, in dump_to_store
    store.store(variables, attrs, check_encoding, writer, unlimited_dims=unlimited_dims)
  File "/usr/local/lib/python3.10/dist-packages/xarray/backends/zarr.py", line 604, in store
    _validate_existing_dims(
  File "/usr/local/lib/python3.10/dist-packages/xarray/backends/zarr.py", line 333, in _validate_existing_dims
    raise ValueError(
ValueError: variable 'swvl3' already exists with different dimension sizes: {'time': 0, 'values': 212} != {'time': 25, 'values': 212}. to_zarr() only supports changing dimension sizes when explicitly appending, but append_dim=None.
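
For context, my understanding (which may be wrong) is that region expects integer position slices into the on-disk arrays, not datetime ordinals. If that is right, I would presumably need something like this instead of toordinal() (an untested sketch, using the time index of the store written above):

# Untested sketch: map the datetime window to integer positions along the
# stored time axis, assuming both timestamps exist exactly in the index
time_index = data2.indexes['time']   # pandas DatetimeIndex of the target store
i0 = time_index.get_loc(start)       # position of the first timestamp
i1 = time_index.get_loc(end) + 1     # one past the last timestamp in the window
region = {'time': slice(i0, i1), 'values': slice(None)}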

What am I doing wrong? The error shows the target region ending up with {'time': 0}, so I suspect the toordinal()-based slice is the culprit (toordinal() returns the proleptic Gregorian day number, 735599 for 2015-01-01, far beyond the 217 stored time steps), but I am not sure what the correct way to specify region is here.
