#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
smallfile.py -- SmallfileWorkload class used in each workload thread
Copyright 2012 -- Ben England
Licensed under the Apache License at http://www.apache.org/licenses/LICENSE-2.0
See Appendix on this page for instructions pertaining to license.
Created on Apr 22, 2009
'''
# repeat a file operation N times
# allow for multi-thread tests with stonewalling
# we can launch any combination of these to simulate more complex workloads
# possible enhancements:
# embed parallel python and thread launching logic so we can have both
# CLI and GUI interfaces to same code
#
# to run just one of the unit tests, do:
#   python -m unittest smallfile.Test.your-unit-test
# the "unittest" regression test API has changed,
# so this module now uses unittest2 for backwards compatibility,
# but unittest2 isn't installed by default, so we have to conditionalize its use
# we only need it installed where we want to run the regression test this way
# on Fedora:
#   yum install python-unittest2
# alternative single-test syntax:
#   python smallfile.py -v Test.test_c1_Mkdir
import os
import os.path
from os.path import exists, join
import sys
import time
import copy
import random
import logging
import threading
import socket
import errno
import codecs
OK = 0 # system call return code for success
NOTOK = 1
KB_PER_GB = 1 << 20
pct_files_min = 90 # min % of files considered acceptable for a test run
# we have to support a variety of python environments,
# so for optional features don't blow up if they aren't there, just remember
xattr_installed = False
try:
    import xattr
    xattr_installed = True
except ImportError as e:
    pass

fadvise_installed = False
try:
    import drop_buffer_cache
    fadvise_installed = True
except ImportError as e:
    pass

fallocate_installed = False
try:
    import fallocate  # not yet in python os module
    fallocate_installed = True
except ImportError as e:
    pass

unittest2_installed = False
try:
    import unittest2
    unittest2_installed = True
except ImportError as e:
    import unittest
# Windows 2008 server seemed to have this environment variable
# didn't check if it's universal
is_windows_os = (os.getenv('HOMEDRIVE') is not None)
# O_BINARY variable means we don't need to special-case windows
# in every open statement
O_BINARY = 0
if is_windows_os:
    O_BINARY = os.O_BINARY


# FIXME: pass in file pathname instead of file number
class MFRdWrExc(Exception):

    def __init__(self, opname_in, filenum_in, rqnum_in, bytesrtnd_in):
        self.opname = opname_in
        self.filenum = filenum_in
        self.rqnum = rqnum_in
        self.bytesrtnd = bytesrtnd_in

    def __str__(self):
        return 'file ' + str(self.filenum) + ' request ' \
            + str(self.rqnum) + ' byte count ' + str(self.bytesrtnd) \
            + ' ' + self.opname


class SMFResultException(Exception):

    def __init__(self, msg):
        Exception.__init__(self)
        self.msg = msg

    def __str__(self):
        return self.msg


# avoid exception if file we wish to delete is not there
def ensure_deleted(fn):
    try:
        if os.path.lexists(fn):
            os.unlink(fn)
    except Exception as e:
        # could be a race condition with other client processes/hosts
        # if it was a race condition, the file will no longer be there
        if os.path.exists(fn):
            raise Exception('exception while ensuring %s deleted: %s'
                            % (fn, str(e)))


# just create an empty file
# leave exception handling to caller
def touch(fn):
    with open(fn, 'w'):
        pass


# abort routine signals all other threads to stop by creating the abort file
def abort_test(abort_fn, thread_list):
    if not os.path.exists(abort_fn):
        touch(abort_fn)


# create directory if it's not already there
def ensure_dir_exists(dirpath):
    if not os.path.exists(dirpath):
        parent_path = os.path.dirname(dirpath)
        if parent_path == dirpath:
            raise Exception('ensure_dir_exists: ' +
                            'cannot obtain parent path ' +
                            'of non-existent path: ' +
                            dirpath)
        ensure_dir_exists(parent_path)
        try:
            os.mkdir(dirpath)
        except os.error as e:
            if e.errno != errno.EEXIST:  # workaround for filesystem bug
                raise e
    else:
        if not os.path.isdir(dirpath):
            raise Exception('%s already exists and is not a directory!'
                            % dirpath)
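
# note: this is roughly equivalent to os.makedirs(dirpath, exist_ok=True),
# except that a pre-existing non-directory raises the explicit exception
# above, and EEXIST from concurrent creators is tolerated at every level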


# next two routines are for asynchronous replication
# we remember the time when a file was completely written
# and its size using xattr,
# then we read xattr in do_await_create operation
# and compute latencies from that
def remember_ctime_size_xattr(filedesc):
    nowtime = str(time.time())
    st = os.fstat(filedesc)
    xattr.setxattr(filedesc, 'user.smallfile-ctime-size', nowtime + ','
                   + str(st.st_size / SmallfileWorkload.BYTES_PER_KB))


def recall_ctime_size_xattr(pathname):
    (ctime, size_kb) = (None, None)
    try:
        with open(pathname, 'r') as fd:
            xattr_str = xattr.getxattr(fd, 'user.smallfile-ctime-size')
            token_pair = str(xattr_str).split(',')
            ctime = float(token_pair[0][2:])
            size_kb = int(token_pair[1].split('.')[0])
    except IOError as e:
        eno = e.errno
        if eno != errno.ENODATA:
            raise e
    return (ctime, size_kb)
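
# illustrative note: the xattr value written above is a string such as
# '1650000000.123456,64.0' (write-completion time, then size in KB); the
# timestamp shown is hypothetical.  getxattr() returns the value as bytes,
# so str() of it looks like "b'1650000000.123456,64.0'", which is why
# recall_ctime_size_xattr() slices off the first two characters and strips
# the fractional part of the size before converting to int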


def get_hostname(h):
    if h is None:
        h = socket.gethostname()
    return h


def hostaddr(h):  # return the IP address of a hostname
    if h is None:
        a = socket.gethostbyname(socket.gethostname())
    else:
        a = socket.gethostbyname(h)
    return a


def hexdump(b):
    s = ''
    for j in range(0, len(b)):
        s += '%02x' % b[j]
    return s


def binary_buf_str(b):  # display a binary buffer as a text string
    if sys.version < '3':
        return codecs.unicode_escape_decode(b)[0]
    else:
        if isinstance(b, str):
            return bytes(b).decode('UTF-8', 'backslashreplace')
        else:
            return b.decode('UTF-8', 'backslashreplace')


class SmallfileWorkload:

    rename_suffix = '.rnm'
    all_op_names = [
        'create',
        'delete',
        'append',
        'overwrite',
        'read',
        'readdir',
        'rename',
        'delete-renamed',
        'cleanup',
        'symlink',
        'mkdir',
        'rmdir',
        'stat',
        'chmod',
        'setxattr',
        'getxattr',
        'swift-get',
        'swift-put',
        'ls-l',
        'await-create',
    ]
    OK = 0
    NOTOK = 1
    BYTES_PER_KB = 1024
    MICROSEC_PER_SEC = 1000000.0

    # number of files between stonewalling check at smallest file size
    max_files_between_checks = 100

    # default for UNIX
    tmp_dir = os.getenv('TMPDIR')
    if tmp_dir is None:  # windows case
        tmp_dir = os.getenv('TEMP')
    if tmp_dir is None:  # assume POSIX-like
        tmp_dir = '/var/tmp'

    # constant file size
    fsdistr_fixed = -1
    # a file size distribution type that results in a few files much larger
    # than the mean and mostly files much smaller than the mean
    fsdistr_random_exponential = 0
    # multiply mean size by this to get max file size
    random_size_limit = 8
    # large prime number used to randomly select directory given file number
    some_prime = 900593

    # build largest supported buffer, and fill it full of random hex digits,
    # then just use a substring of it below
    biggest_buf_size_bits = 20
    random_seg_size_bits = 10
    biggest_buf_size = 1 << biggest_buf_size_bits

    # initialize files with up to this many different random patterns
    buf_offset_range = 1 << 10

    loggers = {}  # so we only instantiate logger for a given thread name once

    # constructor sets up initial, default values for test parameters
    # user overrides these values using CLI interface parameters
    # for boolean parameters,
    # preceding comment describes what happens if parameter is set to True

    def __init__(self):
        # all threads share same directory
        self.is_shared_dir = False
        # file operation type, default idempotent
        self.opname = 'cleanup'
        # how many files accessed, default = quick test
        self.iterations = 200
        # top of directory tree, default always exists on local fs
        top = join(self.tmp_dir, 'smf')
        # file that tells thread when to start running
        self.starting_gate = None
        # transfer size (KB), 0 = default to file size
        self.record_sz_kb = 0
        # total data read/written in KB
        self.total_sz_kb = 64
        # file size distribution, default = all files same size
        self.filesize_distr = self.fsdistr_fixed
        # how many files to place in each directory
        self.files_per_dir = 100
        # fanout if > 1 dir/thread needed
        self.dirs_per_dir = 10
        # size of xattrs to read/write
        self.xattr_size = 0
        # number of xattrs to read/write
        self.xattr_count = 0
        # test-over polling rate
        self.files_between_checks = 20
        # prepend this to file name
        self.prefix = ''
        # append this to file name
        self.suffix = ''
        # directories are accessed randomly
        self.hash_to_dir = False
        # fsync() issued after a file is modified
        self.fsync = False
        # update xattr with ctime+size
        self.record_ctime_size = False
        # end test as soon as any thread finishes
        self.stonewall = True
        # finish remaining requests after test ends
        self.finish_all_rq = True
        # append response times to .rsptimes
        self.measure_rsptimes = False
        # write/expect binary random (incompressible) data
        self.incompressible = False
        # compare read data to what was written
        self.verify_read = True
        # how many microsec to sleep between each file
        self.pause_between_files = 0
        # same as pause_between_files but in floating-point seconds
        self.pause_sec = 0.0
        # which host the invocation ran on
        self.onhost = get_hostname(None)
        # thread ID
        self.tid = ''
        # debug to screen
        self.log_to_stderr = False
        # print debug messages
        self.verbose = False
        # create directories as needed
        self.dirs_on_demand = False
        # for internal use only
        self.set_top([top])
        # logging level, default is just informational, warning or error
        self.log_level = logging.INFO
        # will be initialized later with thread-safe python logging object
        self.log = None
        # buffer for reads and writes will be here
        self.buf = None
        # copy from here on writes, compare to here on reads
        self.biggest_buf = None
        # random seed used to control sequence of random numbers,
        # default to different sequence every time
        self.randstate = random.Random()
        # reset object state variables
        self.reset()

    # convert object to string for logging, etc.

    def __str__(self):
        s = ' opname=' + self.opname
        s += ' iterations=' + str(self.iterations)
        s += ' top_dirs=' + str(self.top_dirs)
        s += ' src_dirs=' + str(self.src_dirs)
        s += ' dest_dirs=' + str(self.dest_dirs)
        s += ' network_dir=' + str(self.network_dir)
        s += ' shared=' + str(self.is_shared_dir)
        s += ' record_sz_kb=' + str(self.record_sz_kb)
        s += ' total_sz_kb=' + str(self.total_sz_kb)
        s += ' filesize_distr=' + str(self.filesize_distr)
        s += ' files_per_dir=%d' % self.files_per_dir
        s += ' dirs_per_dir=%d' % self.dirs_per_dir
        s += ' dirs_on_demand=' + str(self.dirs_on_demand)
        s += ' xattr_size=%d' % self.xattr_size
        s += ' xattr_count=%d' % self.xattr_count
        s += ' starting_gate=' + str(self.starting_gate)
        s += ' prefix=' + self.prefix
        s += ' suffix=' + self.suffix
        s += ' hash_to_dir=' + str(self.hash_to_dir)
        s += ' fsync=' + str(self.fsync)
        s += ' stonewall=' + str(self.stonewall)
        s += ' files_between_checks=' + str(self.files_between_checks)
        s += ' verify_read=' + str(self.verify_read)
        s += ' incompressible=' + str(self.incompressible)
        s += ' finish_all_rq=' + str(self.finish_all_rq)
        s += ' rsp_times=' + str(self.measure_rsptimes)
        s += ' tid=' + self.tid
        s += ' loglevel=' + str(self.log_level)
        s += ' filenum=' + str(self.filenum)
        s += ' filenum_final=' + str(self.filenum_final)
        s += ' rq=' + str(self.rq)
        s += ' rq_final=' + str(self.rq_final)
        s += ' start=' + str(self.start_time)
        s += ' end=' + str(self.end_time)
        s += ' elapsed=' + str(self.elapsed_time)
        s += ' host=' + str(self.onhost)
        s += ' status=' + str(self.status)
        s += ' abort=' + str(self.abort)
        s += ' log_to_stderr=' + str(self.log_to_stderr)
        s += ' verbose=' + str(self.verbose)
        return s

    # if you want to use the same instance for multiple tests
    # call reset() method between tests

    def reset(self):
        # results returned in variables below
        self.filenum = 0  # how many files have been accessed so far
        self.filenum_final = 0  # how many files accessed when test ended
        self.rq = 0  # how many reads/writes have been attempted so far
        self.rq_final = 0  # how many reads/writes completed when test ended
        self.abort = False
        self.file_dirs = []  # subdirectories within per-thread dir
        self.status = self.NOTOK
        self.pause_sec = self.pause_between_files / self.MICROSEC_PER_SEC

        # to measure per-thread elapsed time
        self.start_time = None
        self.end_time = None
        self.elapsed_time = 0

        # to measure file operation response times
        self.op_start_time = None
        self.rsptimes = []
        self.rsptime_filename = None

    # given a set of top-level directories (e.g. for NFS benchmarking)
    # set up shop in them
    # we only use one directory for network synchronization

    def set_top(self, top_dirs, network_dir=None):
        self.top_dirs = top_dirs
        # create/read files here
        self.src_dirs = [join(d, 'file_srcdir') for d in top_dirs]
        # rename files to here
        self.dest_dirs = [join(d, 'file_dstdir') for d in top_dirs]
        # directory for synchronization files shared across hosts
        self.network_dir = join(top_dirs[0], 'network_shared')
        if network_dir:
            self.network_dir = network_dir

    # create per-thread log file
    # we have to avoid getting the logger for self.tid more than once,
    # or else we'll add a handler more than once to this logger
    # and cause duplicate log messages in per-invoke log file

    def start_log(self):
        try:
            self.log = self.loggers[self.tid]
        except KeyError:
            self.log = logging.getLogger(self.tid)
            self.loggers[self.tid] = self.log
            if self.log_to_stderr:
                h = logging.StreamHandler()
            else:
                h = logging.FileHandler(self.log_fn())
            log_format = (self.tid +
                          ' %(asctime)s - %(levelname)s - %(message)s')
            formatter = logging.Formatter(log_format)
            h.setFormatter(formatter)
            self.log.addHandler(h)
        self.loglevel = logging.INFO
        if self.verbose:
            self.loglevel = logging.DEBUG
        self.log.setLevel(self.loglevel)

    # indicate start of an operation

    def op_starttime(self, starttime=None):
        if self.measure_rsptimes:
            if not starttime:
                self.op_start_time = time.time()
            else:
                self.op_start_time = starttime

    # indicate end of an operation,
    # this appends the elapsed time of the operation to .rsptimes array

    def op_endtime(self, opname):
        if self.measure_rsptimes:
            end_time = time.time()
            rsp_time = end_time - self.op_start_time
            self.rsptimes.append((opname, self.op_start_time, rsp_time))
            self.op_start_time = None

    # save response times seen by this thread

    def save_rsptimes(self):
        fname = 'rsptimes_' + str(self.tid) + '_' + get_hostname(None) \
            + '_' + self.opname + '_' + str(self.start_time) + '.csv'
        rsptime_fname = join(self.network_dir, fname)
        with open(rsptime_fname, 'w') as f:
            for (opname, start_time, rsp_time) in self.rsptimes:
                # time granularity is microseconds, accuracy is less
                f.write('%8s, %9.6f, %9.6f\n' %
                        (opname, start_time - self.start_time, rsp_time))
            os.fsync(f.fileno())  # particularly for NFS this is needed

    # each thread uses this file to signal that it is at the starting gate
    # (i.e. it is ready to immediately begin generating workload)

    def gen_thread_ready_fname(self, tid, hostname=None):
        return join(self.tmp_dir, 'thread_ready.' + tid + '.tmp')

    # each host uses this to signal that it is
    # ready to immediately begin generating workload
    # each host places this file in a directory shared by all hosts
    # to indicate that this host is ready

    def gen_host_ready_fname(self, hostname=None):
        if not hostname:
            hostname = self.onhost
        return join(self.network_dir, 'host_ready.' + hostname + '.tmp')

    # abort file tells other threads not to start test
    # because something has already gone wrong

    def abort_fn(self):
        return join(self.network_dir, 'abort.tmp')

    # stonewall file stops test measurement
    # (does not stop worker thread unless --finish N is used)

    def stonewall_fn(self):
        return join(self.network_dir, 'stonewall.tmp')

    # log file for this worker thread goes here

    def log_fn(self):
        return join(self.tmp_dir, 'invoke_logs-%s.log' % self.tid)

    # file for result stored as pickled python object

    def host_result_filename(self, result_host=None):
        if result_host is None:
            result_host = self.onhost
        return join(self.network_dir, result_host + '_result.pickle')
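
    # for reference, the methods above define the synchronization files
    # used during a test run, for example:
    #   thread_ready.<tid>.tmp    - per-thread "ready" flag (in tmp_dir)
    #   host_ready.<hostname>.tmp - per-host "ready" flag (in network_dir)
    #   abort.tmp                 - tells all threads to stop (network_dir)
    #   stonewall.tmp             - ends the measurement interval (network_dir)
    #   invoke_logs-<tid>.log     - per-thread log file (in tmp_dir)
    #   <hostname>_result.pickle  - pickled per-host results (network_dir)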

    # we use the seed function to control per-thread random sequence
    # we want seed to be saved
    # so that operations subsequent to initial create will know
    # what file size is for thread T's file j without having to stat the file

    def init_random_seed(self):
        if self.filesize_distr == self.fsdistr_fixed:
            return
        fn = self.gen_thread_ready_fname(self.tid,
                                         hostname=self.onhost) + '.seed'
        thread_seed = str(time.time())
        self.log.debug('seed opname: ' + self.opname)
        if self.opname == 'create' or self.opname == 'swift-put':
            thread_seed = str(time.time()) + ' ' + self.tid
            ensure_deleted(fn)
            with open(fn, 'w') as seedfile:
                seedfile.write(str(thread_seed))
                self.log.debug('write seed %s ' % thread_seed)
        elif self.opname in ['append', 'read', 'swift-get']:
            with open(fn, 'r') as seedfile:
                thread_seed = seedfile.readlines()[0].strip()
                self.log.debug('read seed %s ' % thread_seed)
        self.randstate.seed(thread_seed)
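
    # for example, with the default tmp_dir and tid '00', the seed file would
    # be something like /var/tmp/thread_ready.00.tmp.seed (exact path depends
    # on the TMPDIR environment variable)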

    def get_next_file_size(self):
        next_size = self.total_sz_kb
        if self.filesize_distr == self.fsdistr_random_exponential:
            next_size = max(1, min(int(self.randstate.expovariate(1.0
                                       / self.total_sz_kb)),
                                   self.total_sz_kb * self.random_size_limit))
            if self.log_level == logging.DEBUG:
                self.log.debug('rnd expn file size %d KB' % next_size)
        else:
            self.log.debug('fixed file size %d KB' % next_size)
        return next_size
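
    # for example, with the default total_sz_kb=64 and the exponential
    # distribution selected, file sizes are drawn with mean 64 KB and clamped
    # to the range [1, 64 * random_size_limit] = [1, 512] KB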

    # tell test driver that we're at the starting gate
    # this is a 2 phase process
    # first wait for each thread on this host to reach starting gate
    # second, wait for each host in test to reach starting gate
    # in case we have a lot of threads/hosts, sleep 1 sec between polls
    # also, wait 2 sec after seeing starting gate to maximize probability
    # that other hosts will also see it at the same time

    def wait_for_gate(self):
        if self.starting_gate:
            gateReady = self.gen_thread_ready_fname(self.tid)
            touch(gateReady)
            while not os.path.exists(self.starting_gate):
                if os.path.exists(self.abort_fn()):
                    raise Exception('thread ' + str(self.tid)
                                    + ' saw abort flag')
                # wait a little longer so that
                # other clients have time to see that gate exists
                time.sleep(0.3)

    # record info needed to compute test statistics

    def end_test(self):
        self.rq_final = self.rq
        self.filenum_final = self.filenum
        self.end_time = time.time()
        if self.filenum >= self.iterations \
                and not os.path.exists(self.stonewall_fn()):
            try:
                touch(self.stonewall_fn())
                self.log.info('stonewall file written by thread %s on host %s'
                              % (self.tid, get_hostname(None)))
            except IOError as e:
                err = e.errno
                if err != errno.EEXIST:
                    # workaround for possible bug in Gluster
                    if err != errno.EINVAL:
                        raise e
                    else:
                        self.log.info('saw EINVAL on stonewall, ignoring it')

    def test_ended(self):
        return self.end_time > self.start_time

    # see if we should do one more file
    # to minimize overhead, do not check stonewall file before every iteration

    def do_another_file(self):
        if self.stonewall and self.filenum % self.files_between_checks == 0:
            if not self.test_ended() and os.path.exists(self.stonewall_fn()):
                self.log.info('stonewalled after ' + str(self.filenum)
                              + ' iterations')
                self.end_test()

        # if user doesn't want to finish all requests and test has ended, stop
        if not self.finish_all_rq and self.test_ended():
            return False
        if self.filenum >= self.iterations:
            if not self.test_ended():
                self.end_test()
            return False
        if self.abort:
            raise Exception('thread ' + str(self.tid)
                            + ' saw abort flag')
        self.filenum += 1
        if self.pause_sec > 0.0:
            time.sleep(self.pause_sec)
        return True

    # in this method of directory selection, as filenum increments upwards,
    # we place F = files_per_dir files into a directory,
    # then the next F files into directory D+1, etc.
    # we generate directory pathnames like radix-D numbers
    # where D is subdirectories per directory
    # see URL http://gmplib.org/manual/Binary-to-Radix.html#Binary-to-Radix
    # this algorithm should take O(log(F))

    def mk_seq_dir_name(self, file_num):
        dir_in = file_num // self.files_per_dir
        # generate powers of self.dirs_per_dir not greater than dir_in
        level_dirs = []
        dirs_for_this_level = self.dirs_per_dir
        while dirs_for_this_level <= dir_in:
            level_dirs.append(dirs_for_this_level)
            dirs_for_this_level *= self.dirs_per_dir

        # generate each "digit" in radix-D number as result of quotients
        # from dividing remainder by next lower power of D (think of base 10)

        levels = len(level_dirs)
        level = levels - 1
        pathlist = []
        while level > -1:
            dirs_in_level = level_dirs[level]
            quotient = dir_in // dirs_in_level
            dir_in = dir_in - quotient * dirs_in_level
            dirnm = 'd_' + str(quotient).zfill(3)
            pathlist.append(dirnm)
            level -= 1
        pathlist.append('d_' + str(dir_in).zfill(3))
        return os.sep.join(pathlist)
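
    # worked example (with the default files_per_dir=100, dirs_per_dir=10):
    #   file_num=42   -> dir_in=0  -> 'd_000'
    #   file_num=250  -> dir_in=2  -> 'd_002'
    #   file_num=4213 -> dir_in=42 -> 'd_004/d_002'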

    def mk_hashed_dir_name(self, file_num):
        pathlist = []
        random_hash = file_num * self.some_prime % self.iterations
        dir_num = random_hash // self.files_per_dir
        while dir_num > 1:
            dir_num_hash = dir_num * self.some_prime % self.dirs_per_dir
            pathlist.insert(0, 'h_' + str(dir_num_hash).zfill(3))
            dir_num //= self.dirs_per_dir
        return os.sep.join(pathlist)
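
    # illustrative example (assuming iterations=10000, files_per_dir=100,
    # dirs_per_dir=10; the iteration count is larger than the default here
    # so that some nesting actually occurs):
    #   file_num=3  -> random_hash=1779 -> dir_num=17 -> 'h_001'
    #   file_num=57 -> random_hash=3801 -> dir_num=38 -> 'h_009/h_004'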

    def mk_dir_name(self, file_num):
        if self.hash_to_dir:
            return self.mk_hashed_dir_name(file_num)
        else:
            return self.mk_seq_dir_name(file_num)

    # generate file name to put in this directory
    # prefix can be used for process ID or host ID for example
    # names are unique to each thread
    # automatically computes subdirectory for file based on
    # files_per_dir, dirs_per_dir and placing file as high in tree as possible
    # for multiple-mountpoint tests,
    # we need to select top-level dir based on file number
    # to spread load across mountpoints,
    # so we use round-robin mountpoint selection
    # NOTE: this routine is called A LOT,
    # so need to optimize by avoiding lots of os.path.join calls

    def mk_file_nm(self, base_dirs, filenum=-1):
        if filenum == -1:
            filenum = self.filenum
        listlen = len(base_dirs)
        tree = base_dirs[filenum % listlen]
        components = [
            tree,
            os.sep,
            self.file_dirs[filenum],
            os.sep,
            self.prefix,
            '_',
            self.tid,
            '_',
            str(filenum),
            '_',
            self.suffix,
        ]
        return ''.join(components)
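
    # for example, assuming base_dirs is self.src_dirs built from the default
    # top directory /var/tmp/smf, tid='00', empty prefix/suffix, filenum=7 and
    # self.file_dirs[7] == 'd_000', the result would be
    # '/var/tmp/smf/file_srcdir/d_000/_00_7_'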

    # generate buffer contents, use these on writes and
    # compare against them for reads where random data is used

    def create_biggest_buf(self, contents_random):
        # generate random byte sequence if desired.
        random_segment_size = 1 << self.random_seg_size_bits
        if not self.incompressible:

            # generate a random byte sequence of length 2^random_seg_size_bits
            # and then repeat the sequence
            # until we get to size 2^biggest_buf_size_bits in length

            if contents_random:
                biggest_buf = bytearray([self.randstate.randrange(0, 127)
                                         for k in
                                         range(0, random_segment_size)])
            else:
                biggest_buf = bytearray([k % 128 for k in
                                         range(0, random_segment_size)])

            # to prevent confusion in python when printing out buffer contents
            # WARNING: this line breaks PythonTidy utility
            biggest_buf = biggest_buf.replace(b'\\', b'!')

            # keep doubling buffer size until it is big enough
            next_power_2 = (self.biggest_buf_size_bits -
                            self.random_seg_size_bits)
            for j in range(0, next_power_2):
                biggest_buf.extend(biggest_buf[:])

        else:  # if incompressible

            # for buffer to be incompressible,
            # we can't repeat the same (small) random sequence
            # FIXME: why shouldn't we always do it this way?

            # initialize to a single random byte
            biggest_buf = bytearray([self.randstate.randrange(0, 255)])
            assert len(biggest_buf) == 1
            powerof2 = 1
            powersum = 1
            for j in range(0, self.biggest_buf_size_bits - 1):
                assert len(biggest_buf) == powersum
                powerof2 *= 2
                powersum += powerof2
                # biggest_buf length is now 2^j - 1
                biggest_buf.extend(
                    bytearray([self.randstate.randrange(0, 255)
                               for k in range(0, powerof2)]))
            biggest_buf.extend(
                bytearray([self.randstate.randrange(0, 255)]))

        # add extra space at end
        # so that we can get different buffer contents
        # by just using different offset into biggest_buf

        biggest_buf.extend(biggest_buf[0:self.buf_offset_range])
        assert (len(biggest_buf) ==
                self.biggest_buf_size + self.buf_offset_range)
        return biggest_buf
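
    # note: the returned buffer is biggest_buf_size + buf_offset_range bytes
    # long (1 MiB + 1 KiB with the class defaults above), so prepare_buf()
    # can vary per-file contents just by slicing at different offsets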

    # allocate buffer of correct size with offset based on filenum, tid, etc.

    def prepare_buf(self):
        # determine max record size of I/Os
        total_space_kb = self.record_sz_kb
        if self.record_sz_kb == 0:
            if self.filesize_distr != self.fsdistr_fixed:
                total_space_kb = self.total_sz_kb * self.random_size_limit
            else:
                total_space_kb = self.total_sz_kb
        total_space = total_space_kb * self.BYTES_PER_KB
        if total_space > SmallfileWorkload.biggest_buf_size:
            total_space = SmallfileWorkload.biggest_buf_size

        # ensure pre-allocated pre-initialized buffer space
        # big enough for xattr ops
        # use +, not *, see way buffers are used

        total_xattr_space = self.xattr_size + self.xattr_count
        if total_xattr_space > total_space:
            total_space = total_xattr_space

        # create a buffer with somewhat unique contents for this file,
        # so we'll know if there is a read error
        # FIXME: think harder about this

        unique_offset = (hash(self.tid) + self.filenum) % 1024
        assert total_space + unique_offset < len(self.biggest_buf)
        # NOTE: this means self.biggest_buf must be
        # 1K larger than SmallfileWorkload.biggest_buf_size
        self.buf = self.biggest_buf[unique_offset:total_space + unique_offset]
        # assert len(self.buf) == total_space
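
    # note: unique_offset above is always in [0, 1023], which is exactly the
    # buf_offset_range of extra bytes that create_biggest_buf() appends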

    # determine record size to use in test
    # if record size is 0, that means to use largest possible value
    # we try to use the file size as the record size, but
    # if the biggest_buf_size is less than the file size, use it instead.

    def get_record_size_to_use(self):
        rszkb = self.record_sz_kb
        if rszkb == 0:
            rszkb = self.total_sz_kb
        if rszkb > SmallfileWorkload.biggest_buf_size // self.BYTES_PER_KB:
            rszkb = SmallfileWorkload.biggest_buf_size // self.BYTES_PER_KB
        return rszkb
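
    # for example, with record_sz_kb=0 and total_sz_kb=4096, the record size
    # would be capped at biggest_buf_size // BYTES_PER_KB = 1024 KB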

    # make all subdirectories needed for test in advance,
    # don't include in measurement
    # use set to avoid duplicating operations on directories

    def make_all_subdirs(self):
        self.log.debug('making all subdirs')
        abort_filename = self.abort_fn()
        if self.tid != '00' and self.is_shared_dir:
            return
        dirset = set()

        # FIXME: we could check to see if
        # self.dest_dirs is actually used before we include it

        for tree in [self.src_dirs, self.dest_dirs]:
            tree_range = range(0, len(tree))

            # if we are hashing into directories,
            # we can't make any assumptions about
            # which directories will be used first

            if self.hash_to_dir:
                dir_range = range(0, self.iterations + 1)
            else:

                # optimization: if not hashing into directories,
                # we put files_per_dir files into each directory, so
                # we only need to check every files_per_dir filenames
                # for a new directory name

                dir_range = range(0,
                                  self.iterations + self.files_per_dir,
                                  self.files_per_dir)

            # we need this range because
            # we need to create directories in each top dir

            for k in tree_range:
                for j in dir_range:
                    fpath = self.mk_file_nm(tree, j + k)
                    dpath = os.path.dirname(fpath)
                    dirset.add(dpath)

        # since we put them into a set, duplicates are filtered out

        for unique_dpath in dirset:
            if exists(abort_filename):
                break
            if not exists(unique_dpath):
                try:
                    os.makedirs(unique_dpath, 0o777)
                except OSError as e:
                    if not (e.errno == errno.EEXIST
                            and self.is_shared_dir):
                        raise e

    # clean up all subdirectories
    # algorithm same as make_all_subdirs

    def clean_all_subdirs(self):
        self.log.debug('cleaning all subdirs')
        if self.tid != '00' and self.is_shared_dir:
            return
        for tree in [self.src_dirs, self.dest_dirs]:

            # for efficiency, when we are not using --hash-to-dirs option,
            # we only make filename for every files_per_dir files

            if self.hash_to_dir:
                dir_range = range(0, self.iterations + 1)
            else:
                dir_range = range(0,
                                  self.iterations + self.files_per_dir,
                                  self.files_per_dir)

            # construct set of directories

            tree_range = range(0, len(tree))
            dirset = set()
            for k in tree_range:
                for j in dir_range:
                    fpath = self.mk_file_nm(tree, j + k)
                    dpath = os.path.dirname(fpath)
                    dirset.add(dpath)

            # now clean them up if empty,
            # and do this recursively on parent directories also
            # until top directory or non-empty directory is reached

            for unique_dpath in dirset:

                # determine top directory (i.e. one of list passed in --top)

                topdir = None
                for t in tree:
                    if unique_dpath.startswith(t):
                        topdir = t
                        break
                if not topdir:
                    raise Exception(('directory %s is not part of ' +
                                     'any top-level directory in %s')
                                    % (unique_dpath, str(tree)))