diff --git a/capa/features/extractors/ghidra/extractor.py b/capa/features/extractors/ghidra/extractor.py
index 65b06706..b4441bf9 100644
--- a/capa/features/extractors/ghidra/extractor.py
+++ b/capa/features/extractors/ghidra/extractor.py
@@ -2,15 +2,33 @@ import logging
 import contextlib
 from typing import List, Tuple, Iterator
 
+import ghidra
+
+import capa.features.extractors.ghidra.file
 import capa.features.extractors.ghidra.global_
 from capa.features.common import Feature
 from capa.features.address import Address, AbsoluteVirtualAddress
 from capa.features.extractors.base_extractor import FeatureExtractor
 
+currentProgram: ghidra.program.database.ProgramDB
+
 
 class GhidraFeatureExtractor(FeatureExtractor):
     def __init__(self):
         super().__init__()
         self.global_features: List[Tuple[Feature, Address]] = []
+        self.global_features.extend(capa.features.extractors.ghidra.file.extract_file_format())
         self.global_features.extend(capa.features.extractors.ghidra.global_.extract_os())
         self.global_features.extend(capa.features.extractors.ghidra.global_.extract_arch())
+
+    def get_base_address(self):
+        """return the program's image base as an absolute virtual address"""
+        return AbsoluteVirtualAddress(currentProgram.getImageBase().getOffset())
+
+    def extract_global_features(self):
+        """yield the global features collected once in __init__"""
+        yield from self.global_features
+
+    def extract_file_features(self):
+        """yield the file-scope features produced by the Ghidra file extractor module"""
+        yield from capa.features.extractors.ghidra.file.extract_features()
diff --git a/capa/features/extractors/ghidra/file.py b/capa/features/extractors/ghidra/file.py
index e69de29b..a37d5415 100644
--- a/capa/features/extractors/ghidra/file.py
+++ b/capa/features/extractors/ghidra/file.py
@@ -0,0 +1,224 @@
+# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at: [package root]/LICENSE.txt
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+import struct
+from typing import Tuple, Iterator
+
+import ghidra
+from ghidra.program.model.symbol import SourceType, SymbolType
+
+import capa.features.extractors.common
+import capa.features.extractors.helpers
+import capa.features.extractors.strings
+import capa.features.extractors.ghidra.helpers
+from capa.features.file import Export, Import, Section, FunctionName
+from capa.features.common import FORMAT_PE, FORMAT_ELF, Format, String, Feature, Characteristic
+from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress, AbsoluteVirtualAddress
+
+currentProgram: ghidra.program.database.ProgramDB
+
+MAX_OFFSET_PE_AFTER_MZ = 0x200
+
+
+def check_segment_for_pe() -> Iterator[Tuple[int, int]]:
+    """check segment for embedded PE
+
+    adapted for Ghidra from:
+    https://github.com/vivisect/vivisect/blob/91e8419a861f49779f18316f155311967e696836/PE/carve.py#L25
+
+    yields:
+      (address offset of a candidate XOR'd "MZ" header, single-byte XOR key it matched under)
+    """
+
+    mz_xor = [
+        (
+            capa.features.extractors.helpers.xor_static(b"MZ", i),
+            capa.features.extractors.helpers.xor_static(b"PE", i),
+            i,
+        )
+        for i in range(256)
+    ]
+
+    todo = []
+    for mzx, pex, i in mz_xor:
+        # find all segment offsets containing XOR'd "MZ" bytes
+        off: ghidra.program.model.address.GenericAddress
+        for off in capa.features.extractors.ghidra.helpers.find_byte_sequence(mzx):
+            todo.append((off, mzx, pex, i))
+
+    seg_max = currentProgram.getMaxAddress()
+    while todo:
+        off, mzx, pex, i = todo.pop()
+
+        # MZ header has one field we will check e_lfanew is at 0x3c
+        e_lfanew = off.add(0x3C)
+
+        if seg_max.getOffset() < (e_lfanew.getOffset() + 4):
+            continue
+
+        e_lfanew_bytes = capa.features.extractors.ghidra.helpers.get_bytes(e_lfanew, 4)
+        if len(e_lfanew_bytes) != 4:
+            # get_bytes hands back short/empty data when the read fails; struct.unpack would raise on it
+            continue
+
+        newoff = struct.unpack("<I", capa.features.extractors.helpers.xor_static(e_lfanew_bytes, i))[0]
+
+        # only accept a "PE" signature offset that lies within the threshold after the "MZ" header
+        if newoff > MAX_OFFSET_PE_AFTER_MZ:
+            continue
+
+        peoff = off.add(newoff)
+        if seg_max.getOffset() < (peoff.getOffset() + 2):
+            continue
+
+        pe_bytes = capa.features.extractors.ghidra.helpers.get_bytes(peoff, 2)
+        if pe_bytes == pex:
+            yield off.getOffset(), i
+
+
+def extract_file_embedded_pe() -> Iterator[Tuple[Feature, Address]]:
+    """extract embedded PE features"""
+
+    for ea, _ in check_segment_for_pe():
+        yield Characteristic("embedded pe"), FileOffsetAddress(ea)
+
+
+def extract_file_export_names() -> Iterator[Tuple[Feature, Address]]:
+    """extract function exports"""
+    st = currentProgram.getSymbolTable()
+    for addr in st.getExternalEntryPointIterator():
+        yield Export(st.getPrimarySymbol(addr).getName()), AbsoluteVirtualAddress(addr.getOffset())
+
+
+def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]:
+    """extract function imports
+
+    1. imports by ordinal:
+     - modulename.#ordinal
+
+    2. imports by name, results in two features to support importname-only
+       matching:
+     - modulename.importname
+     - importname
+    """
+
+    for f in currentProgram.getFunctionManager().getExternalFunctions():
+        fstr = f.toString().split("::")  # format: MODULE.dll::import / MODULE::Ordinal_*
+        if len(fstr) < 2:
+            # no "MODULE::symbol" shape to split, so no modulename.importname can be formed
+            continue
+
+        module, symbol = fstr[0], fstr[1]
+        if "Ordinal_" in symbol:
+            symbol = f"#{symbol.split('_')[1]}"
+
+        # strip a three-character extension such as ".dll"; leave module names without one intact
+        if len(module) > 4 and module[-4] == ".":
+            module = module[:-4]
+
+        # the generated names depend only on the function, so build them once instead of per reference
+        names = list(capa.features.extractors.helpers.generate_symbols(module, symbol))
+
+        for r in f.getSymbol().getReferences():
+            if r.getReferenceType().isData():
+                addr = r.getFromAddress().getOffset()  # gets pointer to fake external addr
+                for name in names:
+                    yield Import(name), AbsoluteVirtualAddress(addr)
+
+
+def extract_file_section_names() -> Iterator[Tuple[Feature, Address]]:
+    """extract section names"""
+
+    for block in currentProgram.getMemory().getBlocks():
+        yield Section(block.getName()), AbsoluteVirtualAddress(block.getStart().getOffset())
+
+
+def extract_file_strings() -> Iterator[Tuple[Feature, Address]]:
+    """extract ASCII and UTF-16 LE strings"""
+
+    for block in currentProgram.getMemory().getBlocks():
+        if not block.isInitialized():
+            continue
+
+        p_bytes = capa.features.extractors.ghidra.helpers.get_block_bytes(block)
+        if not p_bytes:
+            # nothing readable in this block; keep scanning the remaining blocks instead of stopping
+            continue
+
+        base = block.getStart().getOffset()
+
+        for s in capa.features.extractors.strings.extract_ascii_strings(p_bytes):
+            yield String(s.s), FileOffsetAddress(base + s.offset)
+
+        for s in capa.features.extractors.strings.extract_unicode_strings(p_bytes):
+            yield String(s.s), FileOffsetAddress(base + s.offset)
+
+
+def extract_file_function_names() -> Iterator[Tuple[Feature, Address]]:
+    """
+    extract the names of statically-linked library functions.
+    """
+
+    fid_conflict_prefix = "FID_conflict:"
+
+    for sym in currentProgram.getSymbolTable().getAllSymbols(True):
+        # .isExternal() misses more than this config for the function symbols
+        if sym.getSymbolType() == SymbolType.FUNCTION and sym.getSource() == SourceType.ANALYSIS and sym.isGlobal():
+            name = sym.getName()  # starts to resolve names based on Ghidra's FidDB
+            if name.startswith(fid_conflict_prefix):  # format: FID_conflict:<function-name>
+                name = name[len(fid_conflict_prefix) :]
+            addr = AbsoluteVirtualAddress(sym.getAddress().getOffset())
+            yield FunctionName(name), addr
+            if name.startswith("_"):
+                # some linkers may prefix linked routines with a `_` to avoid name collisions.
+                # extract features for both the mangled and un-mangled representations.
+                # e.g. `_fwrite` -> `fwrite`
+                # see: https://stackoverflow.com/a/2628384/87207
+                yield FunctionName(name[1:]), addr
+
+
+def extract_file_format() -> Iterator[Tuple[Feature, Address]]:
+    """extract the file format feature from the program's executable format string (none for raw binaries)"""
+    ef = currentProgram.getExecutableFormat()
+    if "PE" in ef:
+        yield Format(FORMAT_PE), NO_ADDRESS
+    elif "ELF" in ef:
+        yield Format(FORMAT_ELF), NO_ADDRESS
+    elif "Raw" in ef:
+        # no file type to return when processing a binary file, but we want to continue processing
+        return
+    else:
+        raise NotImplementedError(f"unexpected file format: {ef}")
+
+
+def extract_features() -> Iterator[Tuple[Feature, Address]]:
+    """extract file features"""
+    for file_handler in FILE_HANDLERS:
+        yield from file_handler()
+
+
+FILE_HANDLERS = (
+    extract_file_embedded_pe,
+    extract_file_export_names,
+    extract_file_import_names,
+    extract_file_section_names,
+    extract_file_strings,
+    extract_file_function_names,
+    extract_file_format,
+)
+
+
+def main():
+    """pretty-print every feature yielded by extract_features()"""
+    import pprint
+
+    pprint.pprint(list(extract_features()))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/capa/features/extractors/ghidra/global_.py b/capa/features/extractors/ghidra/global_.py
index d4026eaf..4db794c8 100644
--- a/capa/features/extractors/ghidra/global_.py
+++ b/capa/features/extractors/ghidra/global_.py
@@ -1,3 +1,11 @@
+# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at: [package root]/LICENSE.txt
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
 import logging
 import contextlib
 from io import BytesIO
