mirror of
https://github.com/mandiant/capa.git
synced 2025-12-23 07:28:34 -08:00
161 lines
4.9 KiB
Python
161 lines
4.9 KiB
Python
#!/usr/bin/env python
|
|
'''
|
|
migrate rules and their namespaces.
|
|
|
|
example:
|
|
|
|
$ python scripts/migrate-rules.py migration.csv ./rules ./new-rules
|
|
'''
|
|
import os
|
|
import os.path
|
|
import sys
|
|
import csv
|
|
import logging
|
|
import collections
|
|
|
|
import argparse
|
|
|
|
import capa.rules
|
|
|
|
|
|
logger = logging.getLogger('migrate-rules')
|
|
|
|
|
|
|
|
def read_plan(plan_path):
|
|
with open(plan_path, 'rb') as f:
|
|
return list(csv.DictReader(f, restkey='other', fieldnames=(
|
|
'existing path',
|
|
'existing name',
|
|
'existing rule-category',
|
|
'proposed name',
|
|
'proposed namespace',
|
|
'ATT&CK',
|
|
'MBC',
|
|
'comment1',
|
|
)))
|
|
|
|
|
|
def read_rules(rule_directory):
|
|
rules = {}
|
|
for root, dirs, files in os.walk(rule_directory):
|
|
for file in files:
|
|
path = os.path.join(root, file)
|
|
if not path.endswith('.yml'):
|
|
logger.info('skipping file: %s', path)
|
|
continue
|
|
|
|
rule = capa.rules.Rule.from_yaml_file(path)
|
|
rules[rule.name] = rule
|
|
|
|
if 'nursery' in path:
|
|
rule.meta['capa/nursery'] = True
|
|
return rules
|
|
|
|
|
|
def main(argv=None):
|
|
if argv is None:
|
|
argv = sys.argv[1:]
|
|
|
|
parser = argparse.ArgumentParser(description='migrate rules.')
|
|
parser.add_argument('plan', type=str,
|
|
help='Path to CSV describing migration')
|
|
parser.add_argument('source', type=str,
|
|
help='Source directory of rules')
|
|
parser.add_argument('destination', type=str,
|
|
help='Destination directory of rules')
|
|
args = parser.parse_args(args=argv)
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logging.getLogger().setLevel(logging.INFO)
|
|
|
|
plan = read_plan(args.plan)
|
|
logger.info('read %d plan entries', len(plan))
|
|
|
|
rules = read_rules(args.source)
|
|
logger.info('read %d rules', len(rules))
|
|
|
|
planned_rules = set([row['existing name'] for row in plan])
|
|
unplanned_rules = [rule for (name, rule) in rules.items() if name not in planned_rules]
|
|
|
|
if unplanned_rules:
|
|
logger.error('plan does not account for %d rules:' % (len(unplanned_rules)))
|
|
for rule in unplanned_rules:
|
|
logger.error(' ' + rule.name)
|
|
return -1
|
|
|
|
# pairs of strings (needle, replacement)
|
|
match_translations = []
|
|
|
|
for row in plan:
|
|
if not row['existing name']:
|
|
continue
|
|
|
|
rule = rules[row['existing name']]
|
|
|
|
if rule.meta['name'] != row['proposed name']:
|
|
logger.info("renaming rule '%s' -> '%s'", rule.meta['name'], row['proposed name'])
|
|
|
|
# assume the yaml is formatted like `- match: $rule-name`.
|
|
# but since its been linted, this should be ok.
|
|
match_translations.append(
|
|
('- match: ' + rule.meta['name'],
|
|
'- match: ' + row['proposed name']))
|
|
|
|
rule.meta['name'] = row['proposed name']
|
|
rule.name = row['proposed name']
|
|
|
|
if 'rule-category' in rule.meta:
|
|
logger.info("deleting rule category '%s'", rule.meta['rule-category'])
|
|
del rule.meta['rule-category']
|
|
|
|
rule.meta['namespace'] = row['proposed namespace']
|
|
|
|
if row['ATT&CK'] != 'n/a' and row['ATT&CK'] != '':
|
|
tag = row['ATT&CK']
|
|
name, _, id = tag.rpartition(' ')
|
|
tag = '%s [%s]' % (name, id)
|
|
rule.meta['att&ck'] = [tag]
|
|
|
|
if row['MBC'] != 'n/a' and row['MBC'] != '':
|
|
tag = row['MBC']
|
|
rule.meta['mbc'] = [tag]
|
|
|
|
for rule in rules.values():
|
|
filename = rule.name
|
|
filename = filename.lower()
|
|
filename = filename.replace(' ', '-')
|
|
filename = filename.replace('(', '')
|
|
filename = filename.replace(')', '')
|
|
filename = filename.replace('+', '')
|
|
filename = filename.replace('/', '')
|
|
filename = filename + '.yml'
|
|
|
|
try:
|
|
if rule.meta.get('capa/nursery'):
|
|
directory = os.path.join(args.destination, 'nursery')
|
|
elif rule.meta.get('lib'):
|
|
directory = os.path.join(args.destination, 'lib')
|
|
else:
|
|
directory = os.path.join(args.destination, rule.meta.get('namespace'))
|
|
os.makedirs(directory)
|
|
except OSError:
|
|
pass
|
|
else:
|
|
logger.info('created namespace: %s', directory)
|
|
|
|
path = os.path.join(directory, filename)
|
|
logger.info('writing rule %s', path)
|
|
|
|
doc = rule.to_yaml().decode('utf-8')
|
|
for (needle, replacement) in match_translations:
|
|
doc = doc.replace(needle, replacement)
|
|
|
|
with open(path, 'wb') as f:
|
|
f.write(doc.encode('utf-8'))
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main()) |