Lazy evaluation on Dask arrays

If you are unfamiliar with Dask, read Parallel computing with Dask in Xarray documentation first.

Recall that the regridding process is divided in two steps : computing the weights and applying the weights. Dask support is much more advanced for the latter, and this what the first part of this notebook is about.

Dask allows lazy evaluation and out-of-core computing, to allow processing large volumes of data with limited memory. You may also get a speed-up by parallelizing the process in some cases, but a general rule of thumb is that if the data fits in memory, regridding will be faster without dask.

[1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import dask.array as da  # need to have dask.array installed, although not directly using it here.
import xarray as xr
import xesmf as xe

A simple example

Prepare input data

[2]:
ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": 500})
ds
[2]:
<xarray.Dataset>
Dimensions:  (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 25, 53), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day).  These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
[3]:
ds.chunks
[3]:
Frozen({'time': (500, 500, 500, 500, 500, 420), 'lat': (25,), 'lon': (53,)})
[4]:
ds["air"].data
[4]:
Array Chunk
Bytes 14.76 MiB 2.53 MiB
Shape (2920, 25, 53) (500, 25, 53)
Dask graph 6 chunks in 2 graph layers
Data type float32 numpy.ndarray
53 25 2920

Build regridder

[5]:
ds_out = xr.Dataset(
    {
        "lat": (["lat"], np.arange(16, 75, 1.0)),
        "lon": (["lon"], np.arange(200, 330, 1.5)),
    }
)

regridder = xe.Regridder(ds, ds_out, "bilinear")
regridder
[5]:
xESMF Regridder
Regridding algorithm:       bilinear
Weight filename:            bilinear_25x53_59x87.nc
Reuse pre-computed weights? False
Input grid shape:           (25, 53)
Output grid shape:          (59, 87)
Periodic in longitude?      False

Apply to xarray Dataset/DataArray

[7]:
# only build the dask graph; actual computation happens later when calling compute()
%time ds_out = regridder(ds)
ds_out
CPU times: user 8.17 ms, sys: 2.32 ms, total: 10.5 ms
Wall time: 10.2 ms
[7]:
<xarray.Dataset>
Dimensions:  (time: 2920, lat: 59, lon: 87)
Coordinates:
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
  * lat      (lat) float64 16.0 17.0 18.0 19.0 20.0 ... 70.0 71.0 72.0 73.0 74.0
  * lon      (lon) float64 200.0 201.5 203.0 204.5 ... 324.5 326.0 327.5 329.0
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 59, 87), meta=np.ndarray>
Attributes:
    regrid_method:  bilinear
[8]:
ds_out["air"].data
[8]:
Array Chunk
Bytes 57.18 MiB 9.79 MiB
Shape (2920, 59, 87) (500, 59, 87)
Dask graph 6 chunks in 8 graph layers
Data type float32 numpy.ndarray
87 59 2920
[9]:
%time result = ds_out['air'].compute()  # actually applies regridding
CPU times: user 756 ms, sys: 155 ms, total: 911 ms
Wall time: 600 ms
[10]:
type(result.data), result.data.shape
[10]:
(numpy.ndarray, (2920, 59, 87))

Chunking behaviour

xESMF will adjust its default behaviour according to the input data. On spatial dimensions where the data has only one chunk, the output of a Regridder call will also have only one chunk, no matter the new dimension size. This like the previous example.

However, if the input has more than one chunk along a spatial dimension, then the regridder will try to preserve the chunk size. When upscaling data, this means the number of chunks increases and with it the number of dask tasks added to the graph. This can actually decrease performance if the graph becomes too large, filled up with many small tasks.

One can always override xESMF’s default behaviour by passing output_chunks to the Regridder call.

In the example below, the input has three chunks along lon:

[13]:
ds_3lon = ds.chunk({"lat": 25, "lon": 25, "time": -1})
ds_3lon.air.data
[13]:
Array Chunk
Bytes 14.76 MiB 6.96 MiB
Shape (2920, 25, 53) (2920, 25, 25)
Dask graph 3 chunks in 3 graph layers
Data type float32 numpy.ndarray
53 25 2920

In this case, the output DataArray will have the same chunk size on longitude, but still only one chunk along latitude.

