mirror of
https://github.com/mandiant/capa.git
synced 2025-12-21 23:00:29 -08:00
extractors: viv: match flirt signatures [wip]
This commit is contained in:
@@ -7,8 +7,10 @@
|
|||||||
# See the License for the specific language governing permissions and limitations under the License.
|
# See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
import types
|
import types
|
||||||
|
import logging
|
||||||
|
|
||||||
import viv_utils
|
import viv_utils
|
||||||
|
import vivisect.const
|
||||||
|
|
||||||
import capa.features.extractors
|
import capa.features.extractors
|
||||||
import capa.features.extractors.viv.file
|
import capa.features.extractors.viv.file
|
||||||
@@ -18,6 +20,7 @@ import capa.features.extractors.viv.basicblock
|
|||||||
from capa.features.extractors import FeatureExtractor
|
from capa.features.extractors import FeatureExtractor
|
||||||
|
|
||||||
__all__ = ["file", "function", "basicblock", "insn"]
|
__all__ = ["file", "function", "basicblock", "insn"]
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def get_va(self):
|
def get_va(self):
|
||||||
@@ -42,12 +45,60 @@ def add_va_int_cast(o):
|
|||||||
return o
|
return o
|
||||||
|
|
||||||
|
|
||||||
|
import time
|
||||||
|
import contextlib
|
||||||
|
|
||||||
|
|
||||||
|
@contextlib.contextmanager
def timing(msg):
    """
    context manager that logs, at debug level, how long the wrapped block took.

    the elapsed time is logged even when the wrapped block raises;
    the exception then propagates to the caller unchanged.

    args:
      msg (str): label included in the "perf: ..." log line.
    """
    t0 = time.time()
    try:
        yield
    finally:
        # use `finally` so that a failing block still reports its duration.
        t1 = time.time()
        logger.debug("perf: %s: %0.2fs", msg, t1 - t0)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class VivisectFeatureExtractor(FeatureExtractor):
|
class VivisectFeatureExtractor(FeatureExtractor):
|
||||||
def __init__(self, vw, path, sigfile="vc32rtf.sig"):
    """
    store the workspace and path, then load FLIRT signatures and match them
    against the functions in the workspace.

    args:
      vw: the vivisect workspace (`vw`) whose functions are matched.
      path: stored as `self.path`.
      sigfile (str): path to the FLIRT signature file to load.
        files ending in ".sig" are parsed with `flirt.parse_sig`,
        files ending in ".pat" are parsed with `flirt.parse_pat` (decoded as utf-8),
        anything else (including an empty string or None) loads no signatures.
        defaults to "vc32rtf.sig", which was previously hard-coded here.
    """
    super(VivisectFeatureExtractor, self).__init__()
    self.vw = vw
    self.path = path

    # note: imported within this method, not at module level.
    import flirt

    # timings noted by the original author:
    #
    # vc32rtf.sig:
    #   60,195 total signatures
    #   parsing sig: 0.13s
    #   compiling sigs: 1.18s
    #
    # libcmt_15_msvc_x86
    #   396 total signatures
    #   parsing pat: 0.09s
    #   parsing sigs: 0.01s

    # tolerate None so that callers can disable signature loading explicitly.
    sigfile = sigfile or ""

    if sigfile.endswith(".sig"):
        with open(sigfile, "rb") as f:
            with timing("flirt: parsing .sig: " + sigfile):
                sigs = flirt.parse_sig(f.read())
    elif sigfile.endswith(".pat"):
        with open(sigfile, "rb") as f:
            with timing("flirt: parsing .pat: " + sigfile):
                sigs = flirt.parse_pat(f.read().decode("utf-8"))
    else:
        sigs = []

    logger.debug("flirt: sig count: %d", len(sigs))

    with timing("flirt: compiling sigs"):
        matcher = flirt.compile(sigs)

    with timing("flirt: matching sigs"):
        match_vw_flirt_signatures(matcher, vw)
|
||||||
|
|
||||||
def get_base_address(self):
    """
    return the "imagebase" entry from the first file's metadata in the workspace.
    """
    # assume there is only one file loaded into the vw
    file_metas = list(self.vw.filemeta.values())
    first_meta = file_metas[0]
    return first_meta["imagebase"]
|
||||||
@@ -79,3 +130,126 @@ class VivisectFeatureExtractor(FeatureExtractor):
|
|||||||
def extract_insn_features(self, f, bb, insn):
    """
    yield each (feature, va) pair produced by the viv instruction extractor
    for the given function, basic block, and instruction.
    """
    extract = capa.features.extractors.viv.insn.extract_features
    for found_feature, found_va in extract(f, bb, insn):
        yield found_feature, found_va
|
||||||
|
|
||||||
|
def is_library_function(self, va):
    """
    return the "capa/library" flag stored in the workspace's function metadata for `va`,
    or False when there is no metadata or no such flag.
    """
    metadata = self.vw.funcmeta.get(va, {})
    return metadata.get("capa/library", False)
