Remove from_capa API for Metadata

This commit is contained in:
Aayush Goel
2023-06-03 15:33:49 +05:30
parent 5ded85f46e
commit b4870b120e
7 changed files with 112 additions and 176 deletions

View File

@@ -22,7 +22,8 @@ import capa
import capa.version import capa.version
import capa.render.utils as rutils import capa.render.utils as rutils
import capa.features.common import capa.features.common
import capa.render.result_document import capa.features.freeze
import capa.render.result_document as rdoc
from capa.features.address import AbsoluteVirtualAddress from capa.features.address import AbsoluteVirtualAddress
logger = logging.getLogger("capa") logger = logging.getLogger("capa")
@@ -140,38 +141,34 @@ def collect_metadata(rules):
else: else:
os = "unknown os" os = "unknown os"
return capa.render.result_document.Metadata.from_capa( return rdoc.Metadata(
{ timestamp=datetime.datetime.now(),
"timestamp": datetime.datetime.now().isoformat(), version=capa.version.__version__,
"argv": [], argv=(),
"sample": { sample=rdoc.Sample(
"md5": md5, md5=md5,
"sha1": "", # not easily accessible sha1="", # not easily accessible
"sha256": sha256, sha256=sha256,
"path": idaapi.get_input_file_path(), path=idaapi.get_input_file_path(),
}, ),
"analysis": { analysis=rdoc.Analysis(
"format": idaapi.get_file_type_name(), format=idaapi.get_file_type_name(),
"arch": arch, arch=arch,
"os": os, os=os,
"extractor": "ida", extractor="ida",
"rules": rules, rules=rules,
"base_address": idaapi.get_imagebase(), base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()),
"layout": { layout=rdoc.Layout(
# this is updated after capabilities have been collected. functions=tuple()
# will look like: # this is updated after capabilities have been collected.
# # will look like:
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... } #
}, # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
# ignore these for now - not used by IDA plugin. ),
"feature_counts": { # ignore these for now - not used by IDA plugin.
"file": {}, feature_counts=rdoc.FeatureCounts(file=0, functions=tuple()),
"functions": {}, library_functions=tuple(),
}, ),
"library_functions": {},
},
"version": capa.version.__version__,
}
) )
@@ -219,12 +216,12 @@ def idb_contains_cached_results() -> bool:
return False return False
def load_and_verify_cached_results() -> Optional[capa.render.result_document.ResultDocument]: def load_and_verify_cached_results() -> Optional[rdoc.ResultDocument]:
"""verifies that cached results have valid (mapped) addresses for the current database""" """verifies that cached results have valid (mapped) addresses for the current database"""
logger.debug("loading cached capa results from netnode '%s'", CAPA_NETNODE) logger.debug("loading cached capa results from netnode '%s'", CAPA_NETNODE)
n = netnode.Netnode(CAPA_NETNODE) n = netnode.Netnode(CAPA_NETNODE)
doc = capa.render.result_document.ResultDocument.parse_obj(json.loads(n[NETNODE_RESULTS])) doc = rdoc.ResultDocument.parse_obj(json.loads(n[NETNODE_RESULTS]))
for rule in rutils.capability_rules(doc): for rule in rutils.capability_rules(doc):
for location_, _ in rule.matches: for location_, _ in rule.matches:

View File

@@ -771,8 +771,11 @@ class CapaExplorerForm(idaapi.PluginForm):
try: try:
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]]) meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True) capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True)
meta.analysis.__dict__.update(counts)
meta.analysis.__dict__.update(capa.main.compute_layout(ruleset, extractor, capabilities)) meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.main.compute_layout(ruleset, extractor, capabilities)
except UserCancelledError: except UserCancelledError:
logger.info("User cancelled analysis.") logger.info("User cancelled analysis.")
return False return False

View File

