Merge pull request #1765 from mandiant/fix/dynamic-proto

protobuf: add dynamic support
This commit is contained in:
Willi Ballenthin
2023-10-19 13:37:45 +02:00
committed by GitHub
11 changed files with 1072 additions and 3891 deletions

View File

@@ -1,12 +1,5 @@
[mypy]
# TODO(yelhamer): remove this once proto has been added
# for the dynamic rendering
exclude = (?x)(
^capa/render/proto/__init__.py$
| ^tests/_test_proto.py$
)
[mypy-halo.*]
ignore_missing_imports = True

View File

@@ -43,9 +43,9 @@ jobs:
- name: Lint with black
run: pre-commit run black --show-diff-on-failure
- name: Lint with flake8
run: pre-commit run flake8
run: pre-commit run flake8 --hook-stage manual
- name: Check types with mypy
run: pre-commit run mypy
run: pre-commit run mypy --hook-stage manual
rule_linter:
runs-on: ubuntu-20.04
@@ -95,6 +95,10 @@ jobs:
run: sudo apt-get install -y libyaml-dev
- name: Install capa
run: pip install -e .[dev]
- name: Run tests (fast)
# this set of tests runs about 80% of the cases in 20% of the time,
# and should catch most errors quickly.
run: pre-commit run pytest-fast --all-files --hook-stage manual
- name: Run tests
run: pytest -v tests/
@@ -103,7 +107,7 @@ jobs:
env:
BN_SERIAL: ${{ secrets.BN_SERIAL }}
runs-on: ubuntu-20.04
needs: [code_style, rule_linter]
needs: [tests]
strategy:
fail-fast: false
matrix:
@@ -143,7 +147,7 @@ jobs:
ghidra-tests:
name: Ghidra tests for ${{ matrix.python-version }}
runs-on: ubuntu-20.04
needs: [code_style, rule_linter]
needs: [tests]
strategy:
fail-fast: false
matrix:
@@ -197,4 +201,4 @@ jobs:
cat ../output.log
exit_code=$(cat ../output.log | grep exit | awk '{print $NF}')
exit $exit_code

View File

@@ -25,7 +25,7 @@ repos:
hooks:
- id: isort
name: isort
stages: [commit, push]
stages: [commit, push, manual]
language: system
entry: isort
args:
@@ -45,7 +45,7 @@ repos:
hooks:
- id: black
name: black
stages: [commit, push]
stages: [commit, push, manual]
language: system
entry: black
args:
@@ -62,7 +62,7 @@ repos:
hooks:
- id: ruff
name: ruff
stages: [commit, push]
stages: [commit, push, manual]
language: system
entry: ruff
args:
@@ -79,7 +79,7 @@ repos:
hooks:
- id: flake8
name: flake8
stages: [commit, push]
stages: [push, manual]
language: system
entry: flake8
args:
@@ -97,7 +97,7 @@ repos:
hooks:
- id: mypy
name: mypy
stages: [commit, push]
stages: [push, manual]
language: system
entry: mypy
args:
@@ -109,3 +109,21 @@ repos:
- "tests/"
always_run: true
pass_filenames: false
- repo: local
hooks:
- id: pytest-fast
name: pytest (fast)
stages: [manual]
language: system
entry: pytest
args:
- "tests/"
- "--ignore=tests/test_binja_features.py"
- "--ignore=tests/test_ghidra_features.py"
- "--ignore=tests/test_ida_features.py"
- "--ignore=tests/test_viv_features.py"
- "--ignore=tests/test_main.py"
- "--ignore=tests/test_scripts.py"
always_run: true
pass_filenames: false

View File

@@ -12,10 +12,13 @@
- binja: add support for forwarded exports #1646 @xusheng6
- binja: add support for symtab names #1504 @xusheng6
- add com class/interface features #322 @Aayush-goel-04
- protobuf: add `Metadata.flavor` @williballenthin
### Breaking Changes
- remove the `SCOPE_*` constants in favor of the `Scope` enum #1764 @williballenthin
- protobuf: deprecate `RuleMetadata.scope` in favor of `RuleMetadata.scopes` @williballenthin
- protobuf: deprecate `Metadata.analysis` in favor of `Metadata.analysis2` that is dynamic analysis aware @williballenthin
### New Rules (19)

View File

