capabilities: use dataclasses to represent complicated return types

foo
This commit is contained in:
Willi Ballenthin
2024-12-10 15:58:32 +00:00
committed by Willi Ballenthin
parent 4896ff01d8
commit 8d17319128
21 changed files with 256 additions and 199 deletions

View File

@@ -16,17 +16,28 @@
import logging
import itertools
import collections
from typing import Any
from typing import Optional
from dataclasses import dataclass
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.features.address import NO_ADDRESS
from capa.render.result_document import LibraryFunction, StaticFeatureCounts, DynamicFeatureCounts
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor
logger = logging.getLogger(__name__)
def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
@dataclass
class FileCapabilities:
features: FeatureSet
matches: MatchResults
feature_count: int
def find_file_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet
) -> FileCapabilities:
file_features: FeatureSet = collections.defaultdict(set)
for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()):
@@ -43,8 +54,8 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi
file_features.update(function_features)
_, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS)
return matches, len(file_features)
features, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS)
return FileCapabilities(features, matches, len(file_features))
def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool:
@@ -69,9 +80,14 @@ def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalon
return False
def find_capabilities(
ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs
) -> tuple[MatchResults, Any]:
@dataclass
class Capabilities:
matches: MatchResults
feature_counts: StaticFeatureCounts | DynamicFeatureCounts
library_functions: Optional[tuple[LibraryFunction, ...]] = None
def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs) -> Capabilities:
from capa.capabilities.static import find_static_capabilities
from capa.capabilities.dynamic import find_dynamic_capabilities

View File