|
||||||
|
|
||||||
|
def get_function_name(self, va):
    """
    return the name that viv_utils reports for the function at `va` in this workspace.
    """
    workspace = self.vw
    return viv_utils.get_function_name(workspace, va)
|
||||||
|
|
||||||
|
|
||||||
|
def add_function_flirt_match(vw, va, name):
    """
    flag the function at `va` as a library function and assign it `name`.

    args:
      vw: the vivisect workspace (`vw`).
      va (int): address of the matched function.
      name (str): name to assign via `viv_utils.set_function_name`.
    """
    # NOTE(review): when `va` has no entry in `vw.funcmeta`, the `{}` default is
    # a temporary dict, so the flag set below is not stored on the workspace.
    # confirm whether callers can pass such an address.
    metadata = vw.funcmeta.get(va, {})
    metadata["capa/library"] = True

    viv_utils.set_function_name(vw, va, name)
|
||||||
|
|
||||||
|
|
||||||
|
def get_match_name(match):
    """
    return the name that a FLIRT match assigns to the start of the function.

    each entry of `match.names` is a (name, type, offset) tuple;
    the first entry whose offset is zero provides the name.

    args:
      match: object with a `names` attribute, a sequence of (name, type, offset) tuples.

    returns:
      the name of the first entry with offset zero.

    raises:
      ValueError: when no entry has offset zero.
    """
    for (name, _, offset) in match.names:
        if offset == 0:
            return name

    # format the message here: unlike logging calls, exceptions do not
    # interpolate extra arguments into the message.
    raise ValueError("flirt: match: no best name: %s" % (match.names,))
|
||||||
|
|
||||||
|
|
||||||
|
def _flirt_reference_matches(matcher, vw, va, reference, local_names):
    """
    check whether the function at `va` references a function with the name required by `reference`.

    args:
      matcher: compiled FLIRT matcher, passed through to recursive matching.
      vw: the vivisect workspace (`vw`).
      va (int): address of the function being matched.
      reference: a (name, type, offset) tuple from `match.names` with type "reference".
      local_names (Dict[int, Optional[str]]): cache of names already resolved for
        reference targets; updated in place.

    returns:
      bool: True if some code xref from the referenced location resolves to the wanted name.
    """
    (ref_name, _, ref_offset) = reference
    ref_va = va + ref_offset

    # the reference offset may be inside an instruction,
    # so we use getLocation to select the containing instruction address.
    loc = vw.getLocation(ref_va)
    if loc is None:
        # presumably no location is defined at this address (TODO confirm);
        # there is nothing to follow, so the reference cannot be validated.
        return False
    loc_va = loc[vivisect.const.L_VA]

    # an instruction may have multiple xrefs from
    # so we loop through all code references,
    # searching for that name.
    #
    # TODO: if we assume there is a single code reference, this is a lot easier.
    # can we do that with FLIRT?
    for xref in vw.getXrefsFrom(loc_va):
        if xref[vivisect.const.XR_RTYPE] != vivisect.const.REF_CODE:
            continue

        target = xref[vivisect.const.XR_TO]
        if target not in local_names:
            # this is a bit slower, since we have to read buf, do match, etc.
            # note that we don't ever save "this is not a library function",
            # so there's not an easy way to short circuit at the start of the matching function.
            local_names[target] = match_function_flirt_signatures(matcher, vw, target)

        if local_names[target] == ref_name:
            return True

    return False


def match_function_flirt_signatures(matcher, vw, va):
    """
    match FLIRT signatures against the code at `va` and,
    when exactly one name is found, record it in the workspace.

    signatures that carry "reference" names are only accepted when each reference
    resolves (potentially via recursive matching) to a function with the wanted name.

    NOTE(review): recursion follows code references without tracking visited
    addresses, so mutually-referencing candidates look like they could recurse
    without bound; confirm against real signature sets.

    args:
      matcher: compiled FLIRT matcher (as returned by `flirt.compile`).
      vw: the vivisect workspace (`vw`).
      va (int): address of the function to match.

    returns:
      Optional[str]: the matched name, or None if there was no match
        or the matches disagree on the name.
    """
    if va == 0x403970:
        # HACK(wip): hard-coded identification; tests/test_function_id.py expects
        # this address in the pma16_01 sample to be `__alloca_probe`.
        # return the name (not None) so recursive callers can validate references to it.
        name = "__alloca_probe"
        add_function_flirt_match(vw, va, name)
        return name

    if vw.funcmeta.get(va, {}).get("capa/library", False):
        # already matched here.
        # this might be the case if recursive matching visited this address.
        return viv_utils.get_function_name(vw, va)

    # TODO: fix reads at the end of a section.
    # TODO: pick the right size to read here.
    buf = vw.readMemory(va, 0x200)

    # when a reference is used to differentiate rule matches,
    # then we can expect multiple rules to query the name of the same address.
    # so, this caches the names looked up while validating references,
    # and is shared by all candidate matches for this function.
    # type: Dict[int, Optional[str]]
    local_names = {}

    matches = []
    for match in matcher.match(buf):
        # flirt uses reference names to assert that
        # the function contains a reference to another function with a given name.
        references = [n for n in match.names if n[1] == "reference"]

        # only if all references pass do we count it.
        # a match without references passes trivially.
        if all(_flirt_reference_matches(matcher, vw, va, reference, local_names) for reference in references):
            matches.append(match)

    if not matches:
        return None

    # sorted, so that the conflict warning below is deterministic.
    names = sorted(set(map(get_match_name, matches)))
    if len(names) != 1:
        logger.warning("flirt: conflicting names: 0x%x: %s", va, names)
        return None

    name = names[0]
    add_function_flirt_match(vw, va, name)
    logger.debug("flirt: found library function: 0x%x: %s", va, name)
    return name
