1
1
import dis
2
2
from collections import deque
3
3
import inspect
4
+ import re
5
+ import sys
6
+ if sys .version_info < (3 , .0 ):
7
+ from StringIO import StringIO
8
+
4
9
5
10
class ExpressionReaction :
6
11
"""An Expression Reaction object will be called when a change is detected.
@@ -43,10 +48,19 @@ def is_placeholder(self):
43
48
def is_build_in (self ):
44
49
return self .buildin
45
50
51
+ def is_new_style (cls ):
52
+ """Checks if a class is a new style class.
53
+ See: https://stackoverflow.com/questions/2654622/identifying-that-a-variable-is-a-new-style-class-in-python"""
54
+ return hasattr (cls , '__class__' ) and ('__dict__' in dir (cls ) or hasattr (cls , '__slots__' ))
55
+
56
+
46
57
def placeaexpr (obj , attr_name , expression_reaction_object ):
47
58
"""placeaexpr will be called by the byte-code analysis
48
59
when an instrumentable attribute is found."""
49
60
61
+ if not is_new_style (obj ):
62
+ raise ValueError ('You have to use new style classes' )
63
+
50
64
if not hasattr (obj , '__listenon__' ):
51
65
if not hasattr (type (obj ), "__aexprhandler__" ):
52
66
original = type (obj ).__setattr__
@@ -201,7 +215,7 @@ def call_function_handler(inst, iq, os, vm):
201
215
for i in range (inst .argval , 0 , - 1 ):
202
216
localvars [params .args [i ]] = func_args [i - 1 ]
203
217
204
- os .append (process_function (func . __code__ , wrapper .base_obj , localvars ))
218
+ os .append (process_function (func , wrapper .base_obj , localvars ))
205
219
else :
206
220
os .append (ObjectWrapper (placeholder = True ))
207
221
@@ -220,7 +234,7 @@ def get_obj_from_wrapper(wrapper, inst):
220
234
print ("Warning: Access to Placeholder isn't supported (Instruction: " + str (inst ) + ")" )
221
235
return wrapper .obj
222
236
223
- def process_function (function_code , self , localvars = None ):
237
+ def process_function (function , self , localvars = None ):
224
238
instruction_queue = deque ()
225
239
object_stack = []
226
240
variable_mapping = {}
@@ -233,7 +247,16 @@ def process_function(function_code, self, localvars=None):
233
247
if "self" not in variable_mapping :
234
248
variable_mapping ["self" ] = ObjectWrapper (obj = self )
235
249
236
- instruction_queue .extend (dis .get_instructions (function_code ))
250
+ if sys .version_info > (3 , 0 ):
251
+ instruction_queue .extend (dis .get_instructions (function .__code__ ))
252
+ else :
253
+ old_stdout = sys .stdout
254
+ result = StringIO ()
255
+ sys .stdout = result
256
+ dis .dis (function )
257
+ sys .stdout = old_stdout
258
+ result_string = result .getvalue ()
259
+ instruction_queue .extend ([Instruction .get_instruction_from_string (s ) for s in result_string .splitlines ()])
237
260
238
261
while instruction_queue :
239
262
inst = instruction_queue .popleft ()
@@ -244,6 +267,26 @@ def process_function(function_code, self, localvars=None):
244
267
raise UnimplementedInstructionException ("Unimplemented Instruction: " + str (inst ))
245
268
return object_stack .pop ()
246
269
247
- process_function (lambda_expression . __code__ , None , localvars )
270
+ process_function (lambda_expression , None , localvars )
248
271
249
272
return expression_reaction_object
273
+
274
+ class Instruction :
275
+ def __init__ (self , opcode , argval ):
276
+ self .opcode = opcode
277
+ self .argval = argval
278
+
279
+ def __str__ (self ):
280
+ return str (self .opcode ) + " " + str (self .argval )
281
+
282
+ @staticmethod
283
+ def get_instruction_from_string (line ):
284
+ splits = line .strip ().split ("(" )
285
+ value = ""
286
+ if len (splits ) == 2 : # Has a value
287
+ value = splits [1 ].replace (")" , "" )
288
+ instruction = re .sub (r"\d" , "" , splits [0 ]).strip ()
289
+ if instruction not in dis .opmap :
290
+ return None
291
+ opcode = dis .opmap [instruction ]
292
+ return Instruction (opcode , value )
0 commit comments