@@ -16,26 +16,30 @@
import logging
import itertools
import collections
from typing import Any
from dataclasses import dataclass
import capa.perf
import capa.features.freeze as frz
import capa.render.result_document as rdoc
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.capabilities.common import find_file_capabilities
from capa.capabilities.common import Capabilities, find_file_capabilities
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor
logger = logging.getLogger(__name__)
@dataclass
class CallCapabilities:
features: FeatureSet
matches: MatchResults
def find_call_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> tuple[FeatureSet, MatchResults]:
) -> CallCapabilities:
"""
find matches for the given rules for the given call.
    returns: CallCapabilities dataclass containing the features and match results for the call
"""
# all features found for the call.
features: FeatureSet = collections.defaultdict(set)
@@ -53,16 +57,21 @@ def find_call_capabilities(
for addr, _ in res:
capa.engine.index_rule_matches(features, rule, [addr])
return features, matches
return CallCapabilities(features, matches)
@dataclass
class ThreadCapabilities:
features: FeatureSet
thread_matches: MatchResults
call_matches: MatchResults
def find_thread_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
) -> tuple[FeatureSet, MatchResults, MatchResults]:
) -> ThreadCapabilities:
"""
find matches for the given rules within the given thread.
    returns: ThreadCapabilities dataclass containing the features, thread match results, and call match results
"""
# all features found within this thread,
# includes features found within calls.
@@ -73,11 +82,11 @@ def find_thread_capabilities(
call_matches: MatchResults = collections.defaultdict(list)
for ch in extractor.get_calls(ph, th):
ifeatures, imatches = find_call_capabilities(ruleset, extractor, ph, th, ch)
for feature, vas in ifeatures.items():
call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
for feature, vas in call_capabilities.features.items():
features[feature].update(vas)
for rule_name, res in imatches.items():
for rule_name, res in call_capabilities.matches.items():
call_matches[rule_name].extend(res)
for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()):
@@ -91,16 +100,22 @@ def find_thread_capabilities(
for va, _ in res:
capa.engine.index_rule_matches(features, rule, [va])
return features, matches, call_matches
return ThreadCapabilities(features, matches, call_matches)
@dataclass
class ProcessCapabilities:
process_matches: MatchResults
thread_matches: MatchResults
call_matches: MatchResults
feature_count: int
def find_process_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle
) -> tuple[MatchResults, MatchResults, MatchResults, int]:
) -> ProcessCapabilities:
"""
find matches for the given rules within the given process.
    returns: ProcessCapabilities dataclass containing the process, thread, and call match results, and the feature count
"""
# all features found within this process,
# includes features found within threads (and calls).
@@ -115,26 +130,26 @@ def find_process_capabilities(
call_matches: MatchResults = collections.defaultdict(list)
for th in extractor.get_threads(ph):
features, tmatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th)
for feature, vas in features.items():
thread_capabilities = find_thread_capabilities(ruleset, extractor, ph, th)
for feature, vas in thread_capabilities.features.items():
process_features[feature].update(vas)
for rule_name, res in tmatches.items():
for rule_name, res in thread_capabilities.thread_matches.items():
thread_matches[rule_name].extend(res)
for rule_name, res in cmatches.items():
for rule_name, res in thread_capabilities.call_matches.items():
call_matches[rule_name].extend(res)
for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()):
process_features[feature].add(va)
_, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address)
return process_matches, thread_matches, call_matches, len(process_features)
return ProcessCapabilities(process_matches, thread_matches, call_matches, len(process_features))
def find_dynamic_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None
) -> tuple[MatchResults, Any]:
) -> Capabilities:
all_process_matches: MatchResults = collections.defaultdict(list)
all_thread_matches: MatchResults = collections.defaultdict(list)
all_call_matches: MatchResults = collections.defaultdict(list)
@@ -150,19 +165,19 @@ def find_dynamic_capabilities(
) as pbar:
task = pbar.add_task("matching", total=n_processes, unit="processes")
for p in processes:
process_matches, thread_matches, call_matches, feature_count = find_process_capabilities(
ruleset, extractor, p
)
process_capabilities = find_process_capabilities(ruleset, extractor, p)
feature_counts.processes += (
rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count),
rdoc.ProcessFeatureCount(
address=frz.Address.from_capa(p.address), count=process_capabilities.feature_count
),
)
logger.debug("analyzed %s and extracted %d features", p.address, feature_count)
logger.debug("analyzed %s and extracted %d features", p.address, process_capabilities.feature_count)
for rule_name, res in process_matches.items():
for rule_name, res in process_capabilities.process_matches.items():
all_process_matches[rule_name].extend(res)
for rule_name, res in thread_matches.items():
for rule_name, res in process_capabilities.thread_matches.items():
all_thread_matches[rule_name].extend(res)
for rule_name, res in call_matches.items():
for rule_name, res in process_capabilities.call_matches.items():
all_call_matches[rule_name].extend(res)
pbar.advance(task)
@@ -177,8 +192,8 @@ def find_dynamic_capabilities(
rule = ruleset[rule_name]
capa.engine.index_rule_matches(process_and_lower_features, rule, locations)
all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features)
feature_counts.file = feature_count
all_file_capabilities = find_file_capabilities(ruleset, extractor, process_and_lower_features)
feature_counts.file = all_file_capabilities.feature_count
matches = dict(
itertools.chain(
@@ -187,13 +202,8 @@ def find_dynamic_capabilities(
# and we can merge the dictionaries naively.
all_thread_matches.items(),
all_process_matches.items(),
all_call_matches.items(),
all_file_matches.items(),
all_file_capabilities.matches.items(),
)
)
meta = {
"feature_counts": feature_counts,
}
return matches, meta
return Capabilities(matches, feature_counts)

View File

@@ -17,7 +17,7 @@ import time
import logging
import itertools
import collections
from typing import Any
from dataclasses import dataclass
import capa.perf
import capa.helpers
@@ -25,19 +25,23 @@ import capa.features.freeze as frz
import capa.render.result_document as rdoc
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.capabilities.common import find_file_capabilities
from capa.capabilities.common import Capabilities, find_file_capabilities
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor
logger = logging.getLogger(__name__)
@dataclass
class InstructionCapabilities:
features: FeatureSet
matches: MatchResults
def find_instruction_capabilities(
ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle
) -> tuple[FeatureSet, MatchResults]:
) -> InstructionCapabilities:
"""
find matches for the given rules for the given instruction.
    returns: InstructionCapabilities dataclass containing the features and match results for the instruction
"""
# all features found for the instruction.
features: FeatureSet = collections.defaultdict(set)
@@ -55,16 +59,21 @@ def find_instruction_capabilities(
for addr, _ in res:
capa.engine.index_rule_matches(features, rule, [addr])
return features, matches
return InstructionCapabilities(features, matches)
@dataclass
class BasicBlockCapabilities:
features: FeatureSet
basic_block_matches: MatchResults
instruction_matches: MatchResults
def find_basic_block_capabilities(
ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle
) -> tuple[FeatureSet, MatchResults, MatchResults]:
) -> BasicBlockCapabilities:
"""
find matches for the given rules within the given basic block.
    returns: BasicBlockCapabilities dataclass containing the features, basic block match results, and instruction match results
"""
# all features found within this basic block,
# includes features found within instructions.
@@ -75,11 +84,11 @@ def find_basic_block_capabilities(
insn_matches: MatchResults = collections.defaultdict(list)
for insn in extractor.get_instructions(f, bb):
ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn)
for feature, vas in ifeatures.items():
instruction_capabilities = find_instruction_capabilities(ruleset, extractor, f, bb, insn)
for feature, vas in instruction_capabilities.features.items():
features[feature].update(vas)
for rule_name, res in imatches.items():
for rule_name, res in instruction_capabilities.matches.items():
insn_matches[rule_name].extend(res)
for feature, va in itertools.chain(
@@ -95,16 +104,20 @@ def find_basic_block_capabilities(
for va, _ in res:
capa.engine.index_rule_matches(features, rule, [va])
return features, matches, insn_matches
return BasicBlockCapabilities(features, matches, insn_matches)
def find_code_capabilities(
ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle
) -> tuple[MatchResults, MatchResults, MatchResults, int]:
@dataclass
class CodeCapabilities:
function_matches: MatchResults
basic_block_matches: MatchResults
instruction_matches: MatchResults
feature_count: int
def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle) -> CodeCapabilities:
"""
find matches for the given rules within the given function.
    returns: CodeCapabilities dataclass containing the function, basic block, and instruction match results, and the feature count
"""
# all features found within this function,
# includes features found within basic blocks (and instructions).
@@ -119,26 +132,26 @@ def find_code_capabilities(
insn_matches: MatchResults = collections.defaultdict(list)
for bb in extractor.get_basic_blocks(fh):
features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb)
for feature, vas in features.items():
basic_block_capabilities = find_basic_block_capabilities(ruleset, extractor, fh, bb)
for feature, vas in basic_block_capabilities.features.items():
function_features[feature].update(vas)
for rule_name, res in bmatches.items():
for rule_name, res in basic_block_capabilities.basic_block_matches.items():
bb_matches[rule_name].extend(res)
for rule_name, res in imatches.items():
for rule_name, res in basic_block_capabilities.instruction_matches.items():
insn_matches[rule_name].extend(res)
for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()):
function_features[feature].add(va)
_, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address)
return function_matches, bb_matches, insn_matches, len(function_features)
return CodeCapabilities(function_matches, bb_matches, insn_matches, len(function_features))
def find_static_capabilities(
ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None
) -> tuple[MatchResults, Any]:
) -> Capabilities:
all_function_matches: MatchResults = collections.defaultdict(list)
all_bb_matches: MatchResults = collections.defaultdict(list)
all_insn_matches: MatchResults = collections.defaultdict(list)
@@ -172,30 +185,36 @@ def find_static_capabilities(
pbar.advance(task)
continue
function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(ruleset, extractor, f)
code_capabilities = find_code_capabilities(ruleset, extractor, f)
feature_counts.functions += (
rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count),
rdoc.FunctionFeatureCount(
address=frz.Address.from_capa(f.address), count=code_capabilities.feature_count
),
)
t1 = time.time()
match_count = 0
for name, matches_ in itertools.chain(function_matches.items(), bb_matches.items(), insn_matches.items()):
for name, matches_ in itertools.chain(
code_capabilities.function_matches.items(),
code_capabilities.basic_block_matches.items(),
code_capabilities.instruction_matches.items(),
):
if not ruleset.rules[name].is_subscope_rule():
match_count += len(matches_)
logger.debug(
"analyzed function 0x%x and extracted %d features, %d matches in %0.02fs",
f.address,
feature_count,
code_capabilities.feature_count,
match_count,
t1 - t0,
)
for rule_name, res in function_matches.items():
for rule_name, res in code_capabilities.function_matches.items():
all_function_matches[rule_name].extend(res)
for rule_name, res in bb_matches.items():
for rule_name, res in code_capabilities.basic_block_matches.items():
all_bb_matches[rule_name].extend(res)
for rule_name, res in insn_matches.items():
for rule_name, res in code_capabilities.instruction_matches.items():
all_insn_matches[rule_name].extend(res)
pbar.advance(task)
@@ -210,8 +229,8 @@ def find_static_capabilities(
rule = ruleset[rule_name]
capa.engine.index_rule_matches(function_and_lower_features, rule, locations)
all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features)
feature_counts.file = feature_count
all_file_capabilities = find_file_capabilities(ruleset, extractor, function_and_lower_features)
feature_counts.file = all_file_capabilities.feature_count
matches: MatchResults = dict(
itertools.chain(
@@ -221,13 +240,8 @@ def find_static_capabilities(
all_insn_matches.items(),
all_bb_matches.items(),
all_function_matches.items(),
all_file_matches.items(),
all_file_capabilities.matches.items(),
)
)
meta = {
"feature_counts": feature_counts,
"library_functions": library_functions,
}
return matches, meta
return Capabilities(matches, feature_counts, library_functions)

View File

@@ -245,13 +245,13 @@ def get_capabilities():
meta = capa.ghidra.helpers.collect_metadata([rules_path])
extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()
capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, True)
capabilities = capa.capabilities.common.find_capabilities(rules, extractor, True)
if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False):
if capa.capabilities.common.has_file_limitation(rules, capabilities.matches, is_standalone=False):
popup("capa explorer encountered warnings during analysis. Please check the console output for more information.") # type: ignore [name-defined] # noqa: F821
logger.info("capa encountered warnings during analysis")
return capa.render.json.render(meta, rules, capabilities)
return capa.render.json.render(meta, rules, capabilities.matches)
def get_locations(match_dict):