@@ -38,7 +38,7 @@ import capa.rules.cache
import capa.render.default import capa.render.default
import capa.render.verbose import capa.render.verbose
import capa.features.common import capa.features.common
import capa.features.freeze import capa.features.freeze as frz
import capa.render.vverbose import capa.render.vverbose
import capa.features.extractors import capa.features.extractors
import capa.render.result_document import capa.render.result_document
@@ -247,13 +247,8 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
all_bb_matches = collections.defaultdict(list) # type: MatchResults all_bb_matches = collections.defaultdict(list) # type: MatchResults
all_insn_matches = collections.defaultdict(list) # type: MatchResults all_insn_matches = collections.defaultdict(list) # type: MatchResults
meta = { feature_counts = rdoc.FeatureCounts(file=0, functions=tuple())
"feature_counts": { library_functions: Tuple[rdoc.LibraryFunction, ...] = tuple()
"file": 0,
"functions": {},
},
"library_functions": {},
} # type: Dict[str, Any]
with redirecting_print_to_tqdm(disable_progress): with redirecting_print_to_tqdm(disable_progress):
with tqdm.contrib.logging.logging_redirect_tqdm(): with tqdm.contrib.logging.logging_redirect_tqdm():
@@ -272,8 +267,10 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
if extractor.is_library_function(f.address): if extractor.is_library_function(f.address):
function_name = extractor.get_function_name(f.address) function_name = extractor.get_function_name(f.address)
logger.debug("skipping library function 0x%x (%s)", f.address, function_name) logger.debug("skipping library function 0x%x (%s)", f.address, function_name)
meta["library_functions"][f.address] = function_name library_functions += (
n_libs = len(meta["library_functions"]) rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name),
)
n_libs = len(library_functions)
percentage = round(100 * (n_libs / n_funcs)) percentage = round(100 * (n_libs / n_funcs))
if isinstance(pb, tqdm.tqdm): if isinstance(pb, tqdm.tqdm):
pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)") pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)")
@@ -282,7 +279,9 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities( function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(
ruleset, extractor, f ruleset, extractor, f
) )
meta["feature_counts"]["functions"][f.address] = feature_count feature_counts.functions += (
rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count),
)
logger.debug("analyzed function 0x%x and extracted %d features", f.address, feature_count) logger.debug("analyzed function 0x%x and extracted %d features", f.address, feature_count)
for rule_name, res in function_matches.items(): for rule_name, res in function_matches.items():
@@ -303,7 +302,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
capa.engine.index_rule_matches(function_and_lower_features, rule, locations) capa.engine.index_rule_matches(function_and_lower_features, rule, locations)
all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features)
meta["feature_counts"]["file"] = feature_count feature_counts.file = feature_count
matches = { matches = {
rule_name: results rule_name: results
@@ -317,11 +316,11 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
all_file_matches.items(), all_file_matches.items(),
) )
} }
meta["feature_counts"] = rdoc.FeatureCounts.from_capa(meta["feature_counts"])
meta["library_functions"] = tuple( meta = {
rdoc.LibraryFunction(address=capa.features.freeze.Address.from_capa(address), name=name) "feature_counts": feature_counts,
for address, name in meta["library_functions"].items() "library_functions": library_functions,
) }
return matches, meta return matches, meta
@@ -765,39 +764,37 @@ def collect_metadata(
arch = get_arch(sample_path) arch = get_arch(sample_path)
os_ = get_os(sample_path) if os_ == OS_AUTO else os_ os_ = get_os(sample_path) if os_ == OS_AUTO else os_
return rdoc.Metadata.from_capa( return rdoc.Metadata(
{ timestamp=datetime.datetime.now(),
"timestamp": datetime.datetime.now().isoformat(), version=capa.version.__version__,
"version": capa.version.__version__, argv=tuple(argv) if argv else None,
"argv": argv, sample=rdoc.Sample(
"sample": { md5=md5.hexdigest(),
"md5": md5.hexdigest(), sha1=sha1.hexdigest(),
"sha1": sha1.hexdigest(), sha256=sha256.hexdigest(),
"sha256": sha256.hexdigest(), path=os.path.normpath(sample_path),
"path": os.path.normpath(sample_path), ),
}, analysis=rdoc.Analysis(
"analysis": { format=format_,
"format": format_, arch=arch,
"arch": arch, os=os_,
"os": os_, extractor=extractor.__class__.__name__,
"extractor": extractor.__class__.__name__, rules=tuple(rules_path),
"rules": rules_path, base_address=frz.Address.from_capa(extractor.get_base_address()),
"base_address": extractor.get_base_address(), layout=rdoc.Layout(
"layout": { functions=tuple(),
"functions": {}, # this is updated after capabilities have been collected.
# this is updated after capabilities have been collected. # will look like:
# will look like: #
# # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... } ),
}, feature_counts=rdoc.FeatureCounts(file=0, functions=tuple()),
"feature_counts": {"file": 0, "functions": {}}, library_functions=tuple(),
"library_functions": {}, ),
},
}
) )
def compute_layout(rules, extractor, capabilities) -> Dict[str, rdoc.Layout]: def compute_layout(rules, extractor, capabilities) -> rdoc.Layout:
""" """
compute a metadata structure that links basic blocks compute a metadata structure that links basic blocks
to the functions in which they're found. to the functions in which they're found.
@@ -822,19 +819,21 @@ def compute_layout(rules, extractor, capabilities) -> Dict[str, rdoc.Layout]:
assert addr in functions_by_bb assert addr in functions_by_bb
matched_bbs.add(addr) matched_bbs.add(addr)
layout = { layout = rdoc.Layout(
"functions": { functions=tuple(
f: { rdoc.FunctionLayout(
"matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs] address=frz.Address.from_capa(f),
# this object is open to extension in the future, matched_basic_blocks=tuple(
rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in matched_bbs
) # this object is open to extension in the future,
# such as with the function name, etc. # such as with the function name, etc.
} )
for f, bbs in bbs_by_function.items() for f, bbs in bbs_by_function.items()
if len([bb for bb in bbs if bb in matched_bbs]) > 0 if len([bb for bb in bbs if bb in matched_bbs]) > 0
} )
} )
return {"layout": rdoc.Layout.from_capa(layout)} return layout
def install_common_args(parser, wanted=None): def install_common_args(parser, wanted=None):
@@ -1226,7 +1225,7 @@ def main(argv=None):
if format_ == FORMAT_FREEZE: if format_ == FORMAT_FREEZE:
# freeze format deserializes directly into an extractor # freeze format deserializes directly into an extractor
with open(args.sample, "rb") as f: with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read()) extractor = frz.load(f.read())
else: else:
# all other formats we must create an extractor, # all other formats we must create an extractor,
# such as viv, binary ninja, etc. workspaces # such as viv, binary ninja, etc. workspaces
@@ -1268,8 +1267,9 @@ def main(argv=None):
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet) capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
meta.analysis.__dict__.update(counts) meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.__dict__.update(compute_layout(rules, extractor, capabilities)) meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = compute_layout(rules, extractor, capabilities)
if has_file_limitation(rules, capabilities): if has_file_limitation(rules, capabilities):
# bail if capa encountered file limitation e.g. a packed binary # bail if capa encountered file limitation e.g. a packed binary
@@ -1321,7 +1321,8 @@ def ida_main():
capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor()) capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
meta.analysis.__dict__.update(counts) meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
if has_file_limitation(rules, capabilities, is_standalone=False): if has_file_limitation(rules, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis") capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")

