The more I mess around with the stack, the more I wish I hadn't. Don't hack globals to do what you want. Hack bytecode instead. There are two ways that I can think of to do this.
1) Add cells wrapping the references that you want into f.func_closure. You have to reassemble the bytecode of the function to use LOAD_DEREF instead of LOAD_GLOBAL and generate a cell for each value. You then pass a tuple of the cells and the new code object to types.FunctionType and get a function with the appropriate bindings. Different copies of the function can have different local bindings, so it should be as thread safe as you want to make it.
2) Add arguments for your new locals at the end of the function's argument list. Replace the appropriate occurrences of LOAD_GLOBAL with LOAD_FAST. Then construct a new function by using types.FunctionType, passing in the new code object and a tuple of the bindings that you want as the argument defaults. This is limited in the sense that Python limits a function to 255 arguments, and it can't be used on functions that take variable arguments. Nonetheless, it struck me as the more challenging of the two, so that's the one that I implemented (plus there's other stuff that can be done with this one). Again, you can either make different copies of the function with different bindings or call the function with the bindings that you want from each call location. So it too can be as thread safe as you want to make it.
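The last step of the second option (binding values through argument defaults) can be sketched without any bytecode rewriting. This is only an illustration under assumptions: it uses the Python 3 attribute names (`__code__`, `__globals__`) rather than the Python 2 `func_*` names used in this answer, and `template` and `bind_defaults` are hypothetical names. The function already takes `y` and `z` as trailing parameters, which is the shape the rewritten code object would have.

```python
import types


def template(x, y, z):
    # Stands in for a function whose globals y and z were turned into
    # trailing arguments by a bytecode rewrite.
    return x + y + z


def bind_defaults(func, defaults):
    """Return a copy of func whose trailing parameters default to `defaults`."""
    return types.FunctionType(func.__code__, func.__globals__,
                              func.__name__, defaults, func.__closure__)


# Two independent copies of the same code object with different bindings.
f = bind_defaults(template, (10, 20))
g = bind_defaults(template, (1, 2))
print(f(1), g(1))
```

Each copy carries its own defaults tuple, which is why separate copies do not interfere with each other. A caller can still override a binding per call by passing the argument explicitly.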
import types
import opcode
# Opcode constants used for comparison and replacement
LOAD_FAST = opcode.opmap['LOAD_FAST']
LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL']
STORE_FAST = opcode.opmap['STORE_FAST']
DEBUGGING = True
def append_arguments(code_obj, new_locals):
    co_varnames = code_obj.co_varnames   # Old locals
    co_names = code_obj.co_names         # Old globals
    co_argcount = code_obj.co_argcount   # Argument count
    co_code = code_obj.co_code           # The actual bytecode as a string

    # Make one pass over the bytecode to identify names that should be
    # left in code_obj.co_names.
    not_removed = set(opcode.hasname) - set([LOAD_GLOBAL])
    saved_names = set()
    for inst in instructions(co_code):
        if inst[0] in not_removed:
            saved_names.add(co_names[inst[1]])

    # Build co_names for the new code object. This keeps every old name
    # except the injected ones that were only accessed via LOAD_GLOBAL
    names = tuple(name for name in co_names
                  if name not in set(new_locals) - saved_names)

    # Build a dictionary that maps the indices of the entries in co_names
    # to their entry in the new co_names
    name_translations = dict((co_names.index(name), i)
                             for i, name in enumerate(names))

    # Build co_varnames for the new code object. This should consist of
    # the entirety of co_varnames with new_locals spliced in after the
    # arguments
    new_locals_len = len(new_locals)
    varnames = (co_varnames[:co_argcount] + new_locals +
                co_varnames[co_argcount:])

    # Build the dictionary that maps indices of entries in the old co_varnames
    # to their indices in the new co_varnames
    range1, range2 = xrange(co_argcount), xrange(co_argcount, len(co_varnames))
    varname_translations = dict((i, i) for i in range1)
    varname_translations.update((i, i + new_locals_len) for i in range2)

    # Build the dictionary that maps indices of deleted entries of co_names
    # to their indices in the new co_varnames
    names_to_varnames = dict((co_names.index(name), varnames.index(name))
                             for name in new_locals)

    if DEBUGGING:
        print "injecting: {0}".format(new_locals)
        print "names: {0} -> {1}".format(co_names, names)
        print "varnames: {0} -> {1}".format(co_varnames, varnames)
        print "names_to_varnames: {0}".format(names_to_varnames)
        print "varname_translations: {0}".format(varname_translations)
        print "name_translations: {0}".format(name_translations)

    # Now we modify the actual bytecode
    modified = []
    for inst in instructions(code_obj.co_code):
        # If the instruction is a LOAD_GLOBAL, we have to check to see if
        # it's one of the globals that we are replacing. Either way,
        # update its arg using the appropriate dict.
        if inst[0] == LOAD_GLOBAL:
            if DEBUGGING:
                print "LOAD_GLOBAL: {0}".format(inst[1])
            if inst[1] in names_to_varnames:
                if DEBUGGING:
                    print "replacing with {0}: ".format(names_to_varnames[inst[1]])
                inst[0] = LOAD_FAST
                inst[1] = names_to_varnames[inst[1]]
            elif inst[1] in name_translations:
                inst[1] = name_translations[inst[1]]
            else:
                raise ValueError("a name was lost in translation")
        # If it accesses co_varnames or co_names then update its argument.
        elif inst[0] in opcode.haslocal:
            inst[1] = varname_translations[inst[1]]
        elif inst[0] in opcode.hasname:
            inst[1] = name_translations[inst[1]]
        modified.extend(write_instruction(inst))

    code = ''.join(modified)

    # Done modifying codestring - make the code object
    return types.CodeType(co_argcount + new_locals_len,
                          code_obj.co_nlocals + new_locals_len,
                          code_obj.co_stacksize,
                          code_obj.co_flags,
                          code,
                          code_obj.co_consts,
                          names,
                          varnames,
                          code_obj.co_filename,
                          code_obj.co_name,
                          code_obj.co_firstlineno,
                          code_obj.co_lnotab)

def instructions(code):
    code = map(ord, code)
    i, L = 0, len(code)
    extended_arg = 0
    while i < L:
        op = code[i]
        i += 1
        if op < opcode.HAVE_ARGUMENT:
            yield [op, None]
            continue
        oparg = code[i] + (code[i+1] << 8) + extended_arg
        extended_arg = 0
        i += 2
        if op == opcode.EXTENDED_ARG:
            extended_arg = oparg << 16
            continue
        yield [op, oparg]

def write_instruction(inst):
    op, oparg = inst
    if oparg is None:
        return [chr(op)]
    elif oparg < 65536:  # fits in the two argument bytes
        return [chr(op), chr(oparg & 255), chr((oparg >> 8) & 255)]
    elif oparg < 4294967296:  # high 16 bits go into an EXTENDED_ARG prefix
        return [chr(opcode.EXTENDED_ARG),
                chr((oparg >> 16) & 255),
                chr((oparg >> 24) & 255),
                chr(op),
                chr(oparg & 255),
                chr((oparg >> 8) & 255)]
    else:
        raise ValueError("Invalid oparg: {0} is too large".format(oparg))

if __name__ == '__main__':
    import dis

    class Foo(object):
        y = 1

    z = 1

    def test(x):
        foo = Foo()
        foo.y = 1
        foo = x + y + z + foo.y
        print foo

    code_obj = append_arguments(test.func_code, ('y',))
    f = types.FunctionType(code_obj, test.func_globals, argdefs=(1,))
    if DEBUGGING:
        dis.dis(test)
        print '-'*20
        dis.dis(f)
    f(1)

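One quick way to sanity-check a rewrite like this is to list which names a function still loads as globals. The sketch below is a Python 3 illustration, not part of the Python 2 code above: it relies on `dis.get_instructions`, which Python 2 does not have, and `global_loads`, `before` and `after` are hypothetical names. `after` is written by hand to mimic what the rewritten function should look like.

```python
import dis


def global_loads(func):
    """Return the sorted names that func reads with LOAD_GLOBAL."""
    return sorted({ins.argval for ins in dis.get_instructions(func)
                   if ins.opname == "LOAD_GLOBAL"})


def before(x):
    return x + y + z   # y and z are both looked up as globals


def after(x, y=1):
    return x + y + z   # y is now a local; only z is still a global


# Neither function is called, so y and z never need to exist.
print(global_loads(before))
print(global_loads(after))
```

If an injected name still shows up in the list for the rewritten function, some LOAD_GLOBAL was missed.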
Note that a whole branch of this code (the one handling EXTENDED_ARG) is untested, but for common cases it seems to be pretty solid. I'll be hacking on it and am currently writing some code to validate the output. Then (when I get around to it) I'll run it against the whole standard library and fix any bugs.
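The EXTENDED_ARG path can at least be exercised in isolation with a round-trip check. What follows is a rough Python 3 sketch of the same encode/decode logic, working on `bytes` instead of a Python 2 string. The opcode numbers are hard-coded assumptions standing in for the `opcode` module (they are the values I believe CPython 2.7 uses), and `encode` and `decode` are hypothetical names. It targets the old 1-or-3-byte instruction format that this answer deals with, not the newer fixed-width format.

```python
# Assumed CPython 2.7 opcode numbers, hard-coded so the sketch does not
# depend on the interpreter that runs it.
HAVE_ARGUMENT = 90
EXTENDED_ARG = 145
LOAD_GLOBAL = 116
POP_TOP = 1


def encode(op, oparg=None):
    """Encode one instruction in the old 1-or-3-byte format."""
    if oparg is None:
        return bytes([op])
    if oparg < 0x10000:
        return bytes([op, oparg & 255, (oparg >> 8) & 255])
    if oparg < 0x100000000:
        high = oparg >> 16  # upper 16 bits travel in an EXTENDED_ARG prefix
        return bytes([EXTENDED_ARG, high & 255, (high >> 8) & 255,
                      op, oparg & 255, (oparg >> 8) & 255])
    raise ValueError("oparg too large: {0}".format(oparg))


def decode(code):
    """Yield (op, oparg) pairs, folding EXTENDED_ARG into the next oparg."""
    i, extended_arg = 0, 0
    while i < len(code):
        op = code[i]
        i += 1
        if op < HAVE_ARGUMENT:
            yield (op, None)
            continue
        oparg = code[i] + (code[i + 1] << 8) + extended_arg
        extended_arg = 0
        i += 2
        if op == EXTENDED_ARG:
            extended_arg = oparg << 16
            continue
        yield (op, oparg)


# Boundary values on both sides of the 16-bit limit.
program = [(POP_TOP, None), (LOAD_GLOBAL, 3), (LOAD_GLOBAL, 65535),
           (LOAD_GLOBAL, 65536), (LOAD_GLOBAL, 0xFFFFFFFF)]
raw = b"".join(encode(op, arg) for op, arg in program)
assert list(decode(raw)) == program
```

The values 65535 and 65536 are the interesting ones: the first must still fit in two bytes, and the second is the smallest argument that needs the prefix.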
I'll also probably be implementing the first option as well.
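For the first option, the final step (handing types.FunctionType a code object plus a tuple of cells) can be sketched on its own. This is a Python 3 illustration under assumptions: it needs `types.CellType` (Python 3.8 or later), it uses `__code__` and `__globals__` instead of the Python 2 `func_*` names, and it skips the LOAD_GLOBAL to LOAD_DEREF rewrite by starting from a function that already has a free variable. `make_template` and `bind` are hypothetical names.

```python
import types


def make_template():
    y = None  # placeholder so the compiler treats y as a free variable

    def template(x):
        return x + y

    return template


def bind(func, **bindings):
    """Return a copy of func with fresh cells for its free variables."""
    code = func.__code__
    # One new cell per free variable, in the order the code object expects.
    cells = tuple(types.CellType(bindings[name]) for name in code.co_freevars)
    return types.FunctionType(code, func.__globals__, func.__name__,
                              func.__defaults__, cells)


template = make_template()
add_one = bind(template, y=1)
add_hundred = bind(template, y=100)
print(add_one(1), add_hundred(1))
```

Each copy owns its cells, so the two functions share bytecode but not bindings.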