diff --git a/.github/pyinstaller/pyinstaller.spec b/.github/pyinstaller/pyinstaller.spec index b8967751..7e7ae207 100644 --- a/.github/pyinstaller/pyinstaller.spec +++ b/.github/pyinstaller/pyinstaller.spec @@ -33,6 +33,7 @@ a = Analysis( # this gets invoked from the directory of the spec file, # i.e. ./.github/pyinstaller ('../../rules', 'rules'), + ('../../sigs', 'sigs'), # capa.render.default uses tabulate that depends on wcwidth. # it seems wcwidth uses a json file `version.json` @@ -108,5 +109,4 @@ exe = EXE(pyz, # a.datas, # strip=None, # upx=True, -# name='capa-dat') - +# name='capa-dat') \ No newline at end of file diff --git a/capa/features/extractors/__init__.py b/capa/features/extractors/__init__.py index 2cdbd4af..1faea42d 100644 --- a/capa/features/extractors/__init__.py +++ b/capa/features/extractors/__init__.py @@ -76,6 +76,41 @@ class FeatureExtractor(object): """ raise NotImplemented + def is_library_function(self, va): + """ + is the given address a library function? + the backend may implement its own function matching algorithm, or none at all. + we accept a VA here, rather than function object, to handle addresses identified in instructions. + + this information is used to: + - filter out matches in library functions (by default), and + - recognize when to fetch symbol names for called (non-API) functions + + args: + va (int): the virtual address of a function. + + returns: + bool: True if the given address is the start of a library function. + """ + return False + + def get_function_name(self, va): + """ + fetch any recognized name for the given address. + this is only guaranteed to return a value when the given function is a recognized library function. + we accept a VA here, rather than function object, to handle addresses identified in instructions. + + args: + va (int): the virtual address of a function. + + returns: + str: the function name + + raises: + KeyError: when the given function does not have a name. 
+ """ + raise KeyError(va) + @abc.abstractmethod def extract_function_features(self, f): """ diff --git a/capa/features/extractors/ida/__init__.py b/capa/features/extractors/ida/__init__.py index 689975b9..a089c9ae 100644 --- a/capa/features/extractors/ida/__init__.py +++ b/capa/features/extractors/ida/__init__.py @@ -6,9 +6,6 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys -import types - import idaapi import capa.features.extractors.ida.file @@ -18,24 +15,43 @@ import capa.features.extractors.ida.basicblock from capa.features.extractors import FeatureExtractor -def get_ea(self): - """ """ - if isinstance(self, (idaapi.BasicBlock, idaapi.func_t)): +class FunctionHandle: + """this acts like an idaapi.func_t but with __int__()""" + + def __init__(self, inner): + self._inner = inner + + def __int__(self): return self.start_ea - if isinstance(self, idaapi.insn_t): + + def __getattr__(self, name): + return getattr(self._inner, name) + + +class BasicBlockHandle: + """this acts like an idaapi.BasicBlock but with __int__()""" + + def __init__(self, inner): + self._inner = inner + + def __int__(self): + return self.start_ea + + def __getattr__(self, name): + return getattr(self._inner, name) + + +class InstructionHandle: + """this acts like an idaapi.insn_t but with __int__()""" + + def __init__(self, inner): + self._inner = inner + + def __int__(self): return self.ea - raise TypeError - -def add_ea_int_cast(o): - """ - dynamically add a cast-to-int (`__int__`) method to the given object - that returns the value of the `.ea` property. - this bit of skullduggery lets use cast viv-utils objects as ints. - the correct way of doing this is to update viv-utils (or subclass the objects here). 
- """ - setattr(o, "__int__", types.MethodType(get_ea, o)) - return o + def __getattr__(self, name): + return getattr(self._inner, name) class IdaFeatureExtractor(FeatureExtractor): @@ -59,21 +75,23 @@ class IdaFeatureExtractor(FeatureExtractor): # ignore library functions and thunk functions as identified by IDA for f in ida_helpers.get_functions(skip_thunks=True, skip_libs=True): setattr(f, "ctx", ctx) - yield add_ea_int_cast(f) + yield FunctionHandle(f) @staticmethod def get_function(ea): f = idaapi.get_func(ea) setattr(f, "ctx", {}) - return add_ea_int_cast(f) + return FunctionHandle(f) def extract_function_features(self, f): for (feature, ea) in capa.features.extractors.ida.function.extract_features(f): yield feature, ea def get_basic_blocks(self, f): - for bb in capa.features.extractors.ida.helpers.get_function_blocks(f): - yield add_ea_int_cast(bb) + import capa.features.extractors.ida.helpers as ida_helpers + + for bb in ida_helpers.get_function_blocks(f): + yield BasicBlockHandle(bb) def extract_basic_block_features(self, f, bb): for (feature, ea) in capa.features.extractors.ida.basicblock.extract_features(f, bb): @@ -83,7 +101,7 @@ class IdaFeatureExtractor(FeatureExtractor): import capa.features.extractors.ida.helpers as ida_helpers for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea): - yield add_ea_int_cast(insn) + yield InstructionHandle(insn) def extract_insn_features(self, f, bb, insn): for (feature, ea) in capa.features.extractors.ida.insn.extract_features(f, bb, insn): diff --git a/capa/features/extractors/viv/__init__.py b/capa/features/extractors/viv/__init__.py index 47581236..c54ee0be 100644 --- a/capa/features/extractors/viv/__init__.py +++ b/capa/features/extractors/viv/__init__.py @@ -5,10 +5,10 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and limitations under the License. - -import types +import logging import viv_utils +import viv_utils.flirt import capa.features.extractors import capa.features.extractors.viv.file @@ -18,28 +18,20 @@ import capa.features.extractors.viv.basicblock from capa.features.extractors import FeatureExtractor __all__ = ["file", "function", "basicblock", "insn"] +logger = logging.getLogger(__name__) -def get_va(self): - try: - # vivisect type +class InstructionHandle: + """this acts like a vivisect.Opcode but with an __int__() method""" + + def __init__(self, inner): + self._inner = inner + + def __int__(self): return self.va - except AttributeError: - pass - raise TypeError() - - -def add_va_int_cast(o): - """ - dynamically add a cast-to-int (`__int__`) method to the given object - that returns the value of the `.va` property. - - this bit of skullduggery lets use cast viv-utils objects as ints. - the correct way of doing this is to update viv-utils (or subclass the objects here). 
- """ - setattr(o, "__int__", types.MethodType(get_va, o)) - return o + def __getattr__(self, name): + return getattr(self._inner, name) class VivisectFeatureExtractor(FeatureExtractor): @@ -58,15 +50,14 @@ class VivisectFeatureExtractor(FeatureExtractor): def get_functions(self): for va in sorted(self.vw.getFunctions()): - yield add_va_int_cast(viv_utils.Function(self.vw, va)) + yield viv_utils.Function(self.vw, va) def extract_function_features(self, f): for feature, va in capa.features.extractors.viv.function.extract_features(f): yield feature, va def get_basic_blocks(self, f): - for bb in f.basic_blocks: - yield add_va_int_cast(bb) + return f.basic_blocks def extract_basic_block_features(self, f, bb): for feature, va in capa.features.extractors.viv.basicblock.extract_features(f, bb): @@ -74,8 +65,14 @@ class VivisectFeatureExtractor(FeatureExtractor): def get_instructions(self, f, bb): for insn in bb.instructions: - yield add_va_int_cast(insn) + yield InstructionHandle(insn) def extract_insn_features(self, f, bb, insn): for feature, va in capa.features.extractors.viv.insn.extract_features(f, bb, insn): yield feature, va + + def is_library_function(self, va): + return viv_utils.flirt.is_library_function(self.vw, va) + + def get_function_name(self, va): + return viv_utils.get_function_name(self.vw, va) diff --git a/capa/features/extractors/viv/insn.py b/capa/features/extractors/viv/insn.py index 42deee51..f6c9273c 100644 --- a/capa/features/extractors/viv/insn.py +++ b/capa/features/extractors/viv/insn.py @@ -5,10 +5,14 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
+import sys +import viv_utils import envi.memory +import viv_utils.flirt import envi.archs.i386.disasm +import capa.features.extractors.viv import capa.features.extractors.helpers import capa.features.extractors.viv.helpers from capa.features import ( @@ -74,7 +78,6 @@ def extract_insn_api_features(f, bb, insn): # example: # # call dword [0x00473038] - if insn.mnem not in ("call", "jmp"): return @@ -96,7 +99,7 @@ def extract_insn_api_features(f, bb, insn): # call via thunk on x86, # see 9324d1a8ae37a36ae560c37448c9705a at 0x407985 # - # this is also how calls to internal functions may be decoded on x64. + # this is also how calls to internal functions may be decoded on x32 and x64. # see Lab21-01.exe_:0x140001178 # # follow chained thunks, e.g. in 82bf6347acf15e5d883715dc289d8a2b at 0x14005E0FF in @@ -111,6 +114,11 @@ def extract_insn_api_features(f, bb, insn): if not target: return + if viv_utils.flirt.is_library_function(f.vw, target): + name = viv_utils.get_function_name(f.vw, target) + yield API(name), insn.va + return + for _ in range(THUNK_CHAIN_DEPTH_DELTA): if target in imports: dll, symbol = imports[target] diff --git a/capa/features/freeze.py b/capa/features/freeze.py index 53c69f1a..68e93563 100644 --- a/capa/features/freeze.py +++ b/capa/features/freeze.py @@ -122,7 +122,7 @@ def dumps(extractor): ) for insnva, insn in sorted( - [(insn.__int__(), insn) for insn in extractor.get_instructions(f, bb)], key=lambda p: p[0] + [(int(insn), insn) for insn in extractor.get_instructions(f, bb)], key=lambda p: p[0] ): ret["functions"][hex(f)][hex(bb)].append(hex(insnva)) @@ -248,43 +248,13 @@ def main(argv=None): if argv is None: argv = sys.argv[1:] - formats = [ - ("auto", "(default) detect file type automatically"), - ("pe", "Windows PE file"), - ("sc32", "32-bit shellcode"), - ("sc64", "64-bit shellcode"), - ] - format_help = ", ".join(["%s: %s" % (f[0], f[1]) for f in formats]) - parser = argparse.ArgumentParser(description="save capa features to a file") - 
parser.add_argument("sample", type=str, help="Path to sample to analyze") + capa.main.install_common_args(parser, {"sample", "format", "backend", "signatures"}) parser.add_argument("output", type=str, help="Path to output file") - parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output") - parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors") - parser.add_argument( - "-f", "--format", choices=[f[0] for f in formats], default="auto", help="Select sample format, %s" % format_help - ) - parser.add_argument( - "-b", - "--backend", - type=str, - help="select the backend to use", - choices=(capa.main.BACKEND_VIV, capa.main.BACKEND_SMDA), - default=capa.main.BACKEND_VIV, - ) args = parser.parse_args(args=argv) + capa.main.handle_common_args(args) - if args.quiet: - logging.basicConfig(level=logging.ERROR) - logging.getLogger().setLevel(logging.ERROR) - elif args.verbose: - logging.basicConfig(level=logging.DEBUG) - logging.getLogger().setLevel(logging.DEBUG) - else: - logging.basicConfig(level=logging.INFO) - logging.getLogger().setLevel(logging.INFO) - - extractor = capa.main.get_extractor(args.sample, args.format, args.backend) + extractor = capa.main.get_extractor(args.sample, args.format, args.backend, sigpaths=args.signatures) with open(args.output, "wb") as f: f.write(dump(extractor)) diff --git a/capa/helpers.py b/capa/helpers.py index f5e1e5b0..9e799a44 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -12,18 +12,7 @@ _hex = hex def hex(i): - return _hex(oint(i)) - - -def oint(i): - # there seems to be some trouble with using `int(viv_utils.Function)` - # with the black magic we do with binding the `__int__()` routine. - # i haven't had a chance to debug this yet (and i have no hotel wifi). - # so in the meantime, detect this, and call the method directly. 
- try: - return int(i) - except TypeError: - return i.__int__() + return _hex(int(i)) def get_file_taste(sample_path): diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index f16f5abe..3c71716e 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -78,7 +78,7 @@ def find_func_features(f, extractor): _bb_features[feature].add(ea) func_features[feature].add(ea) - bb_features[capa.helpers.oint(bb)] = _bb_features + bb_features[int(bb)] = _bb_features return func_features, bb_features @@ -100,7 +100,7 @@ def find_func_matches(f, ruleset, func_features, bb_features): func_features[capa.features.MatchedRule(name)].add(ea) # find rule matches for function, function features include rule matches for basic blocks - _, matches = capa.engine.match(ruleset.function_rules, func_features, capa.helpers.oint(f)) + _, matches = capa.engine.match(ruleset.function_rules, func_features, int(f)) for (name, res) in matches.items(): func_matches[name].extend(res) diff --git a/capa/main.py b/capa/main.py index 82f6f4a6..61a0eaa3 100644 --- a/capa/main.py +++ b/capa/main.py @@ -10,16 +10,20 @@ See the License for the specific language governing permissions and limitations """ import os import sys +import gzip +import time import hashlib import logging import os.path import argparse import datetime import textwrap +import contextlib import collections import halo import tqdm +import flirt import colorama import capa.rules @@ -29,7 +33,7 @@ import capa.version import capa.features import capa.features.freeze import capa.features.extractors -from capa.helpers import oint, get_file_taste +from capa.helpers import get_file_taste RULES_PATH_DEFAULT_STRING = "(embedded rules)" SUPPORTED_FILE_MAGIC = set([b"MZ"]) @@ -42,6 +46,14 @@ EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64") logger = logging.getLogger("capa") +@contextlib.contextmanager +def timing(msg): + t0 = time.time() + yield + t1 = time.time() + logger.debug("perf: %s: %0.2fs", msg, t1 - t0) + + def 
set_vivisect_log_level(level): logging.getLogger("vivisect").setLevel(level) logging.getLogger("vivisect.base").setLevel(level) @@ -76,14 +88,14 @@ def find_function_capabilities(ruleset, extractor, f): bb_features[feature].add(va) function_features[feature].add(va) - _, matches = capa.engine.match(ruleset.basic_block_rules, bb_features, oint(bb)) + _, matches = capa.engine.match(ruleset.basic_block_rules, bb_features, int(bb)) for rule_name, res in matches.items(): bb_matches[rule_name].extend(res) for va, _ in res: function_features[capa.features.MatchedRule(rule_name)].add(va) - _, function_matches = capa.engine.match(ruleset.function_rules, function_features, oint(f)) + _, function_matches = capa.engine.match(ruleset.function_rules, function_features, int(f)) return function_matches, bb_matches, len(function_features) @@ -125,10 +137,19 @@ def find_capabilities(ruleset, extractor, disable_progress=None): # to disable progress completely pbar = lambda s, *args, **kwargs: s - for f in pbar(list(extractor.get_functions()), desc="matching", unit=" functions"): + functions = list(extractor.get_functions()) + + for f in pbar(functions, desc="matching", unit=" functions"): + function_address = int(f) + + if extractor.is_library_function(function_address): + function_name = extractor.get_function_name(function_address) + logger.debug("skipping library function 0x%x (%s)", function_address, function_name) + continue + function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f) - meta["feature_counts"]["functions"][f.__int__()] = feature_count - logger.debug("analyzed function 0x%x and extracted %d features", f.__int__(), feature_count) + meta["feature_counts"]["functions"][function_address] = feature_count + logger.debug("analyzed function 0x%x and extracted %d features", function_address, feature_count) for rule_name, res in function_matches.items(): all_function_matches[rule_name].extend(res) @@ -136,7 +157,7 @@ def 
find_capabilities(ruleset, extractor, disable_progress=None): all_bb_matches[rule_name].extend(res) # mapping from matched rule feature to set of addresses at which it matched. - # schema: Dic[MatchedRule: Set[int] + # schema: Dict[MatchedRule, Set[int]] function_features = { capa.features.MatchedRule(rule_name): set(map(lambda p: p[0], results)) for rule_name, results in all_function_matches.items() @@ -229,26 +250,31 @@ def is_supported_file_type(sample): SHELLCODE_BASE = 0x690000 -def get_shellcode_vw(sample, arch="auto", should_save=True): +def get_shellcode_vw(sample, arch="auto"): """ - Return shellcode workspace using explicit arch or via auto detect + Return shellcode workspace using explicit arch or via auto detect. + The workspace is *not* analyzed nor saved. It's up to the caller to do this. + Then, they can register FLIRT analyzers or decide not to write to disk. """ import viv_utils with open(sample, "rb") as f: sample_bytes = f.read() + if arch == "auto": # choose arch with most functions, idea by Jay G. 
vw_cands = [] for arch in ["i386", "amd64"]: vw_cands.append( - viv_utils.getShellcodeWorkspace(sample_bytes, arch, base=SHELLCODE_BASE, should_save=should_save) + viv_utils.getShellcodeWorkspace( + sample_bytes, arch, base=SHELLCODE_BASE, analyze=False, should_save=False + ) ) if not vw_cands: raise ValueError("could not generate vivisect workspace") vw = max(vw_cands, key=lambda vw: len(vw.getFunctions())) else: - vw = viv_utils.getShellcodeWorkspace(sample_bytes, arch, base=SHELLCODE_BASE, should_save=should_save) + vw = viv_utils.getShellcodeWorkspace(sample_bytes, arch, base=SHELLCODE_BASE, analyze=False, should_save=False) vw.setMeta("StorageName", "%s.viv" % sample) @@ -266,26 +292,114 @@ def get_meta_str(vw): return "%s, number of functions: %d" % (", ".join(meta), len(vw.getFunctions())) +def load_flirt_signature(path): + + if path.endswith(".sig"): + with open(path, "rb") as f: + with timing("flirt: parsing .sig: " + path): + sigs = flirt.parse_sig(f.read()) + + elif path.endswith(".pat"): + with open(path, "rb") as f: + with timing("flirt: parsing .pat: " + path): + sigs = flirt.parse_pat(f.read().decode("utf-8").replace("\r\n", "\n")) + + elif path.endswith(".pat.gz"): + with gzip.open(path, "rb") as f: + with timing("flirt: parsing .pat.gz: " + path): + sigs = flirt.parse_pat(f.read().decode("utf-8").replace("\r\n", "\n")) + + else: + raise ValueError("unexpect signature file extension: " + path) + + return sigs + + +def register_flirt_signature_analyzers(vw, sigpaths): + """ + args: + vw (vivisect.VivWorkspace): + sigpaths (List[str]): file system paths of .sig/.pat files + """ + import viv_utils.flirt + + for sigpath in sigpaths: + sigs = load_flirt_signature(sigpath) + + logger.debug("flirt: sig count: %d", len(sigs)) + + with timing("flirt: compiling sigs"): + matcher = flirt.compile(sigs) + + analyzer = viv_utils.flirt.FlirtFunctionAnalyzer(matcher, sigpath) + logger.debug("registering viv function analyzer: %s", repr(analyzer)) + 
viv_utils.flirt.addFlirtFunctionAnalyzer(vw, analyzer) + + +def get_default_signatures(): + if hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS"): + logger.debug("detected running under PyInstaller") + sigs_path = os.path.join(sys._MEIPASS, "sigs") + logger.debug("default signatures path (PyInstaller method): %s", sigs_path) + else: + logger.debug("detected running from source") + sigs_path = os.path.join(os.path.dirname(__file__), "..", "sigs") + logger.debug("default signatures path (source method): %s", sigs_path) + + ret = [] + for root, dirs, files in os.walk(sigs_path): + for file in files: + if not (file.endswith(".pat") or file.endswith(".pat.gz") or file.endswith(".sig")): + continue + + ret.append(os.path.join(root, file)) + + return ret + + class UnsupportedFormatError(ValueError): pass -def get_workspace(path, format, should_save=True): +def get_workspace(path, format, sigpaths): + """ + load the program at the given path into a vivisect workspace using the given format. + also apply the given FLIRT signatures. + + supported formats: + - pe + - sc32 + - sc64 + - auto + + this creates and analyzes the workspace; however, it does *not* save the workspace. + this is the responsibility of the caller. + """ + + # lazy import enables us to not require viv if user wants SMDA, for example. import viv_utils logger.debug("generating vivisect workspace for: %s", path) if format == "auto": if not is_supported_file_type(path): raise UnsupportedFormatError() - vw = viv_utils.getWorkspace(path, should_save=should_save) + + # don't analyze, so that we can add our Flirt function analyzer first. + vw = viv_utils.getWorkspace(path, analyze=False, should_save=False) elif format == "pe": - vw = viv_utils.getWorkspace(path, should_save=should_save) + vw = viv_utils.getWorkspace(path, analyze=False, should_save=False) elif format == "sc32": - vw = get_shellcode_vw(path, arch="i386", should_save=should_save) + # these are not analyzed nor saved. 
+ vw = get_shellcode_vw(path, arch="i386") elif format == "sc64": - vw = get_shellcode_vw(path, arch="amd64", should_save=should_save) + vw = get_shellcode_vw(path, arch="amd64") else: raise ValueError("unexpected format: " + format) + + register_flirt_signature_analyzers(vw, sigpaths) + + vw.analyze() + logger.debug("%s", get_meta_str(vw)) return vw @@ -294,7 +408,7 @@ class UnsupportedRuntimeError(RuntimeError): pass -def get_extractor(path, format, backend, disable_progress=False): +def get_extractor(path, format, backend, sigpaths, disable_progress=False): """ raises: UnsupportedFormatError: @@ -321,7 +435,7 @@ def get_extractor(path, format, backend, disable_progress=False): format = "sc32" elif format == "auto" and path.endswith(EXTENSIONS_SHELLCODE_64): format = "sc64" - vw = get_workspace(path, format, should_save=False) + vw = get_workspace(path, format, sigpaths) try: vw.saveWorkspace() @@ -524,6 +638,18 @@ def install_common_args(parser, wanted=None): help="path to rule file or directory, use embedded rules by default", ) + if "signatures" in wanted: + parser.add_argument( + "--signature", + action="append", + dest="signatures", + type=str, + # with action=append, users can specify further signatures but not override what's found in $capa/sigs/. + # seems reasonable for now. this is an easy way to register the default signature set. 
+ default=get_default_signatures(), + help="use the given signatures to identify library functions, file system paths to .sig/.pat files.", + ) + if "tag" in wanted: parser.add_argument("-t", "--tag", type=str, help="filter on rule meta field values") @@ -609,7 +735,7 @@ def main(argv=None): parser = argparse.ArgumentParser( description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter ) - install_common_args(parser, {"sample", "format", "backend", "rules", "tag"}) + install_common_args(parser, {"sample", "format", "backend", "signatures", "rules", "tag"}) parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text") args = parser.parse_args(args=argv) handle_common_args(args) @@ -678,7 +804,7 @@ def main(argv=None): else: format = args.format try: - extractor = get_extractor(args.sample, format, args.backend, disable_progress=args.quiet) + extractor = get_extractor(args.sample, format, args.backend, args.signatures, disable_progress=args.quiet) except UnsupportedFormatError: logger.error("-" * 80) logger.error(" Input file does not appear to be a PE file.") diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 584e8d66..14f1d8ac 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -96,7 +96,7 @@ def get_capa_results(args): rules, format, path = args logger.info("computing capa results for: %s", path) try: - extractor = capa.main.get_extractor(path, format, capa.main.BACKEND_VIV, disable_progress=True) + extractor = capa.main.get_extractor(path, format, capa.main.BACKEND_VIV, args.signatures, disable_progress=True) except capa.main.UnsupportedFormatError: # i'm 100% sure if multiprocessing will reliably raise exceptions across process boundaries. # so instead, return an object with explicit success/failure status. 
@@ -140,7 +140,7 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="detect capabilities in programs.") - capa.main.install_common_args(parser, wanted={"rules"}) + capa.main.install_common_args(parser, wanted={"rules", "signatures"}) parser.add_argument("input", type=str, help="Path to directory of files to recursively analyze") parser.add_argument( "-n", "--parallelism", type=int, default=multiprocessing.cpu_count(), help="parallelism factor" diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 87a2378a..2d1b8ddf 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -192,7 +192,7 @@ def render_dictionary(doc): def capa_details(file_path, output_format="dictionary"): # extract features and find capabilities - extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, disable_progress=True) + extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, sigpaths=[], disable_progress=True) capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) diff --git a/scripts/lint.py b/scripts/lint.py index 9e9e2e3c..893b8cfd 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -181,6 +181,9 @@ class ExampleFileDNE(Lint): return not found +DEFAULT_SIGNATURES = capa.main.get_default_signatures() + + class DoesntMatchExample(Lint): name = "doesn't match on referenced example" recommendation = "Fix the rule logic or provide a different example" @@ -203,7 +206,9 @@ class DoesntMatchExample(Lint): continue try: - extractor = capa.main.get_extractor(path, "auto", capa.main.BACKEND_VIV, disable_progress=True) + extractor = capa.main.get_extractor( + path, "auto", capa.main.BACKEND_VIV, sigpaths=DEFAULT_SIGNATURES, disable_progress=True + ) capabilities, meta = capa.main.find_capabilities(ctx["rules"], extractor, disable_progress=True) except Exception as e: 
logger.error("failed to extract capabilities: %s %s %s", rule.name, path, e) diff --git a/scripts/match-function-id.py b/scripts/match-function-id.py new file mode 100644 index 00000000..c37702ba --- /dev/null +++ b/scripts/match-function-id.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +""" +Copyright (C) 2021 FireEye, Inc. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. +You may obtain a copy of the License at: [package root]/LICENSE.txt +Unless required by applicable law or agreed to in writing, software distributed under the License + is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. + +match-function-id + +Show the names of functions as recognized by the function identification subsystem. +This can help identify library functions statically linked into a program, +such as when triaging false positive matches in capa rules. + +Example:: + + $ python scripts/match-function-id.py --signature sigs/vc6.pat.gz /tmp/suspicious.dll_ + 0x44cf30: ?GetPdbDll@@YAPAUHINSTANCE__@@XZ + 0x44bb20: ?_strlen_priv@@YAIPBD@Z + 0x44b6b0: ?invoke_main@@YAHXZ + 0x44a5d0: ?find_pe_section@@YAPAU_IMAGE_SECTION_HEADER@@QAEI@Z + 0x44a690: ?is_potentially_valid_image_base@@YA_NQAX@Z + 0x44cbe0: ___get_entropy + 0x44a4a0: __except_handler4 + 0x44b3d0: ?pre_cpp_initialization@@YAXXZ + 0x44b2e0: ?pre_c_initialization@@YAHXZ + 0x44b3c0: ?post_pgo_initialization@@YAHXZ + 0x420156: ? + 0x420270: ? + 0x430dcd: ? + 0x44d930: __except_handler4_noexcept + 0x41e960: ? 
+ 0x44a1e0: @_RTC_AllocaHelper@12 + 0x44ba90: ?_getMemBlockDataString@@YAXPAD0PBDI@Z + 0x44a220: @_RTC_CheckStackVars2@12 + 0x44a790: ___scrt_dllmain_after_initialize_c + 0x44a7d0: ___scrt_dllmain_before_initialize_c + 0x44a800: ___scrt_dllmain_crt_thread_attach + 0x44a860: ___scrt_dllmain_exception_filter + 0x44a900: ___scrt_dllmain_uninitialize_critical + 0x44ad10: _at_quick_exit + 0x44b940: ?_RTC_Failure@@YAXPAXH@Z + 0x44be60: __RTC_UninitUse + 0x44bfd0: __RTC_GetErrDesc + 0x44c060: __RTC_SetErrorType + 0x44cb60: ? + 0x44cba0: __guard_icall_checks_enforced +""" +import sys +import logging +import argparse + +import flirt +import viv_utils +import viv_utils.flirt + +import capa.main +import capa.rules +import capa.engine +import capa.helpers +import capa.features +import capa.features.freeze + +logger = logging.getLogger("capa.match-function-id") + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + parser = argparse.ArgumentParser(description="FLIRT match each function") + parser.add_argument("sample", type=str, help="Path to sample to analyze") + parser.add_argument( + "-F", + "--function", + type=lambda x: int(x, 0x10), + help="match a specific function by VA, rather than add functions", + ) + parser.add_argument( + "--signature", + action="append", + dest="signatures", + type=str, + default=[], + help="use the given signatures to identify library functions, file system paths to .sig/.pat files.", + ) + parser.add_argument("-d", "--debug", action="store_true", help="Enable debugging output on STDERR") + parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors") + args = parser.parse_args(args=argv) + + if args.quiet: + logging.basicConfig(level=logging.ERROR) + logging.getLogger().setLevel(logging.ERROR) + elif args.debug: + logging.basicConfig(level=logging.DEBUG) + logging.getLogger().setLevel(logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + logging.getLogger().setLevel(logging.INFO) + + 
# disable vivisect-related logging, it's verbose and not relevant for capa users + capa.main.set_vivisect_log_level(logging.CRITICAL) + + analyzers = [] + for sigpath in args.signatures: + sigs = capa.main.load_flirt_signature(sigpath) + + with capa.main.timing("flirt: compiling sigs"): + matcher = flirt.compile(sigs) + + analyzer = viv_utils.flirt.FlirtFunctionAnalyzer(matcher, sigpath) + logger.debug("registering viv function analyzer: %s", repr(analyzer)) + analyzers.append(analyzer) + + vw = viv_utils.getWorkspace(args.sample, analyze=True, should_save=False) + + functions = vw.getFunctions() + if args.function: + functions = [args.function] + + for function in functions: + logger.debug("matching function: 0x%04x", function) + for analyzer in analyzers: + name = viv_utils.flirt.match_function_flirt_signatures(analyzer.matcher, vw, function) + if name: + print("0x%04x: %s" % (function, name)) + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index 569e7534..f3d73710 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -111,7 +111,7 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="detect capabilities in programs.") - capa.main.install_common_args(parser, wanted={"format", "sample", "rules", "tag"}) + capa.main.install_common_args(parser, wanted={"format", "backend", "sample", "signatures", "rules", "tag"}) args = parser.parse_args(args=argv) capa.main.handle_common_args(args) @@ -152,8 +152,9 @@ def main(argv=None): extractor = capa.features.freeze.load(f.read()) else: format = args.format + try: - extractor = capa.main.get_extractor(args.sample, args.format) + extractor = capa.main.get_extractor(args.sample, args.format, args.backend, args.signatures) except capa.main.UnsupportedFormatError: logger.error("-" * 80) logger.error(" Input file does not appear to be a 
PE file.") diff --git a/scripts/show-features.py b/scripts/show-features.py index f357ab51..8960deca 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -83,7 +83,7 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="Show the features that capa extracts from the given sample") - capa.main.install_common_args(parser, wanted={"format", "sample"}) + capa.main.install_common_args(parser, wanted={"format", "sample", "signatures"}) parser.add_argument("-F", "--function", type=lambda x: int(x, 0x10), help="Show features for specific function") args = parser.parse_args(args=argv) @@ -100,7 +100,9 @@ def main(argv=None): extractor = capa.features.freeze.load(f.read()) else: try: - extractor = capa.main.get_extractor(args.sample, args.format, capa.main.BACKEND_VIV) + extractor = capa.main.get_extractor( + args.sample, args.format, capa.main.BACKEND_VIV, sigpaths=args.signatures + ) except capa.main.UnsupportedFormatError: logger.error("-" * 80) logger.error(" Input file does not appear to be a PE file.") @@ -135,9 +137,9 @@ def main(argv=None): if args.format == "freeze": functions = tuple(filter(lambda f: f == args.function, functions)) else: - functions = tuple(filter(lambda f: capa.helpers.oint(f) == args.function, functions)) + functions = tuple(filter(lambda f: int(f) == args.function, functions)) - if args.function not in [capa.helpers.oint(f) for f in functions]: + if args.function not in [int(f) for f in functions]: print("0x%X not a function" % args.function) return -1 diff --git a/setup.py b/setup.py index 580a3be1..1b1c685a 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,6 @@ # See the License for the specific language governing permissions and limitations under the License. 
import os -import sys import setuptools @@ -19,7 +18,7 @@ requirements = [ "termcolor==1.1.0", "wcwidth==0.2.5", "ida-settings==2.1.0", - "viv-utils==0.6.0", + "viv-utils[flirt]==0.6.2", "halo==0.0.31", "networkx==2.5.1", "ruamel.yaml==0.17.4", diff --git a/sigs/README.md b/sigs/README.md new file mode 100644 index 00000000..c2024fb3 --- /dev/null +++ b/sigs/README.md @@ -0,0 +1,9 @@ +# capa/sigs + +This directory contains FLIRT signatures that capa uses to identify library functions. +Typically, capa will ignore library functions, which reduces false positives and improves runtime. + +These FLIRT signatures were generated by FireEye using the Hex-Rays FLAIR tools such as `pcf` and `sigmake`. +FireEye generated the signatures from source data that they collected; these signatures are not derived from the FLIRT signatures distributed with IDA Pro. + +The signatures in this directory have the same license as capa: Apache 2.0. \ No newline at end of file diff --git a/sigs/flare_common_libs.sig b/sigs/flare_common_libs.sig new file mode 100644 index 00000000..436b2046 Binary files /dev/null and b/sigs/flare_common_libs.sig differ diff --git a/sigs/flare_msvc_atlmfc_32_64.sig b/sigs/flare_msvc_atlmfc_32_64.sig new file mode 100644 index 00000000..a00aaafa Binary files /dev/null and b/sigs/flare_msvc_atlmfc_32_64.sig differ diff --git a/sigs/flare_msvc_rtf_32_64.sig b/sigs/flare_msvc_rtf_32_64.sig new file mode 100644 index 00000000..0edf8f7d Binary files /dev/null and b/sigs/flare_msvc_rtf_32_64.sig differ diff --git a/sigs/test_aulldiv.pat b/sigs/test_aulldiv.pat new file mode 100644 index 00000000..bbf3e52d --- /dev/null +++ b/sigs/test_aulldiv.pat @@ -0,0 +1,11 @@ +# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +53568b4424180bc075188b4c24148b44241033d2f7f18bd88b44240cf7f18bd3 48 d0a0 0068 :0000 __aulldiv +558bec83e4e083ec60c5fe7f4c2420c5fe7f0424ff742424ff742424ff74240c 08 6e24 0095 :0000 ___avx_u64div4@@64 ^0028 __aulldiv +558bec83e4f083ec30660f6fd1660f73da04660f6fd8660f7ed0660f73db04f3 1b ed6c 0080 :0000 ___sse2_u64div2@@32 ^003b __aulldiv +--- \ No newline at end of file diff --git a/sigs/test_aullrem.pat.gz b/sigs/test_aullrem.pat.gz new file mode 100644 index 00000000..b63ee4f5 Binary files /dev/null and b/sigs/test_aullrem.pat.gz differ diff --git a/tests/fixtures.py b/tests/fixtures.py index b5a0b91d..33b0cc26 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -72,12 +72,15 @@ def xfail(condition, reason=None): def get_viv_extractor(path): import capa.features.extractors.viv + aulldiv_pat = os.path.join(CD, "..", "sigs", "test_aulldiv.pat") + aullrem_pat = os.path.join(CD, "..", "sigs", "test_aullrem.pat.gz") + if "raw32" in path: - vw = capa.main.get_workspace(path, "sc32", should_save=False) + vw = capa.main.get_workspace(path, "sc32", sigpaths=[aulldiv_pat, aullrem_pat]) elif "raw64" in path: - vw = capa.main.get_workspace(path, "sc64", should_save=False) + vw = capa.main.get_workspace(path, "sc64", sigpaths=[aulldiv_pat, aullrem_pat]) else: - vw = capa.main.get_workspace(path, "auto", should_save=True) + vw = capa.main.get_workspace(path, "auto", sigpaths=[aulldiv_pat, aullrem_pat]) extractor = capa.features.extractors.viv.VivisectFeatureExtractor(vw, path) fixup_viv(path, extractor) return extractor @@ -241,14 +244,14 @@ def sample(request): def get_function(extractor, fva): 
for f in extractor.get_functions(): - if f.__int__() == fva: + if int(f) == fva: return f raise ValueError("function not found") def get_basic_block(extractor, f, va): for bb in extractor.get_basic_blocks(f): - if bb.__int__() == va: + if int(bb) == va: return bb raise ValueError("basic block not found") diff --git a/tests/test_freeze.py b/tests/test_freeze.py index 4e27d5f8..514e98f3 100644 --- a/tests/test_freeze.py +++ b/tests/test_freeze.py @@ -115,30 +115,23 @@ def compare_extractors_viv_null(viv_ext, null_ext): null_ext (capa.features.extractors.NullFeatureExtractor) """ assert list(viv_ext.extract_file_features()) == list(null_ext.extract_file_features()) - assert list(map(to_int, viv_ext.get_functions())) == list(null_ext.get_functions()) + assert list(map(int, viv_ext.get_functions())) == list(null_ext.get_functions()) for f in viv_ext.get_functions(): - assert list(map(to_int, viv_ext.get_basic_blocks(f))) == list(null_ext.get_basic_blocks(to_int(f))) - assert list(viv_ext.extract_function_features(f)) == list(null_ext.extract_function_features(to_int(f))) + assert list(map(int, viv_ext.get_basic_blocks(f))) == list(null_ext.get_basic_blocks(int(f))) + assert list(viv_ext.extract_function_features(f)) == list(null_ext.extract_function_features(int(f))) for bb in viv_ext.get_basic_blocks(f): - assert list(map(to_int, viv_ext.get_instructions(f, bb))) == list( - null_ext.get_instructions(to_int(f), to_int(bb)) - ) + assert list(map(int, viv_ext.get_instructions(f, bb))) == list(null_ext.get_instructions(int(f), int(bb))) assert list(viv_ext.extract_basic_block_features(f, bb)) == list( - null_ext.extract_basic_block_features(to_int(f), to_int(bb)) + null_ext.extract_basic_block_features(int(f), int(bb)) ) for insn in viv_ext.get_instructions(f, bb): assert list(viv_ext.extract_insn_features(f, bb, insn)) == list( - null_ext.extract_insn_features(to_int(f), to_int(bb), to_int(insn)) + null_ext.extract_insn_features(int(f), int(bb), int(insn)) ) -def 
to_int(o): - """helper to get int value of extractor items""" - return capa.helpers.oint(o) - - def test_freeze_s_roundtrip(): load = capa.features.freeze.loads dump = capa.features.freeze.dumps diff --git a/tests/test_function_id.py b/tests/test_function_id.py new file mode 100644 index 00000000..c6ad2be9 --- /dev/null +++ b/tests/test_function_id.py @@ -0,0 +1,27 @@ +from fixtures import get_function, pma16_01_extractor, extract_function_features + +import capa.features.insn + + +def test_function_id_simple_match(pma16_01_extractor): + assert pma16_01_extractor.is_library_function(0x407490) == True + assert pma16_01_extractor.get_function_name(0x407490) == "__aulldiv" + + +def test_function_id_gz_pat(pma16_01_extractor): + # aullrem is stored in `test_aullrem.pat.gz` + assert pma16_01_extractor.is_library_function(0x407500) == True + assert pma16_01_extractor.get_function_name(0x407500) == "__aullrem" + + +def test_function_id_complex_match(pma16_01_extractor): + # 0x405714 is __spawnlp which requires recursive match of __spawnvp at 0x407FAB + # (and __spawnvpe at 0x409DE8) + assert pma16_01_extractor.is_library_function(0x405714) == True + assert pma16_01_extractor.get_function_name(0x405714) == "__spawnlp" + + +def test_function_id_api_feature(pma16_01_extractor): + f = get_function(pma16_01_extractor, 0x404548) + features = extract_function_features(pma16_01_extractor, f) + assert capa.features.insn.API("__aulldiv") in features