From bcd2c3fb35486b601da098d5633535c10e203b72 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Mon, 4 Nov 2024 21:03:56 +0000 Subject: [PATCH] show-object-layout: make it generate a graph of code/data instead --- scripts/show-object-layout.py | 228 ++++++++++++++++++++++++++++++++-- 1 file changed, 216 insertions(+), 12 deletions(-) diff --git a/scripts/show-object-layout.py b/scripts/show-object-layout.py index 13fa230a..7a50385c 100644 --- a/scripts/show-object-layout.py +++ b/scripts/show-object-layout.py @@ -1,13 +1,101 @@ import sys +import logging import sqlite3 import argparse +from typing import Optional from pathlib import Path from dataclasses import dataclass +import rich +import rich.table import pefile +import lancelot +import lancelot.be2utils +import networkx as nx +from lancelot.be2utils import BinExport2Index,ReadMemoryError, AddressSpace +from lancelot.be2utils.binexport2_pb2 import BinExport2 import capa.main +logger = logging.getLogger(__name__) + + + +def is_vertex_type(vertex: BinExport2.CallGraph.Vertex, type_: BinExport2.CallGraph.Vertex.Type.ValueType) -> bool: + return vertex.HasField("type") and vertex.type == type_ + + +def is_vertex_thunk(vertex: BinExport2.CallGraph.Vertex) -> bool: + return is_vertex_type(vertex, BinExport2.CallGraph.Vertex.Type.THUNK) + + +THUNK_CHAIN_DEPTH_DELTA = 5 + + +def compute_thunks(be2: BinExport2, idx: BinExport2Index) -> dict[int, int]: + # from thunk address to target function address + thunks: dict[int, int] = {} + + for addr, vertex_idx in idx.vertex_index_by_address.items(): + vertex: BinExport2.CallGraph.Vertex = be2.call_graph.vertex[vertex_idx] + if not is_vertex_thunk(vertex): + continue + + curr_vertex_idx: int = vertex_idx + for _ in range(THUNK_CHAIN_DEPTH_DELTA): + thunk_callees: list[int] = idx.callees_by_vertex_index[curr_vertex_idx] + # if this doesn't hold, then it doesn't seem like this is a thunk, + # because either, len is: + # 0 and the thunk doesn't point to anything, such as `jmp eax`, or + # >1 and the thunk may end up at many functions. + + if not thunk_callees: + # maybe we have an indirect jump, like `jmp eax` + # that we can't actually resolve here. + break + + assert len(thunk_callees) == 1, f"thunk @ {hex(addr)} failed" + + thunked_vertex_idx: int = thunk_callees[0] + thunked_vertex: BinExport2.CallGraph.Vertex = be2.call_graph.vertex[thunked_vertex_idx] + + if not is_vertex_thunk(thunked_vertex): + assert thunked_vertex.HasField("address") + + thunks[addr] = thunked_vertex.address + break + + curr_vertex_idx = thunked_vertex_idx + + return thunks + + +def read_string(address_space: AddressSpace, address: int) -> Optional[str]: + try: + # if at end of segment then there might be an overrun here. + buf: bytes = address_space.read_memory(address, 0x100) + except ReadMemoryError: + logger.debug("failed to read memory: 0x%x", address) + return None + + string: Optional[str] = None + + # note: we *always* break after the first iteration + for s in capa.features.extractors.strings.extract_ascii_strings(buf): + if s.offset != 0: + break + + return string + + # note: we *always* break after the first iteration + for s in capa.features.extractors.strings.extract_unicode_strings(buf): + if s.offset != 0: + break + + return string + + return None + @dataclass class AssemblageRow: @@ -139,12 +227,13 @@ def main(argv=None): except capa.main.ShouldExitError as e: return e.status_code + import logging + logging.getLogger("goblin.pe").setLevel(logging.WARNING) + if not args.assemblage_database.is_file(): raise ValueError("database doesn't exist") db = Assemblage(args.assemblage_database, args.assemblage_directory) - # print(db.get_row_by_binary_id(args.binary_id)) - # print(db.get_pe_by_binary_id(args.binary_id)) @dataclass class Function: @@ -163,20 +252,135 @@ def main(argv=None): for m in db.get_rows_by_binary_id(args.binary_id) ] - import rich - import rich.table + pe = db.get_pe_by_binary_id(args.binary_id) + base_address: int = pe.OPTIONAL_HEADER.ImageBase - print(db.get_path_by_binary_id(args.binary_id)) + pe_path = db.get_path_by_binary_id(args.binary_id) + be2: BinExport2 = lancelot.get_binexport2_from_bytes( + pe_path.read_bytes(), + function_hints=[ + base_address + function.start_rva + for function in functions + ] + ) - t = rich.table.Table() - t.add_column("rva") - t.add_column("filename") - t.add_column("name") + idx = lancelot.be2utils.BinExport2Index(be2) + address_space = lancelot.be2utils.AddressSpace.from_pe(pe, base_address) + thunks = compute_thunks(be2, idx) - for function in sorted(functions, key=lambda f: f.start_rva): - t.add_row(hex(function.start_rva), function.file, function.name) + @dataclass(frozen=True, order=True) + class Node: + address: int + type: str - rich.print(t) + @dataclass(frozen=True, order=True) + class Edge: + source: int + destination: int + type: str + + nodes: set[Node] = set() + edges: set[Edge] = set() + + for flow_graph_index, flow_graph in enumerate(be2.flow_graph): + datas: set[int] = set() + callees: set[str] = set() + + entry_basic_block_index: int = flow_graph.entry_basic_block_index + flow_graph_address: int = idx.get_basic_block_address(entry_basic_block_index) + + for basic_block_index in flow_graph.basic_block_index: + basic_block: BinExport2.BasicBlock = be2.basic_block[basic_block_index] + + for instruction_index, instruction, instruction_address in idx.basic_block_instructions(basic_block): + for addr in instruction.call_target: + addr = thunks.get(addr, addr) + + if addr not in idx.vertex_index_by_address: + # disassembler did not define function at address + logger.debug("0x%x is not a vertex", addr) + continue + + vertex_idx: int = idx.vertex_index_by_address[addr] + vertex: BinExport2.CallGraph.Vertex = be2.call_graph.vertex[vertex_idx] + + callees.add(vertex.address) + + for data_reference_index in idx.data_reference_index_by_source_instruction_index.get(instruction_index, []): + data_reference: BinExport2.DataReference = be2.data_reference[data_reference_index] + data_reference_address: int = data_reference.address + + if data_reference_address in idx.insn_address_by_index: + # appears to be code + continue + + datas.add(data_reference_address) + + vertex_index = idx.vertex_index_by_address[flow_graph_address] + name = idx.get_function_name_by_vertex(vertex_index) + + nodes.add(Node( + address=flow_graph_address, + type="function", + )) + if datas or callees: + logger.info("%s @ 0x%X:", name, flow_graph_address) + + for data in sorted(datas): + logger.info(" - 0x%X", data) + nodes.add(Node( + address=data, + type="data", + )) + edges.add(Edge( + source=flow_graph_address, + destination=data, + type="data", + )) + + for callee in sorted(callees): + logger.info(" - %s", idx.get_function_name_by_address(callee)) + nodes.add(Node( + address=callee, + type="function", + )) + edges.add(Edge( + source=flow_graph_address, + destination=callee, + type="call", + )) + + else: + logger.info("%s @ 0x%X: (none)", name, flow_graph_address) + + for section in pe.sections: + # within each section, emit a neighbor edge for each pair of neighbors. + + section_nodes = [ + node for node in nodes + if section.VirtualAddress + base_address <= node.address < base_address + section.VirtualAddress + section.Misc_VirtualSize + ] + + for i in range(1, len(section_nodes)): + a = section_nodes[i-1] + b = section_nodes[i] + + edges.add(Edge( + type="neighbor", + source=a.address, + destination=b.address, + )) + + g = nx.MultiDiGraph() + + for node in sorted(nodes): + g.add_node(node.address, type=node.type) + + for edge in sorted(edges): + g.add_edge(edge.source, edge.destination, key=edge.type) + + for line in nx.generate_gexf(g): + print(line) # db.conn.close()