diff --git a/scripts/show-object-layout.py b/scripts/codecut.py
similarity index 85%
rename from scripts/show-object-layout.py
rename to scripts/codecut.py
index d15d84b0..94dfdf2c 100644
--- a/scripts/show-object-layout.py
+++ b/scripts/codecut.py
@@ -1,4 +1,5 @@
 import sys
+import json
 import logging
 import sqlite3
 import argparse
@@ -74,25 +75,24 @@ def read_string(address_space: AddressSpace, address: int) -> Optional[str]:
     try:
         # if at end of segment then there might be an overrun here.
         buf: bytes = address_space.read_memory(address, 0x100)
+
     except ReadMemoryError:
         logger.debug("failed to read memory: 0x%x", address)
         return None
 
-    string: Optional[str] = None
-
     # note: we *always* break after the first iteration
     for s in capa.features.extractors.strings.extract_ascii_strings(buf):
         if s.offset != 0:
             break
-        return string
+        return s.s
 
     # note: we *always* break after the first iteration
     for s in capa.features.extractors.strings.extract_unicode_strings(buf):
         if s.offset != 0:
             break
-        return string
+        return s.s
 
     return None
@@ -210,26 +210,9 @@ class Assemblage:
         path = self.get_path_by_binary_id(binary_id)
         return pefile.PE(data=path.read_bytes(), fast_load=True)
 
 
+
-def main(argv=None):
-    if argv is None:
-        argv = sys.argv[1:]
-
-    parser = argparse.ArgumentParser(description="Inspect object boundaries in compiled programs")
-    capa.main.install_common_args(parser, wanted={})
-    parser.add_argument("assemblage_database", type=Path, help="path to Assemblage database")
-    parser.add_argument("assemblage_directory", type=Path, help="path to Assemblage samples directory")
-    parser.add_argument("binary_id", type=int, help="primary key of binary to inspect")
-    args = parser.parse_args(args=argv)
-
-    try:
-        capa.main.handle_common_args(args)
-    except capa.main.ShouldExitError as e:
-        return e.status_code
-
-    import logging
-    logging.getLogger("goblin.pe").setLevel(logging.WARNING)
-
+def generate_main(args: argparse.Namespace) -> int:
     if not args.assemblage_database.is_file():
         raise ValueError("database doesn't exist")
@@ -373,19 +356,82 @@ def main(argv=None):
         g.nodes[base_address+function.start_rva]["name"] = function.name
         g.nodes[base_address+function.start_rva]["file"] = function.file
 
+    # rename unknown functions like: sub_401000
+    for n, attrs in g.nodes(data=True):
+        if attrs["type"] != "function":
+            continue
+        if "name" in attrs:
+            continue
+        attrs["name"] = f"sub_{n:x}"
+
+    # assign human-readable repr to all nodes
+    # assign is_import=bool to functions
+    # assign is_string=bool to datas
+    for n, attrs in g.nodes(data=True):
+        match attrs["type"]:
+            case "function":
+                attrs["repr"] = attrs["name"]
+                attrs["is_import"] = "!" in attrs["name"]
+            case "data":
+                if (string := read_string(address_space, n)):
+                    attrs["repr"] = json.dumps(string)
+                    attrs["is_string"] = True
+                else:
+                    attrs["repr"] = f"data_{n:x}"
+                    attrs["is_string"] = False
+
     for line in nx.generate_gexf(g):
         print(line)
 
+    # db.conn.close()
+
+
+def cluster_main(args: argparse.Namespace) -> int:
+    if not args.graph.is_file():
+        raise ValueError("graph file doesn't exist")
+
+    g = nx.read_gexf(args.graph)
+
     communities = nx.algorithms.community.louvain_communities(g)
     for i, community in enumerate(communities):
         print(f"[{i}]:")
         for node in community:
             if "name" in g.nodes[node]:
-                print(f" - {hex(node)}: {g.nodes[node]['file']}")
+                print(f" - {hex(int(node, 0))}: {g.nodes[node]['file']}")
             else:
-                print(f" - {hex(node)}")
+                print(f" - {hex(int(node, 0))}")
+
+
+def main(argv=None) -> int:
+    if argv is None:
+        argv = sys.argv[1:]
+
+    parser = argparse.ArgumentParser(description="Identify object boundaries in compiled programs")
+    capa.main.install_common_args(parser, wanted={})
+    subparsers = parser.add_subparsers(title="subcommands", required=True)
+
+    generate_parser = subparsers.add_parser("generate", help="generate graph for a sample")
+    generate_parser.add_argument("assemblage_database", type=Path, help="path to Assemblage database")
+    generate_parser.add_argument("assemblage_directory", type=Path, help="path to Assemblage samples directory")
+    generate_parser.add_argument("binary_id", type=int, help="primary key of binary to inspect")
+    generate_parser.set_defaults(func=generate_main)
+
+
+    cluster_parser = subparsers.add_parser("cluster", help="cluster an existing graph")
+    cluster_parser.add_argument("graph", type=Path, help="path to a graph file")
+    cluster_parser.set_defaults(func=cluster_main)
+
+    args = parser.parse_args(args=argv)
+
+    try:
+        capa.main.handle_common_args(args)
+    except capa.main.ShouldExitError as e:
+        return e.status_code
+
+    logging.getLogger("goblin.pe").setLevel(logging.WARNING)
+
+    return args.func(args)
-    # db.conn.close()
 
 
 if __name__ == "__main__":
     sys.exit(main())
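
Note on the new entrypoint: main() now dispatches to the selected subcommand through argparse's set_defaults(func=...) pattern, so after parsing, args.func(args) invokes either generate_main or cluster_main. Below is a minimal, self-contained sketch of that dispatch pattern; the handler bodies and the single illustrative argument per subcommand are placeholders, not code from this repository.

    import argparse
    import sys


    def generate_main(args: argparse.Namespace) -> int:
        # placeholder handler: the real one builds a graph and prints GEXF
        print(f"generate: binary_id={args.binary_id}")
        return 0


    def cluster_main(args: argparse.Namespace) -> int:
        # placeholder handler: the real one clusters an existing graph file
        print(f"cluster: graph={args.graph}")
        return 0


    def main(argv=None) -> int:
        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers(title="subcommands", required=True)

        # each subcommand registers its handler via set_defaults(func=...)
        generate_parser = subparsers.add_parser("generate")
        generate_parser.add_argument("binary_id", type=int)
        generate_parser.set_defaults(func=generate_main)

        cluster_parser = subparsers.add_parser("cluster")
        cluster_parser.add_argument("graph")
        cluster_parser.set_defaults(func=cluster_main)

        args = parser.parse_args(argv)
        # whichever subparser matched supplies args.func; dispatch to it
        return args.func(args)


    if __name__ == "__main__":
        sys.exit(main(sys.argv[1:]))

With this wiring, invocations like "python scripts/codecut.py generate <database> <directory> <binary_id>" and "python scripts/codecut.py cluster <graph>" each route to the corresponding handler.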