MatthieuDartiailh/bytecode

Cannot decompile code with empty try block

Closed this issue · 1 comments

Small reproducer: we add a TryBegin followed immediately by the closing TryEnd. The code re-compiles fine, but cannot be decompiled again

def test_bytecode():
    import bytecode as b

    def foo():
        return 42

    bc = b.Bytecode.from_code(foo.__code__)
    label = b.Label()
    try_begin = b.TryBegin(label, push_lasti=True)
    bc[1:1] = [try_begin, b.TryEnd(try_begin), label]

    foo.__code__ = bc.to_code()

    assert foo() == 42

    bc = b.Bytecode.from_code(foo.__code__)

Result:

________________________________ test_bytecode _________________________________

    def test_bytecode():
        import bytecode as b
    
        def foo():
            return 42
    
        bc = b.Bytecode.from_code(foo.__code__)
        label = b.Label()
        try_begin = b.TryBegin(label, push_lasti=True)
        bc[1:1] = [try_begin, b.TryEnd(try_begin), label]
    
        foo.__code__ = bc.to_code()
    
        assert foo() == 42
    
>       bc = b.Bytecode.from_code(foo.__code__)

tests/internal/test_wrapping.py:779: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.riot/venv_py3120/lib/python3.12/site-packages/bytecode/bytecode.py:283: in from_code
    return concrete.to_bytecode(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ConcreteBytecode instr#=2>, prune_caches = True
conserve_exception_block_stackdepth = False

    def to_bytecode(
        self,
        prune_caches: bool = True,
        conserve_exception_block_stackdepth: bool = False,
    ) -> _bytecode.Bytecode:
        # On 3.11 we generate pseudo-instruction from the exception table
    
        # Copy instruction and remove extended args if any (in-place)
        c_instructions = self[:]
        self._remove_extended_args(c_instructions)
    
        # Find jump targets
        jump_targets: Set[int] = set()
        offset = 0
        for c_instr in c_instructions:
            if isinstance(c_instr, SetLineno):
                continue
            target = c_instr.get_jump_target(offset)
            if target is not None:
                jump_targets.add(target)
            offset += (c_instr.size // 2) if OFFSET_AS_INSTRUCTION else c_instr.size
    
        # On 3.11+ we need to also look at the exception table for jump targets
        for ex_entry in self.exception_table:
            jump_targets.add(ex_entry.target)
    
        # Create look up dict to find entries based on either exception handling
        # block exit or entry offsets. Several blocks can end on the same instruction
        # so we store a list of entry per offset.
        ex_start: Dict[int, ExceptionTableEntry] = {}
        ex_end: Dict[int, List[ExceptionTableEntry]] = {}
        for entry in self.exception_table:
            # Ensure we do not have more than one entry with identical starting
            # offsets
            assert entry.start_offset not in ex_start
            ex_start[entry.start_offset] = entry
            ex_end.setdefault(entry.stop_offset, []).append(entry)
    
        # Create labels and instructions
        jumps: List[Tuple[int, int]] = []
        instructions: List[Union[Instr, Label, TryBegin, TryEnd, SetLineno]] = []
        labels = {}
        tb_instrs: Dict[ExceptionTableEntry, TryBegin] = {}
        offset = 0
        # In Python 3.11+ cell and varnames can be shared and are indexed in a single
        # array.
        # As a consequence, the instruction argument can be either:
        # - < len(varnames): the name is shared an we can directly use
        #   the index to access the name in cellvars
        # - > len(varnames): the name is not shared and is offset by the
        #   number unshared varname.
        # Free vars are never shared and correspond to index larger than the
        # largest cell var.
        # See PyCode_NewWithPosOnlyArgs
        if sys.version_info >= (3, 11):
            cells_lookup = self.varnames + [
                n for n in self.cellvars if n not in self.varnames
            ]
            ncells = len(cells_lookup)
        else:
            ncells = len(self.cellvars)
            cells_lookup = self.cellvars
    
        for lineno, c_instr in self._normalize_lineno(
            c_instructions, self.first_lineno
        ):
            if offset in jump_targets:
                label = Label()
                labels[offset] = label
                instructions.append(label)
    
            # Handle TryBegin pseudo instructions
            if offset in ex_start:
                entry = ex_start[offset]
                tb_instr = TryBegin(
                    Label(),
                    entry.push_lasti,
                    entry.stack_depth if conserve_exception_block_stackdepth else UNSET,
                )
                # Per entry store the pseudo instruction associated
                tb_instrs[entry] = tb_instr
                instructions.append(tb_instr)
    
            jump_target = c_instr.get_jump_target(offset)
            size = c_instr.size
            # If an instruction uses extended args, those appear before the instruction
            # causing the instruction to appear at offset that accounts for extended
            # args. So we first update the offset to account for extended args, then
            # record the instruction offset and then add the instruction itself to the
            # offset.
            offset += (size // 2 - 1) if OFFSET_AS_INSTRUCTION else (size - 2)
            current_instr_offset = offset
            offset += 1 if OFFSET_AS_INSTRUCTION else 2
    
            # on Python 3.11+ remove CACHE opcodes if we are requested to do so.
            # We are careful to first advance the offset and check that the CACHE
            # is not a jump target. It should never be the case but we double check.
            if prune_caches and c_instr.name == "CACHE":
                assert jump_target is None
    
            # We may need to insert a TryEnd after a CACHE so we need to run the
            # through the last block.
            else:
                arg: InstrArg
                c_arg = c_instr.arg
                # FIXME: better error reporting
                if c_instr.opcode in _opcode.hasconst:
                    arg = self.consts[c_arg]
                elif c_instr.opcode in _opcode.haslocal:
                    arg = self.varnames[c_arg]
                elif c_instr.opcode in _opcode.hasname:
                    if c_instr.name in BITFLAG_INSTRUCTIONS:
                        arg = (bool(c_arg & 1), self.names[c_arg >> 1])
                    elif c_instr.name in BITFLAG2_INSTRUCTIONS:
                        arg = (bool(c_arg & 1), bool(c_arg & 2), self.names[c_arg >> 2])
                    else:
                        arg = self.names[c_arg]
                elif c_instr.opcode in _opcode.hasfree:
                    if c_arg < ncells:
                        name = cells_lookup[c_arg]
                        arg = CellVar(name)
                    else:
                        name = self.freevars[c_arg - ncells]
                        arg = FreeVar(name)
                elif c_instr.opcode in _opcode.hascompare:
                    arg = Compare(
                        (c_arg >> 4) if sys.version_info >= (3, 12) else c_arg
                    )
                elif c_instr.opcode in INTRINSIC_1OP:
                    arg = Intrinsic1Op(c_arg)
                elif c_instr.opcode in INTRINSIC_2OP:
                    arg = Intrinsic2Op(c_arg)
                else:
                    arg = c_arg
    
                location = c_instr.location or InstrLocation(lineno, None, None, None)
    
                if jump_target is not None:
                    arg = PLACEHOLDER_LABEL
                    instr_index = len(instructions)
                    jumps.append((instr_index, jump_target))
    
                instructions.append(Instr(c_instr.name, arg, location=location))
    
            # We now insert the TryEnd entries
            if current_instr_offset in ex_end:
                entries = ex_end[current_instr_offset]
                for entry in reversed(entries):
>                   instructions.append(TryEnd(tb_instrs[entry]))
E                   KeyError: ExceptionTableEntry(start_offset=1, stop_offset=0, target=1, stack_depth=0, push_lasti=True

.riot/venv_py3120/lib/python3.12/site-packages/bytecode/concrete.py:1067: KeyError

Another example that shows a slightly different issue, that seems related to a bad reconstruction of the exception table (this requires urllib3)

    import bytecode as b
    impoty urllib3

    urllib3.connectionpool.HTTPConnectionPool.urlopen.__code__ = b.Bytecode.from_code(
        urllib3.connectionpool.HTTPConnectionPool.urlopen.__code__
    ).to_code()

    b.Bytecode.from_code(urllib3.connectionpool.HTTPConnectionPool.urlopen.__code__)

Result:

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
ddtrace/contrib/urllib3/patch.py:212: in patch
    b.Bytecode.from_code(urllib3.connectionpool.HTTPConnectionPool.urlopen.__code__)
.riot/venv_py3114/lib/python3.11/site-packages/bytecode/bytecode.py:283: in from_code
    return concrete.to_bytecode(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ConcreteBytecode instr#=1454>, prune_caches = True
conserve_exception_block_stackdepth = False

    def to_bytecode(
        self,
        prune_caches: bool = True,
        conserve_exception_block_stackdepth: bool = False,
    ) -> _bytecode.Bytecode:
        # On 3.11 we generate pseudo-instruction from the exception table
    
        # Copy instruction and remove extended args if any (in-place)
        c_instructions = self[:]
        self._remove_extended_args(c_instructions)
    
        # Find jump targets
        jump_targets: Set[int] = set()
        offset = 0
        for c_instr in c_instructions:
            if isinstance(c_instr, SetLineno):
                continue
            target = c_instr.get_jump_target(offset)
            if target is not None:
                jump_targets.add(target)
            offset += (c_instr.size // 2) if OFFSET_AS_INSTRUCTION else c_instr.size
    
        # On 3.11+ we need to also look at the exception table for jump targets
        for ex_entry in self.exception_table:
            jump_targets.add(ex_entry.target)
    
        # Create look up dict to find entries based on either exception handling
        # block exit or entry offsets. Several blocks can end on the same instruction
        # so we store a list of entry per offset.
        ex_start: Dict[int, ExceptionTableEntry] = {}
        ex_end: Dict[int, List[ExceptionTableEntry]] = {}
        for entry in self.exception_table:
            # Ensure we do not have more than one entry with identical starting
            # offsets
            assert entry.start_offset not in ex_start
            ex_start[entry.start_offset] = entry
            ex_end.setdefault(entry.stop_offset, []).append(entry)
    
        # Create labels and instructions
        jumps: List[Tuple[int, int]] = []
        instructions: List[Union[Instr, Label, TryBegin, TryEnd, SetLineno]] = []
        labels = {}
        tb_instrs: Dict[ExceptionTableEntry, TryBegin] = {}
        offset = 0
        # In Python 3.11+ cell and varnames can be shared and are indexed in a single
        # array.
        # As a consequence, the instruction argument can be either:
        # - < len(varnames): the name is shared an we can directly use
        #   the index to access the name in cellvars
        # - > len(varnames): the name is not shared and is offset by the
        #   number unshared varname.
        # Free vars are never shared and correspond to index larger than the
        # largest cell var.
        # See PyCode_NewWithPosOnlyArgs
        if sys.version_info >= (3, 11):
            cells_lookup = self.varnames + [
                n for n in self.cellvars if n not in self.varnames
            ]
            ncells = len(cells_lookup)
        else:
            ncells = len(self.cellvars)
            cells_lookup = self.cellvars
    
        for lineno, c_instr in self._normalize_lineno(
            c_instructions, self.first_lineno
        ):
            if offset in jump_targets:
                label = Label()
                labels[offset] = label
                instructions.append(label)
    
            # Handle TryBegin pseudo instructions
            if offset in ex_start:
                entry = ex_start[offset]
                tb_instr = TryBegin(
                    Label(),
                    entry.push_lasti,
                    entry.stack_depth if conserve_exception_block_stackdepth else UNSET,
                )
                # Per entry store the pseudo instruction associated
                tb_instrs[entry] = tb_instr
                instructions.append(tb_instr)
    
            jump_target = c_instr.get_jump_target(offset)
            size = c_instr.size
            # If an instruction uses extended args, those appear before the instruction
            # causing the instruction to appear at offset that accounts for extended
            # args. So we first update the offset to account for extended args, then
            # record the instruction offset and then add the instruction itself to the
            # offset.
            offset += (size // 2 - 1) if OFFSET_AS_INSTRUCTION else (size - 2)
            current_instr_offset = offset
            offset += 1 if OFFSET_AS_INSTRUCTION else 2
    
            # on Python 3.11+ remove CACHE opcodes if we are requested to do so.
            # We are careful to first advance the offset and check that the CACHE
            # is not a jump target. It should never be the case but we double check.
            if prune_caches and c_instr.name == "CACHE":
                assert jump_target is None
    
            # We may need to insert a TryEnd after a CACHE so we need to run the
            # through the last block.
            else:
                arg: InstrArg
                c_arg = c_instr.arg
                # FIXME: better error reporting
                if c_instr.opcode in _opcode.hasconst:
                    arg = self.consts[c_arg]
                elif c_instr.opcode in _opcode.haslocal:
                    arg = self.varnames[c_arg]
                elif c_instr.opcode in _opcode.hasname:
                    if c_instr.name in BITFLAG_INSTRUCTIONS:
                        arg = (bool(c_arg & 1), self.names[c_arg >> 1])
                    elif c_instr.name in BITFLAG2_INSTRUCTIONS:
                        arg = (bool(c_arg & 1), bool(c_arg & 2), self.names[c_arg >> 2])
                    else:
                        arg = self.names[c_arg]
                elif c_instr.opcode in _opcode.hasfree:
                    if c_arg < ncells:
                        name = cells_lookup[c_arg]
                        arg = CellVar(name)
                    else:
                        name = self.freevars[c_arg - ncells]
                        arg = FreeVar(name)
                elif c_instr.opcode in _opcode.hascompare:
                    arg = Compare(
                        (c_arg >> 4) if sys.version_info >= (3, 12) else c_arg
                    )
                elif c_instr.opcode in INTRINSIC_1OP:
                    arg = Intrinsic1Op(c_arg)
                elif c_instr.opcode in INTRINSIC_2OP:
                    arg = Intrinsic2Op(c_arg)
                else:
                    arg = c_arg
    
                location = c_instr.location or InstrLocation(lineno, None, None, None)
    
                if jump_target is not None:
                    arg = PLACEHOLDER_LABEL
                    instr_index = len(instructions)
                    jumps.append((instr_index, jump_target))
    
                instructions.append(Instr(c_instr.name, arg, location=location))
    
            # We now insert the TryEnd entries
            if current_instr_offset in ex_end:
                entries = ex_end[current_instr_offset]
                for entry in reversed(entries):
>                   instructions.append(TryEnd(tb_instrs[entry]))
E                   KeyError: ExceptionTableEntry(start_offset=497, stop_offset=497, target=908, stack_depth=0, push_lasti=False

Looking at the bytecode around the (doubled) offset, there seems to be an EXTENDED_ARG that is skipped. The original table covers

            992 EXTENDED_ARG             1
            994 JUMP_FORWARD           360 (to 1716)

whilst the new table only covers the branching instruction

            994 JUMP_FORWARD           360 (to 1716)