mirror of
https://github.com/mandiant/capa.git
synced 2025-12-11 23:30:37 -08:00
Compare commits
1 Commits
object-lay
...
hide-msgsp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f83fb74502 |
@@ -108,7 +108,6 @@ repos:
|
||||
- "--check-untyped-defs"
|
||||
- "--ignore-missing-imports"
|
||||
- "--config-file=.github/mypy/mypy.ini"
|
||||
- "--enable-incomplete-feature=NewGenericSyntax"
|
||||
- "capa/"
|
||||
- "scripts/"
|
||||
- "tests/"
|
||||
|
||||
@@ -1,38 +0,0 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
import capa.features.extractors.ida.idalib as idalib
|
||||
|
||||
if not idalib.has_idalib():
|
||||
raise RuntimeError("cannot find IDA idalib module.")
|
||||
|
||||
if not idalib.load_idalib():
|
||||
raise RuntimeError("failed to load IDA idalib module.")
|
||||
|
||||
import idaapi
|
||||
import idautils
|
||||
|
||||
|
||||
class FunctionId(BaseModel):
|
||||
va: int
|
||||
is_library: bool
|
||||
name: str
|
||||
|
||||
|
||||
def get_flirt_matches(lib_only=True):
|
||||
for fva in idautils.Functions():
|
||||
f = idaapi.get_func(fva)
|
||||
is_lib = bool(f.flags & idaapi.FUNC_LIB)
|
||||
fname = idaapi.get_func_name(fva)
|
||||
|
||||
if lib_only and not is_lib:
|
||||
continue
|
||||
|
||||
yield FunctionId(va=fva, is_library=is_lib, name=fname)
|
||||
@@ -1,240 +0,0 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import io
|
||||
import sys
|
||||
import logging
|
||||
import argparse
|
||||
import tempfile
|
||||
import contextlib
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
from pathlib import Path
|
||||
|
||||
import rich
|
||||
from pydantic import BaseModel
|
||||
from rich.text import Text
|
||||
from rich.console import Console
|
||||
|
||||
import capa.main
|
||||
import capa.helpers
|
||||
import capa.analysis.flirt
|
||||
import capa.analysis.strings
|
||||
import capa.features.extractors.ida.idalib as idalib
|
||||
|
||||
if not idalib.has_idalib():
|
||||
raise RuntimeError("cannot find IDA idalib module.")
|
||||
|
||||
if not idalib.load_idalib():
|
||||
raise RuntimeError("failed to load IDA idalib module.")
|
||||
|
||||
import idaapi
|
||||
import idapro
|
||||
import ida_auto
|
||||
import idautils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Classification(str, Enum):
|
||||
USER = "user"
|
||||
LIBRARY = "library"
|
||||
UNKNOWN = "unknown"
|
||||
|
||||
|
||||
class Method(str, Enum):
|
||||
FLIRT = "flirt"
|
||||
STRINGS = "strings"
|
||||
THUNK = "thunk"
|
||||
ENTRYPOINT = "entrypoint"
|
||||
|
||||
|
||||
class FunctionClassification(BaseModel):
|
||||
va: int
|
||||
classification: Classification
|
||||
# name per the disassembler/analysis tool
|
||||
# may be combined with the recovered/suspected name TODO below
|
||||
name: str
|
||||
|
||||
# if is library, this must be provided
|
||||
method: Optional[Method]
|
||||
|
||||
# TODO if is library, recovered/suspected name?
|
||||
|
||||
# if is library, these can optionally be provided.
|
||||
library_name: Optional[str] = None
|
||||
library_version: Optional[str] = None
|
||||
|
||||
|
||||
class FunctionIdResults(BaseModel):
|
||||
function_classifications: List[FunctionClassification]
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def ida_session(input_path: Path, use_temp_dir=True):
|
||||
if use_temp_dir:
|
||||
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
|
||||
else:
|
||||
t = input_path
|
||||
|
||||
logger.debug("using %s", str(t))
|
||||
# stderr=True is used here to redirect the spinner banner to stderr,
|
||||
# so that users can redirect capa's output.
|
||||
console = Console(stderr=True, quiet=False)
|
||||
|
||||
try:
|
||||
if use_temp_dir:
|
||||
t.write_bytes(input_path.read_bytes())
|
||||
|
||||
# idalib writes to stdout (ugh), so we have to capture that
|
||||
# so as not to screw up structured output.
|
||||
with capa.helpers.stdout_redirector(io.BytesIO()):
|
||||
idapro.enable_console_messages(False)
|
||||
with capa.main.timing("analyze program"):
|
||||
with console.status("analyzing program...", spinner="dots"):
|
||||
if idapro.open_database(str(t.absolute()), run_auto_analysis=True):
|
||||
raise RuntimeError("failed to analyze input file")
|
||||
|
||||
logger.debug("idalib: waiting for analysis...")
|
||||
ida_auto.auto_wait()
|
||||
logger.debug("idalib: opened database.")
|
||||
|
||||
yield
|
||||
finally:
|
||||
idapro.close_database()
|
||||
if use_temp_dir:
|
||||
t.unlink()
|
||||
|
||||
|
||||
def is_thunk_function(fva):
|
||||
f = idaapi.get_func(fva)
|
||||
return bool(f.flags & idaapi.FUNC_THUNK)
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
if argv is None:
|
||||
argv = sys.argv[1:]
|
||||
|
||||
parser = argparse.ArgumentParser(description="Identify library functions using various strategies.")
|
||||
capa.main.install_common_args(parser, wanted={"input_file"})
|
||||
parser.add_argument("--store-idb", action="store_true", default=False, help="store IDA database file")
|
||||
parser.add_argument("--min-string-length", type=int, default=8, help="minimum string length")
|
||||
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
|
||||
args = parser.parse_args(args=argv)
|
||||
|
||||
try:
|
||||
capa.main.handle_common_args(args)
|
||||
except capa.main.ShouldExitError as e:
|
||||
return e.status_code
|
||||
|
||||
dbs = capa.analysis.strings.get_default_databases()
|
||||
capa.analysis.strings.prune_databases(dbs, n=args.min_string_length)
|
||||
|
||||
function_classifications: List[FunctionClassification] = []
|
||||
with ida_session(args.input_file, use_temp_dir=not args.store_idb):
|
||||
with capa.main.timing("FLIRT-based library identification"):
|
||||
# TODO: add more signature (files)
|
||||
# TOOD: apply more signatures
|
||||
for flirt_match in capa.analysis.flirt.get_flirt_matches():
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=flirt_match.va,
|
||||
name=flirt_match.name,
|
||||
classification=Classification.LIBRARY,
|
||||
method=Method.FLIRT,
|
||||
# note: we cannot currently include which signature matched per function via the IDA API
|
||||
)
|
||||
)
|
||||
|
||||
# thunks
|
||||
for fva in idautils.Functions():
|
||||
if is_thunk_function(fva):
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=fva,
|
||||
name=idaapi.get_func_name(fva),
|
||||
classification=Classification.LIBRARY,
|
||||
method=Method.THUNK,
|
||||
)
|
||||
)
|
||||
|
||||
with capa.main.timing("string-based library identification"):
|
||||
for string_match in capa.analysis.strings.get_string_matches(dbs):
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=string_match.va,
|
||||
name=idaapi.get_func_name(string_match.va),
|
||||
classification=Classification.LIBRARY,
|
||||
method=Method.STRINGS,
|
||||
library_name=string_match.metadata.library_name,
|
||||
library_version=string_match.metadata.library_version,
|
||||
)
|
||||
)
|
||||
|
||||
for va in idautils.Functions():
|
||||
name = idaapi.get_func_name(va)
|
||||
if name not in {"WinMain", }:
|
||||
continue
|
||||
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=va,
|
||||
name=name,
|
||||
classification=Classification.USER,
|
||||
method=Method.ENTRYPOINT,
|
||||
)
|
||||
)
|
||||
|
||||
doc = FunctionIdResults(function_classifications=[])
|
||||
classifications_by_va = capa.analysis.strings.create_index(function_classifications, "va")
|
||||
for va in idautils.Functions():
|
||||
if classifications := classifications_by_va.get(va):
|
||||
doc.function_classifications.extend(classifications)
|
||||
else:
|
||||
doc.function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=va,
|
||||
name=idaapi.get_func_name(va),
|
||||
classification=Classification.UNKNOWN,
|
||||
method=None,
|
||||
)
|
||||
)
|
||||
|
||||
if args.json:
|
||||
print(doc.model_dump_json()) # noqa: T201 print found
|
||||
|
||||
else:
|
||||
table = rich.table.Table()
|
||||
table.add_column("FVA")
|
||||
table.add_column("CLASSIFICATION")
|
||||
table.add_column("METHOD")
|
||||
table.add_column("FNAME")
|
||||
table.add_column("EXTRA INFO")
|
||||
|
||||
classifications_by_va = capa.analysis.strings.create_index(doc.function_classifications, "va", sorted_=True)
|
||||
for va, classifications in classifications_by_va.items():
|
||||
name = ", ".join({c.name for c in classifications})
|
||||
if "sub_" in name:
|
||||
name = Text(name, style="grey53")
|
||||
|
||||
classification = {c.classification for c in classifications}
|
||||
method = {c.method for c in classifications if c.method}
|
||||
extra = {f"{c.library_name}@{c.library_version}" for c in classifications if c.library_name}
|
||||
|
||||
table.add_row(
|
||||
hex(va),
|
||||
", ".join(classification) if classification != {"unknown"} else Text("unknown", style="grey53"),
|
||||
", ".join(method),
|
||||
name,
|
||||
", ".join(extra),
|
||||
)
|
||||
|
||||
rich.print(table)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -1,2 +0,0 @@
|
||||
# temporary extra file to track dependencies of the analysis directory
|
||||
nltk==3.9.1
|
||||
@@ -1,269 +0,0 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
"""
|
||||
further requirements:
|
||||
- nltk
|
||||
"""
|
||||
import gzip
|
||||
import logging
|
||||
import collections
|
||||
from typing import Any, Dict, Mapping
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
|
||||
import msgspec
|
||||
|
||||
import capa.features.extractors.strings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LibraryString(msgspec.Struct):
|
||||
string: str
|
||||
library_name: str
|
||||
library_version: str
|
||||
file_path: str | None = None
|
||||
function_name: str | None = None
|
||||
line_number: int | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class LibraryStringDatabase:
|
||||
metadata_by_string: Dict[str, LibraryString]
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.metadata_by_string)
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, path: Path) -> "LibraryStringDatabase":
|
||||
metadata_by_string: Dict[str, LibraryString] = {}
|
||||
decoder = msgspec.json.Decoder(type=LibraryString)
|
||||
for line in gzip.decompress(path.read_bytes()).split(b"\n"):
|
||||
if not line:
|
||||
continue
|
||||
s = decoder.decode(line)
|
||||
metadata_by_string[s.string] = s
|
||||
|
||||
return cls(metadata_by_string=metadata_by_string)
|
||||
|
||||
|
||||
DEFAULT_FILENAMES = (
|
||||
"brotli.jsonl.gz",
|
||||
"bzip2.jsonl.gz",
|
||||
"cryptopp.jsonl.gz",
|
||||
"curl.jsonl.gz",
|
||||
"detours.jsonl.gz",
|
||||
"jemalloc.jsonl.gz",
|
||||
"jsoncpp.jsonl.gz",
|
||||
"kcp.jsonl.gz",
|
||||
"liblzma.jsonl.gz",
|
||||
"libsodium.jsonl.gz",
|
||||
"libpcap.jsonl.gz",
|
||||
"mbedtls.jsonl.gz",
|
||||
"openssl.jsonl.gz",
|
||||
"sqlite3.jsonl.gz",
|
||||
"tomcrypt.jsonl.gz",
|
||||
"wolfssl.jsonl.gz",
|
||||
"zlib.jsonl.gz",
|
||||
)
|
||||
|
||||
DEFAULT_PATHS = tuple(Path(__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES) + (
|
||||
Path(__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",
|
||||
)
|
||||
|
||||
|
||||
def get_default_databases() -> list[LibraryStringDatabase]:
|
||||
return [LibraryStringDatabase.from_file(path) for path in DEFAULT_PATHS]
|
||||
|
||||
|
||||
@dataclass
|
||||
class WindowsApiStringDatabase:
|
||||
dll_names: set[str]
|
||||
api_names: set[str]
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.dll_names) + len(self.api_names)
|
||||
|
||||
@classmethod
|
||||
def from_dir(cls, path: Path) -> "WindowsApiStringDatabase":
|
||||
dll_names: set[str] = set()
|
||||
api_names: set[str] = set()
|
||||
|
||||
for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines():
|
||||
if not line:
|
||||
continue
|
||||
dll_names.add(line)
|
||||
|
||||
for line in gzip.decompress((path / "apis.txt.gz").read_bytes()).decode("utf-8").splitlines():
|
||||
if not line:
|
||||
continue
|
||||
api_names.add(line)
|
||||
|
||||
return cls(dll_names=dll_names, api_names=api_names)
|
||||
|
||||
@classmethod
|
||||
def from_defaults(cls) -> "WindowsApiStringDatabase":
|
||||
return cls.from_dir(Path(__file__).parent / "data" / "winapi")
|
||||
|
||||
|
||||
def extract_strings(buf, n=4):
|
||||
yield from capa.features.extractors.strings.extract_ascii_strings(buf, n=n)
|
||||
yield from capa.features.extractors.strings.extract_unicode_strings(buf, n=n)
|
||||
|
||||
|
||||
def prune_databases(dbs: list[LibraryStringDatabase], n=8):
|
||||
"""remove less trustyworthy database entries.
|
||||
|
||||
such as:
|
||||
- those found in multiple databases
|
||||
- those that are English words
|
||||
- those that are too short
|
||||
- Windows API and DLL names
|
||||
"""
|
||||
|
||||
# TODO: consider applying these filters directly to the persisted databases, not at load time.
|
||||
|
||||
winapi = WindowsApiStringDatabase.from_defaults()
|
||||
|
||||
try:
|
||||
from nltk.corpus import words as nltk_words
|
||||
|
||||
nltk_words.words()
|
||||
except (ImportError, LookupError):
|
||||
# one-time download of dataset.
|
||||
# this probably doesn't work well for embedded use.
|
||||
import nltk
|
||||
|
||||
nltk.download("words")
|
||||
from nltk.corpus import words as nltk_words
|
||||
words = set(nltk_words.words())
|
||||
|
||||
counter: collections.Counter[str] = collections.Counter()
|
||||
to_remove = set()
|
||||
for db in dbs:
|
||||
for string in db.metadata_by_string.keys():
|
||||
counter[string] += 1
|
||||
|
||||
if string in words:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
if len(string) < n:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
if string in winapi.api_names:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
if string in winapi.dll_names:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
for string, count in counter.most_common():
|
||||
if count <= 1:
|
||||
break
|
||||
|
||||
# remove strings that are seen in more than one database
|
||||
to_remove.add(string)
|
||||
|
||||
for db in dbs:
|
||||
for string in to_remove:
|
||||
if string in db.metadata_by_string:
|
||||
del db.metadata_by_string[string]
|
||||
|
||||
|
||||
def get_function_strings():
|
||||
import idaapi
|
||||
import idautils
|
||||
|
||||
import capa.features.extractors.ida.helpers as ida_helpers
|
||||
|
||||
strings_by_function = collections.defaultdict(set)
|
||||
for ea in idautils.Functions():
|
||||
f = idaapi.get_func(ea)
|
||||
|
||||
# ignore library functions and thunk functions as identified by IDA
|
||||
if f.flags & idaapi.FUNC_THUNK:
|
||||
continue
|
||||
if f.flags & idaapi.FUNC_LIB:
|
||||
continue
|
||||
|
||||
for bb in ida_helpers.get_function_blocks(f):
|
||||
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
|
||||
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
|
||||
if ref == insn.ea:
|
||||
continue
|
||||
|
||||
string = capa.features.extractors.ida.helpers.find_string_at(ref)
|
||||
if not string:
|
||||
continue
|
||||
|
||||
strings_by_function[ea].add(string)
|
||||
|
||||
return strings_by_function
|
||||
|
||||
|
||||
@dataclass
|
||||
class LibraryStringClassification:
|
||||
va: int
|
||||
string: str
|
||||
library_name: str
|
||||
metadata: LibraryString
|
||||
|
||||
|
||||
def create_index(s: list, k: str, sorted_: bool = False) -> Mapping[Any, list]:
|
||||
"""create an index of the elements in `s` using the key `k`, optionally sorted by `k`"""
|
||||
if sorted_:
|
||||
s = sorted(s, key=lambda x: getattr(x, k))
|
||||
|
||||
s_by_k = collections.defaultdict(list)
|
||||
for v in s:
|
||||
p = getattr(v, k)
|
||||
s_by_k[p].append(v)
|
||||
return s_by_k
|
||||
|
||||
|
||||
def get_string_matches(dbs: list[LibraryStringDatabase]) -> list[LibraryStringClassification]:
|
||||
matches: list[LibraryStringClassification] = []
|
||||
|
||||
for function, strings in sorted(get_function_strings().items()):
|
||||
for string in strings:
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string):
|
||||
matches.append(
|
||||
LibraryStringClassification(
|
||||
va=function,
|
||||
string=string,
|
||||
library_name=metadata.library_name,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
|
||||
# if there are less than N strings per library, ignore that library
|
||||
matches_by_library = create_index(matches, "library_name")
|
||||
for library_name, library_matches in matches_by_library.items():
|
||||
if len(library_matches) > 5:
|
||||
continue
|
||||
|
||||
logger.info("pruning library %s: only %d matched string", library_name, len(library_matches))
|
||||
matches = [m for m in matches if m.library_name != library_name]
|
||||
|
||||
# if there are conflicts within a single function, don't label it
|
||||
matches_by_function = create_index(matches, "va")
|
||||
for va, function_matches in matches_by_function.items():
|
||||
library_names = {m.library_name for m in function_matches}
|
||||
if len(library_names) == 1:
|
||||
continue
|
||||
|
||||
logger.info("conflicting matches: 0x%x: %s", va, sorted(library_names))
|
||||
# this is potentially slow (O(n**2)) but hopefully fast enough in practice.
|
||||
matches = [m for m in matches if m.va != va]
|
||||
|
||||
return matches
|
||||
@@ -1,130 +0,0 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import sys
|
||||
import logging
|
||||
import collections
|
||||
from pathlib import Path
|
||||
|
||||
import rich
|
||||
from rich.text import Text
|
||||
|
||||
import capa.analysis.strings
|
||||
import capa.features.extractors.strings
|
||||
import capa.features.extractors.ida.helpers as ida_helpers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def open_ida(input_path: Path):
|
||||
import tempfile
|
||||
|
||||
import idapro
|
||||
|
||||
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
|
||||
t.write_bytes(input_path.read_bytes())
|
||||
# resource leak: we should delete this upon exit
|
||||
|
||||
idapro.enable_console_messages(False)
|
||||
idapro.open_database(str(t.absolute()), run_auto_analysis=True)
|
||||
|
||||
import ida_auto
|
||||
|
||||
ida_auto.auto_wait()
|
||||
|
||||
|
||||
def main():
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
# use n=8 to ignore common words
|
||||
N = 8
|
||||
|
||||
input_path = Path(sys.argv[1])
|
||||
|
||||
dbs = capa.analysis.strings.get_default_databases()
|
||||
capa.analysis.strings.prune_databases(dbs, n=N)
|
||||
|
||||
strings_by_library = collections.defaultdict(set)
|
||||
for string in capa.analysis.strings.extract_strings(input_path.read_bytes(), n=N):
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string.s):
|
||||
strings_by_library[metadata.library_name].add(string.s)
|
||||
|
||||
console = rich.get_console()
|
||||
console.print("found libraries:", style="bold")
|
||||
for library, strings in sorted(strings_by_library.items(), key=lambda p: len(p[1]), reverse=True):
|
||||
console.print(f" - [b]{library}[/] ({len(strings)} strings)")
|
||||
|
||||
for string in sorted(strings)[:10]:
|
||||
console.print(f" - {string}", markup=False, style="grey37")
|
||||
|
||||
if len(strings) > 10:
|
||||
console.print(" ...", style="grey37")
|
||||
|
||||
if not strings_by_library:
|
||||
console.print(" (none)", style="grey37")
|
||||
# since we're not going to find any strings
|
||||
# return early and don't do IDA analysis
|
||||
return
|
||||
|
||||
open_ida(input_path)
|
||||
|
||||
import idaapi
|
||||
import idautils
|
||||
import ida_funcs
|
||||
|
||||
strings_by_function = collections.defaultdict(set)
|
||||
for ea in idautils.Functions():
|
||||
f = idaapi.get_func(ea)
|
||||
|
||||
# ignore library functions and thunk functions as identified by IDA
|
||||
if f.flags & idaapi.FUNC_THUNK:
|
||||
continue
|
||||
if f.flags & idaapi.FUNC_LIB:
|
||||
continue
|
||||
|
||||
for bb in ida_helpers.get_function_blocks(f):
|
||||
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
|
||||
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
|
||||
if ref == insn.ea:
|
||||
continue
|
||||
|
||||
string = capa.features.extractors.ida.helpers.find_string_at(ref)
|
||||
if not string:
|
||||
continue
|
||||
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string):
|
||||
strings_by_function[ea].add(string)
|
||||
|
||||
# ensure there are at least XXX functions renamed, or ignore those entries
|
||||
|
||||
console.print("functions:", style="bold")
|
||||
for function, strings in sorted(strings_by_function.items()):
|
||||
if strings:
|
||||
name = ida_funcs.get_func_name(function)
|
||||
|
||||
console.print(f" [b]{name}[/]@{function:08x}:")
|
||||
|
||||
for string in strings:
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string):
|
||||
location = Text(
|
||||
f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}",
|
||||
style="grey37",
|
||||
)
|
||||
console.print(" - ", location, ": ", string.rstrip())
|
||||
|
||||
console.print()
|
||||
|
||||
console.print(
|
||||
f"found {len(strings_by_function)} library functions across {len(list(idautils.Functions()))} functions"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Binary file not shown.
3
capa/analysis/strings/data/oss/.gitignore
vendored
3
capa/analysis/strings/data/oss/.gitignore
vendored
@@ -1,3 +0,0 @@
|
||||
*.csv
|
||||
*.jsonl
|
||||
*.jsonl.gz
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,99 +0,0 @@
|
||||
# Strings from Open Source libraries
|
||||
|
||||
This directory contains databases of strings extracted from open soure software.
|
||||
capa uses these databases to ignore functions that are likely library code.
|
||||
|
||||
There is one file for each database. Each database is a gzip-compressed, JSONL (one JSON document per line) file.
|
||||
The JSON document looks like this:
|
||||
|
||||
string: "1.0.8, 13-Jul-2019"
|
||||
library_name: "bzip2"
|
||||
library_version: "1.0.8#3"
|
||||
file_path: "CMakeFiles/bz2.dir/bzlib.c.obj"
|
||||
function_name: "BZ2_bzlibVersion"
|
||||
line_number: null
|
||||
|
||||
The following databases were extracted via the vkpkg & jh technique:
|
||||
|
||||
- brotli 1.0.9#5
|
||||
- bzip2 1.0.8#3
|
||||
- cryptopp 8.7.0
|
||||
- curl 7.86.0#1
|
||||
- detours 4.0.1#7
|
||||
- jemalloc 5.3.0#1
|
||||
- jsoncpp 1.9.5
|
||||
- kcp 1.7
|
||||
- liblzma 5.2.5#6
|
||||
- libsodium 1.0.18#8
|
||||
- libpcap 1.10.1#3
|
||||
- mbedtls 2.28.1
|
||||
- openssl 3.0.7#1
|
||||
- sqlite3 3.40.0#1
|
||||
- tomcrypt 1.18.2#2
|
||||
- wolfssl 5.5.0
|
||||
- zlib 1.2.13
|
||||
|
||||
This code was originally developed in FLOSS and imported into capa.
|
||||
|
||||
## The vkpkg & jh technique
|
||||
|
||||
Major steps:
|
||||
|
||||
1. build static libraries via vcpkg
|
||||
2. extract features via jh
|
||||
3. convert to JSONL format with `jh_to_qs.py`
|
||||
4. compress with gzip
|
||||
|
||||
### Build static libraries via vcpkg
|
||||
|
||||
[vcpkg](https://vcpkg.io/en/) is a free C/C++ package manager for acquiring and managing libraries.
|
||||
We use it to easily build common open source libraries, like zlib.
|
||||
Use the triplet `x64-windows-static` to build static archives (.lib files that are AR archives containing COFF object files):
|
||||
|
||||
```console
|
||||
PS > C:\vcpkg\vcpkg.exe install --triplet x64-windows-static zlib
|
||||
```
|
||||
|
||||
### Extract features via jh
|
||||
|
||||
[jh](https://github.com/williballenthin/lancelot/blob/master/bin/src/bin/jh.rs)
|
||||
is a lancelot-based utility that parses AR archives containing COFF object files,
|
||||
reconstructs their control flow, finds functions, and extracts features.
|
||||
jh extracts numbers, API calls, and strings; we are only interested in the string features.
|
||||
|
||||
For each feature, jh emits a CSV line with the fields
|
||||
- target triplet
|
||||
- compiler
|
||||
- library
|
||||
- version
|
||||
- build profile
|
||||
- path
|
||||
- function
|
||||
- feature type
|
||||
- feature value
|
||||
|
||||
For example:
|
||||
|
||||
```csv
|
||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0x00000100
|
||||
```
|
||||
|
||||
For example, to invoke jh:
|
||||
|
||||
```console
|
||||
$ ~/lancelot/target/release/jh x64-windows-static msvc143 zlib 1.2.13 release /mnt/c/vcpkg/installed/x64-windows-static/lib/zlib.lib > ~/flare-floss/floss/qs/db/data/oss/zlib.csv
|
||||
```
|
||||
|
||||
### Convert to OSS database format
|
||||
|
||||
We use the script `jh_to_qs.py` to convert these CSV lines into JSONL file prepared for FLOSS:
|
||||
|
||||
```console
|
||||
$ python3 jh_to_qs.py zlib.csv > zlib.jsonl
|
||||
```
|
||||
|
||||
These files are then gzip'd:
|
||||
|
||||
```console
|
||||
$ gzip -c zlib.jsonl > zlib.jsonl.gz
|
||||
```
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -19,7 +19,6 @@ from pathlib import Path
|
||||
from zipfile import ZipFile
|
||||
from datetime import datetime
|
||||
|
||||
import msgspec.json
|
||||
from rich.console import Console
|
||||
from rich.progress import (
|
||||
Task,
|
||||
@@ -154,6 +153,7 @@ def stdout_redirector(stream):
|
||||
|
||||
|
||||
def load_json_from_path(json_path: Path):
|
||||
import msgspec.json
|
||||
with gzip.open(json_path, "r") as compressed_report:
|
||||
try:
|
||||
report_json = compressed_report.read()
|
||||
@@ -165,6 +165,7 @@ def load_json_from_path(json_path: Path):
|
||||
|
||||
|
||||
def decode_json_lines(fd: Union[BinaryIO, gzip.GzipFile]):
|
||||
import msgspec.json
|
||||
for line in fd:
|
||||
try:
|
||||
line_s = line.strip().decode()
|
||||
@@ -186,6 +187,7 @@ def load_jsonl_from_path(jsonl_path: Path) -> Iterator[Dict]:
|
||||
|
||||
def load_one_jsonl_from_path(jsonl_path: Path):
|
||||
# this loads one json line to avoid the overhead of loading the entire file
|
||||
import msgspec.json
|
||||
try:
|
||||
with gzip.open(jsonl_path, "rb") as f:
|
||||
line = next(iter(f))
|
||||
|
||||
@@ -77,8 +77,6 @@ dependencies = [
|
||||
"protobuf>=5",
|
||||
"msgspec>=0.18.6",
|
||||
"xmltodict>=0.13.0",
|
||||
# for library detection (in development)
|
||||
"nltk>=3",
|
||||
|
||||
# ---------------------------------------
|
||||
# Dependencies that we develop
|
||||
|
||||
@@ -1,184 +0,0 @@
|
||||
import sys
|
||||
import sqlite3
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
|
||||
import pefile
|
||||
|
||||
import capa.main
|
||||
|
||||
|
||||
@dataclass
|
||||
class AssemblageRow:
|
||||
# from table: binaries
|
||||
binary_id: int
|
||||
file_name: str
|
||||
platform: str
|
||||
build_mode: str
|
||||
toolset_version: str
|
||||
github_url: str
|
||||
optimization: str
|
||||
repo_last_update: int
|
||||
size: int
|
||||
path: str
|
||||
license: str
|
||||
binary_hash: str
|
||||
repo_commit_hash: str
|
||||
# from table: functions
|
||||
function_id: int
|
||||
function_name: str
|
||||
function_hash: str
|
||||
top_comments: str
|
||||
source_codes: str
|
||||
prototype: str
|
||||
_source_file: str
|
||||
# from table: rvas
|
||||
rva_id: int
|
||||
start_rva: int
|
||||
end_rva: int
|
||||
|
||||
@property
|
||||
def source_file(self):
|
||||
# cleanup some extra metadata provided by assemblage
|
||||
return self._source_file.partition(" (MD5: ")[0].partition(" (0x3: ")[0]
|
||||
|
||||
|
||||
class Assemblage:
|
||||
conn: sqlite3.Connection
|
||||
samples: Path
|
||||
|
||||
def __init__(self, db: Path, samples: Path):
|
||||
super().__init__()
|
||||
|
||||
self.db = db
|
||||
self.samples = samples
|
||||
|
||||
self.conn = sqlite3.connect(self.db)
|
||||
with self.conn:
|
||||
self.conn.executescript("""
|
||||
PRAGMA journal_mode = WAL;
|
||||
PRAGMA synchronous = NORMAL;
|
||||
PRAGMA busy_timeout = 5000;
|
||||
PRAGMA cache_size = -20000; -- 20MB
|
||||
PRAGMA foreign_keys = true;
|
||||
PRAGMA temp_store = memory;
|
||||
|
||||
BEGIN IMMEDIATE TRANSACTION;
|
||||
CREATE INDEX IF NOT EXISTS idx__functions__binary_id ON functions (binary_id);
|
||||
CREATE INDEX IF NOT EXISTS idx__rvas__function_id ON rvas (function_id);
|
||||
|
||||
CREATE VIEW IF NOT EXISTS assemblage AS
|
||||
SELECT
|
||||
binaries.id AS binary_id,
|
||||
binaries.file_name AS file_name,
|
||||
binaries.platform AS platform,
|
||||
binaries.build_mode AS build_mode,
|
||||
binaries.toolset_version AS toolset_version,
|
||||
binaries.github_url AS github_url,
|
||||
binaries.optimization AS optimization,
|
||||
binaries.repo_last_update AS repo_last_update,
|
||||
binaries.size AS size,
|
||||
binaries.path AS path,
|
||||
binaries.license AS license,
|
||||
binaries.hash AS hash,
|
||||
binaries.repo_commit_hash AS repo_commit_hash,
|
||||
|
||||
functions.id AS function_id,
|
||||
functions.name AS function_name,
|
||||
functions.hash AS function_hash,
|
||||
functions.top_comments AS top_comments,
|
||||
functions.source_codes AS source_codes,
|
||||
functions.prototype AS prototype,
|
||||
functions.source_file AS source_file,
|
||||
|
||||
rvas.id AS rva_id,
|
||||
rvas.start AS start_rva,
|
||||
rvas.end AS end_rva
|
||||
FROM binaries
|
||||
JOIN functions ON binaries.id = functions.binary_id
|
||||
JOIN rvas ON functions.id = rvas.function_id;
|
||||
""")
|
||||
|
||||
def get_row_by_binary_id(self, binary_id: int) -> AssemblageRow:
|
||||
with self.conn:
|
||||
cur = self.conn.execute("SELECT * FROM assemblage WHERE binary_id = ? LIMIT 1;", (binary_id, ))
|
||||
return AssemblageRow(*cur.fetchone())
|
||||
|
||||
def get_rows_by_binary_id(self, binary_id: int) -> AssemblageRow:
|
||||
with self.conn:
|
||||
cur = self.conn.execute("SELECT * FROM assemblage WHERE binary_id = ?;", (binary_id, ))
|
||||
row = cur.fetchone()
|
||||
while row:
|
||||
yield AssemblageRow(*row)
|
||||
row = cur.fetchone()
|
||||
|
||||
def get_path_by_binary_id(self, binary_id: int) -> Path:
|
||||
with self.conn:
|
||||
cur = self.conn.execute("""SELECT path FROM assemblage WHERE binary_id = ? LIMIT 1""", (binary_id, ))
|
||||
return self.samples / cur.fetchone()[0]
|
||||
|
||||
def get_pe_by_binary_id(self, binary_id: int) -> pefile.PE:
|
||||
path = self.get_path_by_binary_id(binary_id)
|
||||
return pefile.PE(data=path.read_bytes(), fast_load=True)
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
if argv is None:
|
||||
argv = sys.argv[1:]
|
||||
|
||||
parser = argparse.ArgumentParser(description="Inspect object boundaries in compiled programs")
|
||||
capa.main.install_common_args(parser, wanted={})
|
||||
parser.add_argument("assemblage_database", type=Path, help="path to Assemblage database")
|
||||
parser.add_argument("assemblage_directory", type=Path, help="path to Assemblage samples directory")
|
||||
parser.add_argument("binary_id", type=int, help="primary key of binary to inspect")
|
||||
args = parser.parse_args(args=argv)
|
||||
|
||||
try:
|
||||
capa.main.handle_common_args(args)
|
||||
except capa.main.ShouldExitError as e:
|
||||
return e.status_code
|
||||
|
||||
if not args.assemblage_database.is_file():
|
||||
raise ValueError("database doesn't exist")
|
||||
|
||||
db = Assemblage(args.assemblage_database, args.assemblage_directory)
|
||||
# print(db.get_row_by_binary_id(args.binary_id))
|
||||
# print(db.get_pe_by_binary_id(args.binary_id))
|
||||
|
||||
@dataclass
|
||||
class Function:
|
||||
file: str
|
||||
name: str
|
||||
start_rva: int
|
||||
end_rva: int
|
||||
|
||||
functions = [
|
||||
Function(
|
||||
file=m.source_file,
|
||||
name=m.function_name,
|
||||
start_rva=m.start_rva,
|
||||
end_rva=m.end_rva,
|
||||
)
|
||||
for m in db.get_rows_by_binary_id(args.binary_id)
|
||||
]
|
||||
|
||||
import rich
|
||||
import rich.table
|
||||
|
||||
print(db.get_path_by_binary_id(args.binary_id))
|
||||
|
||||
t = rich.table.Table()
|
||||
t.add_column("rva")
|
||||
t.add_column("filename")
|
||||
t.add_column("name")
|
||||
|
||||
for function in sorted(functions, key=lambda f: f.start_rva):
|
||||
t.add_row(hex(function.start_rva), function.file, function.name)
|
||||
|
||||
rich.print(t)
|
||||
|
||||
# db.conn.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user