Optimize handshake capture: fix bugs and improve attack efficiency

agent.py:
- Fix _has_handshake() substring match bug: was matching partial MACs
  (e.g. "AA:BB:CC" matching "AA:BB:CC:DD:EE:FF"), causing APs to be
  skipped. Now splits key on ' -> ' and checks exact MAC match.
- Add time-based decay to interaction history: previously once an AP/STA
  hit max_interactions it was permanently skipped for the session. Now
  resets count after 5 minutes, allowing retry of stubborn targets.
- Cap recon time increase when inactive: was doubling (30s->60s), now
  adds at most 15s to avoid long idle periods with fewer channel hops.
- Interleave populated and sparse channels instead of always hitting
  most-populated first, giving sparse channels fair coverage.
- Use set for whitelist lookup (O(1) instead of O(n) per AP).
- Remove fragile mac[:13] prefix whitelist match, use full MAC only.
- Add backward compat for recovery data format change.

cli.py:
- Reduce inter-deauth sleep from 1.0s to 0.3s. The 1s delay was overly
  conservative for nexmon firmware; 0.3s is sufficient and processes
  multi-client APs 3x faster.

Signed-off-by: CoderFX <153912+CoderFX@users.noreply.github.com>
Signed-off-by: CoderFX <4704376+CoderFX@users.noreply.github.com>
This commit is contained in:
CoderFX
2026-03-10 19:57:26 +02:00
parent 6808063e9b
commit 8c1c51f158
2 changed files with 40 additions and 13 deletions

View File

@@ -145,7 +145,7 @@ class Agent(Client, Automata, AsyncAdvertiser):
channels = self._config['personality']['channels']
if self._epoch.inactive_for >= max_inactive:
recon_time *= recon_mul
recon_time = min(recon_time * recon_mul, recon_time + 15)
self._view.set('channel', '*')
@@ -169,7 +169,7 @@ class Agent(Client, Automata, AsyncAdvertiser):
return self._access_points
def get_access_points(self):
whitelist = self._config['main']['whitelist']
whitelist = set(e.lower() if isinstance(e, str) else e for e in self._config['main']['whitelist'])
aps = []
try:
s = self.session()
@@ -177,7 +177,7 @@ class Agent(Client, Automata, AsyncAdvertiser):
for ap in s['wifi']['aps']:
if ap['encryption'] == '' or ap['encryption'] == 'OPEN':
continue
elif ap['hostname'] in whitelist or ap['mac'][:13].lower() in whitelist or ap['mac'].lower() in whitelist:
elif ap['hostname'] in whitelist or ap['mac'].lower() in whitelist:
continue
else:
aps.append(ap)
@@ -214,8 +214,19 @@ class Agent(Client, Automata, AsyncAdvertiser):
else:
grouped[ch].append(ap)
# sort by more populated channels
return sorted(grouped.items(), key=lambda kv: len(kv[1]), reverse=True)
# interleave populated and sparse channels for balanced coverage
by_count = sorted(grouped.items(), key=lambda kv: len(kv[1]), reverse=True)
if len(by_count) <= 2:
return by_count
heavy = by_count[:len(by_count)//2]
light = by_count[len(by_count)//2:]
result = []
while heavy or light:
if heavy:
result.append(heavy.pop(0))
if light:
result.append(light.pop(0))
return result
def _find_ap_sta_in(self, station_mac, ap_mac, session):
for ap in session['wifi']['aps']:
@@ -291,7 +302,14 @@ class Agent(Client, Automata, AsyncAdvertiser):
self._started_at = data['started_at']
self._epoch.epoch = data['epoch']
self._handshakes = data['handshakes']
self._history = data['history']
# backward compat: old format stored {mac: count}, new stores {mac: {count, first_seen}}
raw_history = data['history']
self._history = {}
for k, v in raw_history.items():
if isinstance(v, dict):
self._history[k] = v
else:
self._history[k] = {'count': v, 'first_seen': time.time()}
self._last_pwnd = data['last_pwnd']
if delete:
@@ -400,8 +418,11 @@ class Agent(Client, Automata, AsyncAdvertiser):
self.run('%s off; %s on' % (module, module))
def _has_handshake(self, bssid):
bssid_lower = bssid.lower()
for key in self._handshakes:
if bssid.lower() in key:
# key format is 'sta_mac -> ap_mac'
parts = key.lower().split(' -> ')
if bssid_lower in parts:
return True
return False
@@ -409,14 +430,20 @@ class Agent(Client, Automata, AsyncAdvertiser):
if self._has_handshake(who):
return False
elif who not in self._history:
self._history[who] = 1
now = time.time()
if who not in self._history:
self._history[who] = {'count': 1, 'first_seen': now}
return True
else:
self._history[who] += 1
entry = self._history[who]
# reset interaction count after 5 minutes to allow retrying
if now - entry['first_seen'] > 300:
entry['count'] = 1
entry['first_seen'] = now
return True
return self._history[who] < self._config['personality']['max_interactions']
entry['count'] += 1
return entry['count'] < self._config['personality']['max_interactions']
def associate(self, ap, throttle=-1):
if self.is_stale():

View File

@@ -70,7 +70,7 @@ def pwnagotchi_cli():
# deauth all client stations in order to get a full handshake
for sta in ap['clients']:
agent.deauth(ap, sta)
time.sleep(1) # delay to not trigger nexmon firmware bugs
time.sleep(0.3) # reduced delay between deauths
# An interesting effect of this:
#