Merge pull request #431 from jacopotediosi/improve/wpa-sec

Multiple WPA-SEC improvements
This commit is contained in:
Jayofelony
2025-09-19 23:22:20 +02:00
committed by GitHub
2 changed files with 205 additions and 104 deletions

View File

@@ -157,6 +157,7 @@ api_key = ""
api_url = "https://wpa-sec.stanev.org"
download_results = false
show_pwd = false
single_files = false
[main.log]
path = "/etc/pwnagotchi/log/pwnagotchi.log"

View File

@@ -1,58 +1,187 @@
import os
import logging
import re
import requests
import sqlite3
from datetime import datetime
from enum import Enum
from threading import Lock
from pwnagotchi.utils import StatusFile, remove_whitelisted
from pwnagotchi.utils import remove_whitelisted
from pwnagotchi import plugins
from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK
import pwnagotchi.ui.fonts as fonts
from json.decoder import JSONDecodeError
class WpaSec(plugins.Plugin):
__author__ = '33197631+dadav@users.noreply.github.com'
__editor__ = 'jayofelony'
__version__ = '2.1.1'
__version__ = '2.1.2'
__license__ = 'GPL3'
__description__ = 'This plugin automatically uploads handshakes to https://wpa-sec.stanev.org'
class Status(Enum):
TOUPLOAD = 0
INVALID = 1
SUCCESSFULL = 2
def __init__(self):
self.ready = False
self.lock = Lock()
try:
self.report = StatusFile('/home/pi/.wpa_sec_uploads', data_format='json')
except JSONDecodeError:
os.remove("/home/pi/.wpa_sec_uploads")
self.report = StatusFile('/home/pi/.wpa_sec_uploads', data_format='json')
self.options = dict()
self.skip = list()
self._init_db()
def _init_db(self):
db_conn = sqlite3.connect('/home/pi/.wpa_sec_db')
db_conn.execute('pragma journal_mode=wal')
with db_conn:
db_conn.execute('''
CREATE TABLE IF NOT EXISTS handshakes (
path TEXT PRIMARY KEY,
status INTEGER
)
''')
db_conn.execute('''
CREATE INDEX IF NOT EXISTS idx_handshakes_status
ON handshakes (status)
''')
db_conn.close()
def on_loaded(self):
"""
Gets called when the plugin gets loaded
"""
if 'api_key' not in self.options or ('api_key' in self.options and not self.options['api_key']):
logging.error("WPA_SEC: API-KEY isn't set. Can't upload.")
return
if 'api_url' not in self.options or ('api_url' in self.options and not self.options['api_url']):
logging.error("WPA_SEC: API-URL isn't set. Can't upload.")
return
self.skip_until_reload = set()
self.ready = True
logging.info("WPA_SEC: plugin loaded.")
def on_handshake(self, agent, filename, access_point, client_station):
config = agent.config()
if not remove_whitelisted([filename], config['main']['whitelist']):
return
db_conn = sqlite3.connect('/home/pi/.wpa_sec_db')
with db_conn:
db_conn.execute('''
INSERT INTO handshakes (path, status)
VALUES (?, ?)
ON CONFLICT(path) DO UPDATE SET status = excluded.status
WHERE handshakes.status = ?
''', (filename, self.Status.TOUPLOAD.value, self.Status.INVALID.value))
db_conn.close()
def on_internet_available(self, agent):
"""
Called when there's internet connectivity
"""
if not self.ready or self.lock.locked():
return
with self.lock:
display = agent.view()
try:
db_conn = sqlite3.connect('/home/pi/.wpa_sec_db')
cursor = db_conn.cursor()
cursor.execute('SELECT path FROM handshakes WHERE status = ?', (self.Status.TOUPLOAD.value,))
handshakes_toupload = [row[0] for row in cursor.fetchall()]
handshakes_toupload = set(handshakes_toupload) - self.skip_until_reload
if handshakes_toupload:
logging.info("WPA_SEC: Internet connectivity detected. Uploading new handshakes...")
for idx, handshake in enumerate(handshakes_toupload):
display.on_uploading(f"WPA-SEC ({idx + 1}/{len(handshakes_toupload)})")
logging.info("WPA_SEC: Uploading %s...", handshake)
try:
upload_response = self._upload_to_wpasec(handshake)
if upload_response.startswith("hcxpcapngtool"):
logging.info(f"WPA_SEC: {handshake} successfully uploaded.")
new_status = self.Status.SUCCESSFULL.value
else:
logging.info(f"WPA_SEC: {handshake} uploaded, but it was invalid.")
new_status = self.Status.INVALID.value
cursor.execute('''
INSERT INTO handshakes (path, status)
VALUES (?, ?)
ON CONFLICT(path) DO UPDATE SET status = excluded.status
''', (handshake, new_status))
db_conn.commit()
except requests.exceptions.RequestException:
logging.exception("WPA_SEC: RequestException uploading %s, skipping until reload.", handshake)
self.skip_until_reload.append(handshake)
except OSError:
logging.exception("WPA_SEC: OSError uploading %s, deleting from db.", handshake)
cursor.execute('DELETE FROM handshakes WHERE path = ?', (handshake,))
db_conn.commit()
except Exception:
logging.exception("WPA_SEC: Exception uploading %s.", handshake)
display.on_normal()
cursor.close()
db_conn.close()
except Exception:
logging.exception("WPA_SEC: Exception uploading results.")
try:
if 'download_results' in self.options and self.options['download_results']:
config = agent.config()
handshake_dir = config['bettercap']['handshakes']
cracked_file_path = os.path.join(handshake_dir, 'wpa-sec.cracked.potfile')
if os.path.exists(cracked_file_path):
last_check = datetime.fromtimestamp(os.path.getmtime(cracked_file_path))
download_interval = int(self.options.get('download_interval', 3600))
if last_check is not None and ((datetime.now() - last_check).seconds / download_interval) < 1:
return
self._download_from_wpasec(cracked_file_path)
if 'single_files' in self.options and self.options['single_files']:
self._write_cracked_single_files(cracked_file_path, handshake_dir)
except Exception:
logging.exception("WPA_SEC: Exception downloading results.")
def _upload_to_wpasec(self, path, timeout=30):
"""
Uploads the file to https://wpa-sec.stanev.org, or another endpoint.
Uploads the file to wpasec
"""
with open(path, 'rb') as file_to_upload:
cookie = {"key": self.options['api_key']}
payload = {"file": file_to_upload}
cookie = {'key': self.options['api_key']}
payload = {'file': file_to_upload}
headers = {"HTTP_USER_AGENT": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"}
try:
result = requests.post(self.options['api_url'],
cookies=cookie,
files=payload,
headers=headers,
timeout=timeout)
if result.status_code == 200:
if ' already submitted' in result.text:
logging.info("%s was already submitted.", path)
return False
return True
elif result.status_code != 200:
logging.error("WPA_SEC: Error code: %s", result.text)
return False
except requests.exceptions.RequestException as req_e:
raise req_e
result = requests.post(
self.options['api_url'],
cookies=cookie,
files=payload,
headers=headers,
timeout=timeout
)
result.raise_for_status()
response = result.text.partition('\n')[0]
logging.debug("WPA_SEC: Response uploading %s: %s.", path, response)
return response
def _download_from_wpasec(self, output, timeout=30):
"""
@@ -66,88 +195,59 @@ class WpaSec(plugins.Plugin):
api_url = f"{api_url}?api&dl=1"
cookie = {'key': self.options['api_key']}
try:
result = requests.get(api_url, cookies=cookie, timeout=timeout)
with open(output, 'wb') as output_file:
output_file.write(result.content)
except requests.exceptions.RequestException as req_e:
raise req_e
except OSError as os_e:
raise os_e
headers = {"HTTP_USER_AGENT": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"}
def on_loaded(self):
logging.info("WPA_SEC: Downloading cracked passwords...")
result = requests.get(api_url, cookies=cookie, headers=headers, timeout=timeout)
result.raise_for_status()
with open(output, 'wb') as output_file:
output_file.write(result.content)
logging.info("WPA_SEC: Downloaded cracked passwords.")
def _write_cracked_single_files(self, cracked_file_path, handshake_dir):
"""
Gets called when the plugin gets loaded
Splits download results from wpasec into individual .pcap.cracked files in handshake_dir
Each .pcap.cracked file will contain the cracked handshake password
"""
if 'api_key' not in self.options or ('api_key' in self.options and not self.options['api_key']):
logging.error("WPA_SEC: API-KEY isn't set. Can't upload to wpa-sec.stanev.org")
return
logging.info("WPA_SEC: Writing cracked single files...")
if 'api_url' not in self.options or ('api_url' in self.options and not self.options['api_url']):
logging.error("WPA_SEC: API-URL isn't set. Can't upload, no endpoint configured.")
return
with open(cracked_file_path, 'r') as cracked_file:
for line in cracked_file:
try:
bssid,station_mac,ssid,password = line.split(":")
if password:
handshake_filename = re.sub(r'[^a-zA-Z0-9]', '', ssid) + '_' + bssid
pcap_path = os.path.join(handshake_dir, handshake_filename+'.pcap')
pcap_cracked_path = os.path.join(handshake_dir, handshake_filename+'.pcap.cracked')
if os.path.exists(pcap_path) and not os.path.exists(pcap_cracked_path):
with open(pcap_cracked_path, 'w') as f:
f.write(password)
except Exception:
logging.exception(f"WPA_SEC: Exception writing cracked single file, parsing line {line}.")
self.ready = True
logging.info("WPA_SEC: plugin loaded")
logging.info("WPA_SEC: Wrote cracked single files.")
def on_webhook(self, path, request):
from flask import make_response, redirect
response = make_response(redirect(self.options['api_url'], code=302))
response.set_cookie('key', self.options['api_key'])
return response
from flask import make_response
def on_internet_available(self, agent):
"""
Called when there's internet connectivity
"""
if not self.ready or self.lock.locked():
return
html_content = f'''
<html>
<body>
<form id="postForm" action="{self.options['api_url']}" method="POST">
<input type="hidden" name="key" value="{self.options['api_key']}">
</form>
<script type="text/javascript">
document.getElementById('postForm').submit();
</script>
</body>
</html>
'''
with self.lock:
config = agent.config()
display = agent.view()
reported = self.report.data_field_or('reported', default=list())
handshake_dir = config['bettercap']['handshakes']
try:
handshake_filenames = os.listdir(handshake_dir)
except FileNotFoundError:
logging.info("WPA_SEC: Handshake directory doesn't exist.")
return
handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames if filename.endswith('.pcap')]
handshake_paths = remove_whitelisted(handshake_paths, config['main']['whitelist'])
handshake_new = set(handshake_paths) - set(reported) - set(self.skip)
if handshake_new:
logging.info("WPA_SEC: Internet connectivity detected. Uploading new handshakes to wpa-sec.stanev.org")
for idx, handshake in enumerate(handshake_new):
display.on_uploading(f"wpa-sec.stanev.org ({idx + 1}/{len(handshake_new)})")
try:
if self._upload_to_wpasec(handshake):
reported.append(handshake)
self.report.update(data={'reported': reported})
logging.debug("WPA_SEC: Successfully uploaded %s", handshake)
except requests.exceptions.RequestException as req_e:
self.skip.append(handshake)
logging.debug("WPA_SEC: %s", req_e)
continue
except OSError as os_e:
logging.debug("WPA_SEC: %s", os_e)
continue
display.on_normal()
if 'download_results' in self.options and self.options['download_results']:
cracked_file = os.path.join(handshake_dir, 'wpa-sec.cracked.potfile')
if os.path.exists(cracked_file):
last_check = datetime.fromtimestamp(os.path.getmtime(cracked_file))
if last_check is not None and ((datetime.now() - last_check).seconds / (60 * 60)) < 1:
return
try:
self._download_from_wpasec(os.path.join(handshake_dir, 'wpa-sec.cracked.potfile'))
logging.info("WPA_SEC: Downloaded cracked passwords.")
except requests.exceptions.RequestException as req_e:
logging.debug("WPA_SEC: %s", req_e)
except OSError as os_e:
logging.debug("WPA_SEC: %s", os_e)
return make_response(html_content)
def on_ui_setup(self, ui):
if 'show_pwd' in self.options and self.options['show_pwd'] and 'download_results' in self.options and self.options['download_results']: