mirror of
https://github.com/mandiant/capa.git
synced 2025-12-22 23:26:21 -08:00
Update Metadata type in capa main
This commit is contained in:
@@ -24,6 +24,7 @@
|
|||||||
- better handle exceptional cases when parsing ELF files [#1458](https://github.com/mandiant/capa/issues/1458) [@Aayush-Goel-04](https://github.com/aayush-goel-04)
|
- better handle exceptional cases when parsing ELF files [#1458](https://github.com/mandiant/capa/issues/1458) [@Aayush-Goel-04](https://github.com/aayush-goel-04)
|
||||||
- Improved testing coverage for Binary Ninja Backend [#1446](https://github.com/mandiant/capa/issues/1446) [@Aayush-Goel-04](https://github.com/aayush-goel-04)
|
- Improved testing coverage for Binary Ninja Backend [#1446](https://github.com/mandiant/capa/issues/1446) [@Aayush-Goel-04](https://github.com/aayush-goel-04)
|
||||||
- Add logging and print redirect to tqdm for capa main [#749](https://github.com/mandiant/capa/issues/749) [@Aayush-Goel-04](https://github.com/aayush-goel-04)
|
- Add logging and print redirect to tqdm for capa main [#749](https://github.com/mandiant/capa/issues/749) [@Aayush-Goel-04](https://github.com/aayush-goel-04)
|
||||||
|
- Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) [@manasghandat](https://github.com/manasghandat)
|
||||||
- extractor: fix binja installation path detection not working with Python 3.11
|
- extractor: fix binja installation path detection not working with Python 3.11
|
||||||
|
|
||||||
### capa explorer IDA Pro plugin
|
### capa explorer IDA Pro plugin
|
||||||
|
|||||||
@@ -140,37 +140,39 @@ def collect_metadata(rules):
|
|||||||
else:
|
else:
|
||||||
os = "unknown os"
|
os = "unknown os"
|
||||||
|
|
||||||
return {
|
return capa.render.result_document.Metadata.from_capa(
|
||||||
"timestamp": datetime.datetime.now().isoformat(),
|
{
|
||||||
"argv": [],
|
"timestamp": datetime.datetime.now().isoformat(),
|
||||||
"sample": {
|
"argv": [],
|
||||||
"md5": md5,
|
"sample": {
|
||||||
"sha1": "", # not easily accessible
|
"md5": md5,
|
||||||
"sha256": sha256,
|
"sha1": "", # not easily accessible
|
||||||
"path": idaapi.get_input_file_path(),
|
"sha256": sha256,
|
||||||
},
|
"path": idaapi.get_input_file_path(),
|
||||||
"analysis": {
|
|
||||||
"format": idaapi.get_file_type_name(),
|
|
||||||
"arch": arch,
|
|
||||||
"os": os,
|
|
||||||
"extractor": "ida",
|
|
||||||
"rules": rules,
|
|
||||||
"base_address": idaapi.get_imagebase(),
|
|
||||||
"layout": {
|
|
||||||
# this is updated after capabilities have been collected.
|
|
||||||
# will look like:
|
|
||||||
#
|
|
||||||
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
|
|
||||||
},
|
},
|
||||||
# ignore these for now - not used by IDA plugin.
|
"analysis": {
|
||||||
"feature_counts": {
|
"format": idaapi.get_file_type_name(),
|
||||||
"file": {},
|
"arch": arch,
|
||||||
"functions": {},
|
"os": os,
|
||||||
|
"extractor": "ida",
|
||||||
|
"rules": rules,
|
||||||
|
"base_address": idaapi.get_imagebase(),
|
||||||
|
"layout": {
|
||||||
|
# this is updated after capabilities have been collected.
|
||||||
|
# will look like:
|
||||||
|
#
|
||||||
|
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
|
||||||
|
},
|
||||||
|
# ignore these for now - not used by IDA plugin.
|
||||||
|
"feature_counts": {
|
||||||
|
"file": {},
|
||||||
|
"functions": {},
|
||||||
|
},
|
||||||
|
"library_functions": {},
|
||||||
},
|
},
|
||||||
"library_functions": {},
|
"version": capa.version.__version__,
|
||||||
},
|
}
|
||||||
"version": capa.version.__version__,
|
)
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class IDAIO:
|
class IDAIO:
|
||||||
|
|||||||
@@ -771,8 +771,9 @@ class CapaExplorerForm(idaapi.PluginForm):
|
|||||||
try:
|
try:
|
||||||
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
|
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
|
||||||
capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True)
|
capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True)
|
||||||
meta["analysis"].update(counts)
|
meta.analysis.__dict__.update(counts)
|
||||||
meta["analysis"]["layout"] = capa.main.compute_layout(ruleset, extractor, capabilities)
|
meta.analysis.__dict__.update(capa.main.compute_layout(ruleset, extractor, capabilities))
|
||||||
|
meta = capa.render.result_document.Metadata.from_capa(meta.dict())
|
||||||
except UserCancelledError:
|
except UserCancelledError:
|
||||||
logger.info("User cancelled analysis.")
|
logger.info("User cancelled analysis.")
|
||||||
return False
|
return False
|
||||||
|
|||||||
77
capa/main.py
77
capa/main.py
@@ -41,6 +41,8 @@ import capa.features.common
|
|||||||
import capa.features.freeze
|
import capa.features.freeze
|
||||||
import capa.render.vverbose
|
import capa.render.vverbose
|
||||||
import capa.features.extractors
|
import capa.features.extractors
|
||||||
|
import capa.render.result_document
|
||||||
|
import capa.render.result_document as rdoc
|
||||||
import capa.features.extractors.common
|
import capa.features.extractors.common
|
||||||
import capa.features.extractors.pefile
|
import capa.features.extractors.pefile
|
||||||
import capa.features.extractors.dnfile_
|
import capa.features.extractors.dnfile_
|
||||||
@@ -315,6 +317,11 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
|||||||
all_file_matches.items(),
|
all_file_matches.items(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
meta["feature_counts"] = rdoc.FeatureCounts.from_capa(meta["feature_counts"])
|
||||||
|
meta["library_functions"] = tuple(
|
||||||
|
rdoc.LibraryFunction(address=capa.features.freeze.Address.from_capa(address), name=name)
|
||||||
|
for address, name in meta["library_functions"].items()
|
||||||
|
)
|
||||||
|
|
||||||
return matches, meta
|
return matches, meta
|
||||||
|
|
||||||
@@ -739,7 +746,7 @@ def collect_metadata(
|
|||||||
os_: str,
|
os_: str,
|
||||||
rules_path: List[str],
|
rules_path: List[str],
|
||||||
extractor: capa.features.extractors.base_extractor.FeatureExtractor,
|
extractor: capa.features.extractors.base_extractor.FeatureExtractor,
|
||||||
):
|
) -> rdoc.Metadata:
|
||||||
md5 = hashlib.md5()
|
md5 = hashlib.md5()
|
||||||
sha1 = hashlib.sha1()
|
sha1 = hashlib.sha1()
|
||||||
sha256 = hashlib.sha256()
|
sha256 = hashlib.sha256()
|
||||||
@@ -758,34 +765,39 @@ def collect_metadata(
|
|||||||
arch = get_arch(sample_path)
|
arch = get_arch(sample_path)
|
||||||
os_ = get_os(sample_path) if os_ == OS_AUTO else os_
|
os_ = get_os(sample_path) if os_ == OS_AUTO else os_
|
||||||
|
|
||||||
return {
|
return rdoc.Metadata.from_capa(
|
||||||
"timestamp": datetime.datetime.now().isoformat(),
|
{
|
||||||
"version": capa.version.__version__,
|
"timestamp": datetime.datetime.now().isoformat(),
|
||||||
"argv": argv,
|
"version": capa.version.__version__,
|
||||||
"sample": {
|
"argv": argv,
|
||||||
"md5": md5.hexdigest(),
|
"sample": {
|
||||||
"sha1": sha1.hexdigest(),
|
"md5": md5.hexdigest(),
|
||||||
"sha256": sha256.hexdigest(),
|
"sha1": sha1.hexdigest(),
|
||||||
"path": os.path.normpath(sample_path),
|
"sha256": sha256.hexdigest(),
|
||||||
},
|
"path": os.path.normpath(sample_path),
|
||||||
"analysis": {
|
|
||||||
"format": format_,
|
|
||||||
"arch": arch,
|
|
||||||
"os": os_,
|
|
||||||
"extractor": extractor.__class__.__name__,
|
|
||||||
"rules": rules_path,
|
|
||||||
"base_address": extractor.get_base_address(),
|
|
||||||
"layout": {
|
|
||||||
# this is updated after capabilities have been collected.
|
|
||||||
# will look like:
|
|
||||||
#
|
|
||||||
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
|
|
||||||
},
|
},
|
||||||
},
|
"analysis": {
|
||||||
}
|
"format": format_,
|
||||||
|
"arch": arch,
|
||||||
|
"os": os_,
|
||||||
|
"extractor": extractor.__class__.__name__,
|
||||||
|
"rules": rules_path,
|
||||||
|
"base_address": extractor.get_base_address(),
|
||||||
|
"layout": {
|
||||||
|
"functions": {},
|
||||||
|
# this is updated after capabilities have been collected.
|
||||||
|
# will look like:
|
||||||
|
#
|
||||||
|
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
|
||||||
|
},
|
||||||
|
"feature_counts": {"file": 0, "functions": {}},
|
||||||
|
"library_functions": {},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def compute_layout(rules, extractor, capabilities):
|
def compute_layout(rules, extractor, capabilities) -> Dict[str, rdoc.Layout]:
|
||||||
"""
|
"""
|
||||||
compute a metadata structure that links basic blocks
|
compute a metadata structure that links basic blocks
|
||||||
to the functions in which they're found.
|
to the functions in which they're found.
|
||||||
@@ -822,7 +834,7 @@ def compute_layout(rules, extractor, capabilities):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return layout
|
return {"layout": rdoc.Layout.from_capa(layout)}
|
||||||
|
|
||||||
|
|
||||||
def install_common_args(parser, wanted=None):
|
def install_common_args(parser, wanted=None):
|
||||||
@@ -1198,7 +1210,7 @@ def main(argv=None):
|
|||||||
return E_FILE_LIMITATION
|
return E_FILE_LIMITATION
|
||||||
|
|
||||||
# TODO: #1411 use a real type, not a dict here.
|
# TODO: #1411 use a real type, not a dict here.
|
||||||
meta: Dict[str, Any]
|
meta: rdoc.Metadata
|
||||||
capabilities: MatchResults
|
capabilities: MatchResults
|
||||||
counts: Dict[str, Any]
|
counts: Dict[str, Any]
|
||||||
|
|
||||||
@@ -1255,15 +1267,15 @@ def main(argv=None):
|
|||||||
meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
|
meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
|
||||||
|
|
||||||
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
|
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
|
||||||
meta["analysis"].update(counts)
|
|
||||||
meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)
|
meta.analysis.__dict__.update(counts)
|
||||||
|
meta.analysis.__dict__.update(compute_layout(rules, extractor, capabilities))
|
||||||
|
|
||||||
if has_file_limitation(rules, capabilities):
|
if has_file_limitation(rules, capabilities):
|
||||||
# bail if capa encountered file limitation e.g. a packed binary
|
# bail if capa encountered file limitation e.g. a packed binary
|
||||||
# do show the output in verbose mode, though.
|
# do show the output in verbose mode, though.
|
||||||
if not (args.verbose or args.vverbose or args.json):
|
if not (args.verbose or args.vverbose or args.json):
|
||||||
return E_FILE_LIMITATION
|
return E_FILE_LIMITATION
|
||||||
|
|
||||||
if args.json:
|
if args.json:
|
||||||
print(capa.render.json.render(meta, rules, capabilities))
|
print(capa.render.json.render(meta, rules, capabilities))
|
||||||
elif args.vverbose:
|
elif args.vverbose:
|
||||||
@@ -1308,7 +1320,8 @@ def ida_main():
|
|||||||
meta = capa.ida.helpers.collect_metadata([rules_path])
|
meta = capa.ida.helpers.collect_metadata([rules_path])
|
||||||
|
|
||||||
capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
|
capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor())
|
||||||
meta["analysis"].update(counts)
|
|
||||||
|
meta.analysis.__dict__.update(counts)
|
||||||
|
|
||||||
if has_file_limitation(rules, capabilities, is_standalone=False):
|
if has_file_limitation(rules, capabilities, is_standalone=False):
|
||||||
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")
|
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")
|
||||||
|
|||||||
@@ -47,6 +47,20 @@ class FunctionLayout(FrozenModel):
|
|||||||
class Layout(FrozenModel):
|
class Layout(FrozenModel):
|
||||||
functions: Tuple[FunctionLayout, ...]
|
functions: Tuple[FunctionLayout, ...]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_capa(cls, layout: dict) -> "Layout":
|
||||||
|
return cls(
|
||||||
|
functions=tuple(
|
||||||
|
FunctionLayout(
|
||||||
|
address=frz.Address.from_capa(address),
|
||||||
|
matched_basic_blocks=tuple(
|
||||||
|
BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in f["matched_basic_blocks"]
|
||||||
|
),
|
||||||
|
)
|
||||||
|
for address, f in layout["functions"].items()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class LibraryFunction(FrozenModel):
|
class LibraryFunction(FrozenModel):
|
||||||
address: frz.Address
|
address: frz.Address
|
||||||
@@ -62,6 +76,16 @@ class FeatureCounts(FrozenModel):
|
|||||||
file: int
|
file: int
|
||||||
functions: Tuple[FunctionFeatureCount, ...]
|
functions: Tuple[FunctionFeatureCount, ...]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_capa(cls, feature_counts: dict) -> "FeatureCounts":
|
||||||
|
return cls(
|
||||||
|
file=feature_counts["file"],
|
||||||
|
functions=tuple(
|
||||||
|
FunctionFeatureCount(address=frz.Address.from_capa(address), count=count)
|
||||||
|
for address, count in feature_counts["functions"].items()
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Analysis(FrozenModel):
|
class Analysis(FrozenModel):
|
||||||
format: str
|
format: str
|
||||||
@@ -83,7 +107,7 @@ class Metadata(FrozenModel):
|
|||||||
analysis: Analysis
|
analysis: Analysis
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_capa(cls, meta: Any) -> "Metadata":
|
def from_capa(cls, meta: dict) -> "Metadata":
|
||||||
return cls(
|
return cls(
|
||||||
timestamp=meta["timestamp"],
|
timestamp=meta["timestamp"],
|
||||||
version=meta["version"],
|
version=meta["version"],
|
||||||
@@ -126,41 +150,6 @@ class Metadata(FrozenModel):
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
def to_capa(self) -> Dict[str, Any]:
|
|
||||||
capa_meta = {
|
|
||||||
"timestamp": self.timestamp.isoformat(),
|
|
||||||
"version": self.version,
|
|
||||||
"sample": {
|
|
||||||
"md5": self.sample.md5,
|
|
||||||
"sha1": self.sample.sha1,
|
|
||||||
"sha256": self.sample.sha256,
|
|
||||||
"path": self.sample.path,
|
|
||||||
},
|
|
||||||
"analysis": {
|
|
||||||
"format": self.analysis.format,
|
|
||||||
"arch": self.analysis.arch,
|
|
||||||
"os": self.analysis.os,
|
|
||||||
"extractor": self.analysis.extractor,
|
|
||||||
"rules": self.analysis.rules,
|
|
||||||
"base_address": self.analysis.base_address.to_capa(),
|
|
||||||
"layout": {
|
|
||||||
"functions": {
|
|
||||||
f.address.to_capa(): {
|
|
||||||
"matched_basic_blocks": [bb.address.to_capa() for bb in f.matched_basic_blocks]
|
|
||||||
}
|
|
||||||
for f in self.analysis.layout.functions
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"feature_counts": {
|
|
||||||
"file": self.analysis.feature_counts.file,
|
|
||||||
"functions": {fc.address.to_capa(): fc.count for fc in self.analysis.feature_counts.functions},
|
|
||||||
},
|
|
||||||
"library_functions": {lf.address.to_capa(): lf.name for lf in self.analysis.library_functions},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
return capa_meta
|
|
||||||
|
|
||||||
|
|
||||||
class CompoundStatementType:
|
class CompoundStatementType:
|
||||||
AND = "and"
|
AND = "and"
|
||||||
@@ -659,10 +648,12 @@ class ResultDocument(FrozenModel):
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if isinstance(meta, Metadata):
|
||||||
|
return ResultDocument(meta=meta, rules=rule_matches)
|
||||||
|
|
||||||
return ResultDocument(meta=Metadata.from_capa(meta), rules=rule_matches)
|
return ResultDocument(meta=Metadata.from_capa(meta), rules=rule_matches)
|
||||||
|
|
||||||
def to_capa(self) -> Tuple[Dict, Dict]:
|
def to_capa(self) -> Tuple[Metadata, Dict]:
|
||||||
meta = self.meta.to_capa()
|
|
||||||
capabilities: Dict[
|
capabilities: Dict[
|
||||||
str, List[Tuple[capa.features.address.Address, capa.features.common.Result]]
|
str, List[Tuple[capa.features.address.Address, capa.features.common.Result]]
|
||||||
] = collections.defaultdict(list)
|
] = collections.defaultdict(list)
|
||||||
@@ -678,4 +669,4 @@ class ResultDocument(FrozenModel):
|
|||||||
|
|
||||||
capabilities[rule_name].append((addr.to_capa(), result))
|
capabilities[rule_name].append((addr.to_capa(), result))
|
||||||
|
|
||||||
return meta, capabilities
|
return self.meta, capabilities
|
||||||
|
|||||||
@@ -131,8 +131,8 @@ def get_capa_results(args):
|
|||||||
|
|
||||||
meta = capa.main.collect_metadata([], path, format, os_, [], extractor)
|
meta = capa.main.collect_metadata([], path, format, os_, [], extractor)
|
||||||
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
||||||
meta["analysis"].update(counts)
|
meta.analysis.__dict__.update(counts)
|
||||||
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
|
meta.analysis.__dict__.update(capa.main.compute_layout(rules, extractor, capabilities))
|
||||||
|
|
||||||
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
|
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
|
||||||
|
|
||||||
|
|||||||
@@ -172,8 +172,8 @@ def capa_details(rules_path, file_path, output_format="dictionary"):
|
|||||||
|
|
||||||
# collect metadata (used only to make rendering more complete)
|
# collect metadata (used only to make rendering more complete)
|
||||||
meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor)
|
meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor)
|
||||||
meta["analysis"].update(counts)
|
meta.analysis.__dict__.update(counts)
|
||||||
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
|
meta.analysis.__dict__.update(capa.main.compute_layout(rules, extractor, capabilities))
|
||||||
|
|
||||||
capa_output: Any = False
|
capa_output: Any = False
|
||||||
if output_format == "dictionary":
|
if output_format == "dictionary":
|
||||||
|
|||||||
@@ -178,8 +178,8 @@ def main(argv=None):
|
|||||||
|
|
||||||
meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor)
|
meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor)
|
||||||
capabilities, counts = capa.main.find_capabilities(rules, extractor)
|
capabilities, counts = capa.main.find_capabilities(rules, extractor)
|
||||||
meta["analysis"].update(counts)
|
meta.analysis.__dict__.update(counts)
|
||||||
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
|
meta.analysis.__dict__.update(capa.main.compute_layout(rules, extractor, capabilities))
|
||||||
|
|
||||||
if capa.main.has_file_limitation(rules, capabilities):
|
if capa.main.has_file_limitation(rules, capabilities):
|
||||||
# bail if capa encountered file limitation e.g. a packed binary
|
# bail if capa encountered file limitation e.g. a packed binary
|
||||||
|
|||||||
@@ -282,5 +282,5 @@ def test_rdoc_to_capa():
|
|||||||
rd = rdoc.ResultDocument.parse_file(path)
|
rd = rdoc.ResultDocument.parse_file(path)
|
||||||
|
|
||||||
meta, capabilites = rd.to_capa()
|
meta, capabilites = rd.to_capa()
|
||||||
assert isinstance(meta, dict)
|
assert isinstance(meta, rdoc.Metadata)
|
||||||
assert isinstance(capabilites, dict)
|
assert isinstance(capabilites, dict)
|
||||||
|
|||||||
Reference in New Issue
Block a user