1
1
from os import path
2
2
import mudata as mu
3
3
import numpy as np
4
+ import scanpy as sc
5
+ import pandas as pd
4
6
import sys
5
7
import pytest
6
8
import sys
9
+ import uuid
7
10
from operator import attrgetter
8
11
9
12
## VIASH START
10
13
meta = {
11
14
'functionality_name' : 'lognorm' ,
12
15
'resources_dir' : 'resources_test/' ,
13
- 'config' : '/home/di/code/openpipeline /src/transform/log1p/config.vsh.yaml' ,
16
+ 'config' : '. /src/transform/log1p/config.vsh.yaml' ,
14
17
'executable' : "../../target/docker/transform/log1p/log1p"
15
18
}
16
19
17
20
18
21
## VIASH END
19
22
20
23
@pytest .fixture
21
- def input_path ():
22
- return f"{ meta ['resources_dir' ]} /pbmc_1k_protein_v3/pbmc_1k_protein_v3_filtered_feature_bc_matrix.h5mu"
24
+ def input_data ():
25
+ return mu .read_h5mu (f"{ meta ['resources_dir' ]} /pbmc_1k_protein_v3/pbmc_1k_protein_v3_filtered_feature_bc_matrix.h5mu" ).copy ()
26
+
27
+ @pytest .fixture
28
+ def random_h5mu_path (tmp_path ):
29
+ def wrapper ():
30
+ unique_filename = f"{ str (uuid .uuid4 ())} .h5mu"
31
+ temp_file = tmp_path / unique_filename
32
+ return temp_file
33
+ return wrapper
23
34
24
35
@pytest .mark .parametrize ("output_layer" , [None , "log_normalized" ])
25
- def test_1logp (run_component , input_path , output_layer ):
26
- output = "output.h5mu"
36
+ @pytest .mark .parametrize ("input_layer" , [None , "normalized" ])
37
+ def test_1logp (run_component , input_data , output_layer , input_layer , random_h5mu_path ):
38
+ output = random_h5mu_path ()
39
+ if input_layer :
40
+ mod = input_data .mod ["rna" ]
41
+ mod .layers [input_layer ] = mod .X .copy ()
42
+ # Overwrite the original layer to make sure
43
+ # it is not accidentally used as input layer.
44
+ mod .X [:] = 0
45
+ input_path = random_h5mu_path ()
46
+ input_data .write (input_path )
27
47
run_args = [
28
48
"--input" , input_path ,
29
49
"--output" , output ,
30
50
"--output_compresion" , "gzip"
31
51
]
32
52
if output_layer :
33
53
run_args .extend (["--output_layer" , output_layer ])
54
+ if input_layer :
55
+ run_args .extend (["--input_layer" , input_layer ])
34
56
run_component (run_args )
35
57
get_output_layer = attrgetter ("X" ) if not output_layer else lambda x : getattr (x , 'layers' )[output_layer ]
36
58
@@ -49,16 +71,31 @@ def test_1logp(run_component, input_path, output_layer):
49
71
50
72
assert rna_in .shape == rna_out .shape , "Should have same shape as before"
51
73
assert prot_in .shape == prot_out .shape , "Should have same shape as before"
74
+ input_layer_data = rna_in .X if not input_layer else rna_in .layers [input_layer ]
75
+ assert np .mean (input_layer_data ) != np .mean (get_output_layer (rna_out )), "Expression should have changed"
52
76
53
- assert np . mean ( rna_in . X ) != np . mean ( get_output_layer ( rna_out )), "Expression should have changed"
54
-
55
- nz_row , nz_col = rna_in . X . nonzero ()
56
- row_corr = np .corrcoef (rna_in . X [ nz_row [ 0 ],:]. toarray (). flatten (), get_output_layer ( rna_out )[ nz_row [ 0 ],: ].toarray ().flatten ())[ 0 , 1 ]
57
- col_corr = np . corrcoef ( rna_in . X [:, nz_col [ 0 ]]. toarray (). flatten (), get_output_layer (rna_out )[:,nz_col [0 ]].toarray ().flatten ())[0 ,1 ]
77
+ nz_row , nz_col = input_layer_data . nonzero ()
78
+ row_corr = np . corrcoef ( input_layer_data [ nz_row [ 0 ],:]. toarray (). flatten (),
79
+ get_output_layer ( rna_out )[ nz_row [ 0 ],:]. toarray (). flatten ())[ 0 , 1 ]
80
+ col_corr = np .corrcoef (input_layer_data [:, nz_col [ 0 ] ].toarray ().flatten (),
81
+ get_output_layer (rna_out )[:,nz_col [0 ]].toarray ().flatten ())[0 ,1 ]
58
82
assert row_corr > .1
59
83
assert col_corr > .1
60
84
61
85
assert 'log1p' in rna_out .uns
62
86
87
+ # Make sure that the original input layer has not been overwritten
88
+ layers_to_test = [None ] + list (rna_in .layers .keys ())
89
+ for layer in layers_to_test :
90
+ if layer != output_layer :
91
+ in_data = sc .get .var_df (rna_in ,
92
+ keys = rna_in .obs_names .to_list (),
93
+ layer = layer )
94
+ out_data = sc .get .var_df (rna_out ,
95
+ keys = rna_in .obs_names .to_list (),
96
+ layer = layer )
97
+ pd .testing .assert_frame_equal (in_data , out_data )
98
+
99
+
63
100
if __name__ == '__main__' :
64
101
sys .exit (pytest .main ([__file__ ]))
0 commit comments