View File

@@ -24,7 +24,6 @@ from capa.helpers import assert_never
class FrozenModel(BaseModel): class FrozenModel(BaseModel):
class Config: class Config:
frozen = True
extra = "forbid" extra = "forbid"
@@ -47,20 +46,6 @@ class FunctionLayout(FrozenModel):
class Layout(FrozenModel): class Layout(FrozenModel):
functions: Tuple[FunctionLayout, ...] functions: Tuple[FunctionLayout, ...]
@classmethod
def from_capa(cls, layout: dict) -> "Layout":
return cls(
functions=tuple(
FunctionLayout(
address=frz.Address.from_capa(address),
matched_basic_blocks=tuple(
BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in f["matched_basic_blocks"]
),
)
for address, f in layout["functions"].items()
)
)
class LibraryFunction(FrozenModel): class LibraryFunction(FrozenModel):
address: frz.Address address: frz.Address
@@ -76,16 +61,6 @@ class FeatureCounts(FrozenModel):
file: int file: int
functions: Tuple[FunctionFeatureCount, ...] functions: Tuple[FunctionFeatureCount, ...]
@classmethod
def from_capa(cls, feature_counts: dict) -> "FeatureCounts":
return cls(
file=feature_counts["file"],
functions=tuple(
FunctionFeatureCount(address=frz.Address.from_capa(address), count=count)
for address, count in feature_counts["functions"].items()
),
)
class Analysis(FrozenModel): class Analysis(FrozenModel):
format: str format: str
@@ -106,50 +81,6 @@ class Metadata(FrozenModel):
sample: Sample sample: Sample
analysis: Analysis analysis: Analysis
@classmethod
def from_capa(cls, meta: dict) -> "Metadata":
return cls(
timestamp=meta["timestamp"],
version=meta["version"],
argv=meta["argv"] if "argv" in meta else None,
sample=Sample(
md5=meta["sample"]["md5"],
sha1=meta["sample"]["sha1"],
sha256=meta["sample"]["sha256"],
path=meta["sample"]["path"],
),
analysis=Analysis(
format=meta["analysis"]["format"],
arch=meta["analysis"]["arch"],
os=meta["analysis"]["os"],
extractor=meta["analysis"]["extractor"],
rules=meta["analysis"]["rules"],
base_address=frz.Address.from_capa(meta["analysis"]["base_address"]),
layout=Layout(
functions=tuple(
FunctionLayout(
address=frz.Address.from_capa(address),
matched_basic_blocks=tuple(
BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in f["matched_basic_blocks"]
),
)
for address, f in meta["analysis"]["layout"]["functions"].items()
)
),
feature_counts=FeatureCounts(
file=meta["analysis"]["feature_counts"]["file"],
functions=tuple(
FunctionFeatureCount(address=frz.Address.from_capa(address), count=count)
for address, count in meta["analysis"]["feature_counts"]["functions"].items()
),
),
library_functions=tuple(
LibraryFunction(address=frz.Address.from_capa(address), name=name)
for address, name in meta["analysis"]["library_functions"].items()
),
),
)
class CompoundStatementType: class CompoundStatementType:
AND = "and" AND = "and"
@@ -648,10 +579,7 @@ class ResultDocument(FrozenModel):
), ),
) )
if isinstance(meta, Metadata): return ResultDocument(meta=meta, rules=rule_matches)
return ResultDocument(meta=meta, rules=rule_matches)
return ResultDocument(meta=Metadata.from_capa(meta), rules=rule_matches)
def to_capa(self) -> Tuple[Metadata, Dict]: def to_capa(self) -> Tuple[Metadata, Dict]:
capabilities: Dict[ capabilities: Dict[

View File

@@ -131,8 +131,10 @@ def get_capa_results(args):
meta = capa.main.collect_metadata([], path, format, os_, [], extractor) meta = capa.main.collect_metadata([], path, format, os_, [], extractor)
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
meta.analysis.__dict__.update(counts)
meta.analysis.__dict__.update(capa.main.compute_layout(rules, extractor, capabilities)) meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
doc = rd.ResultDocument.from_capa(meta, rules, capabilities) doc = rd.ResultDocument.from_capa(meta, rules, capabilities)

View File

@@ -172,10 +172,13 @@ def capa_details(rules_path, file_path, output_format="dictionary"):
# collect metadata (used only to make rendering more complete) # collect metadata (used only to make rendering more complete)
meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor) meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor)
meta.analysis.__dict__.update(counts)
meta.analysis.__dict__.update(capa.main.compute_layout(rules, extractor, capabilities)) meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
capa_output: Any = False capa_output: Any = False
if output_format == "dictionary": if output_format == "dictionary":
# ...as python dictionary, simplified as textable but in dictionary # ...as python dictionary, simplified as textable but in dictionary
doc = rd.ResultDocument.from_capa(meta, rules, capabilities) doc = rd.ResultDocument.from_capa(meta, rules, capabilities)

View File

@@ -178,8 +178,10 @@ def main(argv=None):
meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor) meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor)
capabilities, counts = capa.main.find_capabilities(rules, extractor) capabilities, counts = capa.main.find_capabilities(rules, extractor)
meta.analysis.__dict__.update(counts)
meta.analysis.__dict__.update(capa.main.compute_layout(rules, extractor, capabilities)) meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"]
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
if capa.main.has_file_limitation(rules, capabilities): if capa.main.has_file_limitation(rules, capabilities):
# bail if capa encountered file limitation e.g. a packed binary # bail if capa encountered file limitation e.g. a packed binary