proto: sketch from pb2 routines

This commit is contained in:
Willi Ballenthin
2023-03-22 11:15:34 +01:00
parent ec785f9d6d
commit 26ca593fad

View File

@@ -422,6 +422,350 @@ def doc_to_pb2(doc: rd.ResultDocument) -> capa_pb2.ResultDocument:
return r
def xaddr_from_pb2(addr: frz.Address) -> capa_pb2.Address:
if addr.type is AddressType.ABSOLUTE:
assert isinstance(addr.value, int)
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_ABSOLUTE, v=int_to_pb2(addr.value))
elif addr.type is AddressType.RELATIVE:
assert isinstance(addr.value, int)
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_RELATIVE, v=int_to_pb2(addr.value))
elif addr.type is AddressType.FILE:
assert isinstance(addr.value, int)
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_FILE, v=int_to_pb2(addr.value))
elif addr.type is AddressType.DN_TOKEN:
assert isinstance(addr.value, int)
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN, v=int_to_pb2(addr.value))
elif addr.type is AddressType.DN_TOKEN_OFFSET:
assert isinstance(addr.value, tuple)
token, offset = addr.value
assert isinstance(token, int)
assert isinstance(offset, int)
return capa_pb2.Address(
type=capa_pb2.AddressType.ADDRESSTYPE_DN_TOKEN_OFFSET,
token_offset=capa_pb2.Token_Offset(token=int_to_pb2(token), offset=offset),
)
elif addr.type is AddressType.NO_ADDRESS:
# value == None, so only set type
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS)
else:
assert_never(addr)
def xscope_from_pb2(scope: capa.rules.Scope) -> capa_pb2.Scope.ValueType:
if scope == capa.rules.Scope.FILE:
return capa_pb2.Scope.SCOPE_FILE
elif scope == capa.rules.Scope.FUNCTION:
return capa_pb2.Scope.SCOPE_FUNCTION
elif scope == capa.rules.Scope.BASIC_BLOCK:
return capa_pb2.Scope.SCOPE_BASIC_BLOCK
elif scope == capa.rules.Scope.INSTRUCTION:
return capa_pb2.Scope.SCOPE_INSTRUCTION
else:
assert_never(scope)
def xmetadata_from_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
return capa_pb2.Metadata(
timestamp=str(meta.timestamp),
version=meta.version,
argv=meta.argv,
sample=google.protobuf.json_format.ParseDict(meta.sample.dict(), capa_pb2.Sample()),
analysis=capa_pb2.Analysis(
format=meta.analysis.format,
arch=meta.analysis.arch,
os=meta.analysis.os,
extractor=meta.analysis.extractor,
rules=meta.analysis.rules,
base_address=addr_to_pb2(meta.analysis.base_address),
layout=capa_pb2.Layout(
functions=[
capa_pb2.FunctionLayout(
address=addr_to_pb2(f.address),
matched_basic_blocks=[
capa_pb2.BasicBlockLayout(address=addr_to_pb2(bb.address)) for bb in f.matched_basic_blocks
],
)
for f in meta.analysis.layout.functions
]
),
feature_counts=capa_pb2.FeatureCounts(
file=meta.analysis.feature_counts.file,
functions=[
capa_pb2.FunctionFeatureCount(address=addr_to_pb2(f.address), count=f.count)
for f in meta.analysis.feature_counts.functions
],
),
library_functions=[
capa_pb2.LibraryFunction(address=addr_to_pb2(lf.address), name=lf.name)
for lf in meta.analysis.library_functions
],
),
)
def xstatement_from_pb2(statement: rd.Statement) -> capa_pb2.StatementNode:
if isinstance(statement, rd.RangeStatement):
return capa_pb2.StatementNode(
range=capa_pb2.RangeStatement(
type="range",
description=statement.description,
min=statement.min,
max=statement.max,
child=feature_to_pb2(statement.child),
),
type="statement",
)
elif isinstance(statement, rd.SomeStatement):
return capa_pb2.StatementNode(
some=capa_pb2.SomeStatement(type=statement.type, description=statement.description, count=statement.count),
type="statement",
)
elif isinstance(statement, rd.SubscopeStatement):
return capa_pb2.StatementNode(
subscope=capa_pb2.SubscopeStatement(
type=statement.type,
description=statement.description,
scope=scope_to_pb2(statement.scope),
),
type="statement",
)
elif isinstance(statement, rd.CompoundStatement):
return capa_pb2.StatementNode(
compound=capa_pb2.CompoundStatement(type=statement.type, description=statement.description),
type="statement",
)
else:
assert_never(statement)
def xfeature_from_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode:
if isinstance(f, frzf.OSFeature):
return capa_pb2.FeatureNode(
type="feature", os=capa_pb2.OSFeature(type=f.type, os=f.os, description=f.description)
)
elif isinstance(f, frzf.ArchFeature):
return capa_pb2.FeatureNode(
type="feature", arch=capa_pb2.ArchFeature(type=f.type, arch=f.arch, description=f.description)
)
elif isinstance(f, frzf.FormatFeature):
return capa_pb2.FeatureNode(
type="feature", format=capa_pb2.FormatFeature(type=f.type, format=f.format, description=f.description)
)
elif isinstance(f, frzf.MatchFeature):
return capa_pb2.FeatureNode(
type="feature",
match=capa_pb2.MatchFeature(
type=f.type,
match=f.match,
description=f.description,
),
)
elif isinstance(f, frzf.CharacteristicFeature):
return capa_pb2.FeatureNode(
type="feature",
characteristic=capa_pb2.CharacteristicFeature(
type=f.type, characteristic=f.characteristic, description=f.description
),
)
elif isinstance(f, frzf.ExportFeature):
return capa_pb2.FeatureNode(
type="feature", export=capa_pb2.ExportFeature(type=f.type, export=f.export, description=f.description)
)
elif isinstance(f, frzf.ImportFeature):
return capa_pb2.FeatureNode(
type="feature", import_=capa_pb2.ImportFeature(type=f.type, import_=f.import_, description=f.description)
)
elif isinstance(f, frzf.SectionFeature):
return capa_pb2.FeatureNode(
type="feature", section=capa_pb2.SectionFeature(type=f.type, section=f.section, description=f.description)
)
elif isinstance(f, frzf.FunctionNameFeature):
return capa_pb2.FeatureNode(
type="function name",
function_name=capa_pb2.FunctionNameFeature(
type=f.type, function_name=f.function_name, description=f.description
),
)
elif isinstance(f, frzf.SubstringFeature):
return capa_pb2.FeatureNode(
type="feature",
substring=capa_pb2.SubstringFeature(type=f.type, substring=f.substring, description=f.description),
)
elif isinstance(f, frzf.RegexFeature):
return capa_pb2.FeatureNode(
type="feature", regex=capa_pb2.RegexFeature(type=f.type, regex=f.regex, description=f.description)
)
elif isinstance(f, frzf.StringFeature):
return capa_pb2.FeatureNode(
type="feature",
string=capa_pb2.StringFeature(
type=f.type,
string=f.string,
description=f.description,
),
)
elif isinstance(f, frzf.ClassFeature):
return capa_pb2.FeatureNode(
type="feature", class_=capa_pb2.ClassFeature(type=f.type, class_=f.class_, description=f.description)
)
elif isinstance(f, frzf.NamespaceFeature):
return capa_pb2.FeatureNode(
type="feature",
namespace=capa_pb2.NamespaceFeature(type=f.type, namespace=f.namespace, description=f.description),
)
elif isinstance(f, frzf.APIFeature):
return capa_pb2.FeatureNode(
type="feature", api=capa_pb2.APIFeature(type=f.type, api=f.api, description=f.description)
)
elif isinstance(f, frzf.PropertyFeature):
return capa_pb2.FeatureNode(
type="feature",
property_=capa_pb2.PropertyFeature(
type=f.type, access=f.access, property_=f.property, description=f.description
),
)
elif isinstance(f, frzf.NumberFeature):
return capa_pb2.FeatureNode(
type="feature",
number=capa_pb2.NumberFeature(type=f.type, number=number_to_pb2(f.number), description=f.description),
)
elif isinstance(f, frzf.BytesFeature):
return capa_pb2.FeatureNode(
type="feature", bytes=capa_pb2.BytesFeature(type=f.type, bytes=f.bytes, description=f.description)
)
elif isinstance(f, frzf.OffsetFeature):
return capa_pb2.FeatureNode(
type="feature",
offset=capa_pb2.OffsetFeature(type=f.type, offset=int_to_pb2(f.offset), description=f.description),
)
elif isinstance(f, frzf.MnemonicFeature):
return capa_pb2.FeatureNode(
type="feature",
mnemonic=capa_pb2.MnemonicFeature(type=f.type, mnemonic=f.mnemonic, description=f.description),
)
elif isinstance(f, frzf.OperandNumberFeature):
return capa_pb2.FeatureNode(
type="feature",
operand_number=capa_pb2.OperandNumberFeature(
type=f.type, index=f.index, operand_number=int_to_pb2(f.operand_number), description=f.description
),
)
elif isinstance(f, frzf.OperandOffsetFeature):
return capa_pb2.FeatureNode(
type="feature",
operand_offset=capa_pb2.OperandOffsetFeature(
type=f.type, index=f.index, operand_offset=int_to_pb2(f.operand_offset), description=f.description
),
)
elif isinstance(f, frzf.BasicBlockFeature):
return capa_pb2.FeatureNode(
type="feature", basic_block=capa_pb2.BasicBlockFeature(type=f.type, description=f.description)
)
else:
assert_never(f)
def xnode_from_pb2(node: rd.Node) -> Union[capa_pb2.FeatureNode, capa_pb2.StatementNode]:
if isinstance(node, rd.StatementNode):
return statement_to_pb2(node.statement)
elif isinstance(node, rd.FeatureNode):
return feature_to_pb2(node.feature)
else:
assert_never(node)
def xmatch_from_pb2(match: rd.Match) -> capa_pb2.Match:
node = node_to_pb2(match.node)
children = list(map(match_to_pb2, match.children))
locations = list(map(addr_to_pb2, match.locations))
if isinstance(node, capa_pb2.StatementNode):
return capa_pb2.Match(
success=match.success,
statement=node,
children=children,
locations=locations,
captures={},
)
elif isinstance(node, capa_pb2.FeatureNode):
return capa_pb2.Match(
success=match.success,
feature=node,
children=children,
locations=locations,
captures={
capture: capa_pb2.Addresses(address=list(map(addr_to_pb2, locs)))
for capture, locs in match.captures.items()
},
)
else:
assert_never(match)
def xrule_metadata_from_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata:
# after manual type conversions to the RuleMetadata, we can rely on the protobuf json parser
# conversions include tuple -> list and rd.Enum -> proto.enum
meta = dict_tuple_to_list_values(rule_metadata.dict())
meta["scope"] = scope_to_pb2(meta["scope"])
meta["attack"] = list(map(dict_tuple_to_list_values, meta.get("attack", [])))
meta["mbc"] = list(map(dict_tuple_to_list_values, meta.get("mbc", [])))
return google.protobuf.json_format.ParseDict(meta, capa_pb2.RuleMetadata())
def xdoc_from_pb2(doc: capa_pb2.ResultDocument) -> rd.ResultDocument:
rule_matches: Dict[str, rd.RuleMatches] = {}
for rule_name, matches in doc.rules.items():
m = rd.RuleMatches(
meta=xrule_metadata_from_pb2(matches.meta),
source=matches.source,
matches=tuple([
(xaddr_from_pb2(addr), xmatch_from_pb2(match))
for addr, match in matches.matches
]),
)
rule_matches[rule_name] = m
return rd.ResultDocument(meta=xmetadata_from_pb2(doc.meta), rules=rule_matches)
def main(argv=None):
if argv is None:
argv = sys.argv[1:]