style: ruff format changed files

This commit is contained in:
Willi Ballenthin
2026-04-22 19:03:46 +03:00
committed by Willi Ballenthin
parent a834c4c0a7
commit e745fa6aab
18 changed files with 369 additions and 77 deletions

View File

@@ -21,8 +21,16 @@ from dataclasses import dataclass
from capa.rules import Rule, Scope, RuleSet from capa.rules import Rule, Scope, RuleSet
from capa.engine import FeatureSet, MatchResults from capa.engine import FeatureSet, MatchResults
from capa.features.address import NO_ADDRESS from capa.features.address import NO_ADDRESS
from capa.render.result_document import LibraryFunction, StaticFeatureCounts, DynamicFeatureCounts from capa.render.result_document import (
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor LibraryFunction,
StaticFeatureCounts,
DynamicFeatureCounts,
)
from capa.features.extractors.base_extractor import (
FeatureExtractor,
StaticFeatureExtractor,
DynamicFeatureExtractor,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@@ -23,7 +23,13 @@ from dataclasses import dataclass
import capa.features.address import capa.features.address
from capa.features.common import Feature from capa.features.common import Feature
from capa.features.address import Address, ThreadAddress, ProcessAddress, DynamicCallAddress, AbsoluteVirtualAddress from capa.features.address import (
Address,
ThreadAddress,
ProcessAddress,
DynamicCallAddress,
AbsoluteVirtualAddress,
)
# feature extractors may reference functions, BBs, insns by opaque handle values. # feature extractors may reference functions, BBs, insns by opaque handle values.
# you can use the `.address` property to get and render the address of the feature. # you can use the `.address` property to get and render the address of the feature.
@@ -119,7 +125,9 @@ class StaticFeatureExtractor(abc.ABC):
self._sample_hashes = hashes self._sample_hashes = hashes
@abc.abstractmethod @abc.abstractmethod
def get_base_address(self) -> Union[AbsoluteVirtualAddress, capa.features.address._NoAddress]: def get_base_address(
self,
) -> Union[AbsoluteVirtualAddress, capa.features.address._NoAddress]:
""" """
fetch the preferred load address at which the sample was analyzed. fetch the preferred load address at which the sample was analyzed.

View File

@@ -116,7 +116,10 @@ def extract_arch(buf) -> Iterator[tuple[Feature, Address]]:
# rules that rely on arch conditions will fail to match on shellcode. # rules that rely on arch conditions will fail to match on shellcode.
# #
# for (2), this logic will need to be updated as the format is implemented. # for (2), this logic will need to be updated as the format is implemented.
logger.debug("unsupported file format: %s, will not guess Arch", binascii.hexlify(buf[:4]).decode("ascii")) logger.debug(
"unsupported file format: %s, will not guess Arch",
binascii.hexlify(buf[:4]).decode("ascii"),
)
return return
@@ -148,5 +151,8 @@ def extract_os(buf, os=OS_AUTO) -> Iterator[tuple[Feature, Address]]:
# rules that rely on OS conditions will fail to match on shellcode. # rules that rely on OS conditions will fail to match on shellcode.
# #
# for (2), this logic will need to be updated as the format is implemented. # for (2), this logic will need to be updated as the format is implemented.
logger.debug("unsupported file format: %s, will not guess OS", binascii.hexlify(buf[:4]).decode("ascii")) logger.debug(
"unsupported file format: %s, will not guess OS",
binascii.hexlify(buf[:4]).decode("ascii"),
)
return return

View File

@@ -114,7 +114,10 @@ def extract_file_class_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Cla
typedefnamespace, typedefname = resolve_nested_typedef_name(nested_class_table, rid, typedef, pe) typedefnamespace, typedefname = resolve_nested_typedef_name(nested_class_table, rid, typedef, pe)
token = calculate_dotnet_token_value(dnfile.mdtable.TypeDef.number, rid) token = calculate_dotnet_token_value(dnfile.mdtable.TypeDef.number, rid)
yield Class(DnType.format_name(typedefname, namespace=typedefnamespace)), DNTokenAddress(token) yield (
Class(DnType.format_name(typedefname, namespace=typedefnamespace)),
DNTokenAddress(token),
)
for rid, typeref in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number): for rid, typeref in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number):
# emit external .NET classes # emit external .NET classes
@@ -123,7 +126,10 @@ def extract_file_class_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Cla
typerefnamespace, typerefname = resolve_nested_typeref_name(typeref.ResolutionScope.row_index, typeref, pe) typerefnamespace, typerefname = resolve_nested_typeref_name(typeref.ResolutionScope.row_index, typeref, pe)
token = calculate_dotnet_token_value(dnfile.mdtable.TypeRef.number, rid) token = calculate_dotnet_token_value(dnfile.mdtable.TypeRef.number, rid)
yield Class(DnType.format_name(typerefname, namespace=typerefnamespace)), DNTokenAddress(token) yield (
Class(DnType.format_name(typerefname, namespace=typerefnamespace)),
DNTokenAddress(token),
)
def extract_file_os(**kwargs) -> Iterator[tuple[OS, Address]]: def extract_file_os(**kwargs) -> Iterator[tuple[OS, Address]]:

