forked from distributed-system-analysis/smallfile
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinvoke_process.py
147 lines (126 loc) · 4.85 KB
/
invoke_process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
invoke_process.py
launch multiple subprocesses running SmallfileWorkload instance
Copyright 2012 -- Ben England
Licensed under the Apache License at http://www.apache.org/licenses/LICENSE-2.0
See Appendix on this page for instructions pertaining to license.
'''
import multiprocessing
import unittest
import smallfile
import os
import time
# this class runs a SmallfileWorkload instance in its own subprocess;
# we use processes rather than python threads because this way we can use
# > 1 core, whereas python threading doesn't really use > 1 core
# because of the GIL (global interpreter lock).
# occasional status reports could be sent back using the pipe as well
class subprocess(multiprocessing.Process):
    '''
    Run one workload invocation in a child process.

    The invocation object is handed back to the parent over a one-way
    pipe: run() writes it to ``sender`` when the workload finishes (or
    fails) and the parent reads it from ``receiver``.
    '''

    def __init__(self, invocation):
        '''
        Create the result pipe and attach the workload to this process.

        invocation -- workload object; run() uses its do_workload(),
        log, log_fn(), tid, onhost, NOTOK and status attributes.
        Its buf, biggest_buf and log attributes are reset to None here.
        '''
        multiprocessing.Process.__init__(self)
        # Pipe(False) is one-way: conn1 can only receive, conn2 can only send
        (conn1, conn2) = multiprocessing.Pipe(False)
        self.receiver = conn1  # master process receives test result data here
        self.sender = conn2  # slave process sends test result data here
        invocation.buf = None
        invocation.biggest_buf = None
        invocation.log = None
        self.invoke = invocation  # all workload generated by this object

    def _log_if_open(self, level, message):
        '''Log message at the given level, unless the workload has no log.'''
        log = self.invoke.log
        if log is not None:
            getattr(log, level)(message)

    def run(self):
        '''
        Execute the workload, then send the invocation object to the parent.

        Any exception from the workload is reported on stdout and in the
        workload's log (if it has one), and recorded as NOTOK in the
        invocation's status so that the parent sees the failure.  The
        invocation is always sent, whether or not the workload succeeded.
        '''
        try:
            self.invoke.do_workload()
            self._log_if_open('debug',
                              'exiting subprocess and returning invoke '
                              + str(self.invoke))
        except Exception as e:
            print('Exception seen in thread %s host %s (tail %s) ' %
                  (self.invoke.tid, self.invoke.onhost, self.invoke.log_fn()))
            # the log is None if the workload failed before opening it
            self._log_if_open('error', str(e))
            # the parent only sees what travels through the pipe, so the
            # failure has to be recorded on the invocation object itself
            self.invoke.status = self.invoke.NOTOK
            self.status = self.invoke.NOTOK
        finally:
            self.invoke.log = None  # log objects cannot be serialized
            self.invoke.buf = None
            self.invoke.biggest_buf = None
            # NOTE(review): this resets an attribute of the process object,
            # not self.invoke.rsptimes -- confirm which one was intended
            self.rsptimes = []
            self.sender.send(self.invoke)
# below are unit tests for SmallfileWorkload,
# including a multi-process test
# to run, just do "python invoke_process.py"
def deltree(dir_tree):
    '''
    Remove directory dir_tree and everything beneath it.

    Does nothing if dir_tree does not exist.  Symbolic links found in
    the tree are removed themselves; whatever they point to is left alone.

    Raises AssertionError if the path is 6 characters or shorter, or if
    it exists but is not a directory.
    '''
    # explicit raises rather than assert statements, so the checks still
    # run under "python -O"; presumably the length check is a safety net
    # against wiping a very short path such as "/"
    if len(dir_tree) <= 6:
        raise AssertionError('path too short to delete: %r' % dir_tree)
    if not os.path.exists(dir_tree):
        return
    if not os.path.isdir(dir_tree):
        raise AssertionError('not a directory: %r' % dir_tree)
    # bottom-up walk, so every directory is already empty when removed
    for (parent, subdirs, files) in os.walk(dir_tree, topdown=False):
        for f in files:
            os.unlink(os.path.join(parent, f))
        for d in subdirs:
            path = os.path.join(parent, d)
            # os.walk lists symlinks to directories among the subdirs,
            # but rmdir refuses to remove a symlink
            if os.path.islink(path):
                os.unlink(path)
            else:
                os.rmdir(path)
    os.rmdir(dir_tree)
# status value that test_multiproc_stonewall treats as success when it
# checks the invocation object returned by each subprocess
ok = 0
class Test(unittest.TestCase):
    '''Unit test that runs several workloads in parallel subprocesses.'''

    def setUp(self):
        '''Create a logging workload object and an empty first source dir.'''
        self.invok = smallfile.SmallfileWorkload()
        self.invok.debug = True
        self.invok.verbose = True
        self.invok.tid = 'regtest'
        self.invok.start_log()
        deltree(self.invok.src_dirs[0])
        # a directory needs the execute (search) bit to be entered,
        # so use 0o755 rather than a file-style 0o644
        os.makedirs(self.invok.src_dirs[0], 0o755)

    def test_multiproc_stonewall(self):
        '''
        Start the workload subprocesses, wait until each one's ready file
        exists, create the starting-gate file, then check that every
        subprocess returns an invocation whose status is ok.
        '''
        self.invok.log.info('starting stonewall test')
        thread_ready_timeout = 4
        thread_count = 4
        # rebuild the directory layout from scratch
        for tree in self.invok.top_dirs:
            deltree(tree)
            os.mkdir(tree)
        for src_dir in self.invok.src_dirs:
            os.mkdir(src_dir)
        for dest_dir in self.invok.dest_dirs:
            os.mkdir(dest_dir)
        os.mkdir(self.invok.network_dir)
        self.invok.starting_gate = os.path.join(self.invok.network_dir,
                                                'starting-gate')
        sgate_file = self.invok.starting_gate
        invokeList = []
        for j in range(0, thread_count):
            s = smallfile.SmallfileWorkload()
            # s.log_to_stderr = True
            s.verbose = True
            s.tid = str(j)
            s.prefix = 'thr_'
            s.suffix = 'foo'
            s.iterations = 10
            s.stonewall = False
            s.starting_gate = sgate_file
            invokeList.append(s)
        threadList = [subprocess(s) for s in invokeList]
        try:
            for t in threadList:
                t.start()
            # poll once per second until every subprocess's ready file exists
            threads_ready = True
            for _ in range(0, thread_ready_timeout):
                threads_ready = all(
                    os.path.exists(s.gen_thread_ready_fname(s.tid))
                    for s in invokeList)
                if threads_ready:
                    break
                time.sleep(1)
            if not threads_ready:
                self.fail('threads did not show up within %d seconds'
                          % thread_ready_timeout)
            time.sleep(1)
            # create the starting-gate file now that everyone is ready
            smallfile.touch(sgate_file)
            for t in threadList:
                rtnd_invok = t.receiver.recv()
                t.join()
                self.invok.log.info(str(rtnd_invok))
                if rtnd_invok.status != ok:
                    self.fail('subprocess failure for %s invocation %s: '
                              % (str(t), str(rtnd_invok)))
        finally:
            # do not leave child processes behind when the test fails early
            for t in threadList:
                if t.is_alive():
                    t.terminate()
                    t.join()
# running this file directly ("python invoke_process.py") executes the
# unit tests defined above
if __name__ == '__main__':
    unittest.main()