mirror of
https://github.com/mandiant/capa.git
synced 2025-12-22 23:26:21 -08:00
main: collect os/format/arch into metadata and render it
This commit is contained in:
63
capa/main.py
63
capa/main.py
@@ -237,7 +237,7 @@ def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalon
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def is_supported_file_type(sample: str) -> bool:
|
def is_supported_format(sample: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Return if this is a supported file based on magic header values
|
Return if this is a supported file based on magic header values
|
||||||
"""
|
"""
|
||||||
@@ -247,6 +247,17 @@ def is_supported_file_type(sample: str) -> bool:
|
|||||||
return len(list(capa.features.extractors.common.extract_format(taste))) == 1
|
return len(list(capa.features.extractors.common.extract_format(taste))) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def get_format(sample: str) -> str:
|
||||||
|
with open(sample, "rb") as f:
|
||||||
|
buf = f.read()
|
||||||
|
|
||||||
|
for feature, _ in capa.features.extractors.common.extract_format(buf):
|
||||||
|
assert isinstance(feature.value, str)
|
||||||
|
return feature.value
|
||||||
|
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
|
||||||
def is_supported_arch(sample: str) -> bool:
|
def is_supported_arch(sample: str) -> bool:
|
||||||
with open(sample, "rb") as f:
|
with open(sample, "rb") as f:
|
||||||
buf = f.read()
|
buf = f.read()
|
||||||
@@ -254,6 +265,17 @@ def is_supported_arch(sample: str) -> bool:
|
|||||||
return len(list(capa.features.extractors.common.extract_arch(buf))) == 1
|
return len(list(capa.features.extractors.common.extract_arch(buf))) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def get_arch(sample: str) -> str:
|
||||||
|
with open(sample, "rb") as f:
|
||||||
|
buf = f.read()
|
||||||
|
|
||||||
|
for feature, _ in capa.features.extractors.common.extract_arch(buf):
|
||||||
|
assert isinstance(feature.value, str)
|
||||||
|
return feature.value
|
||||||
|
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
|
||||||
def is_supported_os(sample: str) -> bool:
|
def is_supported_os(sample: str) -> bool:
|
||||||
with open(sample, "rb") as f:
|
with open(sample, "rb") as f:
|
||||||
buf = f.read()
|
buf = f.read()
|
||||||
@@ -261,6 +283,17 @@ def is_supported_os(sample: str) -> bool:
|
|||||||
return len(list(capa.features.extractors.common.extract_os(buf))) == 1
|
return len(list(capa.features.extractors.common.extract_os(buf))) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def get_os(sample: str) -> str:
|
||||||
|
with open(sample, "rb") as f:
|
||||||
|
buf = f.read()
|
||||||
|
|
||||||
|
for feature, _ in capa.features.extractors.common.extract_os(buf):
|
||||||
|
assert isinstance(feature.value, str)
|
||||||
|
return feature.value
|
||||||
|
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
|
||||||
SHELLCODE_BASE = 0x690000
|
SHELLCODE_BASE = 0x690000
|
||||||
|
|
||||||
|
|
||||||
@@ -431,7 +464,7 @@ def get_workspace(path, format, sigpaths):
|
|||||||
|
|
||||||
logger.debug("generating vivisect workspace for: %s", path)
|
logger.debug("generating vivisect workspace for: %s", path)
|
||||||
if format == "auto":
|
if format == "auto":
|
||||||
if not is_supported_file_type(path):
|
if not is_supported_format(path):
|
||||||
raise UnsupportedFormatError()
|
raise UnsupportedFormatError()
|
||||||
|
|
||||||
# don't analyze, so that we can add our Flirt function analyzer first.
|
# don't analyze, so that we can add our Flirt function analyzer first.
|
||||||
@@ -463,15 +496,12 @@ def get_extractor(
|
|||||||
) -> FeatureExtractor:
|
) -> FeatureExtractor:
|
||||||
"""
|
"""
|
||||||
raises:
|
raises:
|
||||||
UnsupportedFormatError:
|
UnsupportedFormatError
|
||||||
|
UnsupportedArchError
|
||||||
|
UnsupportedOSError
|
||||||
"""
|
"""
|
||||||
if format == "auto" and path.endswith(EXTENSIONS_SHELLCODE_32):
|
|
||||||
format = "sc32"
|
|
||||||
elif format == "auto" and path.endswith(EXTENSIONS_SHELLCODE_64):
|
|
||||||
format = "sc64"
|
|
||||||
|
|
||||||
if format not in ("sc32", "sc64"):
|
if format not in ("sc32", "sc64"):
|
||||||
if not is_supported_file_type(path):
|
if not is_supported_format(path):
|
||||||
raise UnsupportedFormatError()
|
raise UnsupportedFormatError()
|
||||||
|
|
||||||
if not is_supported_arch(path):
|
if not is_supported_arch(path):
|
||||||
@@ -605,7 +635,7 @@ def get_signatures(sigs_path):
|
|||||||
return paths
|
return paths
|
||||||
|
|
||||||
|
|
||||||
def collect_metadata(argv, sample_path, rules_path, format, extractor):
|
def collect_metadata(argv, sample_path, rules_path, extractor):
|
||||||
md5 = hashlib.md5()
|
md5 = hashlib.md5()
|
||||||
sha1 = hashlib.sha1()
|
sha1 = hashlib.sha1()
|
||||||
sha256 = hashlib.sha256()
|
sha256 = hashlib.sha256()
|
||||||
@@ -620,6 +650,10 @@ def collect_metadata(argv, sample_path, rules_path, format, extractor):
|
|||||||
if rules_path != RULES_PATH_DEFAULT_STRING:
|
if rules_path != RULES_PATH_DEFAULT_STRING:
|
||||||
rules_path = os.path.abspath(os.path.normpath(rules_path))
|
rules_path = os.path.abspath(os.path.normpath(rules_path))
|
||||||
|
|
||||||
|
format = get_format(sample_path)
|
||||||
|
arch = get_arch(sample_path)
|
||||||
|
os_ = get_os(sample_path)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"timestamp": datetime.datetime.now().isoformat(),
|
"timestamp": datetime.datetime.now().isoformat(),
|
||||||
"version": capa.version.__version__,
|
"version": capa.version.__version__,
|
||||||
@@ -632,6 +666,8 @@ def collect_metadata(argv, sample_path, rules_path, format, extractor):
|
|||||||
},
|
},
|
||||||
"analysis": {
|
"analysis": {
|
||||||
"format": format,
|
"format": format,
|
||||||
|
"arch": arch,
|
||||||
|
"os": os_,
|
||||||
"extractor": extractor.__class__.__name__,
|
"extractor": extractor.__class__.__name__,
|
||||||
"rules": rules_path,
|
"rules": rules_path,
|
||||||
"base_address": extractor.get_base_address(),
|
"base_address": extractor.get_base_address(),
|
||||||
@@ -940,6 +976,11 @@ def main(argv=None):
|
|||||||
extractor = capa.features.freeze.load(f.read())
|
extractor = capa.features.freeze.load(f.read())
|
||||||
else:
|
else:
|
||||||
format = args.format
|
format = args.format
|
||||||
|
if format == "auto" and args.sample.endswith(EXTENSIONS_SHELLCODE_32):
|
||||||
|
format = "sc32"
|
||||||
|
elif format == "auto" and args.sample.endswith(EXTENSIONS_SHELLCODE_64):
|
||||||
|
format = "sc64"
|
||||||
|
|
||||||
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
|
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -973,7 +1014,7 @@ def main(argv=None):
|
|||||||
logger.error("-" * 80)
|
logger.error("-" * 80)
|
||||||
return -1
|
return -1
|
||||||
|
|
||||||
meta = collect_metadata(argv, args.sample, args.rules, format, extractor)
|
meta = collect_metadata(argv, args.sample, args.rules, extractor)
|
||||||
|
|
||||||
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
|
capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
|
||||||
meta["analysis"].update(counts)
|
meta["analysis"].update(counts)
|
||||||
|
|||||||
@@ -7,7 +7,6 @@
|
|||||||
# See the License for the specific language governing permissions and limitations under the License.
|
# See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
from typing import Dict, List
|
|
||||||
|
|
||||||
import tabulate
|
import tabulate
|
||||||
|
|
||||||
@@ -33,6 +32,9 @@ def render_meta(doc, ostream: StringIO):
|
|||||||
(width("md5", 22), width(doc["meta"]["sample"]["md5"], 82)),
|
(width("md5", 22), width(doc["meta"]["sample"]["md5"], 82)),
|
||||||
("sha1", doc["meta"]["sample"]["sha1"]),
|
("sha1", doc["meta"]["sample"]["sha1"]),
|
||||||
("sha256", doc["meta"]["sample"]["sha256"]),
|
("sha256", doc["meta"]["sample"]["sha256"]),
|
||||||
|
("os", doc["meta"]["analysis"]["os"]),
|
||||||
|
("format", doc["meta"]["analysis"]["format"]),
|
||||||
|
("arch", doc["meta"]["analysis"]["arch"]),
|
||||||
("path", doc["meta"]["sample"]["path"]),
|
("path", doc["meta"]["sample"]["path"]),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -41,7 +41,9 @@ def render_meta(ostream, doc):
|
|||||||
path /tmp/suspicious.dll_
|
path /tmp/suspicious.dll_
|
||||||
timestamp 2020-07-03T10:17:05.796933
|
timestamp 2020-07-03T10:17:05.796933
|
||||||
capa version 0.0.0
|
capa version 0.0.0
|
||||||
format auto
|
os windows
|
||||||
|
format pe
|
||||||
|
arch amd64
|
||||||
extractor VivisectFeatureExtractor
|
extractor VivisectFeatureExtractor
|
||||||
base address 0x10000000
|
base address 0x10000000
|
||||||
rules (embedded rules)
|
rules (embedded rules)
|
||||||
@@ -55,7 +57,9 @@ def render_meta(ostream, doc):
|
|||||||
("path", doc["meta"]["sample"]["path"]),
|
("path", doc["meta"]["sample"]["path"]),
|
||||||
("timestamp", doc["meta"]["timestamp"]),
|
("timestamp", doc["meta"]["timestamp"]),
|
||||||
("capa version", doc["meta"]["version"]),
|
("capa version", doc["meta"]["version"]),
|
||||||
|
("os", doc["meta"]["analysis"]["os"]),
|
||||||
("format", doc["meta"]["analysis"]["format"]),
|
("format", doc["meta"]["analysis"]["format"]),
|
||||||
|
("arch", doc["meta"]["analysis"]["arch"]),
|
||||||
("extractor", doc["meta"]["analysis"]["extractor"]),
|
("extractor", doc["meta"]["analysis"]["extractor"]),
|
||||||
("base address", hex(doc["meta"]["analysis"]["base_address"])),
|
("base address", hex(doc["meta"]["analysis"]["base_address"])),
|
||||||
("rules", doc["meta"]["analysis"]["rules"]),
|
("rules", doc["meta"]["analysis"]["rules"]),
|
||||||
|
|||||||
@@ -126,7 +126,7 @@ def get_capa_results(args):
|
|||||||
"error": "unexpected error: %s" % (e),
|
"error": "unexpected error: %s" % (e),
|
||||||
}
|
}
|
||||||
|
|
||||||
meta = capa.main.collect_metadata("", path, "", format, extractor)
|
meta = capa.main.collect_metadata("", path, "", extractor)
|
||||||
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
||||||
meta["analysis"].update(counts)
|
meta["analysis"].update(counts)
|
||||||
|
|
||||||
|
|||||||
@@ -169,7 +169,7 @@ def capa_details(file_path, output_format="dictionary"):
|
|||||||
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
||||||
|
|
||||||
# collect metadata (used only to make rendering more complete)
|
# collect metadata (used only to make rendering more complete)
|
||||||
meta = capa.main.collect_metadata("", file_path, RULES_PATH, "auto", extractor)
|
meta = capa.main.collect_metadata("", file_path, RULES_PATH, extractor)
|
||||||
meta["analysis"].update(counts)
|
meta["analysis"].update(counts)
|
||||||
|
|
||||||
capa_output = False
|
capa_output = False
|
||||||
|
|||||||
@@ -171,7 +171,7 @@ def main(argv=None):
|
|||||||
logger.error("-" * 80)
|
logger.error("-" * 80)
|
||||||
return -1
|
return -1
|
||||||
|
|
||||||
meta = capa.main.collect_metadata(argv, args.sample, args.rules, format, extractor)
|
meta = capa.main.collect_metadata(argv, args.sample, args.rules, extractor)
|
||||||
capabilities, counts = capa.main.find_capabilities(rules, extractor)
|
capabilities, counts = capa.main.find_capabilities(rules, extractor)
|
||||||
meta["analysis"].update(counts)
|
meta["analysis"].update(counts)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user