update translator and tests

This commit is contained in:
mr-tz
2023-03-14 10:13:49 +01:00
parent 3cdbc66375
commit a7b7f643a5
7 changed files with 1983 additions and 795 deletions
+1 -1
View File
@@ -44,7 +44,7 @@ def is_runtime_ida():
return True
def assert_never(value: NoReturn) -> NoReturn:
def assert_never(value) -> NoReturn:
assert False, f"Unhandled value: {value} ({type(value).__name__})"
+102 -113
View File
@@ -3,13 +3,13 @@ syntax = "proto3";
message APIFeature {
string type = 1;
string api = 2;
string description = 3;
optional string description = 3;
}
message Address {
AddressType type = 1;
oneof value {
Integer v = 2;
Integer v = 2; // TODO rename value?
Token_Offset token_offset = 3;
};
}
@@ -39,7 +39,7 @@ message Analysis {
message ArchFeature {
string type = 1;
string arch = 2;
string description = 3;
optional string description = 3;
}
message AttackSpec {
@@ -52,7 +52,7 @@ message AttackSpec {
message BasicBlockFeature {
string type = 1;
string description = 2;
optional string description = 2;
}
message BasicBlockLayout {
@@ -62,30 +62,30 @@ message BasicBlockLayout {
message BytesFeature {
string type = 1;
string bytes = 2;
string description = 3;
optional string description = 3;
}
message CharacteristicFeature {
string type = 1;
string characteristic = 2;
string description = 3;
optional string description = 3;
}
message ClassFeature {
string type = 1;
string description = 2;
string class = 3;
string class_ = 2; // class is protected Python keyword
optional string description = 3;
}
message CompoundStatement {
string type = 1;
string description = 2;
optional string description = 2;
}
message ExportFeature {
string type = 1;
string export = 2;
string description = 3;
optional string description = 3;
}
message FeatureCounts {
@@ -94,38 +94,49 @@ message FeatureCounts {
}
message FeatureNode {
string type = 1;
/*
TODO results in
"feature": {
"type": "feature",
"api": { <---- ugh, but again this is how proto works and we can translate back using custom code?!
"type": "api",
"api": "ws2_32.recv",
"description": ""
}
},
*/
oneof feature {
OSFeature v0 = 1;
ArchFeature v1 = 2;
FormatFeature v2 = 3;
MatchFeature v3 = 4;
CharacteristicFeature v4 = 5;
ExportFeature v5 = 6;
ImportFeature v6 = 7;
SectionFeature v7 = 8;
FunctionNameFeature v8 = 9;
SubstringFeature v9 = 10;
RegexFeature v10 = 11;
StringFeature v11 = 12;
ClassFeature v12 = 13;
NamespaceFeature v13 = 14;
APIFeature v14 = 15;
PropertyFeature v15 = 16;
NumberFeature v16 = 17;
BytesFeature v17 = 18;
OffsetFeature v18 = 19;
MnemonicFeature v19 = 20;
OperandNumberFeature v20 = 21;
OperandOffsetFeature v21 = 22;
BasicBlockFeature v22 = 23;
OSFeature os = 2;
ArchFeature arch = 3;
FormatFeature format = 4;
MatchFeature match = 5;
CharacteristicFeature characteristic = 6;
ExportFeature export = 7;
ImportFeature import_ = 8; // import is Python keyword
SectionFeature section = 9;
FunctionNameFeature function_name = 10;
SubstringFeature substring = 11;
RegexFeature regex = 12;
StringFeature string = 13;
ClassFeature class_ = 14;
NamespaceFeature namespace = 15;
APIFeature api = 16;
PropertyFeature property = 17;
NumberFeature number = 18;
BytesFeature bytes = 19;
OffsetFeature offset = 20;
MnemonicFeature mnemonic = 21;
OperandNumberFeature operand_number = 22;
OperandOffsetFeature operand_offset = 23;
BasicBlockFeature basic_block = 24;
};
string type = 25;
}
message FormatFeature {
string type = 1;
string format = 2;
string description = 3;
optional string description = 3;
}
message FunctionFeatureCount {
@@ -141,13 +152,13 @@ message FunctionLayout {
message FunctionNameFeature {
string type = 1;
string function_name = 2;
string description = 3;
optional string description = 3;
}
message ImportFeature {
string type = 1;
string description = 2;
string import = 3;
string import_ = 2;
optional string description = 3;
}
message Layout {
@@ -178,22 +189,22 @@ message MaecMetadata {
message Match {
bool success = 1;
oneof node {
StatementNode v0 = 2;
FeatureNode v1 = 3;
StatementNode statement = 2;
FeatureNode feature = 3;
};
repeated Match children = 5;
repeated Address locations = 6;
map <string, Array_Address> captures = 7;
map <string, Addresses> captures = 7;
}
message MatchFeature {
string type = 1;
string match = 2;
string description = 3;
optional string description = 3;
}
message Metadata {
string timestamp = 1; // TODO Timestamp?
string timestamp = 1; // google.protobuf.timestamp_pb2.Timestamp also would work, but seems more of a headache
string version = 2;
repeated string argv = 3;
Sample sample = 4;
@@ -203,93 +214,69 @@ message Metadata {
message MnemonicFeature {
string type = 1;
string mnemonic = 2;
string description = 3;
optional string description = 3;
}
message NamespaceFeature {
string type = 1;
string namespace = 2;
string description = 3;
optional string description = 3;
}
message NumberFeature {
string type = 1;
oneof number {
Integer v0 = 2;
Number v1 = 3;
};
string description = 5;
Number number = 2;
optional string description = 5;
}
message OSFeature {
string type = 1;
string os = 2;
string description = 3;
optional string description = 3;
}
message OffsetFeature {
string type = 1;
Integer offset = 2;
string description = 3;
Integer offset = 2; // TODO can/should this be negative? -- seen it in dotnet sample (bug?)
optional string description = 3;
}
message OperandNumberFeature {
string type = 1;
Integer index = 2;
Integer operand_number = 3;
string description = 4;
uint32 index = 2;
Integer operand_number = 3; // TODO can/should this be negative?
optional string description = 4;
}
message OperandOffsetFeature {
string type = 1;
Integer index = 2;
Integer operand_offset = 3;
string description = 4;
uint32 index = 2;
Integer operand_offset = 3; // TODO can/should this be negative?
optional string description = 4;
}
message PropertyFeature {
string type = 1;
string access = 2;
optional string access = 2;
string property = 3;
string description = 4;
optional string description = 4;
}
message RangeStatement {
string description = 1;
uint32 min = 2;
uint32 max = 3;
oneof child {
OSFeature v0 = 4;
ArchFeature v1 = 5;
FormatFeature v2 = 6;
MatchFeature v3 = 7;
CharacteristicFeature v4 = 8;
ExportFeature v5 = 9;
ImportFeature v6 = 10;
SectionFeature v7 = 11;
FunctionNameFeature v8 = 12;
SubstringFeature v9 = 13;
RegexFeature v10 = 14;
StringFeature v11 = 15;
ClassFeature v12 = 16;
NamespaceFeature v13 = 17;
APIFeature v14 = 18;
PropertyFeature v15 = 19;
NumberFeature v16 = 20;
BytesFeature v17 = 21;
OffsetFeature v18 = 22;
MnemonicFeature v19 = 23;
OperandNumberFeature v20 = 24;
OperandOffsetFeature v21 = 25;
BasicBlockFeature v22 = 26;
};
string type = 28;
string type = 1;
optional string description = 2;
uint64 min = 3;
uint64 max = 4;
// reusing FeatureNode here to avoid duplication and list all features OSFeature, ArchFeature, ... again
// FeatureNode has an extra field `type` which is not present in the pydantic definition, we can
// set it to "" to get rid of it
FeatureNode child = 5;
}
message RegexFeature {
string type = 1;
string regex = 2;
string description = 3;
optional string description = 3;
}
message ResultDocument {
@@ -307,7 +294,7 @@ message RuleMetadata {
string name = 1;
string namespace = 2;
repeated string authors = 3;
Scope scope = 4;
Scope scope = 4; // TODO string scope -> easier translation to proto and from proto to json?!
repeated AttackSpec attack = 5;
repeated MBCSpec mbc = 6;
repeated string references = 7;
@@ -315,7 +302,7 @@ message RuleMetadata {
string description = 9;
bool lib = 10;
MaecMetadata maec = 11;
bool capa_subscope = 12;
bool is_subscope_rule = 12;
}
message Sample {
@@ -326,7 +313,8 @@ message Sample {
}
enum Scope {
SCOPE_UNSPECIFIED = 0;
SCOPE_UNSPECIFIED = 0; // TODO do differently so json conversion works, currently gives `"scope": "SCOPE_FUNCTION"` which pydantic cannot parse
// could just make string?! and assert in code that it's one of supported values?!
SCOPE_FILE = 1;
SCOPE_FUNCTION = 2;
SCOPE_BASIC_BLOCK = 3;
@@ -336,48 +324,49 @@ enum Scope {
message SectionFeature {
string type = 1;
string section = 2;
string description = 3;
optional string description = 3;
}
message SomeStatement {
string description = 1;
uint32 count = 2;
string type = 3;
string type = 1;
optional string description = 2;
uint32 count = 3;
}
message StatementNode {
oneof statement {
RangeStatement v0 = 1;
SomeStatement v1 = 2;
SubscopeStatement v2 = 3;
CompoundStatement v3 = 4;
string type = 1;
oneof statement { // TODO don't specify these and just set type?! well I guess this is how proto is supposed to work...
// so for json conversion we'll also need a translation function (at least for testing)
RangeStatement range = 2;
SomeStatement some = 3;
SubscopeStatement subscope = 4;
CompoundStatement compound = 5;
};
string type = 6;
}
message StringFeature {
string type = 1;
string string = 2;
string description = 3;
optional string description = 3;
}
message SubscopeStatement {
string description = 1;
Scope scope = 2;
string type = 3;
string type = 1;
optional string description = 2;
Scope scope = 3;
}
message SubstringFeature {
string type = 1;
string substring = 2;
string description = 3;
optional string description = 3;
}
message Array_Address { repeated Address values = 1; }
message Addresses { repeated Address address = 1; }
message Pair_Address_Match {
Address v0 = 1;
Match v1 = 2;
Address address = 1;
Match match = 2;
}
message Token_Offset {
@@ -385,6 +374,6 @@ message Token_Offset {
uint64 offset = 2; // offset is always >= 0
}
message Integer { oneof value { uint64 u = 1; sint64 i = 2; } } // unsigned or unsigned int
message Integer { oneof value { uint64 u = 1; sint64 i = 2; } } // unsigned or signed int
message Number { oneof value { uint64 u = 1; sint64 i = 2; double f = 3; } }
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large Load Diff
+444 -119
View File
@@ -1,135 +1,460 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
Convert capa results to protobuf format.
The functionality here is similar to the various *from_capa functions, e.g. ResultDocument.from_capa() or
feature_from_capa.
For a few classes we can rely on the proto json parser (e.g. RuleMetadata).
For most classes (e.g. RuleMatches) conversion is tricky, because we use natively unsupported types (e.g. tuples),
several classes with unions, and more complex layouts. So, it's more straightforward to convert explicitly than to
massage the data until the protobuf json parser accepts it.
Of note, the 3 in `syntax = "proto3"` has nothing to do with the 2 in capa_pb2.py;
see details in https://github.com/grpc/grpc/issues/15444#issuecomment-396442980.
First compile the protobuf to generate an API file and a mypy stub file
$ protoc.exe --python_out=. --mypy_out=. <path_to_proto> (e.g. capa/render/proto/capa.proto)
Alternatively, --pyi_out=. can be used to generate a Python Interface file that supports development
"""
import sys
import json
import argparse
from typing import Dict, Union
import capa.features.freeze
import capa.render.proto.capa_pb2
import capa.render.result_document
import google.protobuf.json_format
from google.protobuf.json_format import MessageToJson
import capa.rules
import capa.features.freeze as frz
import capa.render.proto.capa_pb2 as capa_pb2
import capa.render.result_document as rd
import capa.features.freeze.features as frzf
from capa.helpers import assert_never
from capa.features.freeze import AddressType
def main():
# first compile protobuf
# protoc.exe --python_out . capa/render/proto/capa.proto
fpath = sys.argv[1]
with open(fpath, "r", encoding="utf-8") as f:
fdata = f.read()
doc = capa.render.result_document.ResultDocument.parse_obj(json.loads(fdata))
p = to_proto(doc)
print(p)
def dict_tuple_to_list_values(d: Dict) -> Dict:
    """
    Build a new dict from `d` in which every tuple value has been turned into a list.

    Only the top-level values are inspected: keys, non-tuple values, and anything nested
    inside a value are carried over untouched. The input dict is not modified.
    """
    return {key: (list(val) if isinstance(val, tuple) else val) for key, val in d.items()}
def to_proto(doc):
m = metadata_from_capa(doc.meta)
return m
def metadata_from_capa(meta: capa.render.result_document.Metadata) -> capa.render.proto.capa_pb2.Metadata:
m = capa.render.proto.capa_pb2.Metadata()
m.timestamp = str(meta.timestamp) # TODO google.protobuf.timestamp_pb2.Timestamp?
m.version = meta.version
m.argv.extend(meta.argv)
m.sample.md5 = meta.sample.md5
m.sample.sha1 = meta.sample.sha1
m.sample.sha256 = meta.sample.sha256
m.sample.path = meta.sample.path
m.analysis.format = meta.analysis.format
m.analysis.arch = meta.analysis.arch
m.analysis.os = meta.analysis.os
m.analysis.extractor = meta.analysis.extractor
m.analysis.rules.extend(meta.analysis.rules)
m.analysis.base_address.CopyFrom(addr_from_freeze(meta.analysis.base_address))
m.analysis.layout.CopyFrom(
capa.render.proto.capa_pb2.Layout(
functions=[
capa.render.proto.capa_pb2.FunctionLayout(
address=addr_from_freeze(f.address),
matched_basic_blocks=[
capa.render.proto.capa_pb2.BasicBlockLayout(address=addr_from_freeze(bb.address))
for bb in f.matched_basic_blocks
],
)
for f in meta.analysis.layout.functions
]
)
)
m.analysis.feature_counts.CopyFrom(
capa.render.proto.capa_pb2.FeatureCounts(
file=meta.analysis.feature_counts.file,
functions=[
capa.render.proto.capa_pb2.FunctionFeatureCount(address=addr_from_freeze(f.address), count=f.count)
for f in meta.analysis.feature_counts.functions
]
)
)
m.analysis.library_functions.extend(
[
capa.render.proto.capa_pb2.LibraryFunction(address=addr_from_freeze(lf.address), name=lf.name)
for lf in meta.analysis.library_functions
]
)
return m
def addr_from_freeze(a: capa.features.freeze.Address) -> capa.render.proto.capa_pb2.Address:
address = capa.render.proto.capa_pb2.Address()
if a.type is AddressType.ABSOLUTE:
address.type = capa.render.proto.capa_pb2.AddressType.ADDRESSTYPE_ABSOLUTE
address.v.CopyFrom(int_to_pb2(a.value))
return address
elif a.type is AddressType.RELATIVE:
address.type = capa.render.proto.capa_pb2.AddressType.ADDRESSTYPE_RELATIVE
address.v.CopyFrom(int_to_pb2(a.value))
return address
elif a.type is AddressType.FILE:
address.type = capa.render.proto.capa_pb2.AddressType.ADDRESSTYPE_FILE
address.v.CopyFrom(int_to_pb2(a.value))
return address
elif a.type is AddressType.DN_TOKEN:
address.type = capa.render.proto.capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN
address.v.CopyFrom(int_to_pb2(a.value))
return address
elif a.type is AddressType.DN_TOKEN_OFFSET:
token, offset = a.value
address.type = capa.render.proto.capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN_OFFSET
address.token_offset.token.CopyFrom(int_to_pb2(token))
address.token_offset.offset = offset
return address
elif a.type is AddressType.NO_ADDRESS:
address.type = capa.render.proto.capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS
# value == None so just don't set here
return address
else:
raise NotImplementedError(f"unhandled address type {a.type} ({type(a.type).__name__})")
def int_to_pb2(v):
assert isinstance(v, int)
def int_to_pb2(v: int) -> capa_pb2.Integer:
if v < -2_147_483_648:
raise ValueError("underflow")
raise ValueError(f"value underflow: {v}")
if v > 0xFFFFFFFFFFFFFFFF:
raise ValueError("overflow")
raise ValueError(f"value overflow: {v}")
if v < 0:
return capa.render.proto.capa_pb2.Integer(i=v)
return capa_pb2.Integer(i=v)
else:
return capa.render.proto.capa_pb2.Integer(u=v)
return capa_pb2.Integer(u=v)
def number_to_pb2(v: Union[int, float]) -> capa_pb2.Number:
    """
    Wrap a Python number in a `Number` message, picking the oneof member by type and sign.

    floats are stored in `f`. ints are first passed through int_to_pb2 (which range-checks them)
    and then stored in `i` when negative or in `u` otherwise.

    raises:
      ValueError: propagated from int_to_pb2 when an int is outside the range it accepts.
      AssertionError: from assert_never when `v` is neither a float nor an int.
    """
    if isinstance(v, float):
        return capa_pb2.Number(f=v)
    elif isinstance(v, int):
        i = int_to_pb2(v)
        if v < 0:
            return capa_pb2.Number(i=i.i)
        else:
            return capa_pb2.Number(u=i.u)
    else:
        # without this branch an unsupported type fell off the end and the function returned None,
        # which only surfaced later as a confusing error at the call site
        assert_never(v)
def addr_to_pb2(addr: frz.Address) -> capa_pb2.Address:
    """
    Translate a frozen capa address into its protobuf `Address` message.

    Absolute, relative, file, and .NET token addresses carry a single integer that is stored in `v`.
    A token+offset address is stored in `token_offset`. NO_ADDRESS has no value, so only `type` is set.
    Any other address type is handed to assert_never.
    """
    kind = addr.type

    # these kinds all share one layout: an int wrapped via int_to_pb2 and stored in `v`
    int_valued = (
        (AddressType.ABSOLUTE, capa_pb2.AddressType.ADDRESSTYPE_ABSOLUTE),
        (AddressType.RELATIVE, capa_pb2.AddressType.ADDRESSTYPE_RELATIVE),
        (AddressType.FILE, capa_pb2.AddressType.ADDRESSTYPE_FILE),
        (AddressType.DN_TOKEN, capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN),
    )
    for capa_kind, pb2_kind in int_valued:
        if kind is capa_kind:
            assert isinstance(addr.value, int)
            return capa_pb2.Address(type=pb2_kind, v=int_to_pb2(addr.value))

    if kind is AddressType.DN_TOKEN_OFFSET:
        assert isinstance(addr.value, tuple)
        token, offset = addr.value
        assert isinstance(token, int)
        assert isinstance(offset, int)
        return capa_pb2.Address(
            type=capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN_OFFSET,
            token_offset=capa_pb2.Token_Offset(token=int_to_pb2(token), offset=offset),
        )

    if kind is AddressType.NO_ADDRESS:
        # value == None, so only set type
        return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS)

    assert_never(addr)
def scope_to_pb2(scope: capa.rules.Scope) -> capa_pb2.Scope.ValueType:
    """
    Map a capa rule scope onto the matching protobuf `Scope` enum value.

    The scope is compared by equality against each known scope in turn;
    a scope that matches none of them is handed to assert_never.
    """
    known_scopes = (
        (capa.rules.Scope.FILE, capa_pb2.Scope.SCOPE_FILE),
        (capa.rules.Scope.FUNCTION, capa_pb2.Scope.SCOPE_FUNCTION),
        (capa.rules.Scope.BASIC_BLOCK, capa_pb2.Scope.SCOPE_BASIC_BLOCK),
        (capa.rules.Scope.INSTRUCTION, capa_pb2.Scope.SCOPE_INSTRUCTION),
    )
    for rule_scope, pb2_scope in known_scopes:
        if scope == rule_scope:
            return pb2_scope

    assert_never(scope)
def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
    """
    Convert the result document's metadata section into a protobuf `Metadata` message.

    The sample section is converted with the protobuf json parser (ParseDict); everything below
    `analysis` is built explicitly because its addresses have to go through addr_to_pb2.
    The timestamp is stored as its `str()` form.
    """
    analysis = meta.analysis

    timestamp = str(meta.timestamp)
    sample = google.protobuf.json_format.ParseDict(meta.sample.dict(), capa_pb2.Sample())
    base_address = addr_to_pb2(analysis.base_address)

    # layout: each function together with the basic blocks recorded as matched
    function_layouts = []
    for func in analysis.layout.functions:
        func_address = addr_to_pb2(func.address)
        blocks = [capa_pb2.BasicBlockLayout(address=addr_to_pb2(bb.address)) for bb in func.matched_basic_blocks]
        function_layouts.append(capa_pb2.FunctionLayout(address=func_address, matched_basic_blocks=blocks))

    # feature counts: one file-level count plus one entry per function
    function_counts = [
        capa_pb2.FunctionFeatureCount(address=addr_to_pb2(entry.address), count=entry.count)
        for entry in analysis.feature_counts.functions
    ]

    library_functions = [
        capa_pb2.LibraryFunction(address=addr_to_pb2(lib.address), name=lib.name)
        for lib in analysis.library_functions
    ]

    return capa_pb2.Metadata(
        timestamp=timestamp,
        version=meta.version,
        argv=meta.argv,
        sample=sample,
        analysis=capa_pb2.Analysis(
            format=analysis.format,
            arch=analysis.arch,
            os=analysis.os,
            extractor=analysis.extractor,
            rules=analysis.rules,
            base_address=base_address,
            layout=capa_pb2.Layout(functions=function_layouts),
            feature_counts=capa_pb2.FeatureCounts(file=analysis.feature_counts.file, functions=function_counts),
            library_functions=library_functions,
        ),
    )
def statement_to_pb2(statement: rd.Statement) -> capa_pb2.StatementNode:
    """
    Wrap a result document statement in a protobuf `StatementNode`.

    The node's own `type` is always "statement"; the concrete statement is stored in the
    matching oneof member (range, some, subscope, or compound). An unrecognized statement
    class is handed to assert_never.
    """
    if isinstance(statement, rd.RangeStatement):
        # the proto schema reuses FeatureNode for the range's child. FeatureNode has a `type` field
        # that the pydantic model lacks, so it is blanked here.
        # TODO is this (too) hacky? deviates a bit from the original proto design/usage
        child = feature_to_pb2(statement.child)
        child.type = ""
        range_statement = capa_pb2.RangeStatement(
            type="range",
            description=statement.description,
            min=statement.min,
            max=statement.max,
            child=child,
        )
        return capa_pb2.StatementNode(range=range_statement, type="statement")

    if isinstance(statement, rd.SomeStatement):
        some_statement = capa_pb2.SomeStatement(
            type=statement.type, description=statement.description, count=statement.count
        )
        return capa_pb2.StatementNode(some=some_statement, type="statement")

    if isinstance(statement, rd.SubscopeStatement):
        subscope_statement = capa_pb2.SubscopeStatement(
            type=statement.type,
            description=statement.description,
            scope=scope_to_pb2(statement.scope),
        )
        return capa_pb2.StatementNode(subscope=subscope_statement, type="statement")

    if isinstance(statement, rd.CompoundStatement):
        compound_statement = capa_pb2.CompoundStatement(type=statement.type, description=statement.description)
        return capa_pb2.StatementNode(compound=compound_statement, type="statement")

    assert_never(statement)
def feature_to_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode:
    """
    Wrap a frozen feature in a protobuf `FeatureNode`.

    The node's own `type` is always "feature"; the concrete feature message is stored in the
    oneof member named after the feature kind (`import_` and `class_` carry a trailing underscore
    because the bare words are Python keywords). Integer payloads go through int_to_pb2 and the
    number payload through number_to_pb2.

    An unrecognized feature class is handed to assert_never.
    """

    def wrap(**member) -> capa_pb2.FeatureNode:
        # single place that sets the node type, so no branch can drift from "feature"
        # (the function-name branch used to emit type="function name")
        return capa_pb2.FeatureNode(type="feature", **member)

    if isinstance(f, frzf.OSFeature):
        return wrap(os=capa_pb2.OSFeature(type=f.type, os=f.os, description=f.description))

    elif isinstance(f, frzf.ArchFeature):
        return wrap(arch=capa_pb2.ArchFeature(type=f.type, arch=f.arch, description=f.description))

    elif isinstance(f, frzf.FormatFeature):
        return wrap(format=capa_pb2.FormatFeature(type=f.type, format=f.format, description=f.description))

    elif isinstance(f, frzf.MatchFeature):
        return wrap(match=capa_pb2.MatchFeature(type=f.type, match=f.match, description=f.description))

    elif isinstance(f, frzf.CharacteristicFeature):
        return wrap(
            characteristic=capa_pb2.CharacteristicFeature(
                type=f.type, characteristic=f.characteristic, description=f.description
            )
        )

    elif isinstance(f, frzf.ExportFeature):
        return wrap(export=capa_pb2.ExportFeature(type=f.type, export=f.export, description=f.description))

    elif isinstance(f, frzf.ImportFeature):
        return wrap(import_=capa_pb2.ImportFeature(type=f.type, import_=f.import_, description=f.description))

    elif isinstance(f, frzf.SectionFeature):
        return wrap(section=capa_pb2.SectionFeature(type=f.type, section=f.section, description=f.description))

    elif isinstance(f, frzf.FunctionNameFeature):
        return wrap(
            function_name=capa_pb2.FunctionNameFeature(
                type=f.type, function_name=f.function_name, description=f.description
            )
        )

    elif isinstance(f, frzf.SubstringFeature):
        return wrap(
            substring=capa_pb2.SubstringFeature(type=f.type, substring=f.substring, description=f.description)
        )

    elif isinstance(f, frzf.RegexFeature):
        return wrap(regex=capa_pb2.RegexFeature(type=f.type, regex=f.regex, description=f.description))

    elif isinstance(f, frzf.StringFeature):
        return wrap(string=capa_pb2.StringFeature(type=f.type, string=f.string, description=f.description))

    elif isinstance(f, frzf.ClassFeature):
        return wrap(class_=capa_pb2.ClassFeature(type=f.type, class_=f.class_, description=f.description))

    elif isinstance(f, frzf.NamespaceFeature):
        return wrap(
            namespace=capa_pb2.NamespaceFeature(type=f.type, namespace=f.namespace, description=f.description)
        )

    elif isinstance(f, frzf.APIFeature):
        return wrap(api=capa_pb2.APIFeature(type=f.type, api=f.api, description=f.description))

    elif isinstance(f, frzf.PropertyFeature):
        return wrap(
            property=capa_pb2.PropertyFeature(
                type=f.type, access=f.access, property=f.property, description=f.description
            )
        )

    elif isinstance(f, frzf.NumberFeature):
        return wrap(
            number=capa_pb2.NumberFeature(type=f.type, number=number_to_pb2(f.number), description=f.description)
        )

    elif isinstance(f, frzf.BytesFeature):
        return wrap(bytes=capa_pb2.BytesFeature(type=f.type, bytes=f.bytes, description=f.description))

    elif isinstance(f, frzf.OffsetFeature):
        return wrap(
            offset=capa_pb2.OffsetFeature(type=f.type, offset=int_to_pb2(f.offset), description=f.description)
        )

    elif isinstance(f, frzf.MnemonicFeature):
        return wrap(
            mnemonic=capa_pb2.MnemonicFeature(type=f.type, mnemonic=f.mnemonic, description=f.description)
        )

    elif isinstance(f, frzf.OperandNumberFeature):
        return wrap(
            operand_number=capa_pb2.OperandNumberFeature(
                type=f.type, index=f.index, operand_number=int_to_pb2(f.operand_number), description=f.description
            )
        )

    elif isinstance(f, frzf.OperandOffsetFeature):
        return wrap(
            operand_offset=capa_pb2.OperandOffsetFeature(
                type=f.type, index=f.index, operand_offset=int_to_pb2(f.operand_offset), description=f.description
            )
        )

    elif isinstance(f, frzf.BasicBlockFeature):
        return wrap(basic_block=capa_pb2.BasicBlockFeature(type=f.type, description=f.description))

    else:
        assert_never(f)
def node_to_pb2(node: rd.Node) -> Union[capa_pb2.FeatureNode, capa_pb2.StatementNode]:
    """
    Convert a match tree node to protobuf, dispatching on whether it wraps a statement or a feature.

    Statement nodes are converted by statement_to_pb2 and feature nodes by feature_to_pb2;
    any other node class is handed to assert_never.
    """
    if isinstance(node, rd.StatementNode):
        return statement_to_pb2(node.statement)

    if isinstance(node, rd.FeatureNode):
        return feature_to_pb2(node.feature)

    assert_never(node)
def match_to_pb2(match: rd.Match) -> capa_pb2.Match:
    """
    Convert a match (and, recursively, all of its children) into a protobuf `Match` message.

    The converted node is stored in the `statement` or `feature` oneof member depending on its type.
    Captures are only forwarded for feature nodes; statement nodes always get an empty captures map.

    This recurses once per level of `match.children`.
    """
    node = node_to_pb2(match.node)
    children = list(map(match_to_pb2, match.children))
    locations = list(map(addr_to_pb2, match.locations))

    if isinstance(node, capa_pb2.StatementNode):
        return capa_pb2.Match(
            success=match.success,
            statement=node,
            children=children,
            locations=locations,
            captures={},
        )

    elif isinstance(node, capa_pb2.FeatureNode):
        return capa_pb2.Match(
            success=match.success,
            feature=node,
            children=children,
            locations=locations,
            captures={
                capture: capa_pb2.Addresses(address=list(map(addr_to_pb2, locs)))
                for capture, locs in match.captures.items()
            },
        )

    else:
        # the value that failed the dispatch is the converted node, not the match itself,
        # so report `node` to make the "Unhandled value" message point at the real culprit
        assert_never(node)
def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata:
    """
    Convert rule metadata to protobuf by way of the protobuf json parser.

    The pydantic model is dumped to a dict and adjusted so that ParseDict accepts it:
    tuple values become lists (top level, and inside each `attack` / `mbc` entry) and the
    scope is replaced by the protobuf enum value.
    """
    meta = dict_tuple_to_list_values(rule_metadata.dict())
    meta["scope"] = scope_to_pb2(meta["scope"])

    # each attack/mbc entry is itself a dict that may hold tuples
    for key in ("attack", "mbc"):
        meta[key] = [dict_tuple_to_list_values(entry) for entry in meta.get(key, [])]

    return google.protobuf.json_format.ParseDict(meta, capa_pb2.RuleMetadata())
def doc_to_pb2(doc: rd.ResultDocument) -> capa_pb2.ResultDocument:
    """
    Convert a complete result document (metadata plus all rule matches) to protobuf.

    Every rule becomes a `RuleMatches` message keyed by rule name; each (address, match)
    pair of the rule is stored as a `Pair_Address_Match`.
    """
    rule_matches: Dict[str, capa_pb2.RuleMatches] = {
        rule_name: capa_pb2.RuleMatches(
            meta=rule_metadata_to_pb2(rule.meta),
            source=rule.source,
            matches=[
                capa_pb2.Pair_Address_Match(address=addr_to_pb2(address), match=match_to_pb2(result))
                for address, result in rule.matches
            ],
        )
        for rule_name, rule in doc.rules.items()
    }

    return capa_pb2.ResultDocument(meta=metadata_to_pb2(doc.meta), rules=rule_matches)
def main(argv=None):
    """
    Command line entry point: read a JSON result document, convert it to protobuf, and print it.

    args:
      argv: argument list to parse; defaults to sys.argv[1:] when None.

    With -j/--json the protobuf message is printed as JSON, otherwise in protobuf text form.
    """
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="convert JSON result document to protobuf")
    parser.add_argument("json_input", help="path to JSON result document to convert")
    parser.add_argument("-j", "--json", action="store_true", help="emit JSON conversion of protobuf instead of text")
    args = parser.parse_args(args=argv)

    with open(args.json_input, "r", encoding="utf-8") as f:
        raw = f.read()

    proto_doc = doc_to_pb2(rd.ResultDocument.parse_obj(json.loads(raw)))

    if not args.json:
        print(proto_doc)
        return

    # TODO use ensure_ascii?
    # including_default_value_fields -> empty/unset fields are emitted as well
    # see https://googleapis.dev/python/protobuf/latest/google/protobuf/json_format.html
    json_obj = MessageToJson(
        proto_doc, sort_keys=True, preserving_proto_field_name=True, including_default_value_fields=True
    )
    print(json_obj)

    # TODO test the round trip?
    # doc2 = rd.ResultDocument.parse_obj(json.loads(json_obj))
    # doc2 = rd.ResultDocument.construct(json.loads(json_obj))
    # assert doc == doc2
if __name__ == "__main__":
+51 -1
View File
@@ -1105,7 +1105,57 @@ def _039a6_dotnetfile_extractor():
@pytest.fixture
def pma0101_rd():
path = os.path.join(CD, "data", "Practical Malware Analysis Lab 01-01.dll.json")
path = os.path.join(CD, "data", "Practical Malware Analysis Lab 01-01.dll_.json")
with open(path, "rb") as f:
buf = f.read()
src = buf.decode("utf-8")
return capa.render.result_document.ResultDocument.parse_raw(src)
@pytest.fixture
def dotnet_1c444e_rd():
    """Result document parsed from data/dotnet/1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json (relative to CD)."""
    json_path = os.path.join(CD, "data", "dotnet", "1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json")
    with open(json_path, "rb") as handle:
        text = handle.read().decode("utf-8")
    return capa.render.result_document.ResultDocument.parse_raw(text)
@pytest.fixture
def a3f3bbc_rd():
    """Result document parsed from data/3f3bbcf8fd90bdcdcdc5494314ed4225.exe_.json (relative to CD)."""
    json_path = os.path.join(CD, "data", "3f3bbcf8fd90bdcdcdc5494314ed4225.exe_.json")
    with open(json_path, "rb") as handle:
        text = handle.read().decode("utf-8")
    return capa.render.result_document.ResultDocument.parse_raw(text)
@pytest.fixture
def al_khaserx86_rd():
    """Result document parsed from data/al-khaser_x86.exe_.json (relative to CD)."""
    json_path = os.path.join(CD, "data", "al-khaser_x86.exe_.json")
    with open(json_path, "rb") as handle:
        text = handle.read().decode("utf-8")
    return capa.render.result_document.ResultDocument.parse_raw(text)
@pytest.fixture
def al_khaserx64_rd():
    """Result document parsed from data/al-khaser_x64.exe_.json (relative to CD)."""
    json_path = os.path.join(CD, "data", "al-khaser_x64.exe_.json")
    with open(json_path, "rb") as handle:
        text = handle.read().decode("utf-8")
    return capa.render.result_document.ResultDocument.parse_raw(text)
@pytest.fixture
def a076114_rd():
path = os.path.join(CD, "data", "0761142efbda6c4b1e801223de723578.dll_.json")
with open(path, "rb") as f:
buf = f.read()
+255 -55
View File
@@ -1,4 +1,4 @@
# Copyright (C) 2023 FireEye, Inc. All Rights Reserved.
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
@@ -10,17 +10,20 @@ import pathlib
import subprocess
import pydantic
from fixtures import *
import capa.rules
import capa.render
import capa.render.proto
import capa.render.utils
import capa.features.freeze
import capa.features.address
import capa.render.proto.proto
import capa.render.proto.capa_pb2
import capa.render.result_document
import capa.render.proto.capa_pb2 as capa_pb2
import capa.render.result_document as rd
import capa.features.freeze.features
from fixtures import *
from capa.render.result_document import ResultDocument
# TODO test_proto_to_rd?
# TODO enable/remove
@@ -29,7 +32,7 @@ def _test_generate_proto(tmp_path: pathlib.Path):
proto_path = tmp_path / "capa.proto"
json_path = tmp_path / "capa.json"
schema = pydantic.schema_of(capa.render.result_document.ResultDocument)
schema = pydantic.schema_of(rd.ResultDocument)
json_path.write_text(json.dumps(schema, indent=4))
proto = capa.render.proto.generate_proto()
@@ -58,13 +61,114 @@ def _test_generate_proto(tmp_path: pathlib.Path):
print("=====================================")
def test_translate_to_proto(pma0101_rd: ResultDocument):
src = pma0101_rd
@pytest.mark.parametrize(
"rd_file",
[
pytest.param("a3f3bbc_rd"),
pytest.param("al_khaserx86_rd"),
pytest.param("al_khaserx64_rd"),
pytest.param("a076114_rd"),
pytest.param("pma0101_rd"),
pytest.param("dotnet_1c444e_rd"),
],
)
def test_doc_to_pb2(request, rd_file):
src: rd.ResultDocument = request.getfixturevalue(rd_file)
dst = capa.render.proto.proto.doc_to_pb2(src)
meta = src.meta
dst = capa.render.proto.proto.metadata_from_capa(meta)
assert_meta(src.meta, dst.meta)
assert str(meta.timestamp) == dst.timestamp # TODO type?
for rule_name, matches in src.rules.items():
assert rule_name in dst.rules
m: capa_pb2.RuleMetadata = dst.rules[rule_name].meta
assert matches.meta.name == m.name
assert cmp_optional(matches.meta.namespace, m.namespace)
assert list(matches.meta.authors) == m.authors
assert capa.render.proto.proto.scope_to_pb2(matches.meta.scope) == m.scope
assert len(matches.meta.attack) == len(m.attack)
for rd_attack, proto_attack in zip(matches.meta.attack, m.attack):
assert list(rd_attack.parts) == proto_attack.parts
assert rd_attack.tactic == proto_attack.tactic
assert rd_attack.technique == proto_attack.technique
assert rd_attack.subtechnique == proto_attack.subtechnique
assert len(matches.meta.mbc) == len(m.mbc)
for rd_mbc, proto_mbc in zip(matches.meta.mbc, m.mbc):
assert list(rd_mbc.parts) == proto_mbc.parts
assert rd_mbc.objective == proto_mbc.objective
assert rd_mbc.behavior == proto_mbc.behavior
assert rd_mbc.method == proto_mbc.method
assert rd_mbc.id == proto_mbc.id
assert list(matches.meta.references) == m.references
assert list(matches.meta.examples) == m.examples
assert matches.meta.description == m.description
assert matches.meta.lib == m.lib
assert matches.meta.is_subscope_rule == m.is_subscope_rule
assert cmp_optional(matches.meta.maec.analysis_conclusion, m.maec.analysis_conclusion)
assert cmp_optional(matches.meta.maec.analysis_conclusion_ov, m.maec.analysis_conclusion_ov)
assert cmp_optional(matches.meta.maec.malware_family, m.maec.malware_family)
assert cmp_optional(matches.meta.maec.malware_category, m.maec.malware_category)
assert cmp_optional(matches.meta.maec.malware_category_ov, m.maec.malware_category_ov)
assert matches.source == dst.rules[rule_name].source
assert len(matches.matches) == len(dst.rules[rule_name].matches)
for (addr, match), proto_match in zip(matches.matches, dst.rules[rule_name].matches):
assert capa.render.proto.proto.addr_to_pb2(addr) == proto_match.address
assert_match(match, proto_match.match)
def test_addr_to_pb2():
    """Translate one frozen address of each kind with addr_to_pb2 and check the resulting message."""
    address = capa.features.address
    freeze = capa.features.freeze.Address.from_capa
    to_pb2 = capa.render.proto.proto.addr_to_pb2

    # address kinds whose value is read back through the integer field `v.u`
    integer_cases = (
        (address.AbsoluteVirtualAddress(0x400000), capa_pb2.ADDRESSTYPE_ABSOLUTE, 0x400000),
        (address.RelativeVirtualAddress(0x100), capa_pb2.ADDRESSTYPE_RELATIVE, 0x100),
        (address.FileOffsetAddress(0x200), capa_pb2.ADDRESSTYPE_FILE, 0x200),
        (address.DNTokenAddress(0x123456), capa_pb2.ADDRESSTYPE_DN_TOKEN, 0x123456),
    )
    for native, expected_type, expected_value in integer_cases:
        translated = to_pb2(freeze(native))
        assert translated.type == expected_type
        assert translated.v.u == expected_value

    # token + offset pair is carried in the `token_offset` field
    translated = to_pb2(freeze(address.DNTokenOffsetAddress(0x123456, 0x10)))
    assert translated.type == capa_pb2.ADDRESSTYPE_DN_TOKEN_OFFSET
    assert translated.token_offset.token.u == 0x123456
    assert translated.token_offset.offset == 0x10

    # "no address" only has a type to check
    translated = to_pb2(freeze(address._NoAddress()))
    assert translated.type == capa_pb2.ADDRESSTYPE_NO_ADDRESS
def test_scope_to_pb2():
    """Check that scope_to_pb2 maps every capa rule scope constant to the matching protobuf enum value."""
    expected_pairs = (
        (capa.rules.FILE_SCOPE, capa_pb2.SCOPE_FILE),
        (capa.rules.FUNCTION_SCOPE, capa_pb2.SCOPE_FUNCTION),
        (capa.rules.BASIC_BLOCK_SCOPE, capa_pb2.SCOPE_BASIC_BLOCK),
        (capa.rules.INSTRUCTION_SCOPE, capa_pb2.SCOPE_INSTRUCTION),
    )
    for rule_scope, pb2_scope in expected_pairs:
        assert capa.render.proto.proto.scope_to_pb2(rule_scope) == pb2_scope
def cmp_optional(a, b):
    """
    Compare a possibly-None value `a` against `b`, treating None as the empty string.

    Needed because a proto optional value gets deserialized to "" instead of
    None (which is what pydantic uses).
    """
    if a is None:
        return "" == b
    return a == b
def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata):
assert str(meta.timestamp) == dst.timestamp
assert meta.version == dst.version
assert list(meta.argv) == dst.argv
@@ -78,68 +182,164 @@ def test_translate_to_proto(pma0101_rd: ResultDocument):
assert meta.analysis.os == dst.analysis.os
assert meta.analysis.extractor == dst.analysis.extractor
assert list(meta.analysis.rules) == dst.analysis.rules
assert capa.render.proto.proto.addr_from_freeze(meta.analysis.base_address) == dst.analysis.base_address
assert capa.render.proto.proto.addr_to_pb2(meta.analysis.base_address) == dst.analysis.base_address
assert len(meta.analysis.layout.functions) == len(dst.analysis.layout.functions)
# TODO use zip()
for i, f in enumerate(meta.analysis.layout.functions):
assert capa.render.proto.proto.addr_from_freeze(f.address) == dst.analysis.layout.functions[i].address
for rd_f, proto_f in zip(meta.analysis.layout.functions, dst.analysis.layout.functions):
assert capa.render.proto.proto.addr_to_pb2(rd_f.address) == proto_f.address
assert len(f.matched_basic_blocks) == len(dst.analysis.layout.functions[i].matched_basic_blocks)
for j, bb in enumerate(f.matched_basic_blocks):
assert (
capa.render.proto.proto.addr_from_freeze(bb.address)
== dst.analysis.layout.functions[i].matched_basic_blocks[j].address
)
assert len(rd_f.matched_basic_blocks) == len(proto_f.matched_basic_blocks)
for rd_bb, proto_bb in zip(rd_f.matched_basic_blocks, proto_f.matched_basic_blocks):
assert capa.render.proto.proto.addr_to_pb2(rd_bb.address) == proto_bb.address
assert meta.analysis.feature_counts.file == dst.analysis.feature_counts.file
assert len(meta.analysis.feature_counts.functions) == len(dst.analysis.feature_counts.functions)
for rd_f, proto_f in zip(meta.analysis.feature_counts.functions, dst.analysis.feature_counts.functions):
assert capa.render.proto.proto.addr_from_freeze(rd_f.address) == proto_f.address
assert capa.render.proto.proto.addr_to_pb2(rd_f.address) == proto_f.address
assert rd_f.count == proto_f.count
assert len(meta.analysis.library_functions) == len(dst.analysis.library_functions)
for rd_lf, proto_lf in zip(meta.analysis.library_functions, dst.analysis.library_functions):
assert capa.render.proto.proto.addr_from_freeze(rd_lf.address) == proto_lf.address
assert capa.render.proto.proto.addr_to_pb2(rd_lf.address) == proto_lf.address
assert rd_lf.name == proto_lf.name
def test_addr_from_freeze():
    """
    Translate one frozen address of each kind with addr_from_freeze and check the resulting message.

    NOTE(review): this mirrors test_addr_to_pb2 case by case but calls
    addr_from_freeze; confirm that function still exists, otherwise this
    test is stale and can be dropped.
    """
    a = capa.features.address.AbsoluteVirtualAddress(0x400000)
    a = capa.features.freeze.Address.from_capa(a)
    a = capa.render.proto.proto.addr_from_freeze(a)
    assert a.type == capa.render.proto.capa_pb2.ADDRESSTYPE_ABSOLUTE
    assert a.v.u == 0x400000

    a = capa.features.address.RelativeVirtualAddress(0x100)
    a = capa.features.freeze.Address.from_capa(a)
    a = capa.render.proto.proto.addr_from_freeze(a)
    assert a.type == capa.render.proto.capa_pb2.ADDRESSTYPE_RELATIVE
    assert a.v.u == 0x100

    a = capa.features.address.FileOffsetAddress(0x200)
    a = capa.features.freeze.Address.from_capa(a)
    a = capa.render.proto.proto.addr_from_freeze(a)
    assert a.type == capa.render.proto.capa_pb2.ADDRESSTYPE_FILE
    assert a.v.u == 0x200

    a = capa.features.address.DNTokenAddress(0x123456)
    a = capa.features.freeze.Address.from_capa(a)
    a = capa.render.proto.proto.addr_from_freeze(a)
    assert a.type == capa.render.proto.capa_pb2.ADDRESSTYPE_DN_TOKEN
    assert a.v.u == 0x123456

    a = capa.features.address.DNTokenOffsetAddress(0x123456, 0x10)
    a = capa.features.freeze.Address.from_capa(a)
    a = capa.render.proto.proto.addr_from_freeze(a)
    assert a.type == capa.render.proto.capa_pb2.ADDRESSTYPE_DN_TOKEN_OFFSET
    assert a.token_offset.token.u == 0x123456
    assert a.token_offset.offset == 0x10

    a = capa.features.address._NoAddress()
    a = capa.features.freeze.Address.from_capa(a)
    a = capa.render.proto.proto.addr_from_freeze(a)
    assert a.type == capa.render.proto.capa_pb2.ADDRESSTYPE_NO_ADDRESS


def assert_match(ma: rd.Match, mb: capa_pb2.Match):
    """
    Recursively assert that match `ma` and protobuf match `mb` agree.

    Compares the success flag, the node (statement or feature), all child
    matches, the match locations and the captures.

    Args:
      ma: the match being translated.
      mb: the protobuf match produced from it.
    """
    assert ma.success == mb.success

    # node
    if isinstance(ma.node, rd.StatementNode):
        assert_statement(ma.node, mb.statement)

    elif isinstance(ma.node, rd.FeatureNode):
        assert ma.node.type == mb.feature.type
        assert_feature(ma.node.feature, mb.feature)

    # children
    assert len(ma.children) == len(mb.children)
    for ca, cb in zip(ma.children, mb.children):
        assert_match(ca, cb)

    # locations
    assert list(map(capa.render.proto.proto.addr_to_pb2, ma.locations)) == mb.locations

    # captures
    assert len(ma.captures) == len(mb.captures)
    for capture, locs in ma.captures.items():
        assert capture in mb.captures
        assert list(map(capa.render.proto.proto.addr_to_pb2, locs)) == mb.captures[capture].address
# TODO proto to RD?
def assert_feature(fa, fb):
    """
    Assert that frozen feature `fa` and protobuf feature node `fb` carry the same data.

    Args:
      fa: a feature model from capa.features.freeze.features.
      fb: the protobuf message whose "feature" oneof holds the translated feature.

    Raises:
      NotImplementedError: when `fa` is not one of the feature classes handled here.
    """

    def oneof_value(msg):
        # read whichever member of the message's "value" oneof is set
        return getattr(msg, msg.WhichOneof("value"))

    # get field that has been set, e.g., os or api, to access inner fields
    inner = getattr(fb, fb.WhichOneof("feature"))

    assert fa.type == inner.type
    assert cmp_optional(fa.description, inner.description)

    features = capa.features.freeze.features
    # (feature class, attributes compared directly, attributes compared via their "value" oneof)
    # entries are tried in order and the first matching class wins
    checks = (
        (features.OSFeature, ("os",), ()),
        (features.ArchFeature, ("arch",), ()),
        (features.FormatFeature, ("format",), ()),
        (features.MatchFeature, ("match",), ()),
        (features.CharacteristicFeature, ("characteristic",), ()),
        (features.ExportFeature, ("export",), ()),
        (features.ImportFeature, ("import_",), ()),
        (features.SectionFeature, ("section",), ()),
        (features.FunctionNameFeature, ("function_name",), ()),
        (features.SubstringFeature, ("substring",), ()),
        (features.RegexFeature, ("regex",), ()),
        (features.StringFeature, ("string",), ()),
        (features.ClassFeature, ("class_",), ()),
        (features.NamespaceFeature, ("namespace",), ()),
        # basic block: only type and description, already compared above
        (features.BasicBlockFeature, (), ()),
        (features.APIFeature, ("api",), ()),
        (features.PropertyFeature, ("property", "access"), ()),
        (features.NumberFeature, (), ("number",)),
        (features.BytesFeature, ("bytes",), ()),
        (features.OffsetFeature, (), ("offset",)),
        (features.MnemonicFeature, ("mnemonic",), ()),
        (features.OperandNumberFeature, ("index",), ("operand_number",)),
        (features.OperandOffsetFeature, ("index",), ("operand_offset",)),
    )

    for feature_cls, plain_attrs, oneof_attrs in checks:
        if not isinstance(fa, feature_cls):
            continue
        for attr in plain_attrs:
            assert getattr(fa, attr) == getattr(inner, attr)
        for attr in oneof_attrs:
            assert getattr(fa, attr) == oneof_value(getattr(inner, attr))
        return

    raise NotImplementedError(f"unhandled feature: {type(fa)}: {fa}")
def assert_statement(a: rd.StatementNode, b: capa_pb2.StatementNode):
    """
    Assert that statement node `a` and protobuf statement node `b` agree.

    Compares the node type, then the statement's type and description, then
    the fields specific to the statement kind (range, some, subscope, compound).

    Args:
      a: the statement node being translated.
      b: the protobuf statement node produced from it.

    Raises:
      AssertionError: on any mismatch, or when the statement kind is not handled.
    """
    assert a.type == b.type

    sa = a.statement
    # pick the member of the "statement" oneof that is actually set
    sb = getattr(b, str(b.WhichOneof("statement")))

    assert sa.type == sb.type
    assert cmp_optional(sa.description, sb.description)

    if isinstance(sa, rd.RangeStatement):
        assert isinstance(sb, capa_pb2.RangeStatement)
        assert sa.min == sb.min
        # compare against the protobuf side; the original compared sa.max with itself
        assert sa.max == sb.max
        assert_feature(sa.child, sb.child)

    elif isinstance(sa, rd.SomeStatement):
        assert sa.count == sb.count

    elif isinstance(sa, rd.SubscopeStatement):
        assert capa.render.proto.proto.scope_to_pb2(sa.scope) == sb.scope

    elif isinstance(sa, rd.CompoundStatement):
        # only has type and description tested above
        pass

    else:
        # unhandled statement
        assert False, f"unhandled statement: {type(sa)}: {sa}"