Add Ghidra File Feature Extraction (#1564)

Implement Ghidra backend file feature extraction
This commit is contained in:
Colton Gabertan
2023-07-06 17:05:08 -07:00
committed by GitHub
parent 1f631b3ed1
commit 51ffb1d75c
5 changed files with 292 additions and 4 deletions

View File

@@ -2,15 +2,29 @@ import logging
import contextlib
from typing import List, Tuple, Iterator
import ghidra
import capa.features.extractors.ghidra.global_
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import FeatureExtractor
currentProgram: ghidra.program.database.ProgramDB
class GhidraFeatureExtractor(FeatureExtractor):
def __init__(self):
super().__init__()
self.global_features: List[Tuple[Feature, Address]] = []
self.global_features.extend(capa.features.extractors.ghidra.file.extract_file_format())
self.global_features.extend(capa.features.extractors.ghidra.global_.extract_os())
self.global_features.extend(capa.features.extractors.ghidra.global_.extract_arch())
def get_base_address(self):
return AbsoluteVirtualAddress(currentProgram.getImageBase().getOffset())
def extract_global_features(self):
yield from self.global_features
def extract_file_features(self):
yield from capa.features.extractors.ghidra.file.extract_features()

View File

@@ -0,0 +1,201 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import struct
from typing import Tuple, Iterator
import ghidra
from ghidra.program.model.symbol import SourceType, SymbolType
import capa.features.extractors.common
import capa.features.extractors.helpers
import capa.features.extractors.strings
import capa.features.extractors.ghidra.helpers
from capa.features.file import Export, Import, Section, FunctionName
from capa.features.common import FORMAT_PE, FORMAT_ELF, Format, String, Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress, AbsoluteVirtualAddress
currentProgram: ghidra.program.database.ProgramDB
MAX_OFFSET_PE_AFTER_MZ = 0x200
def check_segment_for_pe() -> Iterator[Tuple[int, int]]:
"""check segment for embedded PE
adapted for Ghidra from:
https://github.com/vivisect/vivisect/blob/91e8419a861f4977https://github.com/vivisect/vivisect/blob/91e8419a861f49779f18316f155311967e696836/PE/carve.py#L259f18316f155311967e696836/PE/carve.py#L25
"""
mz_xor = [
(
capa.features.extractors.helpers.xor_static(b"MZ", i),
capa.features.extractors.helpers.xor_static(b"PE", i),
i,
)
for i in range(256)
]
todo = []
for mzx, pex, i in mz_xor:
# find all segment offsets containing XOR'd "MZ" bytes
off: ghidra.program.model.address.GenericAddress
for off in capa.features.extractors.ghidra.helpers.find_byte_sequence(mzx):
todo.append((off, mzx, pex, i))
seg_max = currentProgram.getMaxAddress()
while len(todo):
off, mzx, pex, i = todo.pop()
# MZ header has one field we will check e_lfanew is at 0x3c
e_lfanew = off.add(0x3C)
if seg_max.getOffset() < (e_lfanew.getOffset() + 4):
continue
e_lfanew_bytes = capa.features.extractors.ghidra.helpers.get_bytes(e_lfanew, 4)
newoff = struct.unpack("<I", capa.features.extractors.helpers.xor_static(e_lfanew_bytes, i))[0]
# assume XOR'd "PE" bytes exist within threshold
if newoff > MAX_OFFSET_PE_AFTER_MZ:
continue
peoff = off.add(newoff)
if seg_max.getOffset() < (peoff.getOffset() + 2):
continue
pe_bytes = capa.features.extractors.ghidra.helpers.get_bytes(peoff, 2)
if pe_bytes == pex:
yield off.getOffset(), i
def extract_file_embedded_pe() -> Iterator[Tuple[Feature, Address]]:
"""extract embedded PE features"""
for ea, _ in check_segment_for_pe():
yield Characteristic("embedded pe"), FileOffsetAddress(ea)
def extract_file_export_names() -> Iterator[Tuple[Feature, Address]]:
"""extract function exports"""
st = currentProgram.getSymbolTable()
for addr in st.getExternalEntryPointIterator():
yield Export(st.getPrimarySymbol(addr).getName()), AbsoluteVirtualAddress(addr.getOffset())
def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]:
"""extract function imports
1. imports by ordinal:
- modulename.#ordinal
2. imports by name, results in two features to support importname-only
matching:
- modulename.importname
- importname
"""
for f in currentProgram.getFunctionManager().getExternalFunctions():
for r in f.getSymbol().getReferences():
if r.getReferenceType().isData():
addr = r.getFromAddress().getOffset() # gets pointer to fake external addr
fstr = f.toString().split("::") # format: MODULE.dll::import / MODULE::Ordinal_*
if "Ordinal_" in fstr[1]:
fstr[1] = f"#{fstr[1].split('_')[1]}"
for name in capa.features.extractors.helpers.generate_symbols(fstr[0][:-4], fstr[1]):
yield Import(name), AbsoluteVirtualAddress(addr)
def extract_file_section_names() -> Iterator[Tuple[Feature, Address]]:
"""extract section names"""
for block in currentProgram.getMemory().getBlocks():
yield Section(block.getName()), AbsoluteVirtualAddress(block.getStart().getOffset())
def extract_file_strings() -> Iterator[Tuple[Feature, Address]]:
"""extract ASCII and UTF-16 LE strings"""
for block in currentProgram.getMemory().getBlocks():
if block.isInitialized():
p_bytes = capa.features.extractors.ghidra.helpers.get_block_bytes(block)
if len(p_bytes) == 0:
break
for s in capa.features.extractors.strings.extract_ascii_strings(p_bytes):
offset = block.getStart().getOffset() + s.offset
yield String(s.s), FileOffsetAddress(offset)
for s in capa.features.extractors.strings.extract_unicode_strings(p_bytes):
offset = block.getStart().getOffset() + s.offset
yield String(s.s), FileOffsetAddress(offset)
def extract_file_function_names() -> Iterator[Tuple[Feature, Address]]:
"""
extract the names of statically-linked library functions.
"""
for sym in currentProgram.getSymbolTable().getAllSymbols(True):
# .isExternal() misses more than this config for the function symbols
if sym.getSymbolType() == SymbolType.FUNCTION and sym.getSource() == SourceType.ANALYSIS and sym.isGlobal():
name = sym.getName() # starts to resolve names based on Ghidra's FidDB
if name.startswith("FID_conflict:"): # format: FID_conflict:<function-name>
name = name[13:]
addr = AbsoluteVirtualAddress(sym.getAddress().getOffset())
yield FunctionName(name), addr
if name.startswith("_"):
# some linkers may prefix linked routines with a `_` to avoid name collisions.
# extract features for both the mangled and un-mangled representations.
# e.g. `_fwrite` -> `fwrite`
# see: https://stackoverflow.com/a/2628384/87207
yield FunctionName(name[1:]), addr
def extract_file_format() -> Iterator[Tuple[Feature, Address]]:
ef = currentProgram.getExecutableFormat()
if "PE" in ef:
yield Format(FORMAT_PE), NO_ADDRESS
elif "ELF" in ef:
yield Format(FORMAT_ELF), NO_ADDRESS
elif "Raw" in ef:
# no file type to return when processing a binary file, but we want to continue processing
return
else:
raise NotImplementedError(f"unexpected file format: {ef}")
def extract_features() -> Iterator[Tuple[Feature, Address]]:
"""extract file features"""
for file_handler in FILE_HANDLERS:
for feature, addr in file_handler():
yield feature, addr
FILE_HANDLERS = (
extract_file_embedded_pe,
extract_file_export_names,
extract_file_import_names,
extract_file_section_names,
extract_file_strings,
extract_file_function_names,
extract_file_format,
)
def main():
""" """
import pprint
pprint.pprint(list(extract_features()))
if __name__ == "__main__":
main()