diff --git a/capa/features/extractors/ghidra/helpers.py b/capa/features/extractors/ghidra/helpers.py
new file mode 100644
index 00000000..dac11547
--- /dev/null
+++ b/capa/features/extractors/ghidra/helpers.py
@@ -0,0 +1,55 @@
+# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at: [package root]/LICENSE.txt
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+from typing import Any, Dict, Tuple, Iterator, Optional
+
+import ghidra
+
+currentProgram: ghidra.program.database.ProgramDB
+
+
+def find_byte_sequence(seq: bytes) -> Iterator[ghidra.program.model.address.Address]:
+    """yield every address at which a given byte sequence occurs
+
+    args:
+      seq: bytes to search e.g. b"\x01\x03"
+    """
+    seqstr = "".join([f"\\x{b:02x}" for b in seq])
+    # a match limit of 1 only ever reports the first hit; request as many as a Java int allows
+    match_limit = 0x7FFFFFFF
+    # .add(1) to avoid false positives on regular PE files
+    eas = findBytes(currentProgram.getMinAddress().add(1), seqstr, match_limit, 1)  # type: ignore [name-defined]
+    yield from eas
+
+
+def get_bytes(addr: ghidra.program.model.address.Address, length: int) -> bytes:
+    """return length bytes read at addr, or empty bytes when the read raises RuntimeError
+
+    args:
+      addr: Address to begin pull from
+      length: length of bytes to pull
+    """
+
+    try:
+        # getBytes hands back signed values; mask each into the 0-255 range and build the result in one pass
+        return bytes(b & 0xFF for b in getBytes(addr, length))  # type: ignore [name-defined]
+    except RuntimeError:
+        return b""
+
+
+def get_block_bytes(block: ghidra.program.model.mem.MemoryBlock) -> bytes:
+    """return all bytes in a given block, or empty bytes when the read raises RuntimeError
+
+    args:
+      block: MemoryBlock to pull from
+    """
+
+    try:
+        # the block's end address is its last byte, so end - start would drop that byte; getSize() covers it
+        return bytes(b & 0xFF for b in getBytes(block.getStart(), block.getSize()))  # type: ignore [name-defined]
+    except RuntimeError:
+        return b""
diff --git a/capa/main.py b/capa/main.py
index 65e9c81e..06f097a2 100644
--- a/capa/main.py
+++ b/capa/main.py
@@ -1004,7 +1004,7 @@ def handle_common_args(args):
         # if isinstance(sys.stdout, io.TextIOWrapper):
         # sys.stdout.reconfigure(...)