View File

@@ -181,7 +181,12 @@ class ELF:
else: else:
raise NotImplementedError() raise NotImplementedError()
logger.debug("e_phoff: 0x%02x e_phentsize: 0x%02x e_phnum: %d", e_phoff, self.e_phentsize, self.e_phnum) logger.debug(
"e_phoff: 0x%02x e_phentsize: 0x%02x e_phnum: %d",
e_phoff,
self.e_phentsize,
self.e_phnum,
)
self.f.seek(e_phoff) self.f.seek(e_phoff)
program_header_size = self.e_phnum * self.e_phentsize program_header_size = self.e_phnum * self.e_phentsize
@@ -362,13 +367,31 @@ class ELF:
shent = self.shbuf[shent_offset : shent_offset + self.e_shentsize] shent = self.shbuf[shent_offset : shent_offset + self.e_shentsize]
if self.bitness == 32: if self.bitness == 32:
sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, _, _, sh_entsize = struct.unpack_from( (
self.endian + "IIIIIIIIII", shent, 0x0 sh_name,
) sh_type,
sh_flags,
sh_addr,
sh_offset,
sh_size,
sh_link,
_,
_,
sh_entsize,
) = struct.unpack_from(self.endian + "IIIIIIIIII", shent, 0x0)
elif self.bitness == 64: elif self.bitness == 64:
sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, _, _, sh_entsize = struct.unpack_from( (
self.endian + "IIQQQQIIQQ", shent, 0x0 sh_name,
) sh_type,
sh_flags,
sh_addr,
sh_offset,
sh_size,
sh_link,
_,
_,
sh_entsize,
) = struct.unpack_from(self.endian + "IIQQQQIIQQ", shent, 0x0)
else: else:
raise NotImplementedError() raise NotImplementedError()
@@ -377,7 +400,17 @@ class ELF:
if len(buf) != sh_size: if len(buf) != sh_size:
raise ValueError("failed to read section header content") raise ValueError("failed to read section header content")
return Shdr(sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, sh_entsize, buf) return Shdr(
sh_name,
sh_type,
sh_flags,
sh_addr,
sh_offset,
sh_size,
sh_link,
sh_entsize,
buf,
)
@property @property
def section_headers(self): def section_headers(self):
@@ -596,7 +629,12 @@ class PHNote:
name_offset = 0xC name_offset = 0xC
self.desc_offset = name_offset + align(namesz, 0x4) self.desc_offset = name_offset + align(namesz, 0x4)
logger.debug("ph:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", namesz, self.descsz, self.type_) logger.debug(
"ph:namesz: 0x%02x descsz: 0x%02x type: 0x%04x",
namesz,
self.descsz,
self.type_,
)
self.name = self.buf[name_offset : name_offset + namesz].partition(b"\x00")[0].decode("ascii") self.name = self.buf[name_offset : name_offset + namesz].partition(b"\x00")[0].decode("ascii")
logger.debug("name: %s", self.name) logger.debug("name: %s", self.name)
@@ -623,7 +661,13 @@ class PHNote:
if not os: if not os:
return None return None
logger.debug("abi tag: %s earliest compatible kernel: %d.%d.%d", os, kmajor, kminor, kpatch) logger.debug(
"abi tag: %s earliest compatible kernel: %d.%d.%d",
os,
kmajor,
kminor,
kpatch,
)
return ABITag(os, kmajor, kminor, kpatch) return ABITag(os, kmajor, kminor, kpatch)
@@ -645,7 +689,12 @@ class SHNote:
name_offset = 0xC name_offset = 0xC
self.desc_offset = name_offset + align(namesz, 0x4) self.desc_offset = name_offset + align(namesz, 0x4)
logger.debug("sh:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", namesz, self.descsz, self.type_) logger.debug(
"sh:namesz: 0x%02x descsz: 0x%02x type: 0x%04x",
namesz,
self.descsz,
self.type_,
)
name_buf = self.buf[name_offset : name_offset + namesz] name_buf = self.buf[name_offset : name_offset + namesz]
self.name = read_cstr(name_buf, 0x0) self.name = read_cstr(name_buf, 0x0)
@@ -667,7 +716,13 @@ class SHNote:
if not os: if not os:
return None return None
logger.debug("abi tag: %s earliest compatible kernel: %d.%d.%d", os, kmajor, kminor, kpatch) logger.debug(
"abi tag: %s earliest compatible kernel: %d.%d.%d",
os,
kmajor,
kminor,
kpatch,
)
return ABITag(os, kmajor, kminor, kpatch) return ABITag(os, kmajor, kminor, kpatch)
@@ -752,7 +807,8 @@ class SymTab:
strtab_section = elf.sections[section.sh_link] strtab_section = elf.sections[section.sh_link]
sh_symtab = Shdr.from_viv(section, elf.readAtOffset(section.sh_offset, section.sh_size)) sh_symtab = Shdr.from_viv(section, elf.readAtOffset(section.sh_offset, section.sh_size))
sh_strtab = Shdr.from_viv( sh_strtab = Shdr.from_viv(
strtab_section, elf.readAtOffset(strtab_section.sh_offset, strtab_section.sh_size) strtab_section,
elf.readAtOffset(strtab_section.sh_offset, strtab_section.sh_size),
) )
if sh_symtab is None or sh_strtab is None: if sh_symtab is None or sh_strtab is None:
@@ -1105,7 +1161,12 @@ def guess_os_from_go_buildinfo(elf: ELF) -> Optional[OS]:
assert psize in (4, 8) assert psize in (4, 8)
is_big_endian = flags & 0b01 is_big_endian = flags & 0b01
has_inline_strings = flags & 0b10 has_inline_strings = flags & 0b10
logger.debug("go buildinfo: psize: %d big endian: %s inline: %s", psize, is_big_endian, has_inline_strings) logger.debug(
"go buildinfo: psize: %d big endian: %s inline: %s",
psize,
is_big_endian,
has_inline_strings,
)
GOOS_TO_OS = { GOOS_TO_OS = {
b"aix": OS.AIX, b"aix": OS.AIX,
@@ -1466,7 +1527,12 @@ def guess_os_from_vdso_strings(elf: ELF) -> Optional[OS]:
("x86/32", b"__vdso_time", b"LINUX_2.6"), ("x86/32", b"__vdso_time", b"LINUX_2.6"),
): ):
if symbol in buf and version in buf: if symbol in buf and version in buf:
logger.debug("vdso string: %s %s %s", arch, symbol.decode("ascii"), version.decode("ascii")) logger.debug(
"vdso string: %s %s %s",
arch,
symbol.decode("ascii"),
version.decode("ascii"),
)
return OS.LINUX return OS.LINUX
return None return None

