ghidra: fix api info caching (#1766)

* cache and retrive imports, externs, and fakes in FunctionHandle objects

* reduce cache retreival calls

* cache in GhidraFeatureExtractor, point fh.ctx to cache

* move caching routine to __init__
This commit is contained in:
Colton Gabertan
2023-08-25 19:03:38 -07:00
committed by GitHub
parent 65179805a7
commit fddec33d04
4 changed files with 71 additions and 25 deletions

View File

@@ -137,7 +137,9 @@ def extract_features(fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Featur
def main():
features = []
for fh in capa.features.extractors.ghidra.helpers.get_function_symbols():
from capa.features.extractors.ghidra.extractor import GhidraFeatureExtractor
for fh in GhidraFeatureExtractor().get_functions():
for bbh in capa.features.extractors.ghidra.helpers.get_function_blocks(fh):
features.extend(list(extract_features(fh, bbh)))

View File

@@ -20,10 +20,15 @@ from capa.features.extractors.base_extractor import BBHandle, InsnHandle, Functi
class GhidraFeatureExtractor(FeatureExtractor):
def __init__(self):
super().__init__()
import capa.features.extractors.ghidra.helpers as ghidra_helpers
self.global_features: List[Tuple[Feature, Address]] = []
self.global_features.extend(capa.features.extractors.ghidra.file.extract_file_format())
self.global_features.extend(capa.features.extractors.ghidra.global_.extract_os())
self.global_features.extend(capa.features.extractors.ghidra.global_.extract_arch())
self.imports = ghidra_helpers.get_file_imports()
self.externs = ghidra_helpers.get_file_externs()
self.fakes = ghidra_helpers.map_fake_import_addrs()
def get_base_address(self):
return AbsoluteVirtualAddress(currentProgram().getImageBase().getOffset()) # type: ignore [name-defined] # noqa: F821
@@ -37,7 +42,13 @@ class GhidraFeatureExtractor(FeatureExtractor):
def get_functions(self) -> Iterator[FunctionHandle]:
import capa.features.extractors.ghidra.helpers as ghidra_helpers
yield from ghidra_helpers.get_function_symbols()
for fhandle in ghidra_helpers.get_function_symbols():
fh: FunctionHandle = FunctionHandle(
address=fhandle.getEntryPoint().getOffset(),
inner=fhandle,
ctx={"imports_cache": self.imports, "externs_cache": self.externs, "fakes_cache": self.fakes},
)
yield fh
@staticmethod
def get_function(addr: int) -> FunctionHandle:

View File

@@ -64,11 +64,9 @@ def get_block_bytes(block: ghidra.program.model.mem.MemoryBlock) -> bytes:
return get_bytes(block.getStart(), block.getSize())
def get_function_symbols() -> Iterator[FunctionHandle]:
def get_function_symbols():
"""yield all non-external function symbols"""
for fhandle in currentProgram().getFunctionManager().getFunctionsNoStubs(True): # type: ignore [name-defined] # noqa: F821
yield FunctionHandle(address=AbsoluteVirtualAddress(fhandle.getEntryPoint().getOffset()), inner=fhandle)
yield from currentProgram().getFunctionManager().getFunctionsNoStubs(True) # type: ignore [name-defined] # noqa: F821
def get_function_blocks(fh: FunctionHandle) -> Iterator[BBHandle]:

View File

@@ -22,15 +22,44 @@ from capa.features.extractors.base_extractor import BBHandle, InsnHandle, Functi
# byte range within the first and returning basic blocks, this helps to reduce FP features
SECURITY_COOKIE_BYTES_DELTA = 0x40
# significantly cut down on runtime by caching api info
imports = capa.features.extractors.ghidra.helpers.get_file_imports()
externs = capa.features.extractors.ghidra.helpers.get_file_externs()
mapped_fake_addrs = capa.features.extractors.ghidra.helpers.map_fake_import_addrs()
def get_imports(ctx: Dict[str, Any]) -> Dict[int, Any]:
"""Populate the import cache for this context"""
if "imports_cache" not in ctx:
ctx["imports_cache"] = capa.features.extractors.ghidra.helpers.get_file_imports()
return ctx["imports_cache"]
def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]:
"""check instruction for API call"""
def get_externs(ctx: Dict[str, Any]) -> Dict[int, Any]:
"""Populate the externs cache for this context"""
if "externs_cache" not in ctx:
ctx["externs_cache"] = capa.features.extractors.ghidra.helpers.get_file_externs()
return ctx["externs_cache"]
def get_fakes(ctx: Dict[str, Any]) -> Dict[int, Any]:
"""Populate the fake import addrs cache for this context"""
if "fakes_cache" not in ctx:
ctx["fakes_cache"] = capa.features.extractors.ghidra.helpers.map_fake_import_addrs()
return ctx["fakes_cache"]
def check_for_api_call(
insn, externs: Dict[int, Any], fakes: Dict[int, Any], imports: Dict[int, Any], imp_or_ex: bool
) -> Iterator[Any]:
"""check instruction for API call
params:
externs - external library functions cache
fakes - mapped fake import addresses cache
imports - imported functions cache
imp_or_ex - flag to check imports or externs
yields:
matched api calls
"""
info = ()
funcs = imports if imp_or_ex else externs
# assume only CALLs or JMPs are passed
ref_type = insn.getOperandType(0)
@@ -42,7 +71,7 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]:
# If it's an address in a register, check the mapped fake addrs
# since they're dereferenced to their fake addrs
op_ref = insn.getAddress(0).getOffset()
ref = mapped_fake_addrs.get(op_ref) # obtain the real addr
ref = fakes.get(op_ref) # obtain the real addr
if not ref:
return
else:
@@ -50,9 +79,7 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]:
elif ref_type in (addr_data, addr_code) or (OperandType.isIndirect(ref_type) and OperandType.isAddress(ref_type)):
# we must dereference and check if the addr is a pointer to an api function
addr_ref = capa.features.extractors.ghidra.helpers.dereference_ptr(insn)
if not capa.features.extractors.ghidra.helpers.check_addr_for_api(
addr_ref, mapped_fake_addrs, imports, externs
):
if not capa.features.extractors.ghidra.helpers.check_addr_for_api(addr_ref, fakes, imports, externs):
return
ref = addr_ref.getOffset()
elif ref_type == OperandType.DYNAMIC | OperandType.ADDRESS or ref_type == OperandType.DYNAMIC:
@@ -65,9 +92,7 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]:
# that had no address reference.
# This check is faster than checking for (indirect and not address)
return
if not capa.features.extractors.ghidra.helpers.check_addr_for_api(
addr_ref, mapped_fake_addrs, imports, externs
):
if not capa.features.extractors.ghidra.helpers.check_addr_for_api(addr_ref, fakes, imports, externs):
return
ref = addr_ref.getOffset()
@@ -88,13 +113,17 @@ def extract_insn_api_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle)
if not capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn):
return
externs = get_externs(fh.ctx)
fakes = get_fakes(fh.ctx)
imports = get_imports(fh.ctx)
# check calls to imported functions
for api in check_for_api_call(insn, imports):
for api in check_for_api_call(insn, externs, fakes, imports, True):
for imp in api:
yield API(imp), ih.address
# check calls to extern functions
for api in check_for_api_call(insn, externs):
for api in check_for_api_call(insn, externs, fakes, imports, False):
for ext in api:
yield API(ext), ih.address
@@ -306,6 +335,10 @@ def extract_insn_cross_section_cflow(
if not capa.features.extractors.ghidra.helpers.is_call_or_jmp(insn):
return
externs = get_externs(fh.ctx)
fakes = get_fakes(fh.ctx)
imports = get_imports(fh.ctx)
# OperandType to dereference
addr_data = OperandType.ADDRESS | OperandType.DATA
addr_code = OperandType.ADDRESS | OperandType.CODE
@@ -317,14 +350,14 @@ def extract_insn_cross_section_cflow(
if OperandType.isRegister(ref_type):
if OperandType.isAddress(ref_type):
ref = insn.getAddress(0) # Ghidra dereferences REG | ADDR
if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, mapped_fake_addrs, imports, externs):
if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, fakes, imports, externs):
return
else:
return
elif ref_type in (addr_data, addr_code) or (OperandType.isIndirect(ref_type) and OperandType.isAddress(ref_type)):
# we must dereference and check if the addr is a pointer to an api function
ref = capa.features.extractors.ghidra.helpers.dereference_ptr(insn)
if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, mapped_fake_addrs, imports, externs):
if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, fakes, imports, externs):
return
elif ref_type == OperandType.DYNAMIC | OperandType.ADDRESS or ref_type == OperandType.DYNAMIC:
return # cannot resolve dynamics statically
@@ -336,7 +369,7 @@ def extract_insn_cross_section_cflow(
# that had no address reference.
# This check is faster than checking for (indirect and not address)
return
if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, mapped_fake_addrs, imports, externs):
if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, fakes, imports, externs):
return
this_mem_block = getMemoryBlock(insn.getAddress()) # type: ignore [name-defined] # noqa: F821
@@ -472,7 +505,9 @@ INSTRUCTION_HANDLERS = (
def main():
""" """
features = []
for fh in capa.features.extractors.ghidra.helpers.get_function_symbols():
from capa.features.extractors.ghidra.extractor import GhidraFeatureExtractor
for fh in GhidraFeatureExtractor().get_functions():
for bb in capa.features.extractors.ghidra.helpers.get_function_blocks(fh):
for insn in capa.features.extractors.ghidra.helpers.get_insn_in_range(bb):
features.extend(list(extract_features(fh, bb, insn)))