Merge branch 'dynamic-feature-extraction' into capabilities-module

This commit is contained in:
Yacine
2023-10-20 08:55:49 +02:00
committed by GitHub
22 changed files with 1245 additions and 3919 deletions
-7
View File
@@ -1,12 +1,5 @@
[mypy]
# TODO(yelhamer): remove this once proto has been added
# for the dynamic rendering
exclude = (?x)(
^capa/render/proto/__init__.py$
| ^tests/_test_proto.py$
)
[mypy-halo.*]
ignore_missing_imports = True
+1
View File
@@ -17,6 +17,7 @@ a = Analysis(
# when invoking pyinstaller from the project root,
# this gets invoked from the directory of the spec file,
# i.e. ./.github/pyinstaller
("../../assets", "assets"),
("../../rules", "rules"),
("../../sigs", "sigs"),
("../../cache", "cache"),
+9 -5
View File
@@ -43,9 +43,9 @@ jobs:
- name: Lint with black
run: pre-commit run black --show-diff-on-failure
- name: Lint with flake8
run: pre-commit run flake8
run: pre-commit run flake8 --hook-stage manual
- name: Check types with mypy
run: pre-commit run mypy
run: pre-commit run mypy --hook-stage manual
rule_linter:
runs-on: ubuntu-20.04
@@ -95,6 +95,10 @@ jobs:
run: sudo apt-get install -y libyaml-dev
- name: Install capa
run: pip install -e .[dev]
- name: Run tests (fast)
# this set of tests runs about 80% of the cases in 20% of the time,
# and should catch most errors quickly.
run: pre-commit run pytest-fast --all-files --hook-stage manual
- name: Run tests
run: pytest -v tests/
@@ -103,7 +107,7 @@ jobs:
env:
BN_SERIAL: ${{ secrets.BN_SERIAL }}
runs-on: ubuntu-20.04
needs: [code_style, rule_linter]
needs: [tests]
strategy:
fail-fast: false
matrix:
@@ -143,7 +147,7 @@ jobs:
ghidra-tests:
name: Ghidra tests for ${{ matrix.python-version }}
runs-on: ubuntu-20.04
needs: [code_style, rule_linter]
needs: [tests]
strategy:
fail-fast: false
matrix:
@@ -197,4 +201,4 @@ jobs:
cat ../output.log
exit_code=$(cat ../output.log | grep exit | awk '{print $NF}')
exit $exit_code
+23 -5
View File
@@ -25,7 +25,7 @@ repos:
hooks:
- id: isort
name: isort
stages: [commit, push]
stages: [commit, push, manual]
language: system
entry: isort
args:
@@ -45,7 +45,7 @@ repos:
hooks:
- id: black
name: black
stages: [commit, push]
stages: [commit, push, manual]
language: system
entry: black
args:
@@ -62,7 +62,7 @@ repos:
hooks:
- id: ruff
name: ruff
stages: [commit, push]
stages: [commit, push, manual]
language: system
entry: ruff
args:
@@ -79,7 +79,7 @@ repos:
hooks:
- id: flake8
name: flake8
stages: [commit, push]
stages: [push, manual]
language: system
entry: flake8
args:
@@ -97,7 +97,7 @@ repos:
hooks:
- id: mypy
name: mypy
stages: [commit, push]
stages: [push, manual]
language: system
entry: mypy
args:
@@ -109,3 +109,21 @@ repos:
- "tests/"
always_run: true
pass_filenames: false
- repo: local
hooks:
- id: pytest-fast
name: pytest (fast)
stages: [manual]
language: system
entry: pytest
args:
- "tests/"
- "--ignore=tests/test_binja_features.py"
- "--ignore=tests/test_ghidra_features.py"
- "--ignore=tests/test_ida_features.py"
- "--ignore=tests/test_viv_features.py"
- "--ignore=tests/test_main.py"
- "--ignore=tests/test_scripts.py"
always_run: true
pass_filenames: false
+5 -1
View File
@@ -11,10 +11,14 @@
- ghidra: add entry script helping users run capa against a loaded Ghidra database #1767 @mike-hunhoff
- binja: add support for forwarded exports #1646 @xusheng6
- binja: add support for symtab names #1504 @xusheng6
- add com class/interface features #322 @Aayush-goel-04
- protobuf: add `Metadata.flavor` @williballenthin
### Breaking Changes
- remove the `SCOPE_*` constants in favor of the `Scope` enum #1764 @williballenthin
- protobuf: deprecate `RuleMetadata.scope` in favor of `RuleMetadata.scopes` @williballenthin
- protobuf: deprecate `Metadata.analysis` in favor of `Metadata.analysis2` that is dynamic analysis aware @williballenthin
### New Rules (19)
@@ -1606,4 +1610,4 @@ Download a standalone binary below and checkout the readme [here on GitHub](http
### Raw diffs
- [capa v1.0.0...v1.1.0](https://github.com/mandiant/capa/compare/v1.0.0...v1.1.0)
- [capa-rules v1.0.0...v1.1.0](https://github.com/mandiant/capa-rules/compare/v1.0.0...v1.1.0)
- [capa-rules v1.0.0...v1.1.0](https://github.com/mandiant/capa-rules/compare/v1.0.0...v1.1.0)
Binary file not shown.
Binary file not shown.
+7
View File
@@ -6,6 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import io
import re
import logging
import binascii
import contextlib
@@ -41,6 +42,7 @@ logger = logging.getLogger(__name__)
MATCH_PE = b"MZ"
MATCH_ELF = b"\x7fELF"
MATCH_RESULT = b'{"meta":'
MATCH_JSON_OBJECT = b'{"'
def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]:
@@ -63,6 +65,11 @@ def extract_format(buf) -> Iterator[Tuple[Feature, Address]]:
yield Format(FORMAT_FREEZE), NO_ADDRESS
elif buf.startswith(MATCH_RESULT):
yield Format(FORMAT_RESULT), NO_ADDRESS
elif re.sub(rb"\w", b"", buf[:20]).startswith(MATCH_JSON_OBJECT):
# potential start of JSON object data without whitespace
# we don't know what it is exactly, but may support it (e.g. a dynamic CAPE sandbox report)
# skip verdict here and let subsequent code analyze this further
return
else:
# we likely end up here:
# 1. handling a file format (e.g. macho)
+3 -5
View File
@@ -147,11 +147,9 @@ def redirecting_print_to_tqdm(disable_progress):
def log_unsupported_format_error():
logger.error("-" * 80)
logger.error(" Input file does not appear to be a PE or ELF file.")
logger.error(" Input file does not appear to be a supported file.")
logger.error(" ")
logger.error(
" capa currently only supports analyzing PE and ELF files (or shellcode, when using --format sc32|sc64)."
)
logger.error(" See all supported file formats via capa's help output (-h).")
logger.error(" If you don't know the input file type, you can try using the `file` utility to guess it.")
logger.error("-" * 80)
@@ -160,7 +158,7 @@ def log_unsupported_cape_report_error(error: str):
logger.error("-" * 80)
logger.error("Input file is not a valid CAPE report: %s", error)
logger.error(" ")
logger.error(" capa currently only supports analyzing standard CAPE json reports.")
logger.error(" capa currently only supports analyzing standard CAPE reports in JSON format.")
logger.error(
" Please make sure your report file is in the standard format and contains both the static and dynamic sections."
)
+308 -102
View File
@@ -38,16 +38,6 @@ from capa.helpers import assert_never
from capa.features.freeze import AddressType
def dict_tuple_to_list_values(d: Dict) -> Dict:
    """
    Return a shallow copy of `d` in which every tuple value is replaced by a list.

    Only top-level values are inspected: tuples nested inside other containers are left as-is,
    and non-tuple values are carried over unchanged (same object).
    The input dict is not modified.
    """
    return {k: list(v) if isinstance(v, tuple) else v for k, v in d.items()}
def int_to_pb2(v: int) -> capa_pb2.Integer:
if v < -2_147_483_648:
raise ValueError(f"value underflow: {v}")
@@ -100,6 +90,51 @@ def addr_to_pb2(addr: frz.Address) -> capa_pb2.Address:
token_offset=capa_pb2.Token_Offset(token=int_to_pb2(token), offset=offset),
)
elif addr.type is AddressType.PROCESS:
assert isinstance(addr.value, tuple)
ppid, pid = addr.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
return capa_pb2.Address(
type=capa_pb2.AddressType.ADDRESSTYPE_PROCESS,
ppid_pid=capa_pb2.Ppid_Pid(
ppid=int_to_pb2(ppid),
pid=int_to_pb2(pid),
),
)
elif addr.type is AddressType.THREAD:
assert isinstance(addr.value, tuple)
ppid, pid, tid = addr.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
assert isinstance(tid, int)
return capa_pb2.Address(
type=capa_pb2.AddressType.ADDRESSTYPE_THREAD,
ppid_pid_tid=capa_pb2.Ppid_Pid_Tid(
ppid=int_to_pb2(ppid),
pid=int_to_pb2(pid),
tid=int_to_pb2(tid),
),
)
elif addr.type is AddressType.CALL:
assert isinstance(addr.value, tuple)
ppid, pid, tid, id_ = addr.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
assert isinstance(tid, int)
assert isinstance(id_, int)
return capa_pb2.Address(
type=capa_pb2.AddressType.ADDRESSTYPE_CALL,
ppid_pid_tid_id=capa_pb2.Ppid_Pid_Tid_Id(
ppid=int_to_pb2(ppid),
pid=int_to_pb2(pid),
tid=int_to_pb2(tid),
id=int_to_pb2(id_),
),
)
elif addr.type is AddressType.NO_ADDRESS:
# value == None, so only set type
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS)
@@ -117,10 +152,26 @@ def scope_to_pb2(scope: capa.rules.Scope) -> capa_pb2.Scope.ValueType:
return capa_pb2.Scope.SCOPE_BASIC_BLOCK
elif scope == capa.rules.Scope.INSTRUCTION:
return capa_pb2.Scope.SCOPE_INSTRUCTION
elif scope == capa.rules.Scope.PROCESS:
return capa_pb2.Scope.SCOPE_PROCESS
elif scope == capa.rules.Scope.THREAD:
return capa_pb2.Scope.SCOPE_THREAD
elif scope == capa.rules.Scope.CALL:
return capa_pb2.Scope.SCOPE_CALL
else:
assert_never(scope)
def scopes_to_pb2(scopes: capa.rules.Scopes) -> capa_pb2.Scopes:
    """
    Convert a rule's static/dynamic scope pair into its protobuf message.

    Only the scopes that are set are emitted, so a missing scope stays unset in the message.
    """
    candidates = (("static", scopes.static), ("dynamic", scopes.dynamic))
    fields = {flavor: scope_to_pb2(scope) for flavor, scope in candidates if scope}
    return google.protobuf.json_format.ParseDict(fields, capa_pb2.Scopes())
def flavor_to_pb2(flavor: rd.Flavor) -> capa_pb2.Flavor.ValueType:
if flavor == rd.Flavor.STATIC:
return capa_pb2.Flavor.FLAVOR_STATIC
@@ -130,47 +181,87 @@ def flavor_to_pb2(flavor: rd.Flavor) -> capa_pb2.Flavor.ValueType:
assert_never(flavor)
def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
assert isinstance(meta.analysis, rd.StaticAnalysis)
return capa_pb2.Metadata(
timestamp=str(meta.timestamp),
version=meta.version,
argv=meta.argv,
sample=google.protobuf.json_format.ParseDict(meta.sample.model_dump(), capa_pb2.Sample()),
flavor=flavor_to_pb2(meta.flavor),
analysis=capa_pb2.Analysis(
format=meta.analysis.format,
arch=meta.analysis.arch,
os=meta.analysis.os,
extractor=meta.analysis.extractor,
rules=list(meta.analysis.rules),
base_address=addr_to_pb2(meta.analysis.base_address),
layout=capa_pb2.Layout(
functions=[
capa_pb2.FunctionLayout(
address=addr_to_pb2(f.address),
matched_basic_blocks=[
capa_pb2.BasicBlockLayout(address=addr_to_pb2(bb.address)) for bb in f.matched_basic_blocks
],
)
for f in meta.analysis.layout.functions
]
),
feature_counts=capa_pb2.FeatureCounts(
file=meta.analysis.feature_counts.file,
functions=[
capa_pb2.FunctionFeatureCount(address=addr_to_pb2(f.address), count=f.count)
for f in meta.analysis.feature_counts.functions
],
),
library_functions=[
capa_pb2.LibraryFunction(address=addr_to_pb2(lf.address), name=lf.name)
for lf in meta.analysis.library_functions
def static_analysis_to_pb2(analysis: rd.StaticAnalysis) -> capa_pb2.StaticAnalysis:
    """Convert the static analysis metadata of a result document into its protobuf message."""
    rules = list(analysis.rules)
    base_address = addr_to_pb2(analysis.base_address)

    # layout: each function together with the basic blocks in which rules matched.
    function_layouts = []
    for function in analysis.layout.functions:
        function_address = addr_to_pb2(function.address)
        basic_blocks = [
            capa_pb2.BasicBlockLayout(address=addr_to_pb2(bb.address)) for bb in function.matched_basic_blocks
        ]
        function_layouts.append(
            capa_pb2.FunctionLayout(address=function_address, matched_basic_blocks=basic_blocks)
        )

    # feature counts: one total for the file plus one entry per function.
    function_counts = [
        capa_pb2.FunctionFeatureCount(address=addr_to_pb2(entry.address), count=entry.count)
        for entry in analysis.feature_counts.functions
    ]
    feature_counts = capa_pb2.StaticFeatureCounts(
        file=analysis.feature_counts.file,
        functions=function_counts,
    )

    library_functions = [
        capa_pb2.LibraryFunction(address=addr_to_pb2(library.address), name=library.name)
        for library in analysis.library_functions
    ]

    return capa_pb2.StaticAnalysis(
        format=analysis.format,
        arch=analysis.arch,
        os=analysis.os,
        extractor=analysis.extractor,
        rules=rules,
        base_address=base_address,
        layout=capa_pb2.StaticLayout(functions=function_layouts),
        feature_counts=feature_counts,
        library_functions=library_functions,
    )
def dynamic_analysis_to_pb2(analysis: rd.DynamicAnalysis) -> capa_pb2.DynamicAnalysis:
    """Convert the dynamic analysis metadata of a result document into its protobuf message."""
    rules = list(analysis.rules)

    # layout: each process together with the threads in which rules matched.
    process_layouts = []
    for process in analysis.layout.processes:
        process_address = addr_to_pb2(process.address)
        threads = [capa_pb2.ThreadLayout(address=addr_to_pb2(thread.address)) for thread in process.matched_threads]
        process_layouts.append(capa_pb2.ProcessLayout(address=process_address, matched_threads=threads))

    # feature counts: one total for the file plus one entry per process.
    process_counts = [
        capa_pb2.ProcessFeatureCount(address=addr_to_pb2(entry.address), count=entry.count)
        for entry in analysis.feature_counts.processes
    ]
    feature_counts = capa_pb2.DynamicFeatureCounts(
        file=analysis.feature_counts.file,
        processes=process_counts,
    )

    return capa_pb2.DynamicAnalysis(
        format=analysis.format,
        arch=analysis.arch,
        os=analysis.os,
        extractor=analysis.extractor,
        rules=rules,
        layout=capa_pb2.DynamicLayout(processes=process_layouts),
        feature_counts=feature_counts,
    )
def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
    """
    Convert result document metadata into its protobuf message.

    The analysis details are written to `static_analysis` for a static analysis
    and to `dynamic_analysis` for a dynamic analysis.
    The legacy `analysis` field is not populated here.
    """
    analysis = meta.analysis
    if not isinstance(analysis, (rd.StaticAnalysis, rd.DynamicAnalysis)):
        # no other analysis kinds are handled; reject before doing any conversion work.
        return assert_never(analysis)

    # fields shared by both analysis flavors.
    timestamp = str(meta.timestamp)
    sample = google.protobuf.json_format.ParseDict(meta.sample.model_dump(), capa_pb2.Sample())
    flavor = flavor_to_pb2(meta.flavor)

    if isinstance(analysis, rd.StaticAnalysis):
        return capa_pb2.Metadata(
            timestamp=timestamp,
            version=meta.version,
            argv=meta.argv,
            sample=sample,
            flavor=flavor,
            static_analysis=static_analysis_to_pb2(analysis),
        )

    return capa_pb2.Metadata(
        timestamp=timestamp,
        version=meta.version,
        argv=meta.argv,
        sample=sample,
        flavor=flavor,
        dynamic_analysis=dynamic_analysis_to_pb2(analysis),
    )
def statement_to_pb2(statement: rd.Statement) -> capa_pb2.StatementNode:
if isinstance(statement, rd.RangeStatement):
return capa_pb2.StatementNode(
@@ -401,15 +492,51 @@ def match_to_pb2(match: rd.Match) -> capa_pb2.Match:
assert_never(match)
def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata:
# after manual type conversions to the RuleMetadata, we can rely on the protobuf json parser
# conversions include tuple -> list and rd.Enum -> proto.enum
meta = dict_tuple_to_list_values(rule_metadata.model_dump())
meta["scope"] = scope_to_pb2(meta["scope"])
meta["attack"] = list(map(dict_tuple_to_list_values, meta.get("attack", [])))
meta["mbc"] = list(map(dict_tuple_to_list_values, meta.get("mbc", [])))
def attack_to_pb2(attack: rd.AttackSpec) -> capa_pb2.AttackSpec:
    """Convert an ATT&CK mapping from the result document into its protobuf message."""
    # the protobuf repeated field is populated from a list copy of the parts.
    parts = list(attack.parts)
    spec = capa_pb2.AttackSpec(
        parts=parts,
        tactic=attack.tactic,
        technique=attack.technique,
        subtechnique=attack.subtechnique,
        id=attack.id,
    )
    return spec
return google.protobuf.json_format.ParseDict(meta, capa_pb2.RuleMetadata())
def mbc_to_pb2(mbc: rd.MBCSpec) -> capa_pb2.MBCSpec:
    """Convert an MBC mapping from the result document into its protobuf message."""
    # the protobuf repeated field is populated from a list copy of the parts.
    parts = list(mbc.parts)
    spec = capa_pb2.MBCSpec(
        parts=parts,
        objective=mbc.objective,
        behavior=mbc.behavior,
        method=mbc.method,
        id=mbc.id,
    )
    return spec
def maec_to_pb2(maec: rd.MaecMetadata) -> capa_pb2.MaecMetadata:
    """
    Convert MAEC metadata from the result document into its protobuf message.

    Any falsy value (such as None or an empty string) is written as the empty string.
    """
    conclusion = maec.analysis_conclusion if maec.analysis_conclusion else ""
    conclusion_ov = maec.analysis_conclusion_ov if maec.analysis_conclusion_ov else ""
    family = maec.malware_family if maec.malware_family else ""
    category = maec.malware_category if maec.malware_category else ""
    category_ov = maec.malware_category_ov if maec.malware_category_ov else ""

    return capa_pb2.MaecMetadata(
        analysis_conclusion=conclusion,
        analysis_conclusion_ov=conclusion_ov,
        malware_family=family,
        malware_category=category,
        malware_category_ov=category_ov,
    )
def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata:
    """
    Convert a rule's metadata from the result document into its protobuf message.

    The rule's scopes are written to the `scopes` field; the legacy `scope` field is not populated.
    A missing namespace is written as the empty string.
    """
    namespace = rule_metadata.namespace if rule_metadata.namespace else ""
    attack_specs = [attack_to_pb2(spec) for spec in rule_metadata.attack]
    mbc_specs = [mbc_to_pb2(spec) for spec in rule_metadata.mbc]

    return capa_pb2.RuleMetadata(
        name=rule_metadata.name,
        namespace=namespace,
        authors=rule_metadata.authors,
        attack=attack_specs,
        mbc=mbc_specs,
        references=rule_metadata.references,
        examples=rule_metadata.examples,
        description=rule_metadata.description,
        lib=rule_metadata.lib,
        maec=maec_to_pb2(rule_metadata.maec),
        is_subscope_rule=rule_metadata.is_subscope_rule,
        scopes=scopes_to_pb2(rule_metadata.scopes),
    )
def doc_to_pb2(doc: rd.ResultDocument) -> capa_pb2.ResultDocument:
@@ -470,6 +597,24 @@ def addr_from_pb2(addr: capa_pb2.Address) -> frz.Address:
offset = addr.token_offset.offset
return frz.Address(type=frz.AddressType.DN_TOKEN_OFFSET, value=(token, offset))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_PROCESS:
ppid = int_from_pb2(addr.ppid_pid.ppid)
pid = int_from_pb2(addr.ppid_pid.pid)
return frz.Address(type=frz.AddressType.PROCESS, value=(ppid, pid))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_THREAD:
ppid = int_from_pb2(addr.ppid_pid_tid.ppid)
pid = int_from_pb2(addr.ppid_pid_tid.pid)
tid = int_from_pb2(addr.ppid_pid_tid.tid)
return frz.Address(type=frz.AddressType.THREAD, value=(ppid, pid, tid))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_CALL:
ppid = int_from_pb2(addr.ppid_pid_tid_id.ppid)
pid = int_from_pb2(addr.ppid_pid_tid_id.pid)
tid = int_from_pb2(addr.ppid_pid_tid_id.tid)
id_ = int_from_pb2(addr.ppid_pid_tid_id.id)
return frz.Address(type=frz.AddressType.CALL, value=(ppid, pid, tid, id_))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS:
return frz.Address(type=frz.AddressType.NO_ADDRESS, value=None)
@@ -486,10 +631,23 @@ def scope_from_pb2(scope: capa_pb2.Scope.ValueType) -> capa.rules.Scope:
return capa.rules.Scope.BASIC_BLOCK
elif scope == capa_pb2.Scope.SCOPE_INSTRUCTION:
return capa.rules.Scope.INSTRUCTION
elif scope == capa_pb2.Scope.SCOPE_PROCESS:
return capa.rules.Scope.PROCESS
elif scope == capa_pb2.Scope.SCOPE_THREAD:
return capa.rules.Scope.THREAD
elif scope == capa_pb2.Scope.SCOPE_CALL:
return capa.rules.Scope.CALL
else:
assert_never(scope)
def scopes_from_pb2(scopes: capa_pb2.Scopes) -> capa.rules.Scopes:
    """
    Convert a protobuf scope pair back into the rule scopes object.

    A scope whose protobuf value is falsy is mapped to None.
    """
    static = None
    if scopes.static:
        static = scope_from_pb2(scopes.static)

    dynamic = None
    if scopes.dynamic:
        dynamic = scope_from_pb2(scopes.dynamic)

    return capa.rules.Scopes(static=static, dynamic=dynamic)
def flavor_from_pb2(flavor: capa_pb2.Flavor.ValueType) -> rd.Flavor:
if flavor == capa_pb2.Flavor.FLAVOR_STATIC:
return rd.Flavor.STATIC
@@ -499,60 +657,108 @@ def flavor_from_pb2(flavor: capa_pb2.Flavor.ValueType) -> rd.Flavor:
assert_never(flavor)
def metadata_from_pb2(meta: capa_pb2.Metadata) -> rd.Metadata:
return rd.Metadata(
timestamp=datetime.datetime.fromisoformat(meta.timestamp),
version=meta.version,
argv=tuple(meta.argv) if meta.argv else None,
sample=rd.Sample(
md5=meta.sample.md5,
sha1=meta.sample.sha1,
sha256=meta.sample.sha256,
path=meta.sample.path,
),
flavor=flavor_from_pb2(meta.flavor),
analysis=rd.StaticAnalysis(
format=meta.analysis.format,
arch=meta.analysis.arch,
os=meta.analysis.os,
extractor=meta.analysis.extractor,
rules=tuple(meta.analysis.rules),
base_address=addr_from_pb2(meta.analysis.base_address),
layout=rd.StaticLayout(
functions=tuple(
[
rd.FunctionLayout(
address=addr_from_pb2(f.address),
matched_basic_blocks=tuple(
[
rd.BasicBlockLayout(address=addr_from_pb2(bb.address))
for bb in f.matched_basic_blocks
]
),
)
for f in meta.analysis.layout.functions
]
)
),
feature_counts=rd.StaticFeatureCounts(
file=meta.analysis.feature_counts.file,
functions=tuple(
[
rd.FunctionFeatureCount(address=addr_from_pb2(f.address), count=f.count)
for f in meta.analysis.feature_counts.functions
]
),
),
library_functions=tuple(
def static_analysis_from_pb2(analysis: capa_pb2.StaticAnalysis) -> rd.StaticAnalysis:
return rd.StaticAnalysis(
format=analysis.format,
arch=analysis.arch,
os=analysis.os,
extractor=analysis.extractor,
rules=tuple(analysis.rules),
base_address=addr_from_pb2(analysis.base_address),
layout=rd.StaticLayout(
functions=tuple(
[
rd.LibraryFunction(address=addr_from_pb2(lf.address), name=lf.name)
for lf in meta.analysis.library_functions
rd.FunctionLayout(
address=addr_from_pb2(f.address),
matched_basic_blocks=tuple(
[rd.BasicBlockLayout(address=addr_from_pb2(bb.address)) for bb in f.matched_basic_blocks]
),
)
for f in analysis.layout.functions
]
)
),
feature_counts=rd.StaticFeatureCounts(
file=analysis.feature_counts.file,
functions=tuple(
[
rd.FunctionFeatureCount(address=addr_from_pb2(f.address), count=f.count)
for f in analysis.feature_counts.functions
]
),
),
library_functions=tuple(
[rd.LibraryFunction(address=addr_from_pb2(lf.address), name=lf.name) for lf in analysis.library_functions]
),
)
def dynamic_analysis_from_pb2(analysis: capa_pb2.DynamicAnalysis) -> rd.DynamicAnalysis:
    """Convert a protobuf dynamic analysis message back into result document metadata."""
    rules = tuple(analysis.rules)

    # layout: each process together with the threads in which rules matched.
    process_layouts = []
    for process in analysis.layout.processes:
        process_address = addr_from_pb2(process.address)
        threads = tuple(rd.ThreadLayout(address=addr_from_pb2(thread.address)) for thread in process.matched_threads)
        process_layouts.append(rd.ProcessLayout(address=process_address, matched_threads=threads))

    # feature counts: one total for the file plus one entry per process.
    process_counts = tuple(
        rd.ProcessFeatureCount(address=addr_from_pb2(entry.address), count=entry.count)
        for entry in analysis.feature_counts.processes
    )

    return rd.DynamicAnalysis(
        format=analysis.format,
        arch=analysis.arch,
        os=analysis.os,
        extractor=analysis.extractor,
        rules=rules,
        layout=rd.DynamicLayout(processes=tuple(process_layouts)),
        feature_counts=rd.DynamicFeatureCounts(
            file=analysis.feature_counts.file,
            processes=process_counts,
        ),
    )
def metadata_from_pb2(meta: capa_pb2.Metadata) -> rd.Metadata:
    """
    Convert a protobuf metadata message back into result document metadata.

    The analysis is read from whichever member of the `analysis2` oneof is set
    (`static_analysis` or `dynamic_analysis`); the legacy `analysis` field is not read.
    """
    analysis_type = meta.WhichOneof("analysis2")
    if analysis_type not in ("static_analysis", "dynamic_analysis"):
        # covers an unset oneof (None) as well; reject before doing any conversion work.
        return assert_never(analysis_type)

    # fields shared by both analysis flavors.
    timestamp = datetime.datetime.fromisoformat(meta.timestamp)
    argv = tuple(meta.argv) if meta.argv else None
    sample = rd.Sample(
        md5=meta.sample.md5,
        sha1=meta.sample.sha1,
        sha256=meta.sample.sha256,
        path=meta.sample.path,
    )
    flavor = flavor_from_pb2(meta.flavor)

    if analysis_type == "static_analysis":
        return rd.Metadata(
            timestamp=timestamp,
            version=meta.version,
            argv=argv,
            sample=sample,
            flavor=flavor,
            analysis=static_analysis_from_pb2(meta.static_analysis),
        )

    return rd.Metadata(
        timestamp=timestamp,
        version=meta.version,
        argv=argv,
        sample=sample,
        flavor=flavor,
        analysis=dynamic_analysis_from_pb2(meta.dynamic_analysis),
    )
def statement_from_pb2(statement: capa_pb2.StatementNode) -> rd.Statement:
type_ = statement.WhichOneof("statement")
@@ -732,7 +938,7 @@ def rule_metadata_from_pb2(pb: capa_pb2.RuleMetadata) -> rd.RuleMetadata:
name=pb.name,
namespace=pb.namespace or None,
authors=tuple(pb.authors),
scope=scope_from_pb2(pb.scope),
scopes=scopes_from_pb2(pb.scopes),
attack=tuple([attack_from_pb2(attack) for attack in pb.attack]),
mbc=tuple([mbc_from_pb2(mbc) for mbc in pb.mbc]),
references=tuple(pb.references),
+99 -2
View File
@@ -11,6 +11,9 @@ message Address {
oneof value {
Integer v = 2;
Token_Offset token_offset = 3;
Ppid_Pid ppid_pid = 4;
Ppid_Pid_Tid ppid_pid_tid = 5;
Ppid_Pid_Tid_Id ppid_pid_tid_id = 6;
};
}
@@ -22,6 +25,9 @@ enum AddressType {
ADDRESSTYPE_DN_TOKEN = 4;
ADDRESSTYPE_DN_TOKEN_OFFSET = 5;
ADDRESSTYPE_NO_ADDRESS = 6;
ADDRESSTYPE_PROCESS = 7;
ADDRESSTYPE_THREAD = 8;
ADDRESSTYPE_CALL = 9;
}
message Analysis {
@@ -82,6 +88,25 @@ message CompoundStatement {
optional string description = 2;
}
message DynamicAnalysis {
string format = 1;
string arch = 2;
string os = 3;
string extractor = 4;
repeated string rules = 5;
DynamicLayout layout = 6;
DynamicFeatureCounts feature_counts = 7;
}
message DynamicFeatureCounts {
uint64 file = 1;
repeated ProcessFeatureCount processes = 2;
}
message DynamicLayout {
repeated ProcessLayout processes = 1;
}
message ExportFeature {
string type = 1;
string export = 2;
@@ -203,8 +228,15 @@ message Metadata {
string version = 2;
repeated string argv = 3;
Sample sample = 4;
Analysis analysis = 5;
// deprecated in v7.0.
// use analysis2 instead.
Analysis analysis = 5 [deprecated = true];
Flavor flavor = 6;
oneof analysis2 {
// use analysis2 instead of analysis (deprecated in v7.0).
StaticAnalysis static_analysis = 7;
DynamicAnalysis dynamic_analysis = 8;
};
}
message MnemonicFeature {
@@ -251,6 +283,16 @@ message OperandOffsetFeature {
optional string description = 4;
}
message ProcessFeatureCount {
Address address = 1;
uint64 count = 2;
}
message ProcessLayout {
Address address = 1;
repeated ThreadLayout matched_threads = 2;
}
message PropertyFeature {
string type = 1;
string property_ = 2; // property is a Python top-level decorator name
@@ -288,7 +330,9 @@ message RuleMetadata {
string name = 1;
string namespace = 2;
repeated string authors = 3;
Scope scope = 4;
// deprecated in v7.0.
// use scopes instead.
Scope scope = 4 [deprecated = true];
repeated AttackSpec attack = 5;
repeated MBCSpec mbc = 6;
repeated string references = 7;
@@ -297,6 +341,8 @@ message RuleMetadata {
bool lib = 10;
MaecMetadata maec = 11;
bool is_subscope_rule = 12;
// use scopes over scope (deprecated in v7.0).
Scopes scopes = 13;
}
message Sample {
@@ -312,6 +358,14 @@ enum Scope {
SCOPE_FUNCTION = 2;
SCOPE_BASIC_BLOCK = 3;
SCOPE_INSTRUCTION = 4;
SCOPE_PROCESS = 5;
SCOPE_THREAD = 6;
SCOPE_CALL = 7;
}
// Pair of scopes for a rule: one used for static analysis, one for dynamic analysis.
// Both fields are optional; the Python serializer only sets a field when the rule defines that scope.
message Scopes {
optional Scope static = 1;
optional Scope dynamic = 2;
}
message SectionFeature {
@@ -336,6 +390,27 @@ message StatementNode {
};
}
message StaticAnalysis {
string format = 1;
string arch = 2;
string os = 3;
string extractor = 4;
repeated string rules = 5;
Address base_address = 6;
StaticLayout layout = 7;
StaticFeatureCounts feature_counts = 8;
repeated LibraryFunction library_functions = 9;
}
message StaticFeatureCounts {
uint64 file = 1;
repeated FunctionFeatureCount functions = 2;
}
message StaticLayout {
repeated FunctionLayout functions = 1;
}
message StringFeature {
string type = 1;
string string = 2;
@@ -354,6 +429,10 @@ message SubstringFeature {
optional string description = 3;
}
message ThreadLayout {
Address address = 1;
}
message Addresses { repeated Address address = 1; }
message Pair_Address_Match {
@@ -366,6 +445,24 @@ message Token_Offset {
uint64 offset = 2; // offset is always >= 0
}
// Address value used with ADDRESSTYPE_PROCESS: the (ppid, pid) pair.
message Ppid_Pid {
Integer ppid = 1;
Integer pid = 2;
}
// Address value used with ADDRESSTYPE_THREAD: the (ppid, pid, tid) triple.
message Ppid_Pid_Tid {
Integer ppid = 1;
Integer pid = 2;
Integer tid = 3;
}
// Address value used with ADDRESSTYPE_CALL: the (ppid, pid, tid, id) tuple.
message Ppid_Pid_Tid_Id {
Integer ppid = 1;
Integer pid = 2;
Integer tid = 3;
Integer id = 4;
}
message Integer { oneof value { uint64 u = 1; sint64 i = 2; } } // unsigned or signed int
message Number { oneof value { uint64 u = 1; sint64 i = 2; double f = 3; } }
File diff suppressed because one or more lines are too long
+361 -8
View File
@@ -31,6 +31,9 @@ class _AddressTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._En
ADDRESSTYPE_DN_TOKEN: _AddressType.ValueType # 4
ADDRESSTYPE_DN_TOKEN_OFFSET: _AddressType.ValueType # 5
ADDRESSTYPE_NO_ADDRESS: _AddressType.ValueType # 6
ADDRESSTYPE_PROCESS: _AddressType.ValueType # 7
ADDRESSTYPE_THREAD: _AddressType.ValueType # 8
ADDRESSTYPE_CALL: _AddressType.ValueType # 9
class AddressType(_AddressType, metaclass=_AddressTypeEnumTypeWrapper): ...
@@ -41,6 +44,9 @@ ADDRESSTYPE_FILE: AddressType.ValueType # 3
ADDRESSTYPE_DN_TOKEN: AddressType.ValueType # 4
ADDRESSTYPE_DN_TOKEN_OFFSET: AddressType.ValueType # 5
ADDRESSTYPE_NO_ADDRESS: AddressType.ValueType # 6
ADDRESSTYPE_PROCESS: AddressType.ValueType # 7
ADDRESSTYPE_THREAD: AddressType.ValueType # 8
ADDRESSTYPE_CALL: AddressType.ValueType # 9
global___AddressType = AddressType
class _Flavor:
@@ -71,6 +77,9 @@ class _ScopeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumType
SCOPE_FUNCTION: _Scope.ValueType # 2
SCOPE_BASIC_BLOCK: _Scope.ValueType # 3
SCOPE_INSTRUCTION: _Scope.ValueType # 4
SCOPE_PROCESS: _Scope.ValueType # 5
SCOPE_THREAD: _Scope.ValueType # 6
SCOPE_CALL: _Scope.ValueType # 7
class Scope(_Scope, metaclass=_ScopeEnumTypeWrapper): ...
@@ -79,6 +88,9 @@ SCOPE_FILE: Scope.ValueType # 1
SCOPE_FUNCTION: Scope.ValueType # 2
SCOPE_BASIC_BLOCK: Scope.ValueType # 3
SCOPE_INSTRUCTION: Scope.ValueType # 4
SCOPE_PROCESS: Scope.ValueType # 5
SCOPE_THREAD: Scope.ValueType # 6
SCOPE_CALL: Scope.ValueType # 7
global___Scope = Scope
@typing_extensions.final
@@ -111,21 +123,33 @@ class Address(google.protobuf.message.Message):
TYPE_FIELD_NUMBER: builtins.int
V_FIELD_NUMBER: builtins.int
TOKEN_OFFSET_FIELD_NUMBER: builtins.int
PPID_PID_FIELD_NUMBER: builtins.int
PPID_PID_TID_FIELD_NUMBER: builtins.int
PPID_PID_TID_ID_FIELD_NUMBER: builtins.int
type: global___AddressType.ValueType
@property
def v(self) -> global___Integer: ...
@property
def token_offset(self) -> global___Token_Offset: ...
@property
def ppid_pid(self) -> global___Ppid_Pid: ...
@property
def ppid_pid_tid(self) -> global___Ppid_Pid_Tid: ...
@property
def ppid_pid_tid_id(self) -> global___Ppid_Pid_Tid_Id: ...
def __init__(
self,
*,
type: global___AddressType.ValueType = ...,
v: global___Integer | None = ...,
token_offset: global___Token_Offset | None = ...,
ppid_pid: global___Ppid_Pid | None = ...,
ppid_pid_tid: global___Ppid_Pid_Tid | None = ...,
ppid_pid_tid_id: global___Ppid_Pid_Tid_Id | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["token_offset", b"token_offset", "v", b"v", "value", b"value"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["token_offset", b"token_offset", "type", b"type", "v", b"v", "value", b"value"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["value", b"value"]) -> typing_extensions.Literal["v", "token_offset"] | None: ...
def HasField(self, field_name: typing_extensions.Literal["ppid_pid", b"ppid_pid", "ppid_pid_tid", b"ppid_pid_tid", "ppid_pid_tid_id", b"ppid_pid_tid_id", "token_offset", b"token_offset", "v", b"v", "value", b"value"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["ppid_pid", b"ppid_pid", "ppid_pid_tid", b"ppid_pid_tid", "ppid_pid_tid_id", b"ppid_pid_tid_id", "token_offset", b"token_offset", "type", b"type", "v", b"v", "value", b"value"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["value", b"value"]) -> typing_extensions.Literal["v", "token_offset", "ppid_pid", "ppid_pid_tid", "ppid_pid_tid_id"] | None: ...
global___Address = Address
@@ -352,6 +376,78 @@ class CompoundStatement(google.protobuf.message.Message):
global___CompoundStatement = CompoundStatement
@typing_extensions.final
class DynamicAnalysis(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FORMAT_FIELD_NUMBER: builtins.int
ARCH_FIELD_NUMBER: builtins.int
OS_FIELD_NUMBER: builtins.int
EXTRACTOR_FIELD_NUMBER: builtins.int
RULES_FIELD_NUMBER: builtins.int
LAYOUT_FIELD_NUMBER: builtins.int
FEATURE_COUNTS_FIELD_NUMBER: builtins.int
format: builtins.str
arch: builtins.str
os: builtins.str
extractor: builtins.str
@property
def rules(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
@property
def layout(self) -> global___DynamicLayout: ...
@property
def feature_counts(self) -> global___DynamicFeatureCounts: ...
def __init__(
self,
*,
format: builtins.str = ...,
arch: builtins.str = ...,
os: builtins.str = ...,
extractor: builtins.str = ...,
rules: collections.abc.Iterable[builtins.str] | None = ...,
layout: global___DynamicLayout | None = ...,
feature_counts: global___DynamicFeatureCounts | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["feature_counts", b"feature_counts", "layout", b"layout"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["arch", b"arch", "extractor", b"extractor", "feature_counts", b"feature_counts", "format", b"format", "layout", b"layout", "os", b"os", "rules", b"rules"]) -> None: ...
global___DynamicAnalysis = DynamicAnalysis
@typing_extensions.final
class DynamicFeatureCounts(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FILE_FIELD_NUMBER: builtins.int
PROCESSES_FIELD_NUMBER: builtins.int
file: builtins.int
@property
def processes(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ProcessFeatureCount]: ...
def __init__(
self,
*,
file: builtins.int = ...,
processes: collections.abc.Iterable[global___ProcessFeatureCount] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["file", b"file", "processes", b"processes"]) -> None: ...
global___DynamicFeatureCounts = DynamicFeatureCounts
@typing_extensions.final
class DynamicLayout(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
PROCESSES_FIELD_NUMBER: builtins.int
@property
def processes(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ProcessLayout]: ...
def __init__(
self,
*,
processes: collections.abc.Iterable[global___ProcessLayout] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["processes", b"processes"]) -> None: ...
global___DynamicLayout = DynamicLayout
@typing_extensions.final
class ExportFeature(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -794,6 +890,8 @@ class Metadata(google.protobuf.message.Message):
SAMPLE_FIELD_NUMBER: builtins.int
ANALYSIS_FIELD_NUMBER: builtins.int
FLAVOR_FIELD_NUMBER: builtins.int
STATIC_ANALYSIS_FIELD_NUMBER: builtins.int
DYNAMIC_ANALYSIS_FIELD_NUMBER: builtins.int
timestamp: builtins.str
"""iso8601 format, like: 2019-01-01T00:00:00Z"""
version: builtins.str
@@ -802,8 +900,16 @@ class Metadata(google.protobuf.message.Message):
@property
def sample(self) -> global___Sample: ...
@property
def analysis(self) -> global___Analysis: ...
def analysis(self) -> global___Analysis:
"""deprecated in v7.0.
use analysis2 instead.
"""
flavor: global___Flavor.ValueType
@property
def static_analysis(self) -> global___StaticAnalysis:
"""use analysis2 instead of analysis (deprecated in v7.0)."""
@property
def dynamic_analysis(self) -> global___DynamicAnalysis: ...
def __init__(
self,
*,
@@ -813,9 +919,12 @@ class Metadata(google.protobuf.message.Message):
sample: global___Sample | None = ...,
analysis: global___Analysis | None = ...,
flavor: global___Flavor.ValueType = ...,
static_analysis: global___StaticAnalysis | None = ...,
dynamic_analysis: global___DynamicAnalysis | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["analysis", b"analysis", "sample", b"sample"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["analysis", b"analysis", "argv", b"argv", "flavor", b"flavor", "sample", b"sample", "timestamp", b"timestamp", "version", b"version"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["analysis", b"analysis", "analysis2", b"analysis2", "dynamic_analysis", b"dynamic_analysis", "sample", b"sample", "static_analysis", b"static_analysis"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["analysis", b"analysis", "analysis2", b"analysis2", "argv", b"argv", "dynamic_analysis", b"dynamic_analysis", "flavor", b"flavor", "sample", b"sample", "static_analysis", b"static_analysis", "timestamp", b"timestamp", "version", b"version"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["analysis2", b"analysis2"]) -> typing_extensions.Literal["static_analysis", "dynamic_analysis"] | None: ...
global___Metadata = Metadata
@@ -993,6 +1102,47 @@ class OperandOffsetFeature(google.protobuf.message.Message):
global___OperandOffsetFeature = OperandOffsetFeature
# type stub for the `ProcessFeatureCount` protobuf message:
# an `address` sub-message (presence can be queried via HasField)
# paired with a scalar integer `count`.
@typing_extensions.final
class ProcessFeatureCount(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
ADDRESS_FIELD_NUMBER: builtins.int
COUNT_FIELD_NUMBER: builtins.int
@property
def address(self) -> global___Address: ...
count: builtins.int
def __init__(
self,
*,
address: global___Address | None = ...,
count: builtins.int = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["address", b"address"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["address", b"address", "count", b"count"]) -> None: ...
# module-level alias so other stubs in this file can reference the message type.
global___ProcessFeatureCount = ProcessFeatureCount
# type stub for the `ProcessLayout` protobuf message:
# an `address` sub-message (presence can be queried via HasField)
# plus a repeated `matched_threads` field of `ThreadLayout` messages.
@typing_extensions.final
class ProcessLayout(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
ADDRESS_FIELD_NUMBER: builtins.int
MATCHED_THREADS_FIELD_NUMBER: builtins.int
@property
def address(self) -> global___Address: ...
@property
def matched_threads(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ThreadLayout]: ...
def __init__(
self,
*,
address: global___Address | None = ...,
matched_threads: collections.abc.Iterable[global___ThreadLayout] | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["address", b"address"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["address", b"address", "matched_threads", b"matched_threads"]) -> None: ...
# module-level alias so other stubs in this file can reference the message type.
global___ProcessLayout = ProcessLayout
@typing_extensions.final
class PropertyFeature(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -1156,11 +1306,15 @@ class RuleMetadata(google.protobuf.message.Message):
LIB_FIELD_NUMBER: builtins.int
MAEC_FIELD_NUMBER: builtins.int
IS_SUBSCOPE_RULE_FIELD_NUMBER: builtins.int
SCOPES_FIELD_NUMBER: builtins.int
name: builtins.str
namespace: builtins.str
@property
def authors(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
scope: global___Scope.ValueType
"""deprecated in v7.0.
use scopes instead.
"""
@property
def attack(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___AttackSpec]: ...
@property
@@ -1174,6 +1328,9 @@ class RuleMetadata(google.protobuf.message.Message):
@property
def maec(self) -> global___MaecMetadata: ...
is_subscope_rule: builtins.bool
@property
def scopes(self) -> global___Scopes:
"""use scopes over scope (deprecated in v7.0)."""
def __init__(
self,
*,
@@ -1189,9 +1346,10 @@ class RuleMetadata(google.protobuf.message.Message):
lib: builtins.bool = ...,
maec: global___MaecMetadata | None = ...,
is_subscope_rule: builtins.bool = ...,
scopes: global___Scopes | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["maec", b"maec"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["attack", b"attack", "authors", b"authors", "description", b"description", "examples", b"examples", "is_subscope_rule", b"is_subscope_rule", "lib", b"lib", "maec", b"maec", "mbc", b"mbc", "name", b"name", "namespace", b"namespace", "references", b"references", "scope", b"scope"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["maec", b"maec", "scopes", b"scopes"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["attack", b"attack", "authors", b"authors", "description", b"description", "examples", b"examples", "is_subscope_rule", b"is_subscope_rule", "lib", b"lib", "maec", b"maec", "mbc", b"mbc", "name", b"name", "namespace", b"namespace", "references", b"references", "scope", b"scope", "scopes", b"scopes"]) -> None: ...
global___RuleMetadata = RuleMetadata
@@ -1219,6 +1377,29 @@ class Sample(google.protobuf.message.Message):
global___Sample = Sample
# type stub for the `Scopes` protobuf message:
# two `Scope` enum values, `static` and `dynamic`.
# both may be passed as None to the constructor, both support HasField,
# and each has its own single-member oneof group (`_static` / `_dynamic`)
# that WhichOneof can be queried with.
@typing_extensions.final
class Scopes(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
STATIC_FIELD_NUMBER: builtins.int
DYNAMIC_FIELD_NUMBER: builtins.int
static: global___Scope.ValueType
dynamic: global___Scope.ValueType
def __init__(
self,
*,
static: global___Scope.ValueType | None = ...,
dynamic: global___Scope.ValueType | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["_dynamic", b"_dynamic", "_static", b"_static", "dynamic", b"dynamic", "static", b"static"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["_dynamic", b"_dynamic", "_static", b"_static", "dynamic", b"dynamic", "static", b"static"]) -> None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["_dynamic", b"_dynamic"]) -> typing_extensions.Literal["dynamic"] | None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["_static", b"_static"]) -> typing_extensions.Literal["static"] | None: ...
# module-level alias so other stubs in this file can reference the message type.
global___Scopes = Scopes
@typing_extensions.final
class SectionFeature(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -1298,6 +1479,86 @@ class StatementNode(google.protobuf.message.Message):
global___StatementNode = StatementNode
# type stub for the `StaticAnalysis` protobuf message.
# scalar string fields: `format`, `arch`, `os`, `extractor`.
# repeated fields: `rules` (strings) and `library_functions` (`LibraryFunction`).
# sub-messages whose presence can be queried via HasField:
# `base_address`, `layout` (`StaticLayout`), `feature_counts` (`StaticFeatureCounts`).
@typing_extensions.final
class StaticAnalysis(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FORMAT_FIELD_NUMBER: builtins.int
ARCH_FIELD_NUMBER: builtins.int
OS_FIELD_NUMBER: builtins.int
EXTRACTOR_FIELD_NUMBER: builtins.int
RULES_FIELD_NUMBER: builtins.int
BASE_ADDRESS_FIELD_NUMBER: builtins.int
LAYOUT_FIELD_NUMBER: builtins.int
FEATURE_COUNTS_FIELD_NUMBER: builtins.int
LIBRARY_FUNCTIONS_FIELD_NUMBER: builtins.int
format: builtins.str
arch: builtins.str
os: builtins.str
extractor: builtins.str
@property
def rules(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
@property
def base_address(self) -> global___Address: ...
@property
def layout(self) -> global___StaticLayout: ...
@property
def feature_counts(self) -> global___StaticFeatureCounts: ...
@property
def library_functions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___LibraryFunction]: ...
def __init__(
self,
*,
format: builtins.str = ...,
arch: builtins.str = ...,
os: builtins.str = ...,
extractor: builtins.str = ...,
rules: collections.abc.Iterable[builtins.str] | None = ...,
base_address: global___Address | None = ...,
layout: global___StaticLayout | None = ...,
feature_counts: global___StaticFeatureCounts | None = ...,
library_functions: collections.abc.Iterable[global___LibraryFunction] | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["base_address", b"base_address", "feature_counts", b"feature_counts", "layout", b"layout"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["arch", b"arch", "base_address", b"base_address", "extractor", b"extractor", "feature_counts", b"feature_counts", "format", b"format", "layout", b"layout", "library_functions", b"library_functions", "os", b"os", "rules", b"rules"]) -> None: ...
# module-level alias so other stubs in this file can reference the message type.
global___StaticAnalysis = StaticAnalysis
# type stub for the `StaticFeatureCounts` protobuf message:
# a scalar integer `file` field plus a repeated `functions` field
# whose elements are `FunctionFeatureCount` messages.
@typing_extensions.final
class StaticFeatureCounts(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FILE_FIELD_NUMBER: builtins.int
FUNCTIONS_FIELD_NUMBER: builtins.int
file: builtins.int
@property
def functions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___FunctionFeatureCount]: ...
def __init__(
self,
*,
file: builtins.int = ...,
functions: collections.abc.Iterable[global___FunctionFeatureCount] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["file", b"file", "functions", b"functions"]) -> None: ...
# module-level alias so other stubs in this file can reference the message type.
global___StaticFeatureCounts = StaticFeatureCounts
# type stub for the `StaticLayout` protobuf message:
# a single repeated `functions` field of `FunctionLayout` messages.
@typing_extensions.final
class StaticLayout(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FUNCTIONS_FIELD_NUMBER: builtins.int
@property
def functions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___FunctionLayout]: ...
def __init__(
self,
*,
functions: collections.abc.Iterable[global___FunctionLayout] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["functions", b"functions"]) -> None: ...
# module-level alias so other stubs in this file can reference the message type.
global___StaticLayout = StaticLayout
@typing_extensions.final
class StringFeature(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -1367,6 +1628,23 @@ class SubstringFeature(google.protobuf.message.Message):
global___SubstringFeature = SubstringFeature
# type stub for the `ThreadLayout` protobuf message:
# a single `address` sub-message whose presence can be queried via HasField.
@typing_extensions.final
class ThreadLayout(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
ADDRESS_FIELD_NUMBER: builtins.int
@property
def address(self) -> global___Address: ...
def __init__(
self,
*,
address: global___Address | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["address", b"address"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["address", b"address"]) -> None: ...
# module-level alias so other stubs in this file can reference the message type.
global___ThreadLayout = ThreadLayout
@typing_extensions.final
class Addresses(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -1425,6 +1703,81 @@ class Token_Offset(google.protobuf.message.Message):
global___Token_Offset = Token_Offset
# type stub for the `Ppid_Pid` protobuf message:
# two `Integer` sub-messages, `ppid` and `pid`,
# each with presence that can be queried via HasField.
@typing_extensions.final
class Ppid_Pid(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
PPID_FIELD_NUMBER: builtins.int
PID_FIELD_NUMBER: builtins.int
@property
def ppid(self) -> global___Integer: ...
@property
def pid(self) -> global___Integer: ...
def __init__(
self,
*,
ppid: global___Integer | None = ...,
pid: global___Integer | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["pid", b"pid", "ppid", b"ppid"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["pid", b"pid", "ppid", b"ppid"]) -> None: ...
# module-level alias so other stubs in this file can reference the message type.
global___Ppid_Pid = Ppid_Pid
# type stub for the `Ppid_Pid_Tid` protobuf message:
# three `Integer` sub-messages, `ppid`, `pid` and `tid`,
# each with presence that can be queried via HasField.
@typing_extensions.final
class Ppid_Pid_Tid(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
PPID_FIELD_NUMBER: builtins.int
PID_FIELD_NUMBER: builtins.int
TID_FIELD_NUMBER: builtins.int
@property
def ppid(self) -> global___Integer: ...
@property
def pid(self) -> global___Integer: ...
@property
def tid(self) -> global___Integer: ...
def __init__(
self,
*,
ppid: global___Integer | None = ...,
pid: global___Integer | None = ...,
tid: global___Integer | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["pid", b"pid", "ppid", b"ppid", "tid", b"tid"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["pid", b"pid", "ppid", b"ppid", "tid", b"tid"]) -> None: ...
# module-level alias so other stubs in this file can reference the message type.
global___Ppid_Pid_Tid = Ppid_Pid_Tid
# type stub for the `Ppid_Pid_Tid_Id` protobuf message:
# four `Integer` sub-messages, `ppid`, `pid`, `tid` and `id`,
# each with presence that can be queried via HasField.
@typing_extensions.final
class Ppid_Pid_Tid_Id(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
PPID_FIELD_NUMBER: builtins.int
PID_FIELD_NUMBER: builtins.int
TID_FIELD_NUMBER: builtins.int
ID_FIELD_NUMBER: builtins.int
@property
def ppid(self) -> global___Integer: ...
@property
def pid(self) -> global___Integer: ...
@property
def tid(self) -> global___Integer: ...
@property
def id(self) -> global___Integer: ...
def __init__(
self,
*,
ppid: global___Integer | None = ...,
pid: global___Integer | None = ...,
tid: global___Integer | None = ...,
id: global___Integer | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["id", b"id", "pid", b"pid", "ppid", b"ppid", "tid", b"tid"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["id", b"id", "pid", b"pid", "ppid", b"ppid", "tid", b"tid"]) -> None: ...
# module-level alias so other stubs in this file can reference the message type.
global___Ppid_Pid_Tid_Id = Ppid_Pid_Tid_Id
@typing_extensions.final
class Integer(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
+8 -20
View File
@@ -271,7 +271,6 @@ def render_rules(ostream, doc: rd.ResultDocument):
"""
functions_by_bb: Dict[capa.features.address.Address, capa.features.address.Address] = {}
processes_by_thread: Dict[capa.features.address.Address, capa.features.address.Address] = {}
if isinstance(doc.meta.analysis, rd.StaticAnalysis):
for finfo in doc.meta.analysis.layout.functions:
faddress = finfo.address.to_capa()
@@ -280,12 +279,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
bbaddress = bb.address.to_capa()
functions_by_bb[bbaddress] = faddress
elif isinstance(doc.meta.analysis, rd.DynamicAnalysis):
for pinfo in doc.meta.analysis.layout.processes:
paddress = pinfo.address.to_capa()
for thread in pinfo.matched_threads:
taddress = thread.address.to_capa()
processes_by_thread[taddress] = paddress
pass
else:
raise ValueError("invalid analysis field in the document's meta")
@@ -336,12 +330,11 @@ def render_rules(ostream, doc: rd.ResultDocument):
rows.append(("author", ", ".join(rule.meta.authors)))
rows.append(("scopes", ""))
if rule.meta.scopes.static:
rows.append((" static:", str(rule.meta.scopes.static)))
if doc.meta.flavor == rd.Flavor.STATIC:
rows.append(("scope", f"{rule.meta.scopes.static}"))
if rule.meta.scopes.dynamic:
rows.append((" dynamic:", str(rule.meta.scopes.dynamic)))
if doc.meta.flavor == rd.Flavor.DYNAMIC:
rows.append(("scope", f"{rule.meta.scopes.dynamic}"))
if rule.meta.attack:
rows.append(("att&ck", ", ".join([rutils.format_parts_id(v) for v in rule.meta.attack])))
@@ -376,6 +369,9 @@ def render_rules(ostream, doc: rd.ResultDocument):
else:
capa.helpers.assert_never(doc.meta.flavor)
# TODO(mr-tz): process rendering should use human-readable name
# https://github.com/mandiant/capa/issues/1816
ostream.write(" @ ")
ostream.write(capa.render.verbose.format_address(location))
@@ -385,14 +381,6 @@ def render_rules(ostream, doc: rd.ResultDocument):
+ capa.render.verbose.format_address(frz.Address.from_capa(functions_by_bb[location.to_capa()]))
)
if doc.meta.flavor == rd.Flavor.DYNAMIC and rule.meta.scopes.dynamic == capa.rules.Scope.THREAD:
ostream.write(
" in process "
+ capa.render.verbose.format_address(
frz.Address.from_capa(processes_by_thread[location.to_capa()])
)
)
ostream.write("\n")
render_match(ostream, match, indent=1)
if rule.meta.lib:
+71
View File
@@ -8,6 +8,8 @@
import io
import re
import gzip
import json
import uuid
import codecs
import logging
@@ -326,6 +328,68 @@ def ensure_feature_valid_for_scopes(scopes: Scopes, feature: Union[Feature, Stat
raise InvalidRule(f"feature {feature} not supported for scopes {scopes}")
# the kinds of COM object that a `com/<type>` rule feature may refer to.
# member names (upper-cased) are what the rule key suffix is matched against.
class ComType(Enum):
CLASS = "class"
INTERFACE = "interface"
# per COM type:
#   db_path: gzip-compressed JSON database, resolved against capa.main.get_default_root()
#   prefix: prepended to the COM name in the description of generated features
# COM data source https://github.com/stevemk14ebr/COM-Code-Helper/tree/master
VALID_COM_TYPES = {
ComType.CLASS: {"db_path": "assets/classes.json.gz", "prefix": "CLSID_"},
ComType.INTERFACE: {"db_path": "assets/interfaces.json.gz", "prefix": "IID_"},
}
@lru_cache(maxsize=None)
def load_com_database(com_type: ComType) -> Dict[str, List[str]]:
    """
    load the bundled COM database for the given COM type.

    the database is a gzip-compressed, UTF-8 encoded JSON document.
    results are cached, so each database is read from disk at most once.

    raises:
      IOError: when the database file is missing or cannot be read/parsed.
    """
    relative_path = VALID_COM_TYPES[com_type]["db_path"]
    com_db_path: Path = capa.main.get_default_root() / relative_path

    if not com_db_path.exists():
        raise IOError(f"COM database path '{com_db_path}' does not exist or cannot be accessed")

    try:
        # text mode: decompress and decode in one step, then parse straight from the stream
        with gzip.open(com_db_path, "rt", encoding="utf-8") as gzfile:
            return json.load(gzfile)
    except Exception as e:
        raise IOError(f"Error loading COM database from '{com_db_path}'") from e
def translate_com_feature(com_name: str, com_type: ComType) -> ceng.Or:
    """
    expand a COM class/interface name into features matching its GUIDs.

    each GUID registered under `com_name` yields two features:
    the GUID as a string, and the GUID as raw bytes in the mixed-endian layout
    (first three fields little-endian, remaining eight bytes in order).

    args:
      com_name: name of the COM class or interface, like "WICPngDecoder".
      com_type: which COM database to consult.

    returns:
      an Or statement over all generated features.

    raises:
      InvalidRule: when `com_name` has no GUIDs in the database.
      ValueError: when a database entry is not a well-formed GUID.
    """
    com_db = load_com_database(com_type)
    guid_strings: Optional[List[str]] = com_db.get(com_name)
    if not guid_strings:
        logger.error(" %s doesn't exist in COM %s database", com_name, com_type)
        raise InvalidRule(f"'{com_name}' doesn't exist in COM {com_type} database")

    # the prefix depends only on the COM type, so resolve it once, outside the loop
    symbol = VALID_COM_TYPES[com_type]["prefix"] + com_name

    com_features: List = []
    for guid_string in guid_strings:
        # `bytes_le` is the stdlib spelling of the byte order described above:
        # time_low, time_mid and time_hi_version are byte-swapped, the rest is kept as-is.
        guid_bytes = uuid.UUID(guid_string).bytes_le
        com_features.append(capa.features.common.StringFactory(guid_string, f"{symbol} as GUID string"))
        com_features.append(capa.features.common.Bytes(guid_bytes, f"{symbol} as bytes"))
    return ceng.Or(com_features)
def parse_int(s: str) -> int:
if s.startswith("0x"):
return int(s, 0x10)
@@ -742,6 +806,13 @@ def build_statements(d, scopes: Scopes):
ensure_feature_valid_for_scopes(scopes, feature)
return feature
elif key.startswith("com/"):
com_type = str(key[len("com/") :]).upper()
if com_type not in [item.name for item in ComType]:
raise InvalidRule(f"unexpected COM type: {com_type}")
value, description = parse_description(d[key], key, d.get("description"))
return translate_com_feature(value, ComType[com_type])
else:
Feature = parse_feature(key)
value, description = parse_description(d[key], key, d.get("description"))
+6 -5
View File
@@ -105,27 +105,28 @@ To install these development dependencies, run:
We use [pre-commit](https://pre-commit.com/) so that it's trivial to run the same linters & configuration locally as in CI.
Run all linters liks:
Run all linters like:
pre-commit run --all-files
pre-commit run --hook-stage=manual --all-files
isort....................................................................Passed
black....................................................................Passed
ruff.....................................................................Passed
flake8...................................................................Passed
mypy.....................................................................Passed
pytest (fast)............................................................Passed
Or run a single linter like:
pre-commit run --all-files isort
pre-commit run --all-files --hook-stage=manual isort
isort....................................................................Passed
Importantly, you can configure pre-commit to run automatically before every commit by running:
pre-commit install --hook-type pre-commit
pre-commit install --hook-type=pre-commit
pre-commit installed at .git/hooks/pre-commit
pre-commit install --hook-type pre-push
pre-commit install --hook-type=pre-push
pre-commit installed at .git/hooks/pre-push
This way you can ensure that you don't commit code style or formatting offenses.
+1 -1
Submodule rules updated: c88979a1bc...94de0355cd
+20
View File
@@ -318,6 +318,8 @@ def get_data_path_by_name(name) -> Path:
return CD / "data" / "499c2a85f6e8142c3f48d4251c9c7cd6.raw32"
elif name.startswith("9324d"):
return CD / "data" / "9324d1a8ae37a36ae560c37448c9705a.exe_"
elif name.startswith("395eb"):
return CD / "data" / "395eb0ddd99d2c9e37b6d0b73485ee9c.exe_"
elif name.startswith("a1982"):
return CD / "data" / "a198216798ca38f280dc413f8c57f2c2.exe_"
elif name.startswith("a933a"):
@@ -1346,6 +1348,11 @@ def z9324d_extractor():
return get_extractor(get_data_path_by_name("9324d..."))
@pytest.fixture
def z395eb_extractor():
    """feature extractor for the test sample whose name starts with 395eb."""
    sample_path = get_data_path_by_name("395eb...")
    return get_extractor(sample_path)
@pytest.fixture
def pma12_04_extractor():
    """feature extractor for the test sample named pma12-04."""
    sample_path = get_data_path_by_name("pma12-04")
    return get_extractor(sample_path)
@@ -1432,29 +1439,42 @@ def get_result_doc(path: Path):
@pytest.fixture
def pma0101_rd():
    """result document stored for "Practical Malware Analysis Lab 01-01.dll_"."""
    # command that writes this file:
    # python -m capa.main tests/data/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_ --json > tests/data/rd/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_.json
    rd_path = CD / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json"
    return get_result_doc(rd_path)
@pytest.fixture
def dotnet_1c444e_rd():
    """result document stored for the .NET sample 1c444ebeba24dcba8628b7dfe5fec7c6.exe_."""
    # command that writes this file:
    # python -m capa.main tests/data/dotnet/1c444ebeba24dcba8628b7dfe5fec7c6.exe_ --json > tests/data/rd/1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json
    rd_path = CD / "data" / "rd" / "1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json"
    return get_result_doc(rd_path)
@pytest.fixture
def a3f3bbc_rd():
    """result document stored for 3f3bbcf8fd90bdcdcdc5494314ed4225.exe_."""
    # command that writes this file:
    # python -m capa.main tests/data/3f3bbcf8fd90bdcdcdc5494314ed4225.exe_ --json > tests/data/rd/3f3bbcf8fd90bdcdcdc5494314ed4225.exe_.json
    rd_path = CD / "data" / "rd" / "3f3bbcf8fd90bdcdcdc5494314ed4225.exe_.json"
    return get_result_doc(rd_path)
@pytest.fixture
def al_khaserx86_rd():
    """result document stored for al-khaser_x86.exe_."""
    # command that writes this file:
    # python -m capa.main tests/data/al-khaser_x86.exe_ --json > tests/data/rd/al-khaser_x86.exe_.json
    rd_path = CD / "data" / "rd" / "al-khaser_x86.exe_.json"
    return get_result_doc(rd_path)
@pytest.fixture
def al_khaserx64_rd():
    """result document stored for al-khaser_x64.exe_."""
    # command that writes this file:
    # python -m capa.main tests/data/al-khaser_x64.exe_ --json > tests/data/rd/al-khaser_x64.exe_.json
    rd_path = CD / "data" / "rd" / "al-khaser_x64.exe_.json"
    return get_result_doc(rd_path)
@pytest.fixture
def a076114_rd():
    """result document stored for 0761142efbda6c4b1e801223de723578.dll_."""
    # command that writes this file:
    # python -m capa.main tests/data/0761142efbda6c4b1e801223de723578.dll_ --json > tests/data/rd/0761142efbda6c4b1e801223de723578.dll_.json
    rd_path = CD / "data" / "rd" / "0761142efbda6c4b1e801223de723578.dll_.json"
    return get_result_doc(rd_path)
@pytest.fixture
def dynamic_a0000a6_rd():
    """result document stored for the CAPE report 0000a657...476d82.json (dynamic analysis)."""
    # command that writes this file:
    # python -m capa.main tests/data/dynamic/cape/v2.2/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json --json > tests/data/rd/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json
    rd_path = CD / "data" / "rd" / "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json"
    return get_result_doc(rd_path)
+95 -38
View File
@@ -46,7 +46,7 @@ def test_doc_to_pb2(request, rd_file):
assert matches.meta.name == m.name
assert cmp_optional(matches.meta.namespace, m.namespace)
assert list(matches.meta.authors) == m.authors
assert capa.render.proto.scope_to_pb2(matches.meta.scope) == m.scope
assert capa.render.proto.scopes_to_pb2(matches.meta.scopes) == m.scopes
assert len(matches.meta.attack) == len(m.attack)
for rd_attack, proto_attack in zip(matches.meta.attack, m.attack):
@@ -116,10 +116,27 @@ def test_addr_to_pb2():
def test_scope_to_pb2():
assert capa.render.proto.scope_to_pb2(capa.rules.Scope(capa.rules.Scope.FILE)) == capa_pb2.SCOPE_FILE
assert capa.render.proto.scope_to_pb2(capa.rules.Scope(capa.rules.Scope.FUNCTION)) == capa_pb2.SCOPE_FUNCTION
assert capa.render.proto.scope_to_pb2(capa.rules.Scope(capa.rules.Scope.BASIC_BLOCK)) == capa_pb2.SCOPE_BASIC_BLOCK
assert capa.render.proto.scope_to_pb2(capa.rules.Scope(capa.rules.Scope.INSTRUCTION)) == capa_pb2.SCOPE_INSTRUCTION
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.FILE) == capa_pb2.SCOPE_FILE
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.FUNCTION) == capa_pb2.SCOPE_FUNCTION
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.BASIC_BLOCK) == capa_pb2.SCOPE_BASIC_BLOCK
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.INSTRUCTION) == capa_pb2.SCOPE_INSTRUCTION
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.PROCESS) == capa_pb2.SCOPE_PROCESS
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.THREAD) == capa_pb2.SCOPE_THREAD
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.CALL) == capa_pb2.SCOPE_CALL
def test_scopes_to_pb2():
    """scopes convert to the protobuf Scopes message; an unsupported scope is left unset."""
    # both flavors supported
    both = capa.rules.Scopes.from_dict({"static": "file", "dynamic": "file"})
    expected_both = capa_pb2.Scopes(
        static=capa_pb2.SCOPE_FILE,
        dynamic=capa_pb2.SCOPE_FILE,
    )
    assert capa.render.proto.scopes_to_pb2(both) == expected_both

    # only the static flavor supported
    static_only = capa.rules.Scopes.from_dict({"static": "file", "dynamic": "unsupported"})
    expected_static_only = capa_pb2.Scopes(
        static=capa_pb2.SCOPE_FILE,
    )
    assert capa.render.proto.scopes_to_pb2(static_only) == expected_static_only
def cmp_optional(a: Any, b: Any) -> bool:
@@ -128,8 +145,59 @@ def cmp_optional(a: Any, b: Any) -> bool:
return a == b
def assert_static_analyis(analysis: rd.StaticAnalysis, dst: capa_pb2.StaticAnalysis):
    """
    check that the static analysis section of a result document agrees,
    field by field, with the protobuf message `dst`.
    """
    to_pb2 = capa.render.proto.addr_to_pb2

    # plain string fields
    for field in ("format", "arch", "os", "extractor"):
        assert getattr(analysis, field) == getattr(dst, field)

    assert list(analysis.rules) == dst.rules
    assert to_pb2(analysis.base_address) == dst.base_address

    # layout: functions, and the basic blocks matched within each
    rd_functions, proto_functions = analysis.layout.functions, dst.layout.functions
    assert len(rd_functions) == len(proto_functions)
    for rd_f, proto_f in zip(rd_functions, proto_functions):
        assert to_pb2(rd_f.address) == proto_f.address

        rd_bbs, proto_bbs = rd_f.matched_basic_blocks, proto_f.matched_basic_blocks
        assert len(rd_bbs) == len(proto_bbs)
        for rd_bb, proto_bb in zip(rd_bbs, proto_bbs):
            assert to_pb2(rd_bb.address) == proto_bb.address

    # feature counts: file total, then per function
    rd_counts, proto_counts = analysis.feature_counts, dst.feature_counts
    assert rd_counts.file == proto_counts.file
    assert len(rd_counts.functions) == len(proto_counts.functions)
    for rd_cf, proto_cf in zip(rd_counts.functions, proto_counts.functions):
        assert to_pb2(rd_cf.address) == proto_cf.address
        assert rd_cf.count == proto_cf.count

    # library functions
    assert len(analysis.library_functions) == len(dst.library_functions)
    for rd_lf, proto_lf in zip(analysis.library_functions, dst.library_functions):
        assert to_pb2(rd_lf.address) == proto_lf.address
        assert rd_lf.name == proto_lf.name
def assert_dynamic_analyis(analysis: rd.DynamicAnalysis, dst: capa_pb2.DynamicAnalysis):
    """
    check that the dynamic analysis section of a result document agrees,
    field by field, with the protobuf message `dst`.

    mirrors the static variant: scalar fields, then layout (processes and their
    matched threads), then feature counts (file total and per-process counts).
    """
    assert analysis.format == dst.format
    assert analysis.arch == dst.arch
    assert analysis.os == dst.os
    assert analysis.extractor == dst.extractor
    assert list(analysis.rules) == dst.rules

    assert len(analysis.layout.processes) == len(dst.layout.processes)
    for rd_p, proto_p in zip(analysis.layout.processes, dst.layout.processes):
        assert capa.render.proto.addr_to_pb2(rd_p.address) == proto_p.address

        assert len(rd_p.matched_threads) == len(proto_p.matched_threads)
        for rd_t, proto_t in zip(rd_p.matched_threads, proto_p.matched_threads):
            assert capa.render.proto.addr_to_pb2(rd_t.address) == proto_t.address

    # compare the scalar file count here, like the static variant does.
    # the per-process entries are result-document models on one side and protobuf
    # messages on the other, so they are compared element-wise below rather than
    # as whole sequences.
    assert analysis.feature_counts.file == dst.feature_counts.file
    assert len(analysis.feature_counts.processes) == len(dst.feature_counts.processes)
    for rd_cp, proto_cp in zip(analysis.feature_counts.processes, dst.feature_counts.processes):
        assert capa.render.proto.addr_to_pb2(rd_cp.address) == proto_cp.address
        assert rd_cp.count == proto_cp.count
def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata):
assert isinstance(rd.Metadata.analysis, rd.StaticAnalysis)
assert isinstance(meta.analysis, rd.StaticAnalysis)
assert str(meta.timestamp) == dst.timestamp
assert meta.version == dst.version
if meta.argv is None:
@@ -142,32 +210,18 @@ def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata):
assert meta.sample.sha256 == dst.sample.sha256
assert meta.sample.path == dst.sample.path
assert meta.analysis.format == dst.analysis.format
assert meta.analysis.arch == dst.analysis.arch
assert meta.analysis.os == dst.analysis.os
assert meta.analysis.extractor == dst.analysis.extractor
assert list(meta.analysis.rules) == dst.analysis.rules
assert capa.render.proto.addr_to_pb2(meta.analysis.base_address) == dst.analysis.base_address
assert isinstance(rd.Metadata.analysis.layout, rd.StaticLayout)
assert len(meta.analysis.layout.functions) == len(dst.analysis.layout.functions)
for rd_f, proto_f in zip(meta.analysis.layout.functions, dst.analysis.layout.functions):
assert capa.render.proto.addr_to_pb2(rd_f.address) == proto_f.address
assert len(rd_f.matched_basic_blocks) == len(proto_f.matched_basic_blocks)
for rd_bb, proto_bb in zip(rd_f.matched_basic_blocks, proto_f.matched_basic_blocks):
assert capa.render.proto.addr_to_pb2(rd_bb.address) == proto_bb.address
assert meta.analysis.feature_counts.file == dst.analysis.feature_counts.file
assert len(meta.analysis.feature_counts.functions) == len(dst.analysis.feature_counts.functions)
for rd_cf, proto_cf in zip(meta.analysis.feature_counts.functions, dst.analysis.feature_counts.functions):
assert capa.render.proto.addr_to_pb2(rd_cf.address) == proto_cf.address
assert rd_cf.count == proto_cf.count
assert len(meta.analysis.library_functions) == len(dst.analysis.library_functions)
for rd_lf, proto_lf in zip(meta.analysis.library_functions, dst.analysis.library_functions):
assert capa.render.proto.addr_to_pb2(rd_lf.address) == proto_lf.address
assert rd_lf.name == proto_lf.name
if meta.flavor == rd.Flavor.STATIC:
assert dst.flavor == capa_pb2.FLAVOR_STATIC
assert dst.WhichOneof("analysis2") == "static_analysis"
assert isinstance(meta.analysis, rd.StaticAnalysis)
assert_static_analyis(meta.analysis, dst.static_analysis)
elif meta.flavor == rd.Flavor.DYNAMIC:
assert dst.flavor == capa_pb2.FLAVOR_DYNAMIC
assert dst.WhichOneof("analysis2") == "dynamic_analysis"
assert isinstance(meta.analysis, rd.DynamicAnalysis)
assert_dynamic_analyis(meta.analysis, dst.dynamic_analysis)
else:
assert_never(dst.flavor)
def assert_match(ma: rd.Match, mb: capa_pb2.Match):
@@ -320,20 +374,22 @@ def assert_round_trip(doc: rd.ResultDocument):
# show the round trip works
# first by comparing the objects directly,
# which works thanks to pydantic model equality.
assert one.meta == two.meta
assert one.rules == two.rules
assert one == two
# second by showing their protobuf representations are the same.
assert capa.render.proto.doc_to_pb2(one).SerializeToString(deterministic=True) == capa.render.proto.doc_to_pb2(
two
).SerializeToString(deterministic=True)
one_bytes = capa.render.proto.doc_to_pb2(one).SerializeToString(deterministic=True)
two_bytes = capa.render.proto.doc_to_pb2(two).SerializeToString(deterministic=True)
assert one_bytes == two_bytes
# now show that two different versions are not equal.
three = copy.deepcopy(two)
three.meta.__dict__.update({"version": "0.0.0"})
assert one.meta.version != three.meta.version
assert one != three
assert capa.render.proto.doc_to_pb2(one).SerializeToString(deterministic=True) != capa.render.proto.doc_to_pb2(
three
).SerializeToString(deterministic=True)
three_bytes = capa.render.proto.doc_to_pb2(three).SerializeToString(deterministic=True)
assert one_bytes != three_bytes
@pytest.mark.parametrize(
@@ -345,6 +401,7 @@ def assert_round_trip(doc: rd.ResultDocument):
pytest.param("a076114_rd"),
pytest.param("pma0101_rd"),
pytest.param("dotnet_1c444e_rd"),
pytest.param("dynamic_a0000a6_rd"),
],
)
def test_round_trip(request, rd_file):
+72
View File
@@ -1531,3 +1531,75 @@ def test_property_access_symbol():
)
is True
)
def test_translate_com_features():
    """a `com/class` feature expands to the GUID string and GUID bytes of every matching class GUID."""
    r = capa.rules.Rule.from_yaml(
        textwrap.dedent(
            """
            rule:
                meta:
                    name: test rule
                    scopes:
                        static: basic block
                        dynamic: call
                features:
                    - com/class: WICPngDecoder
                    # 389ea17b-5078-4cde-b6ef-25c15175c751 WICPngDecoder
                    # e018945b-aa86-4008-9bd4-6777a1e40c11 WICPngDecoder
            """
        )
    )
    com_name = "WICPngDecoder"
    # the rule uses `com/class`, so every generated feature is described with the CLSID_ prefix,
    # including those for the second GUID.
    com_features = [
        capa.features.common.Bytes(b"{\xa1\x9e8xP\xdeL\xb6\xef%\xc1Qu\xc7Q", f"CLSID_{com_name} as bytes"),
        capa.features.common.StringFactory("389ea17b-5078-4cde-b6ef-25c15175c751", f"CLSID_{com_name} as GUID string"),
        capa.features.common.Bytes(b"[\x94\x18\xe0\x86\xaa\x08@\x9b\xd4gw\xa1\xe4\x0c\x11", f"CLSID_{com_name} as bytes"),
        capa.features.common.StringFactory("e018945b-aa86-4008-9bd4-6777a1e40c11", f"CLSID_{com_name} as GUID string"),
    ]
    assert set(com_features) == set(r.statement.get_children())
def test_invalid_com_features():
    """rules naming an unknown COM class, interface, or COM type are rejected as invalid."""
    invalid_features = (
        # test for unknown COM class
        "com/class: invalid_com",
        # test for unknown COM interface
        "com/interface: invalid_com",
        # test for invalid COM type
        # valid_com_types = "class", "interface"
        "com/invalid_COM_type: WICPngDecoder",
    )
    for feature in invalid_features:
        rule_yaml = textwrap.dedent(
            f"""
            rule:
                meta:
                    name: test rule
                features:
                    - {feature}
            """
        )
        with pytest.raises(capa.rules.InvalidRule):
            _ = capa.rules.Rule.from_yaml(rule_yaml)