
Large data

Overview

Teaching: 30 min
Exercises: 30 min
Questions
  • How do I work with multiple CMIP files that won’t fit in memory?

Objectives
  • Inspect netCDF chunking.

  • Import the dask library and start a client with parallel workers.

  • Calculate and plot the maximum daily precipitation for a high resolution model.

So far we’ve been working with small, individual data files that can be comfortably read into memory on a modern laptop. What if we wanted to process a larger dataset that consists of many files and/or much larger file sizes? For instance, let’s say the next step in our global precipitation analysis is to plot the daily maximum precipitation over the 1850-2014 period for the high resolution CNRM-CM6-1-HR model.

Data download

We’ll look at CMIP6 data. Since the files are large (48GB), it is optional for learners to download the dataset. However, should the learners wish to proceed and download the data, this can be done in different ways.

On NeSI, we’ve downloaded the data for you and stored them under /nesi/nobackup/icshmo_python_aos/data.

If you wish to download the data yourself, you can use the Synda software. For NIWA users, the following commands will search the Earth System Grid Federation (ESGF) catalogue, find out the size of the dataset and asynchronously download the data on NeSI’s platforms:

$ source /nesi/project/niwa02916/synda_env.sh
$ synda search source_id=CNRM-CM6-1-HR variable=pr table_id=day experiment_id=historical
$ synda stat CMIP6.CMIP.CNRM-CERFACS.CNRM-CM6-1-HR.historical.r1i1p1f2.day.pr.gr.v20191021
$ synda install CMIP6.CMIP.CNRM-CERFACS.CNRM-CM6-1-HR.historical.r1i1p1f2.day.pr.gr.v20191021

You can then find the files under $ST_HOME/data/CMIP6/CMIP/CNRM-CERFACS/CNRM-CM6-1-HR/historical/r1i1p1f2/day/pr/gr/v20191021. Alternatively, you can copy the data from /nesi/nobackup/icshmo_python_aos/data/*.nc on NeSI’s HPC into your own data/ directory.

Once the data are copied into the data/ directory, we can inspect the dataset and see that the daily precipitation data for the 1850-2014 period has been broken up into 25-year chunks and spread across seven netCDF files, most of which are 6.7 GB in size.

$ ls -lh data/pr*.nc
-rw-r-----  1 irving  staff   6.7G 26 Jan 12:09 pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_18500101-18741231.nc
-rw-r-----  1 irving  staff   6.7G 26 Jan 13:14 pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_18750101-18991231.nc
-rw-r-----  1 irving  staff   6.7G 26 Jan 14:17 pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19000101-19241231.nc
-rw-r-----  1 irving  staff   6.7G 26 Jan 15:20 pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19250101-19491231.nc
-rw-r-----  1 irving  staff   6.7G 26 Jan 16:24 pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19500101-19741231.nc
-rw-r-----  1 irving  staff   6.7G 26 Jan 17:27 pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19750101-19991231.nc
-rw-r-----  1 irving  staff   4.0G 26 Jan 18:05 pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_20000101-20141231.nc

In order to work with these files, we can use xarray to open a “multifile” dataset as though it were a single file. Our first step is to open a new Jupyter notebook and import a library with a rather unfortunate name.

import glob

The glob library provides a function, also called glob, that finds files whose names match a pattern. We provide those patterns as strings: the character * matches zero or more characters, while ? matches any one character, just as in the Unix shell.

pr_files = glob.glob('data/pr_day_CNRM-CM6-1-HR_*.nc')
pr_files.sort()
print(pr_files)
['data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_18500101-18741231.nc',
 'data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_18750101-18991231.nc',
 'data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19000101-19241231.nc',
 'data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19250101-19491231.nc',
 'data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19500101-19741231.nc',
 'data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_19750101-19991231.nc',
 'data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_20000101-20141231.nc']

Recall that when we first open data in xarray it simply (“lazily”) loads the metadata associated with the data and shows summary information about the contents of the dataset (i.e. it doesn’t read the actual data into memory). This may take a little time for a large multifile dataset.

import xarray as xr

dset = xr.open_mfdataset(pr_files, chunks={'time': '500MB'})

print(dset)
<xarray.Dataset>
Dimensions:      (axis_nbounds: 2, lat: 360, lon: 720, time: 60265)
Coordinates:
  * lat          (lat) float64 -89.62 -89.12 -88.62 -88.13 ... 88.62 89.12 89.62
  * lon          (lon) float64 0.0 0.5 1.0 1.5 2.0 ... 358.0 358.5 359.0 359.5
  * time         (time) datetime64[ns] 1850-01-01T12:00:00 ... 2014-12-31T12:...
Dimensions without coordinates: axis_nbounds
Data variables:
    time_bounds  (time, axis_nbounds) datetime64[ns] dask.array<chunksize=(9131, 2), meta=np.ndarray>
    pr           (time, lat, lon) float32 dask.array<chunksize=(397, 360, 720), meta=np.ndarray>
Attributes:
    Conventions:            CF-1.7 CMIP-6.2
    creation_date:          2019-05-23T12:33:53Z
    description:            CMIP6 historical
    title:                  CNRM-CM6-1-HR model output prepared for CMIP6 and...
    activity_id:            CMIP
    contact:                contact.cmip@meteo.fr
    data_specs_version:     01.00.21
    dr2xml_version:         1.16
    experiment_id:          historical
    experiment:             all-forcing simulation of the recent past
    external_variables:     areacella
    forcing_index:          2
    frequency:              day
    further_info_url:       https://furtherinfo.es-doc.org/CMIP6.CNRM-CERFACS...
    grid:                   data regridded to a 359 gaussian grid (360x720 la...
    grid_label:             gr
    nominal_resolution:     50 km
    initialization_index:   1
    institution_id:         CNRM-CERFACS
    institution:            CNRM (Centre National de Recherches Meteorologiqu...
    license:                CMIP6 model data produced by CNRM-CERFACS is lice...
    mip_era:                CMIP6
    parent_experiment_id:   p i C o n t r o l
    parent_mip_era:         CMIP6
    parent_activity_id:     C M I P
    parent_source_id:       CNRM-CM6-1-HR
    parent_time_units:      days since 1850-01-01 00:00:00
    parent_variant_label:   r1i1p1f2
    branch_method:          standard
    branch_time_in_parent:  0.0
    branch_time_in_child:   0.0
    physics_index:          1
    product:                model-output
    realization_index:      1
    realm:                  atmos
    references:             http://www.umr-cnrm.fr/cmip6/references
    source:                 CNRM-CM6-1-HR (2017):  aerosol: prescribed monthl...
    source_id:              CNRM-CM6-1-HR
    source_type:            AOGCM
    sub_experiment_id:      none
    sub_experiment:         none
    table_id:               day
    variable_id:            pr
    variant_label:          r1i1p1f2
    EXPID:                  CNRM-CM6-1-HR_historical_r1i1p1f2
    CMIP6_CV_version:       cv=6.2.3.0-7-g2019642
    dr2xml_md5sum:          45d4369d889ddfb8149d771d8625e9ec
    xios_commit:            1442-shuffle
    nemo_gelato_commit:     84a9e3f161dade7_8250e198106a168
    arpege_minor_version:   6.3.3
    history:                none
    tracking_id:            hdl:21.14100/223fa794-73fe-4bb5-9209-8ff910f7dc40

We can see that our dset object is an xarray.Dataset, but notice that each variable now has type dask.array with a chunksize attribute. Dask will access the data chunk by chunk (rather than all at once), which is fortunate because the dataset is much larger than the available RAM on most laptops: the files total around 48 GB on disk, and the pr variable expands to over 60 GB of 32-bit floats in memory. Dask can also distribute chunks across multiple cores if we ask it to (i.e. parallel processing).

So how big should our chunks be? As a general rule, they need to be small enough to fit comfortably in memory (because multiple chunks can be in memory at once), but large enough to avoid the time cost associated with asking Dask to manage/schedule lots of little chunks. The Dask documentation suggests that chunk sizes between 10 MB and 1 GB are common, so we’ve set the chunk size to 500 MB in this example. Since our netCDF files are chunked by time, we’ve specified that the 500 MB Dask chunks should also be along that axis. Performance would suffer dramatically if our Dask chunks weren’t aligned with our netCDF chunks.
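If you want to inspect the internal chunking of the netCDF files yourself, one option (a minimal sketch, assuming the netCDF4 library that backs xarray’s netCDF reader is installed) is to query one of the files directly; chunking() returns the storage chunk size along each dimension, or the string 'contiguous' if the variable isn’t chunked.

import netCDF4

# open one of the input files and ask for the storage chunk shape of the pr variable
ncfile = netCDF4.Dataset('data/pr_day_CNRM-CM6-1-HR_historical_r1i1p1f2_gr_18500101-18741231.nc')
print(ncfile.variables['pr'].chunking())
ncfile.close()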

We can have the Jupyter notebook display the size and shape of our chunks, just to make sure they are indeed 500MB.

dset['pr'].data
          Array                 Chunk
Bytes     62.48 GB              499.74 MB
Shape     (60265, 360, 720)     (482, 360, 720)
Count     307 Tasks             150 Chunks
Type      float32               numpy.ndarray
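The 499.74 MB figure follows directly from the chunk shape shown above; as a quick check (chunksize is an attribute of the underlying dask array, and each float32 value takes 4 bytes):

print(dset['pr'].data.chunksize)
print(482 * 360 * 720 * 4 / 1e6)  # bytes in the largest chunk -> ~499.7 MB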

Now that we understand the chunking information contained in the metadata, let’s go ahead and calculate the daily maximum precipitation.

pr_max = dset['pr'].max('time', keep_attrs=True)
print(pr_max)
<xarray.DataArray 'pr' (lat: 360, lon: 720)>
dask.array<nanmax-aggregate, shape=(360, 720), dtype=float32, chunksize=(360, 720), chunktype=numpy.ndarray>
Coordinates:
  * lat      (lat) float64 -89.62 -89.12 -88.62 -88.13 ... 88.62 89.12 89.62
  * lon      (lon) float64 0.0 0.5 1.0 1.5 2.0 ... 357.5 358.0 358.5 359.0 359.5
Attributes:
    long_name:           Precipitation
    units:               kg m-2 s-1
    online_operation:    average
    cell_methods:        area: time: mean
    interval_operation:  900 s
    interval_write:      1 d
    standard_name:       precipitation_flux
    description:         at surface; includes both liquid and solid phases fr...
    history:             none
    cell_measures:       area: areacella

It seems like the calculation happened instantaneously, but it’s actually just another “lazy” feature of xarray. It’s showing us what the output of the calculation would look like (i.e. a 360 by 720 array), but xarray won’t actually do the computation until the data is needed (e.g. to create a plot or write to a netCDF file).

To force xarray to do the computation we can use .load() with the %%time Jupyter notebook command to record how long it takes:

%%time
pr_max.load()
CPU times: user 4min 1s, sys: 54.2 s, total: 4min 55s
Wall time: 3min 44s

By processing the data chunk-by-chunk, we’ve avoided the memory error we would have generated had we tried to handle the whole dataset at once. A completion time of 3 minutes and 44 seconds isn’t too bad, but that was only using one core. We can try and speed things up by using a dask “client” to run the calculation in parallel across multiple cores:

from dask.distributed import Client

client = Client()

client
Client                                    Cluster
Scheduler: tcp://127.0.0.1:59152          Workers: 4
Dashboard: http://127.0.0.1:8787/status   Cores: 4
                                          Memory: 17.18 GB

(Click on the dashboard link to watch what’s happening on each core.)

Now rerun the computation with the distributed client running:

%%time
pr_max = dset['pr'].max('time', keep_attrs=True)
pr_max.load()
CPU times: user 10.2 s, sys: 1.12 s, total: 11.3 s
Wall time: 2min 33s

By distributing the calculation across all four cores the processing time has dropped to 2 minutes and 33 seconds. It’s faster than, but not a quarter of, the original 3 minutes and 44 seconds because there’s a time cost associated with setting up and coordinating jobs across all the cores.

Now that we’ve computed the daily maximum precipitation, we can go ahead and plot it. Before plotting, we convert the units from kg m-2 s-1 to mm/day by multiplying by 86400 (the number of seconds in a day), since 1 kg of rain spread over 1 m² is 1 mm deep:

import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import numpy as np
import cmocean

pr_max.data = pr_max.data * 86400
pr_max.attrs['units'] = 'mm/day'

fig = plt.figure(figsize=[12,5])
ax = fig.add_subplot(111, projection=ccrs.PlateCarree(central_longitude=180))
pr_max.plot.contourf(ax=ax,
                     levels=np.arange(0, 450, 50),
                     extend='max',
                     transform=ccrs.PlateCarree(),
                     cbar_kwargs={'label': pr_max.units},
                     cmap=cmocean.cm.haline_r)
ax.coastlines()

model = dset.attrs['source_id']
title = f'Daily maximum precipitation, 1850-2014 ({model})'
plt.title(title)

plt.show()

[Figure: Daily maximum precipitation, 1850-2014 (CNRM-CM6-1-HR)]

Writing your own Dask-aware functions

So far leveraging Dask for our large data computations has been relatively simple, because almost all of xarray’s built-in operations (like max) work on Dask arrays.

If we wanted to chunk and parallelise code involving operations that aren’t built into xarray (e.g. an interpolation routine from the SciPy library) we’d first need to use the apply_ufunc or map_blocks function to make those operations “Dask-aware”. The xarray tutorial from SciPy 2020 explains how to do this.
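As a rough illustration (our own example, not from that tutorial), xr.apply_ufunc can wrap a plain NumPy function, here a 95th-percentile calculation, so that it runs block-by-block on the Dask-backed pr array:

import numpy as np
import xarray as xr

def q95(arr, axis=-1):
    """95th percentile of a plain NumPy array along the given axis."""
    return np.percentile(arr, 95, axis=axis)

# the core dimension (time) is moved to the last axis and must be a single chunk,
# so we rechunk time and split the spatial dimensions to keep chunks a sensible size
pr_q95 = xr.apply_ufunc(
    q95,
    dset['pr'].chunk({'time': -1, 'lat': 10, 'lon': 180}),
    input_core_dims=[['time']],
    dask='parallelized',
    output_dtypes=[np.float64],  # np.percentile promotes float32 input to float64
)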

Parallel computing on NeSI

Sometimes, the computing power on your laptop or desktop is not enough. At this point you may want to run your analysis on a supercomputer. The following script will compute the global maximum of precipitation across all times, latitudes and longitudes.

import glob
import xarray as xr
import dask
import time

if __name__ == '__main__':
    tic = time.time()

    # use the multiprocessing scheduler so the chunks are processed in parallel
    with dask.config.set(scheduler='processes'):
        pr_files = glob.glob('/nesi/nobackup/icshmo_python_aos/data/*.nc')
        ds = xr.open_mfdataset(pr_files, chunks={'time': 100})
        prmax_all = ds['pr'].max(('time', 'lat', 'lon'), keep_attrs=True)

        # force the lazy computation to run
        prmax_all.load()

    print(prmax_all.data)
    toc = time.time()
    print(f'max: {prmax_all.values} took {toc - tic:.2f} sec')

You can submit the script with

$ srun --cpus-per-task=10 --mem=3gb --ntasks=1 --hint=nomultithread python historical_max_precip.py

Here, we use 10 processes to share the workload. If you vary the number of processes, you will likely observe that the improvement in execution time plateaus, and often even decreases beyond a certain number of processes. This is shown in the plot below, where we can see that the maximum performance is achieved using 8 processes.

Also note that the parallel efficiency, the ratio between the observed speedup (blue solid line) and the ideal speedup (green dashed line), is often less than perfect (less than 100 percent). This can be because some tasks take less time than others, which leaves some processes waiting for others to finish. Another reason for imperfect parallel scalability is that more processes require more data to be instantiated and copied between processes, which can also slow down execution.

Ultimately, the speedup also depends on the size of your data (more scalability can be expected with larger datasets), on the chunking, and on other factors. Try changing the chunking and rerunning the problem to see how the parallel scalability changes (see the example after the figure below).

[Figure: Parallel scalability (observed and ideal speedup versus number of processes)]
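For example (an illustrative value only), you could change the chunks argument in the script above from 100 time steps to one model year and resubmit:

ds = xr.open_mfdataset(pr_files, chunks={'time': 365})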

Alternatives to Dask

While dask can be a good option for working with large data, it’s not always the best choice. For instance, in this lesson we’ve talked about the time cost associated with setting up and coordinating jobs across multiple cores, and the fact that it can take a bit of time and effort to make a function Dask-aware.

What other options/tactics do you have when working with a large, multi-file dataset? What are the pros and cons of these options?

Solution

Other options include writing a loop to process each netCDF file one at a time or a series of sub-regions one at a time.

By breaking the data processing down like this you might avoid the need to make your function/s Dask-aware. If the loop only involves tens (as opposed to hundreds) of files/regions, it might also not be too slow. However, using Dask is neater and more scalable, because the code you write looks essentially the same regardless of whether you’re running it on a powerful supercomputer or a personal laptop. (A sketch of the loop-based approach is shown below.)
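Here is one way the loop might look (our own sketch; the yearly block size and running maximum are illustrative choices, not part of the lesson):

import glob
import numpy as np
import xarray as xr

pr_files = sorted(glob.glob('data/pr_day_CNRM-CM6-1-HR_*.nc'))

running_max = None
for pr_file in pr_files:
    ds = xr.open_dataset(pr_file)
    # step through each file a year at a time so only ~400 MB is read into memory at once
    for start in range(0, ds.sizes['time'], 365):
        block_max = ds['pr'].isel(time=slice(start, start + 365)).max('time', keep_attrs=True)
        running_max = block_max if running_max is None else np.maximum(running_max, block_max)
    ds.close()

print(running_max)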

Find out what you’re working with

Set up a Dask client in your Jupyter notebook. How many cores do you have available, and how much memory?

Solution

Run the following two commands in your notebook to set up the client, and then type client to view the display of information:

from dask.distributed import Client
client = Client()
client
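If you want more control over the local cluster (for instance on a shared machine), Client accepts arguments such as n_workers, threads_per_worker and memory_limit; the values below are illustrative only:

from dask.distributed import Client

# e.g. two single-threaded workers, each capped at 4 GB of memory
client = Client(n_workers=2, threads_per_worker=1, memory_limit='4GB')
client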

Applying this to your own work

In your own research, are there any data processing tasks that could benefit from chunking and/or parallelisation? If so, how would you implement it? What size would your chunks be and along what axis? Are all the operations involved already Dask-aware?

Key Points

  • Libraries such as dask and xarray can make loading, processing and visualising netCDF data much easier.

  • Dask can speed up processing through parallelism, but care may be needed, particularly with data chunking.