-
Notifications
You must be signed in to change notification settings - Fork 62
/
Copy pathrequest_validator.py
151 lines (127 loc) · 6.33 KB
/
request_validator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import re
from collections import OrderedDict
from oslo_log import log as logging
import hpedockerplugin.exception as exception
LOG = logging.getLogger(__name__)
class RequestValidator(object):
def __init__(self, backend_configs):
self._backend_configs = backend_configs
def validate_request(self, contents):
self._validate_name(contents['Name'])
operations_map = OrderedDict()
operations_map['virtualCopyOf,scheduleName'] = \
self._validate_snapshot_schedule_opts
operations_map['virtualCopyOf,scheduleFrequency'] = \
self._validate_snapshot_schedule_opts
operations_map['virtualCopyOf,snaphotPrefix'] = \
self._validate_snapshot_schedule_opts
operations_map['virtualCopyOf'] = \
self._validate_snapshot_opts
operations_map['cloneOf'] = \
self._validate_clone_opts
operations_map['importVol'] = \
self._validate_import_vol_opts
operations_map['replicationGroup'] = \
self._validate_rcg_opts
operations_map['help'] = self._validate_help_opt
if 'Opts' in contents and contents['Opts']:
self._validate_mutually_exclusive_ops(contents)
validated = False
for op_name, validator in operations_map.items():
op_name = op_name.split(',')
found = not (set(op_name) - set(contents['Opts'].keys()))
if found:
validator(contents)
validated = True
break
# Validate regular volume options
if not validated:
self._validate_create_volume_opts(contents)
@staticmethod
def _validate_mutually_exclusive_ops(contents):
mutually_exclusive_ops = ['virtualCopyOf', 'cloneOf', 'importVol',
'replicationGroup']
if 'Opts' in contents and contents['Opts']:
received_opts = contents.get('Opts').keys()
diff = set(mutually_exclusive_ops) - set(received_opts)
if len(diff) < len(mutually_exclusive_ops) - 1:
mutually_exclusive_ops.sort()
msg = "Operations %s are mutually exclusive and cannot be " \
"specified together. Please check help for usage." % \
mutually_exclusive_ops
raise exception.InvalidInput(reason=msg)
@staticmethod
def _validate_opts(operation, contents, valid_opts, mandatory_opts=None):
if 'Opts' in contents and contents['Opts']:
received_opts = contents.get('Opts').keys()
if mandatory_opts:
diff = set(mandatory_opts) - set(received_opts)
if diff:
# Print options in sorted manner
mandatory_opts.sort()
msg = "One or more mandatory options %s are missing " \
"for operation %s" % (mandatory_opts, operation)
raise exception.InvalidInput(reason=msg)
diff = set(received_opts) - set(valid_opts)
if diff:
diff = list(diff)
diff.sort()
msg = "Invalid option(s) %s specified for operation %s. " \
"Please check help for usage." % \
(diff, operation)
raise exception.InvalidInput(reason=msg)
def _validate_create_volume_opts(self, contents):
valid_opts = ['compression', 'size', 'provisioning',
'flash-cache', 'qos-name', 'fsOwner',
'fsMode', 'mountConflictDelay', 'cpg',
'snapcpg', 'backend', 'manager']
self._validate_opts("create volume", contents, valid_opts)
def _validate_clone_opts(self, contents):
valid_opts = ['cloneOf', 'size', 'cpg', 'snapcpg',
'mountConflictDelay', 'manager']
self._validate_opts("clone volume", contents, valid_opts)
def _validate_snapshot_opts(self, contents):
valid_opts = ['virtualCopyOf', 'retentionHours', 'expirationHours',
'mountConflictDelay', 'size', 'manager']
self._validate_opts("create snapshot", contents, valid_opts)
def _validate_snapshot_schedule_opts(self, contents):
valid_opts = ['virtualCopyOf', 'scheduleFrequency', 'scheduleName',
'snapshotPrefix', 'expHrs', 'retHrs',
'mountConflictDelay', 'size', 'manager']
mandatory_opts = ['scheduleName', 'snapshotPrefix',
'scheduleFrequency']
self._validate_opts("create snapshot schedule", contents,
valid_opts, mandatory_opts)
def _validate_import_vol_opts(self, contents):
valid_opts = ['importVol', 'backend', 'mountConflictDelay',
'manager']
self._validate_opts("import volume", contents, valid_opts)
# Replication enabled backend cannot be used for volume import
if 'Opts' in contents and contents['Opts']:
backend_name = contents['Opts'].get('backend', None)
if not backend_name:
backend_name = 'DEFAULT'
try:
self._backend_configs[backend_name]
except KeyError:
backend_names = list(self._backend_configs.keys())
backend_names.sort()
msg = "ERROR: Backend '%s' doesn't exist. Available " \
"backends are %s. Please use " \
"a valid backend name and retry." % \
(backend_name, backend_names)
raise exception.InvalidInput(reason=msg)
def _validate_rcg_opts(self, contents):
valid_opts = ['replicationGroup', 'size', 'provisioning',
'backend', 'mountConflictDelay', 'compression',
'manager']
self._validate_opts('create replicated volume', contents, valid_opts)
def _validate_help_opt(self, contents):
valid_opts = ['help']
self._validate_opts('display help', contents, valid_opts)
@staticmethod
def _validate_name(vol_name):
is_valid_name = re.match("^[A-Za-z0-9]+[A-Za-z0-9_-]+$", vol_name)
if not is_valid_name:
msg = 'Invalid volume name: %s is passed.' % vol_name
raise exception.InvalidInput(reason=msg)