View File

@@ -39,7 +39,11 @@ def extract_file_export_names(elf: ELFFile, **kwargs):
logger.debug("Symbol table '%s' has a sh_entsize of zero!", section.name) logger.debug("Symbol table '%s' has a sh_entsize of zero!", section.name)
continue continue
logger.debug("Symbol table '%s' contains %s entries:", section.name, section.num_symbols()) logger.debug(
"Symbol table '%s' contains %s entries:",
section.name,
section.num_symbols(),
)
for symbol in section.iter_symbols(): for symbol in section.iter_symbols():
# The following conditions are based on the following article # The following conditions are based on the following article

View File

@@ -97,7 +97,10 @@ class Address(HashableModel):
return cls(type=AddressType.THREAD, value=(a.process.ppid, a.process.pid, a.tid)) return cls(type=AddressType.THREAD, value=(a.process.ppid, a.process.pid, a.tid))
elif isinstance(a, capa.features.address.DynamicCallAddress): elif isinstance(a, capa.features.address.DynamicCallAddress):
return cls(type=AddressType.CALL, value=(a.thread.process.ppid, a.thread.process.pid, a.thread.tid, a.id)) return cls(
type=AddressType.CALL,
value=(a.thread.process.ppid, a.thread.process.pid, a.thread.tid, a.id),
)
elif a == capa.features.address.NO_ADDRESS or isinstance(a, capa.features.address._NoAddress): elif a == capa.features.address.NO_ADDRESS or isinstance(a, capa.features.address._NoAddress):
return cls(type=AddressType.NO_ADDRESS, value=None) return cls(type=AddressType.NO_ADDRESS, value=None)
@@ -146,7 +149,8 @@ class Address(HashableModel):
assert isinstance(pid, int) assert isinstance(pid, int)
assert isinstance(tid, int) assert isinstance(tid, int)
return capa.features.address.ThreadAddress( return capa.features.address.ThreadAddress(
process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), tid=tid process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid),
tid=tid,
) )
elif self.type is AddressType.CALL: elif self.type is AddressType.CALL:
@@ -154,7 +158,8 @@ class Address(HashableModel):
ppid, pid, tid, id_ = self.value ppid, pid, tid, id_ = self.value
return capa.features.address.DynamicCallAddress( return capa.features.address.DynamicCallAddress(
thread=capa.features.address.ThreadAddress( thread=capa.features.address.ThreadAddress(
process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), tid=tid process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid),
tid=tid,
), ),
id=id_, id=id_,
) )

View File

