mirror of
https://github.com/mandiant/capa.git
synced 2026-04-28 11:53:20 -07:00
Add the ability to select which functions or processes you wish to extract capabilities from (#2156)
This commit is contained in:
@@ -3,7 +3,7 @@
|
||||
## master (unreleased)
|
||||
|
||||
### New Features
|
||||
|
||||
- cli: add the ability to select which specific functions or processes to analyze @yelhamer
|
||||
- webui: explore capa analysis results in a web-based UI online and offline #2224 @s-ff
|
||||
- support analyzing DRAKVUF traces #2143 @yelhamer
|
||||
- IDA extractor: extract names from dynamically resolved APIs stored in renamed global variables #2201 @Ana06
|
||||
|
||||
@@ -23,3 +23,15 @@ class UnsupportedOSError(ValueError):
|
||||
|
||||
class EmptyReportError(ValueError):
    """
    Error type for an empty report.

    NOTE(review): the raise sites are not visible from here; presumably raised when
    an analysis report contains no usable data - confirm against callers.
    """
|
||||
|
||||
|
||||
class InvalidArgument(ValueError):
    """
    Raised when the provided arguments cannot be used together,
    for example when a process filter is requested for a static analysis.
    """
|
||||
|
||||
|
||||
class NonExistantFunctionError(ValueError):
    """
    Error type for a function that does not exist.

    NOTE(review): the raise sites are not visible from here; presumably raised when
    a requested function address is not found in the analyzed sample - confirm against callers.
    """
|
||||
|
||||
|
||||
class NonExistantProcessError(ValueError):
    """
    Error type for a process that does not exist.

    NOTE(review): the raise sites are not visible from here; presumably raised when
    a requested process ID is not found in the analyzed report - confirm against callers.
    """
|
||||
|
||||
@@ -9,7 +9,9 @@
|
||||
import abc
|
||||
import hashlib
|
||||
import dataclasses
|
||||
from typing import Any, Dict, Tuple, Union, Iterator
|
||||
from copy import copy
|
||||
from types import MethodType
|
||||
from typing import Any, Set, Dict, Tuple, Union, Iterator
|
||||
from dataclasses import dataclass
|
||||
|
||||
# TODO(williballenthin): use typing.TypeAlias directly when Python 3.9 is deprecated
|
||||
@@ -296,6 +298,22 @@ class StaticFeatureExtractor:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def FunctionFilter(extractor: StaticFeatureExtractor, functions: Set) -> StaticFeatureExtractor:
    """
    Return a copy of `extractor` whose get_functions() only yields the selected functions.

    args:
      extractor: the static feature extractor to restrict; it is left unmodified.
      functions: addresses of the functions to keep. a function handle is yielded
        only when its `address` attribute is a member of this collection.

    returns:
      a shallow copy of `extractor` with get_functions() replaced by the filtering variant.
    """
    # capture the bound method now, so the filter always delegates to the
    # extractor's own (unfiltered) implementation.
    original_get_functions = extractor.get_functions

    def filtered_get_functions(self):
        for fh in original_get_functions():
            if fh.address in functions:
                yield fh

    # we patch a copy of the extractor rather than the extractor itself.
    # this preserves the original object's get_functions() method, in case it is used elsewhere in the code.
    # an example where this is important is in our test files, where the same extractor object may be shared
    # by different tests, with only some of them needing to install a functions filter.
    new_extractor = copy(extractor)
    # bind the method to the copy it is installed on, so that its `__self__` is the filtered extractor.
    new_extractor.get_functions = MethodType(filtered_get_functions, new_extractor)  # type: ignore

    return new_extractor
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProcessHandle:
|
||||
"""
|
||||
@@ -467,4 +485,20 @@ class DynamicFeatureExtractor:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def ProcessFilter(extractor: DynamicFeatureExtractor, processes: Set) -> DynamicFeatureExtractor:
    """
    Return a copy of `extractor` whose get_processes() only yields the selected processes.

    args:
      extractor: the dynamic feature extractor to restrict; it is left unmodified.
      processes: process IDs to keep. a process handle is yielded only when
        its `address.pid` attribute is a member of this collection.

    returns:
      a shallow copy of `extractor` with get_processes() replaced by the filtering variant.
    """
    # capture the bound method now, so the filter always delegates to the
    # extractor's own (unfiltered) implementation.
    original_get_processes = extractor.get_processes

    def filtered_get_processes(self):
        for ph in original_get_processes():
            if ph.address.pid in processes:
                yield ph

    # we patch a copy of the extractor rather than the extractor itself.
    # this preserves the original object's get_processes() method, in case it is used elsewhere in the code.
    # an example where this is important is in our test files, where the same extractor object may be shared
    # by different tests, with only some of them needing to install a processes filter.
    new_extractor = copy(extractor)
    # bind the method to the copy it is installed on, so that its `__self__` is the filtered extractor.
    new_extractor.get_processes = MethodType(filtered_get_processes, new_extractor)  # type: ignore

    return new_extractor
|
||||
|
||||
|
||||
FeatureExtractor: TypeAlias = Union[StaticFeatureExtractor, DynamicFeatureExtractor]
|
||||
|
||||
87
capa/main.py
87
capa/main.py
@@ -17,7 +17,7 @@ import argparse
|
||||
import textwrap
|
||||
import contextlib
|
||||
from types import TracebackType
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Set, Dict, List, Optional, TypedDict
|
||||
from pathlib import Path
|
||||
|
||||
import colorama
|
||||
@@ -62,6 +62,7 @@ from capa.helpers import (
|
||||
log_unsupported_drakvuf_report_error,
|
||||
)
|
||||
from capa.exceptions import (
|
||||
InvalidArgument,
|
||||
EmptyReportError,
|
||||
UnsupportedOSError,
|
||||
UnsupportedArchError,
|
||||
@@ -83,9 +84,17 @@ from capa.features.common import (
|
||||
FORMAT_FREEZE,
|
||||
FORMAT_RESULT,
|
||||
FORMAT_DRAKVUF,
|
||||
STATIC_FORMATS,
|
||||
DYNAMIC_FORMATS,
|
||||
)
|
||||
from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities
|
||||
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor
|
||||
from capa.features.extractors.base_extractor import (
|
||||
ProcessFilter,
|
||||
FunctionFilter,
|
||||
FeatureExtractor,
|
||||
StaticFeatureExtractor,
|
||||
DynamicFeatureExtractor,
|
||||
)
|
||||
|
||||
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
|
||||
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
|
||||
@@ -106,10 +115,17 @@ E_MISSING_CAPE_STATIC_ANALYSIS = 21
|
||||
E_MISSING_CAPE_DYNAMIC_ANALYSIS = 22
|
||||
E_EMPTY_REPORT = 23
|
||||
E_UNSUPPORTED_GHIDRA_EXECUTION_MODE = 24
|
||||
E_INVALID_INPUT_FORMAT = 25
|
||||
E_INVALID_FEATURE_EXTRACTOR = 26
|
||||
|
||||
logger = logging.getLogger("capa")
|
||||
|
||||
|
||||
class FilterConfig(TypedDict, total=False):
    """
    Restrictions to apply to a feature extractor.

    Both keys are optional (total=False); an empty dict means no filtering is applied.
    """

    # process IDs to keep; applied to dynamic feature extractors.
    processes: Set[int]
    # function addresses to keep; applied to static feature extractors.
    functions: Set[int]
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def timing(msg: str):
|
||||
t0 = time.time()
|
||||
@@ -276,6 +292,22 @@ def install_common_args(parser, wanted=None):
|
||||
help=f"select backend, {backend_help}",
|
||||
)
|
||||
|
||||
if "restrict-to-functions" in wanted:
|
||||
parser.add_argument(
|
||||
"--restrict-to-functions",
|
||||
type=lambda s: s.replace(" ", "").split(","),
|
||||
default=[],
|
||||
help="provide a list of comma-separated function virtual addresses to analyze (static analysis).",
|
||||
)
|
||||
|
||||
if "restrict-to-processes" in wanted:
|
||||
parser.add_argument(
|
||||
"--restrict-to-processes",
|
||||
type=lambda s: s.replace(" ", "").split(","),
|
||||
default=[],
|
||||
help="provide a list of comma-separated process IDs to analyze (dynamic analysis).",
|
||||
)
|
||||
|
||||
if "os" in wanted:
|
||||
oses = [
|
||||
(OS_AUTO, "detect OS automatically - default"),
|
||||
@@ -749,9 +781,10 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
|
||||
|
||||
os_ = get_os_from_cli(args, backend)
|
||||
sample_path = get_sample_path_from_cli(args, backend)
|
||||
extractor_filters = get_extractor_filters_from_cli(args, input_format)
|
||||
|
||||
try:
|
||||
return capa.loader.get_extractor(
|
||||
extractor = capa.loader.get_extractor(
|
||||
args.input_file,
|
||||
input_format,
|
||||
os_,
|
||||
@@ -761,6 +794,7 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
|
||||
disable_progress=args.quiet or args.debug,
|
||||
sample_path=sample_path,
|
||||
)
|
||||
return apply_extractor_filters(extractor, extractor_filters)
|
||||
except UnsupportedFormatError as e:
|
||||
if input_format == FORMAT_CAPE:
|
||||
log_unsupported_cape_report_error(str(e))
|
||||
@@ -780,6 +814,38 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
|
||||
raise ShouldExitError(E_CORRUPT_FILE) from e
|
||||
|
||||
|
||||
def _parse_int_filter(values, description: str) -> Set[int]:
    """Parse CLI-provided integer strings (any base prefix accepted by int(x, 0)) into a set."""
    parsed = set()
    for value in values:
        try:
            parsed.add(int(value, 0))
        except ValueError as e:
            # InvalidArgument is a ValueError, so callers handling ValueError keep working,
            # but the user now learns which value was malformed.
            raise InvalidArgument(f"invalid {description}: {value!r}") from e
    return parsed


def get_extractor_filters_from_cli(args, input_format) -> FilterConfig:
    """
    Build the extractor filter configuration from the parsed command-line arguments.

    args:
      args: the parsed arguments; may carry `restrict_to_functions` and/or `restrict_to_processes`,
        each a list of strings holding integers.
      input_format: the format of the input file.

    returns:
      an empty dict when neither filter argument was installed; otherwise a dict with
      the key "functions" (static formats) or "processes" (dynamic formats).

    raises:
      InvalidArgument: when a filter does not apply to the analysis flavor, or a value is not an integer.
      ShouldExitError: when the input format is neither a static nor a dynamic format.
    """
    if not hasattr(args, "restrict_to_processes") and not hasattr(args, "restrict_to_functions"):
        # no processes or function filters were installed in the args
        return {}

    # a caller may have installed only one of the two arguments;
    # treat the missing one as "no restriction" instead of failing with AttributeError.
    restrict_to_functions = getattr(args, "restrict_to_functions", None) or []
    restrict_to_processes = getattr(args, "restrict_to_processes", None) or []

    if input_format in STATIC_FORMATS:
        if restrict_to_processes:
            raise InvalidArgument("Cannot filter processes with static analysis.")
        return {"functions": _parse_int_filter(restrict_to_functions, "function address")}
    elif input_format in DYNAMIC_FORMATS:
        if restrict_to_functions:
            raise InvalidArgument("Cannot filter functions with dynamic analysis.")
        return {"processes": _parse_int_filter(restrict_to_processes, "process ID")}
    else:
        raise ShouldExitError(E_INVALID_INPUT_FORMAT)
|
||||
|
||||
|
||||
def apply_extractor_filters(extractor: FeatureExtractor, extractor_filters: FilterConfig) -> FeatureExtractor:
    """
    Install the user-selected function/process filters on the given extractor.

    args:
      extractor: the feature extractor to restrict.
      extractor_filters: the filter configuration; keys may be absent or hold empty sets.

    returns:
      `extractor` itself when no filter holds any value, otherwise the result of
      FunctionFilter (static extractors) or ProcessFilter (dynamic extractors).

    raises:
      InvalidArgument: when the only non-empty filter does not apply to this kind of extractor.
      ShouldExitError: when the extractor is neither static nor dynamic.
    """
    if not any(extractor_filters.values()):
        return extractor

    # if the user specified extractor filters, then apply them here.
    # at this point at least one filter is non-empty, so an empty/missing filter for this
    # extractor flavor means the user asked for the other, inapplicable one.
    # we raise explicitly rather than assert, since asserts are stripped under `python -O`
    # and an empty filter would then silently hide every function/process.
    if isinstance(extractor, StaticFeatureExtractor):
        functions = extractor_filters.get("functions")
        if not functions:
            raise InvalidArgument("Cannot filter processes with static analysis.")
        return FunctionFilter(extractor, functions)
    elif isinstance(extractor, DynamicFeatureExtractor):
        processes = extractor_filters.get("processes")
        if not processes:
            raise InvalidArgument("Cannot filter functions with dynamic analysis.")
        return ProcessFilter(extractor, processes)
    else:
        raise ShouldExitError(E_INVALID_FEATURE_EXTRACTOR)
|
||||
|
||||
|
||||
def main(argv: Optional[List[str]] = None):
|
||||
if sys.version_info < (3, 8):
|
||||
raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.8+")
|
||||
@@ -819,7 +885,20 @@ def main(argv: Optional[List[str]] = None):
|
||||
parser = argparse.ArgumentParser(
|
||||
description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter
|
||||
)
|
||||
install_common_args(parser, {"input_file", "format", "backend", "os", "signatures", "rules", "tag"})
|
||||
install_common_args(
|
||||
parser,
|
||||
{
|
||||
"input_file",
|
||||
"format",
|
||||
"backend",
|
||||
"os",
|
||||
"signatures",
|
||||
"rules",
|
||||
"tag",
|
||||
"restrict-to-functions",
|
||||
"restrict-to-processes",
|
||||
},
|
||||
)
|
||||
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
|
||||
args = parser.parse_args(args=argv)
|
||||
|
||||
|
||||
16
doc/usage.md
16
doc/usage.md
@@ -9,6 +9,22 @@ Use the `-t` option to run rules with the given metadata value (see the rule fie
|
||||
For example, `capa -t william.ballenthin@mandiant.com` runs rules that reference Willi's email address (probably as the author), or
|
||||
`capa -t communication` runs rules with the namespace `communication`.
|
||||
|
||||
### only analyze selected functions
|
||||
Use the `--restrict-to-functions` option to extract capabilities from only a selected set of functions. This is useful for analyzing
|
||||
large functions and figuring out their capabilities and their address of occurrence; for example: PEB access, RC4 encryption, etc.
|
||||
|
||||
To use this, you can copy the virtual addresses from your favorite disassembler and pass them to capa as follows:
|
||||
`capa sample.exe --restrict-to-functions 0x4019C0,0x401CD0`. If you add the `-v` option then capa will extract the interesting parts of a function for you.
|
||||
|
||||
### only analyze selected processes
|
||||
Use the `--restrict-to-processes` option to extract capabilities from only a selected set of processes. This is useful for filtering the noise
|
||||
generated from analyzing non-malicious processes that can be reported by some sandboxes, as well as for reducing the execution time
|
||||
by not analyzing such processes in the first place.
|
||||
|
||||
To use this, you can pick the PIDs of the processes you are interested in from the sandbox-generated process tree (or from the sandbox-reported malware PID)
|
||||
and pass that to capa as follows: `capa report.log --restrict-to-processes 3888,3214,4299`. If you add the `-v` option then capa will tell you
|
||||
which threads perform what actions (encrypt/decrypt data, initiate a connection, etc.).
|
||||
|
||||
### IDA Pro plugin: capa explorer
|
||||
Please check out the [capa explorer documentation](/capa/ida/plugin/README.md).
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
import textwrap
|
||||
|
||||
import capa.capabilities.common
|
||||
from capa.features.extractors.base_extractor import FunctionFilter
|
||||
|
||||
|
||||
def test_match_across_scopes_file_function(z9324d_extractor):
|
||||
@@ -174,6 +175,37 @@ def test_subscope_bb_rules(z9324d_extractor):
|
||||
assert "test rule" in capabilities
|
||||
|
||||
|
||||
def test_match_specific_functions(z9324d_extractor):
    # restrict the analysis to a single function,
    # and check that the only reported match comes from that function.
    selected_function = 0x4019C0

    rule_yaml = textwrap.dedent(
        """
        rule:
            meta:
                name: receive data
                scopes:
                    static: function
                    dynamic: call
                examples:
                    - 9324d1a8ae37a36ae560c37448c9705a:0x401CD0
            features:
                - or:
                    - api: recv
        """
    )
    rules = capa.rules.RuleSet([capa.rules.Rule.from_yaml(rule_yaml)])

    filtered_extractor = FunctionFilter(z9324d_extractor, {selected_function})
    capabilities, _ = capa.capabilities.common.find_capabilities(rules, filtered_extractor)
    matches = capabilities["receive data"]

    # test that we received only one match
    assert len(matches) == 1
    # and that this match is from the specified function
    assert matches[0][0] == selected_function
|
||||
|
||||
|
||||
def test_byte_matching(z9324d_extractor):
|
||||
rules = capa.rules.RuleSet(
|
||||
[
|
||||
|
||||
Reference in New Issue
Block a user