View File

@@ -1,3 +1,11 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
import contextlib
from io import BytesIO

View File

@@ -0,0 +1,60 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Any, Dict, Tuple, Iterator, Optional
import ghidra
currentProgram: ghidra.program.database.ProgramDB
def find_byte_sequence(seq: bytes) -> Iterator[int]:
"""yield all ea of a given byte sequence
args:
seq: bytes to search e.g. b"\x01\x03"
"""
seqstr = "".join([f"\\x{b:02x}" for b in seq])
# .add(1) to avoid false positives on regular PE files
ea = findBytes(currentProgram.getMinAddress().add(1), seqstr, 1, 1) # type: ignore [name-defined]
for e in ea:
yield e
def get_bytes(addr: ghidra.program.model.address.Address, length: int) -> bytes:
"""yield length bytes at addr
args:
addr: Address to begin pull from
length: length of bytes to pull
"""
bytez = b""
try:
signed_ints = getBytes(addr, length) # type: ignore [name-defined]
for b in signed_ints:
bytez = bytez + (b & 0xFF).to_bytes(1, "little")
return bytez
except RuntimeError:
return bytez
def get_block_bytes(block: ghidra.program.model.mem.MemoryBlock) -> bytes:
"""yield all bytes in a given block
args:
block: MemoryBlock to pull from
"""
bytez = b""
try:
signed_ints = getBytes(block.getStart(), block.getEnd().getOffset() - block.getStart().getOffset()) # type: ignore [name-defined]
for b in signed_ints:
bytez = bytez + (b & 0xFF).to_bytes(1, "little")
return bytez
except RuntimeError:
return bytez

View File

@@ -1004,7 +1004,7 @@ def handle_common_args(args):
# if isinstance(sys.stdout, io.TextIOWrapper):
# sys.stdout.reconfigure(...)
sys.stdout.reconfigure(encoding="utf-8")
colorama.just_fix_windows_console()
colorama.just_fix_windows_console() # type: ignore [attr-defined]
if args.color == "always":
colorama.init(strip=False)
@@ -1341,6 +1341,7 @@ def ida_main():
def ghidra_main():
import capa.rules
import capa.features.extractors.ghidra.file
# import capa.render.default
# import capa.features.extractors.ghidra.extractor
@@ -1357,9 +1358,9 @@ def ghidra_main():
logger.debug(" https://github.com/mandiant/capa-rules")
logger.debug("-" * 80)
rules_path = os.path.join(get_default_root(), "rules")
logger.debug("rule path: %s", rules_path)
rules = get_rules([rules_path])
# rules_path = os.path.join(get_default_root(), "rules")
# logger.debug("rule path: %s", rules_path)
# rules = get_rules([rules_path])
# temp test for OS & ARCH extractions
globl_features: List[Tuple[Feature, Address]] = []
@@ -1367,6 +1368,10 @@ def ghidra_main():
globl_features.extend(capa.features.extractors.ghidra.global_.extract_arch())
print(globl_features)
file_features: List[Tuple[Feature, Address]] = []
file_features.extend(capa.features.extractors.ghidra.file.extract_features())
print(file_features)
def is_runtime_ida():
try: