mirror of
https://github.com/mandiant/capa.git
synced 2026-02-04 11:07:53 -08:00
extractors: file extractor arg consistency via kwargs
This commit is contained in:
@@ -28,7 +28,7 @@ def extract_format(buf):
|
||||
|
||||
def extract_arch(buf):
|
||||
if buf.startswith(b"MZ"):
|
||||
yield from capa.features.extractors.pefile.extract_file_arch(pefile.PE(data=buf), "hack: path not provided")
|
||||
yield from capa.features.extractors.pefile.extract_file_arch(pe=pefile.PE(data=buf))
|
||||
|
||||
elif buf.startswith(b"\x7fELF"):
|
||||
with contextlib.closing(io.BytesIO(buf)) as f:
|
||||
|
||||
@@ -20,15 +20,12 @@ from capa.features.extractors.base_extractor import FeatureExtractor
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def extract_file_embedded_pe(pe, file_path):
|
||||
with open(file_path, "rb") as f:
|
||||
fbytes = f.read()
|
||||
|
||||
for offset, i in capa.features.extractors.helpers.carve_pe(fbytes, 1):
|
||||
def extract_file_embedded_pe(buf, **kwargs):
|
||||
for offset, _ in capa.features.extractors.helpers.carve_pe(buf, 1):
|
||||
yield Characteristic("embedded pe"), offset
|
||||
|
||||
|
||||
def extract_file_export_names(pe, file_path):
|
||||
def extract_file_export_names(pe, **kwargs):
|
||||
base_address = pe.OPTIONAL_HEADER.ImageBase
|
||||
|
||||
if hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
|
||||
@@ -43,7 +40,7 @@ def extract_file_export_names(pe, file_path):
|
||||
yield Export(name), va
|
||||
|
||||
|
||||
def extract_file_import_names(pe, file_path):
|
||||
def extract_file_import_names(pe, **kwargs):
|
||||
"""
|
||||
extract imported function names
|
||||
1. imports by ordinal:
|
||||
@@ -75,7 +72,7 @@ def extract_file_import_names(pe, file_path):
|
||||
yield Import(name), imp.address
|
||||
|
||||
|
||||
def extract_file_section_names(pe, file_path):
|
||||
def extract_file_section_names(pe, **kwargs):
|
||||
base_address = pe.OPTIONAL_HEADER.ImageBase
|
||||
|
||||
for section in pe.sections:
|
||||
@@ -87,21 +84,18 @@ def extract_file_section_names(pe, file_path):
|
||||
yield Section(name), base_address + section.VirtualAddress
|
||||
|
||||
|
||||
def extract_file_strings(pe, file_path):
|
||||
def extract_file_strings(buf, **kwargs):
|
||||
"""
|
||||
extract ASCII and UTF-16 LE strings from file
|
||||
"""
|
||||
with open(file_path, "rb") as f:
|
||||
b = f.read()
|
||||
|
||||
for s in capa.features.extractors.strings.extract_ascii_strings(b):
|
||||
for s in capa.features.extractors.strings.extract_ascii_strings(buf):
|
||||
yield String(s.s), s.offset
|
||||
|
||||
for s in capa.features.extractors.strings.extract_unicode_strings(b):
|
||||
for s in capa.features.extractors.strings.extract_unicode_strings(buf):
|
||||
yield String(s.s), s.offset
|
||||
|
||||
|
||||
def extract_file_function_names(pe, file_path):
|
||||
def extract_file_function_names(**kwargs):
|
||||
"""
|
||||
extract the names of statically-linked library functions.
|
||||
"""
|
||||
@@ -111,17 +105,17 @@ def extract_file_function_names(pe, file_path):
|
||||
return
|
||||
|
||||
|
||||
def extract_file_os(pe, file_path):
|
||||
def extract_file_os(**kwargs):
|
||||
# assuming PE -> Windows
|
||||
# though i suppose they're also used by UEFI
|
||||
yield OS(OS_WINDOWS), 0x0
|
||||
|
||||
|
||||
def extract_file_format(pe, file_path):
|
||||
def extract_file_format(**kwargs):
|
||||
yield Format(FORMAT_PE), 0x0
|
||||
|
||||
|
||||
def extract_file_arch(pe, file_path):
|
||||
def extract_file_arch(pe, **kwargs):
|
||||
if pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_I386"]:
|
||||
yield Arch(ARCH_I386), 0x0
|
||||
elif pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_AMD64"]:
|
||||
@@ -130,20 +124,20 @@ def extract_file_arch(pe, file_path):
|
||||
logger.warning("unsupported architecture: %s", pefile.MACHINE_TYPE[pe.FILE_HEADER.Machine])
|
||||
|
||||
|
||||
def extract_file_features(pe, file_path):
|
||||
def extract_file_features(pe, buf):
|
||||
"""
|
||||
extract file features from given workspace
|
||||
|
||||
args:
|
||||
pe (pefile.PE): the parsed PE
|
||||
file_path: path to the input file
|
||||
buf: the raw sample bytes
|
||||
|
||||
yields:
|
||||
Tuple[Feature, VA]: a feature and its location.
|
||||
"""
|
||||
|
||||
for file_handler in FILE_HANDLERS:
|
||||
for feature, va in file_handler(pe, file_path):
|
||||
for feature, va in file_handler(pe=pe, buf=buf):
|
||||
yield feature, va
|
||||
|
||||
|
||||
@@ -170,7 +164,10 @@ class PefileFeatureExtractor(FeatureExtractor):
|
||||
return self.pe.OPTIONAL_HEADER.ImageBase
|
||||
|
||||
def extract_file_features(self):
|
||||
for feature, va in extract_file_features(self.pe, self.path):
|
||||
with open(self.path, "rb") as f:
|
||||
buf = f.read()
|
||||
|
||||
for feature, va in extract_file_features(self.pe, buf):
|
||||
yield feature, va
|
||||
|
||||
def get_functions(self):
|
||||
|
||||
@@ -26,7 +26,7 @@ class SmdaFeatureExtractor(FeatureExtractor):
|
||||
return self.smda_report.base_addr
|
||||
|
||||
def extract_file_features(self):
|
||||
for feature, va in capa.features.extractors.smda.file.extract_features(self.smda_report, self.path):
|
||||
for feature, va in capa.features.extractors.smda.file.extract_features(self.smda_report, self.buf):
|
||||
yield feature, va
|
||||
yield from self.global_features
|
||||
|
||||
|
||||
@@ -8,24 +8,22 @@ from capa.features.file import Export, Import, Section
|
||||
from capa.features.common import String, Characteristic
|
||||
|
||||
|
||||
def extract_file_embedded_pe(smda_report, file_path):
|
||||
with open(file_path, "rb") as f:
|
||||
fbytes = f.read()
|
||||
|
||||
for offset, i in capa.features.extractors.helpers.carve_pe(fbytes, 1):
|
||||
def extract_file_embedded_pe(buf, **kwargs):
|
||||
for offset, _ in capa.features.extractors.helpers.carve_pe(buf, 1):
|
||||
yield Characteristic("embedded pe"), offset
|
||||
|
||||
|
||||
def extract_file_export_names(smda_report, file_path):
|
||||
lief_binary = lief.parse(file_path)
|
||||
def extract_file_export_names(buf, **kwargs):
|
||||
lief_binary = lief.parse(buf)
|
||||
|
||||
if lief_binary is not None:
|
||||
for function in lief_binary.exported_functions:
|
||||
yield Export(function.name), function.address
|
||||
|
||||
|
||||
def extract_file_import_names(smda_report, file_path):
|
||||
def extract_file_import_names(smda_report, buf):
|
||||
# extract import table info via LIEF
|
||||
lief_binary = lief.parse(file_path)
|
||||
lief_binary = lief.parse(buf)
|
||||
if not isinstance(lief_binary, lief.PE.Binary):
|
||||
return
|
||||
for imported_library in lief_binary.imports:
|
||||
@@ -41,8 +39,8 @@ def extract_file_import_names(smda_report, file_path):
|
||||
yield Import(name), va
|
||||
|
||||
|
||||
def extract_file_section_names(smda_report, file_path):
|
||||
lief_binary = lief.parse(file_path)
|
||||
def extract_file_section_names(buf, **kwargs):
|
||||
lief_binary = lief.parse(buf)
|
||||
if not isinstance(lief_binary, lief.PE.Binary):
|
||||
return
|
||||
if lief_binary and lief_binary.sections:
|
||||
@@ -51,21 +49,18 @@ def extract_file_section_names(smda_report, file_path):
|
||||
yield Section(section.name), base_address + section.virtual_address
|
||||
|
||||
|
||||
def extract_file_strings(smda_report, file_path):
|
||||
def extract_file_strings(buf, **kwargs):
|
||||
"""
|
||||
extract ASCII and UTF-16 LE strings from file
|
||||
"""
|
||||
with open(file_path, "rb") as f:
|
||||
b = f.read()
|
||||
|
||||
for s in capa.features.extractors.strings.extract_ascii_strings(b):
|
||||
for s in capa.features.extractors.strings.extract_ascii_strings(buf):
|
||||
yield String(s.s), s.offset
|
||||
|
||||
for s in capa.features.extractors.strings.extract_unicode_strings(b):
|
||||
for s in capa.features.extractors.strings.extract_unicode_strings(buf):
|
||||
yield String(s.s), s.offset
|
||||
|
||||
|
||||
def extract_file_function_names(smda_report, file_path):
|
||||
def extract_file_function_names(smda_report, **kwargs):
|
||||
"""
|
||||
extract the names of statically-linked library functions.
|
||||
"""
|
||||
@@ -75,25 +70,24 @@ def extract_file_function_names(smda_report, file_path):
|
||||
return
|
||||
|
||||
|
||||
def extract_file_format(smda_report, file_path):
|
||||
with open(file_path, "rb") as f:
|
||||
yield from capa.features.extractors.common.extract_format(f.read())
|
||||
def extract_file_format(buf, **kwargs):
|
||||
yield from capa.features.extractors.common.extract_format(buf)
|
||||
|
||||
|
||||
def extract_features(smda_report, file_path):
|
||||
def extract_features(smda_report, buf):
|
||||
"""
|
||||
extract file features from given workspace
|
||||
|
||||
args:
|
||||
smda_report (smda.common.SmdaReport): a SmdaReport
|
||||
file_path: path to the input file
|
||||
buf: the raw bytes of the sample
|
||||
|
||||
yields:
|
||||
Tuple[Feature, VA]: a feature and its location.
|
||||
"""
|
||||
|
||||
for file_handler in FILE_HANDLERS:
|
||||
for feature, va in file_handler(smda_report, file_path):
|
||||
for feature, va in file_handler(smda_report=smda_report, buf=buf):
|
||||
yield feature, va
|
||||
|
||||
|
||||
|
||||
@@ -18,17 +18,17 @@ from capa.features.file import Export, Import, Section, FunctionName
|
||||
from capa.features.common import String, Characteristic
|
||||
|
||||
|
||||
def extract_file_embedded_pe(vw, buf):
|
||||
for offset, i in pe_carve.carve(buf, 1):
|
||||
def extract_file_embedded_pe(buf, **kwargs):
|
||||
for offset, _ in pe_carve.carve(buf, 1):
|
||||
yield Characteristic("embedded pe"), offset
|
||||
|
||||
|
||||
def extract_file_export_names(vw, buf):
|
||||
for va, etype, name, _ in vw.getExports():
|
||||
def extract_file_export_names(vw, **kwargs):
|
||||
for va, _, name, _ in vw.getExports():
|
||||
yield Export(name), va
|
||||
|
||||
|
||||
def extract_file_import_names(vw, buf):
|
||||
def extract_file_import_names(vw, **kwargs):
|
||||
"""
|
||||
extract imported function names
|
||||
1. imports by ordinal:
|
||||
@@ -62,12 +62,12 @@ def is_viv_ord_impname(impname: str) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def extract_file_section_names(vw, buf):
|
||||
def extract_file_section_names(vw, **kwargs):
|
||||
for va, _, segname, _ in vw.getSegments():
|
||||
yield Section(segname), va
|
||||
|
||||
|
||||
def extract_file_strings(vw, buf):
|
||||
def extract_file_strings(buf, **kwargs):
|
||||
"""
|
||||
extract ASCII and UTF-16 LE strings from file
|
||||
"""
|
||||
@@ -78,7 +78,7 @@ def extract_file_strings(vw, buf):
|
||||
yield String(s.s), s.offset
|
||||
|
||||
|
||||
def extract_file_function_names(vw, buf):
|
||||
def extract_file_function_names(vw, **kwargs):
|
||||
"""
|
||||
extract the names of statically-linked library functions.
|
||||
"""
|
||||
@@ -88,7 +88,7 @@ def extract_file_function_names(vw, buf):
|
||||
yield FunctionName(name), va
|
||||
|
||||
|
||||
def extract_file_format(vw, buf):
|
||||
def extract_file_format(buf, **kwargs):
|
||||
yield from capa.features.extractors.common.extract_format(buf)
|
||||
|
||||
|
||||
@@ -105,7 +105,7 @@ def extract_features(vw, buf: bytes):
|
||||
"""
|
||||
|
||||
for file_handler in FILE_HANDLERS:
|
||||
for feature, va in file_handler(vw, buf):
|
||||
for feature, va in file_handler(vw=vw, buf=buf):
|
||||
yield feature, va
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user