mirror of
https://github.com/mandiant/capa.git
synced 2026-02-04 19:12:01 -08:00
library detection: rough integration of algorithms
This commit is contained in:
@@ -91,6 +91,19 @@ def configure_logging(args):
|
||||
logging.getLogger("viv_utils").setLevel(logging.ERROR)
|
||||
|
||||
|
||||
def get_flirt_matches(lib_only=True):
    """
    Walk every function in the currently open IDA database and yield a
    FunctionId describing it.

    Args:
      lib_only: when true, functions that do not carry IDA's FUNC_LIB flag
        are skipped; when false, every function is yielded.

    Yields:
      FunctionId: address, library flag, thunk flag, and name of the function.
    """
    for function_address in idautils.Functions(start=None, end=None):
        function_flags = idaapi.get_func(function_address).flags
        thunk_flag = bool(function_flags & idaapi.FUNC_THUNK)
        library_flag = bool(function_flags & idaapi.FUNC_LIB)
        function_name = idaapi.get_func_name(function_address)

        # emit the function unless the caller asked for library functions only
        # and this one is not flagged as such.
        if library_flag or not lib_only:
            yield FunctionId(
                address=function_address,
                is_library=library_flag,
                is_thunk=thunk_flag,
                name=function_name,
            )
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
if argv is None:
|
||||
argv = sys.argv[1:]
|
||||
@@ -134,23 +147,9 @@ def main(argv=None):
|
||||
table.add_column("thunk?")
|
||||
table.add_column("name")
|
||||
|
||||
LIBONLY = True
|
||||
count = 0
|
||||
|
||||
for ea in idautils.Functions(start=None, end=None):
|
||||
f = idaapi.get_func(ea)
|
||||
is_thunk = bool(f.flags & idaapi.FUNC_THUNK)
|
||||
is_lib = bool(f.flags & idaapi.FUNC_LIB)
|
||||
fname = idaapi.get_func_name(ea)
|
||||
|
||||
if LIBONLY and not is_lib:
|
||||
continue
|
||||
|
||||
fid = FunctionId(address=ea, is_library=is_lib, is_thunk=is_thunk, name=fname)
|
||||
for i, fid in enumerate(get_flirt_matches()):
|
||||
table.add_row(*fid.to_row())
|
||||
|
||||
count += 1
|
||||
if count > 50:
|
||||
if i > 50:
|
||||
break
|
||||
|
||||
rich.print(table)
|
||||
|
||||
@@ -5,4 +5,133 @@
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import io
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
import argparse
|
||||
import tempfile
|
||||
import contextlib
|
||||
from pathlib import Path
|
||||
|
||||
import rich
|
||||
from rich.text import Text
|
||||
from rich.console import Console
|
||||
|
||||
import capa.main
|
||||
import capa.helpers
|
||||
import capa.analysis.flirt
|
||||
import capa.analysis.strings
|
||||
import capa.features.extractors.ida.idalib as idalib
|
||||
|
||||
if not idalib.has_idalib():
|
||||
raise RuntimeError("cannot find IDA idalib module.")
|
||||
|
||||
if not idalib.load_idalib():
|
||||
raise RuntimeError("failed to load IDA idalib module.")
|
||||
|
||||
import idapro
|
||||
import ida_auto
|
||||
import ida_funcs
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@contextlib.contextmanager
def ida_session(input_path: Path):
    """
    Context manager that analyzes a copy of `input_path` with idalib and keeps
    the resulting database open for the duration of the `with` body.

    The input is copied into a fresh temporary directory before being opened,
    so the analysis runs against the copy rather than the original path.
    On exit the database is closed (if it was opened) and the whole temporary
    directory is removed.

    Args:
      input_path: path to the file to analyze.

    Raises:
      RuntimeError: if idalib reports a failure opening/analyzing the file.
    """
    # function-scope import: only needed for cleanup of the temporary workspace.
    import shutil

    workspace = Path(tempfile.mkdtemp(prefix="ida-"))
    t = workspace / input_path.name

    # stderr=True is used here to redirect the spinner banner to stderr, so that users can redirect capa's output.
    console = Console(stderr=True, quiet=False)

    # track whether a database was actually opened, so that cleanup does not
    # try to close one after an early failure (e.g. the copy step raising).
    database_opened = False
    try:
        t.write_bytes(input_path.read_bytes())

        # idalib writes to stdout (ugh), so we have to capture that
        # so as not to screw up structured output.
        with capa.helpers.stdout_redirector(io.BytesIO()):
            idapro.enable_console_messages(False)
            with console.status("analyzing program...", spinner="dots"):
                # a truthy return value is treated as failure here.
                if idapro.open_database(str(t.absolute()), run_auto_analysis=True):
                    raise RuntimeError("failed to analyze input file")
            database_opened = True

            logger.debug("idalib: waiting for analysis...")
            ida_auto.auto_wait()
            logger.debug("idalib: opened database.")

        yield
    finally:
        if database_opened:
            idapro.close_database()

        # remove the directory created by mkdtemp, not just the copied input:
        # otherwise the directory (and anything else written next to the copy)
        # is left behind. best-effort, so that a cleanup problem does not mask
        # an exception propagating from the body.
        shutil.rmtree(workspace, ignore_errors=True)
