Compare commits

...

17 Commits

Author SHA1 Message Date
Willi Ballenthin
c0ff0c2124 add wip proto translator using introspection 2023-02-17 11:11:14 +01:00
Willi Ballenthin
099cd868ae Merge branch 'wb-proto' of personal.github.com:mandiant/capa into wb-proto 2023-02-14 13:04:47 +01:00
Willi Ballenthin
3071394ef4 Update capa/render/proto/__init__.py
Co-authored-by: Moritz <mr-tz@users.noreply.github.com>
2023-02-14 16:24:47 +01:00
Willi Ballenthin
d1b4e59e7d Update capa/render/proto/__init__.py
Co-authored-by: Moritz <mr-tz@users.noreply.github.com>
2023-02-14 16:24:39 +01:00
Willi Ballenthin
50750a59d9 Merge branch 'master' of personal.github.com:mandiant/capa into wb-proto 2023-02-14 13:04:28 +01:00
Willi Ballenthin
fd64b2c5d5 Merge pull request #1315 from mandiant/typing-address
freeze: better type annotations for Address value
2023-02-14 15:05:31 +01:00
Willi Ballenthin
e41afbee58 changelog 2023-02-14 13:04:05 +01:00
Willi Ballenthin
9ea2aca9cb test: proto: emit the schema json, too 2023-02-14 11:24:30 +01:00
Willi Ballenthin
c7ab89507e setup: fix dep spec 2023-02-14 11:02:28 +01:00
Willi Ballenthin
c197fd5086 proto: add type stubs for generate schema 2023-02-14 10:57:43 +01:00
Willi Ballenthin
b6e607f60e ci: ignore syntax, type checking for protobuf generated files 2023-02-14 10:26:05 +01:00
Willi Ballenthin
38d8b7f501 render: add initial proto generator 2023-02-14 10:02:12 +01:00
Willi Ballenthin
514b4929b3 freeze: better type annotations for Address value 2023-02-14 09:47:57 +01:00
Capa Bot
4ea3475d2b Sync capa rules submodule 2023-02-13 09:50:39 +00:00
Capa Bot
15a276e3a5 Sync capa rules submodule 2023-02-13 09:47:05 +00:00
Capa Bot
f6e58ea212 Sync capa rules submodule 2023-02-10 10:08:30 +00:00
Capa Bot
1b191b5aea Sync capa-testfiles submodule 2023-02-10 08:52:58 +00:00
14 changed files with 2731 additions and 10 deletions

View File

@@ -34,11 +34,11 @@ jobs:
- name: Install dependencies
run: pip install -e .[dev]
- name: Lint with isort
run: isort --profile black --length-sort --line-width 120 -c .
run: isort --profile black --length-sort --line-width 120 --skip-glob "*_pb2.py" -c .
- name: Lint with black
run: black -l 120 --check .
run: black -l 120 --extend-exclude ".*_pb2.py" --check .
- name: Lint with pycodestyle
run: pycodestyle --show-source capa/ scripts/ tests/
run: pycodestyle --exclude="*_pb2.py" --show-source capa/ scripts/ tests/
- name: Check types with mypy
run: mypy --config-file .github/mypy/mypy.ini --check-untyped-defs capa/ scripts/ tests/

View File

@@ -4,10 +4,15 @@
### New Features
- add protobuf format for result documents #1219 @williballenthin
### Breaking Changes
### New Rules (0)
### New Rules (3)
- persistence/scheduled-tasks/schedule-task-via-at joren485
- data-manipulation/prng/generate-random-numbers-via-rtlgenrandom william.ballenthin@mandiant.com
- communication/ip/convert-ip-address-from-string @mr-tz
-
### Bug Fixes

View File