View File

@@ -81,23 +81,23 @@ def run_headless():
meta = capa.ghidra.helpers.collect_metadata([rules_path])
extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()
capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, False)
capabilities = capa.capabilities.common.find_capabilities(rules, extractor, False)
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
meta.analysis.feature_counts = capabilities.feature_counts
meta.analysis.library_functions = capabilities.library_functions
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches)
if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=True):
if capa.capabilities.common.has_file_limitation(rules, capabilities.matches, is_standalone=True):
logger.info("capa encountered warnings during analysis")
if args.json:
print(capa.render.json.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.json.render(meta, rules, capabilities.matches)) # noqa: T201
elif args.vverbose:
print(capa.render.vverbose.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.vverbose.render(meta, rules, capabilities.matches)) # noqa: T201
elif args.verbose:
print(capa.render.verbose.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.verbose.render(meta, rules, capabilities.matches)) # noqa: T201
else:
print(capa.render.default.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.default.render(meta, rules, capabilities.matches)) # noqa: T201
return 0
@@ -131,21 +131,21 @@ def run_ui():
meta = capa.ghidra.helpers.collect_metadata([rules_path])
extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()
capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, True)
capabilities = capa.capabilities.common.find_capabilities(rules, extractor, True)
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
meta.analysis.feature_counts = capabilities.feature_counts
meta.analysis.library_functions = capabilities.library_functions
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches)
if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False):
if capa.capabilities.common.has_file_limitation(rules, capabilities.matches, is_standalone=False):
logger.info("capa encountered warnings during analysis")
if verbose == "vverbose":
print(capa.render.vverbose.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.vverbose.render(meta, rules, capabilities.matches)) # noqa: T201
elif verbose == "verbose":
print(capa.render.verbose.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.verbose.render(meta, rules, capabilities.matches)) # noqa: T201
else:
print(capa.render.default.render(meta, rules, capabilities)) # noqa: T201
print(capa.render.default.render(meta, rules, capabilities.matches)) # noqa: T201
return 0

