KoD 1.7.7
- fix di routine ai canali/server\n\n
This commit is contained in:
@@ -26,17 +26,19 @@ def fix_js_args(func):
|
||||
return func
|
||||
code = append_arguments(six.get_function_code(func), ('this', 'arguments'))
|
||||
|
||||
return types.FunctionType(
|
||||
result = types.FunctionType(
|
||||
code,
|
||||
six.get_function_globals(func),
|
||||
func.__name__,
|
||||
closure=six.get_function_closure(func))
|
||||
return result
|
||||
|
||||
|
||||
def append_arguments(code_obj, new_locals):
|
||||
co_varnames = code_obj.co_varnames # Old locals
|
||||
co_names = code_obj.co_names # Old globals
|
||||
co_names += tuple(e for e in new_locals if e not in co_names)
|
||||
new_args = tuple(e for e in new_locals if e not in co_names)
|
||||
co_names += new_args
|
||||
co_argcount = code_obj.co_argcount # Argument count
|
||||
co_code = code_obj.co_code # The actual bytecode as a string
|
||||
|
||||
@@ -76,26 +78,51 @@ def append_arguments(code_obj, new_locals):
|
||||
names_to_varnames = dict(
|
||||
(co_names.index(name), varnames.index(name)) for name in new_locals)
|
||||
|
||||
is_new_bytecode = sys.version_info >= (3, 11)
|
||||
# Now we modify the actual bytecode
|
||||
modified = []
|
||||
drop_future_cache = False
|
||||
for inst in instructions(code_obj):
|
||||
if is_new_bytecode and inst.opname == "CACHE":
|
||||
assert inst.arg == 0
|
||||
if not drop_future_cache:
|
||||
modified.extend(write_instruction(inst.opcode, inst.arg))
|
||||
else:
|
||||
# We need to inject NOOP to not break jumps :(
|
||||
modified.extend(write_instruction(dis.opmap["NOP"], 0))
|
||||
|
||||
continue
|
||||
op, arg = inst.opcode, inst.arg
|
||||
# If the instruction is a LOAD_GLOBAL, we have to check to see if
|
||||
# it's one of the globals that we are replacing. Either way,
|
||||
# update its arg using the appropriate dict.
|
||||
drop_future_cache = False
|
||||
if inst.opcode == LOAD_GLOBAL:
|
||||
if inst.arg in names_to_varnames:
|
||||
idx = inst.arg
|
||||
if is_new_bytecode:
|
||||
idx = idx // 2
|
||||
if idx in names_to_varnames:
|
||||
op = LOAD_FAST
|
||||
arg = names_to_varnames[inst.arg]
|
||||
elif inst.arg in name_translations:
|
||||
arg = name_translations[inst.arg]
|
||||
arg = names_to_varnames[idx]
|
||||
# Cache is not present after LOAD_FAST and needs to be removed.
|
||||
drop_future_cache = True
|
||||
elif idx in name_translations:
|
||||
tgt = name_translations[idx]
|
||||
if is_new_bytecode:
|
||||
tgt = 2*tgt + (inst.arg % 2)
|
||||
arg = tgt
|
||||
else:
|
||||
raise ValueError("a name was lost in translation")
|
||||
raise(ValueError("a name was lost in translation last instruction %s" % str(inst)))
|
||||
# If it accesses co_varnames or co_names then update its argument.
|
||||
elif inst.opcode in opcode.haslocal:
|
||||
arg = varname_translations[inst.arg]
|
||||
elif inst.opcode in opcode.hasname:
|
||||
# for example STORE_GLOBAL
|
||||
arg = name_translations[inst.arg]
|
||||
elif is_new_bytecode and inst.opcode in opcode.hasfree:
|
||||
# Python 3.11+ adds refs at the end (after locals), for whatever reason...
|
||||
if inst.argval not in code_obj.co_varnames[:code_obj.co_argcount]: # we do not need to remap existing arguments, they are not shifted by new ones.
|
||||
arg = inst.arg + len(new_locals)
|
||||
modified.extend(write_instruction(op, arg))
|
||||
if six.PY2:
|
||||
code = ''.join(modified)
|
||||
@@ -113,23 +140,26 @@ def append_arguments(code_obj, new_locals):
|
||||
code_obj.co_filename, code_obj.co_name,
|
||||
code_obj.co_firstlineno, code_obj.co_lnotab,
|
||||
code_obj.co_freevars, code_obj.co_cellvars)
|
||||
|
||||
# Done modifying codestring - make the code object
|
||||
if hasattr(code_obj, "replace"):
|
||||
# Python 3.8+
|
||||
return code_obj.replace(
|
||||
code_obj = code_obj.replace(
|
||||
co_argcount=co_argcount + new_locals_len,
|
||||
co_nlocals=code_obj.co_nlocals + new_locals_len,
|
||||
co_code=code,
|
||||
co_names=names,
|
||||
co_varnames=varnames)
|
||||
return code_obj
|
||||
else:
|
||||
return types.CodeType(*args)
|
||||
|
||||
|
||||
def instructions(code_obj):
|
||||
# easy for python 3.4+
|
||||
if sys.version_info >= (3, 4):
|
||||
def instructions(code_obj, show_cache=True):
|
||||
if sys.version_info >= (3, 11):
|
||||
# Python 3.11 introduced "cache instructions", hidden by default.
|
||||
for inst in dis.Bytecode(code_obj, show_caches=show_cache):
|
||||
yield inst
|
||||
elif sys.version_info >= (3, 4): # easy for python 3.4+
|
||||
for inst in dis.Bytecode(code_obj):
|
||||
yield inst
|
||||
else:
|
||||
@@ -171,7 +201,7 @@ def write_instruction(op, arg):
|
||||
chr((arg >> 8) & 255)
|
||||
]
|
||||
else:
|
||||
raise ValueError("Invalid oparg: {0} is too large".format(oparg))
|
||||
raise ValueError("Invalid oparg: {0} is too large".format(arg))
|
||||
else: # python 3.6+ uses wordcode instead of bytecode and they already supply all the EXTENDEND_ARG ops :)
|
||||
if arg is None:
|
||||
return [chr(op), 0]
|
||||
@@ -191,6 +221,7 @@ def write_instruction(op, arg):
|
||||
# raise ValueError("Invalid oparg: {0} is too large".format(oparg))
|
||||
|
||||
|
||||
|
||||
def check(code_obj):
|
||||
old_bytecode = code_obj.co_code
|
||||
insts = list(instructions(code_obj))
|
||||
@@ -221,24 +252,99 @@ def check(code_obj):
|
||||
'Your python version made changes to the bytecode')
|
||||
|
||||
|
||||
|
||||
|
||||
def signature(func):
|
||||
code_obj = six.get_function_code(func)
|
||||
return (code_obj.co_nlocals, code_obj.co_argcount, code_obj.co_nlocals, code_obj.co_stacksize,
|
||||
code_obj.co_flags, code_obj.co_names, code_obj.co_varnames,
|
||||
code_obj.co_filename,
|
||||
code_obj.co_freevars, code_obj.co_cellvars)
|
||||
|
||||
check(six.get_function_code(check))
|
||||
|
||||
|
||||
|
||||
def compare_func(fake_func, gt_func):
|
||||
print(signature(fake_func))
|
||||
print(signature(gt_func))
|
||||
assert signature(fake_func) == signature(gt_func)
|
||||
fake_ins = list(instructions(six.get_function_code(fake_func), show_cache=False))
|
||||
real_ins = list(instructions(six.get_function_code(gt_func), show_cache=False))
|
||||
offset = 0
|
||||
pos = 0
|
||||
for e in fake_ins:
|
||||
if e.opname == "NOP":
|
||||
offset += 1 # ignore NOPs that are inserted in place of old cache.
|
||||
else:
|
||||
real = real_ins[pos]
|
||||
fake = e
|
||||
print("POS %d OFFSET: %d FAKE VS REAL" % (pos, offset))
|
||||
print(fake)
|
||||
print(real)
|
||||
assert fake.opcode == real.opcode
|
||||
if fake.opcode in dis.hasjabs or fake.opcode in dis.hasjrel:
|
||||
pass
|
||||
else:
|
||||
assert fake.arg == real.arg
|
||||
assert fake.argval == real.argval or fake.opname in ["LOAD_CONST"]
|
||||
assert fake.is_jump_target == real.is_jump_target
|
||||
|
||||
pos += 1
|
||||
assert pos == len(real_ins), (pos, len(real_ins))
|
||||
print("DONE, looks good.")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
x = 'Wrong'
|
||||
dick = 3000
|
||||
import faulthandler
|
||||
|
||||
def func(a):
|
||||
print(x, y, z, a)
|
||||
print(dick)
|
||||
d = (x, )
|
||||
for e in (e for e in x):
|
||||
print(e)
|
||||
return x, y, z
|
||||
faulthandler.enable()
|
||||
|
||||
func2 = types.FunctionType(
|
||||
append_arguments(six.get_function_code(func), ('x', 'y', 'z')),
|
||||
six.get_function_globals(func),
|
||||
func.__name__,
|
||||
closure=six.get_function_closure(func))
|
||||
args = (2, 2, 3, 4), 3, 4
|
||||
assert func2(1, *args) == args
|
||||
def func(cmpfn):
|
||||
if not this.Class in ('Array', 'Arguments'):
|
||||
return this.to_object() # do nothing
|
||||
arr = []
|
||||
for i in xrange(len(this)):
|
||||
arr.append(this.get(six.text_type(i)))
|
||||
|
||||
if not arr:
|
||||
return this
|
||||
if not cmpfn.is_callable():
|
||||
cmpfn = None
|
||||
cmp = lambda a, b: sort_compare(a, b, cmpfn)
|
||||
if six.PY3:
|
||||
key = functools.cmp_to_key(cmp)
|
||||
arr.sort(key=key)
|
||||
else:
|
||||
arr.sort(cmp=cmp)
|
||||
for i in xrange(len(arr)):
|
||||
this.put(six.text_type(i), arr[i])
|
||||
|
||||
return this
|
||||
|
||||
|
||||
def func_gt(cmpfn, this, arguments):
|
||||
if not this.Class in ('Array', 'Arguments'):
|
||||
return this.to_object() # do nothing
|
||||
arr = []
|
||||
for i in xrange(len(this)):
|
||||
arr.append(this.get(six.text_type(i)))
|
||||
|
||||
if not arr:
|
||||
return this
|
||||
if not cmpfn.is_callable():
|
||||
cmpfn = None
|
||||
cmp = lambda a, b: sort_compare(a, b, cmpfn)
|
||||
if six.PY3:
|
||||
key = functools.cmp_to_key(cmp)
|
||||
arr.sort(key=key)
|
||||
else:
|
||||
arr.sort(cmp=cmp)
|
||||
for i in xrange(len(arr)):
|
||||
this.put(six.text_type(i), arr[i])
|
||||
|
||||
return this
|
||||
|
||||
|
||||
func2 = fix_js_args(func)
|
||||
compare_func(func2, func_gt)
|
||||
|
||||
Reference in New Issue
Block a user