diff --git a/setup.py b/setup.py
index 29a6299..435a68d 100644
--- a/setup.py
+++ b/setup.py
@@ -80,6 +80,8 @@
     entry_points={
         "console_scripts": [
            "daops=daops.cli:main",
+           "data-pools-checks=tests.data_pools_checks.run_data_pools_checks:cli",
+           "merge-test-logs=tests.data_pools_checks.merge_csv:cli"
         ],
     },
     long_description=_long_description,
diff --git a/tests/data_pools_checks/.gitignore b/tests/data_pools_checks/.gitignore
new file mode 100644
index 0000000..e6d2a88
--- /dev/null
+++ b/tests/data_pools_checks/.gitignore
@@ -0,0 +1,3 @@
+*~
+typescript*
+*.nc
diff --git a/tests/data_pools_checks/README.md b/tests/data_pools_checks/README.md
new file mode 100644
index 0000000..7eb8c00
--- /dev/null
+++ b/tests/data_pools_checks/README.md
@@ -0,0 +1,26 @@
+This directory provides:
+
+- A command called `data-pools-checks` (implemented in
+  `run_data_pools_checks.py`) that runs the subsetter on a number of test
+  cases chosen to exercise a variety of dataset types (including, for
+  example, some on curvilinear grids). The bounds for each subset are chosen
+  randomly; a command line option (for example `--seed 0`) sets the random
+  seed to give repeatable results, and this can optionally be combined with
+  the `--cache` option to cache the subsetter output (under `/tmp`) rather
+  than rerunning the subsetter every time the script is run. Results from
+  the checks (the name of the collection, the subsetting ranges used, and
+  whether the test succeeded) are written initially into an sqlite database
+  and then (periodically, and also on exit) moved into a compressed CSV
+  file.
+
+- A command `merge-test-logs` (implemented in `merge_csv.py`) that merges
+  the output logs produced by the above tester at different sites into a
+  single compressed CSV file. The `data-pools-checks` command takes an
+  argument naming the site (e.g. `DKRZ`), and this is written both into the
+  contents of the output `.csv.gz` file (a column called `test_location`)
+  and into its filename; the merge command takes a number of these per-site
+  files and merges them into the specified output file, removing any
+  duplicates. Example invocations of both commands are shown below.
+
+- Unit tests (`test_results_db.py`) for the `ResultsDB` class (in
+  `results_db.py`), which implements how test results are stored.
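For reference, the two console scripts registered in `setup.py` above can also be driven from Python. The sketch below is not part of the patch; it shows rough programmatic equivalents based on the `SubsetTester` and `merge_csv` definitions later in this diff, and the site names and log filenames are purely illustrative.

```python
# Sketch only -- rough programmatic equivalents of the two console scripts,
# assuming the package layout added in this patch and access to a data pool.
# Site names and filenames below are illustrative.
import random

from tests.data_pools_checks.run_data_pools_checks import SubsetTester
from tests.data_pools_checks.merge_csv import merge_csv

# Roughly `data-pools-checks DKRZ --seed 0 --cache`: runs the subsetting
# checks and writes results to tests_DKRZ.csv.gz (via a temporary sqlite db).
random.seed(0)
SubsetTester(test_location="DKRZ", use_cache=True).main()

# Roughly `merge-test-logs -o tests_merged.csv.gz tests_CEDA.csv.gz tests_DKRZ.csv.gz`:
# merges per-site logs into one de-duplicated compressed CSV file.
merge_csv(["tests_CEDA.csv.gz", "tests_DKRZ.csv.gz"], "tests_merged.csv.gz")
```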
diff --git a/tests/data_pools_checks/__init__.py b/tests/data_pools_checks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/data_pools_checks/caching.py b/tests/data_pools_checks/caching.py new file mode 100644 index 0000000..a826a4c --- /dev/null +++ b/tests/data_pools_checks/caching.py @@ -0,0 +1,28 @@ +import os +import hashlib + + +def cached_output_fn(collection, params, odir=None): + if odir == None: + odir = '/tmp/roocs' ## FIXME + if not os.path.isdir(odir): + os.mkdir(odir) + vals = [collection] + if 'area' in params: + vals.append('area') + vals.extend(params['area']) + if 'time' in params: + vals.append('time') + vals.extend(list(params['time'].value)) + if 'level' in params: + vals.append('level') + vals.extend(list(float(lev) for lev in params['level'].value)) + vals = tuple(vals) + h = hashlib.sha1(str(vals).encode()).hexdigest() + #print(f'CACHE FOR: {vals} => {h}') + return f'{odir}/cache-{h}.nc' + + +class CachedResult: + def __init__(self, path): + self.file_uris = [path] diff --git a/tests/data_pools_checks/examples_data.py b/tests/data_pools_checks/examples_data.py new file mode 100644 index 0000000..ce0ac4b --- /dev/null +++ b/tests/data_pools_checks/examples_data.py @@ -0,0 +1,175 @@ +# an example with 3 files, including this for the first file: +# +# time = UNLIMITED ; // (588 currently) +# bnds = 2 ; +# plev = 19 ; +# lat = 144 ; +# lon = 192 ; +# +# float ta(time, plev, lat, lon) +# +example_4d = 'CMIP6.CDRMIP.MOHC.UKESM1-0-LL.esm-ssp585ext.r1i1p1f2.Amon.ta.gn.v20211018' + +# an example with 5 files, including this for the first file: +# +# time = UNLIMITED ; // (120 currently) +# bnds = 2 ; +# lat = 144 ; +# lon = 192 ; +# +# float ci(time, lat, lon) ; +# +example_3d = 'CMIP6.C4MIP.MOHC.UKESM1-0-LL.ssp534-over-bgc.r4i1p1f2.Amon.ci.gn.v20220708' + +examples_rot = ['CMIP6.CMIP.CMCC.CMCC-ESM2.historical.r1i1p1f1.Oyr.o2.gn.v20210114', + 'CMIP6.AerChemMIP.HAMMOZ-Consortium.MPI-ESM-1-2-HAM.hist-piAer.r1i1p1f1.Ofx.volcello.gn.v20190627'] + + +# One dataset from every model that exists at CEDA and has 2d lon / lat arrays +more_examples_rot = ''' +CMIP6.AerChemMIP.HAMMOZ-Consortium.MPI-ESM-1-2-HAM.hist-piAer.r1i1p1f1.Ofx.volcello.gn.v20190627 +CMIP6.AerChemMIP.NCC.NorESM2-LM.hist-piAer.r1i1p1f1.Omon.volcello.gn.v20191108 +CMIP6.CMIP.CCCma.CanESM5-CanOE.1pctCO2.r1i1p2f1.Ofx.sftof.gn.v20190429 +CMIP6.CMIP.EC-Earth-Consortium.EC-Earth3-LR.piControl.r1i1p1f1.3hr.tos.gn.v20200919 +CMIP6.DAMIP.CAS.FGOALS-g3.hist-aer.r1i1p1f1.Omon.tos.gn.v20200427 +CMIP6.HighResMIP.CAS.FGOALS-f3-H.control-1950.r1i1p1f1.Oday.tos.gn.v20210120 +CMIP6.OMIP.CSIRO-COSIMA.ACCESS-OM2-025.omip2.r1i1p1f1.Oday.tos.gn.v20210617 +CMIP6.OMIP.CSIRO-COSIMA.ACCESS-OM2.omip2-spunup.r1i1p1f1.Oday.tos.gn.v20210616 +CMIP6.OMIP.NTU.TaiESM1-TIMCOM.omip1.r1i1p1f1.Ofx.deptho.gn.v20201028 +CMIP6.CMIP.EC-Earth-Consortium.EC-Earth3P-VHR.historical.r1i1p2f1.Amon.clt.gr.v20201007 +CMIP6.FAFMIP.NOAA-GFDL.GFDL-ESM2M.faf-all.r1i1p1f1.Omon.so.gn.v20180701 +CMIP6.OMIP.NOAA-GFDL.GFDL-OM4p5B.omip1.r1i1p1f1.Omon.so.gn.v20180701 +'''.strip().split() + +# One dataset from every model that exists at CEDA and has 1d but non-coordinate lon / lat arrays +examples_unstructured = ''' +CMIP6.CMIP.MPI-M.ICON-ESM-LR.1pctCO2.r1i1p1f1.Amon.rlut.gn.v20210215 +CMIP6.CMIP.UA.MCM-UA-1-0.1pctCO2.r1i1p1f1.Amon.rlut.gn.v20190731 +CMIP6.HighResMIP.AWI.AWI-CM-1-1-HR.hist-1950.r1i1p1f2.3hr.tos.gn.v20170825 +CMIP6.HighResMIP.AWI.AWI-CM-1-1-LR.hist-1950.r1i1p1f2.3hr.tos.gn.v20170825 
+CMIP6.HighResMIP.NCAR.CESM1-CAM5-SE-HR.control-1950.r1i1p1f1.Amon.ts.gn.v20200724 +CMIP6.HighResMIP.NCAR.CESM1-CAM5-SE-LR.control-1950.r1i1p1f1.Amon.ts.gn.v20200708 +'''.strip().split() + + +# One dataset from every model that exists at CEDA +all_examples = ''' +CMIP.CSIRO-ARCCSS.ACCESS-CM2.1pctCO2.r1i1p1f1.Amon.clt.gn.v20191109 +C4MIP.CSIRO.ACCESS-ESM1-5.esm-ssp585.r2i1p1f1.Amon.tas.gn.v20191203 +OMIP.CSIRO-COSIMA.ACCESS-OM2.omip2-spunup.r1i1p1f1.Oday.tos.gn.v20210616 +OMIP.CSIRO-COSIMA.ACCESS-OM2-025.omip2.r1i1p1f1.Oday.tos.gn.v20210617 +HighResMIP.AWI.AWI-CM-1-1-HR.hist-1950.r1i1p1f2.3hr.tos.gn.v20170825 +HighResMIP.AWI.AWI-CM-1-1-LR.hist-1950.r1i1p1f2.3hr.tos.gn.v20170825 +CMIP.AWI.AWI-CM-1-1-MR.1pctCO2.r1i1p1f1.3hr.tas.gn.v20181218 +CMIP.AWI.AWI-ESM-1-1-LR.1pctCO2.r1i1p1f1.3hr.tas.gn.v20200212 +HighResMIP.BCC.BCC-CSM2-HR.control-1950.r1i1p1f1.Amon.ts.gn.v20200922 +C4MIP.BCC.BCC-CSM2-MR.1pctCO2-bgc.r1i1p1f1.Amon.cli.gn.v20190321 +AerChemMIP.BCC.BCC-ESM1.hist-piNTCF.r1i1p1f1.Amon.ch4.gn.v20190621 +CMIP.CAMS.CAMS-CSM1-0.1pctCO2.r1i1p1f1.Amon.cli.gn.v20190708 +CMIP.CAS.CAS-ESM2-0.1pctCO2.r1i1p1f1.Amon.rlut.gn.v20201228 +HighResMIP.NCAR.CESM1-CAM5-SE-HR.control-1950.r1i1p1f1.Amon.ts.gn.v20200724 +HighResMIP.NCAR.CESM1-CAM5-SE-LR.control-1950.r1i1p1f1.Amon.ts.gn.v20200708 +C4MIP.NCAR.CESM2.1pctCO2-bgc.r1i1p1f1.Amon.cli.gn.v20190724 +CMIP.NCAR.CESM2-FV2.1pctCO2.r1i1p1f1.Amon.rlut.gn.v20200310 +AerChemMIP.NCAR.CESM2-WACCM.hist-1950HC.r1i1p1f1.Amon.ch4.gn.v20190606 +CMIP.NCAR.CESM2-WACCM-FV2.1pctCO2.r1i1p1f1.Amon.rlut.gn.v20200226 +CMIP.THU.CIESM.1pctCO2.r1i1p1f1.Amon.rlut.gr.v20200417 +CMIP.CMCC.CMCC-CM2-HR4.1pctCO2.r1i1p1f1.6hrPlev.tas.gn.v20210304 +CMIP.CMCC.CMCC-CM2-SR5.1pctCO2.r1i1p1f1.3hr.tas.gn.v20200616 +HighResMIP.CMCC.CMCC-CM2-VHR4.control-1950.r1i1p1f1.6hrPlev.psl.gn.v20200917 +CMIP.CMCC.CMCC-ESM2.1pctCO2.r1i1p1f1.3hr.tas.gn.v20210114 +CFMIP.CNRM-CERFACS.CNRM-CM6-1.abrupt-0p5xCO2.r1i1p1f2.Amon.evspsbl.gr.v20190711 +CMIP.CNRM-CERFACS.CNRM-CM6-1-HR.1pctCO2.r1i1p1f2.Emon.orog.gr.v20191021 +AerChemMIP.CNRM-CERFACS.CNRM-ESM2-1.hist-1950HC.r1i1p1f2.Lmon.baresoilFrac.gr.v20190621 +C4MIP.CCCma.CanESM5.1pctCO2-bgc.r1i1p1f1.AERmon.ps.gn.v20190429 +CMIP.CCCma.CanESM5-CanOE.1pctCO2.r1i1p2f1.Ofx.sftof.gn.v20190429 +CFMIP.LLNL.E3SM-1-0.amip-p4K.r2i1p1f1.Amon.clivi.gr.v20210302 +CMIP.E3SM-Project.E3SM-1-1.historical.r1i1p1f1.AERmon.abs550aer.gr.v20191211 +CMIP.E3SM-Project.E3SM-1-1-ECA.historical.r1i1p1f1.AERmon.abs550aer.gr.v20200623 +CMIP.E3SM-Project.E3SM-2-0.abrupt-4xCO2.r1i1p1f1.Amon.hfls.gr.v20220826 +CMIP.EC-Earth-Consortium.EC-Earth3.1pctCO2.r3i1p1f1.3hr.tas.gr.v20210522 +CMIP.EC-Earth-Consortium.EC-Earth3-AerChem.1pctCO2.r1i1p1f1.3hr.tas.gr.v20200729 +CMIP.EC-Earth-Consortium.EC-Earth3-CC.1pctCO2.r1i1p1f1.Amon.rlut.gr.v20210525 +DCPP.EC-Earth-Consortium.EC-Earth3-HR.dcppA-hindcast.s1990-r10i2p1f1.Amon.rsds.gr.v20201205 +CMIP.EC-Earth-Consortium.EC-Earth3-LR.piControl.r1i1p1f1.3hr.tos.gn.v20200919 +CMIP.EC-Earth-Consortium.EC-Earth3-Veg.1pctCO2.r1i1p1f1.3hr.tas.gr.v20200325 +CMIP.EC-Earth-Consortium.EC-Earth3-Veg-LR.abrupt-4xCO2.r1i1p1f1.Amon.hfls.gr.v20220428 +HighResMIP.EC-Earth-Consortium.EC-Earth3P.control-1950.r1i1p2f1.3hr.clt.gr.v20190906 +HighResMIP.EC-Earth-Consortium.EC-Earth3P-HR.control-1950.r1i1p2f1.3hr.clt.gr.v20181119 +CMIP.EC-Earth-Consortium.EC-Earth3P-VHR.historical.r1i1p2f1.Amon.clt.gr.v20201007 +HighResMIP.ECMWF.ECMWF-IFS-HR.control-1950.r1i1p1f1.6hrPlevPt.psl.gr.v20170915 +HighResMIP.ECMWF.ECMWF-IFS-LR.control-1950.r1i1p1f1.6hrPlevPt.psl.gr.v20180221 
+HighResMIP.ECMWF.ECMWF-IFS-MR.control-1950.r1i1p1f1.6hrPlevPt.psl.gr.v20181121 +HighResMIP.CAS.FGOALS-f3-H.control-1950.r1i1p1f1.Oday.tos.gn.v20210120 +CMIP.CAS.FGOALS-f3-L.1pctCO2.r1i1p1f1.Amon.rlut.gr.v20200620 +CMIP.CAS.FGOALS-g3.1pctCO2.r1i1p1f1.3hr.tas.gn.v20191219 +CMIP.FIO-QLNM.FIO-ESM-2-0.1pctCO2.r1i1p1f1.Amon.rlut.gn.v20200302 +CMIP.NOAA-GFDL.GFDL-AM4.amip.r1i1p1f1.AERmon.lwp.gr1.v20180807 +CFMIP.NOAA-GFDL.GFDL-CM4.amip-4xCO2.r1i1p1f1.Amon.evspsbl.gr1.v20180701 +HighResMIP.NOAA-GFDL.GFDL-CM4C192.control-1950.r1i1p1f1.Amon.ts.gr3.v20180701 +FAFMIP.NOAA-GFDL.GFDL-ESM2M.faf-all.r1i1p1f1.Omon.so.gn.v20180701 +AerChemMIP.NOAA-GFDL.GFDL-ESM4.piClim-aer.r1i1p1f1.AERmon.cdnc.gr1.v20180701 +OMIP.NOAA-GFDL.GFDL-OM4p5B.omip1.r1i1p1f1.Omon.so.gn.v20180701 +CFMIP.NASA-GISS.GISS-E2-1-G.abrupt-0p5xCO2.r1i1p1f1.Amon.cli.gn.v20190524 +CMIP.NASA-GISS.GISS-E2-1-G-CC.esm-hist.r1i1p1f1.Amon.clt.gn.v20190815 +CFMIP.NASA-GISS.GISS-E2-1-H.abrupt-2xCO2.r1i1p1f1.Amon.cli.gn.v20190403 +CFMIP.NASA-GISS.GISS-E2-2-G.abrupt-2xCO2.r1i1p1f1.Amon.evspsbl.gn.v20191120 +CMIP.NASA-GISS.GISS-E2-2-H.1pctCO2.r1i1p1f1.Amon.rlut.gn.v20191120 +HighResMIP.MOHC.HadGEM3-GC31-HH.control-1950.r1i1p1f1.3hr.clt.gn.v20180927 +HighResMIP.MOHC.HadGEM3-GC31-HM.control-1950.r1i1p1f1.3hr.clt.gn.v20180713 +CFMIP.MOHC.HadGEM3-GC31-LL.a4SST.r1i1p1f3.AERmon.abs550aer.gn.v20200403 +HighResMIP.MOHC.HadGEM3-GC31-LM.highresSST-future.r1i14p1f1.3hr.pr.gn.v20190710 +HighResMIP.MOHC.HadGEM3-GC31-MH.spinup-1950.r1i1p1f1.3hr.clt.gn.v20171227 +CMIP.MOHC.HadGEM3-GC31-MM.1pctCO2.r1i1p1f3.3hr.clt.gn.v20201113 +HighResMIP.AS-RCEC.HiRAM-SIT-HR.highres-future.r1i1p1f1.Amon.ts.gn.v20210707 +HighResMIP.AS-RCEC.HiRAM-SIT-LR.highres-future.r1i1p1f1.Amon.ts.gn.v20210707 +CMIP.MPI-M.ICON-ESM-LR.1pctCO2.r1i1p1f1.Amon.rlut.gn.v20210215 +CMIP.CCCR-IITM.IITM-ESM.1pctCO2.r1i1p1f1.3hr.tas.gn.v20191204 +CMIP.INM.INM-CM4-8.1pctCO2.r1i1p1f1.Amon.rlut.gr1.v20190530 +CMIP.INM.INM-CM5-0.1pctCO2.r1i1p1f1.Amon.rlut.gr1.v20200226 +HighResMIP.INM.INM-CM5-H.control-1950.r1i1p1f1.Amon.ts.gr1.v20190812 +CMIP.IPSL.IPSL-CM5A2-INCA.1pctCO2.r1i1p1f1.Amon.rlut.gr.v20201218 +HighResMIP.IPSL.IPSL-CM6A-ATM-HR.highresSST-present.r1i1p1f1.3hr.mrsos.gr.v20190122 +C4MIP.IPSL.IPSL-CM6A-LR.1pctCO2-bgc.r1i1p1f1.Amon.huss.gr.v20180914 +CMIP.IPSL.IPSL-CM6A-LR-INCA.abrupt-4xCO2.r1i1p1f1.3hr.pr.gr.v20210113 +CMIP.NIMS-KMA.KACE-1-0-G.1pctCO2.r1i1p1f1.3hr.tas.gr.v20190918 +CMIP.KIOST.KIOST-ESM.1pctCO2.r1i1p1f1.3hr.tas.gr1.v20210601 +CMIP.UA.MCM-UA-1-0.1pctCO2.r1i1p1f1.Amon.rlut.gn.v20190731 +CMIP.MIROC.MIROC-ES2H.historical.r1i1p1f2.Amon.clt.gn.v20210125 +CMIP.MIROC.MIROC-ES2L.1pctCO2.r1i1p1f2.Amon.rlut.gn.v20190823 +AerChemMIP.MIROC.MIROC6.hist-piAer.r1i1p1f1.Amon.cli.gn.v20190807 +AerChemMIP.HAMMOZ-Consortium.MPI-ESM-1-2-HAM.hist-piAer.r1i1p1f1.Ofx.volcello.gn.v20190627 +CMIP.MPI-M.MPI-ESM1-2-HR.1pctCO2.r1i1p1f1.3hr.tas.gn.v20190710 +C4MIP.MPI-M.MPI-ESM1-2-LR.1pctCO2-bgc.r2i1p1f1.Amon.tas.gn.v20190710 +HighResMIP.MPI-M.MPI-ESM1-2-XR.control-1950.r1i1p1f1.6hrPlev.wap.gn.v20180606 +HighResMIP.MRI.MRI-AGCM3-2-H.highresSST-future.r1i1p1f1.Amon.ts.gn.v20200619 +HighResMIP.MRI.MRI-AGCM3-2-S.highresSST-future.r1i1p1f1.Amon.ts.gn.v20200619 +CFMIP.MRI.MRI-ESM2-0.abrupt-0p5xCO2.r1i1p1f1.3hr.huss.gn.v20210308 +CMIP.NUIST.NESM3.1pctCO2.r1i1p1f1.3hr.tas.gn.v20190707 +HighResMIP.MIROC.NICAM16-7S.highresSST-present.r1i1p1f1.3hr.tos.gr.v20190325 +HighResMIP.MIROC.NICAM16-8S.highresSST-present.r1i1p1f1.3hr.tos.gr.v20190830 +HighResMIP.MIROC.NICAM16-9S.highresSST-present.r1i1p1f1.3hr.tos.gr.v20190830 
+CMIP.NCC.NorCPM1.1pctCO2.r1i1p1f1.Amon.rlut.gn.v20190914 +CMIP.NCC.NorESM1-F.piControl.r1i1p1f1.AERmon.ua.gn.v20190920 +AerChemMIP.NCC.NorESM2-LM.hist-piAer.r1i1p1f1.Omon.volcello.gn.v20191108 +CMIP.NCC.NorESM2-MM.1pctCO2.r1i1p1f1.6hrPlev.tas.gn.v20210319 +CMIP.SNU.SAM0-UNICON.1pctCO2.r1i1p1f1.3hr.tas.gn.v20190323 +CFMIP.AS-RCEC.TaiESM1.abrupt-0p5xCO2.r1i1p1f1.AERmon.ps.gn.v20210913 +OMIP.NTU.TaiESM1-TIMCOM.omip1.r1i1p1f1.Ofx.deptho.gn.v20201028 +AerChemMIP.MOHC.UKESM1-0-LL.hist-piAer.r1i1p1f2.AERday.cod.gn.v20190813 +CMIP.MOHC.UKESM1-1-LL.1pctCO2.r1i1p1f2.AERmon.abs550aer.gn.v20220513 +ISMIP6.NERC.UKESM1-ice-LL.1pctCO2to4x-withism.r1i1p1f2.Amon.clivi.gn.v20220316 +'''.strip().split() + +#data_pool_tests_db = [example_3d, example_4d] + +#data_pool_tests_db = examples_rot + [example_3d, example_4d] + +data_pool_tests_db = [example_3d, example_4d] + examples_rot + + + + + + + diff --git a/tests/data_pools_checks/merge_csv.py b/tests/data_pools_checks/merge_csv.py new file mode 100644 index 0000000..057f8eb --- /dev/null +++ b/tests/data_pools_checks/merge_csv.py @@ -0,0 +1,75 @@ +#/usr/bin/env python + +import os +import argparse +from itertools import chain + +from .results_db import ResultsDB + + +def remove_dups_from_sorted(lst): + first = True + for item in lst: + if first or item != prev: + first = False + prev = item + yield item + + +def merge_csv(infiles, outfile): + + all_files = infiles + if os.path.exists(outfile): + all_files.append(outfile) + + with ResultsDB(None, csvgz_file=all_files[0], read_only=True) as dbin: + columns = dbin.columns + + rows = [] + for fn in all_files: + with ResultsDB(columns, csvgz_file=fn, sqlite_file=None, read_only=True) as dbin: + rows.extend(list(dbin.read_csvgz())) + + primary_index = columns.index('test_time') + + key_getter = lambda arr: \ + (arr[primary_index],) + tuple([arr[i] for i in range(len(arr)) if i != primary_index]) + + rows.sort(key=key_getter) + + with ResultsDB(columns, csvgz_file=outfile, sqlite_file=None) as dbout: + dbout.write_csvgz(remove_dups_from_sorted(rows)) + + +def _cli_arg_parser(): + parser = argparse.ArgumentParser() + parser.add_argument('--output', '-o', + help='output filename (may be pre-existing)', + required=True, + type=_csvgz_file, + nargs=1) + parser.add_argument('input_files', + type=_existing_csvgz_file, + help='input filename(s)', + nargs='+') + return parser.parse_args() + + +def _csvgz_file(path): + if not path.endswith('.csv.gz'): + raise argparse.ArgumentTypeError(f'filename {path} does not end .csv.gz') + return path + + +def _existing_csvgz_file(path): + if not os.path.exists(path): + raise argparse.ArgumentTypeError(f'file {path} does not exist') + return _csvgz_file(path) + + +def cli(): + args = _cli_arg_parser() + out_file, = args.output + print(f'Merging contents of files {args.input_files} into {out_file}') + merge_csv(args.input_files, out_file) + print('done') diff --git a/tests/data_pools_checks/results_db.py b/tests/data_pools_checks/results_db.py new file mode 100644 index 0000000..0d8633c --- /dev/null +++ b/tests/data_pools_checks/results_db.py @@ -0,0 +1,192 @@ +import os +from itertools import chain +import csv +import gzip +import sqlite3 + + +class ResultsDB: + + def __init__(self, + columns, + csvgz_file='tests.csv.gz', sqlite_file='tests_tmp.db', + sql_table_name = 'test_results', + sql_primary_key='id', + merge_every=100, + read_only=False): + self._csvgz_file = csvgz_file + if sqlite_file is None: + self._cursor = False + else: + self._cursor = None + self._sqlite_file = 
sqlite_file + if columns is None: + # special case: CSV file must already exist, columns are read in from it + self.columns = self._read_csvgz_columns() + else: + self.columns = columns[:] + self._sql_table_name = sql_table_name + self._sql_primary_key = sql_primary_key + self._merge_every = merge_every + self._num_tmp_rows = 0 + self._read_only = read_only + if self._sqlite_file is None: + self._merge_every = 0 + + + def _read_csvgz_columns(self): + with gzip.open(self._csvgz_file, 'rt') as f: + reader = csv.reader(f) + return next(reader) + + def read_csvgz(self, src=None): + "read .csv.gz, yield a sequence of rows (each is a list)" + if src is None: + src = self._csvgz_file + if not os.path.exists(src): + return [] + with gzip.open(src, 'rt') as f: + reader = csv.reader(f) + headers = next(reader) + if headers != self.columns: + raise Exception('CSV file does not have the expected columns') + for row in reader: + yield row + + def _read_as_dicts(self, reader, remove_blank=True, **kwargs): + if remove_blank: + cond = lambda t: t[1] + else: + cond = lambda t: True + + return (dict(t for t in zip(self.columns, row) if cond(t)) + for row in reader(**kwargs)) + + def read_csvgz_as_dicts(self, **kwargs): + return self._read_as_dicts(self.read_csvgz, **kwargs) + + def read_sqlite_as_dicts(self, **kwargs): + return self._read_as_dicts(self.get_sqlite_rows, **kwargs) + + def write_csvgz(self, rows, dest=None): + "write .csv.gz, input is a sequence of rows" + self._check_not_read_only() + if dest is None: + dest = self._csvgz_file + tmpname = dest.replace('.gz', '.tmp.gz') + try: + with gzip.open(tmpname, 'wt') as fout: + writer = csv.writer(fout) + writer.writerow(self.columns) + for row in rows: + writer.writerow(row) + os.rename(tmpname, dest) + finally: + if os.path.exists(tmpname): + os.remove(tmpname) + + def add_row(self, **row_dict): + "add a single row - goes into the sqlite file" + self._check_not_read_only() + cur = self._get_cursor() + if cur is False: + raise Exception('Cannot add row without an sqlite db') + sql_keys = ','.join(row_dict.keys()) + sql_vals = ','.join((f"'{str(val)}'" for val in row_dict.values())) + sql = f'insert into {self._sql_table_name} ({sql_keys}) values ({sql_vals})' + cur.execute(sql) + cur.connection.commit() + self._num_tmp_rows += 1 + if self._num_tmp_rows == self._merge_every: + self.merge_and_tidy() + + def _init_sqlite(self): + if self._cursor is False: + return + sql_columns = ','.join([f'{self._sql_primary_key} integer PRIMARY KEY'] + + [f'{col} text' for col in self.columns]) + sql = f'CREATE TABLE IF NOT EXISTS {self._sql_table_name}({sql_columns})' + path = os.path.realpath(self._sqlite_file) + try: + if self._read_only: + conn = sqlite3.connect(f'file:{path}?mode=ro', uri=True) + else: + conn = sqlite3.connect(path) + cursor = conn.cursor() + cursor.execute(sql) + self._cursor = cursor + except sqlite3.OperationalError: + self._cursor = False + + + def _get_cursor(self): + if self._cursor is None: + self._init_sqlite() + return self._cursor + + def get_sqlite_rows(self, include_primary_key=False): + cursor = self._get_cursor() + if cursor is False: + return + cursor.execute(f'SELECT * from {self._sql_table_name}') + for row in cursor: + if include_primary_key: + yield row + else: + yield row[1:] + + def _destroy_sqlite(self): + self._check_not_read_only() + if os.path.exists(self._sqlite_file): + os.remove(self._sqlite_file) + self._cursor = None + self._num_tmp_rows = 0 + + def merge_and_tidy(self): + self._check_not_read_only() + if 
self._cursor is False: + return + csv_rows = self.read_csvgz() + new_rows = self.get_sqlite_rows() + self.write_csvgz(chain(csv_rows, new_rows)) + self._destroy_sqlite() + + def _check_not_read_only(self): + if self._read_only: + raise Exception('write operation requested for read-only db') + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + if not self._read_only: + self.merge_and_tidy() + + +if __name__ == "__main__": + import random + + print(""" +Running simple tester for results DB. +For more extensive tests, use unit tester. From top of repo: + + pytest -c null -s tests/data_pools_checks/test_results_db.py + +""") + + def dump_db(): + print("=====================") + os.system('zcat tests.csv.gz') + print() + os.system('echo .dump | sqlite3 -header -readonly tests_tmp.db') + print("=====================") + + columns = ['test_location', 'test_time', 'collection', 'area', + 'level', 'time', 'success', 'message'] + + with ResultsDB(columns, merge_every=7) as rdb: + for _ in range(13): + rdb.add_row(collection='foo', success='False') + rdb.add_row(collection=str(random.random()), time='blah', success='True') + + dump_db() diff --git a/tests/data_pools_checks/run_data_pools_checks.py b/tests/data_pools_checks/run_data_pools_checks.py new file mode 100644 index 0000000..dbf2df8 --- /dev/null +++ b/tests/data_pools_checks/run_data_pools_checks.py @@ -0,0 +1,811 @@ +import os +os.environ["USE_PYGEOS"] = "0" +import random +import xarray as xr +import tempfile +import datetime +import numpy as np +import shutil +import time +import argparse + +from daops.ops.subset import subset +from roocs_utils.xarray_utils.xarray_utils import open_xr_dataset +from roocs_utils.parameter.param_utils import time_interval, level_interval + +from .results_db import ResultsDB +from .examples_data import data_pool_tests_db +from .caching import cached_output_fn, CachedResult + + +class SubsetTester: + + def __init__(self, + test_location='', + use_cache=True): + + self._test_location = test_location + self._use_cache = use_cache + self._tmpdir = None + + + def main(self): + + results_db_columns = ['test_location', 'test_time', 'collection', 'area', + 'level', 'time', 'success', 'message'] + + file_stem = f'tests_{self._test_location}' + with ResultsDB(results_db_columns, + csvgz_file=f'{file_stem}.csv.gz', + sqlite_file=f'{file_stem}_tmp.db') as rdb: + + row_writer = lambda **kwargs: \ + rdb.add_row(test_location=self._test_location, + test_time=time.strftime("%Y-%m-%d %H:%M:%S"), + **kwargs) + + for collection in data_pool_tests_db: + self.test_subset_in_data_pools(collection, row_writer) + + + def test_subset_in_data_pools(self, collection, row_writer=None): + """ + Do a range of tests on a given collection + """ + + ds = self._open_dataset(collection) + + for test_kwargs, description in self._get_tests(): + + print(description) + subset_params, success, message = self._do_test(collection, ds, **test_kwargs) + + if row_writer is not None: + self._store_result(row_writer, + collection, subset_params, success, message) + + + def _store_result(self, row_writer, collection, subset_params, success, message): + tm = subset_params.get('time') + if tm is not None: + tm = '/'.join(str(v) for v in tm.value) + level = subset_params.get('level') + if level is not None: + level = '/'.join(str(float(v)) for v in level.value) + area = subset_params.get('area') + if area is not None: + area = '/'.join(str(v) for v in area) + success = str(success) + + 
row_writer(collection=collection, + area=area, + level=level, + time=tm, + success=success, + message=message) + + + def _get_tests(self): + """ + Gets a sequence of tests to do. + """ + + # subset horizontally and in time + for _ in range(3): + yield {'do_area': True, + 'do_time': True}, 'subset by area and time' + + # subset in all dimensions (though levels will be ignored if there isn't + # a level dimension) + for _ in range(3): + yield {'do_area': True, + 'do_time': True, + 'do_levels': True}, 'subset by area, time and levels' + + # and a few special cases to try + special_cases = [{"force_lon_wrap": True}, + {"force_pole": "north"}, + {"force_pole": "south"}, + {"force_lon_wrap": True, "force_pole": "south"}] + + for area_args in special_cases: + yield {'do_area': True, + 'do_time': True, + 'do_levels': True, + 'area_args': area_args}, f"Doing special case: {area_args}" + + + def _open_dataset(self, collection): + + print(f"opening {collection}") + + if collection[0] == "/": + return xr.open_dataset(collection) + + #==================================== + ## THIS OUGHT TO WORK, but I need to work out how to configure the paths + ## for the tests. strace shows that it is using /tmp/roocs.ini but this + ## is overwritten every time (tests/conftest.py gets imported and + ## write_roocs_cfg() is run) + ## + ## strace also says that it is also looking in all of these: + ## /clisops/clisops/etc/roocs.ini + ## /daops/daops/etc/roocs.ini + ## /roocs-utils/roocs_utils/etc/roocs.ini + ## + ## For now, symlinking ~/.mini-esgf-data/master/test_data/badc + ## to point to the real /badc will do for a workaround -- now it finds + ## the dataset + ds = open_xr_dataset(collection) + + ## OR HERE IS ANOTHER POSSIBLE TEMPORARY WORKAROUND + # import glob # for open_xr_dataset workaround + # assert isinstance(collection, str) + #paths = glob.glob(f'/badc/cmip6/data/{collection.replace(".","/")}/*.nc') + #ds = open_xr_dataset(paths) + #==================================== + + return ds + + + def _dump_dims(self, ds, label=""): + ignore = ("bnds", "vertices") + print(f"{label} dataset dimensions: " + f'{",".join(f"{k}:{v}" for k, v in ds.dims.items() if k not in ignore)}') + + + def _get_data(self, val): + return val.data if isinstance(val, (xr.DataArray, xr.IndexVariable)) else val + + + def _do_test(self, collection, ds, **kwargs): + """ + Do an individual test on a collection + """ + + print(f"Doing test on {collection} using {kwargs}") + self._dump_dims(ds, label="input") + subset_params, expect = self._prepare_test(ds, **kwargs) + + temp_dir = tempfile.TemporaryDirectory() + self._tmpdir = temp_dir.name + + print("===========================================") + print("Doing test with:") + print(f"\n Collection: {collection}") + print("\n Subset params:") + for k, v in subset_params.items(): + if k == "time": + v = f"time_interval({v.value})" + elif k == "level": + v = f"level_interval({tuple([float(lev) for lev in v.value])})" + print(f" {k} : {v}") + print("\n Expect to get:") + for k, v in expect.items(): + if v is not None: + print(f" {k} : {self._get_data(v)}") + if all(k in expect and expect[k] is None + for k in ("lons_in_range", "lats_in_range")): + print(" (curvilinear grid; will test lon/lat ranges, not exact vals)") + + print("\n===========================================") + + try: + result = self._subset(collection, **subset_params) + + self._check_result(result, expect, collection, subset_params) + + except KeyboardInterrupt: + raise + except Exception as exc: + success = False + message = 
str(exc) + self._print_error(f"TEST FAILED: {message}") + raise # FIXME: comment out + else: + success = True + message = "" + print("Test succeeded\n\n\n") + finally: + temp_dir.cleanup() + + return subset_params, success, message + + + def _print_error(self, message): + stars = "*" * len(message) + print(f"****{stars}****\n" + f"*** {message} ***\n" + f"****{stars}****\n\n\n") + + + def _prepare_test(self, ds, do_area=False, do_time=False, do_levels=False, area_args=None): + """ + returns the params to the subset function that will be needed for the test + and a dictionary of things to expect to come back from the test + + The boolean inputs do_area, do_time, do_levels control whether to subset + in each of these ways. The input area_args can contain a dictionary of arguments + to get_rand_area (ignored if do_area==False) + """ + + if area_args == None: + area_args = {} + + params = {} + expect = {} + + if do_area: + area = self._get_rand_area(ds, **area_args) + req_area = [float(v) for v in area["req_area"]] + + params["area"] = req_area + + expect["lons_in_range"] = area["lons_in_range"] + expect["lats_in_range"] = area["lats_in_range"] + else: + expect["lons_in_range"] = self._get_lons(ds) + expect["lats_in_range"] = self._get_lats(ds) + + if do_levels: + lev_int = self._get_rand_lev_int(ds) + if lev_int is not None: + params["level"] = lev_int["req_interval"] + expect["levs_in_range"] = lev_int["levs_in_range"] + else: + print("WARNING: not requesting level range as no level dimension found") + expect["levs_in_range"] = None + else: + expect["levs_in_range"] = self._get_levs(ds) + + if do_time: + time_int = self._get_rand_time_int(ds) + if time_int is not None: + params["time"] = time_int["req_interval"] + expect["times_in_range"] = time_int["times_in_range"] + else: + print("WARNING: not requesting time range as no time dimension found") + expect["times_in_range"] = None + else: + expect["times_in_range"] = self._get_times(ds) + + return params, expect + + + def _get_rand_time_int(self, ds): + """ + Returns a dictionary containing a randomly chosen time interval + (in the form needed by the subset function) and the time values + that would be expected when subsetting using that interval + """ + times = self._get_times(ds) + if times is None: + return None + t_start, t_end, vals_in_range = self._get_rand_range(times) + ts_start = self._get_time_string(t_start) + ts_end = self._get_time_string(t_end) + return {"req_interval": time_interval(ts_start, ts_end), + "times_in_range": vals_in_range} + + + def _get_rand_lev_int(self, ds): + """ + As get_rand_time_int, but for levels + """ + levs = self._get_levs(ds) + if levs is None: + return None + z_start, z_end, vals_in_range = self._get_rand_range(levs) + return {"req_interval": level_interval(z_start, z_end), + "levs_in_range": vals_in_range} + + + def _get_time_string(self, when): + """ + Turns a datetime, or time value seen in xarray, + into a string understood by time_interval + """ + if isinstance(when, datetime.datetime): + t = when + else: + t = when.values.tolist() + + return (f"{t.year:04d}-{t.month:02d}-{t.day:02d}" + f"T{t.hour:02d}:{t.minute:02d}:{t.second:02d}") + + + def _get_rand_area(self, ds, force_lon_wrap=False, force_pole=None): + """Returns a dictionary containing a randomly chosen area + (tuple of lon_w, lat_s, lon_e, lat_n) and the lon and lat + values that would be expected when subsetting using that area. 
+ + In the curvilinear case (lon and lat variables in the file are 2d), the + expected values will be None rather than arrays of expected values. The + reason for this is that it is not possible to validate a *specific* set of + lon, lat values in any case (as although the subsetting will set any + points that are out of range to NaN, but NaN values could also be because + of missing data points within the range). + """ + + ## (lon0, lon1, lons_in_range), wrap_lon = self._get_rand_lon_range(ds) + (lon0, lon1, lons_in_range) = self._get_rand_lon_range(ds, force_wrap=force_lon_wrap) + (lat0, lat1, lats_in_range) = self._get_rand_lat_range(ds, force_pole=force_pole) + + return {"req_area": (lon0, lat0, lon1, lat1), + "lons_in_range": lons_in_range, + ## "wrap_lon": wrap_lon, + "lats_in_range": lats_in_range} + + + def _get_wrap_lon(self, lons): + """ + Get the longitude at which the wrapping occurs. + Note - the exact value is not used in the current version of the calling code, + the main purpose here is to distinguish global from limited area, so that the + requested area for subsetting does not wrap for a limited area grid. + """ + minlon = lons.min() + maxlon = lons.max() + if maxlon - minlon < 270: + # assume this is a limited area + return None + elif maxlon - minlon >= 360: + raise Exception(f"too wide lon range {minlon} to {maxlon}") + elif 0 <= minlon and maxlon < 360: + return 360. + elif -180 <= minlon and maxlon < 180: + return 180. + else: + raise Exception(f"unsupported lon range {minlon} to {maxlon}") + + + def _get_rand_lon_range(self, ds, force_wrap=False): + """ + Get a randomly chosen longitude range. This might include wrapping around + (unless the longitudes don't seem to span global range), but if force_wrap is + set to True then it ensures that this occurs. + """ + lons = self._get_lons(ds) + wrap_lon = self._get_wrap_lon(lons) + can_wrap=(wrap_lon is not None) + + if force_wrap: + print("WARNING: forcing longitude wrapping for what appears to be limited area model") + params = {"force": "wrap"} + else: + params = {"can_wrap": can_wrap} + return self._get_rand_range(lons, **params) + + + def _get_rand_lat_range(self, ds, force_pole=None): + """ + Get a randomly chosen latitude range. If force_pole is set to "north" or "south", + then the range will extend to the relevant pole. + """ + + lats = self._get_lats(ds) + + # using "force" will ensure that the range returned + # by get_rand_range goes to the end of the latitude values, + # but for the test, actually use -90 or 90. Which value is + # to be overwritten will depend on the ordering. + # + if force_pole == "north": + ret = self._get_rand_range(lats, force="upper") + return (ret[0], 90., ret[2]) if ret[1] >= ret[0] else (90., ret[1], ret[2]) + params["force"] = "upper" + elif force_pole == "south": + ret = self._get_rand_range(lats, force="lower") + return (-90., ret[1], ret[2]) if ret[1] >= ret[0] else (ret[0], -90., ret[2]) + else: + return self._get_rand_range(lats) + + + def _get_rand_range(self, var, max_fraction=.1, can_wrap=False, force=None): + """ + Get a random range from specified variable (which can be any number + of dimensions). Returns tuple of (lower, upper, values in range) + + min and max are chosen based on histogram of values, defaulting to returning + a range that includes up to to about 10% of the points (though can be less) + + can_wrap=True means could be used with longitude - e.g. 
for a wrapping longitude of + 360 it could return lower=-10 upper=10, and the values in range are in the range -10 to 10 + where those values from -10 to 0 are based on the values from 350 to 360 in the input + + force can be used for special cases: "lower" forces the range to include + the lower end (e.g. south pole for latitude), "upper" forces it to include + the upper end (e.g. north pole), "wrap" forces it to wrap around (the meridian + for longitude) + """ + + length = random.uniform(0, max_fraction) + + while True: + + did_wrap = False + if force == "lower": + lower_q = 0. + upper_q = length + elif force == "upper": + lower_q = 1 - length + upper_q = 1. + elif force == "wrap": + lower_q = random.uniform(1 - length, 1) + upper_q = lower_q + length - 1 + did_wrap = True + elif force is not None: + raise ValueError(f"unrecognised force value {force}") + elif can_wrap: + lower_q = random.uniform(0, 1) + upper_q = lower_q + length + did_wrap = upper_q > 1 + if did_wrap: + upper_q -= 1 + else: + lower_q = random.uniform(0, 1 - length) + upper_q = lower_q + length + + lower = var.quantile(lower_q) + upper = var.quantile(upper_q) + + if did_wrap: + in_range = (lower <= var) | (var <= upper) + if var.ndim == 1: + modulus = 360 + lower_vals = self._get_data(var[lower <= var]) + upper_vals = self._get_data(var[var <= upper]) + + if var.min() >= 0: + # data uses 0-360 + # change e.g. 350..10 to -10..10 + # (as subset function doesn't seem to like 350..370) + lower -= modulus + lower_vals -= modulus + else: + # data uses -180 to 180 + # change e.g. 170..-170 to 170..190 + upper += modulus + upper_vals += modulus + + vals_in_range = np.concatenate((lower_vals, upper_vals)) + else: + vals_in_range = None + else: + in_range = (lower <= var) & (var <= upper) + vals_in_range = var[in_range] if var.ndim == 1 else None + + if in_range.sum() > 0: + break + length = min(length * 2, 1) + + if var.ndim == 1 and len(var) > 1 and var[1] < var[0]: + # if the variable is 1d and is decreasing, then swap the ordering + # of the bounds (which were chosen based on histogram so will be increasing at this point) + assert not did_wrap # longitude wrap not verified for decreasing lons + assert lower <= upper + lower, upper = upper, lower + + return (lower, upper, vals_in_range) + + + def _get_lons(self, ds): + "Get the longitude variable for a dataset. Not necessarily a coordinate variable." + return self._get_var_by_stdname(ds, "longitude") + def _get_lats(self, ds): + "Get the latitude variable for a dataset. Not necessarily a coordinate variable." + return self._get_var_by_stdname(ds, "latitude") + + def _get_times(self, ds): + "Get the time coordinate variable for a dataset" + return self._get_axis_by_direction(ds, "T") + def _get_levs(self, ds): + "Get the height coordinate variable for a dataset" + return self._get_axis_by_direction(ds, "Z") + + + def _get_var_by_stdname(self, ds, stdname): + """ + Get variable with a given standard name. + Will raise an exception if there is not exactly one. + """ + vars = [v for v in ds.variables.values() + if v.attrs.get("standard_name") == stdname] + var, = vars + return var + + + def _is_curvi(self, lons, lats): + """ + Test whether given lon and lat variables correspond to a curvilinear grid + (as opposed to 1d coordinate variables). 
+ """ + if len(lons.dims) == 1 and len(lats.dims) == 1: + return False + elif len(lons.dims) == 2 and lons.dims == lats.dims: + return True + else: + raise Exception(f"unexpected dimensionality of lon, lat arrays: {lon.dims} and {lat.dims}") + + + def _get_gridded_vars(self, ds, lons=None, lats=None, curvi=None): + if lons is None: + lons = self._get_lons(ds) + if lats is None: + lats = self._get_lats(ds) + if curvi is None: + curvi = self._is_curvi(lons, lats) + + if curvi: + grid_dims = lons.dims + else: + latdim, = lats.dims + londim, = lons.dims + grid_dims = (latdim, londim) + return [v for v in ds.data_vars.values() + if v.dims[-2:] == grid_dims] + + + def _get_lonlat_ranges_for_curvi(self, ds): + """ + get ranges of lon, lat values where there is actual data (not masked) + for any variable on the lon, lat grid + """ + + lons = self._get_lons(ds) + lats = self._get_lats(ds) + + grid_shape = lons.shape + + # get a list of the variables that are on the lon-lat grid, + # other than lon, lat themselves, and then get an array of + # positions where there are non-NaN values in *any* of these + # variables, for any other level / height + # + # (start with 2d array of False, then use logical OR with + # each variable, although probably there is only one such var) + # + vars_on_grid = self._get_gridded_vars(ds, + lons=lons, lats=lats, + curvi=True) + if not vars_on_grid: + raise Exception("could not find any data variables") + has_data = np.zeros(grid_shape, dtype=bool) + for var in vars_on_grid: + var_has_data = np.logical_not(np.isnan(var.data)) + # reduce to 2d using "any" in loop - there might be a cleverer way + while var_has_data.ndim > 2: + var_has_data = np.any(var_has_data, axis=(var_has_data.ndim - 3)) + assert var_has_data.shape == grid_shape + has_data |= var_has_data + + if not has_data.any(): + print ("WARNING: data variable(s) contain only NaN values") + return (None, None) + + lons_where_data = lons.data[has_data] + lats_where_data = lats.data[has_data] + + lon_range_where_data = (lons_where_data.min(), lons_where_data.max()) + lat_range_where_data = (lats_where_data.min(), lats_where_data.max()) + + print("For this curvilinear dataset:") + print(f" Lon range where data {lon_range_where_data} " + f"(overall {lons.data.min(), lons.data.max()})") + print(f" Lat range where data {lat_range_where_data} " + f"(overall {lats.data.min(), lats.data.max()})") + + return (lon_range_where_data, lat_range_where_data) + + + def _check_in_range(self, actual_range, requested_range, label="", **kwargs): + """ + check whether the range of values lies WITHIN the requested range of values; + allow for the fact that the requested range might be passed in decreasing order + """ + if not self._is_in_range(actual_range, requested_range, **kwargs): + raise Exception(f"range check for {label} failed") + print(f"{label}: Verified range {actual_range} within {requested_range}") + + + def _is_in_range(self, actual_range, requested_range): + """ + helper for check_in_range. Returns boolean. 
+ """ + req0, req1 = requested_range + if req0 > req1: + req0, req1 = req1, req0 + return all((req0 <= val <= req1 + for val in actual_range)) + + + def _check_equal(self, vals, exp_vals, label=""): + """ + Check whether the values match the expected values + """ + + vals = self._get_data(vals) + exp_vals = self._get_data(exp_vals) + + #print(f"\n\n============ {label} =========\n\n") + #print(f"Actual vals: {vals}") + #print(f"Expected vals: {exp_vals}") + + if not np.array_equal(vals, exp_vals): + raise Exception(f"equal values assertion failed for {label}:" + f"actual {vals}, expected: {exp_vals}") + + print(f"{label}: checked {len(vals)} values match expected values") + + + def _check_result(self, result, expect, collection, subset_params): + + """ + Do various checks on the result of subsetting. result should be an + object that has a file_uris property. expect should be dictionary + returned by prepare_test. + + subset_params is the dictionary of keyword args that was passed to + subset (needed for curvilinear - see below). + + Currently, the tests that are done are: + - check that the exact set of coordinate values matches what was expected, + - except that in the case of a curvilinear grid, check to see what lon and + lat ranges are spanned by the lon and lat arrays in the file but only + including those locations where there is data with non-NaN values; then + check that these are within the range that was requested + """ + + fn, = result.file_uris + dsout = xr.open_dataset(fn) + self._dump_dims(dsout, label="output") + + lons = self._get_lons(dsout) + lats = self._get_lats(dsout) + + curvi = self._is_curvi(lons, lats) + + if curvi and "area" in subset_params: + area = subset_params["area"] + req_lon_range = (area[0], area[2]) + req_lat_range = (area[1], area[3]) + + lon_range, lat_range = self._get_lonlat_ranges_for_curvi(dsout) + if lon_range is not None: + print("Checking that lon-lat values with (unmasked) data in requested range") + self._check_in_range(lon_range, req_lon_range, label="longitudes") + self._check_in_range(lat_range, req_lat_range, label="latitudes") + else: + print("Skipping lon/lat range check: did not find any data in requested range") + else: + expected_lons = expect["lons_in_range"] + expected_lats = expect["lats_in_range"] + + self._check_equal(lons, expected_lons, label="longitudes") + self._check_equal(lats, expected_lats, label="latitudes") + + expected_levs = expect["levs_in_range"] + if expected_levs is not None: + levs = self._get_levs(dsout) + self._check_equal(levs, expected_levs, label="levels") + + expected_times = expect["times_in_range"] + if expected_times is not None: + times = self._get_times(dsout) + self._check_equal(times, expected_times, label="times") + timeval = times[0] + else: + timeval = None + + vars_on_grid = self._get_gridded_vars(dsout, + lons=lons, lats=lats, + curvi=curvi) + + self._do_nans_check(vars_on_grid, collection, timeval) + + + def _get_axis_by_direction(self, ds, direction): + """ + Get the axis with the specified direction attribute ("X", "Y", "Z" or "T") + or if there is none, returns None. + (If more than one, raises an exception.) 
+ """ + axes = [] + for name in ds.dims: + axis = ds[name] + if name == "bnds": + continue + if hasattr(axis, "axis") and axis.axis == direction: + axes.append(axis) + if len(axes) > 1: + raise Exception(f"more than one dimension with axis {direction}") + elif len(axes) == 1: + return axes[0] + else: + return None + + + def _do_nans_check(self, vars_on_grid, collection, timeval): + """ + Check whether any of the variables consist of all NaN values. + If so, then extract (for one timestep) the variable on all grid points. + If that variable consists of some but not all NaN values, then assume + that the subsetter is working and we are just requesting a region where + there are NaN values, so downgrade to a warning -- otherwise raise + an exception. + """ + + vars_with_nans = set() + for var in vars_on_grid: + if np.all(np.isnan(var.data)): + vars_with_nans.add(var.name) + + if vars_with_nans: + dsout2 = self._get_fullfield(collection, timeval=timeval) + + for varname in vars_with_nans: + isnan_ref = np.isnan(dsout2[varname].data) + if np.any(isnan_ref) and not np.all(isnan_ref): + print(f"Warning: {varname} contains all NaNs (but some non-NaNs exist in full field).") + else: + raise Exception(f"Variable {varname} contains all NaNs.") + + + def _subset(self, collection, **subset_params): + + if self._use_cache: + cached_fn = cached_output_fn(collection, subset_params) + if os.path.exists(cached_fn): + print(f"using cache: {cached_fn}") + result = CachedResult(cached_fn) + else: + result = subset(collection, + output_dir=self._tmpdir, + **subset_params) + fn, = result.file_uris + print(f"caching: {cached_fn}") + shutil.copy(fn, cached_fn) + else: + result = subset(collection, + output_dir=self._tmpdir, + **subset_params) + return result + + + def _get_fullfield(self, collection, timeval=None): + """ + Get the result of subsetting, optionally by time to a particular + time value, but not in any other dimensions. + """ + params = {} + if timeval is not None: + t_iso = timeval.data.tolist().isoformat() + params['time'] = time_interval(t_iso, t_iso) + + result = self._subset(collection, **params) + fn, = result.file_uris + return xr.open_dataset(fn) + + +def _cli_arg_parser(): + parser = argparse.ArgumentParser() + parser.add_argument('test_location', + help='test location, e.g. 
CEDA') + parser.add_argument('--seed', '-s', type=int, + help='value for random seed') + parser.add_argument('--cache', '-c', action='store_true', + help='use cache of subsetter output') + return parser.parse_args() + + +def cli(): + args = _cli_arg_parser() + if args.seed is not None: + random.seed(args.seed) + else: + if args.cache: + print('Warning: using --cache but not --seed; cache hits are unlikely.') + tester = SubsetTester(test_location=args.test_location, + use_cache=args.cache) + tester.main() + diff --git a/tests/data_pools_checks/test_results_db.py b/tests/data_pools_checks/test_results_db.py new file mode 100644 index 0000000..215fb16 --- /dev/null +++ b/tests/data_pools_checks/test_results_db.py @@ -0,0 +1,135 @@ +import os +import random +import subprocess as sp +import pytest + +from .results_db import ResultsDB + + +columns = ['test_location', 'test_time', 'collection', 'area', + 'level', 'time', 'success', 'message'] + +tst_data = [{'collection': str(random.random()), + 'time': 'blah', + 'success': 'True'} + for _ in range(30)] + + +tst_data2 = [{'collection': 'foo', + 'time': 'bar'}, + {'collection': 'baz', + 'message': 'qux'}, + {'collection': 'quux', + 'message': 'corge'}] + +tst_csv_data = (','.join(columns) + '\n' + + ',,foo,,,bar,,\n,,baz,,,,,qux') + +tst_sql_data =f'id|{"|".join(columns)}\n1|||quux|||||corge' + +tmp_csvgz = '/tmp/test.csv.gz' +tmp_sqlite = '/tmp/test_tmp.sql' + +def _purge(): + for path in (tmp_csvgz, tmp_sqlite): + if os.path.exists(path): + os.remove(path) + +def _open_test_db(**kwargs): + params = {'csvgz_file': tmp_csvgz, + 'sqlite_file': tmp_sqlite, + 'merge_every': 10} + params.update(kwargs) + return ResultsDB(columns, **params) + +def test_write_and_read(): + _purge() + with _open_test_db() as rdb: + for row in tst_data: + rdb.add_row(**row) + + with _open_test_db() as rdb: + data_read = list(rdb.read_csvgz_as_dicts()) + + assert tst_data == data_read + + +def test_write_and_read_while_open(): + + _purge() + merge_every = 7 + + with _open_test_db(merge_every=merge_every) as rdb: + for i, row in enumerate(tst_data, start=1): + rdb.add_row(**row) + + rows_in_csv = list(rdb.read_csvgz_as_dicts()) + rows_in_sqlite = list(rdb.read_sqlite_as_dicts()) + + print(f'CSV: {len(rows_in_csv)} rows, ' + f'sqlite: {len(rows_in_sqlite)} rows, ') + + assert rows_in_csv + rows_in_sqlite == tst_data[:i] + assert len(rows_in_csv) % merge_every == 0 + assert len(rows_in_sqlite) < merge_every + + +def test_manual_merge(): + + _purge() + merge_every = 7 + + with _open_test_db(merge_every=merge_every) as rdb: + for i, row in enumerate(tst_data, start=1): + rdb.add_row(**row) + rdb.merge_and_tidy() + + rows_in_csv = list(rdb.read_csvgz_as_dicts()) + rows_in_sqlite = list(rdb.read_sqlite_as_dicts()) + assert rows_in_csv == tst_data[:i] + assert rows_in_sqlite == [] + + +def test_ro_db(): + _purge() + test_write_and_read() + with _open_test_db(read_only=True) as rdb: + data_read = list(rdb.read_csvgz_as_dicts()) + assert tst_data == data_read + + +def test_write_to_rodb(): + _purge() + with _open_test_db(read_only=True) as rdb: + with pytest.raises(Exception): + rdb.add_row(**tst_data[0]) + + +def test_db_no_sqlite(): + _purge() + test_write_and_read() + with _open_test_db(sqlite_file=None) as rdb: + data_read = list(rdb.read_csvgz_as_dicts()) + assert tst_data == data_read + + +def test_db_write_no_sqlite(): + _purge() + test_write_and_read() + with _open_test_db(sqlite_file=None) as rdb: + with pytest.raises(Exception): + rdb.add_row(**tst_data[0]) + + +def 
test_dump_data(): + + _purge() + with _open_test_db(merge_every=2) as rdb: + for row in tst_data2: + rdb.add_row(**row) + csvdata = sp.getoutput(f'zcat {tmp_csvgz}') + sqldata = sp.getoutput('echo "select * from test_results" ' + f'| sqlite3 -header {tmp_sqlite}') + + assert csvdata == tst_csv_data + assert sqldata == tst_sql_data
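The tests above exercise `ResultsDB` but not the small helper in `merge_csv.py`. A minimal companion test might look like the sketch below (hypothetical, not part of this patch); it relies only on `remove_dups_from_sorted` skipping consecutive duplicate rows in a sorted sequence.

```python
# Hypothetical companion test (not part of this patch) for the duplicate
# removal helper used by merge-test-logs; it could sit alongside the tests
# above and run under pytest in the same package.
from .merge_csv import remove_dups_from_sorted


def test_remove_dups_from_sorted():
    rows = [['a', '1'], ['a', '1'], ['b', '2'], ['b', '2'], ['c', '3']]
    # consecutive duplicates collapse to a single row; ordering is preserved
    assert list(remove_dups_from_sorted(rows)) == [['a', '1'], ['b', '2'], ['c', '3']]
```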