Merge pull request #497 from CoderFX/fix/handshake-capture-optimizations

Optimize handshake capture: fix bugs and improve attack efficiency
This commit is contained in:
Jayofelony
2026-03-10 22:43:05 +01:00
committed by GitHub
3 changed files with 49 additions and 22 deletions

View File

@@ -145,7 +145,7 @@ class Agent(Client, Automata, AsyncAdvertiser):
channels = self._config['personality']['channels']
if self._epoch.inactive_for >= max_inactive:
recon_time *= recon_mul
recon_time = min(recon_time * recon_mul, recon_time + 15)
self._view.set('channel', '*')
@@ -169,7 +169,7 @@ class Agent(Client, Automata, AsyncAdvertiser):
return self._access_points
def get_access_points(self):
whitelist = self._config['main']['whitelist']
whitelist = set(e.lower() if isinstance(e, str) else e for e in self._config['main']['whitelist'])
aps = []
try:
s = self.session()
@@ -177,7 +177,7 @@ class Agent(Client, Automata, AsyncAdvertiser):
for ap in s['wifi']['aps']:
if ap['encryption'] == '' or ap['encryption'] == 'OPEN':
continue
elif ap['hostname'] in whitelist or ap['mac'][:13].lower() in whitelist or ap['mac'].lower() in whitelist:
elif ap['hostname'] in whitelist or ap['mac'].lower() in whitelist:
continue
else:
aps.append(ap)
@@ -214,8 +214,19 @@ class Agent(Client, Automata, AsyncAdvertiser):
else:
grouped[ch].append(ap)
# sort by more populated channels
return sorted(grouped.items(), key=lambda kv: len(kv[1]), reverse=True)
# interleave populated and sparse channels for balanced coverage
by_count = sorted(grouped.items(), key=lambda kv: len(kv[1]), reverse=True)
if len(by_count) <= 2:
return by_count
heavy = by_count[:len(by_count)//2]
light = by_count[len(by_count)//2:]
result = []
while heavy or light:
if heavy:
result.append(heavy.pop(0))
if light:
result.append(light.pop(0))
return result
def _find_ap_sta_in(self, station_mac, ap_mac, session):
for ap in session['wifi']['aps']:
@@ -291,7 +302,14 @@ class Agent(Client, Automata, AsyncAdvertiser):
self._started_at = data['started_at']
self._epoch.epoch = data['epoch']
self._handshakes = data['handshakes']
self._history = data['history']
# backward compat: old format stored {mac: count}, new stores {mac: {count, first_seen}}
raw_history = data['history']
self._history = {}
for k, v in raw_history.items():
if isinstance(v, dict):
self._history[k] = v
else:
self._history[k] = {'count': v, 'first_seen': time.time()}
self._last_pwnd = data['last_pwnd']
if delete:
@@ -424,8 +442,11 @@ class Agent(Client, Automata, AsyncAdvertiser):
self.run('%s off; %s on' % (module, module))
def _has_handshake(self, bssid):
bssid_lower = bssid.lower()
for key in self._handshakes:
if bssid.lower() in key:
# key format is 'sta_mac -> ap_mac'
parts = key.lower().split(' -> ')
if bssid_lower in parts:
return True
return False
@@ -433,14 +454,20 @@ class Agent(Client, Automata, AsyncAdvertiser):
if self._has_handshake(who):
return False
elif who not in self._history:
self._history[who] = 1
now = time.time()
if who not in self._history:
self._history[who] = {'count': 1, 'first_seen': now}
return True
else:
self._history[who] += 1
entry = self._history[who]
# reset interaction count after 5 minutes to allow retrying
if now - entry['first_seen'] > 300:
entry['count'] = 1
entry['first_seen'] = now
return True
return self._history[who] < self._config['personality']['max_interactions']
entry['count'] += 1
return entry['count'] < self._config['personality']['max_interactions']
def associate(self, ap, throttle=-1):
if self.is_stale():

View File

@@ -70,7 +70,7 @@ def pwnagotchi_cli():
# deauth all client stations in order to get a full handshake
for sta in ap['clients']:
agent.deauth(ap, sta)
time.sleep(1) # delay to not trigger nexmon firmware bugs
time.sleep(0.3) # reduced delay between deauths
# An interesting effect of this:
#

View File

@@ -385,7 +385,7 @@ class auto_tune(plugins.Plugin):
else:
ret += "<td></td>"
if lmac in self._agent._history:
ret += "<td>%s</td>" % self._agent._history[lmac]
entry = self._agent._history[lmac]; ret += "<td>%s</td>" % (entry["count"] if isinstance(entry, dict) else entry)
else:
ret += "<td>no attacks yet</td>"
ret += "</tr>\n"
@@ -488,9 +488,9 @@ class auto_tune(plugins.Plugin):
if preset_name:
try:
self._save_preset(preset_name)
ret += "<div class='success'>Preset '%s' saved successfully!</div>" % preset_name
ret += "<div class='success'>Preset '%s' saved successfully!</div>" % html.escape(preset_name)
except Exception as e:
ret += "<div class='error'>Error saving preset: %s</div>" % str(e)
ret += "<div class='error'>Error saving preset: %s</div>" % html.escape(str(e))
else:
ret += "<div class='error'>Please enter a preset name</div>"
@@ -510,9 +510,9 @@ class auto_tune(plugins.Plugin):
preset_name = request.values['selected_preset']
if preset_name:
if self._delete_preset(preset_name):
ret += "<div class='success'>Preset '%s' deleted successfully!</div>" % preset_name
ret += "<div class='success'>Preset '%s' deleted successfully!</div>" % html.escape(preset_name)
else:
ret += "<div class='error'>Error deleting preset '%s'</div>" % preset_name
ret += "<div class='error'>Error deleting preset '%s'</div>" % html.escape(preset_name)
else:
ret += "<div class='error'>Please select a preset to delete</div>"
@@ -570,7 +570,7 @@ class auto_tune(plugins.Plugin):
try:
defaults = {'show_hidden': False,
'reset_history': True,
'extra_channels': 15,
'extra_channels': 5,
'show_interactions': False,
}
@@ -639,7 +639,7 @@ class auto_tune(plugins.Plugin):
self._unscanned_channels.remove(ch)
next_channels.append(ch)
# update live config
agent._config['personality']['channels'] = next_channels
agent._config['personality']['channels'] = list(dict.fromkeys(next_channels))
logging.info("Active: %s, Next scan: %s, yet unscanned: %d %s" % (
self._active_channels, next_channels, len(self._unscanned_channels), self._unscanned_channels))
except Exception as e:
@@ -742,11 +742,11 @@ class auto_tune(plugins.Plugin):
if apID not in self._known_aps:
self.incrementChisto('Missed joins', channel)
logging.warn("Unknown AP '%s' seen leaving" % apID)
logging.warning("Unknown AP '%s' seen leaving" % apID)
else:
if not self._known_aps[apID]['AT_visible']:
self.incrementChisto('Missed rejoins', channel)
logging.warn("AP '%s' already gone", apID)
logging.warning("AP '%s' already gone", apID)
else:
self._known_aps[apID]['AT_visible'] = False
self.incrementChisto('Current APs', channel, -1)