mirror of
https://github.com/mandiant/capa.git
synced 2025-12-06 04:41:00 -08:00
Compare commits
19 Commits
18923601c7
...
library-de
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8c58a616c1 | ||
|
|
1b72c81df1 | ||
|
|
deed98b87e | ||
|
|
2db0cc457f | ||
|
|
3cad8d12af | ||
|
|
5be96d7ddc | ||
|
|
a3b6aef67f | ||
|
|
077fa2e7e1 | ||
|
|
c3b8e7c638 | ||
|
|
4346922b9a | ||
|
|
d652192af1 | ||
|
|
d83750c901 | ||
|
|
8394b81841 | ||
|
|
febda7d0e2 | ||
|
|
f9abb5e83f | ||
|
|
f69602d085 | ||
|
|
ad187fc3bd | ||
|
|
637926e0b6 | ||
|
|
03ce40e781 |
@@ -108,6 +108,7 @@ repos:
|
||||
- "--check-untyped-defs"
|
||||
- "--ignore-missing-imports"
|
||||
- "--config-file=.github/mypy/mypy.ini"
|
||||
- "--enable-incomplete-feature=NewGenericSyntax"
|
||||
- "capa/"
|
||||
- "scripts/"
|
||||
- "tests/"
|
||||
|
||||
0
capa/analysis/__init__.py
Normal file
0
capa/analysis/__init__.py
Normal file
38
capa/analysis/flirt.py
Normal file
38
capa/analysis/flirt.py
Normal file
@@ -0,0 +1,38 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
import capa.features.extractors.ida.idalib as idalib
|
||||
|
||||
if not idalib.has_idalib():
|
||||
raise RuntimeError("cannot find IDA idalib module.")
|
||||
|
||||
if not idalib.load_idalib():
|
||||
raise RuntimeError("failed to load IDA idalib module.")
|
||||
|
||||
import idaapi
|
||||
import idautils
|
||||
|
||||
|
||||
class FunctionId(BaseModel):
|
||||
va: int
|
||||
is_library: bool
|
||||
name: str
|
||||
|
||||
|
||||
def get_flirt_matches(lib_only=True):
|
||||
for fva in idautils.Functions():
|
||||
f = idaapi.get_func(fva)
|
||||
is_lib = bool(f.flags & idaapi.FUNC_LIB)
|
||||
fname = idaapi.get_func_name(fva)
|
||||
|
||||
if lib_only and not is_lib:
|
||||
continue
|
||||
|
||||
yield FunctionId(va=fva, is_library=is_lib, name=fname)
|
||||
335
capa/analysis/libraries.py
Normal file
335
capa/analysis/libraries.py
Normal file
@@ -0,0 +1,335 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import io
|
||||
import sys
|
||||
import logging
|
||||
import argparse
|
||||
import tempfile
|
||||
import contextlib
|
||||
import collections
|
||||
from enum import Enum
|
||||
from typing import List, Iterable, Optional
|
||||
from pathlib import Path
|
||||
|
||||
import rich
|
||||
from pydantic import BaseModel
|
||||
from rich.text import Text
|
||||
from rich.console import Console
|
||||
|
||||
import capa.main
|
||||
import capa.helpers
|
||||
import capa.analysis.flirt
|
||||
import capa.analysis.strings
|
||||
import capa.features.extractors.ida.idalib as idalib
|
||||
|
||||
if not idalib.has_idalib():
|
||||
raise RuntimeError("cannot find IDA idalib module.")
|
||||
|
||||
if not idalib.load_idalib():
|
||||
raise RuntimeError("failed to load IDA idalib module.")
|
||||
|
||||
import idaapi
|
||||
import idapro
|
||||
import ida_auto
|
||||
import idautils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Classification(str, Enum):
|
||||
USER = "user"
|
||||
LIBRARY = "library"
|
||||
UNKNOWN = "unknown"
|
||||
|
||||
|
||||
class Method(str, Enum):
|
||||
FLIRT = "flirt"
|
||||
STRINGS = "strings"
|
||||
THUNK = "thunk"
|
||||
ENTRYPOINT = "entrypoint"
|
||||
CALLGRAPH = "callgraph"
|
||||
|
||||
|
||||
class FunctionClassification(BaseModel):
|
||||
va: int
|
||||
classification: Classification
|
||||
# name per the disassembler/analysis tool
|
||||
# may be combined with the recovered/suspected name TODO below
|
||||
name: str
|
||||
|
||||
# if is library, this must be provided
|
||||
method: Optional[Method]
|
||||
|
||||
# TODO if is library, recovered/suspected name?
|
||||
|
||||
# if is library, these can optionally be provided.
|
||||
library_name: Optional[str] = None
|
||||
library_version: Optional[str] = None
|
||||
|
||||
# additional note on the classification, TODO removeme if not useful beyond dev/debug
|
||||
note: Optional[str] = None
|
||||
|
||||
|
||||
class BinaryLayout(BaseModel):
|
||||
va: int
|
||||
# size of the function chunks in bytes
|
||||
size: int
|
||||
|
||||
|
||||
class FunctionIdResults(BaseModel):
|
||||
function_classifications: List[FunctionClassification]
|
||||
layout: List[BinaryLayout]
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def ida_session(input_path: Path, use_temp_dir=True):
|
||||
if use_temp_dir:
|
||||
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
|
||||
else:
|
||||
t = input_path
|
||||
|
||||
logger.debug("using %s", str(t))
|
||||
# stderr=True is used here to redirect the spinner banner to stderr,
|
||||
# so that users can redirect capa's output.
|
||||
console = Console(stderr=True, quiet=False)
|
||||
|
||||
try:
|
||||
if use_temp_dir:
|
||||
t.write_bytes(input_path.read_bytes())
|
||||
|
||||
# idalib writes to stdout (ugh), so we have to capture that
|
||||
# so as not to screw up structured output.
|
||||
with capa.helpers.stdout_redirector(io.BytesIO()):
|
||||
idapro.enable_console_messages(False)
|
||||
with capa.main.timing("analyze program"):
|
||||
with console.status("analyzing program...", spinner="dots"):
|
||||
if idapro.open_database(str(t.absolute()), run_auto_analysis=True):
|
||||
raise RuntimeError("failed to analyze input file")
|
||||
|
||||
logger.debug("idalib: waiting for analysis...")
|
||||
ida_auto.auto_wait()
|
||||
logger.debug("idalib: opened database.")
|
||||
|
||||
yield
|
||||
finally:
|
||||
idapro.close_database()
|
||||
if use_temp_dir:
|
||||
t.unlink()
|
||||
|
||||
|
||||
def get_library_called_functions(
|
||||
function_classifications: list[FunctionClassification],
|
||||
) -> Iterable[FunctionClassification]:
|
||||
MAX_PASSES = 10
|
||||
classifications_by_va = capa.analysis.strings.create_index(function_classifications, "va")
|
||||
for n in range(MAX_PASSES):
|
||||
found_new_lib_func = False
|
||||
|
||||
for fva in idautils.Functions():
|
||||
if classifications_by_va.get(fva):
|
||||
# already classified
|
||||
continue
|
||||
|
||||
for ref in idautils.CodeRefsTo(fva, True):
|
||||
f: idaapi.func_t = idaapi.get_func(ref)
|
||||
if not f:
|
||||
# no function associated with reference location
|
||||
continue
|
||||
|
||||
ref_fva = f.start_ea
|
||||
fname = idaapi.get_func_name(ref_fva)
|
||||
if fname in ("___tmainCRTStartup",):
|
||||
# ignore library functions, where we know that they call user-code
|
||||
# TODO(mr): extend this list
|
||||
continue
|
||||
|
||||
if classifications := classifications_by_va.get(ref_fva):
|
||||
for c in classifications:
|
||||
if c.classification == Classification.LIBRARY:
|
||||
fc = FunctionClassification(
|
||||
va=fva,
|
||||
name=idaapi.get_func_name(fva),
|
||||
classification=Classification.LIBRARY,
|
||||
method=Method.CALLGRAPH,
|
||||
note=f"called by 0x{ref_fva:x} ({c.method.value}{f', {c.library_name}@{c.library_version})' if c.library_name else ')'}",
|
||||
)
|
||||
classifications_by_va[fva].append(fc)
|
||||
yield fc
|
||||
found_new_lib_func = True
|
||||
break
|
||||
|
||||
if not found_new_lib_func:
|
||||
logger.debug("no update in pass %d, done here", n)
|
||||
return
|
||||
|
||||
|
||||
def is_thunk_function(fva):
|
||||
f = idaapi.get_func(fva)
|
||||
return bool(f.flags & idaapi.FUNC_THUNK)
|
||||
|
||||
|
||||
def get_function_size(fva):
|
||||
f = idaapi.get_func(fva)
|
||||
assert f.start_ea == fva
|
||||
return sum([end_ea - start_ea for (start_ea, end_ea) in idautils.Chunks(fva)])
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
if argv is None:
|
||||
argv = sys.argv[1:]
|
||||
|
||||
parser = argparse.ArgumentParser(description="Identify library functions using various strategies.")
|
||||
capa.main.install_common_args(parser, wanted={"input_file"})
|
||||
parser.add_argument("--store-idb", action="store_true", default=False, help="store IDA database file")
|
||||
parser.add_argument("--min-string-length", type=int, default=8, help="minimum string length")
|
||||
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
|
||||
args = parser.parse_args(args=argv)
|
||||
|
||||
try:
|
||||
capa.main.handle_common_args(args)
|
||||
except capa.main.ShouldExitError as e:
|
||||
return e.status_code
|
||||
|
||||
dbs = capa.analysis.strings.get_default_databases()
|
||||
capa.analysis.strings.prune_databases(dbs, n=args.min_string_length)
|
||||
|
||||
function_classifications: List[FunctionClassification] = []
|
||||
with ida_session(args.input_file, use_temp_dir=not args.store_idb):
|
||||
with capa.main.timing("FLIRT-based library identification"):
|
||||
# TODO: add more signature (files)
|
||||
# TOOD: apply more signatures
|
||||
for flirt_match in capa.analysis.flirt.get_flirt_matches():
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=flirt_match.va,
|
||||
name=flirt_match.name,
|
||||
classification=Classification.LIBRARY,
|
||||
method=Method.FLIRT,
|
||||
# note: we cannot currently include which signature matched per function via the IDA API
|
||||
)
|
||||
)
|
||||
|
||||
# thunks
|
||||
for fva in idautils.Functions():
|
||||
if is_thunk_function(fva):
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=fva,
|
||||
name=idaapi.get_func_name(fva),
|
||||
classification=Classification.LIBRARY,
|
||||
method=Method.THUNK,
|
||||
)
|
||||
)
|
||||
|
||||
with capa.main.timing("string-based library identification"):
|
||||
for string_match in capa.analysis.strings.get_string_matches(dbs):
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=string_match.va,
|
||||
name=idaapi.get_func_name(string_match.va),
|
||||
classification=Classification.LIBRARY,
|
||||
method=Method.STRINGS,
|
||||
library_name=string_match.metadata.library_name,
|
||||
library_version=string_match.metadata.library_version,
|
||||
)
|
||||
)
|
||||
|
||||
for va in idautils.Functions():
|
||||
name = idaapi.get_func_name(va)
|
||||
if name not in {
|
||||
"WinMain",
|
||||
"_main",
|
||||
"main",
|
||||
}:
|
||||
continue
|
||||
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=va,
|
||||
name=name,
|
||||
classification=Classification.USER,
|
||||
method=Method.ENTRYPOINT,
|
||||
)
|
||||
)
|
||||
|
||||
with capa.main.timing("call graph based library identification"):
|
||||
for fc in get_library_called_functions(function_classifications):
|
||||
function_classifications.append(fc)
|
||||
|
||||
doc = FunctionIdResults(function_classifications=[], layout=[])
|
||||
classifications_by_va = capa.analysis.strings.create_index(function_classifications, "va")
|
||||
for va in idautils.Functions():
|
||||
if classifications := classifications_by_va.get(va):
|
||||
doc.function_classifications.extend(classifications)
|
||||
else:
|
||||
doc.function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=va,
|
||||
name=idaapi.get_func_name(va),
|
||||
classification=Classification.UNKNOWN,
|
||||
method=None,
|
||||
)
|
||||
)
|
||||
doc.layout.append(
|
||||
BinaryLayout(
|
||||
va=va,
|
||||
size=get_function_size(va),
|
||||
)
|
||||
)
|
||||
|
||||
if args.json:
|
||||
print(doc.model_dump_json()) # noqa: T201 print found
|
||||
|
||||
else:
|
||||
table = rich.table.Table(
|
||||
"FVA",
|
||||
"CLASSIFICATION",
|
||||
"METHOD",
|
||||
"FNAME",
|
||||
"EXTRA",
|
||||
"SIZE"
|
||||
)
|
||||
|
||||
classifications_by_va = capa.analysis.strings.create_index(doc.function_classifications, "va", sorted_=True)
|
||||
size_by_va = {layout.va: layout.size for layout in doc.layout}
|
||||
size_by_classification = collections.defaultdict(int)
|
||||
for va, classifications in classifications_by_va.items():
|
||||
# TODO count of classifications if multiple?
|
||||
name = ", ".join({c.name for c in classifications})
|
||||
if "sub_" in name:
|
||||
name = Text(name, style="grey53")
|
||||
|
||||
classification = {c.classification for c in classifications}
|
||||
method = {c.method for c in classifications if c.method}
|
||||
extra = {f"{c.library_name}@{c.library_version}" for c in classifications if c.library_name}
|
||||
note = {f"{c.note}" for c in classifications if c.note}
|
||||
|
||||
table.add_row(
|
||||
hex(va),
|
||||
", ".join(classification) if classification != {"unknown"} else Text("unknown", style="grey53"),
|
||||
", ".join(method),
|
||||
name,
|
||||
f"{', '.join(extra)} {', '.join(note)}",
|
||||
f"{size_by_va[va]}",
|
||||
)
|
||||
|
||||
size_by_classification["-".join(classification)] += size_by_va[va]
|
||||
|
||||
rich.print(table)
|
||||
|
||||
stats_table = rich.table.Table(
|
||||
"ID", rich.table.Column("SIZE", justify="right"), rich.table.Column("%", justify="right")
|
||||
)
|
||||
size_all = sum(size_by_classification.values())
|
||||
for k, s in size_by_classification.items():
|
||||
stats_table.add_row(k, f"{s:d}", f"{100 * s / size_all:.2f}")
|
||||
rich.print(stats_table)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
2
capa/analysis/requirements.txt
Normal file
2
capa/analysis/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
# temporary extra file to track dependencies of the analysis directory
|
||||
nltk==3.9.1
|
||||
269
capa/analysis/strings/__init__.py
Normal file
269
capa/analysis/strings/__init__.py
Normal file
@@ -0,0 +1,269 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
"""
|
||||
further requirements:
|
||||
- nltk
|
||||
"""
|
||||
import gzip
|
||||
import logging
|
||||
import collections
|
||||
from typing import Any, Dict, Mapping
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
|
||||
import msgspec
|
||||
|
||||
import capa.features.extractors.strings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LibraryString(msgspec.Struct):
|
||||
string: str
|
||||
library_name: str
|
||||
library_version: str
|
||||
file_path: str | None = None
|
||||
function_name: str | None = None
|
||||
line_number: int | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class LibraryStringDatabase:
|
||||
metadata_by_string: Dict[str, LibraryString]
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.metadata_by_string)
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, path: Path) -> "LibraryStringDatabase":
|
||||
metadata_by_string: Dict[str, LibraryString] = {}
|
||||
decoder = msgspec.json.Decoder(type=LibraryString)
|
||||
for line in gzip.decompress(path.read_bytes()).split(b"\n"):
|
||||
if not line:
|
||||
continue
|
||||
s = decoder.decode(line)
|
||||
metadata_by_string[s.string] = s
|
||||
|
||||
return cls(metadata_by_string=metadata_by_string)
|
||||
|
||||
|
||||
DEFAULT_FILENAMES = (
|
||||
"brotli.jsonl.gz",
|
||||
"bzip2.jsonl.gz",
|
||||
"cryptopp.jsonl.gz",
|
||||
"curl.jsonl.gz",
|
||||
"detours.jsonl.gz",
|
||||
"jemalloc.jsonl.gz",
|
||||
"jsoncpp.jsonl.gz",
|
||||
"kcp.jsonl.gz",
|
||||
"liblzma.jsonl.gz",
|
||||
"libsodium.jsonl.gz",
|
||||
"libpcap.jsonl.gz",
|
||||
"mbedtls.jsonl.gz",
|
||||
"openssl.jsonl.gz",
|
||||
"sqlite3.jsonl.gz",
|
||||
"tomcrypt.jsonl.gz",
|
||||
"wolfssl.jsonl.gz",
|
||||
"zlib.jsonl.gz",
|
||||
)
|
||||
|
||||
DEFAULT_PATHS = tuple(Path(__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES) + (
|
||||
Path(__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",
|
||||
)
|
||||
|
||||
|
||||
def get_default_databases() -> list[LibraryStringDatabase]:
|
||||
return [LibraryStringDatabase.from_file(path) for path in DEFAULT_PATHS]
|
||||
|
||||
|
||||
@dataclass
|
||||
class WindowsApiStringDatabase:
|
||||
dll_names: set[str]
|
||||
api_names: set[str]
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.dll_names) + len(self.api_names)
|
||||
|
||||
@classmethod
|
||||
def from_dir(cls, path: Path) -> "WindowsApiStringDatabase":
|
||||
dll_names: set[str] = set()
|
||||
api_names: set[str] = set()
|
||||
|
||||
for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines():
|
||||
if not line:
|
||||
continue
|
||||
dll_names.add(line)
|
||||
|
||||
for line in gzip.decompress((path / "apis.txt.gz").read_bytes()).decode("utf-8").splitlines():
|
||||
if not line:
|
||||
continue
|
||||
api_names.add(line)
|
||||
|
||||
return cls(dll_names=dll_names, api_names=api_names)
|
||||
|
||||
@classmethod
|
||||
def from_defaults(cls) -> "WindowsApiStringDatabase":
|
||||
return cls.from_dir(Path(__file__).parent / "data" / "winapi")
|
||||
|
||||
|
||||
def extract_strings(buf, n=4):
|
||||
yield from capa.features.extractors.strings.extract_ascii_strings(buf, n=n)
|
||||
yield from capa.features.extractors.strings.extract_unicode_strings(buf, n=n)
|
||||
|
||||
|
||||
def prune_databases(dbs: list[LibraryStringDatabase], n=8):
|
||||
"""remove less trustyworthy database entries.
|
||||
|
||||
such as:
|
||||
- those found in multiple databases
|
||||
- those that are English words
|
||||
- those that are too short
|
||||
- Windows API and DLL names
|
||||
"""
|
||||
|
||||
# TODO: consider applying these filters directly to the persisted databases, not at load time.
|
||||
|
||||
winapi = WindowsApiStringDatabase.from_defaults()
|
||||
|
||||
try:
|
||||
from nltk.corpus import words as nltk_words
|
||||
|
||||
nltk_words.words()
|
||||
except (ImportError, LookupError):
|
||||
# one-time download of dataset.
|
||||
# this probably doesn't work well for embedded use.
|
||||
import nltk
|
||||
|
||||
nltk.download("words")
|
||||
from nltk.corpus import words as nltk_words
|
||||
words = set(nltk_words.words())
|
||||
|
||||
counter: collections.Counter[str] = collections.Counter()
|
||||
to_remove = set()
|
||||
for db in dbs:
|
||||
for string in db.metadata_by_string.keys():
|
||||
counter[string] += 1
|
||||
|
||||
if string in words:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
if len(string) < n:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
if string in winapi.api_names:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
if string in winapi.dll_names:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
for string, count in counter.most_common():
|
||||
if count <= 1:
|
||||
break
|
||||
|
||||
# remove strings that are seen in more than one database
|
||||
to_remove.add(string)
|
||||
|
||||
for db in dbs:
|
||||
for string in to_remove:
|
||||
if string in db.metadata_by_string:
|
||||
del db.metadata_by_string[string]
|
||||
|
||||
|
||||
def get_function_strings():
|
||||
import idaapi
|
||||
import idautils
|
||||
|
||||
import capa.features.extractors.ida.helpers as ida_helpers
|
||||
|
||||
strings_by_function = collections.defaultdict(set)
|
||||
for ea in idautils.Functions():
|
||||
f = idaapi.get_func(ea)
|
||||
|
||||
# ignore library functions and thunk functions as identified by IDA
|
||||
if f.flags & idaapi.FUNC_THUNK:
|
||||
continue
|
||||
if f.flags & idaapi.FUNC_LIB:
|
||||
continue
|
||||
|
||||
for bb in ida_helpers.get_function_blocks(f):
|
||||
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
|
||||
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
|
||||
if ref == insn.ea:
|
||||
continue
|
||||
|
||||
string = capa.features.extractors.ida.helpers.find_string_at(ref)
|
||||
if not string:
|
||||
continue
|
||||
|
||||
strings_by_function[ea].add(string)
|
||||
|
||||
return strings_by_function
|
||||
|
||||
|
||||
@dataclass
|
||||
class LibraryStringClassification:
|
||||
va: int
|
||||
string: str
|
||||
library_name: str
|
||||
metadata: LibraryString
|
||||
|
||||
|
||||
def create_index(s: list, k: str, sorted_: bool = False) -> Mapping[Any, list]:
|
||||
"""create an index of the elements in `s` using the key `k`, optionally sorted by `k`"""
|
||||
if sorted_:
|
||||
s = sorted(s, key=lambda x: getattr(x, k))
|
||||
|
||||
s_by_k = collections.defaultdict(list)
|
||||
for v in s:
|
||||
p = getattr(v, k)
|
||||
s_by_k[p].append(v)
|
||||
return s_by_k
|
||||
|
||||
|
||||
def get_string_matches(dbs: list[LibraryStringDatabase]) -> list[LibraryStringClassification]:
|
||||
matches: list[LibraryStringClassification] = []
|
||||
|
||||
for function, strings in sorted(get_function_strings().items()):
|
||||
for string in strings:
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string):
|
||||
matches.append(
|
||||
LibraryStringClassification(
|
||||
va=function,
|
||||
string=string,
|
||||
library_name=metadata.library_name,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
|
||||
# if there are less than N strings per library, ignore that library
|
||||
matches_by_library = create_index(matches, "library_name")
|
||||
for library_name, library_matches in matches_by_library.items():
|
||||
if len(library_matches) > 5:
|
||||
continue
|
||||
|
||||
logger.info("pruning library %s: only %d matched string", library_name, len(library_matches))
|
||||
matches = [m for m in matches if m.library_name != library_name]
|
||||
|
||||
# if there are conflicts within a single function, don't label it
|
||||
matches_by_function = create_index(matches, "va")
|
||||
for va, function_matches in matches_by_function.items():
|
||||
library_names = {m.library_name for m in function_matches}
|
||||
if len(library_names) == 1:
|
||||
continue
|
||||
|
||||
logger.info("conflicting matches: 0x%x: %s", va, sorted(library_names))
|
||||
# this is potentially slow (O(n**2)) but hopefully fast enough in practice.
|
||||
matches = [m for m in matches if m.va != va]
|
||||
|
||||
return matches
|
||||
130
capa/analysis/strings/__main__.py
Normal file
130
capa/analysis/strings/__main__.py
Normal file
@@ -0,0 +1,130 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import sys
|
||||
import logging
|
||||
import collections
|
||||
from pathlib import Path
|
||||
|
||||
import rich
|
||||
from rich.text import Text
|
||||
|
||||
import capa.analysis.strings
|
||||
import capa.features.extractors.strings
|
||||
import capa.features.extractors.ida.helpers as ida_helpers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def open_ida(input_path: Path):
|
||||
import tempfile
|
||||
|
||||
import idapro
|
||||
|
||||
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
|
||||
t.write_bytes(input_path.read_bytes())
|
||||
# resource leak: we should delete this upon exit
|
||||
|
||||
idapro.enable_console_messages(False)
|
||||
idapro.open_database(str(t.absolute()), run_auto_analysis=True)
|
||||
|
||||
import ida_auto
|
||||
|
||||
ida_auto.auto_wait()
|
||||
|
||||
|
||||
def main():
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
# use n=8 to ignore common words
|
||||
N = 8
|
||||
|
||||
input_path = Path(sys.argv[1])
|
||||
|
||||
dbs = capa.analysis.strings.get_default_databases()
|
||||
capa.analysis.strings.prune_databases(dbs, n=N)
|
||||
|
||||
strings_by_library = collections.defaultdict(set)
|
||||
for string in capa.analysis.strings.extract_strings(input_path.read_bytes(), n=N):
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string.s):
|
||||
strings_by_library[metadata.library_name].add(string.s)
|
||||
|
||||
console = rich.get_console()
|
||||
console.print("found libraries:", style="bold")
|
||||
for library, strings in sorted(strings_by_library.items(), key=lambda p: len(p[1]), reverse=True):
|
||||
console.print(f" - [b]{library}[/] ({len(strings)} strings)")
|
||||
|
||||
for string in sorted(strings)[:10]:
|
||||
console.print(f" - {string}", markup=False, style="grey37")
|
||||
|
||||
if len(strings) > 10:
|
||||
console.print(" ...", style="grey37")
|
||||
|
||||
if not strings_by_library:
|
||||
console.print(" (none)", style="grey37")
|
||||
# since we're not going to find any strings
|
||||
# return early and don't do IDA analysis
|
||||
return
|
||||
|
||||
open_ida(input_path)
|
||||
|
||||
import idaapi
|
||||
import idautils
|
||||
import ida_funcs
|
||||
|
||||
strings_by_function = collections.defaultdict(set)
|
||||
for ea in idautils.Functions():
|
||||
f = idaapi.get_func(ea)
|
||||
|
||||
# ignore library functions and thunk functions as identified by IDA
|
||||
if f.flags & idaapi.FUNC_THUNK:
|
||||
continue
|
||||
if f.flags & idaapi.FUNC_LIB:
|
||||
continue
|
||||
|
||||
for bb in ida_helpers.get_function_blocks(f):
|
||||
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
|
||||
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
|
||||
if ref == insn.ea:
|
||||
continue
|
||||
|
||||
string = capa.features.extractors.ida.helpers.find_string_at(ref)
|
||||
if not string:
|
||||
continue
|
||||
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string):
|
||||
strings_by_function[ea].add(string)
|
||||
|
||||
# ensure there are at least XXX functions renamed, or ignore those entries
|
||||
|
||||
console.print("functions:", style="bold")
|
||||
for function, strings in sorted(strings_by_function.items()):
|
||||
if strings:
|
||||
name = ida_funcs.get_func_name(function)
|
||||
|
||||
console.print(f" [b]{name}[/]@{function:08x}:")
|
||||
|
||||
for string in strings:
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string):
|
||||
location = Text(
|
||||
f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}",
|
||||
style="grey37",
|
||||
)
|
||||
console.print(" - ", location, ": ", string.rstrip())
|
||||
|
||||
console.print()
|
||||
|
||||
console.print(
|
||||
f"found {len(strings_by_function)} library functions across {len(list(idautils.Functions()))} functions"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
BIN
capa/analysis/strings/data/crt/msvc_v143.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/crt/msvc_v143.jsonl.gz
Normal file
Binary file not shown.
3
capa/analysis/strings/data/oss/.gitignore
vendored
Normal file
3
capa/analysis/strings/data/oss/.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
*.csv
|
||||
*.jsonl
|
||||
*.jsonl.gz
|
||||
BIN
capa/analysis/strings/data/oss/brotli.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/brotli.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/bzip2.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/bzip2.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/cryptopp.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/cryptopp.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/curl.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/curl.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/detours.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/detours.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/jemalloc.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/jemalloc.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/jsoncpp.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/jsoncpp.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/kcp.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/kcp.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/liblzma.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/liblzma.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/libpcap.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/libpcap.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/libsodium.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/libsodium.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/mbedtls.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/mbedtls.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/openssl.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/openssl.jsonl.gz
Normal file
Binary file not shown.
99
capa/analysis/strings/data/oss/readme.md
Normal file
99
capa/analysis/strings/data/oss/readme.md
Normal file
@@ -0,0 +1,99 @@
|
||||
# Strings from Open Source libraries
|
||||
|
||||
This directory contains databases of strings extracted from open soure software.
|
||||
capa uses these databases to ignore functions that are likely library code.
|
||||
|
||||
There is one file for each database. Each database is a gzip-compressed, JSONL (one JSON document per line) file.
|
||||
The JSON document looks like this:
|
||||
|
||||
string: "1.0.8, 13-Jul-2019"
|
||||
library_name: "bzip2"
|
||||
library_version: "1.0.8#3"
|
||||
file_path: "CMakeFiles/bz2.dir/bzlib.c.obj"
|
||||
function_name: "BZ2_bzlibVersion"
|
||||
line_number: null
|
||||
|
||||
The following databases were extracted via the vkpkg & jh technique:
|
||||
|
||||
- brotli 1.0.9#5
|
||||
- bzip2 1.0.8#3
|
||||
- cryptopp 8.7.0
|
||||
- curl 7.86.0#1
|
||||
- detours 4.0.1#7
|
||||
- jemalloc 5.3.0#1
|
||||
- jsoncpp 1.9.5
|
||||
- kcp 1.7
|
||||
- liblzma 5.2.5#6
|
||||
- libsodium 1.0.18#8
|
||||
- libpcap 1.10.1#3
|
||||
- mbedtls 2.28.1
|
||||
- openssl 3.0.7#1
|
||||
- sqlite3 3.40.0#1
|
||||
- tomcrypt 1.18.2#2
|
||||
- wolfssl 5.5.0
|
||||
- zlib 1.2.13
|
||||
|
||||
This code was originally developed in FLOSS and imported into capa.
|
||||
|
||||
## The vkpkg & jh technique
|
||||
|
||||
Major steps:
|
||||
|
||||
1. build static libraries via vcpkg
|
||||
2. extract features via jh
|
||||
3. convert to JSONL format with `jh_to_qs.py`
|
||||
4. compress with gzip
|
||||
|
||||
### Build static libraries via vcpkg
|
||||
|
||||
[vcpkg](https://vcpkg.io/en/) is a free C/C++ package manager for acquiring and managing libraries.
|
||||
We use it to easily build common open source libraries, like zlib.
|
||||
Use the triplet `x64-windows-static` to build static archives (.lib files that are AR archives containing COFF object files):
|
||||
|
||||
```console
|
||||
PS > C:\vcpkg\vcpkg.exe install --triplet x64-windows-static zlib
|
||||
```
|
||||
|
||||
### Extract features via jh
|
||||
|
||||
[jh](https://github.com/williballenthin/lancelot/blob/master/bin/src/bin/jh.rs)
|
||||
is a lancelot-based utility that parses AR archives containing COFF object files,
|
||||
reconstructs their control flow, finds functions, and extracts features.
|
||||
jh extracts numbers, API calls, and strings; we are only interested in the string features.
|
||||
|
||||
For each feature, jh emits a CSV line with the fields
|
||||
- target triplet
|
||||
- compiler
|
||||
- library
|
||||
- version
|
||||
- build profile
|
||||
- path
|
||||
- function
|
||||
- feature type
|
||||
- feature value
|
||||
|
||||
For example:
|
||||
|
||||
```csv
|
||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0x00000100
|
||||
```
|
||||
|
||||
For example, to invoke jh:
|
||||
|
||||
```console
|
||||
$ ~/lancelot/target/release/jh x64-windows-static msvc143 zlib 1.2.13 release /mnt/c/vcpkg/installed/x64-windows-static/lib/zlib.lib > ~/flare-floss/floss/qs/db/data/oss/zlib.csv
|
||||
```
|
||||
|
||||
### Convert to OSS database format
|
||||
|
||||
We use the script `jh_to_qs.py` to convert these CSV lines into JSONL file prepared for FLOSS:
|
||||
|
||||
```console
|
||||
$ python3 jh_to_qs.py zlib.csv > zlib.jsonl
|
||||
```
|
||||
|
||||
These files are then gzip'd:
|
||||
|
||||
```console
|
||||
$ gzip -c zlib.jsonl > zlib.jsonl.gz
|
||||
```
|
||||
BIN
capa/analysis/strings/data/oss/sqlite3.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/sqlite3.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/tomcrypt.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/tomcrypt.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/wolfssl.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/wolfssl.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/oss/zlib.jsonl.gz
Normal file
BIN
capa/analysis/strings/data/oss/zlib.jsonl.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/winapi/apis.txt.gz
Normal file
BIN
capa/analysis/strings/data/winapi/apis.txt.gz
Normal file
Binary file not shown.
BIN
capa/analysis/strings/data/winapi/dlls.txt.gz
Normal file
BIN
capa/analysis/strings/data/winapi/dlls.txt.gz
Normal file
Binary file not shown.
@@ -77,6 +77,8 @@ dependencies = [
|
||||
"protobuf>=5",
|
||||
"msgspec>=0.18.6",
|
||||
"xmltodict>=0.13.0",
|
||||
# for library detection (in development)
|
||||
"nltk>=3",
|
||||
|
||||
# ---------------------------------------
|
||||
# Dependencies that we develop
|
||||
|
||||
Reference in New Issue
Block a user