mirror of
https://github.com/mandiant/capa.git
synced 2026-03-18 07:49:04 -07:00
Compare commits
1 Commits
object-lay
...
feat/vmray
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
65b19ad46c |
@@ -108,7 +108,6 @@ repos:
|
||||
- "--check-untyped-defs"
|
||||
- "--ignore-missing-imports"
|
||||
- "--config-file=.github/mypy/mypy.ini"
|
||||
- "--enable-incomplete-feature=NewGenericSyntax"
|
||||
- "capa/"
|
||||
- "scripts/"
|
||||
- "tests/"
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
### Bug Fixes
|
||||
|
||||
- extractor: fix exception when PE extractor encounters unknown architecture #2440 @Tamir-K
|
||||
- IDA Pro: rename ida to idapro module for plugin and idalib in IDA 9.0 #2453 @mr-tz
|
||||
|
||||
### capa Explorer Web
|
||||
|
||||
|
||||
@@ -1,38 +0,0 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
import capa.features.extractors.ida.idalib as idalib
|
||||
|
||||
if not idalib.has_idalib():
|
||||
raise RuntimeError("cannot find IDA idalib module.")
|
||||
|
||||
if not idalib.load_idalib():
|
||||
raise RuntimeError("failed to load IDA idalib module.")
|
||||
|
||||
import idaapi
|
||||
import idautils
|
||||
|
||||
|
||||
class FunctionId(BaseModel):
|
||||
va: int
|
||||
is_library: bool
|
||||
name: str
|
||||
|
||||
|
||||
def get_flirt_matches(lib_only=True):
|
||||
for fva in idautils.Functions():
|
||||
f = idaapi.get_func(fva)
|
||||
is_lib = bool(f.flags & idaapi.FUNC_LIB)
|
||||
fname = idaapi.get_func_name(fva)
|
||||
|
||||
if lib_only and not is_lib:
|
||||
continue
|
||||
|
||||
yield FunctionId(va=fva, is_library=is_lib, name=fname)
|
||||
@@ -1,240 +0,0 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import io
|
||||
import sys
|
||||
import logging
|
||||
import argparse
|
||||
import tempfile
|
||||
import contextlib
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
from pathlib import Path
|
||||
|
||||
import rich
|
||||
from pydantic import BaseModel
|
||||
from rich.text import Text
|
||||
from rich.console import Console
|
||||
|
||||
import capa.main
|
||||
import capa.helpers
|
||||
import capa.analysis.flirt
|
||||
import capa.analysis.strings
|
||||
import capa.features.extractors.ida.idalib as idalib
|
||||
|
||||
if not idalib.has_idalib():
|
||||
raise RuntimeError("cannot find IDA idalib module.")
|
||||
|
||||
if not idalib.load_idalib():
|
||||
raise RuntimeError("failed to load IDA idalib module.")
|
||||
|
||||
import idaapi
|
||||
import idapro
|
||||
import ida_auto
|
||||
import idautils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Classification(str, Enum):
|
||||
USER = "user"
|
||||
LIBRARY = "library"
|
||||
UNKNOWN = "unknown"
|
||||
|
||||
|
||||
class Method(str, Enum):
|
||||
FLIRT = "flirt"
|
||||
STRINGS = "strings"
|
||||
THUNK = "thunk"
|
||||
ENTRYPOINT = "entrypoint"
|
||||
|
||||
|
||||
class FunctionClassification(BaseModel):
|
||||
va: int
|
||||
classification: Classification
|
||||
# name per the disassembler/analysis tool
|
||||
# may be combined with the recovered/suspected name TODO below
|
||||
name: str
|
||||
|
||||
# if is library, this must be provided
|
||||
method: Optional[Method]
|
||||
|
||||
# TODO if is library, recovered/suspected name?
|
||||
|
||||
# if is library, these can optionally be provided.
|
||||
library_name: Optional[str] = None
|
||||
library_version: Optional[str] = None
|
||||
|
||||
|
||||
class FunctionIdResults(BaseModel):
|
||||
function_classifications: List[FunctionClassification]
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def ida_session(input_path: Path, use_temp_dir=True):
|
||||
if use_temp_dir:
|
||||
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
|
||||
else:
|
||||
t = input_path
|
||||
|
||||
logger.debug("using %s", str(t))
|
||||
# stderr=True is used here to redirect the spinner banner to stderr,
|
||||
# so that users can redirect capa's output.
|
||||
console = Console(stderr=True, quiet=False)
|
||||
|
||||
try:
|
||||
if use_temp_dir:
|
||||
t.write_bytes(input_path.read_bytes())
|
||||
|
||||
# idalib writes to stdout (ugh), so we have to capture that
|
||||
# so as not to screw up structured output.
|
||||
with capa.helpers.stdout_redirector(io.BytesIO()):
|
||||
idapro.enable_console_messages(False)
|
||||
with capa.main.timing("analyze program"):
|
||||
with console.status("analyzing program...", spinner="dots"):
|
||||
if idapro.open_database(str(t.absolute()), run_auto_analysis=True):
|
||||
raise RuntimeError("failed to analyze input file")
|
||||
|
||||
logger.debug("idalib: waiting for analysis...")
|
||||
ida_auto.auto_wait()
|
||||
logger.debug("idalib: opened database.")
|
||||
|
||||
yield
|
||||
finally:
|
||||
idapro.close_database()
|
||||
if use_temp_dir:
|
||||
t.unlink()
|
||||
|
||||
|
||||
def is_thunk_function(fva):
|
||||
f = idaapi.get_func(fva)
|
||||
return bool(f.flags & idaapi.FUNC_THUNK)
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
if argv is None:
|
||||
argv = sys.argv[1:]
|
||||
|
||||
parser = argparse.ArgumentParser(description="Identify library functions using various strategies.")
|
||||
capa.main.install_common_args(parser, wanted={"input_file"})
|
||||
parser.add_argument("--store-idb", action="store_true", default=False, help="store IDA database file")
|
||||
parser.add_argument("--min-string-length", type=int, default=8, help="minimum string length")
|
||||
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
|
||||
args = parser.parse_args(args=argv)
|
||||
|
||||
try:
|
||||
capa.main.handle_common_args(args)
|
||||
except capa.main.ShouldExitError as e:
|
||||
return e.status_code
|
||||
|
||||
dbs = capa.analysis.strings.get_default_databases()
|
||||
capa.analysis.strings.prune_databases(dbs, n=args.min_string_length)
|
||||
|
||||
function_classifications: List[FunctionClassification] = []
|
||||
with ida_session(args.input_file, use_temp_dir=not args.store_idb):
|
||||
with capa.main.timing("FLIRT-based library identification"):
|
||||
# TODO: add more signature (files)
|
||||
# TOOD: apply more signatures
|
||||
for flirt_match in capa.analysis.flirt.get_flirt_matches():
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=flirt_match.va,
|
||||
name=flirt_match.name,
|
||||
classification=Classification.LIBRARY,
|
||||
method=Method.FLIRT,
|
||||
# note: we cannot currently include which signature matched per function via the IDA API
|
||||
)
|
||||
)
|
||||
|
||||
# thunks
|
||||
for fva in idautils.Functions():
|
||||
if is_thunk_function(fva):
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=fva,
|
||||
name=idaapi.get_func_name(fva),
|
||||
classification=Classification.LIBRARY,
|
||||
method=Method.THUNK,
|
||||
)
|
||||
)
|
||||
|
||||
with capa.main.timing("string-based library identification"):
|
||||
for string_match in capa.analysis.strings.get_string_matches(dbs):
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=string_match.va,
|
||||
name=idaapi.get_func_name(string_match.va),
|
||||
classification=Classification.LIBRARY,
|
||||
method=Method.STRINGS,
|
||||
library_name=string_match.metadata.library_name,
|
||||
library_version=string_match.metadata.library_version,
|
||||
)
|
||||
)
|
||||
|
||||
for va in idautils.Functions():
|
||||
name = idaapi.get_func_name(va)
|
||||
if name not in {"WinMain", }:
|
||||
continue
|
||||
|
||||
function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=va,
|
||||
name=name,
|
||||
classification=Classification.USER,
|
||||
method=Method.ENTRYPOINT,
|
||||
)
|
||||
)
|
||||
|
||||
doc = FunctionIdResults(function_classifications=[])
|
||||
classifications_by_va = capa.analysis.strings.create_index(function_classifications, "va")
|
||||
for va in idautils.Functions():
|
||||
if classifications := classifications_by_va.get(va):
|
||||
doc.function_classifications.extend(classifications)
|
||||
else:
|
||||
doc.function_classifications.append(
|
||||
FunctionClassification(
|
||||
va=va,
|
||||
name=idaapi.get_func_name(va),
|
||||
classification=Classification.UNKNOWN,
|
||||
method=None,
|
||||
)
|
||||
)
|
||||
|
||||
if args.json:
|
||||
print(doc.model_dump_json()) # noqa: T201 print found
|
||||
|
||||
else:
|
||||
table = rich.table.Table()
|
||||
table.add_column("FVA")
|
||||
table.add_column("CLASSIFICATION")
|
||||
table.add_column("METHOD")
|
||||
table.add_column("FNAME")
|
||||
table.add_column("EXTRA INFO")
|
||||
|
||||
classifications_by_va = capa.analysis.strings.create_index(doc.function_classifications, "va", sorted_=True)
|
||||
for va, classifications in classifications_by_va.items():
|
||||
name = ", ".join({c.name for c in classifications})
|
||||
if "sub_" in name:
|
||||
name = Text(name, style="grey53")
|
||||
|
||||
classification = {c.classification for c in classifications}
|
||||
method = {c.method for c in classifications if c.method}
|
||||
extra = {f"{c.library_name}@{c.library_version}" for c in classifications if c.library_name}
|
||||
|
||||
table.add_row(
|
||||
hex(va),
|
||||
", ".join(classification) if classification != {"unknown"} else Text("unknown", style="grey53"),
|
||||
", ".join(method),
|
||||
name,
|
||||
", ".join(extra),
|
||||
)
|
||||
|
||||
rich.print(table)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -1,2 +0,0 @@
|
||||
# temporary extra file to track dependencies of the analysis directory
|
||||
nltk==3.9.1
|
||||
@@ -1,269 +0,0 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
"""
|
||||
further requirements:
|
||||
- nltk
|
||||
"""
|
||||
import gzip
|
||||
import logging
|
||||
import collections
|
||||
from typing import Any, Dict, Mapping
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
|
||||
import msgspec
|
||||
|
||||
import capa.features.extractors.strings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LibraryString(msgspec.Struct):
|
||||
string: str
|
||||
library_name: str
|
||||
library_version: str
|
||||
file_path: str | None = None
|
||||
function_name: str | None = None
|
||||
line_number: int | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class LibraryStringDatabase:
|
||||
metadata_by_string: Dict[str, LibraryString]
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.metadata_by_string)
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, path: Path) -> "LibraryStringDatabase":
|
||||
metadata_by_string: Dict[str, LibraryString] = {}
|
||||
decoder = msgspec.json.Decoder(type=LibraryString)
|
||||
for line in gzip.decompress(path.read_bytes()).split(b"\n"):
|
||||
if not line:
|
||||
continue
|
||||
s = decoder.decode(line)
|
||||
metadata_by_string[s.string] = s
|
||||
|
||||
return cls(metadata_by_string=metadata_by_string)
|
||||
|
||||
|
||||
DEFAULT_FILENAMES = (
|
||||
"brotli.jsonl.gz",
|
||||
"bzip2.jsonl.gz",
|
||||
"cryptopp.jsonl.gz",
|
||||
"curl.jsonl.gz",
|
||||
"detours.jsonl.gz",
|
||||
"jemalloc.jsonl.gz",
|
||||
"jsoncpp.jsonl.gz",
|
||||
"kcp.jsonl.gz",
|
||||
"liblzma.jsonl.gz",
|
||||
"libsodium.jsonl.gz",
|
||||
"libpcap.jsonl.gz",
|
||||
"mbedtls.jsonl.gz",
|
||||
"openssl.jsonl.gz",
|
||||
"sqlite3.jsonl.gz",
|
||||
"tomcrypt.jsonl.gz",
|
||||
"wolfssl.jsonl.gz",
|
||||
"zlib.jsonl.gz",
|
||||
)
|
||||
|
||||
DEFAULT_PATHS = tuple(Path(__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES) + (
|
||||
Path(__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",
|
||||
)
|
||||
|
||||
|
||||
def get_default_databases() -> list[LibraryStringDatabase]:
|
||||
return [LibraryStringDatabase.from_file(path) for path in DEFAULT_PATHS]
|
||||
|
||||
|
||||
@dataclass
|
||||
class WindowsApiStringDatabase:
|
||||
dll_names: set[str]
|
||||
api_names: set[str]
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.dll_names) + len(self.api_names)
|
||||
|
||||
@classmethod
|
||||
def from_dir(cls, path: Path) -> "WindowsApiStringDatabase":
|
||||
dll_names: set[str] = set()
|
||||
api_names: set[str] = set()
|
||||
|
||||
for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines():
|
||||
if not line:
|
||||
continue
|
||||
dll_names.add(line)
|
||||
|
||||
for line in gzip.decompress((path / "apis.txt.gz").read_bytes()).decode("utf-8").splitlines():
|
||||
if not line:
|
||||
continue
|
||||
api_names.add(line)
|
||||
|
||||
return cls(dll_names=dll_names, api_names=api_names)
|
||||
|
||||
@classmethod
|
||||
def from_defaults(cls) -> "WindowsApiStringDatabase":
|
||||
return cls.from_dir(Path(__file__).parent / "data" / "winapi")
|
||||
|
||||
|
||||
def extract_strings(buf, n=4):
|
||||
yield from capa.features.extractors.strings.extract_ascii_strings(buf, n=n)
|
||||
yield from capa.features.extractors.strings.extract_unicode_strings(buf, n=n)
|
||||
|
||||
|
||||
def prune_databases(dbs: list[LibraryStringDatabase], n=8):
|
||||
"""remove less trustyworthy database entries.
|
||||
|
||||
such as:
|
||||
- those found in multiple databases
|
||||
- those that are English words
|
||||
- those that are too short
|
||||
- Windows API and DLL names
|
||||
"""
|
||||
|
||||
# TODO: consider applying these filters directly to the persisted databases, not at load time.
|
||||
|
||||
winapi = WindowsApiStringDatabase.from_defaults()
|
||||
|
||||
try:
|
||||
from nltk.corpus import words as nltk_words
|
||||
|
||||
nltk_words.words()
|
||||
except (ImportError, LookupError):
|
||||
# one-time download of dataset.
|
||||
# this probably doesn't work well for embedded use.
|
||||
import nltk
|
||||
|
||||
nltk.download("words")
|
||||
from nltk.corpus import words as nltk_words
|
||||
words = set(nltk_words.words())
|
||||
|
||||
counter: collections.Counter[str] = collections.Counter()
|
||||
to_remove = set()
|
||||
for db in dbs:
|
||||
for string in db.metadata_by_string.keys():
|
||||
counter[string] += 1
|
||||
|
||||
if string in words:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
if len(string) < n:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
if string in winapi.api_names:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
if string in winapi.dll_names:
|
||||
to_remove.add(string)
|
||||
continue
|
||||
|
||||
for string, count in counter.most_common():
|
||||
if count <= 1:
|
||||
break
|
||||
|
||||
# remove strings that are seen in more than one database
|
||||
to_remove.add(string)
|
||||
|
||||
for db in dbs:
|
||||
for string in to_remove:
|
||||
if string in db.metadata_by_string:
|
||||
del db.metadata_by_string[string]
|
||||
|
||||
|
||||
def get_function_strings():
|
||||
import idaapi
|
||||
import idautils
|
||||
|
||||
import capa.features.extractors.ida.helpers as ida_helpers
|
||||
|
||||
strings_by_function = collections.defaultdict(set)
|
||||
for ea in idautils.Functions():
|
||||
f = idaapi.get_func(ea)
|
||||
|
||||
# ignore library functions and thunk functions as identified by IDA
|
||||
if f.flags & idaapi.FUNC_THUNK:
|
||||
continue
|
||||
if f.flags & idaapi.FUNC_LIB:
|
||||
continue
|
||||
|
||||
for bb in ida_helpers.get_function_blocks(f):
|
||||
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
|
||||
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
|
||||
if ref == insn.ea:
|
||||
continue
|
||||
|
||||
string = capa.features.extractors.ida.helpers.find_string_at(ref)
|
||||
if not string:
|
||||
continue
|
||||
|
||||
strings_by_function[ea].add(string)
|
||||
|
||||
return strings_by_function
|
||||
|
||||
|
||||
@dataclass
|
||||
class LibraryStringClassification:
|
||||
va: int
|
||||
string: str
|
||||
library_name: str
|
||||
metadata: LibraryString
|
||||
|
||||
|
||||
def create_index(s: list, k: str, sorted_: bool = False) -> Mapping[Any, list]:
|
||||
"""create an index of the elements in `s` using the key `k`, optionally sorted by `k`"""
|
||||
if sorted_:
|
||||
s = sorted(s, key=lambda x: getattr(x, k))
|
||||
|
||||
s_by_k = collections.defaultdict(list)
|
||||
for v in s:
|
||||
p = getattr(v, k)
|
||||
s_by_k[p].append(v)
|
||||
return s_by_k
|
||||
|
||||
|
||||
def get_string_matches(dbs: list[LibraryStringDatabase]) -> list[LibraryStringClassification]:
|
||||
matches: list[LibraryStringClassification] = []
|
||||
|
||||
for function, strings in sorted(get_function_strings().items()):
|
||||
for string in strings:
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string):
|
||||
matches.append(
|
||||
LibraryStringClassification(
|
||||
va=function,
|
||||
string=string,
|
||||
library_name=metadata.library_name,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
|
||||
# if there are less than N strings per library, ignore that library
|
||||
matches_by_library = create_index(matches, "library_name")
|
||||
for library_name, library_matches in matches_by_library.items():
|
||||
if len(library_matches) > 5:
|
||||
continue
|
||||
|
||||
logger.info("pruning library %s: only %d matched string", library_name, len(library_matches))
|
||||
matches = [m for m in matches if m.library_name != library_name]
|
||||
|
||||
# if there are conflicts within a single function, don't label it
|
||||
matches_by_function = create_index(matches, "va")
|
||||
for va, function_matches in matches_by_function.items():
|
||||
library_names = {m.library_name for m in function_matches}
|
||||
if len(library_names) == 1:
|
||||
continue
|
||||
|
||||
logger.info("conflicting matches: 0x%x: %s", va, sorted(library_names))
|
||||
# this is potentially slow (O(n**2)) but hopefully fast enough in practice.
|
||||
matches = [m for m in matches if m.va != va]
|
||||
|
||||
return matches
|
||||
@@ -1,130 +0,0 @@
|
||||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import sys
|
||||
import logging
|
||||
import collections
|
||||
from pathlib import Path
|
||||
|
||||
import rich
|
||||
from rich.text import Text
|
||||
|
||||
import capa.analysis.strings
|
||||
import capa.features.extractors.strings
|
||||
import capa.features.extractors.ida.helpers as ida_helpers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def open_ida(input_path: Path):
|
||||
import tempfile
|
||||
|
||||
import idapro
|
||||
|
||||
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
|
||||
t.write_bytes(input_path.read_bytes())
|
||||
# resource leak: we should delete this upon exit
|
||||
|
||||
idapro.enable_console_messages(False)
|
||||
idapro.open_database(str(t.absolute()), run_auto_analysis=True)
|
||||
|
||||
import ida_auto
|
||||
|
||||
ida_auto.auto_wait()
|
||||
|
||||
|
||||
def main():
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
# use n=8 to ignore common words
|
||||
N = 8
|
||||
|
||||
input_path = Path(sys.argv[1])
|
||||
|
||||
dbs = capa.analysis.strings.get_default_databases()
|
||||
capa.analysis.strings.prune_databases(dbs, n=N)
|
||||
|
||||
strings_by_library = collections.defaultdict(set)
|
||||
for string in capa.analysis.strings.extract_strings(input_path.read_bytes(), n=N):
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string.s):
|
||||
strings_by_library[metadata.library_name].add(string.s)
|
||||
|
||||
console = rich.get_console()
|
||||
console.print("found libraries:", style="bold")
|
||||
for library, strings in sorted(strings_by_library.items(), key=lambda p: len(p[1]), reverse=True):
|
||||
console.print(f" - [b]{library}[/] ({len(strings)} strings)")
|
||||
|
||||
for string in sorted(strings)[:10]:
|
||||
console.print(f" - {string}", markup=False, style="grey37")
|
||||
|
||||
if len(strings) > 10:
|
||||
console.print(" ...", style="grey37")
|
||||
|
||||
if not strings_by_library:
|
||||
console.print(" (none)", style="grey37")
|
||||
# since we're not going to find any strings
|
||||
# return early and don't do IDA analysis
|
||||
return
|
||||
|
||||
open_ida(input_path)
|
||||
|
||||
import idaapi
|
||||
import idautils
|
||||
import ida_funcs
|
||||
|
||||
strings_by_function = collections.defaultdict(set)
|
||||
for ea in idautils.Functions():
|
||||
f = idaapi.get_func(ea)
|
||||
|
||||
# ignore library functions and thunk functions as identified by IDA
|
||||
if f.flags & idaapi.FUNC_THUNK:
|
||||
continue
|
||||
if f.flags & idaapi.FUNC_LIB:
|
||||
continue
|
||||
|
||||
for bb in ida_helpers.get_function_blocks(f):
|
||||
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
|
||||
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
|
||||
if ref == insn.ea:
|
||||
continue
|
||||
|
||||
string = capa.features.extractors.ida.helpers.find_string_at(ref)
|
||||
if not string:
|
||||
continue
|
||||
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string):
|
||||
strings_by_function[ea].add(string)
|
||||
|
||||
# ensure there are at least XXX functions renamed, or ignore those entries
|
||||
|
||||
console.print("functions:", style="bold")
|
||||
for function, strings in sorted(strings_by_function.items()):
|
||||
if strings:
|
||||
name = ida_funcs.get_func_name(function)
|
||||
|
||||
console.print(f" [b]{name}[/]@{function:08x}:")
|
||||
|
||||
for string in strings:
|
||||
for db in dbs:
|
||||
if metadata := db.metadata_by_string.get(string):
|
||||
location = Text(
|
||||
f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}",
|
||||
style="grey37",
|
||||
)
|
||||
console.print(" - ", location, ": ", string.rstrip())
|
||||
|
||||
console.print()
|
||||
|
||||
console.print(
|
||||
f"found {len(strings_by_function)} library functions across {len(list(idautils.Functions()))} functions"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Binary file not shown.
3
capa/analysis/strings/data/oss/.gitignore
vendored
3
capa/analysis/strings/data/oss/.gitignore
vendored
@@ -1,3 +0,0 @@
|
||||
*.csv
|
||||
*.jsonl
|
||||
*.jsonl.gz
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,99 +0,0 @@
|
||||
# Strings from Open Source libraries
|
||||
|
||||
This directory contains databases of strings extracted from open soure software.
|
||||
capa uses these databases to ignore functions that are likely library code.
|
||||
|
||||
There is one file for each database. Each database is a gzip-compressed, JSONL (one JSON document per line) file.
|
||||
The JSON document looks like this:
|
||||
|
||||
string: "1.0.8, 13-Jul-2019"
|
||||
library_name: "bzip2"
|
||||
library_version: "1.0.8#3"
|
||||
file_path: "CMakeFiles/bz2.dir/bzlib.c.obj"
|
||||
function_name: "BZ2_bzlibVersion"
|
||||
line_number: null
|
||||
|
||||
The following databases were extracted via the vkpkg & jh technique:
|
||||
|
||||
- brotli 1.0.9#5
|
||||
- bzip2 1.0.8#3
|
||||
- cryptopp 8.7.0
|
||||
- curl 7.86.0#1
|
||||
- detours 4.0.1#7
|
||||
- jemalloc 5.3.0#1
|
||||
- jsoncpp 1.9.5
|
||||
- kcp 1.7
|
||||
- liblzma 5.2.5#6
|
||||
- libsodium 1.0.18#8
|
||||
- libpcap 1.10.1#3
|
||||
- mbedtls 2.28.1
|
||||
- openssl 3.0.7#1
|
||||
- sqlite3 3.40.0#1
|
||||
- tomcrypt 1.18.2#2
|
||||
- wolfssl 5.5.0
|
||||
- zlib 1.2.13
|
||||
|
||||
This code was originally developed in FLOSS and imported into capa.
|
||||
|
||||
## The vkpkg & jh technique
|
||||
|
||||
Major steps:
|
||||
|
||||
1. build static libraries via vcpkg
|
||||
2. extract features via jh
|
||||
3. convert to JSONL format with `jh_to_qs.py`
|
||||
4. compress with gzip
|
||||
|
||||
### Build static libraries via vcpkg
|
||||
|
||||
[vcpkg](https://vcpkg.io/en/) is a free C/C++ package manager for acquiring and managing libraries.
|
||||
We use it to easily build common open source libraries, like zlib.
|
||||
Use the triplet `x64-windows-static` to build static archives (.lib files that are AR archives containing COFF object files):
|
||||
|
||||
```console
|
||||
PS > C:\vcpkg\vcpkg.exe install --triplet x64-windows-static zlib
|
||||
```
|
||||
|
||||
### Extract features via jh
|
||||
|
||||
[jh](https://github.com/williballenthin/lancelot/blob/master/bin/src/bin/jh.rs)
|
||||
is a lancelot-based utility that parses AR archives containing COFF object files,
|
||||
reconstructs their control flow, finds functions, and extracts features.
|
||||
jh extracts numbers, API calls, and strings; we are only interested in the string features.
|
||||
|
||||
For each feature, jh emits a CSV line with the fields
|
||||
- target triplet
|
||||
- compiler
|
||||
- library
|
||||
- version
|
||||
- build profile
|
||||
- path
|
||||
- function
|
||||
- feature type
|
||||
- feature value
|
||||
|
||||
For example:
|
||||
|
||||
```csv
|
||||
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0x00000100
|
||||
```
|
||||
|
||||
For example, to invoke jh:
|
||||
|
||||
```console
|
||||
$ ~/lancelot/target/release/jh x64-windows-static msvc143 zlib 1.2.13 release /mnt/c/vcpkg/installed/x64-windows-static/lib/zlib.lib > ~/flare-floss/floss/qs/db/data/oss/zlib.csv
|
||||
```
|
||||
|
||||
### Convert to OSS database format
|
||||
|
||||
We use the script `jh_to_qs.py` to convert these CSV lines into JSONL file prepared for FLOSS:
|
||||
|
||||
```console
|
||||
$ python3 jh_to_qs.py zlib.csv > zlib.jsonl
|
||||
```
|
||||
|
||||
These files are then gzip'd:
|
||||
|
||||
```console
|
||||
$ gzip -c zlib.jsonl > zlib.jsonl.gz
|
||||
```
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -41,7 +41,7 @@ if hasattr(ida_bytes, "parse_binpat_str"):
|
||||
return
|
||||
|
||||
while True:
|
||||
ea, _ = ida_bytes.bin_search(start, end, patterns, ida_bytes.BIN_SEARCH_FORWARD)
|
||||
ea, _ = ida_bytes.bin_search3(start, end, patterns, ida_bytes.BIN_SEARCH_FORWARD)
|
||||
if ea == idaapi.BADADDR:
|
||||
break
|
||||
start = ea + 1
|
||||
|
||||
@@ -18,7 +18,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
def is_idalib_installed() -> bool:
|
||||
try:
|
||||
return importlib.util.find_spec("idapro") is not None
|
||||
return importlib.util.find_spec("ida") is not None
|
||||
except ModuleNotFoundError:
|
||||
return False
|
||||
|
||||
@@ -44,7 +44,6 @@ def get_idalib_user_config_path() -> Optional[Path]:
|
||||
def find_idalib() -> Optional[Path]:
|
||||
config_path = get_idalib_user_config_path()
|
||||
if not config_path:
|
||||
logger.error("IDA Pro user configuration does not exist, please make sure you've installed idalib properly.")
|
||||
return None
|
||||
|
||||
config = json.loads(config_path.read_text(encoding="utf-8"))
|
||||
@@ -52,9 +51,6 @@ def find_idalib() -> Optional[Path]:
|
||||
try:
|
||||
ida_install_dir = Path(config["Paths"]["ida-install-dir"])
|
||||
except KeyError:
|
||||
logger.error(
|
||||
"IDA Pro user configuration does not contain location of IDA Pro installation, please make sure you've installed idalib properly."
|
||||
)
|
||||
return None
|
||||
|
||||
if not ida_install_dir.exists():
|
||||
@@ -77,7 +73,7 @@ def find_idalib() -> Optional[Path]:
|
||||
if not idalib_path.exists():
|
||||
return None
|
||||
|
||||
if not (idalib_path / "idapro" / "__init__.py").is_file():
|
||||
if not (idalib_path / "ida" / "__init__.py").is_file():
|
||||
return None
|
||||
|
||||
return idalib_path
|
||||
@@ -100,7 +96,7 @@ def has_idalib() -> bool:
|
||||
|
||||
def load_idalib() -> bool:
|
||||
try:
|
||||
import idapro
|
||||
import ida
|
||||
|
||||
return True
|
||||
except ImportError:
|
||||
@@ -110,7 +106,7 @@ def load_idalib() -> bool:
|
||||
|
||||
sys.path.append(idalib_path.absolute().as_posix())
|
||||
try:
|
||||
import idapro # noqa: F401 unused import
|
||||
import ida # noqa: F401 unused import
|
||||
|
||||
return True
|
||||
except ImportError:
|
||||
|
||||
@@ -323,7 +323,7 @@ def get_extractor(
|
||||
if not idalib.load_idalib():
|
||||
raise RuntimeError("failed to load IDA idalib module.")
|
||||
|
||||
import idapro
|
||||
import ida
|
||||
import ida_auto
|
||||
|
||||
import capa.features.extractors.ida.extractor
|
||||
@@ -333,7 +333,7 @@ def get_extractor(
|
||||
# so as not to screw up structured output.
|
||||
with capa.helpers.stdout_redirector(io.BytesIO()):
|
||||
with console.status("analyzing program...", spinner="dots"):
|
||||
if idapro.open_database(str(input_path), run_auto_analysis=True):
|
||||
if ida.open_database(str(input_path), run_auto_analysis=True):
|
||||
raise RuntimeError("failed to analyze input file")
|
||||
|
||||
logger.debug("idalib: waiting for analysis...")
|
||||
|
||||
@@ -77,8 +77,6 @@ dependencies = [
|
||||
"protobuf>=5",
|
||||
"msgspec>=0.18.6",
|
||||
"xmltodict>=0.13.0",
|
||||
# for library detection (in development)
|
||||
"nltk>=3",
|
||||
|
||||
# ---------------------------------------
|
||||
# Dependencies that we develop
|
||||
@@ -179,7 +177,7 @@ known_first_party = [
|
||||
"binaryninja",
|
||||
"flirt",
|
||||
"ghidra",
|
||||
"idapro",
|
||||
"ida",
|
||||
"ida_ida",
|
||||
"ida_auto",
|
||||
"ida_bytes",
|
||||
|
||||
291
scripts/parse-vmray-flog.py
Normal file
291
scripts/parse-vmray-flog.py
Normal file
@@ -0,0 +1,291 @@
|
||||
import sys
|
||||
import logging
|
||||
from typing import Any, Literal, Optional
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic import BeforeValidator
|
||||
from typing_extensions import Annotated
|
||||
from pydantic.dataclasses import dataclass
|
||||
|
||||
HexInt = Annotated[int, BeforeValidator(lambda v: int(v.strip('"'), 0x10))]
|
||||
QuotedInt = Annotated[int, BeforeValidator(lambda v: int(v.strip('"')))]
|
||||
QuotedString = Annotated[str, BeforeValidator(lambda v: v.strip('"'))]
|
||||
|
||||
|
||||
logger = logging.getLogger("vmray.flog")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Region:
|
||||
id: QuotedInt
|
||||
start_va: HexInt
|
||||
end_va: HexInt
|
||||
monitored: bool
|
||||
entry_point: HexInt
|
||||
region_type: Literal["private"] | Literal["mapped_file"] | Literal["pagefile_backed"]
|
||||
name: QuotedString
|
||||
filename: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class Event:
|
||||
timestamp: tuple[int, int]
|
||||
api: str
|
||||
args: str
|
||||
rv: Optional[int]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Thread:
|
||||
id: QuotedInt
|
||||
os_tid: HexInt
|
||||
events: list[Event]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Process:
|
||||
id: QuotedInt
|
||||
image_name: QuotedString
|
||||
filename: QuotedString
|
||||
page_root: HexInt
|
||||
os_pid: HexInt
|
||||
os_integrity_level: HexInt
|
||||
os_privileges: HexInt
|
||||
monitor_reason: Literal['"analysis_target"'] | Literal['"rpc_server"']
|
||||
parent_id: HexInt
|
||||
os_parent_pid: HexInt
|
||||
cmd_line: str # TODO: json decode str
|
||||
cur_dir: str # TODO: json decode str
|
||||
os_username: str # TODO: json decode str
|
||||
bitness: QuotedInt # TODO: enum 32 or 64
|
||||
os_groups: str # TODO: list of str
|
||||
regions: list[Region]
|
||||
threads: list[Thread]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Flog:
|
||||
processes: list[Process]
|
||||
|
||||
processes_by_id: dict[int, Process]
|
||||
regions_by_id: dict[int, Region]
|
||||
threads_by_id: dict[int, Thread]
|
||||
|
||||
|
||||
def parse_properties(txt: str) -> dict[str, Any]:
|
||||
properties = {}
|
||||
for line in txt.partition("\n\n")[0].splitlines():
|
||||
key, _, value = line.lstrip().partition(" = ")
|
||||
properties[key] = value
|
||||
|
||||
return properties
|
||||
|
||||
|
||||
def parse_region(txt: str) -> Region:
|
||||
# like:
|
||||
#
|
||||
# Region:
|
||||
# id = 125
|
||||
# start_va = 0x10000
|
||||
# end_va = 0x2ffff
|
||||
# monitored = 1
|
||||
# entry_point = 0x0
|
||||
# region_type = private
|
||||
# name = "private_0x0000000000010000"
|
||||
# filename = ""
|
||||
region_kwargs = parse_properties(txt)
|
||||
return Region(**region_kwargs)
|
||||
|
||||
|
||||
def parse_event(line: str) -> Event:
|
||||
# like:
|
||||
#
|
||||
# [0066.433] CoInitializeEx (pvReserved=0x0, dwCoInit=0x2) returned 0x0
|
||||
# [0071.184] RegisterClipboardFormatW (lpszFormat="WM_GETCONTROLTYPE") returned 0xc1dc
|
||||
# [0072.750] GetCurrentProcess () returned 0xffffffffffffffff
|
||||
numbers, _, rest = line.lstrip()[1:].partition("] ")
|
||||
major, _, minor = numbers.partition(".")
|
||||
majori = int(major.lstrip("0") or "0")
|
||||
minori = int(minor.lstrip("0") or "0")
|
||||
timestamp = (majori, minori)
|
||||
|
||||
api, _, rest = rest.partition(" (")
|
||||
args, _, rest = rest.rpartition(")")
|
||||
|
||||
if " returned " in rest:
|
||||
_, _, rvs = rest.partition(" returned ")
|
||||
rv = int(rvs, 0x10)
|
||||
else:
|
||||
rv = None
|
||||
|
||||
return Event(
|
||||
timestamp=timestamp,
|
||||
api=api,
|
||||
args=args,
|
||||
rv=rv,
|
||||
)
|
||||
|
||||
|
||||
def parse_thread(txt: str) -> Thread:
|
||||
# like:
|
||||
#
|
||||
# Thread:
|
||||
# id = 1
|
||||
# os_tid = 0x117c
|
||||
#
|
||||
# [0066.433] CoInitializeEx (pvReserved=0x0, dwCoInit=0x2) returned 0x0
|
||||
# [0071.184] RegisterClipboardFormatW (lpszFormat="WM_GETCONTROLTYPE") returned 0xc1dc
|
||||
# [0072.750] GetCurrentProcess () returned 0xffffffffffffffff
|
||||
thread_kwargs = parse_properties(txt)
|
||||
|
||||
events = []
|
||||
for line in txt.splitlines():
|
||||
if not line.startswith("\t["):
|
||||
continue
|
||||
|
||||
events.append(parse_event(line))
|
||||
|
||||
return Thread(
|
||||
events=events,
|
||||
**thread_kwargs,
|
||||
)
|
||||
|
||||
|
||||
def parse_process(txt: str) -> Process:
|
||||
# properties look like:
|
||||
#
|
||||
# id = "1"
|
||||
# image_name = "svchost.exe"
|
||||
# filename = "c:\\users\\rdhj0cnfevzx\\desktop\\svchost.exe"
|
||||
# page_root = "0x751fc000"
|
||||
# os_pid = "0x118c"
|
||||
# os_integrity_level = "0x3000"
|
||||
# os_privileges = "0x60800000"
|
||||
# monitor_reason = "analysis_target"
|
||||
# parent_id = "0"
|
||||
# os_parent_pid = "0x7d8"
|
||||
# cmd_line = "\"c:\\users\\rdhj0cnfevzx\\desktop\\svchost.exe\" "
|
||||
# cur_dir = "c:\\users\\rdhj0cnfevzx\\desktop\\"
|
||||
# os_username = "xc64zb\\rdhj0cnfevzx"
|
||||
# bitness = "32"
|
||||
# os_groups = "xc64zb\\domain users" [0x7], "everyone" [0x7], ...
|
||||
process_kwargs = parse_properties(txt)
|
||||
|
||||
regions = []
|
||||
for region in txt.split("\nRegion:\n")[1:]:
|
||||
regions.append(parse_region(region))
|
||||
|
||||
threads = []
|
||||
for thread in txt.split("\nThread:\n")[1:]:
|
||||
threads.append(parse_thread(thread))
|
||||
|
||||
return Process(
|
||||
regions=regions,
|
||||
threads=threads,
|
||||
**process_kwargs,
|
||||
)
|
||||
|
||||
|
||||
def parse_processes(txt: str) -> list[Process]:
|
||||
processes = []
|
||||
for process in txt.split("\nProcess:\n")[1:]:
|
||||
processes.append(parse_process(process))
|
||||
return processes
|
||||
|
||||
|
||||
def parse_flog(txt: str) -> Flog:
|
||||
# the header probably fits within this size
|
||||
header_lines = txt[:512].splitlines()
|
||||
|
||||
# file may start with: | ef bb bf |
|
||||
assert "# Flog Txt Version 1" in header_lines[0]
|
||||
|
||||
for line in header_lines[1:]:
|
||||
line = line.strip()
|
||||
if not line.startswith("#"):
|
||||
break
|
||||
|
||||
# metadata lines, like:
|
||||
#
|
||||
# Flog Txt Version 1
|
||||
# Analyzer Version: 2024.4.1
|
||||
# Analyzer Build Date: Sep 2 2024 06:30:10
|
||||
# Log Creation Date: 08.10.2024 18:12:03.945c
|
||||
logger.debug("%s", line)
|
||||
|
||||
processes = parse_processes(txt)
|
||||
processes_by_id = {process.id: process for process in processes}
|
||||
regions_by_id = {region.id: region for process in processes for region in process.regions}
|
||||
threads_by_id = {thread.id: thread for process in processes for thread in process.threads}
|
||||
|
||||
return Flog(
|
||||
processes=processes,
|
||||
processes_by_id=processes_by_id,
|
||||
regions_by_id=regions_by_id,
|
||||
threads_by_id=threads_by_id,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
flog_path = Path(sys.argv[1])
|
||||
flog = parse_flog(flog_path.read_text(encoding="utf-8"))
|
||||
|
||||
for process in flog.processes:
|
||||
print(f"{process.id=} {len(process.regions)=} {len(process.threads)=}")
|
||||
|
||||
for region in process.regions:
|
||||
print(f" {region.id=} {region.name}")
|
||||
|
||||
for thread in process.threads:
|
||||
print(f" {thread.id=} {len(thread.events)=}")
|
||||
|
||||
|
||||
def test_event_timestamp():
|
||||
event = parse_event(" [0072.750] GetCurrentProcess () returned 0xffffffffffffffff")
|
||||
assert event.timestamp == (72, 750)
|
||||
|
||||
|
||||
def test_event_api():
|
||||
event = parse_event(" [0072.750] GetCurrentProcess () returned 0xffffffffffffffff")
|
||||
assert event.api == "GetCurrentProcess"
|
||||
|
||||
|
||||
def test_event_empty_args():
|
||||
event = parse_event(" [0072.750] GetCurrentProcess () returned 0xffffffffffffffff")
|
||||
assert len(event.args) == 0
|
||||
|
||||
|
||||
# single arg
|
||||
# [0074.875] GetSystemMetrics (nIndex=75) returned 1
|
||||
|
||||
# no return value
|
||||
# [0083.567] CoTaskMemFree (pv=0x746aa0)
|
||||
|
||||
# two args
|
||||
# [0085.491] GetWindowLongPtrW (hWnd=0x401f0, nIndex=-16) returned 0x6c10000
|
||||
|
||||
# in/out
|
||||
# [0086.848] GetClientRect (in: hWnd=0x401f0, lpRect=0x14d0c0 | out: lpRect=0x14d0c0) returned 1
|
||||
|
||||
# string
|
||||
# [0102.753] FindAtomW (lpString="GDI+Atom_4492_1") returned 0xc000
|
||||
|
||||
# int (hex)
|
||||
# [0102.756] GdipDeleteFont (font=0x1c504e00) returned 0x0
|
||||
|
||||
# int (decimal)
|
||||
# [0074.875] GetSystemMetrics (nIndex=75) returned 1
|
||||
|
||||
# int (negative)
|
||||
# [0085.491] GetWindowLongPtrW (hWnd=0x401f0, nIndex=-16) returned 0x6c10000
|
||||
|
||||
# struct
|
||||
# [0067.024] GetVersionExW (in: lpVersionInformation=0x14e3f0*(dwOSVersionInfoSize=0x114, dwMajorVersion=0x0, dwMinorVersion=0x0, dwBuildNumber=0x0, dwPlatformId=0x0, szCSDVersion="") | out: lpVersionInformation=0x14e3f0*(dwOSVersionInfoSize=0x114, dwMajorVersion=0x6, dwMinorVersion=0x2, dwBuildNumber=0x23f0, dwPlatformId=0x2, szCSDVersion="")) returned 1
|
||||
|
||||
# nested struct
|
||||
# [0111.527] CoCreateGuid (in: pguid=0x14c910 | out: pguid=0x14c910*(Data1=0x63ac5b46, Data2=0xc417, Data3=0x49b0, Data4=([0]=0xac, [1]=0xbf, [2]=0xb8, [3]=0xf3, [4]=0x8b, [5]=0x1a, [6]=0x51, [7]=0x78))) returned 0x0
|
||||
|
||||
# bytes
|
||||
# [0111.527] CoCreateGuid (in: pguid=0x14c910 | out: pguid=0x14c910*(Data1=0x63ac5b46, Data2=0xc417, Data3=0x49b0, Data4=([0]=0xac, [1]=0xbf, [2]=0xb8, [3]=0xf3, [4]=0x8b, [5]=0x1a, [6]=0x51, [7]=0x78))) returned 0x0
|
||||
@@ -1,184 +0,0 @@
|
||||
import sys
|
||||
import sqlite3
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
|
||||
import pefile
|
||||
|
||||
import capa.main
|
||||
|
||||
|
||||
@dataclass
|
||||
class AssemblageRow:
|
||||
# from table: binaries
|
||||
binary_id: int
|
||||
file_name: str
|
||||
platform: str
|
||||
build_mode: str
|
||||
toolset_version: str
|
||||
github_url: str
|
||||
optimization: str
|
||||
repo_last_update: int
|
||||
size: int
|
||||
path: str
|
||||
license: str
|
||||
binary_hash: str
|
||||
repo_commit_hash: str
|
||||
# from table: functions
|
||||
function_id: int
|
||||
function_name: str
|
||||
function_hash: str
|
||||
top_comments: str
|
||||
source_codes: str
|
||||
prototype: str
|
||||
_source_file: str
|
||||
# from table: rvas
|
||||
rva_id: int
|
||||
start_rva: int
|
||||
end_rva: int
|
||||
|
||||
@property
|
||||
def source_file(self):
|
||||
# cleanup some extra metadata provided by assemblage
|
||||
return self._source_file.partition(" (MD5: ")[0].partition(" (0x3: ")[0]
|
||||
|
||||
|
||||
class Assemblage:
|
||||
conn: sqlite3.Connection
|
||||
samples: Path
|
||||
|
||||
def __init__(self, db: Path, samples: Path):
|
||||
super().__init__()
|
||||
|
||||
self.db = db
|
||||
self.samples = samples
|
||||
|
||||
self.conn = sqlite3.connect(self.db)
|
||||
with self.conn:
|
||||
self.conn.executescript("""
|
||||
PRAGMA journal_mode = WAL;
|
||||
PRAGMA synchronous = NORMAL;
|
||||
PRAGMA busy_timeout = 5000;
|
||||
PRAGMA cache_size = -20000; -- 20MB
|
||||
PRAGMA foreign_keys = true;
|
||||
PRAGMA temp_store = memory;
|
||||
|
||||
BEGIN IMMEDIATE TRANSACTION;
|
||||
CREATE INDEX IF NOT EXISTS idx__functions__binary_id ON functions (binary_id);
|
||||
CREATE INDEX IF NOT EXISTS idx__rvas__function_id ON rvas (function_id);
|
||||
|
||||
CREATE VIEW IF NOT EXISTS assemblage AS
|
||||
SELECT
|
||||
binaries.id AS binary_id,
|
||||
binaries.file_name AS file_name,
|
||||
binaries.platform AS platform,
|
||||
binaries.build_mode AS build_mode,
|
||||
binaries.toolset_version AS toolset_version,
|
||||
binaries.github_url AS github_url,
|
||||
binaries.optimization AS optimization,
|
||||
binaries.repo_last_update AS repo_last_update,
|
||||
binaries.size AS size,
|
||||
binaries.path AS path,
|
||||
binaries.license AS license,
|
||||
binaries.hash AS hash,
|
||||
binaries.repo_commit_hash AS repo_commit_hash,
|
||||
|
||||
functions.id AS function_id,
|
||||
functions.name AS function_name,
|
||||
functions.hash AS function_hash,
|
||||
functions.top_comments AS top_comments,
|
||||
functions.source_codes AS source_codes,
|
||||
functions.prototype AS prototype,
|
||||
functions.source_file AS source_file,
|
||||
|
||||
rvas.id AS rva_id,
|
||||
rvas.start AS start_rva,
|
||||
rvas.end AS end_rva
|
||||
FROM binaries
|
||||
JOIN functions ON binaries.id = functions.binary_id
|
||||
JOIN rvas ON functions.id = rvas.function_id;
|
||||
""")
|
||||
|
||||
def get_row_by_binary_id(self, binary_id: int) -> AssemblageRow:
|
||||
with self.conn:
|
||||
cur = self.conn.execute("SELECT * FROM assemblage WHERE binary_id = ? LIMIT 1;", (binary_id, ))
|
||||
return AssemblageRow(*cur.fetchone())
|
||||
|
||||
def get_rows_by_binary_id(self, binary_id: int) -> AssemblageRow:
|
||||
with self.conn:
|
||||
cur = self.conn.execute("SELECT * FROM assemblage WHERE binary_id = ?;", (binary_id, ))
|
||||
row = cur.fetchone()
|
||||
while row:
|
||||
yield AssemblageRow(*row)
|
||||
row = cur.fetchone()
|
||||
|
||||
def get_path_by_binary_id(self, binary_id: int) -> Path:
|
||||
with self.conn:
|
||||
cur = self.conn.execute("""SELECT path FROM assemblage WHERE binary_id = ? LIMIT 1""", (binary_id, ))
|
||||
return self.samples / cur.fetchone()[0]
|
||||
|
||||
def get_pe_by_binary_id(self, binary_id: int) -> pefile.PE:
|
||||
path = self.get_path_by_binary_id(binary_id)
|
||||
return pefile.PE(data=path.read_bytes(), fast_load=True)
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
if argv is None:
|
||||
argv = sys.argv[1:]
|
||||
|
||||
parser = argparse.ArgumentParser(description="Inspect object boundaries in compiled programs")
|
||||
capa.main.install_common_args(parser, wanted={})
|
||||
parser.add_argument("assemblage_database", type=Path, help="path to Assemblage database")
|
||||
parser.add_argument("assemblage_directory", type=Path, help="path to Assemblage samples directory")
|
||||
parser.add_argument("binary_id", type=int, help="primary key of binary to inspect")
|
||||
args = parser.parse_args(args=argv)
|
||||
|
||||
try:
|
||||
capa.main.handle_common_args(args)
|
||||
except capa.main.ShouldExitError as e:
|
||||
return e.status_code
|
||||
|
||||
if not args.assemblage_database.is_file():
|
||||
raise ValueError("database doesn't exist")
|
||||
|
||||
db = Assemblage(args.assemblage_database, args.assemblage_directory)
|
||||
# print(db.get_row_by_binary_id(args.binary_id))
|
||||
# print(db.get_pe_by_binary_id(args.binary_id))
|
||||
|
||||
@dataclass
|
||||
class Function:
|
||||
file: str
|
||||
name: str
|
||||
start_rva: int
|
||||
end_rva: int
|
||||
|
||||
functions = [
|
||||
Function(
|
||||
file=m.source_file,
|
||||
name=m.function_name,
|
||||
start_rva=m.start_rva,
|
||||
end_rva=m.end_rva,
|
||||
)
|
||||
for m in db.get_rows_by_binary_id(args.binary_id)
|
||||
]
|
||||
|
||||
import rich
|
||||
import rich.table
|
||||
|
||||
print(db.get_path_by_binary_id(args.binary_id))
|
||||
|
||||
t = rich.table.Table()
|
||||
t.add_column("rva")
|
||||
t.add_column("filename")
|
||||
t.add_column("name")
|
||||
|
||||
for function in sorted(functions, key=lambda f: f.start_rva):
|
||||
t.add_row(hex(function.start_rva), function.file, function.name)
|
||||
|
||||
rich.print(t)
|
||||
|
||||
# db.conn.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user