split codecut into subcommands

This commit is contained in:
Willi Ballenthin
2024-11-06 12:11:17 +00:00
parent 536526f61d
commit 3e02b67480

View File

@@ -1,4 +1,5 @@
import sys
import json
import logging
import sqlite3
import argparse
@@ -74,25 +75,24 @@ def read_string(address_space: AddressSpace, address: int) -> Optional[str]:
try:
# if at end of segment then there might be an overrun here.
buf: bytes = address_space.read_memory(address, 0x100)
except ReadMemoryError:
logger.debug("failed to read memory: 0x%x", address)
return None
string: Optional[str] = None
# note: we *always* break after the first iteration
for s in capa.features.extractors.strings.extract_ascii_strings(buf):
if s.offset != 0:
break
return string
return s.s
# note: we *always* break after the first iteration
for s in capa.features.extractors.strings.extract_unicode_strings(buf):
if s.offset != 0:
break
return string
return s.s
return None
@@ -210,26 +210,9 @@ class Assemblage:
path = self.get_path_by_binary_id(binary_id)
return pefile.PE(data=path.read_bytes(), fast_load=True)
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Inspect object boundaries in compiled programs")
capa.main.install_common_args(parser, wanted={})
parser.add_argument("assemblage_database", type=Path, help="path to Assemblage database")
parser.add_argument("assemblage_directory", type=Path, help="path to Assemblage samples directory")
parser.add_argument("binary_id", type=int, help="primary key of binary to inspect")
args = parser.parse_args(args=argv)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
import logging
logging.getLogger("goblin.pe").setLevel(logging.WARNING)
def generate_main(args: argparse.Namespace) -> int:
if not args.assemblage_database.is_file():
raise ValueError("database doesn't exist")
@@ -373,19 +356,82 @@ def main(argv=None):
g.nodes[base_address+function.start_rva]["name"] = function.name
g.nodes[base_address+function.start_rva]["file"] = function.file
# rename unknown functions like: sub_401000
for n, attrs in g.nodes(data=True):
if attrs["type"] != "function":
continue
if "name" in attrs:
continue
attrs["name"] = f"sub_{n:x}"
# assign human-readable repr to add nodes
# assign is_import=bool to functions
# assign is_string=bool to datas
for n, attrs in g.nodes(data=True):
match attrs["type"]:
case "function":
attrs["repr"] = attrs["name"]
attrs["is_import"] = "!" in attrs["name"]
case "data":
if (string := read_string(address_space, n)):
attrs["repr"] = json.dumps(string)
attrs["is_string"] = True
else:
attrs["repr"] = f"data_{n:x}"
attrs["is_string"] = False
for line in nx.generate_gexf(g):
print(line)
# db.conn.close()
def cluster_main(args: argparse.Namespace) -> int:
if not args.graph.is_file():
raise ValueError("graph file doesn't exist")
g = nx.read_gexf(args.graph)
communities = nx.algorithms.community.louvain_communities(g)
for i, community in enumerate(communities):
print(f"[{i}]:")
for node in community:
if "name" in g.nodes[node]:
print(f" - {hex(node)}: {g.nodes[node]['file']}")
print(f" - {hex(int(node, 0))}: {g.nodes[node]['file']}")
else:
print(f" - {hex(node)}")
print(f" - {hex(int(node, 0))}")
def main(argv=None) -> int:
if argv is None:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Identify object boundaries in compiled programs")
capa.main.install_common_args(parser, wanted={})
subparsers = parser.add_subparsers(title="subcommands", required=True)
generate_parser = subparsers.add_parser("generate", help="generate graph for a sample")
generate_parser.add_argument("assemblage_database", type=Path, help="path to Assemblage database")
generate_parser.add_argument("assemblage_directory", type=Path, help="path to Assemblage samples directory")
generate_parser.add_argument("binary_id", type=int, help="primary key of binary to inspect")
generate_parser.set_defaults(func=generate_main)
cluster_parser = subparsers.add_parser("cluster", help="cluster an existing graph")
cluster_parser.add_argument("graph", type=Path, help="path to a graph file")
cluster_parser.set_defaults(func=cluster_main)
args = parser.parse_args(args=argv)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
logging.getLogger("goblin.pe").setLevel(logging.WARNING)
return args.func(args)
# db.conn.close()
if __name__ == "__main__":
sys.exit(main())