From edf29850b24c4c3bdd9cb0daacdd7cb325ca36a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eneko=20Uru=C3=B1uela?= Date: Fri, 30 Sep 2022 10:40:59 +0200 Subject: [PATCH 1/9] Use client.compute instead of dask.compute --- pySPFM/workflows/pySPFM.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pySPFM/workflows/pySPFM.py b/pySPFM/workflows/pySPFM.py index 617a947..5d27cd3 100644 --- a/pySPFM/workflows/pySPFM.py +++ b/pySPFM/workflows/pySPFM.py @@ -481,7 +481,7 @@ def pySPFM( final_estimates = np.empty((n_scans, n_voxels)) # Iterate between temporal and spatial regularizations - _, cluster = dask_scheduler(n_jobs) + client, _ = dask_scheduler(n_jobs) for iter_idx in range(max_iter): if spatial_weight > 0: data_temp_reg = final_estimates - estimates_temporal + data_masked @@ -502,7 +502,7 @@ def pySPFM( ) futures.append(fut) - lars_estimates = compute(futures)[0] + lars_estimates = client.compute(futures)[0] for vox_idx in range(n_voxels): estimates[:, vox_idx] = np.squeeze(lars_estimates[vox_idx][0]) @@ -527,7 +527,7 @@ def pySPFM( ) futures.append(fut) - fista_estimates = compute(futures)[0] + fista_estimates = client.compute(futures)[0] for vox_idx in range(n_voxels): estimates[:, vox_idx] = np.squeeze(fista_estimates[vox_idx][0]) @@ -549,7 +549,7 @@ def pySPFM( ) futures.append(fut) - stability_estimates = compute(futures)[0] + stability_estimates = client.compute(futures)[0] for vox_idx in range(n_voxels): auc[:, vox_idx] = np.squeeze(stability_estimates[vox_idx]) From c38a8ea6946031b873bea445510f983e2f3c71de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eneko=20Uru=C3=B1uela?= Date: Fri, 30 Sep 2022 10:55:44 +0200 Subject: [PATCH 2/9] Use scatter on hrf_norm --- pySPFM/workflows/pySPFM.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pySPFM/workflows/pySPFM.py b/pySPFM/workflows/pySPFM.py index 5d27cd3..81fc85b 100644 --- a/pySPFM/workflows/pySPFM.py +++ b/pySPFM/workflows/pySPFM.py @@ -539,15 +539,16 @@ def pySPFM( auc = np.zeros((n_scans, n_voxels)) # Solve stability regularization - futures = [] - for vox_idx in range(n_voxels): - fut = delayed_dask(stability_selection)( - hrf_norm, + hrf_norm_fut = client.scatter(hrf_norm) + futures = [ + delayed_dask(stability_selection)( + hrf_norm_fut, data_temp_reg[:, vox_idx], n_lambdas, n_surrogates, ) - futures.append(fut) + for vox_idx in range(n_voxels) + ] stability_estimates = client.compute(futures)[0] for vox_idx in range(n_voxels): From 3fec58b8168ddc79921e2e859102c7887e601d75 Mon Sep 17 00:00:00 2001 From: eurunuela Date: Fri, 30 Sep 2022 11:40:37 +0200 Subject: [PATCH 3/9] Scatter seems to be working on big data now --- pySPFM/workflows/pySPFM.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pySPFM/workflows/pySPFM.py b/pySPFM/workflows/pySPFM.py index 81fc85b..d14c884 100644 --- a/pySPFM/workflows/pySPFM.py +++ b/pySPFM/workflows/pySPFM.py @@ -550,7 +550,8 @@ def pySPFM( for vox_idx in range(n_voxels) ] - stability_estimates = client.compute(futures)[0] + stability_estimates = compute(futures)[0] + print(stability_estimates) for vox_idx in range(n_voxels): auc[:, vox_idx] = np.squeeze(stability_estimates[vox_idx]) From aa5954fe9360c7923f04d127ffc2d4312670afe6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eneko=20Uru=C3=B1uela?= Date: Fri, 30 Sep 2022 11:43:25 +0200 Subject: [PATCH 4/9] Added case for non-cluster jobs --- pySPFM/workflows/pySPFM.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pySPFM/workflows/pySPFM.py b/pySPFM/workflows/pySPFM.py index d14c884..cfa0aad 100644 --- a/pySPFM/workflows/pySPFM.py +++ b/pySPFM/workflows/pySPFM.py @@ -538,8 +538,13 @@ def pySPFM( n_lambdas = int(np.ceil(max_iter_factor * n_scans)) auc = np.zeros((n_scans, n_voxels)) + # Scatter data to workers if client is not None + if client is not None: + hrf_norm_fut = client.scatter(hrf_norm) + else: + hrf_norm_fut = hrf_norm + # Solve stability regularization - hrf_norm_fut = client.scatter(hrf_norm) futures = [ delayed_dask(stability_selection)( hrf_norm_fut, @@ -550,8 +555,11 @@ def pySPFM( for vox_idx in range(n_voxels) ] - stability_estimates = compute(futures)[0] - print(stability_estimates) + if client is not None: + stability_estimates = compute(futures)[0] + else: + stability_estimates = compute(futures, scheduler="single-threaded")[0] + for vox_idx in range(n_voxels): auc[:, vox_idx] = np.squeeze(stability_estimates[vox_idx]) From a73abfc89b5b6fa3c946385930277c2b6d052c3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eneko=20Uru=C3=B1uela?= Date: Fri, 30 Sep 2022 11:46:08 +0200 Subject: [PATCH 5/9] Add comment --- pySPFM/workflows/pySPFM.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pySPFM/workflows/pySPFM.py b/pySPFM/workflows/pySPFM.py index cfa0aad..3305ee3 100644 --- a/pySPFM/workflows/pySPFM.py +++ b/pySPFM/workflows/pySPFM.py @@ -555,6 +555,7 @@ def pySPFM( for vox_idx in range(n_voxels) ] + # Gather results if client is not None: stability_estimates = compute(futures)[0] else: From 1e69eaddf00f1bc69dd47336e783de1af76aad14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eneko=20Uru=C3=B1uela?= Date: Fri, 30 Sep 2022 11:54:26 +0200 Subject: [PATCH 6/9] Apply changes to FISTA and LARS --- pySPFM/workflows/pySPFM.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/pySPFM/workflows/pySPFM.py b/pySPFM/workflows/pySPFM.py index 3305ee3..846b836 100644 --- a/pySPFM/workflows/pySPFM.py +++ b/pySPFM/workflows/pySPFM.py @@ -491,6 +491,12 @@ def pySPFM( estimates = np.zeros((n_scans, n_voxels)) lambda_map = np.zeros(n_voxels) + # Scatter data to workers if client is not None + if client is not None: + hrf_norm_fut = client.scatter(hrf_norm) + else: + hrf_norm_fut = hrf_norm + if criterion in lars_criteria: LGR.info("Solving inverse problem with LARS...") n_lambdas = int(np.ceil(max_iter_factor * n_scans)) @@ -498,11 +504,15 @@ def pySPFM( futures = [] for vox_idx in range(n_voxels): fut = delayed_dask(solve_regularization_path, pure=False)( - hrf_norm, data_temp_reg[:, vox_idx], n_lambdas, criterion + hrf_norm_fut, data_temp_reg[:, vox_idx], n_lambdas, criterion ) futures.append(fut) - lars_estimates = client.compute(futures)[0] + # Gather results + if client is not None: + lars_estimates = compute(futures)[0] + else: + lars_estimates = compute(futures, scheduler="single-threaded")[0] for vox_idx in range(n_voxels): estimates[:, vox_idx] = np.squeeze(lars_estimates[vox_idx][0]) @@ -514,7 +524,7 @@ def pySPFM( futures = [] for vox_idx in range(n_voxels): fut = delayed_dask(fista, pure=False)( - hrf_norm, + hrf_norm_fut, data_temp_reg[:, vox_idx], criterion, max_iter_fista, @@ -527,7 +537,11 @@ def pySPFM( ) futures.append(fut) - fista_estimates = client.compute(futures)[0] + # Gather results + if client is not None: + fista_estimates = compute(futures)[0] + else: + fista_estimates = compute(futures, scheduler="single-threaded")[0] for vox_idx in range(n_voxels): estimates[:, vox_idx] = np.squeeze(fista_estimates[vox_idx][0]) @@ -538,12 +552,6 @@ def pySPFM( n_lambdas = int(np.ceil(max_iter_factor * n_scans)) auc = np.zeros((n_scans, n_voxels)) - # Scatter data to workers if client is not None - if client is not None: - hrf_norm_fut = client.scatter(hrf_norm) - else: - hrf_norm_fut = hrf_norm - # Solve stability regularization futures = [ delayed_dask(stability_selection)( From 105b26ce377b74f54b8ed27632c76e99dfb49eea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eneko=20Uru=C3=B1uela?= Date: Fri, 30 Sep 2022 11:57:45 +0200 Subject: [PATCH 7/9] Added number of surrogates to AUC calculation --- pySPFM/deconvolution/stability_selection.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pySPFM/deconvolution/stability_selection.py b/pySPFM/deconvolution/stability_selection.py index 66d891f..261655e 100644 --- a/pySPFM/deconvolution/stability_selection.py +++ b/pySPFM/deconvolution/stability_selection.py @@ -37,7 +37,7 @@ def get_subsampling_indices(n_scans, n_echos, mode="same"): return subsample_idx -def calculate_auc(coefs, lambdas): +def calculate_auc(coefs, lambdas, n_surrogates): # Create shared space of lambdas and coefficients lambdas_shared = np.zeros((lambdas.shape[0] * lambdas.shape[1])) @@ -61,7 +61,7 @@ def calculate_auc(coefs, lambdas): coefs_sorted[coefs_sorted != 0] = 1 # Calculate the AUC as the normalized area under the curve - auc = np.trapz(coefs_sorted, lambdas_sorted) / np.sum(lambdas_sorted) / coefs.shape[-1] + auc = np.trapz(coefs_sorted, lambdas_sorted) / np.sum(lambdas_sorted) / n_surrogates return auc @@ -94,6 +94,6 @@ def stability_selection(hrf_norm, data, n_lambdas, n_surrogates): # Calculate the AUC for each TR auc = np.zeros((n_scans)) for tr_idx in range(n_scans): - auc[tr_idx] = calculate_auc(estimates[tr_idx, :, :], lambdas) + auc[tr_idx] = calculate_auc(estimates[tr_idx, :, :], lambdas, n_surrogates) return auc From 9da36bdbf759994f5c4698b71cc41afe5d5e4381 Mon Sep 17 00:00:00 2001 From: eurunuela Date: Tue, 4 Oct 2022 10:15:15 +0200 Subject: [PATCH 8/9] Fix AUC calculation --- pySPFM/deconvolution/stability_selection.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pySPFM/deconvolution/stability_selection.py b/pySPFM/deconvolution/stability_selection.py index 261655e..c6e423b 100644 --- a/pySPFM/deconvolution/stability_selection.py +++ b/pySPFM/deconvolution/stability_selection.py @@ -60,8 +60,13 @@ def calculate_auc(coefs, lambdas, n_surrogates): # Turn coefs_sorted into a binary vector coefs_sorted[coefs_sorted != 0] = 1 - # Calculate the AUC as the normalized area under the curve - auc = np.trapz(coefs_sorted, lambdas_sorted) / np.sum(lambdas_sorted) / n_surrogates + # Calculate the AUC + for i in range(lambdas_sorted.shape[0]): + if i == 0: + auc = coefs_sorted[i] / n_surrogates * lambdas_sorted[i] / np.sum(lambdas_sorted) + else: + auc += coefs_sorted[i] / n_surrogates * lambdas_sorted[i] / np.sum(lambdas_sorted) + # auc = np.trapz(coefs_sorted, lambdas_sorted) / np.sum(lambdas_sorted) / n_surrogates return auc From 716079ee290aad6d6af9b2f90d6d47d5643a58c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eneko=20Uru=C3=B1uela?= Date: Wed, 5 Oct 2022 11:37:05 +0200 Subject: [PATCH 9/9] Fix AUC calculation --- pySPFM/deconvolution/stability_selection.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pySPFM/deconvolution/stability_selection.py b/pySPFM/deconvolution/stability_selection.py index c6e423b..8aad272 100644 --- a/pySPFM/deconvolution/stability_selection.py +++ b/pySPFM/deconvolution/stability_selection.py @@ -54,19 +54,19 @@ def calculate_auc(coefs, lambdas, n_surrogates): lambdas_sorted_idx = np.argsort(lambdas_shared) lambdas_sorted = np.sort(lambdas_shared) - # Sort coefficients to match lambdas + # Sum of all lambdas + sum_lambdas = np.sum(lambdas_sorted) + + # Sort coefficients coefs_sorted = coefs_shared[lambdas_sorted_idx] - # Turn coefs_sorted into a binary vector + # Make coefs_sorted binary coefs_sorted[coefs_sorted != 0] = 1 # Calculate the AUC - for i in range(lambdas_sorted.shape[0]): - if i == 0: - auc = coefs_sorted[i] / n_surrogates * lambdas_sorted[i] / np.sum(lambdas_sorted) - else: - auc += coefs_sorted[i] / n_surrogates * lambdas_sorted[i] / np.sum(lambdas_sorted) - # auc = np.trapz(coefs_sorted, lambdas_sorted) / np.sum(lambdas_sorted) / n_surrogates + auc = 0 + for i in range(lambdas_shared.shape[0]): + auc += coefs_sorted[i] * lambdas_sorted[i] / sum_lambdas return auc