From fddec33d04d98ebca2c17b45db73ed3332117d28 Mon Sep 17 00:00:00 2001 From: Colton Gabertan <66766340+colton-gabertan@users.noreply.github.com> Date: Fri, 25 Aug 2023 19:03:38 -0700 Subject: [PATCH] ghidra: fix api info caching (#1766) * cache and retrieve imports, externs, and fakes in FunctionHandle objects * reduce cache retrieval calls * cache in GhidraFeatureExtractor, point fh.ctx to cache * move caching routine to __init__ --- capa/features/extractors/ghidra/basicblock.py | 4 +- capa/features/extractors/ghidra/extractor.py | 13 +++- capa/features/extractors/ghidra/helpers.py | 6 +- capa/features/extractors/ghidra/insn.py | 73 ++++++++++++++----- 4 files changed, 71 insertions(+), 25 deletions(-) diff --git a/capa/features/extractors/ghidra/basicblock.py b/capa/features/extractors/ghidra/basicblock.py index 7811ba0c..b3271586 100644 --- a/capa/features/extractors/ghidra/basicblock.py +++ b/capa/features/extractors/ghidra/basicblock.py @@ -137,7 +137,9 @@ def extract_features(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Featur def main(): features = [] - for fh in capa.features.extractors.ghidra.helpers.get_function_symbols(): + from capa.features.extractors.ghidra.extractor import GhidraFeatureExtractor + + for fh in GhidraFeatureExtractor().get_functions(): for bbh in capa.features.extractors.ghidra.helpers.get_function_blocks(fh): features.extend(list(extract_features(fh, bbh))) diff --git a/capa/features/extractors/ghidra/extractor.py b/capa/features/extractors/ghidra/extractor.py index 4034ba50..f68b5dd0 100644 --- a/capa/features/extractors/ghidra/extractor.py +++ b/capa/features/extractors/ghidra/extractor.py @@ -20,10 +20,15 @@ from capa.features.extractors.base_extractor import BBHandle, InsnHandle, Functi class GhidraFeatureExtractor(FeatureExtractor): def __init__(self): super().__init__() + import capa.features.extractors.ghidra.helpers as ghidra_helpers + self.global_features: List[Tuple[Feature, Address]] = [] 
self.global_features.extend(capa.features.extractors.ghidra.file.extract_file_format()) self.global_features.extend(capa.features.extractors.ghidra.global_.extract_os()) self.global_features.extend(capa.features.extractors.ghidra.global_.extract_arch()) + self.imports = ghidra_helpers.get_file_imports() + self.externs = ghidra_helpers.get_file_externs() + self.fakes = ghidra_helpers.map_fake_import_addrs() def get_base_address(self): return AbsoluteVirtualAddress(currentProgram().getImageBase().getOffset()) # type: ignore [name-defined] # noqa: F821 @@ -37,7 +42,13 @@ class GhidraFeatureExtractor(FeatureExtractor): def get_functions(self) -> Iterator[FunctionHandle]: import capa.features.extractors.ghidra.helpers as ghidra_helpers - yield from ghidra_helpers.get_function_symbols() + for fhandle in ghidra_helpers.get_function_symbols(): + fh: FunctionHandle = FunctionHandle( + address=fhandle.getEntryPoint().getOffset(), + inner=fhandle, + ctx={"imports_cache": self.imports, "externs_cache": self.externs, "fakes_cache": self.fakes}, + ) + yield fh @staticmethod def get_function(addr: int) -> FunctionHandle: diff --git a/capa/features/extractors/ghidra/helpers.py b/capa/features/extractors/ghidra/helpers.py index 9a339460..0f405870 100644 --- a/capa/features/extractors/ghidra/helpers.py +++ b/capa/features/extractors/ghidra/helpers.py @@ -64,11 +64,9 @@ def get_block_bytes(block: ghidra.program.model.mem.MemoryBlock) -> bytes: return get_bytes(block.getStart(), block.getSize()) -def get_function_symbols() -> Iterator[FunctionHandle]: +def get_function_symbols(): """yield all non-external function symbols""" - - for fhandle in currentProgram().getFunctionManager().getFunctionsNoStubs(True): # type: ignore [name-defined] # noqa: F821 - yield FunctionHandle(address=AbsoluteVirtualAddress(fhandle.getEntryPoint().getOffset()), inner=fhandle) + yield from currentProgram().getFunctionManager().getFunctionsNoStubs(True) # type: ignore [name-defined] # noqa: F821 def 
get_function_blocks(fh: FunctionHandle) -> Iterator[BBHandle]: diff --git a/capa/features/extractors/ghidra/insn.py b/capa/features/extractors/ghidra/insn.py index 41787cc8..2404207c 100644 --- a/capa/features/extractors/ghidra/insn.py +++ b/capa/features/extractors/ghidra/insn.py @@ -22,15 +22,44 @@ from capa.features.extractors.base_extractor import BBHandle, InsnHandle, Functi # byte range within the first and returning basic blocks, this helps to reduce FP features SECURITY_COOKIE_BYTES_DELTA = 0x40 -# significantly cut down on runtime by caching api info -imports = capa.features.extractors.ghidra.helpers.get_file_imports() -externs = capa.features.extractors.ghidra.helpers.get_file_externs() -mapped_fake_addrs = capa.features.extractors.ghidra.helpers.map_fake_import_addrs() + +def get_imports(ctx: Dict[str, Any]) -> Dict[int, Any]: + """Populate the import cache for this context""" + if "imports_cache" not in ctx: + ctx["imports_cache"] = capa.features.extractors.ghidra.helpers.get_file_imports() + return ctx["imports_cache"] -def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]: - """check instruction for API call""" +def get_externs(ctx: Dict[str, Any]) -> Dict[int, Any]: + """Populate the externs cache for this context""" + if "externs_cache" not in ctx: + ctx["externs_cache"] = capa.features.extractors.ghidra.helpers.get_file_externs() + return ctx["externs_cache"] + + +def get_fakes(ctx: Dict[str, Any]) -> Dict[int, Any]: + """Populate the fake import addrs cache for this context""" + if "fakes_cache" not in ctx: + ctx["fakes_cache"] = capa.features.extractors.ghidra.helpers.map_fake_import_addrs() + return ctx["fakes_cache"] + + +def check_for_api_call( + insn, externs: Dict[int, Any], fakes: Dict[int, Any], imports: Dict[int, Any], imp_or_ex: bool +) -> Iterator[Any]: + """check instruction for API call + + params: + externs - external library functions cache + fakes - mapped fake import addresses cache + imports - imported functions 
cache + imp_or_ex - flag to check imports or externs + + yields: + matched api calls + """ info = () + funcs = imports if imp_or_ex else externs # assume only CALLs or JMPs are passed ref_type = insn.getOperandType(0) @@ -42,7 +71,7 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]: # If it's an address in a register, check the mapped fake addrs # since they're dereferenced to their fake addrs op_ref = insn.getAddress(0).getOffset() - ref = mapped_fake_addrs.get(op_ref) # obtain the real addr + ref = fakes.get(op_ref) # obtain the real addr if not ref: return else: @@ -50,9 +79,7 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]: elif ref_type in (addr_data, addr_code) or (OperandType.isIndirect(ref_type) and OperandType.isAddress(ref_type)): # we must dereference and check if the addr is a pointer to an api function addr_ref = capa.features.extractors.ghidra.helpers.dereference_ptr(insn) - if not capa.features.extractors.ghidra.helpers.check_addr_for_api( - addr_ref, mapped_fake_addrs, imports, externs - ): + if not capa.features.extractors.ghidra.helpers.check_addr_for_api(addr_ref, fakes, imports, externs): return ref = addr_ref.getOffset() elif ref_type == OperandType.DYNAMIC | OperandType.ADDRESS or ref_type == OperandType.DYNAMIC: @@ -65,9 +92,7 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]: # that had no address reference. 
# This check is faster than checking for (indirect and not address) return - if not capa.features.extractors.ghidra.helpers.check_addr_for_api( - addr_ref, mapped_fake_addrs, imports, externs - ): + if not capa.features.extractors.ghidra.helpers.check_addr_for_api(addr_ref, fakes, imports, externs): return ref = addr_ref.getOffset() @@ -88,13 +113,17 @@ def extract_insn_api_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) if not capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn): return + externs = get_externs(fh.ctx) + fakes = get_fakes(fh.ctx) + imports = get_imports(fh.ctx) + # check calls to imported functions - for api in check_for_api_call(insn, imports): + for api in check_for_api_call(insn, externs, fakes, imports, True): for imp in api: yield API(imp), ih.address # check calls to extern functions - for api in check_for_api_call(insn, externs): + for api in check_for_api_call(insn, externs, fakes, imports, False): for ext in api: yield API(ext), ih.address @@ -306,6 +335,10 @@ def extract_insn_cross_section_cflow( if not capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn): return + externs = get_externs(fh.ctx) + fakes = get_fakes(fh.ctx) + imports = get_imports(fh.ctx) + # OperandType to dereference addr_data = OperandType.ADDRESS | OperandType.DATA addr_code = OperandType.ADDRESS | OperandType.CODE @@ -317,14 +350,14 @@ def extract_insn_cross_section_cflow( if OperandType.isRegister(ref_type): if OperandType.isAddress(ref_type): ref = insn.getAddress(0) # Ghidra dereferences REG | ADDR - if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, mapped_fake_addrs, imports, externs): + if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, fakes, imports, externs): return else: return elif ref_type in (addr_data, addr_code) or (OperandType.isIndirect(ref_type) and OperandType.isAddress(ref_type)): # we must dereference and check if the addr is a pointer to an api function ref = 
capa.features.extractors.ghidra.helpers.dereference_ptr(insn) - if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, mapped_fake_addrs, imports, externs): + if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, fakes, imports, externs): return elif ref_type == OperandType.DYNAMIC | OperandType.ADDRESS or ref_type == OperandType.DYNAMIC: return # cannot resolve dynamics statically @@ -336,7 +369,7 @@ def extract_insn_cross_section_cflow( # that had no address reference. # This check is faster than checking for (indirect and not address) return - if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, mapped_fake_addrs, imports, externs): + if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, fakes, imports, externs): return this_mem_block = getMemoryBlock(insn.getAddress()) # type: ignore [name-defined] # noqa: F821 @@ -472,7 +505,9 @@ INSTRUCTION_HANDLERS = ( def main(): """ """ features = [] - for fh in capa.features.extractors.ghidra.helpers.get_function_symbols(): + from capa.features.extractors.ghidra.extractor import GhidraFeatureExtractor + + for fh in GhidraFeatureExtractor().get_functions(): for bb in capa.features.extractors.ghidra.helpers.get_function_blocks(fh): for insn in capa.features.extractors.ghidra.helpers.get_insn_in_range(bb): features.extend(list(extract_features(fh, bb, insn)))