mirror of
https://github.com/mandiant/capa.git
synced 2026-02-04 19:12:01 -08:00
add vverbose rendering
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import hashlib
|
||||
import logging
|
||||
from typing import Any, Dict, List, Tuple, Iterator
|
||||
|
||||
@@ -19,12 +20,12 @@ import capa.features.extractors.viv.function
|
||||
import capa.features.extractors.viv.basicblock
|
||||
from capa.features.common import Feature
|
||||
from capa.features.address import Address, AbsoluteVirtualAddress
|
||||
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor
|
||||
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VivisectFeatureExtractor(StaticFeatureExtractor):
|
||||
class VivisectFeatureExtractor(FeatureExtractor):
|
||||
def __init__(self, vw, path, os):
|
||||
super().__init__()
|
||||
self.vw = vw
|
||||
@@ -32,6 +33,12 @@ class VivisectFeatureExtractor(StaticFeatureExtractor):
|
||||
with open(self.path, "rb") as f:
|
||||
self.buf = f.read()
|
||||
|
||||
self.sample_hashes = (
|
||||
hashlib.md5().update(self.buf).hexdigest(),
|
||||
hashlib.sha1().update(self.buf).hexdigest(),
|
||||
hashlib.sha256().update(self.buf).hexdigest(),
|
||||
)
|
||||
|
||||
# pre-compute these because we'll yield them at *every* scope.
|
||||
self.global_features: List[Tuple[Feature, Address]] = []
|
||||
self.global_features.extend(capa.features.extractors.viv.file.extract_file_format(self.buf))
|
||||
@@ -42,6 +49,9 @@ class VivisectFeatureExtractor(StaticFeatureExtractor):
|
||||
# assume there is only one file loaded into the vw
|
||||
return AbsoluteVirtualAddress(list(self.vw.filemeta.values())[0]["imagebase"])
|
||||
|
||||
def get_sample_hashes(self) -> Tuple[str, str, str]:
|
||||
return self.sample_hashes
|
||||
|
||||
def extract_global_features(self):
|
||||
yield from self.global_features
|
||||
|
||||
|
||||
@@ -1022,7 +1022,7 @@ def compute_dynamic_layout(rules, extractor, capabilities) -> rdoc.Layout:
|
||||
matched_threads = set()
|
||||
for rule_name, matches in capabilities.items():
|
||||
rule = rules[rule_name]
|
||||
if capa.rules.BASIC_BLOCK_SCOPE in rule.meta.get("scopes")["dynamic"]:
|
||||
if capa.rules.THREAD_SCOPE in rule.meta.get("scopes")["dynamic"]:
|
||||
for addr, _ in matches:
|
||||
assert addr in processes_by_thread
|
||||
matched_threads.add(addr)
|
||||
|
||||
@@ -122,6 +122,7 @@ def scope_to_pb2(scope: capa.rules.Scope) -> capa_pb2.Scope.ValueType:
|
||||
|
||||
|
||||
def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
|
||||
assert isinstance(meta.analysis, rd.StaticAnalysis)
|
||||
return capa_pb2.Metadata(
|
||||
timestamp=str(meta.timestamp),
|
||||
version=meta.version,
|
||||
|
||||
@@ -259,7 +259,8 @@ def render_rules(ostream, doc: rd.ResultDocument):
|
||||
check for OutputDebugString error
|
||||
namespace anti-analysis/anti-debugging/debugger-detection
|
||||
author michael.hunhoff@mandiant.com
|
||||
scope function
|
||||
static scope: function
|
||||
dynamic scope: process
|
||||
mbc Anti-Behavioral Analysis::Detect Debugger::OutputDebugString
|
||||
function @ 0x10004706
|
||||
and:
|
||||
@@ -268,14 +269,24 @@ def render_rules(ostream, doc: rd.ResultDocument):
|
||||
api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895
|
||||
"""
|
||||
|
||||
assert isinstance(doc.meta.analysis, rd.StaticAnalysis)
|
||||
functions_by_bb: Dict[capa.features.address.Address, capa.features.address.Address] = {}
|
||||
for finfo in doc.meta.analysis.layout.functions:
|
||||
faddress = finfo.address.to_capa()
|
||||
processes_by_thread: Dict[capa.features.address.Address, capa.features.address.Address] = {}
|
||||
if isinstance(doc.meta.analysis, rd.StaticAnalysis):
|
||||
for finfo in doc.meta.analysis.layout.functions:
|
||||
faddress = finfo.address.to_capa()
|
||||
|
||||
for bb in finfo.matched_basic_blocks:
|
||||
bbaddress = bb.address.to_capa()
|
||||
functions_by_bb[bbaddress] = faddress
|
||||
for bb in finfo.matched_basic_blocks:
|
||||
bbaddress = bb.address.to_capa()
|
||||
functions_by_bb[bbaddress] = faddress
|
||||
elif isinstance(doc.meta.analysis, rd.DynamicAnalysis):
|
||||
for pinfo in doc.meta.analysis.layout.processes:
|
||||
paddress = pinfo.address.to_capa()
|
||||
|
||||
for thread in pinfo.matched_threads:
|
||||
taddress = thread.address.to_capa()
|
||||
processes_by_thread[taddress] = paddress
|
||||
else:
|
||||
raise ValueError("invalid analysis field in the document's meta")
|
||||
|
||||
had_match = False
|
||||
|
||||
@@ -324,7 +335,11 @@ def render_rules(ostream, doc: rd.ResultDocument):
|
||||
|
||||
rows.append(("author", ", ".join(rule.meta.authors)))
|
||||
|
||||
rows.append(("scopes", str(rule.meta.scopes)))
|
||||
if rule.meta.scopes.static:
|
||||
rows.append(("static scope:", str(rule.meta.scopes.static)))
|
||||
|
||||
if rule.meta.scopes.dynamic:
|
||||
rows.append(("dynamic scope:", str(rule.meta.scopes.dynamic)))
|
||||
|
||||
if rule.meta.attack:
|
||||
rows.append(("att&ck", ", ".join([rutils.format_parts_id(v) for v in rule.meta.attack])))
|
||||
@@ -352,7 +367,8 @@ def render_rules(ostream, doc: rd.ResultDocument):
|
||||
render_match(ostream, first_match, indent=0)
|
||||
else:
|
||||
for location, match in sorted(doc.rules[rule.meta.name].matches):
|
||||
ostream.write(rule.meta.scopes)
|
||||
ostream.write(f"static scope: {rule.meta.scopes.static}")
|
||||
ostream.write(f"dynamic scope: {rule.meta.scopes.dynamic}")
|
||||
ostream.write(" @ ")
|
||||
ostream.write(capa.render.verbose.format_address(location))
|
||||
|
||||
@@ -362,6 +378,14 @@ def render_rules(ostream, doc: rd.ResultDocument):
|
||||
+ capa.render.verbose.format_address(frz.Address.from_capa(functions_by_bb[location.to_capa()]))
|
||||
)
|
||||
|
||||
if capa.rules.THREAD_SCOPE in rule.meta.scopes:
|
||||
ostream.write(
|
||||
" in process "
|
||||
+ capa.render.verbose.format_address(
|
||||
frz.Address.from_capa(processes_by_thread[location.to_capa()])
|
||||
)
|
||||
)
|
||||
|
||||
ostream.write("\n")
|
||||
render_match(ostream, match, indent=1)
|
||||
if rule.meta.lib:
|
||||
|
||||
Reference in New Issue
Block a user