View File

@@ -776,13 +776,15 @@ class CapaExplorerForm(idaapi.PluginForm):
try:
meta = capa.ida.helpers.collect_metadata([Path(settings.user[CAPA_SETTINGS_RULE_PATH])])
capabilities, counts = capa.capabilities.common.find_capabilities(
capabilities = capa.capabilities.common.find_capabilities(
ruleset, self.feature_extractor, disable_progress=True
)
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.loader.compute_layout(ruleset, self.feature_extractor, capabilities)
meta.analysis.feature_counts = capabilities.feature_counts
meta.analysis.library_functions = capabilities.library_functions
meta.analysis.layout = capa.loader.compute_layout(
ruleset, self.feature_extractor, capabilities.matches
)
except UserCancelledError:
logger.info("User cancelled analysis.")
return False
@@ -818,7 +820,7 @@ class CapaExplorerForm(idaapi.PluginForm):
capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis")
if capa.capabilities.common.has_file_limitation(ruleset, capabilities, is_standalone=False):
if capa.capabilities.common.has_file_limitation(ruleset, capabilities.matches, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis")
except Exception as e:
logger.exception("Failed to check for file limitations (error: %s)", e)
@@ -832,7 +834,7 @@ class CapaExplorerForm(idaapi.PluginForm):
try:
self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa(
meta, ruleset, capabilities
meta, ruleset, capabilities.matches
)
except Exception as e:
logger.exception("Failed to collect results (error: %s)", e)

View File

@@ -59,6 +59,7 @@ from capa.features.common import (
FORMAT_BINEXPORT2,
)
from capa.features.address import Address
from capa.capabilities.common import Capabilities
from capa.features.extractors.base_extractor import (
SampleHashes,
FeatureExtractor,
@@ -450,7 +451,7 @@ def get_signatures(sigs_path: Path) -> list[Path]:
return paths
def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts):
def get_sample_analysis(format_, arch, os_, extractor, rules_path, feature_counts, library_functions):
if isinstance(extractor, StaticFeatureExtractor):
return rdoc.StaticAnalysis(
format=format_,
@@ -466,8 +467,8 @@ def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts):
#
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
),
feature_counts=counts["feature_counts"],
library_functions=counts["library_functions"],
feature_counts=feature_counts,
library_functions=library_functions,
)
elif isinstance(extractor, DynamicFeatureExtractor):
return rdoc.DynamicAnalysis(
@@ -479,7 +480,7 @@ def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts):
layout=rdoc.DynamicLayout(
processes=(),
),
feature_counts=counts["feature_counts"],
feature_counts=feature_counts,
)
else:
raise ValueError("invalid extractor type")
@@ -492,7 +493,7 @@ def collect_metadata(
os_: str,
rules_path: list[Path],
extractor: FeatureExtractor,
counts: dict,
capabilities: Capabilities,
) -> rdoc.Metadata:
# if it's a binary sample we hash it, if it's a report
# we fetch the hashes from the report
@@ -535,7 +536,8 @@ def collect_metadata(
os_,
extractor,
rules,
counts,
capabilities.feature_counts,
capabilities.library_functions,
),
)

