This is becoming a pretty common question among xarray users as far as I can tell (myself included), and is closely related to this Github issue. Generally, a NumPy implementation of some function exists (in your case, np.polyfit()
), but it's not clear how best to apply this calculation to every grid cell, possibly over multiple dimensions.
In a geoscience context, there are two main use cases, one with a simple solution and the other is more complicated:
(1) Easy case:
You have an xr.DataArray of temp
, which is a function of (time, lat, lon)
and you want to find the trend in time, in each gridbox. The easiest way to do this is by grouping the (lat, lon)
coords into one new coord, grouping by that coord and then using the .apply()
method.
Inspired by this Gist from Ryan Abernathy: <3
# Example data
da = xr.DataArray(np.random.randn(20, 180, 360),
dims=('time', 'lat', 'lon'),
coords={'time': np.linspace(0,19, 20),
'lat': np.linspace(-90,90,180),
'lon': np.linspace(0,359, 360)})
# define a function to compute a linear trend of a timeseries
def linear_trend(x):
pf = np.polyfit(x.time, x, 1)
# need to return an xr.DataArray for groupby
return xr.DataArray(pf[0])
# stack lat and lon into a single dimension called allpoints
stacked = da.stack(allpoints=['lat','lon'])
# apply the function over allpoints to calculate the trend at each point
trend = stacked.groupby('allpoints').apply(linear_trend)
# unstack back to lat lon coordinates
trend_unstacked = trend.unstack('allpoints')
Downsides: This method becomes very slow for larger arrays and doesn't easily lend itself to other problems which may feel quite similar in their essence. This leads us to...
(2) Harder case (and the OP's question):
You have an xr.Dataset with variables temp
and height
, each of function of (plev, time, lat, lon)
and you want to find the regression of temp
against height
(the lapse rate) for each (time, lat, lon)
point.
The easiest way around this is to use xr.apply_ufunc(), which gives you some degree of vectorization and dask-compatibility. (Speed!)
# Example DataArrays
da1 = xr.DataArray(np.random.randn(20, 20, 180, 360),
dims=('plev', 'time', 'lat', 'lon'),
coords={'plev': np.linspace(0,19, 20),
'time': np.linspace(0,19, 20),
'lat': np.linspace(-90,90,180),
'lon': np.linspace(0,359, 360)})
# Create dataset
ds = xr.Dataset({'Temp': da1, 'Height': da1})
As before, we create a function to compute the linear trend we need:
def linear_trend(x, y):
pf = np.polyfit(x, y, 1)
return xr.DataArray(pf[0])
Now, we can use xr.apply_ufunc()
to regress the two DataArrays of temp
and height
against each other, along the plev
dimension !
%%time
slopes = xr.apply_ufunc(linear_trend,
ds.Height, ds.Temp,
vectorize=True,
input_core_dims=[['plev'], ['plev']],# reduce along 'plev'
)
However, this approach is also quite slow and, as before, won't scale up well for larger arrays.
CPU times: user 2min 44s, sys: 2.1 s, total: 2min 46s
Wall time: 2min 48s
Speed it up:
To speed up this calculation, we can convert our height
and temp
data to dask.arrays
using xr.DataArray.chunk()
. This splits up our data into small, manageable chunks which we can then use to parallelize our calculation with dask=parallelized
in our apply_ufunc()
.
N.B. You must be careful not to chunk along the dimension that you're applying the regression to!
dask_height = ds.Height.chunk({'lat':10, 'lon':10, 'time': 10})
dask_temp = ds.Temp.chunk({'lat':10, 'lon':10, 'time': 10})
dask_height
<xarray.DataArray 'Height' (plev: 20, time: 20, lat: 180, lon: 360)>
dask.array<xarray-<this-array>, shape=(20, 20, 180, 360), dtype=float64, chunksize=(20, 10, 10, 10), chunktype=numpy.ndarray>
Coordinates:
* plev (plev) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
* time (time) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
* lat (lat) float64 -90.0 -88.99 -87.99 -86.98 ... 86.98 87.99 88.99 90.0
* lon (lon) int64 0 1 2 3 4 5 6 7 8 ... 352 353 354 355 356 357 358 359
Now, do the calculation again!
%%time
slopes_dask = xr.apply_ufunc(linear_trend,
dask_height, dask_temp,
vectorize=True,
dask='parallelized',
input_core_dims=[['plev'], ['plev']], # reduce along 'plev'
output_dtypes=['d'],
)
CPU times: user 6.55 ms, sys: 2.39 ms, total: 8.94 ms
Wall time: 9.24 ms
SIGNIFICANT SPEEDUP !
Hope this helps! I learnt a lot trying to answer it :)
Best
EDIT: As pointed out in the comments, to really compare the processing time between the dask and non-dask methods, you should use:
%%time
slopes_dask.compute()
which gives you a comparable wall time to the non-dask method.
However it's important to point out that operating lazily on the data (i.e. not loading it in until you absolutely need it) is much preferred for working with the kind of large datasets that you encounter in climate analysis. So I'd still suggest using the dask method, because then you can operate many different processes on the input array and each will take only a few ms
, then only at the end do you have to wait for a few minutes to get your finished product out. :)