@@ -309,7 +309,11 @@ def log_unsupported_vmray_report_error(error: str):
def log_empty_sandbox_report_error(error: str, sandbox_name: str): def log_empty_sandbox_report_error(error: str, sandbox_name: str):
logger.error("-" * 80) logger.error("-" * 80)
logger.error(" %s report is empty or only contains little useful data: %s", sandbox_name, error) logger.error(
" %s report is empty or only contains little useful data: %s",
sandbox_name,
error,
)
logger.error(" ") logger.error(" ")
logger.error(" Please make sure the sandbox run captures useful behaviour of your sample.") logger.error(" Please make sure the sandbox run captures useful behaviour of your sample.")
logger.error("-" * 80) logger.error("-" * 80)
@@ -391,7 +395,10 @@ def is_cache_newer_than_rule_code(cache_dir: Path) -> bool:
import capa.rules import capa.rules
import capa.rules.cache import capa.rules.cache
latest_rule_code_file = max([Path(capa.rules.__file__), Path(capa.rules.cache.__file__)], key=os.path.getmtime) latest_rule_code_file = max(
[Path(capa.rules.__file__), Path(capa.rules.cache.__file__)],
key=os.path.getmtime,
)
rule_code_timestamp = Path(latest_rule_code_file).stat().st_mtime rule_code_timestamp = Path(latest_rule_code_file).stat().st_mtime
if rule_code_timestamp > cache_timestamp: if rule_code_timestamp > cache_timestamp:

View File