View File

@@ -22,7 +22,7 @@ import argparse
import textwrap
import contextlib
from types import TracebackType
from typing import Any, Optional, TypedDict
from typing import Optional, TypedDict
from pathlib import Path
import colorama
@@ -47,7 +47,6 @@ import capa.render.result_document
import capa.render.result_document as rdoc
import capa.features.extractors.common
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.loader import (
BACKEND_IDA,
BACKEND_VIV,
@@ -100,7 +99,7 @@ from capa.features.common import (
FORMAT_BINJA_DB,
FORMAT_BINEXPORT2,
)
from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities
from capa.capabilities.common import Capabilities, find_capabilities, has_file_limitation, find_file_capabilities
from capa.features.extractors.base_extractor import (
ProcessFilter,
FunctionFilter,
@@ -761,7 +760,7 @@ def find_file_limitations_from_cli(args, rules: RuleSet, file_extractors: list[F
found_file_limitation = False
for file_extractor in file_extractors:
try:
pure_file_capabilities, _ = find_file_capabilities(rules, file_extractor, {})
pure_file_capabilities = find_file_capabilities(rules, file_extractor, {})
except PEFormatError as e:
logger.error("Input file '%s' is not a valid PE file: %s", args.input_file, str(e))
raise ShouldExitError(E_CORRUPT_FILE) from e
@@ -771,7 +770,7 @@ def find_file_limitations_from_cli(args, rules: RuleSet, file_extractors: list[F
# file limitations that rely on non-file scope won't be detected here.
# nor on FunctionName features, because pefile doesn't support this.
found_file_limitation = has_file_limitation(rules, pure_file_capabilities)
found_file_limitation = has_file_limitation(rules, pure_file_capabilities.matches)
if found_file_limitation:
# bail if capa encountered file limitation e.g. a packed binary
# do show the output in verbose mode, though.
@@ -974,8 +973,7 @@ def main(argv: Optional[list[str]] = None):
return e.status_code
meta: rdoc.Metadata
capabilities: MatchResults
counts: dict[str, Any]
capabilities: Capabilities
if input_format == FORMAT_RESULT:
# result document directly parses into meta, capabilities
@@ -997,10 +995,12 @@ def main(argv: Optional[list[str]] = None):
except ShouldExitError as e:
return e.status_code
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
capabilities = find_capabilities(rules, extractor, disable_progress=args.quiet)
meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
meta = capa.loader.collect_metadata(
argv, args.input_file, input_format, os_, args.rules, extractor, capabilities
)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches)
if isinstance(extractor, StaticFeatureExtractor) and found_file_limitation:
# bail if capa's static feature extractor encountered file limitation e.g. a packed binary
@@ -1009,13 +1009,13 @@ def main(argv: Optional[list[str]] = None):
return E_FILE_LIMITATION
if args.json:
print(capa.render.json.render(meta, rules, capabilities))
print(capa.render.json.render(meta, rules, capabilities.matches))
elif args.vverbose:
print(capa.render.vverbose.render(meta, rules, capabilities))
print(capa.render.vverbose.render(meta, rules, capabilities.matches))
elif args.verbose:
print(capa.render.verbose.render(meta, rules, capabilities))
print(capa.render.verbose.render(meta, rules, capabilities.matches))
else:
print(capa.render.default.render(meta, rules, capabilities))
print(capa.render.default.render(meta, rules, capabilities.matches))
colorama.deinit()
logger.debug("done.")
@@ -1051,16 +1051,16 @@ def ida_main():
meta = capa.ida.helpers.collect_metadata([rules_path])
capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
capabilities = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.feature_counts = capabilities.feature_counts
meta.analysis.library_functions = capabilities.library_functions
if has_file_limitation(rules, capabilities, is_standalone=False):
if has_file_limitation(rules, capabilities.matches, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")
colorama.init(strip=True)
print(capa.render.default.render(meta, rules, capabilities))
print(capa.render.default.render(meta, rules, capabilities.matches))
def ghidra_main():
@@ -1085,19 +1085,19 @@ def ghidra_main():
meta = capa.ghidra.helpers.collect_metadata([rules_path])
capabilities, counts = find_capabilities(
capabilities = find_capabilities(
rules,
capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor(),
not capa.ghidra.helpers.is_running_headless(),
)
meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.feature_counts = capabilities.feature_counts
meta.analysis.library_functions = capabilities.library_functions
if has_file_limitation(rules, capabilities, is_standalone=False):
if has_file_limitation(rules, capabilities.matches, is_standalone=False):
logger.info("capa encountered warnings during analysis")
print(capa.render.default.render(meta, rules, capabilities))
print(capa.render.default.render(meta, rules, capabilities.matches))
if __name__ == "__main__":

View File

@@ -15,7 +15,7 @@
import datetime
import collections
from enum import Enum
from typing import Union, Literal, Optional, TypeAlias
from typing import TYPE_CHECKING, Union, Literal, Optional, TypeAlias
from pathlib import Path
from pydantic import Field, BaseModel, ConfigDict
@@ -30,6 +30,9 @@ from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.helpers import assert_never, load_json_from_path
if TYPE_CHECKING:
from capa.capabilities.common import Capabilities
class FrozenModel(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")
@@ -674,8 +677,10 @@ class ResultDocument(FrozenModel):
return ResultDocument(meta=meta, rules=rule_matches)
def to_capa(self) -> tuple[Metadata, dict]:
capabilities: dict[str, list[tuple[capa.features.address.Address, capa.features.common.Result]]] = (
def to_capa(self) -> tuple[Metadata, "Capabilities"]:
from capa.capabilities.common import Capabilities
matches: dict[str, list[tuple[capa.features.address.Address, capa.features.common.Result]]] = (
collections.defaultdict(list)
)
@@ -688,7 +693,14 @@ class ResultDocument(FrozenModel):
for addr, match in rule_match.matches:
result: capa.engine.Result = match.to_capa(rules_by_name)
capabilities[rule_name].append((addr.to_capa(), result))
matches[rule_name].append((addr.to_capa(), result))
if isinstance(self.meta.analysis, StaticAnalysis):
capabilities = Capabilities(
matches, self.meta.analysis.feature_counts, self.meta.analysis.library_functions
)
elif isinstance(self.meta.analysis, DynamicAnalysis):
capabilities = Capabilities(matches, self.meta.analysis.feature_counts)
return self.meta, capabilities

2
rules

Submodule rules updated: b4e0c8cdf8...6cb2ec010b

View File

@@ -146,12 +146,12 @@ def get_capa_results(args):
"error": f"unexpected error: {e}",
}
capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True)
capabilities = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True)
meta = capa.loader.collect_metadata(argv, args.input_file, format_, os_, [], extractor, counts)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
meta = capa.loader.collect_metadata(argv, args.input_file, format_, os_, [], extractor, capabilities)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches)
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
doc = rd.ResultDocument.from_capa(meta, rules, capabilities.matches)
return {"path": input_file, "status": "ok", "ok": doc.model_dump()}

View File

@@ -184,25 +184,25 @@ def capa_details(rules_path: Path, input_file: Path, output_format="dictionary")
extractor = capa.loader.get_extractor(
input_file, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], should_save_workspace=False, disable_progress=True
)
capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True)
capabilities = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True)
# collect metadata (used only to make rendering more complete)
meta = capa.loader.collect_metadata([], input_file, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
meta = capa.loader.collect_metadata([], input_file, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, capabilities)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches)
capa_output: Any = False
if output_format == "dictionary":
# ...as python dictionary, simplified as textable but in dictionary
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
doc = rd.ResultDocument.from_capa(meta, rules, capabilities.matches)
capa_output = render_dictionary(doc)
elif output_format == "json":
# render results
# ...as json
capa_output = json.loads(capa.render.json.render(meta, rules, capabilities))
capa_output = json.loads(capa.render.json.render(meta, rules, capabilities.matches))
elif output_format == "texttable":
# ...as human readable text table
capa_output = capa.render.default.render(meta, rules, capabilities)
capa_output = capa.render.default.render(meta, rules, capabilities.matches)
return capa_output

View File

@@ -100,12 +100,12 @@ def main(argv=None):
except capa.main.ShouldExitError as e:
return e.status_code
capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor)
capabilities = capa.capabilities.common.find_capabilities(rules, extractor)
meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, capabilities)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches)
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
doc = rd.ResultDocument.from_capa(meta, rules, capabilities.matches)
pb = capa.render.proto.doc_to_pb2(doc)
sys.stdout.buffer.write(pb.SerializeToString(deterministic=True))

