show-object-layout: make it generate a graph of code/data instead

This commit is contained in:
Willi Ballenthin
2024-11-04 21:03:56 +00:00
parent f340b93a02
commit bcd2c3fb35

View File

@@ -1,13 +1,101 @@
import sys
import logging
import sqlite3
import argparse
from typing import Optional
from pathlib import Path
from dataclasses import dataclass
import rich
import rich.table
import pefile
import lancelot
import lancelot.be2utils
import networkx as nx
from lancelot.be2utils import BinExport2Index,ReadMemoryError, AddressSpace
from lancelot.be2utils.binexport2_pb2 import BinExport2
import capa.main
logger = logging.getLogger(__name__)
def is_vertex_type(vertex: BinExport2.CallGraph.Vertex, type_: BinExport2.CallGraph.Vertex.Type.ValueType) -> bool:
return vertex.HasField("type") and vertex.type == type_
def is_vertex_thunk(vertex: BinExport2.CallGraph.Vertex) -> bool:
return is_vertex_type(vertex, BinExport2.CallGraph.Vertex.Type.THUNK)
THUNK_CHAIN_DEPTH_DELTA = 5
def compute_thunks(be2: BinExport2, idx: BinExport2Index) -> dict[int, int]:
# from thunk address to target function address
thunks: dict[int, int] = {}
for addr, vertex_idx in idx.vertex_index_by_address.items():
vertex: BinExport2.CallGraph.Vertex = be2.call_graph.vertex[vertex_idx]
if not is_vertex_thunk(vertex):
continue
curr_vertex_idx: int = vertex_idx
for _ in range(THUNK_CHAIN_DEPTH_DELTA):
thunk_callees: list[int] = idx.callees_by_vertex_index[curr_vertex_idx]
# if this doesn't hold, then it doesn't seem like this is a thunk,
# because either, len is:
# 0 and the thunk doesn't point to anything, such as `jmp eax`, or
# >1 and the thunk may end up at many functions.
if not thunk_callees:
# maybe we have an indirect jump, like `jmp eax`
# that we can't actually resolve here.
break
assert len(thunk_callees) == 1, f"thunk @ {hex(addr)} failed"
thunked_vertex_idx: int = thunk_callees[0]
thunked_vertex: BinExport2.CallGraph.Vertex = be2.call_graph.vertex[thunked_vertex_idx]
if not is_vertex_thunk(thunked_vertex):
assert thunked_vertex.HasField("address")
thunks[addr] = thunked_vertex.address
break
curr_vertex_idx = thunked_vertex_idx
return thunks
def read_string(address_space: AddressSpace, address: int) -> Optional[str]:
try:
# if at end of segment then there might be an overrun here.
buf: bytes = address_space.read_memory(address, 0x100)
except ReadMemoryError:
logger.debug("failed to read memory: 0x%x", address)
return None
string: Optional[str] = None
# note: we *always* break after the first iteration
for s in capa.features.extractors.strings.extract_ascii_strings(buf):
if s.offset != 0:
break
return string
# note: we *always* break after the first iteration
for s in capa.features.extractors.strings.extract_unicode_strings(buf):
if s.offset != 0:
break
return string
return None
@dataclass
class AssemblageRow:
@@ -139,12 +227,13 @@ def main(argv=None):
except capa.main.ShouldExitError as e:
return e.status_code
import logging
logging.getLogger("goblin.pe").setLevel(logging.WARNING)
if not args.assemblage_database.is_file():
raise ValueError("database doesn't exist")
db = Assemblage(args.assemblage_database, args.assemblage_directory)
# print(db.get_row_by_binary_id(args.binary_id))
# print(db.get_pe_by_binary_id(args.binary_id))
@dataclass
class Function:
@@ -163,20 +252,135 @@ def main(argv=None):
for m in db.get_rows_by_binary_id(args.binary_id)
]
import rich
import rich.table
pe = db.get_pe_by_binary_id(args.binary_id)
base_address: int = pe.OPTIONAL_HEADER.ImageBase
print(db.get_path_by_binary_id(args.binary_id))
pe_path = db.get_path_by_binary_id(args.binary_id)
be2: BinExport2 = lancelot.get_binexport2_from_bytes(
pe_path.read_bytes(),
function_hints=[
base_address + function.start_rva
for function in functions
]
)
t = rich.table.Table()
t.add_column("rva")
t.add_column("filename")
t.add_column("name")
idx = lancelot.be2utils.BinExport2Index(be2)
address_space = lancelot.be2utils.AddressSpace.from_pe(pe, base_address)
thunks = compute_thunks(be2, idx)
for function in sorted(functions, key=lambda f: f.start_rva):
t.add_row(hex(function.start_rva), function.file, function.name)
@dataclass(frozen=True, order=True)
class Node:
address: int
type: str
rich.print(t)
@dataclass(frozen=True, order=True)
class Edge:
source: int
destination: int
type: str
nodes: set[Node] = set()
edges: set[Edge] = set()
for flow_graph_index, flow_graph in enumerate(be2.flow_graph):
datas: set[int] = set()
callees: set[str] = set()
entry_basic_block_index: int = flow_graph.entry_basic_block_index
flow_graph_address: int = idx.get_basic_block_address(entry_basic_block_index)
for basic_block_index in flow_graph.basic_block_index:
basic_block: BinExport2.BasicBlock = be2.basic_block[basic_block_index]
for instruction_index, instruction, instruction_address in idx.basic_block_instructions(basic_block):
for addr in instruction.call_target:
addr = thunks.get(addr, addr)
if addr not in idx.vertex_index_by_address:
# disassembler did not define function at address
logger.debug("0x%x is not a vertex", addr)
continue
vertex_idx: int = idx.vertex_index_by_address[addr]
vertex: BinExport2.CallGraph.Vertex = be2.call_graph.vertex[vertex_idx]
callees.add(vertex.address)
for data_reference_index in idx.data_reference_index_by_source_instruction_index.get(instruction_index, []):
data_reference: BinExport2.DataReference = be2.data_reference[data_reference_index]
data_reference_address: int = data_reference.address
if data_reference_address in idx.insn_address_by_index:
# appears to be code
continue
datas.add(data_reference_address)
vertex_index = idx.vertex_index_by_address[flow_graph_address]
name = idx.get_function_name_by_vertex(vertex_index)
nodes.add(Node(
address=flow_graph_address,
type="function",
))
if datas or callees:
logger.info("%s @ 0x%X:", name, flow_graph_address)
for data in sorted(datas):
logger.info(" - 0x%X", data)
nodes.add(Node(
address=data,
type="data",
))
edges.add(Edge(
source=flow_graph_address,
destination=data,
type="data",
))
for callee in sorted(callees):
logger.info(" - %s", idx.get_function_name_by_address(callee))
nodes.add(Node(
address=callee,
type="function",
))
edges.add(Edge(
source=flow_graph_address,
destination=callee,
type="call",
))
else:
logger.info("%s @ 0x%X: (none)", name, flow_graph_address)
for section in pe.sections:
# within each section, emit a neighbor edge for each pair of neighbors.
section_nodes = [
node for node in nodes
if section.VirtualAddress + base_address <= node.address < base_address + section.VirtualAddress + section.Misc_VirtualSize
]
for i in range(1, len(section_nodes)):
a = section_nodes[i-1]
b = section_nodes[i]
edges.add(Edge(
type="neighbor",
source=a.address,
destination=b.address,
))
g = nx.MultiDiGraph()
for node in sorted(nodes):
g.add_node(node.address, type=node.type)
for edge in sorted(edges):
g.add_edge(edge.source, edge.destination, key=edge.type)
for line in nx.generate_gexf(g):
print(line)
# db.conn.close()