@@ -2,7 +2,7 @@
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/flare-capa)](https://pypi.org/project/flare-capa)
[![Last release](https://img.shields.io/github/v/release/mandiant/capa)](https://github.com/mandiant/capa/releases)
[![Number of rules](https://img.shields.io/badge/rules-770-blue.svg)](https://github.com/mandiant/capa-rules)
[![Number of rules](https://img.shields.io/badge/rules-773-blue.svg)](https://github.com/mandiant/capa-rules)
[![CI status](https://github.com/mandiant/capa/workflows/CI/badge.svg)](https://github.com/mandiant/capa/actions?query=workflow%3ACI+event%3Apush+branch%3Amaster)
[![Downloads](https://img.shields.io/github/downloads/mandiant/capa/total)](https://github.com/mandiant/capa/releases)
[![License](https://img.shields.io/badge/license-Apache--2.0-green.svg)](LICENSE.txt)

View File

@@ -12,7 +12,7 @@ See the License for the specific language governing permissions and limitations
import zlib
import logging
from enum import Enum
from typing import Any, List, Tuple
from typing import Any, List, Tuple, Union
from pydantic import Field, BaseModel
@@ -46,7 +46,7 @@ class AddressType(str, Enum):
class Address(HashableModel):
type: AddressType
value: Any
value: Union[int, Tuple[int, int], None]
@classmethod
def from_capa(cls, a: capa.features.address.Address) -> "Address":
@@ -79,19 +79,26 @@ class Address(HashableModel):
def to_capa(self) -> capa.features.address.Address:
if self.type is AddressType.ABSOLUTE:
assert isinstance(self.value, int)
return capa.features.address.AbsoluteVirtualAddress(self.value)
elif self.type is AddressType.RELATIVE:
assert isinstance(self.value, int)
return capa.features.address.RelativeVirtualAddress(self.value)
elif self.type is AddressType.FILE:
assert isinstance(self.value, int)
return capa.features.address.FileOffsetAddress(self.value)
elif self.type is AddressType.DN_TOKEN:
assert isinstance(self.value, int)
return capa.features.address.DNTokenAddress(self.value)
elif self.type is AddressType.DN_TOKEN_OFFSET:
assert isinstance(self.value, tuple)
token, offset = self.value
assert isinstance(token, int)
assert isinstance(offset, int)
return capa.features.address.DNTokenOffsetAddress(token, offset)
elif self.type is AddressType.NO_ADDRESS:
@@ -108,7 +115,11 @@ class Address(HashableModel):
return True
else:
return self.value < other.value
assert self.type == other.type
# mypy doesn't realize we've proven that either
# both are ints, or both are tuples of ints.
# and both of these are comparable.
return self.value < other.value # type: ignore
class GlobalFeature(HashableModel):

View File

@@ -0,0 +1,596 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import logging
from typing import Dict, Union
from dataclasses import dataclass
import pydantic
import capa.render
import capa.render.utils
import capa.features.freeze
import capa.render.result_document
import capa.features.freeze.features
from capa.render.utils import StringIO
logger = logging.getLogger(__name__)
def is_enum(prop):
    """Tell whether a schema node is a string enumeration.

    a node qualifies when it declares ``"type": "string"`` and also
    carries an ``"enum"`` entry.
    """
    if "type" not in prop:
        return False
    if prop["type"] != "string":
        return False
    return "enum" in prop
def get_enum_name(prop):
    """Return the enum's type name, which is read from the node's ``"title"``."""
    title = prop["title"]
    return title
def get_enum_value_name(enum, value):
    """Build the proto constant name for one member of an enum.

    the upper-cased enum name is used as a prefix and joined by an underscore
    with the upper-cased member value, whose spaces become underscores.

    like: an enum titled "AddressType" and the value "dn token"
          -> "ADDRESSTYPE_DN_TOKEN"
    """
    type_part = get_enum_name(enum).upper()
    member_part = value.upper().replace(" ", "_")
    return f"{type_part}_{member_part}"
def emit_proto_enum(out: StringIO, enum):
    """Write a proto ``enum`` declaration for a schema enumeration.

    value 0 is given to a synthetic ``..._UNSPECIFIED`` member;
    the schema's own values are numbered from 1 in their listed order.
    the declaration is followed by an empty line.

    like:

        enum AddressType {
          ADDRESSTYPE_UNSPECIFIED = 0;
          ADDRESSTYPE_ABSOLUTE = 1;
          ADDRESSTYPE_RELATIVE = 2;
          ...
        }
    """
    out.writeln(f"enum {get_enum_name(enum)} {{")

    zero_member = get_enum_value_name(enum, "unspecified")
    out.writeln(f" {zero_member} = 0;")

    for number, member in enumerate(enum["enum"], start=1):
        out.writeln(f" {get_enum_value_name(enum, member)} = {number};")

    out.writeln("}")
    out.writeln("")
def is_ref(prop):
    """Tell whether the schema node is a ``$ref`` pointer to another definition."""
    has_ref = "$ref" in prop
    return has_ref
def get_ref_type_name(prop):
    """Extract the referenced definition's name from a ``$ref`` node.

    from: {"$ref": "#/definitions/Scope"}
    to:   "Scope"
    """
    assert is_ref(prop)

    prefix = "#/definitions/"
    target = prop["$ref"]
    assert target.startswith(prefix)

    return target[len(prefix) :]
def is_primitive_type(prop):
    """Tell whether the node is a primitive: string, integer, bool, etc.

    anything with a "type" other than "object" counts, unless it is an enum.
    note that arrays therefore also count as primitive here.
    """
    if "type" not in prop:
        return False
    if prop["type"] == "object":
        return False
    return "enum" not in prop
def is_custom_type(prop):
    """Tell whether the node is a struct-like type defined in the schema, like Features.

    these are "object" nodes without "additionalProperties"
    (objects that do have it are treated as maps).
    """
    if "type" not in prop:
        return False
    if prop["type"] != "object":
        return False
    return "additionalProperties" not in prop
def get_custom_type_name(prop):
    """Return the custom type's name, which is read from the node's ``"title"``."""
    title = prop["title"]
    return title
def is_tuple(prop):
    """Tell whether the schema node is a tuple, that is, an array with a fixed size.

    the types of the elements can vary.
    we'll emit a custom message type for each tuple, like Pair_Address_Match.

    like:

        {"items": [{"$ref": "#/definitions/Address"},
                   {"$ref": "#/definitions/Match"}],
         "maxItems": 2,
         "minItems": 2,
         "type": "array"}
    """
    if "type" not in prop or prop["type"] != "array":
        return False

    if not ("maxItems" in prop and "minItems" in prop):
        return False

    # tuples have a fixed size: both bounds must agree.
    return prop["maxItems"] == prop["minItems"]
def get_tuple_type_name(prop):
    """Derive the message type name used for a tuple node.

    two-element tuples are named "Pair_...", all others "Tuple_...",
    followed by the element type names joined by underscores,
    like: Pair_Address_Match.
    """
    assert is_tuple(prop)

    base = "Pair" if prop["maxItems"] == 2 else "Tuple"

    # this won't work for nested tuples, but good enough for here.
    element_names = [get_type_name(element) for element in prop["items"]]
    return base + "_" + "_".join(element_names)
def is_array(prop):
    """Tell whether the schema node is an array: a sequence of same-typed elements.

    typically we can use a repeated field for this.
    note: there's a special case within maps, where the array elements are a custom wrapper type.

    like:

        {"items": {"type": "string"},
         "title": "Parts",
         "type": "array"}
    """
    if "type" not in prop or prop["type"] != "array":
        return False

    # tuples have a fixed size, arrays are variable.
    fixed_size = "maxItems" in prop and "minItems" in prop and prop["maxItems"] == prop["minItems"]
    if fixed_size:
        return False

    # array elements share one type, so "items" is a single schema, not a list of them.
    return isinstance(prop["items"], dict)
def is_map(prop):
    """Tell whether the schema node is a map from string keys to a fixed value type.

    the value type cannot be repeated, so we'll emit a custom wrapper type.

    like:

        {"additionalProperties": {"items": {"$ref": "#/definitions/Address"},
                                  "type": "array"},
         "title": "Captures",
         "type": "object"}
    """
    if "type" not in prop:
        return False
    if prop["type"] != "object":
        return False
    return "additionalProperties" in prop
def get_primitive_type_name(prop):
    """Map a primitive schema node to the proto type text used for its field.

    scalars map to a fixed name, tuples to their generated message name,
    and arrays to "repeated <element type>".
    raises NotImplementedError for anything else.
    """
    assert is_primitive_type(prop)

    kind = prop["type"]
    scalar_names = (
        ("string", "string"),
        ("boolean", "bool"),
        # this integer has arbitrary range.
        # but proto supports only i64 and u64.
        # so we hook this specially, including within the translator.
        ("integer", "Integer"),
        # number: int | float
        # we hook this specially
        ("number", "Number"),
    )
    for schema_name, proto_name in scalar_names:
        if kind == schema_name:
            return proto_name

    if is_tuple(prop):
        return get_tuple_type_name(prop)

    if not is_array(prop):
        raise NotImplementedError(kind)

    element = prop["items"]
    if is_primitive_type(element):
        element_name = get_primitive_type_name(element)
    elif is_ref(element):
        element_name = get_ref_type_name(element)
    elif is_custom_type(element):
        element_name = get_custom_type_name(element)
    else:
        raise NotImplementedError(element)

    return f"repeated {element_name}"
def get_type_name(prop):
    """Resolve the proto type name for any supported schema node.

    the node is tested as a primitive, a custom type, a ref, and an enum, in that order;
    the first match decides the name. raises NotImplementedError when none match.
    """
    resolvers = (
        (is_primitive_type, get_primitive_type_name),
        (is_custom_type, get_custom_type_name),
        (is_ref, get_ref_type_name),
        (is_enum, get_enum_name),
    )
    for matches, resolve in resolvers:
        if matches(prop):
            return resolve(prop)

    raise NotImplementedError(prop)
def is_union(prop):
    """Tell whether the node is a union: a field that can be one of several types."""
    has_alternatives = "anyOf" in prop
    return has_alternatives
def sanitize_prop_name(name):
    """Rewrite a schema property name so it only uses characters kept by the translator.

    "-", "/" and " " each become "_", and "&" becomes "a".

    like: "analysis-conclusion" -> "analysis_conclusion"
    like: "att&ck" -> "attack"
    like: "capa/subscope" -> "capa_subscope"
    like: "function name" -> "function_name"
    """
    # none of the replacement characters is itself a replaced character,
    # so a single pass gives the same result as chained replacements.
    table = str.maketrans({"-": "_", "&": "a", "/": "_", " ": "_"})
    return name.translate(table)
def _find_capa_class(name):
    """Look up the capa class with the given name.

    we use this to find the class that defines the property order.
    the result document, freeze, and freeze features modules are searched in that order;
    raises NotImplementedError when none of them has the attribute.
    """
    candidate_modules = (
        capa.render.result_document,
        capa.features.freeze,
        capa.features.freeze.features,
    )
    for module in candidate_modules:
        try:
            return getattr(module, name)
        except AttributeError:
            continue

    raise NotImplementedError(name)
def _enum_properties(message):
    """enumerate the properties of the message defined, ordered by class declaration"""
    # this is just for convenience.

    # names in the order the class declares them. guaranteed.
    declared = list(_find_capa_class(message["title"]).__signature__.parameters.keys())

    # names in the order pydantic provides them. not guaranteed. the fallback.
    # used when we can't figure out an alias, such as capa/subscope -> is_subscope.
    schema_names = list(message["properties"].keys())
    fallback_base = len(message["properties"])

    def sort_key(entry):
        raw_name = entry[0]
        try:
            # prefer the order of properties provided by the class.
            return declared.index(sanitize_prop_name(raw_name))
        except ValueError:
            # fallback to whatever pydantic extracts;
            # the offset sorts these after every property the class declares.
            return fallback_base + schema_names.index(raw_name)

    return sorted(message["properties"].items(), key=sort_key)
@dataclass
class DeferredArrayType:
    """An array wrapper message to be emitted after the schema's own types."""

    # name of the wrapper message, like: Array_Address
    name: str
    # schema node describing a single array element
    item: dict
@dataclass
class DeferredTupleType:
    """A tuple message to be emitted after the schema's own types."""

    # name of the tuple message, like: Pair_Address_Match
    name: str
    # number of elements; callers pass the tuple schema's "minItems"
    count: int
    # the tuple schema's "items": one schema node per element, in order.
    # this is a list, not a dict.
    items: list
def emit_proto_message(out: StringIO, deferred_types: Dict, message):
    """Write a proto ``message`` declaration for an object definition from the schema.

    one field is written per property, in the order given by `_enum_properties`.
    tuple and array-wrapper types that the fields rely on are not written here;
    they are recorded in `deferred_types` (keyed by type name) for the caller to emit.

    raises NotImplementedError for an unsupported union member,
    and ValueError for an unsupported property.
    """

    def defer_tuple(type_name, tuple_prop):
        deferred_types[type_name] = DeferredTupleType(type_name, tuple_prop["minItems"], tuple_prop["items"])

    # like: Address
    title = message["title"]
    out.writeln(f"message {title} {{")

    # we use a counter like this so that
    # union/oneof fields can increment the counter.
    field_numbers = iter(range(1, sys.maxsize))

    for raw_name, prop in _enum_properties(message):
        number = next(field_numbers)
        name = sanitize_prop_name(raw_name)

        if is_ref(prop):
            out.writeln(f" {get_ref_type_name(prop)} {name} = {number};")

        elif is_primitive_type(prop):
            ptype = get_primitive_type_name(prop)
            out.writeln(f" {ptype} {name} = {number};")

            if is_tuple(prop):
                defer_tuple(ptype, prop)
            elif is_array(prop):
                element = prop["items"]
                if is_tuple(element):
                    defer_tuple(get_tuple_type_name(element), element)

        elif is_custom_type(prop):
            out.writeln(f" {get_custom_type_name(prop)} {name} = {number};")

        elif is_union(prop):
            out.writeln(f" oneof {name} {{")

            for j, variant in enumerate(prop["anyOf"]):
                if is_ref(variant):
                    out.writeln(f" {get_ref_type_name(variant)} v{j} = {number};")
                elif is_primitive_type(variant):
                    vtype = get_primitive_type_name(variant)
                    out.writeln(f" {vtype} v{j} = {number};")
                    if is_tuple(variant):
                        defer_tuple(vtype, variant)
                    # pydantic doesn't seem to encode None option
                    # fortunately, neither does protobuf.
                    # still seems weird not to be explicit.
                else:
                    raise NotImplementedError(variant)

                # every member takes its own field number.
                # this also advances once after the last member,
                # so one number is skipped after each oneof.
                number = next(field_numbers)

            out.writeln(" };")

        elif is_map(prop):
            value_schema = prop["additionalProperties"]

            if is_array(value_schema):
                # map values cannot be repeated, see:
                # https://stackoverflow.com/a/41552990/87207
                #
                # so create a wrapper type around the repeated values.
                # like: message Array_Integer { repeated int32 values = 1; }
                #
                # no:
                #
                #     map <string, repeated int32> things = 1;
                #
                # yes:
                #
                #     map <string, Array_Integer> things = 1;
                #
                # we could do this for every array, like Array_Integer and Array_Address,
                # but its less idiomatic and more noisy.
                # so we only create these types when we need them.
                element = value_schema["items"]
                vtype = "Array_" + get_type_name(element)

                # register this type to be emitted once we're done with the
                # top level custom types in the schema.
                deferred_types[vtype] = DeferredArrayType(vtype, element)
            else:
                vtype = get_type_name(value_schema)

            out.writeln(f" map <string, {vtype}> {name} = {number};")

        else:
            raise ValueError("unexpected type: %s" % prop)

    out.writeln("}")
    out.writeln("")
def emit_proto_entry(out: StringIO, deferred_types: Dict, schema, name):
    """Write the proto declaration for one named entry of the schema's definitions.

    `name` is a reference path like "#/definitions/Address".
    string enums are written as enums and objects as messages.

    raises ValueError when `name` is not such a path or the definition's title disagrees with it,
    and NotImplementedError for any other kind of definition.
    """
    prefix = "#/definitions/"
    if not name.startswith(prefix):
        raise ValueError(f"unexpected name: {name}")

    title = name[len(prefix) :]
    definition = schema["definitions"][title]

    if definition["title"] != title:
        raise ValueError("title mismatch: %s" % definition["title"])

    kind = definition["type"]
    if kind == "string" and "enum" in definition:
        emit_proto_enum(out, definition)
        return

    if kind == "object":
        emit_proto_message(out, deferred_types, definition)
        return

    raise NotImplementedError(definition["type"])
def generate_proto_from_pydantic(schema):
    """Translate a schema dict with a "definitions" table into proto3 source text.

    the output has, in order: a header, one declaration per definition (sorted by name),
    the deferred array/tuple helper messages (sorted by name),
    and finally the Integer and Number helper messages.
    """
    out: StringIO = capa.render.utils.StringIO()

    for header_line in (
        "// Generated by the capa.render.proto translator. DO NOT EDIT!",
        'syntax = "proto3";',
        "",
    ):
        out.writeln(header_line)

    # helper types discovered while emitting the definitions, keyed by type name.
    deferred_types: Dict[str, Union[DeferredArrayType, DeferredTupleType]] = {}

    for definition_name in sorted(schema["definitions"].keys()):
        emit_proto_entry(out, deferred_types, schema, "#/definitions/" + definition_name)

    for type_name, deferred in sorted(deferred_types.items()):
        if isinstance(deferred, DeferredArrayType):
            element_type = get_type_name(deferred.item)
            out.writeln(f"message {type_name} {{ repeated {element_type} values = 1; }}\n")

        elif isinstance(deferred, DeferredTupleType):
            out.writeln(f"message {type_name} {{")
            for position, element in enumerate(deferred.items):
                out.writeln(f" {get_type_name(element)} v{position} = {position + 1};")
            out.writeln("}\n")

    # these are additional primitive types that we'll use throughout.
    for helper_message in (
        "message Integer { oneof value { uint64 u = 1; int64 i = 2; } }\n",
        "message Number { oneof value { uint64 u = 1; int64 i = 2; double f = 3; } }\n",
    ):
        out.writeln(helper_message)

    return out.getvalue()
def generate_proto() -> str:
    """
    generate a protobuf v3 schema for the ResultDocument format.
    we use introspection of the pydantic schema to generate this.

    note: we *cannot* use the generated proto from version to version of capa,
    because this translator does *not* guarantee field ordering/numbering.
    that is, if we add a new property to any of the pydantic models,
    the proto field numbers may change, and any clients using the proto will break.

    instead, we should use this method to generate the proto,
    probably once per major version,
    and then commit the proto to the repo.

    returns: the text of the .proto file.
    """
    schema = pydantic.schema_of(capa.render.result_document.ResultDocument)
    return generate_proto_from_pydantic(schema)
def int_to_pb2(v):
    """Wrap a Python integer in the generated ``Integer`` proto message.

    the Integer message is a oneof of a uint64 member `u` and an int64 member `i`.
    negative values are stored in `i`, and all other values in `u`.

    raises ValueError("underflow") when `v` is below the int64 minimum,
    and ValueError("overflow") when `v` is above the uint64 maximum.
    """
    assert isinstance(v, int)

    # negative values go into the int64 member,
    # so the lower bound is the int64 minimum, not the int32 minimum.
    if v < -0x8000000000000000:
        raise ValueError("underflow")

    if v > 0xFFFFFFFFFFFFFFFF:
        raise ValueError("overflow")

    # this module doesn't import the generated code at the top level,
    # so load it here rather than relying on the caller having imported it.
    import capa.render.proto.capa_pb2

    if v < 0:
        return capa.render.proto.capa_pb2.Integer(i=v)

    return capa.render.proto.capa_pb2.Integer(u=v)
# copy the fields of `src` into the protobuf message `dst`,
# guided by `typ`, the schema node that describes `src`.
#
# schema: the full schema dict; its "definitions" table is used to resolve refs.
# typ: the schema node for `src`; only custom (object) types are handled here.
# src: the source object; properties are read from it with getattr.
# dst: the destination message; it is filled in place, and nothing is returned.
#
# raises NotImplementedError for every shape that isn't handled yet,
# including tuples, maps, and unions whose value is not an int.
def translate_to_pb2(schema, typ, src, dst):
logger.debug("translate: %s", get_type_name(typ))
if is_custom_type(typ):
# note: the attribute is read from both src and dst using the raw schema property name.
# NOTE(review): proto field names are produced by sanitize_prop_name, so names
# that need sanitizing probably won't resolve on dst here - confirm.
for pname, ptyp in typ["properties"].items():
# unions and maps have no single type name, so they get their own log lines.
if is_union(ptyp):
logger.debug("translate: %s.%s (union)", get_type_name(typ), pname)
elif is_map(ptyp):
logger.debug("translate: %s.%s (map)", get_type_name(typ), pname)
else:
logger.debug("translate: %s.%s (%s)", get_type_name(typ), pname, get_type_name(ptyp))
psrc = getattr(src, pname)
# replace a $ref by the definition it points to before dispatching on the node.
if is_ref(ptyp):
logger.debug("resolving ref: %s", get_type_name(ptyp))
ptyp = schema["definitions"][get_ref_type_name(ptyp)]
# is_primitive_type is true for any non-object, non-enum "type",
# which is why arrays and tuples are handled inside this branch.
if is_primitive_type(ptyp):
if ptyp["type"] == "string":
if "format" in ptyp and ptyp["format"] == "date-time":
# NOTE(review): appending "Z" presumably assumes a naive UTC datetime - confirm.
pdst = psrc.isoformat("T") + "Z"
else:
pdst = psrc
setattr(dst, pname, pdst)
elif ptyp["type"] == "integer":
# integers are wrapped in the Integer message, which can't be assigned directly.
getattr(dst, pname).CopyFrom(int_to_pb2(psrc))
# TODO: move array out of primitives
elif is_array(ptyp):
vtyp = ptyp["items"]
if is_ref(vtyp):
logger.debug("resolving ref: %s", get_type_name(vtyp))
vtyp = schema["definitions"][get_ref_type_name(vtyp)]
if get_type_name(vtyp) == "string":
pdst = getattr(dst, pname)
for v in psrc:
pdst.append(v)
elif is_custom_type(vtyp):
# note: this first value of pdst is overwritten in the loop before it is used.
pdst = getattr(dst, pname)
Dst = getattr(capa.render.proto.capa_pb2, get_type_name(vtyp))
for psrcv in psrc:
# translate each element into a fresh message, then append it to the repeated field.
pdst = Dst()
translate_to_pb2(schema, vtyp, psrcv, pdst)
getattr(dst, pname).append(pdst)
else:
raise NotImplementedError(get_type_name(vtyp))
# TODO: move tuple out of primitives
elif is_tuple(ptyp):
raise NotImplementedError("tuple")
else:
raise NotImplementedError(ptyp["type"])
elif is_custom_type(ptyp):
ptyp = schema["definitions"][get_type_name(ptyp)]
Dst = getattr(capa.render.proto.capa_pb2, get_type_name(ptyp))
pdst = Dst()
translate_to_pb2(schema, ptyp, psrc, pdst)
# you can't just assign to a non-initialized composite field.
#
# https://stackoverflow.com/a/22771612/87207
getattr(dst, pname).CopyFrom(pdst)
elif is_enum(ptyp):
# like: AddressType
Enum = getattr(capa.render.proto.capa_pb2, get_type_name(ptyp))
# like: AddressType.ADDRESSTYPE_ABSOLUTE
v = getattr(Enum, get_enum_value_name(ptyp, psrc.value))
setattr(dst, pname, v)
# NOTE(review): this looks unreachable, because a tuple has "type": "array"
# and so already matches is_primitive_type above (unless it also has "enum").
elif is_tuple(ptyp):
raise NotImplementedError("tuple")
elif is_union(ptyp):
# in this scenario, we have a field that can be one of several types.
# in the proto message, we set *one* of many disjoint fields.
# they are named v0, v1, v2, etc. and not named after the type.
# so, we need to match up the types and resolve the destination field name.
# it is guaranteed that of the candidate fields, they each have a unique type.
# 1. resolve the name of the source type
# only int values matching an Integer candidate are recognized so far.
ptypname = None
for candidate_type in ptyp["anyOf"]:
logger.debug("candidate: %s", get_type_name(candidate_type))
if get_type_name(candidate_type) == "Integer" and isinstance(psrc, int):
# special handling of numbers to account for range
ptypname = "Integer"
if not ptypname:
raise NotImplementedError(ptyp)
# 2. find the oneof member of dst whose message type has that name.
pdstname = None
for candidate_descriptor in dst.DESCRIPTOR.oneofs_by_name[pname].fields:
# NOTE(review): 11 is presumably the descriptor's message field type (TYPE_MESSAGE) - confirm.
if candidate_descriptor.type == 11:
if candidate_descriptor.message_type.full_name == ptypname:
pdstname = candidate_descriptor.name
break
else:
raise NotImplementedError(candidate_descriptor.type)
if not pdstname:
raise NotImplementedError(ptypname)
# 3. copy the value into the selected member.
if ptypname == "Integer":
getattr(dst, pdstname).CopyFrom(int_to_pb2(psrc))
else:
raise NotImplementedError(type(psrc))
else:
raise NotImplementedError(get_type_name(ptyp))
else:
raise NotImplementedError(get_type_name(typ))

View File

@@ -0,0 +1,392 @@
// Generated by the capa.render.proto translator. DO NOT EDIT!
syntax = "proto3";
message APIFeature {
string type = 1;
string api = 2;
string description = 3;
}
message Address {
AddressType type = 1;
oneof value {
Integer v0 = 2;
Pair_Integer_Integer v1 = 3;
};
}
enum AddressType {
ADDRESSTYPE_UNSPECIFIED = 0;
ADDRESSTYPE_ABSOLUTE = 1;
ADDRESSTYPE_RELATIVE = 2;
ADDRESSTYPE_FILE = 3;
ADDRESSTYPE_DN_TOKEN = 4;
ADDRESSTYPE_DN_TOKEN_OFFSET = 5;
ADDRESSTYPE_NO_ADDRESS = 6;
}
message Analysis {
string format = 1;
string arch = 2;
string os = 3;
string extractor = 4;
repeated string rules = 5;
Address base_address = 6;
Layout layout = 7;
FeatureCounts feature_counts = 8;
repeated LibraryFunction library_functions = 9;
}
message ArchFeature {
string type = 1;
string arch = 2;
string description = 3;
}
message AttackSpec {
repeated string parts = 1;
string tactic = 2;
string technique = 3;
string subtechnique = 4;
string id = 5;
}
message BasicBlockFeature {
string type = 1;
string description = 2;
}
message BasicBlockLayout {
Address address = 1;
}
message BytesFeature {
string type = 1;
string bytes = 2;
string description = 3;
}
message CharacteristicFeature {
string type = 1;
string characteristic = 2;
string description = 3;
}
message ClassFeature {
string type = 1;
string description = 2;
string class = 3;
}
message CompoundStatement {
string type = 1;
string description = 2;
}
message ExportFeature {
string type = 1;
string export = 2;
string description = 3;
}
message FeatureCounts {
Integer file = 1;
repeated FunctionFeatureCount functions = 2;
}
message FeatureNode {
oneof feature {
OSFeature v0 = 1;
ArchFeature v1 = 2;
FormatFeature v2 = 3;
MatchFeature v3 = 4;
CharacteristicFeature v4 = 5;
ExportFeature v5 = 6;
ImportFeature v6 = 7;
SectionFeature v7 = 8;
FunctionNameFeature v8 = 9;
SubstringFeature v9 = 10;
RegexFeature v10 = 11;
StringFeature v11 = 12;
ClassFeature v12 = 13;
NamespaceFeature v13 = 14;
APIFeature v14 = 15;
PropertyFeature v15 = 16;
NumberFeature v16 = 17;
BytesFeature v17 = 18;
OffsetFeature v18 = 19;
MnemonicFeature v19 = 20;
OperandNumberFeature v20 = 21;
OperandOffsetFeature v21 = 22;
BasicBlockFeature v22 = 23;
};
string type = 25;
}
message FormatFeature {
string type = 1;
string format = 2;
string description = 3;
}
message FunctionFeatureCount {
Address address = 1;
Integer count = 2;
}
message FunctionLayout {
Address address = 1;
repeated BasicBlockLayout matched_basic_blocks = 2;
}
message FunctionNameFeature {
string type = 1;
string function_name = 2;
string description = 3;
}
message ImportFeature {
string type = 1;
string description = 2;
string import = 3;
}
message Layout {
repeated FunctionLayout functions = 1;
}
message LibraryFunction {
Address address = 1;
string name = 2;
}
message MBCSpec {
repeated string parts = 1;
string objective = 2;
string behavior = 3;
string method = 4;
string id = 5;
}
message MaecMetadata {
string analysis_conclusion = 1;
string analysis_conclusion_ov = 2;
string malware_family = 3;
string malware_category = 4;
string malware_category_ov = 5;
}
message Match {
bool success = 1;
oneof node {
StatementNode v0 = 2;
FeatureNode v1 = 3;
};
repeated Match children = 5;
repeated Address locations = 6;
map <string, Array_Address> captures = 7;
}
message MatchFeature {
string type = 1;
string match = 2;
string description = 3;
}
message Metadata {
string timestamp = 1;
string version = 2;
repeated string argv = 3;
Sample sample = 4;
Analysis analysis = 5;
}
message MnemonicFeature {
string type = 1;
string mnemonic = 2;
string description = 3;
}
message NamespaceFeature {
string type = 1;
string namespace = 2;
string description = 3;
}
message NumberFeature {
string type = 1;
oneof number {
Integer v0 = 2;
Number v1 = 3;
};
string description = 5;
}
message OSFeature {
string type = 1;
string os = 2;
string description = 3;
}
message OffsetFeature {
string type = 1;
Integer offset = 2;
string description = 3;
}
message OperandNumberFeature {
string type = 1;
Integer index = 2;
Integer operand_number = 3;
string description = 4;
}
message OperandOffsetFeature {
string type = 1;
Integer index = 2;
Integer operand_offset = 3;
string description = 4;
}
message PropertyFeature {
string type = 1;
string access = 2;
string property = 3;
string description = 4;
}
message RangeStatement {
string description = 1;
Integer min = 2;
Integer max = 3;
oneof child {
OSFeature v0 = 4;
ArchFeature v1 = 5;
FormatFeature v2 = 6;
MatchFeature v3 = 7;
CharacteristicFeature v4 = 8;
ExportFeature v5 = 9;
ImportFeature v6 = 10;
SectionFeature v7 = 11;
FunctionNameFeature v8 = 12;
SubstringFeature v9 = 13;
RegexFeature v10 = 14;
StringFeature v11 = 15;
ClassFeature v12 = 16;
NamespaceFeature v13 = 17;
APIFeature v14 = 18;
PropertyFeature v15 = 19;
NumberFeature v16 = 20;
BytesFeature v17 = 21;
OffsetFeature v18 = 22;
MnemonicFeature v19 = 23;
OperandNumberFeature v20 = 24;
OperandOffsetFeature v21 = 25;
BasicBlockFeature v22 = 26;
};
string type = 28;
}
message RegexFeature {
string type = 1;
string regex = 2;
string description = 3;
}
message ResultDocument {
Metadata meta = 1;
map <string, RuleMatches> rules = 2;
}
message RuleMatches {
RuleMetadata meta = 1;
string source = 2;
repeated Pair_Address_Match matches = 3;
}
message RuleMetadata {
string name = 1;
string namespace = 2;
repeated string authors = 3;
Scope scope = 4;
repeated AttackSpec attack = 5;
repeated MBCSpec mbc = 6;
repeated string references = 7;
repeated string examples = 8;
string description = 9;
bool lib = 10;
MaecMetadata maec = 11;
bool capa_subscope = 12;
}
message Sample {
string md5 = 1;
string sha1 = 2;
string sha256 = 3;
string path = 4;
}
enum Scope {
SCOPE_UNSPECIFIED = 0;
SCOPE_FILE = 1;
SCOPE_FUNCTION = 2;
SCOPE_BASIC_BLOCK = 3;
SCOPE_INSTRUCTION = 4;
}
message SectionFeature {
string type = 1;
string section = 2;
string description = 3;
}
message SomeStatement {
string description = 1;
Integer count = 2;
string type = 3;
}
message StatementNode {
oneof statement {
RangeStatement v0 = 1;
SomeStatement v1 = 2;
SubscopeStatement v2 = 3;
CompoundStatement v3 = 4;
};
string type = 6;
}
message StringFeature {
string type = 1;
string string = 2;
string description = 3;
}
message SubscopeStatement {
string description = 1;
Scope scope = 2;
string type = 3;
}
message SubstringFeature {
string type = 1;
string substring = 2;
string description = 3;
}
message Array_Address { repeated Address values = 1; }
message Pair_Address_Match {
Address v0 = 1;
Match v1 = 2;
}
message Pair_Integer_Integer {
Integer v0 = 1;
Integer v1 = 2;
}
message Integer { oneof value { uint64 u = 1; int64 i = 2; } }
message Number { oneof value { uint64 u = 1; int64 i = 2; double f = 3; } }

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@@ -37,15 +37,22 @@ from capa.engine import MatchResults
def format_address(address: frz.Address) -> str:
if address.type == frz.AddressType.ABSOLUTE:
assert isinstance(address.value, int)
return capa.helpers.hex(address.value)
elif address.type == frz.AddressType.RELATIVE:
assert isinstance(address.value, int)
return f"base address+{capa.helpers.hex(address.value)}"
elif address.type == frz.AddressType.FILE:
assert isinstance(address.value, int)
return f"file+{capa.helpers.hex(address.value)}"
elif address.type == frz.AddressType.DN_TOKEN:
assert isinstance(address.value, int)
return f"token({capa.helpers.hex(address.value)})"
elif address.type == frz.AddressType.DN_TOKEN_OFFSET:
assert isinstance(address.value, tuple)
token, offset = address.value
assert isinstance(token, int)
assert isinstance(offset, int)
return f"token({capa.helpers.hex(token)})+{capa.helpers.hex(offset)}"
elif address.type == frz.AddressType.NO_ADDRESS:
return "global"

2
rules

Submodule rules updated: 5aa83379a5...2586016cf2

View File

@@ -28,6 +28,7 @@ requirements = [
"dnfile==0.13.0",
"dncil==1.0.2",
"pydantic==1.10.4",
"protobuf==4.21.12",
]
# this sets __version__
@@ -79,6 +80,7 @@ setuptools.setup(
"psutil==5.9.2",
"stix2==3.0.1",
"requests==2.28.0",
"mypy-protobuf==3.4.0",
# type stubs for mypy
"types-backports==0.1.3",
"types-colorama==0.4.15",
@@ -87,6 +89,7 @@ setuptools.setup(
"types-termcolor==1.1.4",
"types-psutil==5.8.23",
"types_requests==2.28.1",
"types-protobuf==4.21.0.5",
],
"build": [
"pyinstaller==5.7.0",

View File

@@ -1101,3 +1101,13 @@ def _0953c_dotnetfile_extractor():
@pytest.fixture
def _039a6_dotnetfile_extractor():
    """fixture: the dnfile extractor for the test file known as "_039a6"."""
    sample_path = get_data_path_by_name("_039a6")
    return get_dnfile_extractor(sample_path)
@pytest.fixture
def pma0101_rd():
    """fixture: the result document parsed from the stored JSON for PMA Lab 01-01.dll."""
    json_path = os.path.join(CD, "data", "Practical Malware Analysis Lab 01-01.dll.json")

    # read raw bytes and decode explicitly as UTF-8.
    with open(json_path, "rb") as f:
        text = f.read().decode("utf-8")

    return capa.render.result_document.ResultDocument.parse_raw(text)

73
tests/test_proto.py Normal file
View File

@@ -0,0 +1,73 @@
# Copyright (C) 2023 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import json
import pathlib
import subprocess
import pydantic
from fixtures import *
import capa.render
import capa.render.proto
import capa.render.utils
import capa.features.freeze
import capa.render.proto.capa_pb2
import capa.render.result_document
import capa.features.freeze.features
from capa.render.result_document import ResultDocument
def test_generate_proto(tmp_path: pathlib.Path):
    """generate the proto, write it next to the schema JSON, and compile it with protoc.

    the proto and the generated python module are printed for inspection.
    the test fails if protoc exits with an error.
    """
    tmp_path.mkdir(exist_ok=True, parents=True)
    proto_path = tmp_path / "capa.proto"
    json_path = tmp_path / "capa.json"

    # emit the schema json, too.
    schema = pydantic.schema_of(capa.render.result_document.ResultDocument)
    json_path.write_text(json.dumps(schema, indent=4))

    proto = capa.render.proto.generate_proto()

    banner = "====================================="
    print(banner)
    print(proto_path)
    print("-------------------------------------")
    for line_number, line in enumerate(proto.split("\n")):
        print(f" {line_number} | {line}")
    print(banner)

    proto_path.write_text(proto)

    protoc_args = [
        "protoc",
        "-I=" + str(tmp_path),
        "--python_out=" + str(tmp_path),
        "--mypy_out=" + str(tmp_path),
        str(proto_path),
    ]
    subprocess.run(protoc_args, check=True)

    generated_module = tmp_path / "capa_pb2.py"
    print(generated_module.read_text())
    print(banner)
def test_translate_to_pb2(pma0101_rd: ResultDocument):
    """translate the fixture result document into a protobuf message and print it."""
    schema = pydantic.schema_of(capa.render.result_document.ResultDocument)
    root_type = schema["definitions"]["ResultDocument"]

    message = capa.render.proto.capa_pb2.ResultDocument()
    capa.render.proto.translate_to_pb2(schema, root_type, pma0101_rd, message)

    banner = "====================================="
    print()
    print(banner)
    print(message)
    print(banner)

    # always fails: there are no real checks yet.
    assert False