Merge pull request #446 from fireeye/function-id-flirt

feature: match functions with FLIRT
This commit is contained in:
Willi Ballenthin
2021-04-30 08:49:30 -06:00
committed by GitHub
25 changed files with 476 additions and 149 deletions

View File

@@ -33,6 +33,7 @@ a = Analysis(
# this gets invoked from the directory of the spec file,
# i.e. ./.github/pyinstaller
('../../rules', 'rules'),
('../../sigs', 'sigs'),
# capa.render.default uses tabulate that depends on wcwidth.
# it seems wcwidth uses a json file `version.json`
@@ -108,5 +109,4 @@ exe = EXE(pyz,
# a.datas,
# strip=None,
# upx=True,
# name='capa-dat')
# name='capa-dat')

View File

@@ -76,6 +76,41 @@ class FeatureExtractor(object):
"""
raise NotImplemented
def is_library_function(self, va):
    """
    report whether the given address is the start of a library function.

    a backend is free to plug in its own function matching algorithm, or to offer none;
    this default implementation recognizes no library functions at all.
    the argument is a VA, not a function object, so that addresses found in instructions can be queried too.

    the answer is used to:
      - filter out matches in library functions (by default), and
      - recognize when to fetch symbol names for called (non-API) functions

    args:
      va (int): the virtual address of a function.

    returns:
      bool: True if the given address is the start of a library function.
    """
    return False
def get_function_name(self, va):
    """
    look up whatever name has been recognized for the given address.

    a result is only guaranteed when the address is a recognized library function.
    the argument is a VA, not a function object, so that addresses found in instructions can be queried too.
    this default implementation knows no names, so it always raises.

    args:
      va (int): the virtual address of a function.

    returns:
      str: the function name

    raises:
      KeyError: when the given function does not have a name.
    """
    raise KeyError(va)
@abc.abstractmethod
def extract_function_features(self, f):
"""

View File

@@ -6,9 +6,6 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import types
import idaapi
import capa.features.extractors.ida.file
@@ -18,24 +15,43 @@ import capa.features.extractors.ida.basicblock
from capa.features.extractors import FeatureExtractor
def get_ea(self):
""" """
if isinstance(self, (idaapi.BasicBlock, idaapi.func_t)):
class FunctionHandle:
"""this acts like an idaapi.func_t but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.start_ea
if isinstance(self, idaapi.insn_t):
def __getattr__(self, name):
return getattr(self._inner, name)
class BasicBlockHandle:
"""this acts like an idaapi.BasicBlock but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.start_ea
def __getattr__(self, name):
return getattr(self._inner, name)
class InstructionHandle:
"""this acts like an idaapi.insn_t but with __int__()"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.ea
raise TypeError
def add_ea_int_cast(o):
"""
dynamically add a cast-to-int (`__int__`) method to the given object
that returns the value of the `.ea` property.
this bit of skullduggery lets use cast viv-utils objects as ints.
the correct way of doing this is to update viv-utils (or subclass the objects here).
"""
setattr(o, "__int__", types.MethodType(get_ea, o))
return o
def __getattr__(self, name):
return getattr(self._inner, name)
class IdaFeatureExtractor(FeatureExtractor):
@@ -59,21 +75,23 @@ class IdaFeatureExtractor(FeatureExtractor):
# ignore library functions and thunk functions as identified by IDA
for f in ida_helpers.get_functions(skip_thunks=True, skip_libs=True):
setattr(f, "ctx", ctx)
yield add_ea_int_cast(f)
yield FunctionHandle(f)
@staticmethod
def get_function(ea):
f = idaapi.get_func(ea)
setattr(f, "ctx", {})
return add_ea_int_cast(f)
return FunctionHandle(f)
def extract_function_features(self, f):
for (feature, ea) in capa.features.extractors.ida.function.extract_features(f):
yield feature, ea
def get_basic_blocks(self, f):
for bb in capa.features.extractors.ida.helpers.get_function_blocks(f):
yield add_ea_int_cast(bb)
import capa.features.extractors.ida.helpers as ida_helpers
for bb in ida_helpers.get_function_blocks(f):
yield BasicBlockHandle(bb)
def extract_basic_block_features(self, f, bb):
for (feature, ea) in capa.features.extractors.ida.basicblock.extract_features(f, bb):
@@ -83,7 +101,7 @@ class IdaFeatureExtractor(FeatureExtractor):
import capa.features.extractors.ida.helpers as ida_helpers
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
yield add_ea_int_cast(insn)
yield InstructionHandle(insn)
def extract_insn_features(self, f, bb, insn):
for (feature, ea) in capa.features.extractors.ida.insn.extract_features(f, bb, insn):

View File

@@ -5,10 +5,10 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import types
import logging
import viv_utils
import viv_utils.flirt
import capa.features.extractors
import capa.features.extractors.viv.file
@@ -18,28 +18,20 @@ import capa.features.extractors.viv.basicblock
from capa.features.extractors import FeatureExtractor
__all__ = ["file", "function", "basicblock", "insn"]
logger = logging.getLogger(__name__)
def get_va(self):
try:
# vivisect type
class InstructionHandle:
"""this acts like a vivisect.Opcode but with an __int__() method"""
def __init__(self, inner):
self._inner = inner
def __int__(self):
return self.va
except AttributeError:
pass
raise TypeError()
def add_va_int_cast(o):
"""
dynamically add a cast-to-int (`__int__`) method to the given object
that returns the value of the `.va` property.
this bit of skullduggery lets use cast viv-utils objects as ints.
the correct way of doing this is to update viv-utils (or subclass the objects here).
"""
setattr(o, "__int__", types.MethodType(get_va, o))
return o
def __getattr__(self, name):
return getattr(self._inner, name)
class VivisectFeatureExtractor(FeatureExtractor):
@@ -58,15 +50,14 @@ class VivisectFeatureExtractor(FeatureExtractor):
def get_functions(self):
for va in sorted(self.vw.getFunctions()):
yield add_va_int_cast(viv_utils.Function(self.vw, va))
yield viv_utils.Function(self.vw, va)
def extract_function_features(self, f):
for feature, va in capa.features.extractors.viv.function.extract_features(f):
yield feature, va
def get_basic_blocks(self, f):
for bb in f.basic_blocks:
yield add_va_int_cast(bb)
return f.basic_blocks
def extract_basic_block_features(self, f, bb):
for feature, va in capa.features.extractors.viv.basicblock.extract_features(f, bb):
@@ -74,8 +65,14 @@ class VivisectFeatureExtractor(FeatureExtractor):
def get_instructions(self, f, bb):
for insn in bb.instructions:
yield add_va_int_cast(insn)
yield InstructionHandle(insn)
def extract_insn_features(self, f, bb, insn):
for feature, va in capa.features.extractors.viv.insn.extract_features(f, bb, insn):
yield feature, va
def is_library_function(self, va):
    """
    is the given address a library function?
    the question is delegated to `viv_utils.flirt.is_library_function` for this extractor's workspace.

    args:
      va (int): the virtual address of a function.

    returns:
      the result of `viv_utils.flirt.is_library_function` (presumably a bool; confirm against viv-utils).
    """
    workspace = self.vw
    return viv_utils.flirt.is_library_function(workspace, va)
def get_function_name(self, va):
    """
    fetch the name recorded for the given address.
    the lookup is delegated to `viv_utils.get_function_name` for this extractor's workspace.

    args:
      va (int): the virtual address of a function.

    returns:
      the result of `viv_utils.get_function_name`.
      NOTE(review): what viv-utils does for an address without a name is not visible here - confirm.
    """
    workspace = self.vw
    return viv_utils.get_function_name(workspace, va)

View File

@@ -5,10 +5,14 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import viv_utils
import envi.memory
import viv_utils.flirt
import envi.archs.i386.disasm
import capa.features.extractors.viv
import capa.features.extractors.helpers
import capa.features.extractors.viv.helpers
from capa.features import (
@@ -74,7 +78,6 @@ def extract_insn_api_features(f, bb, insn):
# example:
#
# call dword [0x00473038]
if insn.mnem not in ("call", "jmp"):
return
@@ -96,7 +99,7 @@ def extract_insn_api_features(f, bb, insn):
# call via thunk on x86,
# see 9324d1a8ae37a36ae560c37448c9705a at 0x407985
#
# this is also how calls to internal functions may be decoded on x64.
# this is also how calls to internal functions may be decoded on x32 and x64.
# see Lab21-01.exe_:0x140001178
#
# follow chained thunks, e.g. in 82bf6347acf15e5d883715dc289d8a2b at 0x14005E0FF in
@@ -111,6 +114,11 @@ def extract_insn_api_features(f, bb, insn):
if not target:
return
if viv_utils.flirt.is_library_function(f.vw, target):
name = viv_utils.get_function_name(f.vw, target)
yield API(name), insn.va
return
for _ in range(THUNK_CHAIN_DEPTH_DELTA):
if target in imports:
dll, symbol = imports[target]

View File

@@ -122,7 +122,7 @@ def dumps(extractor):
)
for insnva, insn in sorted(
[(insn.__int__(), insn) for insn in extractor.get_instructions(f, bb)], key=lambda p: p[0]
[(int(insn), insn) for insn in extractor.get_instructions(f, bb)], key=lambda p: p[0]
):
ret["functions"][hex(f)][hex(bb)].append(hex(insnva))
@@ -248,43 +248,13 @@ def main(argv=None):
if argv is None:
argv = sys.argv[1:]
formats = [
("auto", "(default) detect file type automatically"),
("pe", "Windows PE file"),
("sc32", "32-bit shellcode"),
("sc64", "64-bit shellcode"),
]
format_help = ", ".join(["%s: %s" % (f[0], f[1]) for f in formats])
parser = argparse.ArgumentParser(description="save capa features to a file")
parser.add_argument("sample", type=str, help="Path to sample to analyze")
capa.main.install_common_args(parser, {"sample", "format", "backend", "signatures"})
parser.add_argument("output", type=str, help="Path to output file")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors")
parser.add_argument(
"-f", "--format", choices=[f[0] for f in formats], default="auto", help="Select sample format, %s" % format_help
)
parser.add_argument(
"-b",
"--backend",
type=str,
help="select the backend to use",
choices=(capa.main.BACKEND_VIV, capa.main.BACKEND_SMDA),
default=capa.main.BACKEND_VIV,
)
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
if args.quiet:
logging.basicConfig(level=logging.ERROR)
logging.getLogger().setLevel(logging.ERROR)
elif args.verbose:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
extractor = capa.main.get_extractor(args.sample, args.format, args.backend)
extractor = capa.main.get_extractor(args.sample, args.format, args.backend, sigpaths=args.signatures)
with open(args.output, "wb") as f:
f.write(dump(extractor))

View File

@@ -12,18 +12,7 @@ _hex = hex
def hex(i):
return _hex(oint(i))
def oint(i):
# there seems to be some trouble with using `int(viv_utils.Function)`
# with the black magic we do with binding the `__int__()` routine.
# i haven't had a chance to debug this yet (and i have no hotel wifi).
# so in the meantime, detect this, and call the method directly.
try:
return int(i)
except TypeError:
return i.__int__()
return _hex(int(i))
def get_file_taste(sample_path):

View File

@@ -78,7 +78,7 @@ def find_func_features(f, extractor):
_bb_features[feature].add(ea)
func_features[feature].add(ea)
bb_features[capa.helpers.oint(bb)] = _bb_features
bb_features[int(bb)] = _bb_features
return func_features, bb_features
@@ -100,7 +100,7 @@ def find_func_matches(f, ruleset, func_features, bb_features):
func_features[capa.features.MatchedRule(name)].add(ea)
# find rule matches for function, function features include rule matches for basic blocks
_, matches = capa.engine.match(ruleset.function_rules, func_features, capa.helpers.oint(f))
_, matches = capa.engine.match(ruleset.function_rules, func_features, int(f))
for (name, res) in matches.items():
func_matches[name].extend(res)

View File

@@ -10,16 +10,20 @@ See the License for the specific language governing permissions and limitations
"""
import os
import sys
import gzip
import time
import hashlib
import logging
import os.path
import argparse
import datetime
import textwrap
import contextlib
import collections
import halo
import tqdm
import flirt
import colorama
import capa.rules
@@ -29,7 +33,7 @@ import capa.version
import capa.features
import capa.features.freeze
import capa.features.extractors
from capa.helpers import oint, get_file_taste
from capa.helpers import get_file_taste
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SUPPORTED_FILE_MAGIC = set([b"MZ"])
@@ -42,6 +46,14 @@ EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")
logger = logging.getLogger("capa")
@contextlib.contextmanager
def timing(msg):
    """
    context manager that logs, at debug level, how long the wrapped block took to run.

    the elapsed time is logged even when the wrapped block raises;
    the exception then propagates to the caller unchanged.

    args:
      msg (str): label for the measured operation, included in the log line.
    """
    t0 = time.time()
    try:
        yield
    finally:
        # without try/finally, an exception thrown into the generator at `yield`
        # would skip the log line and the measurement would be lost.
        t1 = time.time()
        logger.debug("perf: %s: %0.2fs", msg, t1 - t0)
def set_vivisect_log_level(level):
logging.getLogger("vivisect").setLevel(level)
logging.getLogger("vivisect.base").setLevel(level)
@@ -76,14 +88,14 @@ def find_function_capabilities(ruleset, extractor, f):
bb_features[feature].add(va)
function_features[feature].add(va)
_, matches = capa.engine.match(ruleset.basic_block_rules, bb_features, oint(bb))
_, matches = capa.engine.match(ruleset.basic_block_rules, bb_features, int(bb))
for rule_name, res in matches.items():
bb_matches[rule_name].extend(res)
for va, _ in res:
function_features[capa.features.MatchedRule(rule_name)].add(va)
_, function_matches = capa.engine.match(ruleset.function_rules, function_features, oint(f))
_, function_matches = capa.engine.match(ruleset.function_rules, function_features, int(f))
return function_matches, bb_matches, len(function_features)
@@ -125,10 +137,19 @@ def find_capabilities(ruleset, extractor, disable_progress=None):
# to disable progress completely
pbar = lambda s, *args, **kwargs: s
for f in pbar(list(extractor.get_functions()), desc="matching", unit=" functions"):
functions = list(extractor.get_functions())
for f in pbar(functions, desc="matching", unit=" functions"):
function_address = int(f)
if extractor.is_library_function(function_address):
function_name = extractor.get_function_name(function_address)
logger.debug("skipping library function 0x%x (%s)", function_address, function_name)
continue
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
meta["feature_counts"]["functions"][f.__int__()] = feature_count
logger.debug("analyzed function 0x%x and extracted %d features", f.__int__(), feature_count)
meta["feature_counts"]["functions"][function_address] = feature_count
logger.debug("analyzed function 0x%x and extracted %d features", function_address, feature_count)
for rule_name, res in function_matches.items():
all_function_matches[rule_name].extend(res)
@@ -136,7 +157,7 @@ def find_capabilities(ruleset, extractor, disable_progress=None):
all_bb_matches[rule_name].extend(res)
# mapping from matched rule feature to set of addresses at which it matched.
# schema: Dic[MatchedRule: Set[int]
# schema: Dict[MatchedRule: Set[int]
function_features = {
capa.features.MatchedRule(rule_name): set(map(lambda p: p[0], results))
for rule_name, results in all_function_matches.items()
@@ -229,26 +250,31 @@ def is_supported_file_type(sample):
SHELLCODE_BASE = 0x690000
def get_shellcode_vw(sample, arch="auto", should_save=True):
def get_shellcode_vw(sample, arch="auto"):
"""
Return shellcode workspace using explicit arch or via auto detect
Return shellcode workspace using explicit arch or via auto detect.
The workspace is *not* analyzed nor saved. Its up to the caller to do this.
Then, they can register FLIRT analyzers or decide not to write to disk.
"""
import viv_utils
with open(sample, "rb") as f:
sample_bytes = f.read()
if arch == "auto":
# choose arch with most functions, idea by Jay G.
vw_cands = []
for arch in ["i386", "amd64"]:
vw_cands.append(
viv_utils.getShellcodeWorkspace(sample_bytes, arch, base=SHELLCODE_BASE, should_save=should_save)
viv_utils.getShellcodeWorkspace(
sample_bytes, arch, base=SHELLCODE_BASE, analyze=False, should_save=False
)
)
if not vw_cands:
raise ValueError("could not generate vivisect workspace")
vw = max(vw_cands, key=lambda vw: len(vw.getFunctions()))
else:
vw = viv_utils.getShellcodeWorkspace(sample_bytes, arch, base=SHELLCODE_BASE, should_save=should_save)
vw = viv_utils.getShellcodeWorkspace(sample_bytes, arch, base=SHELLCODE_BASE, analyze=False, should_save=False)
vw.setMeta("StorageName", "%s.viv" % sample)
@@ -266,26 +292,114 @@ def get_meta_str(vw):
return "%s, number of functions: %d" % (", ".join(meta), len(vw.getFunctions()))
def load_flirt_signature(path):
    """
    parse the FLIRT signature file at the given path.

    the parser is chosen by file extension:
      - `.sig`: read as bytes and parsed with `flirt.parse_sig`
      - `.pat`: read as UTF-8 text and parsed with `flirt.parse_pat`
      - `.pat.gz`: gzip-decompressed, then handled like `.pat`

    args:
      path (str): file system path of a .sig/.pat/.pat.gz file.

    returns:
      the signatures produced by the `flirt` parser.

    raises:
      ValueError: when the path does not end with a supported extension.
    """
    if path.endswith(".sig"):
        label, opener, is_pat = ".sig", open, False
    elif path.endswith(".pat"):
        label, opener, is_pat = ".pat", open, True
    elif path.endswith(".pat.gz"):
        label, opener, is_pat = ".pat.gz", gzip.open, True
    else:
        raise ValueError("unexpected signature file extension: " + path)

    with opener(path, "rb") as f:
        # the timed region covers both reading and parsing.
        with timing("flirt: parsing %s: %s" % (label, path)):
            buf = f.read()
            if is_pat:
                # pattern files are text; normalize Windows line endings before parsing.
                return flirt.parse_pat(buf.decode("utf-8").replace("\r\n", "\n"))
            return flirt.parse_sig(buf)
def register_flirt_signature_analyzers(vw, sigpaths):
    """
    load and compile each signature file, then attach it to the workspace as a FLIRT function analyzer.

    args:
      vw (vivisect.VivWorkspace): the workspace that receives the analyzers.
      sigpaths (List[str]): file system paths of .sig/.pat files
    """
    # imported inside the function, presumably so viv-utils is only needed when this path runs.
    import viv_utils.flirt

    for path in sigpaths:
        signatures = load_flirt_signature(path)
        logger.debug("flirt: sig count: %d", len(signatures))

        with timing("flirt: compiling sigs"):
            compiled = flirt.compile(signatures)

        # one analyzer per signature file; the path is handed to the analyzer alongside the matcher.
        flirt_analyzer = viv_utils.flirt.FlirtFunctionAnalyzer(compiled, path)
        logger.debug("registering viv function analyzer: %s", repr(flirt_analyzer))
        viv_utils.flirt.addFlirtFunctionAnalyzer(vw, flirt_analyzer)
def get_default_signatures():
    """
    collect the signature files found in capa's default `sigs` directory.

    the directory is `<sys._MEIPASS>/sigs` when running from a PyInstaller bundle,
    and `../sigs` relative to this module otherwise.
    the directory is searched recursively; a missing directory yields an empty list.

    returns:
      List[str]: sorted file system paths of the .pat/.pat.gz/.sig files found.
    """
    if hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS"):
        logger.debug("detected running under PyInstaller")
        sigs_path = os.path.join(sys._MEIPASS, "sigs")
        logger.debug("default signatures path (PyInstaller method): %s", sigs_path)
    else:
        logger.debug("detected running from source")
        sigs_path = os.path.join(os.path.dirname(__file__), "..", "sigs")
        logger.debug("default signatures path (source method): %s", sigs_path)

    extensions = (".pat", ".pat.gz", ".sig")
    ret = []
    for root, _, files in os.walk(sigs_path):
        for name in files:
            if name.endswith(extensions):
                ret.append(os.path.join(root, name))

    # os.walk reports entries in file system dependent order;
    # sort so the signatures are always registered in the same order.
    return sorted(ret)
class UnsupportedFormatError(ValueError):
pass
def get_workspace(path, format, should_save=True):
def get_workspace(path, format, sigpaths):
"""
load the program at the given path into a vivisect workspace using the given format.
also apply the given FLIRT signatures.
supported formats:
- pe
- sc32
- sc64
- auto
this creates and analyzes the workspace; however, it does *not* save the workspace.
this is the responsibility of the caller.
"""
# lazy import enables us to not require viv if user wants SMDA, for example.
import viv_utils
logger.debug("generating vivisect workspace for: %s", path)
if format == "auto":
if not is_supported_file_type(path):
raise UnsupportedFormatError()
vw = viv_utils.getWorkspace(path, should_save=should_save)
# don't analyze, so that we can add our Flirt function analyzer first.
vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
elif format == "pe":
vw = viv_utils.getWorkspace(path, should_save=should_save)
vw = viv_utils.getWorkspace(path, analyze=False, should_save=False)
elif format == "sc32":
vw = get_shellcode_vw(path, arch="i386", should_save=should_save)
# these are not analyzed nor saved.
vw = get_shellcode_vw(path, arch="i386")
elif format == "sc64":
vw = get_shellcode_vw(path, arch="amd64", should_save=should_save)
vw = get_shellcode_vw(path, arch="amd64")
else:
raise ValueError("unexpected format: " + format)
register_flirt_signature_analyzers(vw, sigpaths)
vw.analyze()
logger.debug("%s", get_meta_str(vw))
return vw
@@ -294,7 +408,7 @@ class UnsupportedRuntimeError(RuntimeError):
pass
def get_extractor(path, format, backend, disable_progress=False):
def get_extractor(path, format, backend, sigpaths, disable_progress=False):
"""
raises:
UnsupportedFormatError:
@@ -321,7 +435,7 @@ def get_extractor(path, format, backend, disable_progress=False):
format = "sc32"
elif format == "auto" and path.endswith(EXTENSIONS_SHELLCODE_64):
format = "sc64"
vw = get_workspace(path, format, should_save=False)
vw = get_workspace(path, format, sigpaths)
try:
vw.saveWorkspace()
@@ -524,6 +638,18 @@ def install_common_args(parser, wanted=None):
help="path to rule file or directory, use embedded rules by default",
)
if "signatures" in wanted:
parser.add_argument(
"--signature",
action="append",
dest="signatures",
type=str,
# with action=append, users can specify further signatures but not override what's found in $capa/sigs/.
# seems reasonable for now. this is an easy way to register the default signature set.
default=get_default_signatures(),
help="use the given signatures to identify library functions, file system paths to .sig/.pat files.",
)
if "tag" in wanted:
parser.add_argument("-t", "--tag", type=str, help="filter on rule meta field values")
@@ -609,7 +735,7 @@ def main(argv=None):
parser = argparse.ArgumentParser(
description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter
)
install_common_args(parser, {"sample", "format", "backend", "rules", "tag"})
install_common_args(parser, {"sample", "format", "backend", "signatures", "rules", "tag"})
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
args = parser.parse_args(args=argv)
handle_common_args(args)
@@ -678,7 +804,7 @@ def main(argv=None):
else:
format = args.format
try:
extractor = get_extractor(args.sample, format, args.backend, disable_progress=args.quiet)
extractor = get_extractor(args.sample, format, args.backend, args.signatures, disable_progress=args.quiet)
except UnsupportedFormatError:
logger.error("-" * 80)
logger.error(" Input file does not appear to be a PE file.")

View File

@@ -96,7 +96,7 @@ def get_capa_results(args):
rules, format, path = args
logger.info("computing capa results for: %s", path)
try:
extractor = capa.main.get_extractor(path, format, capa.main.BACKEND_VIV, disable_progress=True)
extractor = capa.main.get_extractor(path, format, capa.main.BACKEND_VIV, args.signatures, disable_progress=True)
except capa.main.UnsupportedFormatError:
# i'm not 100% sure if multiprocessing will reliably raise exceptions across process boundaries.
# so instead, return an object with explicit success/failure status.
@@ -140,7 +140,7 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="detect capabilities in programs.")
capa.main.install_common_args(parser, wanted={"rules"})
capa.main.install_common_args(parser, wanted={"rules", "signatures"})
parser.add_argument("input", type=str, help="Path to directory of files to recursively analyze")
parser.add_argument(
"-n", "--parallelism", type=int, default=multiprocessing.cpu_count(), help="parallelism factor"

View File

@@ -192,7 +192,7 @@ def render_dictionary(doc):
def capa_details(file_path, output_format="dictionary"):
# extract features and find capabilities
extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, disable_progress=True)
extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, sigpaths=[], disable_progress=True)
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
# collect metadata (used only to make rendering more complete)

View File

@@ -181,6 +181,9 @@ class ExampleFileDNE(Lint):
return not found
DEFAULT_SIGNATURES = capa.main.get_default_signatures()
class DoesntMatchExample(Lint):
name = "doesn't match on referenced example"
recommendation = "Fix the rule logic or provide a different example"
@@ -203,7 +206,9 @@ class DoesntMatchExample(Lint):
continue
try:
extractor = capa.main.get_extractor(path, "auto", capa.main.BACKEND_VIV, disable_progress=True)
extractor = capa.main.get_extractor(
path, "auto", capa.main.BACKEND_VIV, sigpaths=DEFAULT_SIGNATURES, disable_progress=True
)
capabilities, meta = capa.main.find_capabilities(ctx["rules"], extractor, disable_progress=True)
except Exception as e:
logger.error("failed to extract capabilities: %s %s %s", rule.name, path, e)

View File

@@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""
Copyright (C) 2021 FireEye, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
match-function-id
Show the names of functions as recognized by the function identification subsystem.
This can help identify library functions statically linked into a program,
such as when triaging false positive matches in capa rules.
Example::
$ python scripts/match-function-id.py --signature sigs/vc6.pat.gz /tmp/suspicious.dll_
0x44cf30: ?GetPdbDll@@YAPAUHINSTANCE__@@XZ
0x44bb20: ?_strlen_priv@@YAIPBD@Z
0x44b6b0: ?invoke_main@@YAHXZ
0x44a5d0: ?find_pe_section@@YAPAU_IMAGE_SECTION_HEADER@@QAEI@Z
0x44a690: ?is_potentially_valid_image_base@@YA_NQAX@Z
0x44cbe0: ___get_entropy
0x44a4a0: __except_handler4
0x44b3d0: ?pre_cpp_initialization@@YAXXZ
0x44b2e0: ?pre_c_initialization@@YAHXZ
0x44b3c0: ?post_pgo_initialization@@YAHXZ
0x420156: ?
0x420270: ?
0x430dcd: ?
0x44d930: __except_handler4_noexcept
0x41e960: ?
0x44a1e0: @_RTC_AllocaHelper@12
0x44ba90: ?_getMemBlockDataString@@YAXPAD0PBDI@Z
0x44a220: @_RTC_CheckStackVars2@12
0x44a790: ___scrt_dllmain_after_initialize_c
0x44a7d0: ___scrt_dllmain_before_initialize_c
0x44a800: ___scrt_dllmain_crt_thread_attach
0x44a860: ___scrt_dllmain_exception_filter
0x44a900: ___scrt_dllmain_uninitialize_critical
0x44ad10: _at_quick_exit
0x44b940: ?_RTC_Failure@@YAXPAXH@Z
0x44be60: __RTC_UninitUse
0x44bfd0: __RTC_GetErrDesc
0x44c060: __RTC_SetErrorType
0x44cb60: ?
0x44cba0: __guard_icall_checks_enforced
"""
import sys
import logging
import argparse
import flirt
import viv_utils
import viv_utils.flirt
import capa.main
import capa.rules
import capa.engine
import capa.helpers
import capa.features
import capa.features.freeze
logger = logging.getLogger("capa.match-function-id")
def main(argv=None):
    """
    match the functions of a sample against FLIRT signatures and print each recognized name.

    for every function (or only the one selected with `--function`), each signature file
    given via `--signature` is tried; every match is printed as `0x<va>: <name>`.

    args:
      argv (List[str]): command line arguments, defaults to `sys.argv[1:]`.

    returns:
      int: the process exit code, 0 on success.
    """
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="FLIRT match each function")
    parser.add_argument("sample", type=str, help="Path to sample to analyze")
    parser.add_argument(
        "-F",
        "--function",
        type=lambda x: int(x, 0x10),
        help="match a specific function by VA, rather than all functions",
    )
    parser.add_argument(
        "--signature",
        action="append",
        dest="signatures",
        type=str,
        default=[],
        help="use the given signatures to identify library functions, file system paths to .sig/.pat files.",
    )
    parser.add_argument("-d", "--debug", action="store_true", help="Enable debugging output on STDERR")
    parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors")
    args = parser.parse_args(args=argv)

    if args.quiet:
        level = logging.ERROR
    elif args.debug:
        level = logging.DEBUG
    else:
        level = logging.INFO
    logging.basicConfig(level=level)
    logging.getLogger().setLevel(level)

    # disable vivisect-related logging, it's verbose and not relevant for capa users
    capa.main.set_vivisect_log_level(logging.CRITICAL)

    analyzers = []
    for sigpath in args.signatures:
        sigs = capa.main.load_flirt_signature(sigpath)

        with capa.main.timing("flirt: compiling sigs"):
            matcher = flirt.compile(sigs)

        analyzer = viv_utils.flirt.FlirtFunctionAnalyzer(matcher, sigpath)
        logger.debug("registering viv function analyzer: %s", repr(analyzer))
        analyzers.append(analyzer)

    vw = viv_utils.getWorkspace(args.sample, analyze=True, should_save=False)

    # compare against None so that an explicitly requested VA of 0 is not ignored.
    if args.function is not None:
        functions = [args.function]
    else:
        functions = vw.getFunctions()

    for function in functions:
        logger.debug("matching function: 0x%04x", function)
        for analyzer in analyzers:
            name = viv_utils.flirt.match_function_flirt_signatures(analyzer.matcher, vw, function)
            if name:
                print("0x%04x: %s" % (function, name))

    return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -111,7 +111,7 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="detect capabilities in programs.")
capa.main.install_common_args(parser, wanted={"format", "sample", "rules", "tag"})
capa.main.install_common_args(parser, wanted={"format", "backend", "sample", "signatures", "rules", "tag"})
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
@@ -152,8 +152,9 @@ def main(argv=None):
extractor = capa.features.freeze.load(f.read())
else:
format = args.format
try:
extractor = capa.main.get_extractor(args.sample, args.format)
extractor = capa.main.get_extractor(args.sample, args.format, args.backend, args.signatures)
except capa.main.UnsupportedFormatError:
logger.error("-" * 80)
logger.error(" Input file does not appear to be a PE file.")

View File

@@ -83,7 +83,7 @@ def main(argv=None):
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Show the features that capa extracts from the given sample")
capa.main.install_common_args(parser, wanted={"format", "sample"})
capa.main.install_common_args(parser, wanted={"format", "sample", "signatures"})
parser.add_argument("-F", "--function", type=lambda x: int(x, 0x10), help="Show features for specific function")
args = parser.parse_args(args=argv)
@@ -100,7 +100,9 @@ def main(argv=None):
extractor = capa.features.freeze.load(f.read())
else:
try:
extractor = capa.main.get_extractor(args.sample, args.format, capa.main.BACKEND_VIV)
extractor = capa.main.get_extractor(
args.sample, args.format, capa.main.BACKEND_VIV, sigpaths=args.signatures
)
except capa.main.UnsupportedFormatError:
logger.error("-" * 80)
logger.error(" Input file does not appear to be a PE file.")
@@ -135,9 +137,9 @@ def main(argv=None):
if args.format == "freeze":
functions = tuple(filter(lambda f: f == args.function, functions))
else:
functions = tuple(filter(lambda f: capa.helpers.oint(f) == args.function, functions))
functions = tuple(filter(lambda f: int(f) == args.function, functions))
if args.function not in [capa.helpers.oint(f) for f in functions]:
if args.function not in [int(f) for f in functions]:
print("0x%X not a function" % args.function)
return -1

View File

@@ -7,7 +7,6 @@
# See the License for the specific language governing permissions and limitations under the License.
import os
import sys
import setuptools
@@ -19,7 +18,7 @@ requirements = [
"termcolor==1.1.0",
"wcwidth==0.2.5",
"ida-settings==2.1.0",
"viv-utils==0.6.0",
"viv-utils[flirt]==0.6.2",
"halo==0.0.31",
"networkx==2.5.1",
"ruamel.yaml==0.17.4",

9
sigs/README.md Normal file
View File

@@ -0,0 +1,9 @@
# capa/sigs
This directory contains FLIRT signatures that capa uses to identify library functions.
Typically, capa will ignore library functions, which reduces false positives and improves runtime.
These FLIRT signatures were generated by FireEye using the Hex-Rays FLAIR tools such as `pcf` and `sigmake`.
FireEye generated the signatures from source data that they collected; these signatures are not derived from the FLIRT signatures distributed with IDA Pro.
The signatures in this directory have the same license as capa: Apache 2.0.

BIN
sigs/flare_common_libs.sig Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

11
sigs/test_aulldiv.pat Normal file
View File

@@ -0,0 +1,11 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
53568b4424180bc075188b4c24148b44241033d2f7f18bd88b44240cf7f18bd3 48 d0a0 0068 :0000 __aulldiv
558bec83e4e083ec60c5fe7f4c2420c5fe7f0424ff742424ff742424ff74240c 08 6e24 0095 :0000 ___avx_u64div4@@64 ^0028 __aulldiv
558bec83e4f083ec30660f6fd1660f73da04660f6fd8660f7ed0660f73db04f3 1b ed6c 0080 :0000 ___sse2_u64div2@@32 ^003b __aulldiv
---

BIN
sigs/test_aullrem.pat.gz Normal file

Binary file not shown.

View File

@@ -72,12 +72,15 @@ def xfail(condition, reason=None):
def get_viv_extractor(path):
import capa.features.extractors.viv
aulldiv_pat = os.path.join(CD, "..", "sigs", "test_aulldiv.pat")
aullrem_pat = os.path.join(CD, "..", "sigs", "test_aullrem.pat.gz")
if "raw32" in path:
vw = capa.main.get_workspace(path, "sc32", should_save=False)
vw = capa.main.get_workspace(path, "sc32", sigpaths=[aulldiv_pat, aullrem_pat])
elif "raw64" in path:
vw = capa.main.get_workspace(path, "sc64", should_save=False)
vw = capa.main.get_workspace(path, "sc64", sigpaths=[aulldiv_pat, aullrem_pat])
else:
vw = capa.main.get_workspace(path, "auto", should_save=True)
vw = capa.main.get_workspace(path, "auto", sigpaths=[aulldiv_pat, aullrem_pat])
extractor = capa.features.extractors.viv.VivisectFeatureExtractor(vw, path)
fixup_viv(path, extractor)
return extractor
@@ -241,14 +244,14 @@ def sample(request):
def get_function(extractor, fva):
for f in extractor.get_functions():
if f.__int__() == fva:
if int(f) == fva:
return f
raise ValueError("function not found")
def get_basic_block(extractor, f, va):
for bb in extractor.get_basic_blocks(f):
if bb.__int__() == va:
if int(bb) == va:
return bb
raise ValueError("basic block not found")

View File

@@ -115,30 +115,23 @@ def compare_extractors_viv_null(viv_ext, null_ext):
null_ext (capa.features.extractors.NullFeatureExtractor)
"""
assert list(viv_ext.extract_file_features()) == list(null_ext.extract_file_features())
assert list(map(to_int, viv_ext.get_functions())) == list(null_ext.get_functions())
assert list(map(int, viv_ext.get_functions())) == list(null_ext.get_functions())
for f in viv_ext.get_functions():
assert list(map(to_int, viv_ext.get_basic_blocks(f))) == list(null_ext.get_basic_blocks(to_int(f)))
assert list(viv_ext.extract_function_features(f)) == list(null_ext.extract_function_features(to_int(f)))
assert list(map(int, viv_ext.get_basic_blocks(f))) == list(null_ext.get_basic_blocks(int(f)))
assert list(viv_ext.extract_function_features(f)) == list(null_ext.extract_function_features(int(f)))
for bb in viv_ext.get_basic_blocks(f):
assert list(map(to_int, viv_ext.get_instructions(f, bb))) == list(
null_ext.get_instructions(to_int(f), to_int(bb))
)
assert list(map(int, viv_ext.get_instructions(f, bb))) == list(null_ext.get_instructions(int(f), int(bb)))
assert list(viv_ext.extract_basic_block_features(f, bb)) == list(
null_ext.extract_basic_block_features(to_int(f), to_int(bb))
null_ext.extract_basic_block_features(int(f), int(bb))
)
for insn in viv_ext.get_instructions(f, bb):
assert list(viv_ext.extract_insn_features(f, bb, insn)) == list(
null_ext.extract_insn_features(to_int(f), to_int(bb), to_int(insn))
null_ext.extract_insn_features(int(f), int(bb), int(insn))
)
def to_int(o):
"""helper to get int value of extractor items"""
return capa.helpers.oint(o)
def test_freeze_s_roundtrip():
load = capa.features.freeze.loads
dump = capa.features.freeze.dumps

27
tests/test_function_id.py Normal file
View File

@@ -0,0 +1,27 @@
from fixtures import get_function, pma16_01_extractor, extract_function_features
import capa.features.insn
def test_function_id_simple_match(pma16_01_extractor):
    """
    the function at 0x407490 is expected to be recognized as the library function `__aulldiv`.

    args:
      pma16_01_extractor: feature extractor fixture imported from `fixtures`.
    """
    # assert truthiness directly rather than comparing against the `True` literal (PEP 8, E712).
    assert pma16_01_extractor.is_library_function(0x407490)
    assert pma16_01_extractor.get_function_name(0x407490) == "__aulldiv"
def test_function_id_gz_pat(pma16_01_extractor):
    """
    the function at 0x407500 is expected to be recognized as the library function `__aullrem`.

    args:
      pma16_01_extractor: feature extractor fixture imported from `fixtures`.
    """
    # aullrem is stored in `test_aullrem.pat.gz`
    # assert truthiness directly rather than comparing against the `True` literal (PEP 8, E712).
    assert pma16_01_extractor.is_library_function(0x407500)
    assert pma16_01_extractor.get_function_name(0x407500) == "__aullrem"
def test_function_id_complex_match(pma16_01_extractor):
    """
    the function at 0x405714 is expected to be recognized as the library function `__spawnlp`.

    args:
      pma16_01_extractor: feature extractor fixture imported from `fixtures`.
    """
    # 0x405714 is __spawnlp which requires recursive match of __spawnvp at 0x407FAB
    # (and __spawnvpe at 0x409DE8)
    # assert truthiness directly rather than comparing against the `True` literal (PEP 8, E712).
    assert pma16_01_extractor.is_library_function(0x405714)
    assert pma16_01_extractor.get_function_name(0x405714) == "__spawnlp"
def test_function_id_api_feature(pma16_01_extractor):
    """
    the features extracted from the function at 0x404548 are expected to include
    an API feature named `__aulldiv`.

    args:
      pma16_01_extractor: feature extractor fixture imported from `fixtures`.
    """
    # presumably 0x404548 references the recognized library routine; verify against the sample.
    expected = capa.features.insn.API("__aulldiv")
    function = get_function(pma16_01_extractor, 0x404548)
    extracted = extract_function_features(pma16_01_extractor, function)
    assert expected in extracted