This repository was archived by the owner on Dec 3, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
181 lines (154 loc) · 5.64 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# JZK, 2012-11-14: parameter added that skips HTML-encoding SOAP parameters when asked so by equellasoap.py
import codecs
import os
import os.path
import time
import hashlib
import urllib
import binascii
from xml.dom import Node
ASCII_ENC = codecs.getencoder('us-ascii')
SOAP_HEADER_TOKEN = '''<s:Header>
<equella><token>%(token)s</token></equella>
</s:Header>'''
SOAP_REQUEST = '''<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:se="http://schemas.xmlsoap.org/soap/encoding/" se:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
%(header)s
<s:Body>
<ns1:%(method)s xmlns:ns1="%(ns)s">
%(params)s
</ns1:%(method)s>
</s:Body>
</s:Envelope>'''
SOAP_PARAMETER = '<ns1:%(name)s xsi:type="%(type)s"%(arrayType)s>%(value)s</ns1:%(name)s>'
def escape (s):
return s.replace ('&', '&').replace ('<', '<').replace ('>', '>').replace ('"', '"').replace ("'", ''')
def unzipFile (zipfile, directory, f):
os.system ('unzip -o "%s" "%s" -d "%s"' % (zipfile, f, directory))
def zipFile (zipfile, f):
os.system ('zip -j -9 "%s" "%s"' % (zipfile, f))
def getManifest (tempdir, zipfile):
unzipFile (zipfile, tempdir, 'imsmanifest.xml')
return file (os.path.join (tempdir, 'imsmanifest.xml'), 'rb').read ()
def copyFile (src, dest):
file (dest, 'wb').write (file (src, 'rb').read ())
def updateManifest (tempdir, zipfile, manifest):
file (os.path.join (tempdir, 'imsmanifest.xml'), 'wb').write (manifest)
zipFile (zipfile, os.path.join (tempdir, 'imsmanifest.xml'))
def extractColumnResults (xpaths, xmls):
xml = parseString (xmls)
results = []
for node in xml.getElementsByTagName ('result'):
curResult = {}
for xpath in xpaths:
cur = node
elements = xpath.split ('/')
if elements [-1] [:1] == '@':
elements, attr = elements [:-1], elements [-1] [1:]
else:
attr = None
for element in elements [1:]:
cur = get_named_child_element (xml, cur, element)
if attr:
curResult [xpath] = cur.getAttribute (attr)
else:
curResult [xpath] = value_as_string (cur)
results.append (curResult)
return results
def value_as_string (node):
node.normalize ()
return ''.join ([x.nodeValue for x in node.childNodes])
def value_as_string_array (node):
node.normalize ()
return [x.firstChild.nodeValue for x in node.childNodes]
def get_named_child_value (node, name):
return value_as_string (node.getElementsByTagName (name) [0])
def create_actual_node (dom, nodename, value):
if type (value) == list:
return reduce (lambda a, b: a + b, [create_actual_node (dom, nodename, item) for item in value], [])
else:
rv = dom.createElement (nodename)
if type (value) == dict:
for item in reduce ((lambda a, b: a + b), [create_actual_node (dom, name, val) for name, val in value.items ()]):
rv.appendChild (item)
elif type (value) == tuple: # allow tuples instead of dictionaries to allow ordering of keys
for item in reduce ((lambda a, b: a + b), [create_actual_node (dom, name, val) for name, val in value]):
rv.appendChild (item)
else:
rv.appendChild (dom.createTextNode (unicode (value)))
return [rv]
def clean_unicode (s):
if s.__class__ == unicode:
return ASCII_ENC (s, 'xmlcharrefreplace') [0]
else:
return s
def get_named_child_elements (dom, node, name, create=True):
matches = [x for x in node.childNodes if x.nodeType == Node.ELEMENT_NODE and x.tagName == name]
if len (matches):
return matches
elif create:
child = dom.createElement (name)
node.appendChild (child)
return [child]
else:
return []
def get_named_child_element (dom, node, name, create=True):
return get_named_child_elements (dom, node, name, create) [0]
def value_as_node_or_string (cur):
cur.normalize ()
if len (cur.childNodes) == 1:
if cur.firstChild.nodeType == cur.TEXT_NODE:
return value_as_string (cur)
# Empty node
return ''
def generate_soap_envelope (name, params, ns, token=None, htmlencode=True):
# Need to handle arrays
def p(value):
if isinstance(value, list):
buf = ''
for i in value:
if htmlencode:
subvalue = clean_unicode(i)
else:
subvalue = i
buf += '<input>'+ escape(subvalue) + '</input>'
return buf
else:
if htmlencode:
value = clean_unicode(value)
return escape(value)
def arrayType(value, type):
if isinstance(value, list):
return ' ns1:arrayType="%s[%s]"' % (type, len(value))
else:
return ''
def t(value, type):
if isinstance(value, list):
return 'ns1:Array'
else:
return type
return SOAP_REQUEST % {
'ns': ns,
'method': name,
'params': '' if len(params) == 0 else ''.join([SOAP_PARAMETER % {
'name': 'in' + str(i),
'type': t(v[2], v[1]),
'value': p(v[2]),
'arrayType': arrayType(v[2], v[1])
} for i, v in enumerate(params)]),
'header': '' if token == None or len(token) == 0 else SOAP_HEADER_TOKEN % {'token': token}
}
def urlEncode(text):
return urllib.urlencode ({'q': text}) [2:]
def generateToken(username, sharedSecretId, sharedSecretValue):
seed = str (int (time.time ())) + '000'
id2 = urlEncode (sharedSecretId)
if(not(sharedSecretId == '')):
id2 += ':'
return '%s:%s%s:%s' % (
urlEncode(username),
id2,
seed,
binascii.b2a_base64(hashlib.md5(
username + sharedSecretId + seed + sharedSecretValue
).digest())
)