@@ -38,16 +38,6 @@ from capa.helpers import assert_never
from capa.features.freeze import AddressType
def dict_tuple_to_list_values(d: Dict) -> Dict:
o = {}
for k, v in d.items():
if isinstance(v, tuple):
o[k] = list(v)
else:
o[k] = v
return o
def int_to_pb2(v: int) -> capa_pb2.Integer:
if v < -2_147_483_648:
raise ValueError(f"value underflow: {v}")
@@ -100,6 +90,51 @@ def addr_to_pb2(addr: frz.Address) -> capa_pb2.Address:
token_offset=capa_pb2.Token_Offset(token=int_to_pb2(token), offset=offset),
)
elif addr.type is AddressType.PROCESS:
assert isinstance(addr.value, tuple)
ppid, pid = addr.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
return capa_pb2.Address(
type=capa_pb2.AddressType.ADDRESSTYPE_PROCESS,
ppid_pid=capa_pb2.Ppid_Pid(
ppid=int_to_pb2(ppid),
pid=int_to_pb2(pid),
),
)
elif addr.type is AddressType.THREAD:
assert isinstance(addr.value, tuple)
ppid, pid, tid = addr.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
assert isinstance(tid, int)
return capa_pb2.Address(
type=capa_pb2.AddressType.ADDRESSTYPE_THREAD,
ppid_pid_tid=capa_pb2.Ppid_Pid_Tid(
ppid=int_to_pb2(ppid),
pid=int_to_pb2(pid),
tid=int_to_pb2(tid),
),
)
elif addr.type is AddressType.CALL:
assert isinstance(addr.value, tuple)
ppid, pid, tid, id_ = addr.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
assert isinstance(tid, int)
assert isinstance(id_, int)
return capa_pb2.Address(
type=capa_pb2.AddressType.ADDRESSTYPE_CALL,
ppid_pid_tid_id=capa_pb2.Ppid_Pid_Tid_Id(
ppid=int_to_pb2(ppid),
pid=int_to_pb2(pid),
tid=int_to_pb2(tid),
id=int_to_pb2(id_),
),
)
elif addr.type is AddressType.NO_ADDRESS:
# value == None, so only set type
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS)
@@ -117,10 +152,26 @@ def scope_to_pb2(scope: capa.rules.Scope) -> capa_pb2.Scope.ValueType:
return capa_pb2.Scope.SCOPE_BASIC_BLOCK
elif scope == capa.rules.Scope.INSTRUCTION:
return capa_pb2.Scope.SCOPE_INSTRUCTION
elif scope == capa.rules.Scope.PROCESS:
return capa_pb2.Scope.SCOPE_PROCESS
elif scope == capa.rules.Scope.THREAD:
return capa_pb2.Scope.SCOPE_THREAD
elif scope == capa.rules.Scope.CALL:
return capa_pb2.Scope.SCOPE_CALL
else:
assert_never(scope)
def scopes_to_pb2(scopes: capa.rules.Scopes) -> capa_pb2.Scopes:
doc = {}
if scopes.static:
doc["static"] = scope_to_pb2(scopes.static)
if scopes.dynamic:
doc["dynamic"] = scope_to_pb2(scopes.dynamic)
return google.protobuf.json_format.ParseDict(doc, capa_pb2.Scopes())
def flavor_to_pb2(flavor: rd.Flavor) -> capa_pb2.Flavor.ValueType:
if flavor == rd.Flavor.STATIC:
return capa_pb2.Flavor.FLAVOR_STATIC
@@ -130,47 +181,87 @@ def flavor_to_pb2(flavor: rd.Flavor) -> capa_pb2.Flavor.ValueType:
assert_never(flavor)
def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
assert isinstance(meta.analysis, rd.StaticAnalysis)
return capa_pb2.Metadata(
timestamp=str(meta.timestamp),
version=meta.version,
argv=meta.argv,
sample=google.protobuf.json_format.ParseDict(meta.sample.model_dump(), capa_pb2.Sample()),
flavor=flavor_to_pb2(meta.flavor),
analysis=capa_pb2.Analysis(
format=meta.analysis.format,
arch=meta.analysis.arch,
os=meta.analysis.os,
extractor=meta.analysis.extractor,
rules=list(meta.analysis.rules),
base_address=addr_to_pb2(meta.analysis.base_address),
layout=capa_pb2.Layout(
functions=[
capa_pb2.FunctionLayout(
address=addr_to_pb2(f.address),
matched_basic_blocks=[
capa_pb2.BasicBlockLayout(address=addr_to_pb2(bb.address)) for bb in f.matched_basic_blocks
],
)
for f in meta.analysis.layout.functions
]
),
feature_counts=capa_pb2.FeatureCounts(
file=meta.analysis.feature_counts.file,
functions=[
capa_pb2.FunctionFeatureCount(address=addr_to_pb2(f.address), count=f.count)
for f in meta.analysis.feature_counts.functions
],
),
library_functions=[
capa_pb2.LibraryFunction(address=addr_to_pb2(lf.address), name=lf.name)
for lf in meta.analysis.library_functions
def static_analysis_to_pb2(analysis: rd.StaticAnalysis) -> capa_pb2.StaticAnalysis:
return capa_pb2.StaticAnalysis(
format=analysis.format,
arch=analysis.arch,
os=analysis.os,
extractor=analysis.extractor,
rules=list(analysis.rules),
base_address=addr_to_pb2(analysis.base_address),
layout=capa_pb2.StaticLayout(
functions=[
capa_pb2.FunctionLayout(
address=addr_to_pb2(f.address),
matched_basic_blocks=[
capa_pb2.BasicBlockLayout(address=addr_to_pb2(bb.address)) for bb in f.matched_basic_blocks
],
)
for f in analysis.layout.functions
]
),
feature_counts=capa_pb2.StaticFeatureCounts(
file=analysis.feature_counts.file,
functions=[
capa_pb2.FunctionFeatureCount(address=addr_to_pb2(f.address), count=f.count)
for f in analysis.feature_counts.functions
],
),
library_functions=[
capa_pb2.LibraryFunction(address=addr_to_pb2(lf.address), name=lf.name) for lf in analysis.library_functions
],
)
def dynamic_analysis_to_pb2(analysis: rd.DynamicAnalysis) -> capa_pb2.DynamicAnalysis:
return capa_pb2.DynamicAnalysis(
format=analysis.format,
arch=analysis.arch,
os=analysis.os,
extractor=analysis.extractor,
rules=list(analysis.rules),
layout=capa_pb2.DynamicLayout(
processes=[
capa_pb2.ProcessLayout(
address=addr_to_pb2(p.address),
matched_threads=[capa_pb2.ThreadLayout(address=addr_to_pb2(t.address)) for t in p.matched_threads],
)
for p in analysis.layout.processes
]
),
feature_counts=capa_pb2.DynamicFeatureCounts(
file=analysis.feature_counts.file,
processes=[
capa_pb2.ProcessFeatureCount(address=addr_to_pb2(p.address), count=p.count)
for p in analysis.feature_counts.processes
],
),
)
def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
if isinstance(meta.analysis, rd.StaticAnalysis):
return capa_pb2.Metadata(
timestamp=str(meta.timestamp),
version=meta.version,
argv=meta.argv,
sample=google.protobuf.json_format.ParseDict(meta.sample.model_dump(), capa_pb2.Sample()),
flavor=flavor_to_pb2(meta.flavor),
static_analysis=static_analysis_to_pb2(meta.analysis),
)
elif isinstance(meta.analysis, rd.DynamicAnalysis):
return capa_pb2.Metadata(
timestamp=str(meta.timestamp),
version=meta.version,
argv=meta.argv,
sample=google.protobuf.json_format.ParseDict(meta.sample.model_dump(), capa_pb2.Sample()),
flavor=flavor_to_pb2(meta.flavor),
dynamic_analysis=dynamic_analysis_to_pb2(meta.analysis),
)
else:
assert_never(meta.analysis)
def statement_to_pb2(statement: rd.Statement) -> capa_pb2.StatementNode:
if isinstance(statement, rd.RangeStatement):
return capa_pb2.StatementNode(
@@ -401,15 +492,51 @@ def match_to_pb2(match: rd.Match) -> capa_pb2.Match:
assert_never(match)
def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata:
# after manual type conversions to the RuleMetadata, we can rely on the protobuf json parser
# conversions include tuple -> list and rd.Enum -> proto.enum
meta = dict_tuple_to_list_values(rule_metadata.model_dump())
meta["scope"] = scope_to_pb2(meta["scope"])
meta["attack"] = list(map(dict_tuple_to_list_values, meta.get("attack", [])))
meta["mbc"] = list(map(dict_tuple_to_list_values, meta.get("mbc", [])))
def attack_to_pb2(attack: rd.AttackSpec) -> capa_pb2.AttackSpec:
return capa_pb2.AttackSpec(
parts=list(attack.parts),
tactic=attack.tactic,
technique=attack.technique,
subtechnique=attack.subtechnique,
id=attack.id,
)
return google.protobuf.json_format.ParseDict(meta, capa_pb2.RuleMetadata())
def mbc_to_pb2(mbc: rd.MBCSpec) -> capa_pb2.MBCSpec:
return capa_pb2.MBCSpec(
parts=list(mbc.parts),
objective=mbc.objective,
behavior=mbc.behavior,
method=mbc.method,
id=mbc.id,
)
def maec_to_pb2(maec: rd.MaecMetadata) -> capa_pb2.MaecMetadata:
return capa_pb2.MaecMetadata(
analysis_conclusion=maec.analysis_conclusion or "",
analysis_conclusion_ov=maec.analysis_conclusion_ov or "",
malware_family=maec.malware_family or "",
malware_category=maec.malware_category or "",
malware_category_ov=maec.malware_category_ov or "",
)
def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata:
return capa_pb2.RuleMetadata(
name=rule_metadata.name,
namespace=rule_metadata.namespace or "",
authors=rule_metadata.authors,
attack=[attack_to_pb2(m) for m in rule_metadata.attack],
mbc=[mbc_to_pb2(m) for m in rule_metadata.mbc],
references=rule_metadata.references,
examples=rule_metadata.examples,
description=rule_metadata.description,
lib=rule_metadata.lib,
maec=maec_to_pb2(rule_metadata.maec),
is_subscope_rule=rule_metadata.is_subscope_rule,
scopes=scopes_to_pb2(rule_metadata.scopes),
)
def doc_to_pb2(doc: rd.ResultDocument) -> capa_pb2.ResultDocument:
@@ -470,6 +597,24 @@ def addr_from_pb2(addr: capa_pb2.Address) -> frz.Address:
offset = addr.token_offset.offset
return frz.Address(type=frz.AddressType.DN_TOKEN_OFFSET, value=(token, offset))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_PROCESS:
ppid = int_from_pb2(addr.ppid_pid.ppid)
pid = int_from_pb2(addr.ppid_pid.pid)
return frz.Address(type=frz.AddressType.PROCESS, value=(ppid, pid))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_THREAD:
ppid = int_from_pb2(addr.ppid_pid_tid.ppid)
pid = int_from_pb2(addr.ppid_pid_tid.pid)
tid = int_from_pb2(addr.ppid_pid_tid.tid)
return frz.Address(type=frz.AddressType.THREAD, value=(ppid, pid, tid))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_CALL:
ppid = int_from_pb2(addr.ppid_pid_tid_id.ppid)
pid = int_from_pb2(addr.ppid_pid_tid_id.pid)
tid = int_from_pb2(addr.ppid_pid_tid_id.tid)
id_ = int_from_pb2(addr.ppid_pid_tid_id.id)
return frz.Address(type=frz.AddressType.CALL, value=(ppid, pid, tid, id_))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS:
return frz.Address(type=frz.AddressType.NO_ADDRESS, value=None)
@@ -486,10 +631,23 @@ def scope_from_pb2(scope: capa_pb2.Scope.ValueType) -> capa.rules.Scope:
return capa.rules.Scope.BASIC_BLOCK
elif scope == capa_pb2.Scope.SCOPE_INSTRUCTION:
return capa.rules.Scope.INSTRUCTION
elif scope == capa_pb2.Scope.SCOPE_PROCESS:
return capa.rules.Scope.PROCESS
elif scope == capa_pb2.Scope.SCOPE_THREAD:
return capa.rules.Scope.THREAD
elif scope == capa_pb2.Scope.SCOPE_CALL:
return capa.rules.Scope.CALL
else:
assert_never(scope)
def scopes_from_pb2(scopes: capa_pb2.Scopes) -> capa.rules.Scopes:
return capa.rules.Scopes(
static=scope_from_pb2(scopes.static) if scopes.static else None,
dynamic=scope_from_pb2(scopes.dynamic) if scopes.dynamic else None,
)
def flavor_from_pb2(flavor: capa_pb2.Flavor.ValueType) -> rd.Flavor:
if flavor == capa_pb2.Flavor.FLAVOR_STATIC:
return rd.Flavor.STATIC
@@ -499,60 +657,108 @@ def flavor_from_pb2(flavor: capa_pb2.Flavor.ValueType) -> rd.Flavor:
assert_never(flavor)
def metadata_from_pb2(meta: capa_pb2.Metadata) -> rd.Metadata:
return rd.Metadata(
timestamp=datetime.datetime.fromisoformat(meta.timestamp),
version=meta.version,
argv=tuple(meta.argv) if meta.argv else None,
sample=rd.Sample(
md5=meta.sample.md5,
sha1=meta.sample.sha1,
sha256=meta.sample.sha256,
path=meta.sample.path,
),
flavor=flavor_from_pb2(meta.flavor),
analysis=rd.StaticAnalysis(
format=meta.analysis.format,
arch=meta.analysis.arch,
os=meta.analysis.os,
extractor=meta.analysis.extractor,
rules=tuple(meta.analysis.rules),
base_address=addr_from_pb2(meta.analysis.base_address),
layout=rd.StaticLayout(
functions=tuple(
[
rd.FunctionLayout(
address=addr_from_pb2(f.address),
matched_basic_blocks=tuple(
[
rd.BasicBlockLayout(address=addr_from_pb2(bb.address))
for bb in f.matched_basic_blocks
]
),
)
for f in meta.analysis.layout.functions
]
)
),
feature_counts=rd.StaticFeatureCounts(
file=meta.analysis.feature_counts.file,
functions=tuple(
[
rd.FunctionFeatureCount(address=addr_from_pb2(f.address), count=f.count)
for f in meta.analysis.feature_counts.functions
]
),
),
library_functions=tuple(
def static_analysis_from_pb2(analysis: capa_pb2.StaticAnalysis) -> rd.StaticAnalysis:
return rd.StaticAnalysis(
format=analysis.format,
arch=analysis.arch,
os=analysis.os,
extractor=analysis.extractor,
rules=tuple(analysis.rules),
base_address=addr_from_pb2(analysis.base_address),
layout=rd.StaticLayout(
functions=tuple(
[
rd.LibraryFunction(address=addr_from_pb2(lf.address), name=lf.name)
for lf in meta.analysis.library_functions
rd.FunctionLayout(
address=addr_from_pb2(f.address),
matched_basic_blocks=tuple(
[rd.BasicBlockLayout(address=addr_from_pb2(bb.address)) for bb in f.matched_basic_blocks]
),
)
for f in analysis.layout.functions
]
)
),
feature_counts=rd.StaticFeatureCounts(
file=analysis.feature_counts.file,
functions=tuple(
[
rd.FunctionFeatureCount(address=addr_from_pb2(f.address), count=f.count)
for f in analysis.feature_counts.functions
]
),
),
library_functions=tuple(
[rd.LibraryFunction(address=addr_from_pb2(lf.address), name=lf.name) for lf in analysis.library_functions]
),
)
def dynamic_analysis_from_pb2(analysis: capa_pb2.DynamicAnalysis) -> rd.DynamicAnalysis:
return rd.DynamicAnalysis(
format=analysis.format,
arch=analysis.arch,
os=analysis.os,
extractor=analysis.extractor,
rules=tuple(analysis.rules),
layout=rd.DynamicLayout(
processes=tuple(
[
rd.ProcessLayout(
address=addr_from_pb2(p.address),
matched_threads=tuple(
[rd.ThreadLayout(address=addr_from_pb2(t.address)) for t in p.matched_threads]
),
)
for p in analysis.layout.processes
]
)
),
feature_counts=rd.DynamicFeatureCounts(
file=analysis.feature_counts.file,
processes=tuple(
[
rd.ProcessFeatureCount(address=addr_from_pb2(p.address), count=p.count)
for p in analysis.feature_counts.processes
]
),
),
)
def metadata_from_pb2(meta: capa_pb2.Metadata) -> rd.Metadata:
analysis_type = meta.WhichOneof("analysis2")
if analysis_type == "static_analysis":
return rd.Metadata(
timestamp=datetime.datetime.fromisoformat(meta.timestamp),
version=meta.version,
argv=tuple(meta.argv) if meta.argv else None,
sample=rd.Sample(
md5=meta.sample.md5,
sha1=meta.sample.sha1,
sha256=meta.sample.sha256,
path=meta.sample.path,
),
flavor=flavor_from_pb2(meta.flavor),
analysis=static_analysis_from_pb2(meta.static_analysis),
)
elif analysis_type == "dynamic_analysis":
return rd.Metadata(
timestamp=datetime.datetime.fromisoformat(meta.timestamp),
version=meta.version,
argv=tuple(meta.argv) if meta.argv else None,
sample=rd.Sample(
md5=meta.sample.md5,
sha1=meta.sample.sha1,
sha256=meta.sample.sha256,
path=meta.sample.path,
),
flavor=flavor_from_pb2(meta.flavor),
analysis=dynamic_analysis_from_pb2(meta.dynamic_analysis),
)
else:
assert_never(analysis_type)
def statement_from_pb2(statement: capa_pb2.StatementNode) -> rd.Statement:
type_ = statement.WhichOneof("statement")
@@ -732,7 +938,7 @@ def rule_metadata_from_pb2(pb: capa_pb2.RuleMetadata) -> rd.RuleMetadata:
name=pb.name,
namespace=pb.namespace or None,
authors=tuple(pb.authors),
scope=scope_from_pb2(pb.scope),
scopes=scopes_from_pb2(pb.scopes),
attack=tuple([attack_from_pb2(attack) for attack in pb.attack]),
mbc=tuple([mbc_from_pb2(mbc) for mbc in pb.mbc]),
references=tuple(pb.references),

View File

@@ -11,6 +11,9 @@ message Address {
oneof value {
Integer v = 2;
Token_Offset token_offset = 3;
Ppid_Pid ppid_pid = 4;
Ppid_Pid_Tid ppid_pid_tid = 5;
Ppid_Pid_Tid_Id ppid_pid_tid_id = 6;
};
}
@@ -22,6 +25,9 @@ enum AddressType {
ADDRESSTYPE_DN_TOKEN = 4;
ADDRESSTYPE_DN_TOKEN_OFFSET = 5;
ADDRESSTYPE_NO_ADDRESS = 6;
ADDRESSTYPE_PROCESS = 7;
ADDRESSTYPE_THREAD = 8;
ADDRESSTYPE_CALL = 9;
}
message Analysis {
@@ -82,6 +88,25 @@ message CompoundStatement {
optional string description = 2;
}
message DynamicAnalysis {
string format = 1;
string arch = 2;
string os = 3;
string extractor = 4;
repeated string rules = 5;
DynamicLayout layout = 6;
DynamicFeatureCounts feature_counts = 7;
}
message DynamicFeatureCounts {
uint64 file = 1;
repeated ProcessFeatureCount processes = 2;
}
message DynamicLayout {
repeated ProcessLayout processes = 1;
}
message ExportFeature {
string type = 1;
string export = 2;
@@ -203,8 +228,15 @@ message Metadata {
string version = 2;
repeated string argv = 3;
Sample sample = 4;
Analysis analysis = 5;
// deprecated in v7.0.
// use analysis2 instead.
Analysis analysis = 5 [deprecated = true];
Flavor flavor = 6;
oneof analysis2 {
// use analysis2 instead of analysis (deprecated in v7.0).
StaticAnalysis static_analysis = 7;
DynamicAnalysis dynamic_analysis = 8;
};
}
message MnemonicFeature {
@@ -251,6 +283,16 @@ message OperandOffsetFeature {
optional string description = 4;
}
message ProcessFeatureCount {
Address address = 1;
uint64 count = 2;
}
message ProcessLayout {
Address address = 1;
repeated ThreadLayout matched_threads = 2;
}
message PropertyFeature {
string type = 1;
string property_ = 2; // property is a Python top-level decorator name
@@ -288,7 +330,9 @@ message RuleMetadata {
string name = 1;
string namespace = 2;
repeated string authors = 3;
Scope scope = 4;
// deprecated in v7.0.
// use scopes instead.
Scope scope = 4 [deprecated = true];
repeated AttackSpec attack = 5;
repeated MBCSpec mbc = 6;
repeated string references = 7;
@@ -297,6 +341,8 @@ message RuleMetadata {
bool lib = 10;
MaecMetadata maec = 11;
bool is_subscope_rule = 12;
// use scopes over scope (deprecated in v7.0).
Scopes scopes = 13;
}
message Sample {
@@ -312,6 +358,14 @@ enum Scope {
SCOPE_FUNCTION = 2;
SCOPE_BASIC_BLOCK = 3;
SCOPE_INSTRUCTION = 4;
SCOPE_PROCESS = 5;
SCOPE_THREAD = 6;
SCOPE_CALL = 7;
}
message Scopes {
optional Scope static = 1;
optional Scope dynamic = 2;
}
message SectionFeature {
@@ -336,6 +390,27 @@ message StatementNode {
};
}
message StaticAnalysis {
string format = 1;
string arch = 2;
string os = 3;
string extractor = 4;
repeated string rules = 5;
Address base_address = 6;
StaticLayout layout = 7;
StaticFeatureCounts feature_counts = 8;
repeated LibraryFunction library_functions = 9;
}
message StaticFeatureCounts {
uint64 file = 1;
repeated FunctionFeatureCount functions = 2;
}
message StaticLayout {
repeated FunctionLayout functions = 1;
}
message StringFeature {
string type = 1;
string string = 2;
@@ -354,6 +429,10 @@ message SubstringFeature {
optional string description = 3;
}
message ThreadLayout {
Address address = 1;
}
message Addresses { repeated Address address = 1; }
message Pair_Address_Match {
@@ -366,6 +445,24 @@ message Token_Offset {
uint64 offset = 2; // offset is always >= 0
}
message Ppid_Pid {
Integer ppid = 1;
Integer pid = 2;
}
message Ppid_Pid_Tid {
Integer ppid = 1;
Integer pid = 2;
Integer tid = 3;
}
message Ppid_Pid_Tid_Id {
Integer ppid = 1;
Integer pid = 2;
Integer tid = 3;
Integer id = 4;
}
message Integer { oneof value { uint64 u = 1; sint64 i = 2; } } // unsigned or signed int
message Number { oneof value { uint64 u = 1; sint64 i = 2; double f = 3; } }

File diff suppressed because one or more lines are too long

View File

@@ -31,6 +31,9 @@ class _AddressTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._En
ADDRESSTYPE_DN_TOKEN: _AddressType.ValueType # 4
ADDRESSTYPE_DN_TOKEN_OFFSET: _AddressType.ValueType # 5
ADDRESSTYPE_NO_ADDRESS: _AddressType.ValueType # 6
ADDRESSTYPE_PROCESS: _AddressType.ValueType # 7
ADDRESSTYPE_THREAD: _AddressType.ValueType # 8
ADDRESSTYPE_CALL: _AddressType.ValueType # 9
class AddressType(_AddressType, metaclass=_AddressTypeEnumTypeWrapper): ...
@@ -41,6 +44,9 @@ ADDRESSTYPE_FILE: AddressType.ValueType # 3
ADDRESSTYPE_DN_TOKEN: AddressType.ValueType # 4
ADDRESSTYPE_DN_TOKEN_OFFSET: AddressType.ValueType # 5
ADDRESSTYPE_NO_ADDRESS: AddressType.ValueType # 6
ADDRESSTYPE_PROCESS: AddressType.ValueType # 7
ADDRESSTYPE_THREAD: AddressType.ValueType # 8
ADDRESSTYPE_CALL: AddressType.ValueType # 9
global___AddressType = AddressType
class _Flavor:
@@ -71,6 +77,9 @@ class _ScopeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumType
SCOPE_FUNCTION: _Scope.ValueType # 2
SCOPE_BASIC_BLOCK: _Scope.ValueType # 3
SCOPE_INSTRUCTION: _Scope.ValueType # 4
SCOPE_PROCESS: _Scope.ValueType # 5
SCOPE_THREAD: _Scope.ValueType # 6
SCOPE_CALL: _Scope.ValueType # 7
class Scope(_Scope, metaclass=_ScopeEnumTypeWrapper): ...
@@ -79,6 +88,9 @@ SCOPE_FILE: Scope.ValueType # 1
SCOPE_FUNCTION: Scope.ValueType # 2
SCOPE_BASIC_BLOCK: Scope.ValueType # 3
SCOPE_INSTRUCTION: Scope.ValueType # 4
SCOPE_PROCESS: Scope.ValueType # 5
SCOPE_THREAD: Scope.ValueType # 6
SCOPE_CALL: Scope.ValueType # 7
global___Scope = Scope
@typing_extensions.final
@@ -111,21 +123,33 @@ class Address(google.protobuf.message.Message):
TYPE_FIELD_NUMBER: builtins.int
V_FIELD_NUMBER: builtins.int
TOKEN_OFFSET_FIELD_NUMBER: builtins.int
PPID_PID_FIELD_NUMBER: builtins.int
PPID_PID_TID_FIELD_NUMBER: builtins.int
PPID_PID_TID_ID_FIELD_NUMBER: builtins.int
type: global___AddressType.ValueType
@property
def v(self) -> global___Integer: ...
@property
def token_offset(self) -> global___Token_Offset: ...
@property
def ppid_pid(self) -> global___Ppid_Pid: ...
@property
def ppid_pid_tid(self) -> global___Ppid_Pid_Tid: ...
@property
def ppid_pid_tid_id(self) -> global___Ppid_Pid_Tid_Id: ...
def __init__(
self,
*,
type: global___AddressType.ValueType = ...,
v: global___Integer | None = ...,
token_offset: global___Token_Offset | None = ...,
ppid_pid: global___Ppid_Pid | None = ...,
ppid_pid_tid: global___Ppid_Pid_Tid | None = ...,
ppid_pid_tid_id: global___Ppid_Pid_Tid_Id | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["token_offset", b"token_offset", "v", b"v", "value", b"value"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["token_offset", b"token_offset", "type", b"type", "v", b"v", "value", b"value"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["value", b"value"]) -> typing_extensions.Literal["v", "token_offset"] | None: ...
def HasField(self, field_name: typing_extensions.Literal["ppid_pid", b"ppid_pid", "ppid_pid_tid", b"ppid_pid_tid", "ppid_pid_tid_id", b"ppid_pid_tid_id", "token_offset", b"token_offset", "v", b"v", "value", b"value"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["ppid_pid", b"ppid_pid", "ppid_pid_tid", b"ppid_pid_tid", "ppid_pid_tid_id", b"ppid_pid_tid_id", "token_offset", b"token_offset", "type", b"type", "v", b"v", "value", b"value"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["value", b"value"]) -> typing_extensions.Literal["v", "token_offset", "ppid_pid", "ppid_pid_tid", "ppid_pid_tid_id"] | None: ...
global___Address = Address
@@ -352,6 +376,78 @@ class CompoundStatement(google.protobuf.message.Message):
global___CompoundStatement = CompoundStatement
@typing_extensions.final
class DynamicAnalysis(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FORMAT_FIELD_NUMBER: builtins.int
ARCH_FIELD_NUMBER: builtins.int
OS_FIELD_NUMBER: builtins.int
EXTRACTOR_FIELD_NUMBER: builtins.int
RULES_FIELD_NUMBER: builtins.int
LAYOUT_FIELD_NUMBER: builtins.int
FEATURE_COUNTS_FIELD_NUMBER: builtins.int
format: builtins.str
arch: builtins.str
os: builtins.str
extractor: builtins.str
@property
def rules(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
@property
def layout(self) -> global___DynamicLayout: ...
@property
def feature_counts(self) -> global___DynamicFeatureCounts: ...
def __init__(
self,
*,
format: builtins.str = ...,
arch: builtins.str = ...,
os: builtins.str = ...,
extractor: builtins.str = ...,
rules: collections.abc.Iterable[builtins.str] | None = ...,
layout: global___DynamicLayout | None = ...,
feature_counts: global___DynamicFeatureCounts | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["feature_counts", b"feature_counts", "layout", b"layout"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["arch", b"arch", "extractor", b"extractor", "feature_counts", b"feature_counts", "format", b"format", "layout", b"layout", "os", b"os", "rules", b"rules"]) -> None: ...
global___DynamicAnalysis = DynamicAnalysis
@typing_extensions.final
class DynamicFeatureCounts(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FILE_FIELD_NUMBER: builtins.int
PROCESSES_FIELD_NUMBER: builtins.int
file: builtins.int
@property
def processes(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ProcessFeatureCount]: ...
def __init__(
self,
*,
file: builtins.int = ...,
processes: collections.abc.Iterable[global___ProcessFeatureCount] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["file", b"file", "processes", b"processes"]) -> None: ...
global___DynamicFeatureCounts = DynamicFeatureCounts
@typing_extensions.final
class DynamicLayout(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
PROCESSES_FIELD_NUMBER: builtins.int
@property
def processes(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ProcessLayout]: ...
def __init__(
self,
*,
processes: collections.abc.Iterable[global___ProcessLayout] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["processes", b"processes"]) -> None: ...
global___DynamicLayout = DynamicLayout
@typing_extensions.final
class ExportFeature(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -794,6 +890,8 @@ class Metadata(google.protobuf.message.Message):
SAMPLE_FIELD_NUMBER: builtins.int
ANALYSIS_FIELD_NUMBER: builtins.int
FLAVOR_FIELD_NUMBER: builtins.int
STATIC_ANALYSIS_FIELD_NUMBER: builtins.int
DYNAMIC_ANALYSIS_FIELD_NUMBER: builtins.int
timestamp: builtins.str
"""iso8601 format, like: 2019-01-01T00:00:00Z"""
version: builtins.str
@@ -802,8 +900,16 @@ class Metadata(google.protobuf.message.Message):
@property
def sample(self) -> global___Sample: ...
@property
def analysis(self) -> global___Analysis: ...
def analysis(self) -> global___Analysis:
"""deprecated in v7.0.
use analysis2 instead.
"""
flavor: global___Flavor.ValueType
@property
def static_analysis(self) -> global___StaticAnalysis:
"""use analysis2 instead of analysis (deprecated in v7.0)."""
@property
def dynamic_analysis(self) -> global___DynamicAnalysis: ...
def __init__(
self,
*,
@@ -813,9 +919,12 @@ class Metadata(google.protobuf.message.Message):
sample: global___Sample | None = ...,
analysis: global___Analysis | None = ...,
flavor: global___Flavor.ValueType = ...,
static_analysis: global___StaticAnalysis | None = ...,
dynamic_analysis: global___DynamicAnalysis | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["analysis", b"analysis", "sample", b"sample"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["analysis", b"analysis", "argv", b"argv", "flavor", b"flavor", "sample", b"sample", "timestamp", b"timestamp", "version", b"version"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["analysis", b"analysis", "analysis2", b"analysis2", "dynamic_analysis", b"dynamic_analysis", "sample", b"sample", "static_analysis", b"static_analysis"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["analysis", b"analysis", "analysis2", b"analysis2", "argv", b"argv", "dynamic_analysis", b"dynamic_analysis", "flavor", b"flavor", "sample", b"sample", "static_analysis", b"static_analysis", "timestamp", b"timestamp", "version", b"version"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["analysis2", b"analysis2"]) -> typing_extensions.Literal["static_analysis", "dynamic_analysis"] | None: ...
global___Metadata = Metadata
@@ -993,6 +1102,47 @@ class OperandOffsetFeature(google.protobuf.message.Message):
global___OperandOffsetFeature = OperandOffsetFeature
@typing_extensions.final
class ProcessFeatureCount(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
ADDRESS_FIELD_NUMBER: builtins.int
COUNT_FIELD_NUMBER: builtins.int
@property
def address(self) -> global___Address: ...
count: builtins.int
def __init__(
self,
*,
address: global___Address | None = ...,
count: builtins.int = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["address", b"address"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["address", b"address", "count", b"count"]) -> None: ...
global___ProcessFeatureCount = ProcessFeatureCount
@typing_extensions.final
class ProcessLayout(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
ADDRESS_FIELD_NUMBER: builtins.int
MATCHED_THREADS_FIELD_NUMBER: builtins.int
@property
def address(self) -> global___Address: ...
@property
def matched_threads(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ThreadLayout]: ...
def __init__(
self,
*,
address: global___Address | None = ...,
matched_threads: collections.abc.Iterable[global___ThreadLayout] | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["address", b"address"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["address", b"address", "matched_threads", b"matched_threads"]) -> None: ...
global___ProcessLayout = ProcessLayout
@typing_extensions.final
class PropertyFeature(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -1156,11 +1306,15 @@ class RuleMetadata(google.protobuf.message.Message):
LIB_FIELD_NUMBER: builtins.int
MAEC_FIELD_NUMBER: builtins.int
IS_SUBSCOPE_RULE_FIELD_NUMBER: builtins.int
SCOPES_FIELD_NUMBER: builtins.int
name: builtins.str
namespace: builtins.str
@property
def authors(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
scope: global___Scope.ValueType
"""deprecated in v7.0.
use scopes instead.
"""
@property
def attack(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___AttackSpec]: ...
@property
@@ -1174,6 +1328,9 @@ class RuleMetadata(google.protobuf.message.Message):
@property
def maec(self) -> global___MaecMetadata: ...
is_subscope_rule: builtins.bool
@property
def scopes(self) -> global___Scopes:
"""use scopes over scope (deprecated in v7.0)."""
def __init__(
self,
*,
@@ -1189,9 +1346,10 @@ class RuleMetadata(google.protobuf.message.Message):
lib: builtins.bool = ...,
maec: global___MaecMetadata | None = ...,
is_subscope_rule: builtins.bool = ...,
scopes: global___Scopes | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["maec", b"maec"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["attack", b"attack", "authors", b"authors", "description", b"description", "examples", b"examples", "is_subscope_rule", b"is_subscope_rule", "lib", b"lib", "maec", b"maec", "mbc", b"mbc", "name", b"name", "namespace", b"namespace", "references", b"references", "scope", b"scope"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["maec", b"maec", "scopes", b"scopes"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["attack", b"attack", "authors", b"authors", "description", b"description", "examples", b"examples", "is_subscope_rule", b"is_subscope_rule", "lib", b"lib", "maec", b"maec", "mbc", b"mbc", "name", b"name", "namespace", b"namespace", "references", b"references", "scope", b"scope", "scopes", b"scopes"]) -> None: ...
global___RuleMetadata = RuleMetadata
@@ -1219,6 +1377,29 @@ class Sample(google.protobuf.message.Message):
global___Sample = Sample
@typing_extensions.final
class Scopes(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
STATIC_FIELD_NUMBER: builtins.int
DYNAMIC_FIELD_NUMBER: builtins.int
static: global___Scope.ValueType
dynamic: global___Scope.ValueType
def __init__(
self,
*,
static: global___Scope.ValueType | None = ...,
dynamic: global___Scope.ValueType | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["_dynamic", b"_dynamic", "_static", b"_static", "dynamic", b"dynamic", "static", b"static"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["_dynamic", b"_dynamic", "_static", b"_static", "dynamic", b"dynamic", "static", b"static"]) -> None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["_dynamic", b"_dynamic"]) -> typing_extensions.Literal["dynamic"] | None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["_static", b"_static"]) -> typing_extensions.Literal["static"] | None: ...
global___Scopes = Scopes
@typing_extensions.final
class SectionFeature(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -1298,6 +1479,86 @@ class StatementNode(google.protobuf.message.Message):
global___StatementNode = StatementNode
@typing_extensions.final
class StaticAnalysis(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FORMAT_FIELD_NUMBER: builtins.int
ARCH_FIELD_NUMBER: builtins.int
OS_FIELD_NUMBER: builtins.int
EXTRACTOR_FIELD_NUMBER: builtins.int
RULES_FIELD_NUMBER: builtins.int
BASE_ADDRESS_FIELD_NUMBER: builtins.int
LAYOUT_FIELD_NUMBER: builtins.int
FEATURE_COUNTS_FIELD_NUMBER: builtins.int
LIBRARY_FUNCTIONS_FIELD_NUMBER: builtins.int
format: builtins.str
arch: builtins.str
os: builtins.str
extractor: builtins.str
@property
def rules(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
@property
def base_address(self) -> global___Address: ...
@property
def layout(self) -> global___StaticLayout: ...
@property
def feature_counts(self) -> global___StaticFeatureCounts: ...
@property
def library_functions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___LibraryFunction]: ...
def __init__(
self,
*,
format: builtins.str = ...,
arch: builtins.str = ...,
os: builtins.str = ...,
extractor: builtins.str = ...,
rules: collections.abc.Iterable[builtins.str] | None = ...,
base_address: global___Address | None = ...,
layout: global___StaticLayout | None = ...,
feature_counts: global___StaticFeatureCounts | None = ...,
library_functions: collections.abc.Iterable[global___LibraryFunction] | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["base_address", b"base_address", "feature_counts", b"feature_counts", "layout", b"layout"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["arch", b"arch", "base_address", b"base_address", "extractor", b"extractor", "feature_counts", b"feature_counts", "format", b"format", "layout", b"layout", "library_functions", b"library_functions", "os", b"os", "rules", b"rules"]) -> None: ...
global___StaticAnalysis = StaticAnalysis
@typing_extensions.final
class StaticFeatureCounts(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FILE_FIELD_NUMBER: builtins.int
FUNCTIONS_FIELD_NUMBER: builtins.int
file: builtins.int
@property
def functions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___FunctionFeatureCount]: ...
def __init__(
self,
*,
file: builtins.int = ...,
functions: collections.abc.Iterable[global___FunctionFeatureCount] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["file", b"file", "functions", b"functions"]) -> None: ...
global___StaticFeatureCounts = StaticFeatureCounts
@typing_extensions.final
class StaticLayout(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FUNCTIONS_FIELD_NUMBER: builtins.int
@property
def functions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___FunctionLayout]: ...
def __init__(
self,
*,
functions: collections.abc.Iterable[global___FunctionLayout] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["functions", b"functions"]) -> None: ...
global___StaticLayout = StaticLayout
@typing_extensions.final
class StringFeature(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -1367,6 +1628,23 @@ class SubstringFeature(google.protobuf.message.Message):
global___SubstringFeature = SubstringFeature
@typing_extensions.final
class ThreadLayout(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
ADDRESS_FIELD_NUMBER: builtins.int
@property
def address(self) -> global___Address: ...
def __init__(
self,
*,
address: global___Address | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["address", b"address"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["address", b"address"]) -> None: ...
global___ThreadLayout = ThreadLayout
@typing_extensions.final
class Addresses(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -1425,6 +1703,81 @@ class Token_Offset(google.protobuf.message.Message):
global___Token_Offset = Token_Offset
@typing_extensions.final
class Ppid_Pid(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
PPID_FIELD_NUMBER: builtins.int
PID_FIELD_NUMBER: builtins.int
@property
def ppid(self) -> global___Integer: ...
@property
def pid(self) -> global___Integer: ...
def __init__(
self,
*,
ppid: global___Integer | None = ...,
pid: global___Integer | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["pid", b"pid", "ppid", b"ppid"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["pid", b"pid", "ppid", b"ppid"]) -> None: ...
global___Ppid_Pid = Ppid_Pid
@typing_extensions.final
class Ppid_Pid_Tid(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
PPID_FIELD_NUMBER: builtins.int
PID_FIELD_NUMBER: builtins.int
TID_FIELD_NUMBER: builtins.int
@property
def ppid(self) -> global___Integer: ...
@property
def pid(self) -> global___Integer: ...
@property
def tid(self) -> global___Integer: ...
def __init__(
self,
*,
ppid: global___Integer | None = ...,
pid: global___Integer | None = ...,
tid: global___Integer | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["pid", b"pid", "ppid", b"ppid", "tid", b"tid"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["pid", b"pid", "ppid", b"ppid", "tid", b"tid"]) -> None: ...
global___Ppid_Pid_Tid = Ppid_Pid_Tid
@typing_extensions.final
class Ppid_Pid_Tid_Id(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
PPID_FIELD_NUMBER: builtins.int
PID_FIELD_NUMBER: builtins.int
TID_FIELD_NUMBER: builtins.int
ID_FIELD_NUMBER: builtins.int
@property
def ppid(self) -> global___Integer: ...
@property
def pid(self) -> global___Integer: ...
@property
def tid(self) -> global___Integer: ...
@property
def id(self) -> global___Integer: ...
def __init__(
self,
*,
ppid: global___Integer | None = ...,
pid: global___Integer | None = ...,
tid: global___Integer | None = ...,
id: global___Integer | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["id", b"id", "pid", b"pid", "ppid", b"ppid", "tid", b"tid"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["id", b"id", "pid", b"pid", "ppid", b"ppid", "tid", b"tid"]) -> None: ...
global___Ppid_Pid_Tid_Id = Ppid_Pid_Tid_Id
@typing_extensions.final
class Integer(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

View File

@@ -107,25 +107,26 @@ We use [pre-commit](https://pre-commit.com/) so that its trivial to run the same
Run all linters liks:
pre-commit run --all-files
pre-commit run --hook-stage=manual --all-files
isort....................................................................Passed
black....................................................................Passed
ruff.....................................................................Passed
flake8...................................................................Passed
mypy.....................................................................Passed
pytest (fast)............................................................Passed
Or run a single linter like:
pre-commit run --all-files isort
pre-commit run --all-files --hook-stage=manual isort
isort....................................................................Passed
Importantly, you can configure pre-commit to run automatically before every commit by running:
pre-commit install --hook-type pre-commit
pre-commit install --hook-type=pre-commit
pre-commit installed at .git/hooks/pre-commit
pre-commit install --hook-type pre-push
pre-commit install --hook-type=pre-push
pre-commit installed at .git/hooks/pre-push
This way you can ensure that you don't commit code style or formatting offenses.

View File

@@ -1439,29 +1439,42 @@ def get_result_doc(path: Path):
@pytest.fixture
def pma0101_rd():
# python -m capa.main tests/data/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_ --json > tests/data/rd/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_.json
return get_result_doc(CD / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json")
@pytest.fixture
def dotnet_1c444e_rd():
# .NET sample
# python -m capa.main tests/data/dotnet/1c444ebeba24dcba8628b7dfe5fec7c6.exe_ --json > tests/data/rd/1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json
return get_result_doc(CD / "data" / "rd" / "1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json")
@pytest.fixture
def a3f3bbc_rd():
# python -m capa.main tests/data/3f3bbcf8fd90bdcdcdc5494314ed4225.exe_ --json > tests/data/rd/3f3bbcf8fd90bdcdcdc5494314ed4225.exe_.json
return get_result_doc(CD / "data" / "rd" / "3f3bbcf8fd90bdcdcdc5494314ed4225.exe_.json")
@pytest.fixture
def al_khaserx86_rd():
# python -m capa.main tests/data/al-khaser_x86.exe_ --json > tests/data/rd/al-khaser_x86.exe_.json
return get_result_doc(CD / "data" / "rd" / "al-khaser_x86.exe_.json")
@pytest.fixture
def al_khaserx64_rd():
# python -m capa.main tests/data/al-khaser_x64.exe_ --json > tests/data/rd/al-khaser_x64.exe_.json
return get_result_doc(CD / "data" / "rd" / "al-khaser_x64.exe_.json")
@pytest.fixture
def a076114_rd():
# python -m capa.main tests/data/0761142efbda6c4b1e801223de723578.dll_ --json > tests/data/rd/0761142efbda6c4b1e801223de723578.dll_.json
return get_result_doc(CD / "data" / "rd" / "0761142efbda6c4b1e801223de723578.dll_.json")
@pytest.fixture
def dynamic_a0000a6_rd():
# python -m capa.main tests/data/dynamic/cape/v2.2/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json --json > tests/data/rd/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json
return get_result_doc(CD / "data" / "rd" / "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json")

View File

@@ -46,7 +46,7 @@ def test_doc_to_pb2(request, rd_file):
assert matches.meta.name == m.name
assert cmp_optional(matches.meta.namespace, m.namespace)
assert list(matches.meta.authors) == m.authors
assert capa.render.proto.scope_to_pb2(matches.meta.scope) == m.scope
assert capa.render.proto.scopes_to_pb2(matches.meta.scopes) == m.scopes
assert len(matches.meta.attack) == len(m.attack)
for rd_attack, proto_attack in zip(matches.meta.attack, m.attack):
@@ -116,10 +116,27 @@ def test_addr_to_pb2():
def test_scope_to_pb2():
assert capa.render.proto.scope_to_pb2(capa.rules.Scope(capa.rules.Scope.FILE)) == capa_pb2.SCOPE_FILE
assert capa.render.proto.scope_to_pb2(capa.rules.Scope(capa.rules.Scope.FUNCTION)) == capa_pb2.SCOPE_FUNCTION
assert capa.render.proto.scope_to_pb2(capa.rules.Scope(capa.rules.Scope.BASIC_BLOCK)) == capa_pb2.SCOPE_BASIC_BLOCK
assert capa.render.proto.scope_to_pb2(capa.rules.Scope(capa.rules.Scope.INSTRUCTION)) == capa_pb2.SCOPE_INSTRUCTION
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.FILE) == capa_pb2.SCOPE_FILE
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.FUNCTION) == capa_pb2.SCOPE_FUNCTION
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.BASIC_BLOCK) == capa_pb2.SCOPE_BASIC_BLOCK
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.INSTRUCTION) == capa_pb2.SCOPE_INSTRUCTION
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.PROCESS) == capa_pb2.SCOPE_PROCESS
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.THREAD) == capa_pb2.SCOPE_THREAD
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.CALL) == capa_pb2.SCOPE_CALL
def test_scopes_to_pb2():
assert capa.render.proto.scopes_to_pb2(
capa.rules.Scopes.from_dict({"static": "file", "dynamic": "file"})
) == capa_pb2.Scopes(
static=capa_pb2.SCOPE_FILE,
dynamic=capa_pb2.SCOPE_FILE,
)
assert capa.render.proto.scopes_to_pb2(
capa.rules.Scopes.from_dict({"static": "file", "dynamic": "unsupported"})
) == capa_pb2.Scopes(
static=capa_pb2.SCOPE_FILE,
)
def cmp_optional(a: Any, b: Any) -> bool:
@@ -128,8 +145,59 @@ def cmp_optional(a: Any, b: Any) -> bool:
return a == b
def assert_static_analyis(analysis: rd.StaticAnalysis, dst: capa_pb2.StaticAnalysis):
assert analysis.format == dst.format
assert analysis.arch == dst.arch
assert analysis.os == dst.os
assert analysis.extractor == dst.extractor
assert list(analysis.rules) == dst.rules
assert capa.render.proto.addr_to_pb2(analysis.base_address) == dst.base_address
assert len(analysis.layout.functions) == len(dst.layout.functions)
for rd_f, proto_f in zip(analysis.layout.functions, dst.layout.functions):
assert capa.render.proto.addr_to_pb2(rd_f.address) == proto_f.address
assert len(rd_f.matched_basic_blocks) == len(proto_f.matched_basic_blocks)
for rd_bb, proto_bb in zip(rd_f.matched_basic_blocks, proto_f.matched_basic_blocks):
assert capa.render.proto.addr_to_pb2(rd_bb.address) == proto_bb.address
assert analysis.feature_counts.file == dst.feature_counts.file
assert len(analysis.feature_counts.functions) == len(dst.feature_counts.functions)
for rd_cf, proto_cf in zip(analysis.feature_counts.functions, dst.feature_counts.functions):
assert capa.render.proto.addr_to_pb2(rd_cf.address) == proto_cf.address
assert rd_cf.count == proto_cf.count
assert len(analysis.library_functions) == len(dst.library_functions)
for rd_lf, proto_lf in zip(analysis.library_functions, dst.library_functions):
assert capa.render.proto.addr_to_pb2(rd_lf.address) == proto_lf.address
assert rd_lf.name == proto_lf.name
def assert_dynamic_analyis(analysis: rd.DynamicAnalysis, dst: capa_pb2.DynamicAnalysis):
assert analysis.format == dst.format
assert analysis.arch == dst.arch
assert analysis.os == dst.os
assert analysis.extractor == dst.extractor
assert list(analysis.rules) == dst.rules
assert len(analysis.layout.processes) == len(dst.layout.processes)
for rd_p, proto_p in zip(analysis.layout.processes, dst.layout.processes):
assert capa.render.proto.addr_to_pb2(rd_p.address) == proto_p.address
assert len(rd_p.matched_threads) == len(proto_p.matched_threads)
for rd_t, proto_t in zip(rd_p.matched_threads, proto_p.matched_threads):
assert capa.render.proto.addr_to_pb2(rd_t.address) == proto_t.address
assert analysis.feature_counts.processes == dst.feature_counts.processes
assert len(analysis.feature_counts.processes) == len(dst.feature_counts.processes)
for rd_cp, proto_cp in zip(analysis.feature_counts.processes, dst.feature_counts.processes):
assert capa.render.proto.addr_to_pb2(rd_cp.address) == proto_cp.address
assert rd_cp.count == proto_cp.count
def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata):
assert isinstance(rd.Metadata.analysis, rd.StaticAnalysis)
assert isinstance(meta.analysis, rd.StaticAnalysis)
assert str(meta.timestamp) == dst.timestamp
assert meta.version == dst.version
if meta.argv is None:
@@ -142,32 +210,18 @@ def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata):
assert meta.sample.sha256 == dst.sample.sha256
assert meta.sample.path == dst.sample.path
assert meta.analysis.format == dst.analysis.format
assert meta.analysis.arch == dst.analysis.arch
assert meta.analysis.os == dst.analysis.os
assert meta.analysis.extractor == dst.analysis.extractor
assert list(meta.analysis.rules) == dst.analysis.rules
assert capa.render.proto.addr_to_pb2(meta.analysis.base_address) == dst.analysis.base_address
assert isinstance(rd.Metadata.analysis.layout, rd.StaticLayout)
assert len(meta.analysis.layout.functions) == len(dst.analysis.layout.functions)
for rd_f, proto_f in zip(meta.analysis.layout.functions, dst.analysis.layout.functions):
assert capa.render.proto.addr_to_pb2(rd_f.address) == proto_f.address
assert len(rd_f.matched_basic_blocks) == len(proto_f.matched_basic_blocks)
for rd_bb, proto_bb in zip(rd_f.matched_basic_blocks, proto_f.matched_basic_blocks):
assert capa.render.proto.addr_to_pb2(rd_bb.address) == proto_bb.address
assert meta.analysis.feature_counts.file == dst.analysis.feature_counts.file
assert len(meta.analysis.feature_counts.functions) == len(dst.analysis.feature_counts.functions)
for rd_cf, proto_cf in zip(meta.analysis.feature_counts.functions, dst.analysis.feature_counts.functions):
assert capa.render.proto.addr_to_pb2(rd_cf.address) == proto_cf.address
assert rd_cf.count == proto_cf.count
assert len(meta.analysis.library_functions) == len(dst.analysis.library_functions)
for rd_lf, proto_lf in zip(meta.analysis.library_functions, dst.analysis.library_functions):
assert capa.render.proto.addr_to_pb2(rd_lf.address) == proto_lf.address
assert rd_lf.name == proto_lf.name
if meta.flavor == rd.Flavor.STATIC:
assert dst.flavor == capa_pb2.FLAVOR_STATIC
assert dst.WhichOneof("analysis2") == "static_analysis"
assert isinstance(meta.analysis, rd.StaticAnalysis)
assert_static_analyis(meta.analysis, dst.static_analysis)
elif meta.flavor == rd.Flavor.DYNAMIC:
assert dst.flavor == capa_pb2.FLAVOR_DYNAMIC
assert dst.WhichOneof("analysis2") == "dynamic_analysis"
assert isinstance(meta.analysis, rd.DynamicAnalysis)
assert_dynamic_analyis(meta.analysis, dst.dynamic_analysis)
else:
assert_never(dst.flavor)
def assert_match(ma: rd.Match, mb: capa_pb2.Match):
@@ -320,20 +374,22 @@ def assert_round_trip(doc: rd.ResultDocument):
# show the round trip works
# first by comparing the objects directly,
# which works thanks to pydantic model equality.
assert one.meta == two.meta
assert one.rules == two.rules
assert one == two
# second by showing their protobuf representations are the same.
assert capa.render.proto.doc_to_pb2(one).SerializeToString(deterministic=True) == capa.render.proto.doc_to_pb2(
two
).SerializeToString(deterministic=True)
one_bytes = capa.render.proto.doc_to_pb2(one).SerializeToString(deterministic=True)
two_bytes = capa.render.proto.doc_to_pb2(two).SerializeToString(deterministic=True)
assert one_bytes == two_bytes
# now show that two different versions are not equal.
three = copy.deepcopy(two)
three.meta.__dict__.update({"version": "0.0.0"})
assert one.meta.version != three.meta.version
assert one != three
assert capa.render.proto.doc_to_pb2(one).SerializeToString(deterministic=True) != capa.render.proto.doc_to_pb2(
three
).SerializeToString(deterministic=True)
three_bytes = capa.render.proto.doc_to_pb2(three).SerializeToString(deterministic=True)
assert one_bytes != three_bytes
@pytest.mark.parametrize(
@@ -345,6 +401,7 @@ def assert_round_trip(doc: rd.ResultDocument):
pytest.param("a076114_rd"),
pytest.param("pma0101_rd"),
pytest.param("dotnet_1c444e_rd"),
pytest.param("dynamic_a0000a6_rd"),
],
)
def test_round_trip(request, rd_file):