|
||||
|
||||
|
||||
def main(argv=None):
    """
    Command line entry point: identify library functions in the input file
    using FLIRT matches reported by IDA and the library string databases.

    Prints a table of all functions with their library/thunk flags, the
    signatures applied by IDA with their match counts, and, per function, the
    referenced strings that are present in the (pruned) string databases.

    Args:
      argv: command line arguments, excluding the program name.
        Defaults to `sys.argv[1:]`.

    Returns:
      the status code carried by ShouldExitError when common argument handling
      requests an exit; otherwise None.
    """
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Identify library functions using various strategies.")
    capa.main.install_common_args(parser, wanted={"input_file"})
    args = parser.parse_args(args=argv)

    try:
        capa.main.handle_common_args(args)
    except capa.main.ShouldExitError as e:
        return e.status_code

    # passed as `n` to prune_databases below.
    N = 8
    time0 = time.time()

    with ida_session(args.input_file):
        # TODO: add more signature (files)
        # TODO: apply more signatures

        table = rich.table.Table()
        table.add_column("FVA")
        table.add_column("library?")
        table.add_column("thunk?")
        table.add_column("name")

        for fid in capa.analysis.flirt.get_flirt_matches(lib_only=False):
            table.add_row(*fid.to_row())

        rich.print(table)

        # TODO can we include which signature matched per function?
        for index in range(ida_funcs.get_idasgn_qty()):
            signame, optlibs, nmatches = ida_funcs.get_idasgn_desc_with_matches(index)
            rich.print(signame, optlibs, nmatches)

        # named `minutes`/`seconds` to avoid shadowing the builtin `min`.
        minutes, seconds = divmod(time.time() - time0, 60)
        logger.debug("FLIRT-based library identification ran for ~ %02d:%02dm", minutes, seconds)

        dbs = capa.analysis.strings.get_default_databases()
        capa.analysis.strings.prune_databases(dbs, n=N)

        console = rich.get_console()
        for function, strings in sorted(capa.analysis.strings.get_function_strings().items()):
            # sorted so that the report order is stable from run to run
            # (iteration order of a set of strings is not).
            matched_strings = sorted(
                string for string in strings if any(string in db.metadata_by_string for db in dbs)
            )
            if not matched_strings:
                continue

            name = ida_funcs.get_func_name(function)
            console.print(f" [b]{name}[/]@{function:08x}:")

            for string in matched_strings:
                # a string may be known to several databases; report each hit.
                for db in dbs:
                    if metadata := db.metadata_by_string.get(string):
                        location = Text(
                            f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}",
                            style="grey37",
                        )
                        console.print(" - ", location, ": ", string.rstrip())

            # TODO: ensure there aren't conflicts among the matches
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
||||
@@ -175,3 +175,34 @@ def prune_databases(dbs: list[LibraryStringDatabase], n=8):
|
||||
for string in to_remove:
|
||||
if string in db.metadata_by_string:
|
||||
del db.metadata_by_string[string]
|
||||
|
||||
|
||||
def get_function_strings():
    """
    Collect the strings referenced by each function in the open IDA database.

    Functions that IDA flags as thunks (FUNC_THUNK) or library code (FUNC_LIB)
    are skipped.

    Returns:
      a defaultdict mapping the function address (as enumerated by
      idautils.Functions) to the set of strings found via data references
      from that function's instructions. Functions without any such string
      have no entry.
    """
    # IDA modules are imported lazily so that importing this module does not
    # require IDA to be available.
    import idaapi
    import idautils

    import capa.features.extractors.ida.helpers as ida_helpers

    strings_by_function = collections.defaultdict(set)
    for ea in idautils.Functions():
        f = idaapi.get_func(ea)

        # ignore library functions and thunk functions as identified by IDA
        if f.flags & idaapi.FUNC_THUNK:
            continue
        if f.flags & idaapi.FUNC_LIB:
            continue

        for bb in ida_helpers.get_function_blocks(f):
            for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
                # use the locally imported alias rather than the fully qualified
                # module path, so this does not rely on `capa` being bound at
                # module scope.
                ref = ida_helpers.find_data_reference_from_insn(insn)
                if ref == insn.ea:
                    # NOTE(review): presumably the helper returns the instruction's
                    # own address when there is no data reference - confirm.
                    continue

                string = ida_helpers.find_string_at(ref)
                if not string:
                    continue

                strings_by_function[ea].add(string)

    return strings_by_function
|
||||
|
||||
@@ -15,6 +15,7 @@ from rich.text import Text
|
||||
|
||||
import capa.analysis.strings
|
||||
import capa.features.extractors.strings
|
||||
import capa.features.extractors.ida.helpers as ida_helpers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -78,8 +79,6 @@ def main():
|
||||
import idautils
|
||||
import ida_funcs
|
||||
|
||||
import capa.features.extractors.ida.helpers as ida_helpers
|
||||
|
||||
strings_by_function = collections.defaultdict(set)
|
||||
for ea in idautils.Functions():
|
||||
f = idaapi.get_func(ea)
|
||||
|
||||
Reference in New Issue
Block a user