View File

@@ -91,7 +91,7 @@ def main():
return -2
rows = []
for name in capabilities.keys():
for name in capabilities.matches.keys():
rule = result_doc.rules[name]
if rule.meta.lib:
continue

View File

@@ -357,10 +357,10 @@ def get_sample_capabilities(ctx: Context, path: Path) -> set[str]:
disable_progress=True,
)
capabilities, _ = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True)
capabilities = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True)
# mypy doesn't seem to be happy with the MatchResults type alias & set(...keys())?
# so we ignore a few types here.
capabilities = set(capabilities.keys()) # type: ignore
capabilities = set(capabilities.matches.keys()) # type: ignore
assert isinstance(capabilities, set)
logger.debug("computed results: %s: %d capabilities", nice_path, len(capabilities))

View File

@@ -155,18 +155,18 @@ def main(argv=None):
except capa.main.ShouldExitError as e:
return e.status_code
capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor)
capabilities = capa.capabilities.common.find_capabilities(rules, extractor)
meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)
meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, capabilities)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches)
if capa.capabilities.common.has_file_limitation(rules, capabilities):
if capa.capabilities.common.has_file_limitation(rules, capabilities.matches):
# bail if capa encountered file limitation e.g. a packed binary
# do show the output in verbose mode, though.
if not (args.verbose or args.vverbose or args.json):
return capa.main.E_FILE_LIMITATION
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
doc = rd.ResultDocument.from_capa(meta, rules, capabilities.matches)
print(render_matches_by_function(doc))
colorama.deinit()