sys.stdout.reconfigure(encoding="utf-8") - colorama.just_fix_windows_console() + colorama.just_fix_windows_console() # type: ignore [attr-defined] if args.color == "always": colorama.init(strip=False) @@ -1341,6 +1341,7 @@ def ida_main(): def ghidra_main(): import capa.rules + import capa.features.extractors.ghidra.file # import capa.render.default # import capa.features.extractors.ghidra.extractor @@ -1357,9 +1358,9 @@ def ghidra_main(): logger.debug(" https://github.com/mandiant/capa-rules") logger.debug("-" * 80) - rules_path = os.path.join(get_default_root(), "rules") - logger.debug("rule path: %s", rules_path) - rules = get_rules([rules_path]) + # rules_path = os.path.join(get_default_root(), "rules") + # logger.debug("rule path: %s", rules_path) + # rules = get_rules([rules_path]) # temp test for OS & ARCH extractions globl_features: List[Tuple[Feature, Address]] = [] @@ -1367,6 +1368,10 @@ def ghidra_main(): globl_features.extend(capa.features.extractors.ghidra.global_.extract_arch()) print(globl_features) + file_features: List[Tuple[Feature, Address]] = [] + file_features.extend(capa.features.extractors.ghidra.file.extract_features()) + print(file_features) + def is_runtime_ida(): try: