I want to select a subset area from the ERA5 Zarr store and write it to my own bucket, doing the processing in parallel.
I first read the data:
import xarray, fsspec, boto3, dask
import dask.array as da
from dask.distributed import Client, LocalCluster
from datetime import datetime
endpoint_url='xx'
target_bucket = 'era5.ecmwf-gcp.surface-areas.zarr'
s3_key = 'single-level-reanalysis-subset.zarr'
full_path = 's3://{}/{}'.format(target_bucket, s3_key)
access_key = 'xx'
access_secret = 'xx'
start = datetime.strptime('2015-01-01', '%Y-%m-%d')
end = datetime.strptime('2015-01-10', '%Y-%m-%d')
cluster = LocalCluster(processes=False, n_workers=16, threads_per_worker=2)
# open dataset
data_gs = xarray.open_zarr(
    'gs://gcp-public-data-arco-era5/co/single-level-reanalysis.zarr/',
    chunks={'time': 48},
    consolidated=True,
)
# select time slice
data = data_gs.sel(time=slice(start, end))
# select area
lat_chunk = [50, 55]
lon_chunk = [10, 15]
condw_lat = da.logical_and((data.latitude >= min(lat_chunk)), (data.latitude <= max(lat_chunk)))
condw_lon = da.logical_and((data.longitude >= min(lon_chunk)), (data.longitude <= max(lon_chunk)))
condw = da.logical_and(condw_lat, condw_lon).compute()
data = data.where(condw, drop=True)
print('Original data after selection:')
print(data)
Output:
<xarray.Dataset>
Dimensions: (time: 217, values: 212)
Coordinates:
depthBelowLandLayer float64 100.0
entireAtmosphere float64 0.0
latitude (values) float64 54.94 54.94 54.94 ... 50.16 50.16
longitude (values) float64 10.31 10.78 11.25 ... 14.17 14.58 15.0
number int64 0
step timedelta64[ns] 00:00:00
surface float64 0.0
* time (time) datetime64[ns] 2015-01-01 ... 2015-01-10
valid_time (time) datetime64[ns] dask.array<chunksize=(24,), meta=np.ndarray>
Dimensions without coordinates: values
Data variables: (12/38)
cape (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
d2m (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
hcc (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
istl1 (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
istl2 (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
istl3 (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
... ...
tsn (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
u10 (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
u100 (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
v10 (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
v100 (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
z (time, values) float32 dask.array<chunksize=(24, 212), meta=np.ndarray>
Attributes:
Conventions: CF-1.7
GRIB_centre: ecmf
GRIB_centreDescription: European Centre for Medium-Range Weather Forec...
GRIB_edition: 1
GRIB_subCentre: 0
history: 2022-09-23T18:56 GRIB to CDM+CF via cfgrib-0.9...
institution: European Centre for Medium-Range Weather Forec...
pangeo-forge:inputs_hash: 5f4378143e9f42402424280b63472752da3aa79179b53b...
pangeo-forge:recipe_hash: 0c3415923e347ce9dac9dc5c6d209525f4d45d799bd25b...
pangeo-forge:version: 0.9.1
Then I initialise an empty Zarr store:
# Initialise empty data to zarr
s3_target = fsspec.get_mapper("s3://{}/{}".format(target_bucket, s3_key),
                              key=access_key,
                              secret=access_secret,
                              client_kwargs={
                                  'endpoint_url': endpoint_url
                              })
empty = xarray.zeros_like(data)
empty = empty.chunk({'time': 64, 'values': empty.sizes['values']})
store = empty.to_zarr(s3_target, mode='w', compute=False, consolidated=True)
with Client(cluster) as client:
    client.compute(store)
# Open empty dataset from storage
data2 = xarray.open_dataset(full_path,
                            engine='zarr',
                            backend_kwargs={
                                'storage_options': {
                                    'endpoint_url': endpoint_url,
                                    'key': access_key,
                                    'secret': access_secret
                                }})
print('Empty read from zarr:')
print(data2)
Output:
<xarray.Dataset>
Dimensions: (time: 217, values: 212)
Coordinates:
depthBelowLandLayer float64 ...
entireAtmosphere float64 ...
latitude (values) float64 ...
longitude (values) float64 ...
number int64 ...
step timedelta64[ns] ...
surface float64 ...
* time (time) datetime64[ns] 2015-01-01 ... 2015-01-10
valid_time (time) datetime64[ns] ...
Dimensions without coordinates: values
Data variables: (12/38)
cape (time, values) float32 ...
d2m (time, values) float32 ...
hcc (time, values) float32 ...
istl1 (time, values) float32 ...
istl2 (time, values) float32 ...
istl3 (time, values) float32 ...
... ...
tsn (time, values) float32 ...
u10 (time, values) float32 ...
u100 (time, values) float32 ...
v10 (time, values) float32 ...
v100 (time, values) float32 ...
z (time, values) float32 ...
Attributes:
Conventions: CF-1.7
GRIB_centre: ecmf
GRIB_centreDescription: European Centre for Medium-Range Weather Forec...
GRIB_edition: 1
GRIB_subCentre: 0
history: 2022-09-23T18:56 GRIB to CDM+CF via cfgrib-0.9...
institution: European Centre for Medium-Range Weather Forec...
pangeo-forge:inputs_hash: 5f4378143e9f42402424280b63472752da3aa79179b53b...
pangeo-forge:recipe_hash: 0c3415923e347ce9dac9dc5c6d209525f4d45d799bd25b...
pangeo-forge:version: 0.9.1
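The read-back above suggests the skeleton was written correctly; if useful, the store contents can also be inspected directly with s3fs (a sketch only, using the same placeholder endpoint and credentials as above):
import s3fs
# List the top level of the target store; it should contain the consolidated
# metadata (.zmetadata) plus one prefix per variable.
fs = s3fs.S3FileSystem(key=access_key, secret=access_secret,
                       client_kwargs={'endpoint_url': endpoint_url})
print(fs.ls('{}/{}'.format(target_bucket, s3_key)))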
And then I try to copy some data into the empty Zarr (this is the part I want to parallelise; see the sketch at the end of the post):
# Copy data, save metadata and create dask graphs
start = datetime.strptime('2015-01-01', '%Y-%m-%d')
end = datetime.strptime('2015-01-02', '%Y-%m-%d')
# select time slice
data = data_gs.sel(time=slice(start, end))
# select area
lat_chunk = [50, 55]
lon_chunk = [10, 15]
condw_lat = da.logical_and((data.latitude >= min(lat_chunk)), (data.latitude <= max(lat_chunk)))
condw_lon = da.logical_and((data.longitude >= min(lon_chunk)), (data.longitude <= max(lon_chunk)))
condw = da.logical_and(condw_lat, condw_lon).compute()
data = data.where(condw, drop=True)
copied = xarray.Dataset(coords=data.coords, attrs=data.attrs)
copied = copied.assign(data)
copied = copied.chunk({'time': 64, 'values': 'auto'})
region = {'time': slice(start.toordinal(), end.toordinal()),
          'values': slice(None, None)}
d_dropped = data.drop_vars(['entireAtmosphere', 'number', 'step', 'surface', 'depthBelowLandLayer'])
store = d_dropped.to_zarr(s3_target, mode='a', compute=False, consolidated=True, region=region)
with Client(cluster) as client:
    client.compute(store)
But then I get a ValueError:
Traceback (most recent call last):
  File "/app/min-example.py", line 88, in <module>
    store = d_dropped.to_zarr(s3_target, mode='a', compute=False, consolidated=True, region=region)
  File "/usr/local/lib/python3.10/dist-packages/xarray/core/dataset.py", line 2105, in to_zarr
    return to_zarr( # type: ignore[call-overload,misc]
  File "/usr/local/lib/python3.10/dist-packages/xarray/backends/api.py", line 1654, in to_zarr
    dump_to_store(dataset, zstore, writer, encoding=encoding)
  File "/usr/local/lib/python3.10/dist-packages/xarray/backends/api.py", line 1263, in dump_to_store
    store.store(variables, attrs, check_encoding, writer, unlimited_dims=unlimited_dims)
  File "/usr/local/lib/python3.10/dist-packages/xarray/backends/zarr.py", line 604, in store
    _validate_existing_dims(
  File "/usr/local/lib/python3.10/dist-packages/xarray/backends/zarr.py", line 333, in _validate_existing_dims
    raise ValueError(
ValueError: variable 'swvl3' already exists with different dimension sizes: {'time': 0, 'values': 212} != {'time': 25, 'values': 212}. to_zarr() only supports changing dimension sizes when explicitly appending, but append_dim=None.
What am I doing wrong?
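For context, the end goal is one such region write per time window, submitted to the cluster, roughly along these lines (a sketch only; copy_window is a placeholder for the selection-and-write steps above, and the daily windows are illustrative):
import pandas as pd

def copy_window(window_start, window_end):
    # Placeholder: select the time slice and area as above, then write it
    # into the corresponding region of the target Zarr store.
    ...

windows = pd.date_range('2015-01-01', '2015-01-10', freq='1D')
with Client(cluster) as client:
    futures = [client.submit(copy_window, s, e)
               for s, e in zip(windows[:-1], windows[1:])]
    client.gather(futures)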