14
14
from decipher .processing .pipeline import get_base_pipeline , read_raw_df
15
15
from decipher .processing .transformers import (
16
16
AgeAdder ,
17
+ HPVResults ,
17
18
ObservationMatrix ,
18
19
PersonStats ,
19
20
RiskAdder ,
@@ -213,20 +214,77 @@ def metadata(self) -> dict:
213
214
214
215
215
216
class DataManager :
217
+ """DataManager is a class for managing and organizing the datasets.
218
+
219
+ DataManager provides methods to read data from CSV files, save and load DataFrames
220
+ as parquet files for improved performance, filter data, and get feature data.
221
+
222
+ Attributes:
223
+ person_df: DataFrame containing personal data.
224
+ exams_df: DataFrame containing data about exams.
225
+ hpv_df: DataFrame containing HPV results data. See `decipher.processing.transformers.HPVResults` for details. Default is None.
226
+ screening_data: DataFrame containing screening data. Default is None.
227
+ metadata: Dictionary containing metadata.
228
+
229
+ Examples:
230
+ **Reading data from CSV files**
231
+ ```python
232
+ from pathlib import Path
233
+ from decipher.data import DataManager
234
+
235
+ screening_data = Path(<screening data>)
236
+ dob_data = Path(<dob data>)
237
+
238
+ # Read in from CSV
239
+ data_manager = DataManager.read_from_csv(screening_data, dob_data)
240
+ ```
241
+
242
+ **Read and Write with Parquet**
243
+ ```python
244
+ from pathlib import Path
245
+ from decipher.data import DataManager
246
+
247
+ screening_data = Path(<screening data>)
248
+ dob_data = Path(<dob data>)
249
+ parquet_dir = Path(<parquet dir>)
250
+
251
+ # Read in from CSV
252
+ data_manager = DataManager.read_from_csv(screening_data, dob_data)
253
+
254
+ # Store to Parquet
255
+ data_manager.save_to_parquet(parquet_dir, engine="pyarrow")
256
+
257
+ # Read from Parquet
258
+ # Will fail if `decipher` version does not match that of stored data
259
+ data_manager = DataManager.from_parquet(parquet_dir, engine="pyarrow")
260
+
261
+ # See metadata
262
+ data_manager.metadata
263
+ ```
264
+
265
+ Note:
266
+ It is strongly advised to read the CSV files once, and then store the DataManager
267
+ to parquet. This gives much faster read times.
268
+ """
269
+
216
270
def __init__(
    self,
    person_df: pd.DataFrame,
    exams_df: pd.DataFrame,
    hpv_df: pd.DataFrame | None = None,
    screening_data: pd.DataFrame | None = None,
    metadata: dict | None = None,
):
    """Store the provided DataFrames and metadata on the instance.

    Args:
        person_df: Personal data, one row per person.
        exams_df: Exam records.
        hpv_df: Optional HPV results data.
        screening_data: Optional screening data.
        metadata: Optional metadata dict. When falsy, a fresh dict holding
            the installed ``decipher`` package version is used instead.
    """
    self.person_df = person_df
    self.exams_df = exams_df
    self.hpv_df = hpv_df
    self.screening_data = screening_data
    # Default to recording the installed decipher version so that data
    # stored to parquet can later be checked for version compatibility.
    if metadata:
        self.metadata = metadata
    else:
        self.metadata = {"decipher_version": version("decipher")}
227
283
228
284
@classmethod
229
- def read_from_csv (cls , screening_path : Path , dob_path : Path ):
285
+ def read_from_csv (
286
+ cls , screening_path : Path , dob_path : Path , read_hpv : bool = False
287
+ ):
230
288
base_df = _get_base_df (screening_path , dob_path )
231
289
logger .debug ("Got base DF" )
232
290
exams = Pipeline (
@@ -238,11 +296,17 @@ def read_from_csv(cls, screening_path: Path, dob_path: Path):
238
296
verbose = True ,
239
297
).fit_transform (base_df )
240
298
logger .debug ("Got exams DF" )
299
+
241
300
person_df : pd .DataFrame = PersonStats (base_df = base_df ).fit_transform (exams )
242
301
logger .debug ("Got person DF" )
302
+
303
+ hpv_df = HPVResults ().fit_transform (base_df ) if read_hpv else None
304
+ logger .debug ("Got HPV DF" )
305
+
243
306
return DataManager (
244
307
person_df = person_df ,
245
308
exams_df = exams ,
309
+ hpv_df = hpv_df ,
246
310
)
247
311
248
312
def save_to_parquet (
@@ -254,13 +318,22 @@ def save_to_parquet(
254
318
self .screening_data .to_parquet (
255
319
directory / "screening_data.parquet" , engine = engine
256
320
)
321
+ if self .hpv_df is not None :
322
+ self .hpv_df .to_parquet (directory / "hpv_df.parquet" , engine = engine )
257
323
self .person_df .to_parquet (directory / "person_df.parquet" , engine = engine )
258
324
self .exams_df .to_parquet (directory / "exams_df.parquet" , engine = engine )
259
325
with open (directory / "metadata.json" , "w" ) as file :
260
326
# We always want to store the decipher version, so if it is not
261
327
# in the metadata, add it.
262
328
json .dump ({"decipher_version" : version ("decipher" )} | self .metadata , file )
263
329
330
@staticmethod
def _read_if_exists(
    path: Path, engine: _parquet_engine_types
) -> pd.DataFrame | None:
    """Load ``path`` as a parquet DataFrame, or return None when the file is absent."""
    if not path.exists():
        return None
    return pd.read_parquet(path, engine=engine)
336
+
264
337
@classmethod
265
338
def from_parquet (
266
339
cls ,
@@ -282,15 +355,16 @@ def from_parquet(
282
355
)
283
356
else :
284
357
logger .warning (message )
285
- if ( screening_file := directory / "screening_data.parquet" ). exists ():
286
- screening_data = pd . read_parquet ( screening_file , engine = engine )
287
- else :
288
- screening_data = None
358
+ screening_data = cls . _read_if_exists (
359
+ directory / "screening_data.parquet" , engine
360
+ )
361
+ hpv_df = cls . _read_if_exists ( directory / "hpv_df.parquet" , engine )
289
362
person_df = pd .read_parquet (directory / "person_df.parquet" , engine = engine )
290
363
exams_df = pd .read_parquet (directory / "exams_df.parquet" , engine = engine )
291
364
return DataManager (
292
365
person_df = person_df ,
293
366
exams_df = exams_df ,
367
+ hpv_df = hpv_df ,
294
368
screening_data = screening_data ,
295
369
metadata = metadata ,
296
370
)
0 commit comments