View File

@@ -82,10 +82,10 @@ def test_match_across_scopes_file_function(z9324d_extractor):
),
]
)
capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "install service" in capabilities
assert ".text section" in capabilities
assert ".text section and install service" in capabilities
capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "install service" in capabilities.matches
assert ".text section" in capabilities.matches
assert ".text section and install service" in capabilities.matches
def test_match_across_scopes(z9324d_extractor):
@@ -150,10 +150,10 @@ def test_match_across_scopes(z9324d_extractor):
),
]
)
capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "tight loop" in capabilities
assert "kill thread loop" in capabilities
assert "kill thread program" in capabilities
capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "tight loop" in capabilities.matches
assert "kill thread loop" in capabilities.matches
assert "kill thread program" in capabilities.matches
def test_subscope_bb_rules(z9324d_extractor):
@@ -178,8 +178,8 @@ def test_subscope_bb_rules(z9324d_extractor):
]
)
# tight loop at 0x403685
capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "test rule" in capabilities
capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "test rule" in capabilities.matches
def test_match_specific_functions(z9324d_extractor):
@@ -205,8 +205,8 @@ def test_match_specific_functions(z9324d_extractor):
]
)
extractor = FunctionFilter(z9324d_extractor, {0x4019C0})
capabilities, meta = capa.capabilities.common.find_capabilities(rules, extractor)
matches = capabilities["receive data"]
capabilities = capa.capabilities.common.find_capabilities(rules, extractor)
matches = capabilities.matches["receive data"]
# test that we received only one match
assert len(matches) == 1
# and that this match is from the specified function
@@ -233,8 +233,8 @@ def test_byte_matching(z9324d_extractor):
)
]
)
capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "byte match test" in capabilities
capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "byte match test" in capabilities.matches
def test_com_feature_matching(z395eb_extractor):
@@ -259,8 +259,8 @@ def test_com_feature_matching(z395eb_extractor):
)
]
)
capabilities, meta = capa.main.find_capabilities(rules, z395eb_extractor)
assert "initialize IWebBrowser2" in capabilities
capabilities = capa.main.find_capabilities(rules, z395eb_extractor)
assert "initialize IWebBrowser2" in capabilities.matches
def test_count_bb(z9324d_extractor):
@@ -284,8 +284,8 @@ def test_count_bb(z9324d_extractor):
)
]
)
capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "count bb" in capabilities
capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "count bb" in capabilities.matches
def test_instruction_scope(z9324d_extractor):
@@ -311,9 +311,9 @@ def test_instruction_scope(z9324d_extractor):
)
]
)
capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "push 1000" in capabilities
assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]}
capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "push 1000" in capabilities.matches
assert 0x4071A4 in {result[0] for result in capabilities.matches["push 1000"]}
def test_instruction_subscope(z9324d_extractor):
@@ -343,6 +343,6 @@ def test_instruction_subscope(z9324d_extractor):
)
]
)
capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "push 1000 on i386" in capabilities
assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]}
capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor)
assert "push 1000 on i386" in capabilities.matches
assert 0x406F60 in {result[0] for result in capabilities.matches["push 1000 on i386"]}

View File

@@ -125,8 +125,8 @@ def test_null_feature_extractor():
),
]
)
capabilities, _ = capa.main.find_capabilities(rules, EXTRACTOR)
assert "create file" in capabilities
capabilities = capa.main.find_capabilities(rules, EXTRACTOR)
assert "create file" in capabilities.matches
def compare_extractors(a: DynamicFeatureExtractor, b: DynamicFeatureExtractor):

View File

@@ -107,8 +107,8 @@ def test_null_feature_extractor():
),
]
)
capabilities, meta = capa.main.find_capabilities(rules, EXTRACTOR)
assert "xor loop" in capabilities
capabilities = capa.main.find_capabilities(rules, EXTRACTOR)
assert "xor loop" in capabilities.matches
def compare_extractors(a, b):

View File

@@ -21,6 +21,7 @@ import capa
import capa.engine as ceng
import capa.render.result_document as rdoc
import capa.features.freeze.features as frzf
from capa.capabilities.common import Capabilities
def test_optional_node_from_capa():
@@ -289,4 +290,4 @@ def test_rdoc_to_capa():
meta, capabilites = rd.to_capa()
assert isinstance(meta, rdoc.Metadata)
assert isinstance(capabilites, dict)
assert isinstance(capabilites, Capabilities)