mirror of
https://github.com/mandiant/capa.git
synced 2026-04-28 11:53:20 -07:00
Ghidra: Fixes & Enhancements (#1733)
* restore from corrupted .git * lint repo * temp: remove lint failing rule * implement dereferencing, clean up extractors * implement proper dereferencing routines as applicable * fix nzxor implementation, remediate ghidra analysis issues * lint repo * Assert typing, lint repo * avoid extracting pointers in bytes extraction * attempt to recover submodule * implement GhidraFeatureExtractor & ghidra_main() * lint repo * document examples, clean-up & testing * lint repo * properly map import dict * properly map fake addresses * fix fake addr mapping * properly map externs * re-align consistency with other backends * lint repo * fix dereferencing routine * clean up helpers * fix format string * disable progress bar to exit gracefully * enable pbar in headless runtime mode * implement fixture test script * implement ghidra unit test script * refactor repo for breaking Ghidrathon change * bump ghidrathon CI version, run unit test in CI * change CI config * fix wget line for ghidrathon * fix unzip paths * fix ghidra import issue * disable pytest faulthandler module * fix dereference function * fix ghidra state variables * implement dereferencing for string extraction * use toAddr * restructure for consistency * Bump Ghidrathon version for CI, fix pytest ghidra runtime detection * fix number & offset extractors * yield both signed & unsigned values for offset extraction * add LEA insn handling to number & offset extraction * fix indirect call extraction * implement thunk function checking for dereferences * revise ghidra feature count tests, pass unit testing * fix feature test format * implement additional support for dereferencing thunked functions * integrate external locations into find_file_imports * change api yield string for .elf samples to match other extractors * fix potential NoneType errors during dereferencing * use helper in global_ * fix GHIDRAIO class, implement in global_ * comment on getOriginalByte * simplify get_file_imports * implement explicit thunk 
chain handling * simplify LEA number extraction * simplify thunk handling * temp: demonstrate CI failure & output * fix log path * run new test against mimikatz
This commit is contained in:
6
.github/workflows/tests.yml
vendored
6
.github/workflows/tests.yml
vendored
@@ -194,5 +194,9 @@ jobs:
|
||||
- name: Install capa
|
||||
run: pip install -e .[dev]
|
||||
- name: Run tests
|
||||
run: .github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC/support/analyzeHeadless .github/ghidra/project ghidra_test -Import ./tests/data/'mimikatz.exe_' -ScriptPath ./tests/ -PostScript test_ghidra_features.py
|
||||
run: |
|
||||
.github/ghidra/ghidra_${{ matrix.ghidra-version }}_PUBLIC/support/analyzeHeadless .github/ghidra/project ghidra_test -Import ./tests/data/mimikatz.exe_ -ScriptPath ./tests/ -PostScript test_ghidra_features.py > ../output.log
|
||||
cat ../output.log
|
||||
exit_code=$(cat ../output.log | grep exit | awk '{print $NF}')
|
||||
exit $exit_code
|
||||
|
||||
|
||||
@@ -898,7 +898,7 @@ def guess_os_from_symtab(elf: ELF) -> Optional[OS]:
|
||||
|
||||
def detect_elf_os(f) -> str:
|
||||
"""
|
||||
f: type Union[BinaryIO, IDAIO]
|
||||
f: type Union[BinaryIO, IDAIO, GHIDRAIO]
|
||||
"""
|
||||
try:
|
||||
elf = ELF(f)
|
||||
|
||||
@@ -7,10 +7,11 @@
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import logging
|
||||
import contextlib
|
||||
from io import BytesIO
|
||||
from typing import Tuple, Iterator
|
||||
|
||||
import capa.ghidra.helpers
|
||||
import capa.features.extractors.elf
|
||||
import capa.features.extractors.ghidra.helpers
|
||||
from capa.features.common import OS, ARCH_I386, ARCH_AMD64, OS_WINDOWS, Arch, Feature
|
||||
from capa.features.address import NO_ADDRESS, Address
|
||||
|
||||
@@ -24,19 +25,7 @@ def extract_os() -> Iterator[Tuple[Feature, Address]]:
|
||||
yield OS(OS_WINDOWS), NO_ADDRESS
|
||||
|
||||
elif "ELF" in format_name:
|
||||
program_memory = currentProgram().getMemory() # type: ignore [name-defined] # noqa: F821
|
||||
fbytes_list = program_memory.getAllFileBytes()
|
||||
fbytes = fbytes_list[0]
|
||||
|
||||
# Java likes to return signed ints, so we must convert them
|
||||
# back into unsigned bytes manually and write to BytesIO
|
||||
# note: May be deprecated if Jep implements better support for Java Lists
|
||||
pb_arr = b""
|
||||
for i in range(fbytes.getSize()):
|
||||
pb_arr = pb_arr + (fbytes.getOriginalByte(i) & 0xFF).to_bytes(1, "little")
|
||||
buf = BytesIO(pb_arr)
|
||||
|
||||
with contextlib.closing(buf) as f:
|
||||
with contextlib.closing(capa.ghidra.helpers.GHIDRAIO()) as f:
|
||||
os = capa.features.extractors.elf.detect_elf_os(f)
|
||||
|
||||
yield OS(os), NO_ADDRESS
|
||||
|
||||
@@ -14,6 +14,7 @@ from ghidra.program.model.symbol import SourceType, SymbolType
|
||||
from ghidra.program.model.address import AddressSpace
|
||||
|
||||
import capa.features.extractors.helpers
|
||||
from capa.features.common import THUNK_CHAIN_DEPTH_DELTA
|
||||
from capa.features.address import AbsoluteVirtualAddress
|
||||
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
|
||||
|
||||
@@ -109,12 +110,19 @@ def get_file_imports() -> Dict[int, List[str]]:
|
||||
if r.getReferenceType().isData():
|
||||
addr = r.getFromAddress().getOffset() # gets pointer to fake external addr
|
||||
|
||||
fstr = f.toString().split("::") # format: MODULE.dll::import / MODULE::Ordinal_*
|
||||
ex_loc = f.getExternalLocation().getAddress() # map external locations as well (offset into module files)
|
||||
|
||||
fstr = f.toString().split("::") # format: MODULE.dll::import / MODULE::Ordinal_* / <EXTERNAL>::import
|
||||
if "Ordinal_" in fstr[1]:
|
||||
fstr[1] = f"#{fstr[1].split('_')[1]}"
|
||||
|
||||
for name in capa.features.extractors.helpers.generate_symbols(fstr[0][:-4], fstr[1]):
|
||||
# <EXTERNAL> mostly shows up in ELF files, otherwise, strip '.dll' w/ [:-4]
|
||||
fstr[0] = "*" if "<EXTERNAL>" in fstr[0] else fstr[0][:-4]
|
||||
|
||||
for name in capa.features.extractors.helpers.generate_symbols(fstr[0], fstr[1]):
|
||||
import_dict.setdefault(addr, []).append(name)
|
||||
if ex_loc:
|
||||
import_dict.setdefault(ex_loc.getOffset(), []).append(name)
|
||||
|
||||
return import_dict
|
||||
|
||||
@@ -181,35 +189,11 @@ def map_fake_import_addrs() -> Dict[int, List[int]]:
|
||||
return fake_dict
|
||||
|
||||
|
||||
def get_external_locs() -> "List[ghidra.program.model.address.Address]":
    """
    Helps to discern external offsets from regular bytes when extracting
    data.

    Ghidra behavior:
    - Offsets that point to specific sections of external programs
    i.e. library code.
    - Stored in data, and pointed to by an absolute address
    https://github.com/NationalSecurityAgency/ghidra/blob/26d4bd9104809747c21f2528cab8aba9aef9acd5/Ghidra/Framework/SoftwareModeling/src/main/java/ghidra/program/model/symbol/ExternalLocation.java#L25-30

    Example: (mimikatz.exe_) 5f66b82558ca92e54e77f216ef4c066c:0x473090
    - 0x473090 -> PTR_CreateServiceW_00473090
    - 0x000b34EC -> External Location

    returns: the value of getExternalLocation().getAddress() for every external
    function of the current program for which that value is truthy. These are
    address objects as handed back by Ghidra, not plain integer offsets.
    """
    external_funcs = currentProgram().getFunctionManager().getExternalFunctions()  # type: ignore [name-defined] # noqa: F821
    candidate_addrs = (fh.getExternalLocation().getAddress() for fh in external_funcs)
    # skip external functions whose location carries no (truthy) address
    return [addr for addr in candidate_addrs if addr]
|
||||
|
||||
|
||||
def check_addr_for_api(
|
||||
addr: ghidra.program.model.address.Address,
|
||||
fakes: Dict[int, List[int]],
|
||||
imports: Dict[int, List[str]],
|
||||
externs: Dict[int, List[str]],
|
||||
ex_locs: List[int],
|
||||
) -> bool:
|
||||
offset = addr.getOffset()
|
||||
|
||||
@@ -225,9 +209,6 @@ def check_addr_for_api(
|
||||
if extern:
|
||||
return True
|
||||
|
||||
if addr in ex_locs:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
@@ -244,6 +225,13 @@ def is_sp_modified(insn: ghidra.program.database.code.InstructionDB) -> bool:
|
||||
|
||||
def is_stack_referenced(insn: ghidra.program.database.code.InstructionDB) -> bool:
    """
    generic catch-all for stack references

    an instruction is treated as referencing the stack when either:
    - one of its operands has exactly the REGISTER operand type and that
    register's name contains "BP", or
    - any of the references leaving the instruction reports isStackReference()
    """
    operand_indices = range(insn.getNumOperands())
    # getRegister() is only consulted for operands whose type is exactly REGISTER
    uses_bp_register = any(
        insn.getOperandType(idx) == OperandType.REGISTER and "BP" in insn.getRegister(idx).getName()
        for idx in operand_indices
    )
    if uses_bp_register:
        return True

    return any(ref.isStackReference() for ref in insn.getReferencesFrom())
|
||||
|
||||
|
||||
@@ -263,9 +251,34 @@ def is_zxor(insn: ghidra.program.database.code.InstructionDB) -> bool:
|
||||
return all(n == operands[0] for n in operands)
|
||||
|
||||
|
||||
def handle_thunk(addr: ghidra.program.model.address.Address):
|
||||
"""Follow thunk chains down to a reasonable depth"""
|
||||
ref = addr
|
||||
for _ in range(THUNK_CHAIN_DEPTH_DELTA):
|
||||
thunk_jmp = getInstructionAt(ref) # type: ignore [name-defined] # noqa: F821
|
||||
if thunk_jmp and is_call_or_jmp(thunk_jmp):
|
||||
if OperandType.isAddress(thunk_jmp.getOperandType(0)):
|
||||
ref = thunk_jmp.getAddress(0)
|
||||
else:
|
||||
thunk_dat = getDataContaining(ref) # type: ignore [name-defined] # noqa: F821
|
||||
if thunk_dat and thunk_dat.isDefined() and thunk_dat.isPointer():
|
||||
ref = thunk_dat.getValue()
|
||||
break # end of thunk chain reached
|
||||
return ref
|
||||
|
||||
|
||||
def dereference_ptr(insn: ghidra.program.database.code.InstructionDB):
|
||||
addr_code = OperandType.ADDRESS | OperandType.CODE
|
||||
to_deref = insn.getAddress(0)
|
||||
dat = getDataContaining(to_deref) # type: ignore [name-defined] # noqa: F821
|
||||
|
||||
if insn.getOperandType(0) == addr_code:
|
||||
thfunc = getFunctionContaining(to_deref) # type: ignore [name-defined] # noqa: F821
|
||||
if thfunc and thfunc.isThunk():
|
||||
return handle_thunk(to_deref)
|
||||
else:
|
||||
# if it doesn't point to a thunk, it's usually a jmp to a label
|
||||
return to_deref
|
||||
if not dat:
|
||||
return to_deref
|
||||
if dat.isDefined() and dat.isPointer():
|
||||
|
||||
@@ -26,7 +26,6 @@ SECURITY_COOKIE_BYTES_DELTA = 0x40
|
||||
imports = capa.features.extractors.ghidra.helpers.get_file_imports()
|
||||
externs = capa.features.extractors.ghidra.helpers.get_file_externs()
|
||||
mapped_fake_addrs = capa.features.extractors.ghidra.helpers.map_fake_import_addrs()
|
||||
external_locs = capa.features.extractors.ghidra.helpers.get_external_locs()
|
||||
|
||||
|
||||
def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]:
|
||||
@@ -36,6 +35,7 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]:
|
||||
# assume only CALLs or JMPs are passed
|
||||
ref_type = insn.getOperandType(0)
|
||||
addr_data = OperandType.ADDRESS | OperandType.DATA # needs dereferencing
|
||||
addr_code = OperandType.ADDRESS | OperandType.CODE # needs dereferencing
|
||||
|
||||
if OperandType.isRegister(ref_type):
|
||||
if OperandType.isAddress(ref_type):
|
||||
@@ -47,23 +47,21 @@ def check_for_api_call(insn, funcs: Dict[int, Any]) -> Iterator[Any]:
|
||||
return
|
||||
else:
|
||||
return
|
||||
elif ref_type == addr_data:
|
||||
elif ref_type in (addr_data, addr_code) or OperandType.isIndirect(ref_type):
|
||||
# we must dereference and check if the addr is a pointer to an api function
|
||||
addr_ref = capa.features.extractors.ghidra.helpers.dereference_ptr(insn)
|
||||
if not capa.features.extractors.ghidra.helpers.check_addr_for_api(
|
||||
addr_ref, mapped_fake_addrs, imports, externs, external_locs
|
||||
addr_ref, mapped_fake_addrs, imports, externs
|
||||
):
|
||||
return
|
||||
ref = addr_ref.getOffset()
|
||||
elif ref_type == OperandType.DYNAMIC | OperandType.ADDRESS or ref_type == OperandType.DYNAMIC:
|
||||
return # cannot resolve dynamics statically
|
||||
elif OperandType.isIndirect(ref_type):
|
||||
return # cannot resolve the indirection statically
|
||||
else:
|
||||
# pure address does not need to get dereferenced/ handled
|
||||
addr_ref = insn.getAddress(0)
|
||||
if not capa.features.extractors.ghidra.helpers.check_addr_for_api(
|
||||
addr_ref, mapped_fake_addrs, imports, externs, external_locs
|
||||
addr_ref, mapped_fake_addrs, imports, externs
|
||||
):
|
||||
return
|
||||
ref = addr_ref.getOffset()
|
||||
@@ -115,25 +113,40 @@ def extract_insn_number_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandl
|
||||
return
|
||||
|
||||
for i in range(insn.getNumOperands()):
|
||||
if insn.getOperandType(i) != OperandType.SCALAR:
|
||||
# Exceptions for LEA insn:
|
||||
# invalid operand encoding, considered numbers instead of offsets
|
||||
# see: mimikatz.exe_:0x4018C0
|
||||
if insn.getOperandType(i) == OperandType.DYNAMIC and insn.getMnemonicString().startswith("LEA"):
|
||||
# Additional check, avoid yielding "wide" values (ex. mimikatz.exe:0x471EE6 LEA EBX, [ECX + EAX*0x4])
|
||||
op_objs = insn.getOpObjects(i)
|
||||
if len(op_objs) == 3: # ECX, EAX, 0x4
|
||||
continue
|
||||
|
||||
if isinstance(op_objs[-1], ghidra.program.model.scalar.Scalar):
|
||||
const = op_objs[-1].getUnsignedValue()
|
||||
addr = ih.address
|
||||
|
||||
yield Number(const), addr
|
||||
yield OperandNumber(i, const), addr
|
||||
elif not OperandType.isScalar(insn.getOperandType(i)):
|
||||
# skip things like:
|
||||
# references, void types
|
||||
continue
|
||||
else:
|
||||
const = insn.getScalar(i).getUnsignedValue()
|
||||
addr = ih.address
|
||||
|
||||
const = insn.getScalar(i).getValue()
|
||||
addr = ih.address
|
||||
yield Number(const), addr
|
||||
yield OperandNumber(i, const), addr
|
||||
|
||||
yield Number(const), addr
|
||||
yield OperandNumber(i, const), addr
|
||||
|
||||
if insn.getMnemonicString().startswith("ADD") and 0 < const < MAX_STRUCTURE_SIZE:
|
||||
# for pattern like:
|
||||
#
|
||||
# add eax, 0x10
|
||||
#
|
||||
# assume 0x10 is also an offset (imagine eax is a pointer).
|
||||
yield Offset(const), addr
|
||||
yield OperandOffset(i, const), addr
|
||||
if insn.getMnemonicString().startswith("ADD") and 0 < const < MAX_STRUCTURE_SIZE:
|
||||
# for pattern like:
|
||||
#
|
||||
# add eax, 0x10
|
||||
#
|
||||
# assume 0x10 is also an offset (imagine eax is a pointer).
|
||||
yield Offset(const), addr
|
||||
yield OperandOffset(i, const), addr
|
||||
|
||||
|
||||
def extract_insn_offset_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
@@ -145,6 +158,9 @@ def extract_insn_offset_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandl
|
||||
"""
|
||||
insn: ghidra.program.database.code.InstructionDB = ih.inner
|
||||
|
||||
if insn.getMnemonicString().startswith("LEA"):
|
||||
return
|
||||
|
||||
# ignore any stack references
|
||||
if not capa.features.extractors.ghidra.helpers.is_stack_referenced(insn):
|
||||
# Ghidra stores operands in 2D arrays if they contain offsets
|
||||
@@ -156,6 +172,9 @@ def extract_insn_offset_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandl
|
||||
op_off = op_objs[-1].getValue()
|
||||
yield Offset(op_off), ih.address
|
||||
yield OperandOffset(i, op_off), ih.address
|
||||
else:
|
||||
yield Offset(0), ih.address
|
||||
yield OperandOffset(i, 0), ih.address
|
||||
|
||||
|
||||
def extract_insn_bytes_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
@@ -171,7 +190,7 @@ def extract_insn_bytes_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandle
|
||||
|
||||
ref = insn.getAddress() # init to insn addr
|
||||
for i in range(insn.getNumOperands()):
|
||||
if OperandType.isScalarAsAddress(insn.getOperandType(i)):
|
||||
if OperandType.isAddress(insn.getOperandType(i)):
|
||||
ref = insn.getAddress(i) # pulls pointer if there is one
|
||||
|
||||
if ref != insn.getAddress(): # bail out if there's no pointer
|
||||
@@ -193,11 +212,18 @@ def extract_insn_string_features(fh: FunctionHandle, bb: BBHandle, ih: InsnHandl
|
||||
push offset aAcr ; "ACR > "
|
||||
"""
|
||||
insn: ghidra.program.database.code.InstructionDB = ih.inner
|
||||
dyn_addr = OperandType.DYNAMIC | OperandType.ADDRESS
|
||||
|
||||
ref = insn.getAddress()
|
||||
for i in range(insn.getNumOperands()):
|
||||
if OperandType.isScalarAsAddress(insn.getOperandType(i)):
|
||||
ref = insn.getAddress(i)
|
||||
# strings are also referenced dynamically via pointers & arrays, so we need to deref them
|
||||
if insn.getOperandType(i) == dyn_addr:
|
||||
ref = insn.getAddress(i)
|
||||
dat = getDataAt(ref) # type: ignore [name-defined] # noqa: F821
|
||||
if dat and dat.isPointer():
|
||||
ref = dat.getValue()
|
||||
|
||||
if ref != insn.getAddress():
|
||||
ghidra_dat = getDataAt(ref) # type: ignore [name-defined] # noqa: F821
|
||||
@@ -277,6 +303,7 @@ def extract_insn_cross_section_cflow(
|
||||
|
||||
# OperandType to dereference
|
||||
addr_data = OperandType.ADDRESS | OperandType.DATA
|
||||
addr_code = OperandType.ADDRESS | OperandType.CODE
|
||||
|
||||
ref_type = insn.getOperandType(0)
|
||||
|
||||
@@ -285,29 +312,21 @@ def extract_insn_cross_section_cflow(
|
||||
if OperandType.isRegister(ref_type):
|
||||
if OperandType.isAddress(ref_type):
|
||||
ref = insn.getAddress(0) # Ghidra dereferences REG | ADDR
|
||||
if capa.features.extractors.ghidra.helpers.check_addr_for_api(
|
||||
ref, mapped_fake_addrs, imports, externs, external_locs
|
||||
):
|
||||
if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, mapped_fake_addrs, imports, externs):
|
||||
return
|
||||
else:
|
||||
return
|
||||
elif ref_type == addr_data:
|
||||
elif ref_type in (addr_data, addr_code) or OperandType.isIndirect(ref_type):
|
||||
# we must dereference and check if the addr is a pointer to an api function
|
||||
ref = capa.features.extractors.ghidra.helpers.dereference_ptr(insn)
|
||||
if capa.features.extractors.ghidra.helpers.check_addr_for_api(
|
||||
ref, mapped_fake_addrs, imports, externs, external_locs
|
||||
):
|
||||
if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, mapped_fake_addrs, imports, externs):
|
||||
return
|
||||
elif ref_type == OperandType.DYNAMIC | OperandType.ADDRESS or ref_type == OperandType.DYNAMIC:
|
||||
return # cannot resolve dynamics statically
|
||||
elif OperandType.isIndirect(ref_type):
|
||||
return # cannot resolve the indirection statically
|
||||
else:
|
||||
# pure address does not need to get dereferenced/ handled
|
||||
ref = insn.getAddress(0)
|
||||
if capa.features.extractors.ghidra.helpers.check_addr_for_api(
|
||||
ref, mapped_fake_addrs, imports, externs, external_locs
|
||||
):
|
||||
if capa.features.extractors.ghidra.helpers.check_addr_for_api(ref, mapped_fake_addrs, imports, externs):
|
||||
return
|
||||
|
||||
this_mem_block = getMemoryBlock(insn.getAddress()) # type: ignore [name-defined] # noqa: F821
|
||||
@@ -359,6 +378,8 @@ def extract_function_indirect_call_characteristic_features(
|
||||
insn: ghidra.program.database.code.InstructionDB = ih.inner
|
||||
|
||||
if insn.getMnemonicString().startswith("CALL"):
|
||||
if OperandType.isRegister(insn.getOperandType(0)):
|
||||
yield Characteristic("indirect call"), ih.address
|
||||
if OperandType.isIndirect(insn.getOperandType(0)):
|
||||
yield Characteristic("indirect call"), ih.address
|
||||
|
||||
|
||||
@@ -20,9 +20,6 @@ import capa.features.extractors.ghidra.helpers
|
||||
|
||||
logger = logging.getLogger("capa")
|
||||
|
||||
currentProgram = currentProgram() # type: ignore # noqa: F821
|
||||
currentAddress = currentAddress() # type: ignore # noqa: F821
|
||||
|
||||
# file type as returned by Ghidra
|
||||
SUPPORTED_FILE_TYPES = ("Executable and Linking Format (ELF)", "Portable Executable (PE)", "Raw Binary")
|
||||
|
||||
@@ -36,29 +33,41 @@ class GHIDRAIO:
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.offset = 0
|
||||
self.bytez = self.get_file_bytes()
|
||||
|
||||
def seek(self, offset, whence=0):
|
||||
assert whence == 0
|
||||
self.offset = offset
|
||||
|
||||
def read(self, size):
|
||||
try:
|
||||
# ghidra.program.model.address.Address has no public constructor,
|
||||
# so we have to use the exposed currentAddress object for its
|
||||
# member function .getAddress()
|
||||
ea = currentAddress.getAddress(hex(self.offset)) # type: ignore [name-defined] # noqa: F821
|
||||
except RuntimeError: # AddressFormatException to Ghidra
|
||||
logger.debug("reading 0x%x bytes at 0x%x (ea: 0x%x)", size, self.offset, currentProgram().getImageBase().add(self.offset).getOffset()) # type: ignore [name-defined] # noqa: F821
|
||||
|
||||
b_len = len(self.bytez)
|
||||
if size > b_len - self.offset:
|
||||
logger.debug("cannot read 0x%x bytes at 0x%x (ea: BADADDR)", size, self.offset)
|
||||
return b""
|
||||
|
||||
logger.debug("reading 0x%x bytes at 0x%x (ea: 0x%x)", size, self.offset, ea.getOffset())
|
||||
|
||||
# returns bytes or b""
|
||||
return capa.features.extractors.ghidra.helpers.get_bytes(ea, size)
|
||||
else:
|
||||
read_bytes = b""
|
||||
read = [
|
||||
capa.features.extractors.ghidra.helpers.fix_byte(b)
|
||||
for b in self.bytez[self.offset : self.offset + size]
|
||||
]
|
||||
for b in read:
|
||||
read_bytes = read_bytes + b
|
||||
return read_bytes
|
||||
|
||||
def close(self):
|
||||
return
|
||||
|
||||
def get_file_bytes(self):
    """
    Collect the original file bytes of the program currently open in Ghidra.

    Only the first entry returned by getAllFileBytes() is read.

    returns: the per-byte results of fix_byte() joined into one bytes object
    """
    fbytes = currentProgram().getMemory().getAllFileBytes()[0]  # type: ignore [name-defined] # noqa: F821
    # fix_byte() is expected to hand back a bytes-like chunk per input byte
    # (the loop this replaces concatenated its result onto b"")
    fix_byte = capa.features.extractors.ghidra.helpers.fix_byte

    # getOriginalByte() allows for raw file parsing on the Ghidra side
    # other functions will fail as Ghidra will think that it's reading uninitialized memory
    #
    # join all chunks in one pass: growing a bytes object with `+` inside the loop
    # re-copies the accumulated buffer on every iteration
    return b"".join(fix_byte(fbytes.getOriginalByte(i)) for i in range(fbytes.getSize()))
|
||||
|
||||
|
||||
def is_supported_ghidra_version():
|
||||
version = float(getGhidraVersion()[:4]) # type: ignore [name-defined] # noqa: F821
|
||||
@@ -75,7 +84,7 @@ def is_running_headless():
|
||||
|
||||
|
||||
def is_supported_file_type():
|
||||
file_info = currentProgram.getExecutableFormat() # type: ignore [name-defined] # noqa: F821
|
||||
file_info = currentProgram().getExecutableFormat() # type: ignore [name-defined] # noqa: F821
|
||||
if file_info.filetype not in SUPPORTED_FILE_TYPES:
|
||||
logger.error("-" * 80)
|
||||
logger.error(" Input file does not appear to be a supported file type.")
|
||||
@@ -90,7 +99,7 @@ def is_supported_file_type():
|
||||
|
||||
|
||||
def is_supported_arch_type():
|
||||
file_info = currentProgram.getLanguageID() # type: ignore [name-defined] # noqa: F821
|
||||
file_info = currentProgram().getLanguageID() # type: ignore [name-defined] # noqa: F821
|
||||
if "x86" not in file_info or not any(arch in file_info for arch in ["32", "64"]):
|
||||
logger.error("-" * 80)
|
||||
logger.error(" Input file does not appear to target a supported architecture.")
|
||||
@@ -102,18 +111,18 @@ def is_supported_arch_type():
|
||||
|
||||
|
||||
def get_file_md5():
|
||||
return currentProgram.getExecutableMD5() # type: ignore [name-defined] # noqa: F821
|
||||
return currentProgram().getExecutableMD5() # type: ignore [name-defined] # noqa: F821
|
||||
|
||||
|
||||
def get_file_sha256():
|
||||
return currentProgram.getExecutableSHA256() # type: ignore [name-defined] # noqa: F821
|
||||
return currentProgram().getExecutableSHA256() # type: ignore [name-defined] # noqa: F821
|
||||
|
||||
|
||||
def collect_metadata(rules: List[Path]):
|
||||
md5 = get_file_md5()
|
||||
sha256 = get_file_sha256()
|
||||
|
||||
info = currentProgram.getLanguageID().toString() # type: ignore [name-defined] # noqa: F821
|
||||
info = currentProgram().getLanguageID().toString() # type: ignore [name-defined] # noqa: F821
|
||||
if "x86" in info and "64" in info:
|
||||
arch = "x86_64"
|
||||
elif "x86" in info and "32" in info:
|
||||
@@ -121,7 +130,7 @@ def collect_metadata(rules: List[Path]):
|
||||
else:
|
||||
arch = "unknown arch"
|
||||
|
||||
format_name: str = currentProgram.getExecutableFormat() # type: ignore [name-defined] # noqa: F821
|
||||
format_name: str = currentProgram().getExecutableFormat() # type: ignore [name-defined] # noqa: F821
|
||||
if "PE" in format_name:
|
||||
os = "windows"
|
||||
elif "ELF" in format_name:
|
||||
@@ -138,15 +147,15 @@ def collect_metadata(rules: List[Path]):
|
||||
md5=md5,
|
||||
sha1="",
|
||||
sha256=sha256,
|
||||
path=currentProgram.getExecutablePath(), # type: ignore [name-defined] # noqa: F821
|
||||
path=currentProgram().getExecutablePath(), # type: ignore [name-defined] # noqa: F821
|
||||
),
|
||||
analysis=rdoc.Analysis(
|
||||
format=currentProgram.getExecutableFormat(), # type: ignore [name-defined] # noqa: F821
|
||||
format=currentProgram().getExecutableFormat(), # type: ignore [name-defined] # noqa: F821
|
||||
arch=arch,
|
||||
os=os,
|
||||
extractor="ghidra",
|
||||
rules=tuple(r.resolve().absolute().as_posix() for r in rules),
|
||||
base_address=capa.features.freeze.Address.from_capa(currentProgram.getImageBase().getOffset()), # type: ignore [name-defined] # noqa: F821
|
||||
base_address=capa.features.freeze.Address.from_capa(currentProgram().getImageBase().getOffset()), # type: ignore [name-defined] # noqa: F821
|
||||
layout=rdoc.Layout(
|
||||
functions=(),
|
||||
),
|
||||
|
||||
@@ -1063,6 +1063,14 @@ FEATURE_COUNT_TESTS_DOTNET = [
|
||||
]
|
||||
|
||||
|
||||
FEATURE_COUNT_TESTS_GHIDRA = [
|
||||
# Ghidra may render functions as labels, as well as provide differing amounts of call references
|
||||
# (Colton) TODO: Add more test cases
|
||||
("mimikatz", "function=0x4702FD", capa.features.common.Characteristic("calls from"), 0),
|
||||
("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("calls to"), 0),
|
||||
]
|
||||
|
||||
|
||||
def do_test_feature_presence(get_extractor, sample, scope, feature, expected):
|
||||
extractor = get_extractor(sample)
|
||||
features = scope(extractor)
|
||||
|
||||
@@ -73,7 +73,9 @@ def test_ghidra_features(sample, scope, feature, expected):
|
||||
|
||||
|
||||
@pytest.mark.skipif(ghidra_present is False, reason="Ghidra tests must be ran within Ghidra")
|
||||
@fixtures.parametrize("sample,scope,feature,expected", fixtures.FEATURE_COUNT_TESTS, indirect=["sample", "scope"])
|
||||
@fixtures.parametrize(
|
||||
"sample,scope,feature,expected", fixtures.FEATURE_COUNT_TESTS_GHIDRA, indirect=["sample", "scope"]
|
||||
)
|
||||
def test_ghidra_feature_counts(sample, scope, feature, expected):
|
||||
try:
|
||||
check_input_file(sample)
|
||||
|
||||
Reference in New Issue
Block a user