[14]:
ds_spatial_out = regridder(ds_spatial)  # Regridding ds_spatial
ds_spatial_out["air"].data
[14]:
Array Chunk
Bytes 57.18 MiB 16.43 MiB
Shape (2920, 59, 87) (2920, 59, 25)
Dask graph 4 chunks in 10 graph layers
Data type float32 numpy.ndarray
87 59 2920

Unless the output_chunks argument is passed to the regridder

[15]:
ds_spatial_out = regridder(ds_spatial, output_chunks={"lat": 10, "lon": 10})
ds_spatial_out["air"].data
[15]:
Array Chunk
Bytes 57.18 MiB 1.11 MiB
Shape (2920, 59, 87) (2920, 10, 10)
Dask graph 54 chunks in 10 graph layers
Data type float32 numpy.ndarray
87 59 2920

Parallel weight generation with Dask

Dask can also be used to build the regridder and compute its weights in parallel. To do so, xESMF uses the chunks on the destination grid and computes subsets of weights on each chunk in parallel.

This feature is currently in an experimental state and it will force dask to use processes to parallelize the computation. Moreover, it is slower than then normal method abd thus is it only useful if the destination grid does not fit in memory. Recall that the parallization is done over chunks of the destination grid and each iteration will need to load the source grid in memory.

For a more performant way to generate weights in parallel, it might be better to use ESMF directly instead, assuming you have an MPI-enabled version. See the “Solving large problems using HPC” notebook.

Parallel weight generation example

Prepare input data

[16]:
ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": 500})
ds
[16]:
<xarray.Dataset>
Dimensions:  (lat: 25, time: 2920, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(500, 25, 53), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    title:        4x daily NMC reanalysis (1948)
    description:  Data is from NMC initialized reanalysis\n(4x/day).  These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...

Prepare output dataset and chunk it

[17]:
ds_out = xr.tutorial.open_dataset("rasm")
ds_out = ds_out.chunk({"y": 50, "x": 50})
ds_out.chunks
[17]:
Frozen({'time': (36,), 'y': (50, 50, 50, 50, 5), 'x': (50, 50, 50, 50, 50, 25)})

Create regridder, generating the weights in parallel

[18]:
para_regridder = xe.Regridder(ds, ds_out, "bilinear", parallel=True)
para_regridder
[WARNING] yaksa: 10 leaked handle pool objects
[WARNING] yaksa: 10 leaked handle pool objects
[WARNING] yaksa: 10 leaked handle pool objects
[WARNING] yaksa: 10 leaked handle pool objects
[18]:
xESMF Regridder
Regridding algorithm:       bilinear
Weight filename:            bilinear_25x53_205x275.nc
Reuse pre-computed weights? False
Input grid shape:           (25, 53)
Output grid shape:          (205, 275)
Periodic in longitude?      False

Attempting to build the Regridder using the option parallel=True with either reuse_weights=True or with weights != None will produce a warning. In both cases, since the weights are already generated, the regridder will be built skipping the parallel part.

Using a mask to chunk an empty Dataset

If the destination grid has no variables and contains 1D lat/lon coordinates, using xarray’s .chunk() method will not work

[19]:
ds_out = xr.Dataset(
    {
        "lat": (["lat"], np.arange(16, 75, 1.0), {"units": "degrees_north"}),
        "lon": (["lon"], np.arange(200, 330, 1.5), {"units": "degrees_east"}),
    }
)
ds_out
[19]:
<xarray.Dataset>
Dimensions:  (lat: 59, lon: 87)
Coordinates:
  * lat      (lat) float64 16.0 17.0 18.0 19.0 20.0 ... 70.0 71.0 72.0 73.0 74.0
  * lon      (lon) float64 200.0 201.5 203.0 204.5 ... 324.5 326.0 327.5 329.0
Data variables:
    *empty*
[20]:
ds_out.chunk({"lat": 25, "lon": 25})
ds_out.chunks
[20]:
Frozen({})

To deal with this issue, we can create a mask and add it to ds_out. Using a boolean mask ensures ds_out is not bloated by data and setting the mask to be True everywhere will not affect regridding.

[21]:
mask = da.ones((ds_out.lat.size, ds_out.lon.size), dtype=bool, chunks=(25, 25))
ds_out["mask"] = (ds_out.dims, mask)

# Now we check the chunks of ds_out
ds_out.chunks
[21]:
Frozen({'lat': (25, 25, 9), 'lon': (25, 25, 25, 12)})