@@ -33,7 +33,11 @@ import capa.render.result_document as rdoc
import capa.features.extractors.common import capa.features.extractors.common
from capa.rules import RuleSet from capa.rules import RuleSet
from capa.engine import MatchResults from capa.engine import MatchResults
from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError from capa.exceptions import (
UnsupportedOSError,
UnsupportedArchError,
UnsupportedFormatError,
)
from capa.features.common import ( from capa.features.common import (
OS_AUTO, OS_AUTO,
FORMAT_PE, FORMAT_PE,
@@ -410,7 +414,9 @@ def get_extractor(
# -1 - Generic errors (database already open, auto-analysis failed, etc.) # -1 - Generic errors (database already open, auto-analysis failed, etc.)
# -2 - User cancelled operation # -2 - User cancelled operation
ret = idapro.open_database( ret = idapro.open_database(
str(input_path), run_auto_analysis=True, args="-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R" str(input_path),
run_auto_analysis=True,
args="-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R",
) )
if ret != 0: if ret != 0:
raise RuntimeError("failed to analyze input file") raise RuntimeError("failed to analyze input file")

View File

@@ -239,10 +239,16 @@ def install_common_args(parser, wanted=None):
parser.add_argument("--version", action="version", version="%(prog)s {:s}".format(capa.version.__version__)) parser.add_argument("--version", action="version", version="%(prog)s {:s}".format(capa.version.__version__))
parser.add_argument( parser.add_argument(
"-v", "--verbose", action="store_true", help="enable verbose result document (no effect with --json)" "-v",
"--verbose",
action="store_true",
help="enable verbose result document (no effect with --json)",
) )
parser.add_argument( parser.add_argument(
"-vv", "--vverbose", action="store_true", help="enable very verbose result document (no effect with --json)" "-vv",
"--vverbose",
action="store_true",
help="enable very verbose result document (no effect with --json)",
) )
parser.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") parser.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
parser.add_argument("-q", "--quiet", action="store_true", help="disable all output but errors") parser.add_argument("-q", "--quiet", action="store_true", help="disable all output but errors")
@@ -857,7 +863,13 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
""" """
sig_paths = get_signatures_from_cli(args, input_format, backend) sig_paths = get_signatures_from_cli(args, input_format, backend)
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in (
"0",
"no",
"NO",
"n",
None,
)
os_ = get_os_from_cli(args, backend) os_ = get_os_from_cli(args, backend)
sample_path = get_sample_path_from_cli(args, backend) sample_path = get_sample_path_from_cli(args, backend)
@@ -968,7 +980,9 @@ def main(argv: Optional[list[str]] = None):
""") """)
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter description=desc,
epilog=epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
) )
install_common_args( install_common_args(
parser, parser,

View File

@@ -21,7 +21,14 @@ logger = logging.getLogger(__name__)
def get_node_cost(node): def get_node_cost(node):
if isinstance(node, (capa.features.common.OS, capa.features.common.Arch, capa.features.common.Format)): if isinstance(
node,
(
capa.features.common.OS,
capa.features.common.Arch,
capa.features.common.Format,
),
):
# we assume these are the most restrictive features: # we assume these are the most restrictive features:
# authors commonly use them at the start of rules to restrict the category of samples to inspect # authors commonly use them at the start of rules to restrict the category of samples to inspect
return 0 return 0
@@ -32,7 +39,14 @@ def get_node_cost(node):
# this should be all hash-lookup features. # this should be all hash-lookup features.
# see below. # see below.
elif isinstance(node, (capa.features.common.Substring, capa.features.common.Regex, capa.features.common.Bytes)): elif isinstance(
node,
(
capa.features.common.Substring,
capa.features.common.Regex,
capa.features.common.Bytes,
),
):
# substring and regex features require a full scan of each string # substring and regex features require a full scan of each string
# which we anticipate is more expensive then a hash lookup feature (e.g. mnemonic or count). # which we anticipate is more expensive then a hash lookup feature (e.g. mnemonic or count).
# #

View File

@@ -55,7 +55,11 @@ def hanging_indent(s: str, indent: int) -> str:
def render_locations( def render_locations(
console: Console, layout: rd.Layout, locations: Iterable[frz.Address], indent: int, use_short_format: bool = False console: Console,
layout: rd.Layout,
locations: Iterable[frz.Address],
indent: int,
use_short_format: bool = False,
): ):
""" """
Render the given locations, such as virtual address or pid/tid/callid with process name. Render the given locations, such as virtual address or pid/tid/callid with process name.
@@ -117,7 +121,13 @@ def render_locations(
raise RuntimeError("unreachable") raise RuntimeError("unreachable")
def render_statement(console: Console, layout: rd.Layout, match: rd.Match, statement: rd.Statement, indent: int): def render_statement(
console: Console,
layout: rd.Layout,
match: rd.Match,
statement: rd.Statement,
indent: int,
):
console.write(" " * indent) console.write(" " * indent)
if isinstance(statement, rd.SubscopeStatement): if isinstance(statement, rd.SubscopeStatement):
@@ -191,7 +201,12 @@ def render_string_value(s: str) -> str:
def render_feature( def render_feature(
console: Console, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, feature: frzf.Feature, indent: int console: Console,
layout: rd.Layout,
rule: rd.RuleMatches,
match: rd.Match,
feature: frzf.Feature,
indent: int,
): ):
console.write(" " * indent) console.write(" " * indent)
@@ -220,7 +235,13 @@ def render_feature(
value = render_string_value(value) value = render_string_value(value)
elif isinstance( elif isinstance(
feature, (frzf.NumberFeature, frzf.OffsetFeature, frzf.OperandNumberFeature, frzf.OperandOffsetFeature) feature,
(
frzf.NumberFeature,
frzf.OffsetFeature,
frzf.OperandNumberFeature,
frzf.OperandOffsetFeature,
),
): ):
assert isinstance(value, int) assert isinstance(value, int)
value = capa.helpers.hex(value) value = capa.helpers.hex(value)
@@ -275,7 +296,14 @@ def render_feature(
console.writeln() console.writeln()
def render_node(console: Console, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, node: rd.Node, indent: int): def render_node(
console: Console,
layout: rd.Layout,
rule: rd.RuleMatches,
match: rd.Match,
node: rd.Node,
indent: int,
):
if isinstance(node, rd.StatementNode): if isinstance(node, rd.StatementNode):
render_statement(console, layout, match, node.statement, indent=indent) render_statement(console, layout, match, node.statement, indent=indent)
elif isinstance(node, rd.FeatureNode): elif isinstance(node, rd.FeatureNode):
@@ -293,7 +321,12 @@ MODE_FAILURE = "failure"
def render_match( def render_match(
console: Console, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, indent=0, mode=MODE_SUCCESS console: Console,
layout: rd.Layout,
rule: rd.RuleMatches,
match: rd.Match,
indent=0,
mode=MODE_SUCCESS,
): ):
child_mode = mode child_mode = mode
if mode == MODE_SUCCESS: if mode == MODE_SUCCESS:
@@ -449,7 +482,10 @@ def render_rules(console: Console, doc: rd.ResultDocument):
rows.append(("scope", rule.meta.scopes.dynamic.value)) rows.append(("scope", rule.meta.scopes.dynamic.value))
if rule.meta.attack: if rule.meta.attack:
rows.append(("att&ck", ", ".join([rutils.format_parts_id(v) for v in rule.meta.attack]))) rows.append((
"att&ck",
", ".join([rutils.format_parts_id(v) for v in rule.meta.attack]),
))
if rule.meta.mbc: if rule.meta.mbc:
rows.append(("mbc", ", ".join([rutils.format_parts_id(v) for v in rule.meta.mbc]))) rows.append(("mbc", ", ".join([rutils.format_parts_id(v) for v in rule.meta.mbc])))
@@ -502,9 +538,19 @@ def render_rules(console: Console, doc: rd.ResultDocument):
console.write(v.render_thread(doc.meta.analysis.layout, location)) console.write(v.render_thread(doc.meta.analysis.layout, location))
elif rule.meta.scopes.dynamic == capa.rules.Scope.SPAN_OF_CALLS: elif rule.meta.scopes.dynamic == capa.rules.Scope.SPAN_OF_CALLS:
calls = sorted(set(collect_span_of_calls_locations(match))) calls = sorted(set(collect_span_of_calls_locations(match)))
console.write(hanging_indent(v.render_span_of_calls(doc.meta.analysis.layout, calls), indent=1)) console.write(
hanging_indent(
v.render_span_of_calls(doc.meta.analysis.layout, calls),
indent=1,
)
)
elif rule.meta.scopes.dynamic == capa.rules.Scope.CALL: elif rule.meta.scopes.dynamic == capa.rules.Scope.CALL:
console.write(hanging_indent(v.render_call(doc.meta.analysis.layout, location), indent=1)) console.write(
hanging_indent(
v.render_call(doc.meta.analysis.layout, location),
indent=1,
)
)
else: else:
capa.helpers.assert_never(rule.meta.scopes.dynamic) capa.helpers.assert_never(rule.meta.scopes.dynamic)

View File

@@ -639,21 +639,35 @@ def build_statements(d, scopes: Scopes):
key = list(d.keys())[0] key = list(d.keys())[0]
description = pop_statement_description_entry(d[key]) description = pop_statement_description_entry(d[key])
if key == "and": if key == "and":
return ceng.And(unique(build_statements(dd, scopes) for dd in d[key]), description=description) return ceng.And(
unique(build_statements(dd, scopes) for dd in d[key]),
description=description,
)
elif key == "or": elif key == "or":
return ceng.Or(unique(build_statements(dd, scopes) for dd in d[key]), description=description) return ceng.Or(
unique(build_statements(dd, scopes) for dd in d[key]),
description=description,
)
elif key == "not": elif key == "not":
if len(d[key]) != 1: if len(d[key]) != 1:
raise InvalidRule("not statement must have exactly one child statement") raise InvalidRule("not statement must have exactly one child statement")
return ceng.Not(build_statements(d[key][0], scopes), description=description) return ceng.Not(build_statements(d[key][0], scopes), description=description)
elif key.endswith(" or more"): elif key.endswith(" or more"):
count = int(key[: -len("or more")]) count = int(key[: -len("or more")])
return ceng.Some(count, unique(build_statements(dd, scopes) for dd in d[key]), description=description) return ceng.Some(
count,
unique(build_statements(dd, scopes) for dd in d[key]),
description=description,
)
elif key == "optional": elif key == "optional":
# `optional` is an alias for `0 or more` # `optional` is an alias for `0 or more`
# which is useful for documenting behaviors, # which is useful for documenting behaviors,
# like with `write file`, we might say that `WriteFile` is optionally found alongside `CreateFileA`. # like with `write file`, we might say that `WriteFile` is optionally found alongside `CreateFileA`.
return ceng.Some(0, unique(build_statements(dd, scopes) for dd in d[key]), description=description) return ceng.Some(
0,
unique(build_statements(dd, scopes) for dd in d[key]),
description=description,
)
elif key == "process": elif key == "process":
if not is_subscope_compatible(scopes.dynamic, Scope.PROCESS): if not is_subscope_compatible(scopes.dynamic, Scope.PROCESS):
@@ -663,7 +677,9 @@ def build_statements(d, scopes: Scopes):
raise InvalidRule("subscope must have exactly one child statement") raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope( return ceng.Subscope(
Scope.PROCESS, build_statements(d[key][0], Scopes(dynamic=Scope.PROCESS)), description=description Scope.PROCESS,
build_statements(d[key][0], Scopes(dynamic=Scope.PROCESS)),
description=description,
) )
elif key == "thread": elif key == "thread":
@@ -674,7 +690,9 @@ def build_statements(d, scopes: Scopes):
raise InvalidRule("subscope must have exactly one child statement") raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope( return ceng.Subscope(
Scope.THREAD, build_statements(d[key][0], Scopes(dynamic=Scope.THREAD)), description=description Scope.THREAD,
build_statements(d[key][0], Scopes(dynamic=Scope.THREAD)),
description=description,
) )
elif key == "span of calls": elif key == "span of calls":
@@ -698,7 +716,9 @@ def build_statements(d, scopes: Scopes):
raise InvalidRule("subscope must have exactly one child statement") raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope( return ceng.Subscope(
Scope.CALL, build_statements(d[key][0], Scopes(dynamic=Scope.CALL)), description=description Scope.CALL,
build_statements(d[key][0], Scopes(dynamic=Scope.CALL)),
description=description,
) )
elif key == "function": elif key == "function":
@@ -709,7 +729,9 @@ def build_statements(d, scopes: Scopes):
raise InvalidRule("subscope must have exactly one child statement") raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope( return ceng.Subscope(
Scope.FUNCTION, build_statements(d[key][0], Scopes(static=Scope.FUNCTION)), description=description Scope.FUNCTION,
build_statements(d[key][0], Scopes(static=Scope.FUNCTION)),
description=description,
) )
elif key == "basic block": elif key == "basic block":
@@ -720,7 +742,9 @@ def build_statements(d, scopes: Scopes):
raise InvalidRule("subscope must have exactly one child statement") raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope( return ceng.Subscope(
Scope.BASIC_BLOCK, build_statements(d[key][0], Scopes(static=Scope.BASIC_BLOCK)), description=description Scope.BASIC_BLOCK,
build_statements(d[key][0], Scopes(static=Scope.BASIC_BLOCK)),
description=description,
) )
elif key == "instruction": elif key == "instruction":
@@ -1589,7 +1613,14 @@ class RuleSet:
# Other numbers are assumed to be uncommon. # Other numbers are assumed to be uncommon.
return 7 return 7
elif isinstance(node, (capa.features.common.Substring, capa.features.common.Regex, capa.features.common.Bytes)): elif isinstance(
node,
(
capa.features.common.Substring,
capa.features.common.Regex,
capa.features.common.Bytes,
),
):
# Scanning features (non-hashable), which we can't use for quick matching/filtering. # Scanning features (non-hashable), which we can't use for quick matching/filtering.
return 0 return 0
@@ -1831,20 +1862,37 @@ class RuleSet:
string_features = [ string_features = [
feature feature
for feature in features for feature in features
if isinstance(feature, (capa.features.common.Substring, capa.features.common.Regex)) if isinstance(
feature,
(capa.features.common.Substring, capa.features.common.Regex),
)
] ]
hashable_features = [ hashable_features = [
feature feature
for feature in features for feature in features
if not isinstance( if not isinstance(
feature, (capa.features.common.Substring, capa.features.common.Regex, capa.features.common.Bytes) feature,
(
capa.features.common.Substring,
capa.features.common.Regex,
capa.features.common.Bytes,
),
) )
] ]
logger.debug("indexing: features: %d, score: %d, rule: %s", len(features), score, rule_name) logger.debug(
"indexing: features: %d, score: %d, rule: %s",
len(features),
score,
rule_name,
)
scores_by_rule[rule_name] = score scores_by_rule[rule_name] = score
for feature in features: for feature in features:
logger.debug(" : [%d] %s", RuleSet._score_feature(scores_by_rule, feature), feature) logger.debug(
" : [%d] %s",
RuleSet._score_feature(scores_by_rule, feature),
feature,
)
if string_features: if string_features:
string_rules[rule_name] = cast(list[Feature], string_features) string_rules[rule_name] = cast(list[Feature], string_features)
@@ -1871,7 +1919,9 @@ class RuleSet:
len([feature for feature, rules in rules_by_feature.items() if len(rules) > 3]), len([feature for feature, rules in rules_by_feature.items() if len(rules) > 3]),
) )
logger.debug( logger.debug(
"indexing: %d scanning string features, %d scanning bytes features", len(string_rules), bytes_rules_count "indexing: %d scanning string features, %d scanning bytes features",
len(string_rules),
bytes_rules_count,
) )
return RuleSet._RuleFeatureIndex(rules_by_feature, string_rules, dict(bytes_prefix_index)) return RuleSet._RuleFeatureIndex(rules_by_feature, string_rules, dict(bytes_prefix_index))
@@ -1934,13 +1984,23 @@ class RuleSet:
for rule in rules: for rule in rules:
for k, v in rule.meta.items(): for k, v in rule.meta.items():
if isinstance(v, str) and tag in v: if isinstance(v, str) and tag in v:
logger.debug('using rule "%s" and dependencies, found tag in meta.%s: %s', rule.name, k, v) logger.debug(
'using rule "%s" and dependencies, found tag in meta.%s: %s',
rule.name,
k,
v,
)
rules_filtered.update(set(get_rules_and_dependencies(rules, rule.name))) rules_filtered.update(set(get_rules_and_dependencies(rules, rule.name)))
break break
if isinstance(v, list): if isinstance(v, list):
for vv in v: for vv in v:
if tag in vv: if tag in vv:
logger.debug('using rule "%s" and dependencies, found tag in meta.%s: %s', rule.name, k, vv) logger.debug(
'using rule "%s" and dependencies, found tag in meta.%s: %s',
rule.name,
k,
vv,
)
rules_filtered.update(set(get_rules_and_dependencies(rules, rule.name))) rules_filtered.update(set(get_rules_and_dependencies(rules, rule.name)))
break break
return RuleSet(list(rules_filtered)) return RuleSet(list(rules_filtered))

2
rules

Submodule rules updated: 2af9fbfc1c...03a20f69ae

View File

@@ -17,7 +17,13 @@ import pytest
import capa.features.address import capa.features.address
from capa.engine import Or, And, Not, Some, Range from capa.engine import Or, And, Not, Some, Range
from capa.features.insn import Number from capa.features.insn import Number
from capa.features.address import ThreadAddress, ProcessAddress, DynamicCallAddress, DNTokenOffsetAddress, AbsoluteVirtualAddress from capa.features.address import (
ThreadAddress,
ProcessAddress,
DynamicCallAddress,
DNTokenOffsetAddress,
AbsoluteVirtualAddress,
)
ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001) ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001)
ADDR2 = capa.features.address.AbsoluteVirtualAddress(0x401002) ADDR2 = capa.features.address.AbsoluteVirtualAddress(0x401002)
@@ -138,7 +144,10 @@ def test_some():
def test_complex(): def test_complex():
assert True is bool( assert True is bool(
Or([And([Number(1), Number(2)]), Or([Number(3), Some(2, [Number(4), Number(5), Number(6)])])]).evaluate({ Or([
And([Number(1), Number(2)]),
Or([Number(3), Some(2, [Number(4), Number(5), Number(6)])]),
]).evaluate({
Number(5): {ADDR1}, Number(5): {ADDR1},
Number(6): {ADDR1}, Number(6): {ADDR1},
Number(7): {ADDR1}, Number(7): {ADDR1},
@@ -147,7 +156,10 @@ def test_complex():
) )
assert False is bool( assert False is bool(
Or([And([Number(1), Number(2)]), Or([Number(3), Some(2, [Number(4), Number(5)])])]).evaluate({ Or([
And([Number(1), Number(2)]),
Or([Number(3), Some(2, [Number(4), Number(5)])]),
]).evaluate({
Number(5): {ADDR1}, Number(5): {ADDR1},
Number(6): {ADDR1}, Number(6): {ADDR1},
Number(7): {ADDR1}, Number(7): {ADDR1},

View File

@@ -29,7 +29,11 @@ import capa.features.extractors.null
import capa.features.freeze.features import capa.features.freeze.features
import capa.features.extractors.base_extractor import capa.features.extractors.base_extractor
from capa.features.address import Address, AbsoluteVirtualAddress from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, SampleHashes, FunctionHandle from capa.features.extractors.base_extractor import (
BBHandle,
SampleHashes,
FunctionHandle,
)
EXTRACTOR = capa.features.extractors.null.NullStaticFeatureExtractor( EXTRACTOR = capa.features.extractors.null.NullStaticFeatureExtractor(
base_address=AbsoluteVirtualAddress(0x401000), base_address=AbsoluteVirtualAddress(0x401000),
@@ -40,28 +44,46 @@ EXTRACTOR = capa.features.extractors.null.NullStaticFeatureExtractor(
), ),
global_features=[], global_features=[],
file_features=[ file_features=[
(AbsoluteVirtualAddress(0x402345), capa.features.common.Characteristic("embedded pe")), (
AbsoluteVirtualAddress(0x402345),
capa.features.common.Characteristic("embedded pe"),
),
], ],
functions={ functions={
AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.FunctionFeatures( AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.FunctionFeatures(
features=[ features=[
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("indirect call")), (
AbsoluteVirtualAddress(0x401000),
capa.features.common.Characteristic("indirect call"),
),
], ],
basic_blocks={ basic_blocks={
AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.BasicBlockFeatures( AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.BasicBlockFeatures(
features=[ features=[
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("tight loop")), (
AbsoluteVirtualAddress(0x401000),
capa.features.common.Characteristic("tight loop"),
),
], ],
instructions={ instructions={
AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.InstructionFeatures( AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.InstructionFeatures(
features=[ features=[
(AbsoluteVirtualAddress(0x401000), capa.features.insn.Mnemonic("xor")), (
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("nzxor")), AbsoluteVirtualAddress(0x401000),
capa.features.insn.Mnemonic("xor"),
),
(
AbsoluteVirtualAddress(0x401000),
capa.features.common.Characteristic("nzxor"),
),
], ],
), ),
AbsoluteVirtualAddress(0x401002): capa.features.extractors.null.InstructionFeatures( AbsoluteVirtualAddress(0x401002): capa.features.extractors.null.InstructionFeatures(
features=[ features=[
(AbsoluteVirtualAddress(0x401002), capa.features.insn.Mnemonic("mov")), (
AbsoluteVirtualAddress(0x401002),
capa.features.insn.Mnemonic("mov"),
),
], ],
), ),
}, },

View File

@@ -44,7 +44,12 @@ def test_render_offset():
def test_render_property(): def test_render_property():
assert ( assert (
str(capa.features.insn.Property("System.IO.FileInfo::Length", access=capa.features.common.FeatureAccess.READ)) str(
capa.features.insn.Property(
"System.IO.FileInfo::Length",
access=capa.features.common.FeatureAccess.READ,
)
)
== "property/read(System.IO.FileInfo::Length)" == "property/read(System.IO.FileInfo::Length)"
) )
@@ -184,7 +189,10 @@ def test_render_meta_maec():
(capa.features.common.Regex("^foo"), "regex: ^foo"), (capa.features.common.Regex("^foo"), "regex: ^foo"),
(capa.features.common.String("foo"), 'string: "foo" @ 0x401000'), (capa.features.common.String("foo"), 'string: "foo" @ 0x401000'),
(capa.features.common.Class("BeanFactory"), "class: BeanFactory @ 0x401000"), (capa.features.common.Class("BeanFactory"), "class: BeanFactory @ 0x401000"),
(capa.features.common.Namespace("std::enterprise"), "namespace: std::enterprise @ 0x401000"), (
capa.features.common.Namespace("std::enterprise"),
"namespace: std::enterprise @ 0x401000",
),
(capa.features.insn.API("CreateFileW"), "api: CreateFileW @ 0x401000"), (capa.features.insn.API("CreateFileW"), "api: CreateFileW @ 0x401000"),
(capa.features.insn.Property("foo"), "property: foo @ 0x401000"), (capa.features.insn.Property("foo"), "property: foo @ 0x401000"),
(capa.features.insn.Property("foo", "read"), "property/read: foo @ 0x401000"), (capa.features.insn.Property("foo", "read"), "property/read: foo @ 0x401000"),