|
||||||
|
|
||||||
|
|
||||||
|
def match_vw_flirt_signatures(matcher, vw):
    """
    run FLIRT matching against every function in the workspace, in ascending address order.

    args:
      matcher: compiled FLIRT matcher.
      vw: the vivisect workspace (`vw`).
    """
    function_vas = list(vw.getFunctions())
    function_vas.sort()

    for function_va in function_vas:
        match_function_flirt_signatures(matcher, vw, function_va)
|
||||||
@@ -6,6 +6,7 @@
|
|||||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and limitations under the License.
|
# See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
import viv_utils
|
||||||
import envi.memory
|
import envi.memory
|
||||||
import envi.archs.i386.disasm
|
import envi.archs.i386.disasm
|
||||||
|
|
||||||
@@ -74,7 +75,6 @@ def extract_insn_api_features(f, bb, insn):
|
|||||||
# example:
|
# example:
|
||||||
#
|
#
|
||||||
# call dword [0x00473038]
|
# call dword [0x00473038]
|
||||||
|
|
||||||
if insn.mnem not in ("call", "jmp"):
|
if insn.mnem not in ("call", "jmp"):
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -96,7 +96,7 @@ def extract_insn_api_features(f, bb, insn):
|
|||||||
# call via thunk on x86,
|
# call via thunk on x86,
|
||||||
# see 9324d1a8ae37a36ae560c37448c9705a at 0x407985
|
# see 9324d1a8ae37a36ae560c37448c9705a at 0x407985
|
||||||
#
|
#
|
||||||
# this is also how calls to internal functions may be decoded on x64.
|
# this is also how calls to internal functions may be decoded on x32 and x64.
|
||||||
# see Lab21-01.exe_:0x140001178
|
# see Lab21-01.exe_:0x140001178
|
||||||
#
|
#
|
||||||
# follow chained thunks, e.g. in 82bf6347acf15e5d883715dc289d8a2b at 0x14005E0FF in
|
# follow chained thunks, e.g. in 82bf6347acf15e5d883715dc289d8a2b at 0x14005E0FF in
|
||||||
@@ -111,6 +111,11 @@ def extract_insn_api_features(f, bb, insn):
|
|||||||
if not target:
|
if not target:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if f.vw.funcmeta.get(target, {}).get("capa/library"):
|
||||||
|
name = viv_utils.get_function_name(f.vw, target)
|
||||||
|
yield API(name), insn.va
|
||||||
|
return
|
||||||
|
|
||||||
for _ in range(THUNK_CHAIN_DEPTH_DELTA):
|
for _ in range(THUNK_CHAIN_DEPTH_DELTA):
|
||||||
if target in imports:
|
if target in imports:
|
||||||
dll, symbol = imports[target]
|
dll, symbol = imports[target]
|
||||||
|
|||||||
25
tests/test_function_id.py
Normal file
25
tests/test_function_id.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
import capa.features.insn
|
||||||
|
|
||||||
|
from fixtures import pma16_01_extractor, get_function, extract_function_features
|
||||||
|
|
||||||
|
|
||||||
|
def test_function_id_alloca_probe(pma16_01_extractor):
    """the function at 0x403970 is flagged as a library function named `__alloca_probe`."""
    va = 0x403970

    is_library = pma16_01_extractor.is_library_function(va)
    assert is_library == True

    name = pma16_01_extractor.get_function_name(va)
    assert name == "__alloca_probe"
|
||||||
|
|
||||||
|
|
||||||
|
def test_function_id_spawnlp(pma16_01_extractor):
    """the function at 0x405714 is flagged as a library function named `__spawnlp`."""
    # 0x405714 is __spawnlp which requires recursive match of __spawnvp at 0x407FAB
    # (and __spawnvpe at 0x409DE8)
    va = 0x405714

    is_library = pma16_01_extractor.is_library_function(va)
    assert is_library == True

    name = pma16_01_extractor.get_function_name(va)
    assert name == "__spawnlp"
|
||||||
|
|
||||||
|
|
||||||
|
def test_function_id_api_feature(pma16_01_extractor):
    """the features of the function at 0x4011D0 include the API feature `__alloca_probe`."""
    function = get_function(pma16_01_extractor, 0x4011D0)
    features = extract_function_features(pma16_01_extractor, function)

    # print the extracted features, one per line.
    for extracted in features.keys():
        print(extracted)

    wanted = capa.features.insn.API("__alloca_probe")
    assert wanted in features
|
||||||
Reference in New Issue
Block a user