Merge remote-tracking branch 'sumulige/master' into workout_sync

This commit is contained in:
fatsbrown
2026-05-30 21:34:53 -03:00
18 changed files with 1383 additions and 8 deletions
+2
View File
@@ -5,3 +5,5 @@ __pycache__/
build/
dist/
logs/
.venv/
scripts/strava_token.txt
+2 -1
View File
@@ -77,6 +77,7 @@ zoffline can be installed on the same machine as Zwift or another local machine.
- TZ=Europe/London
volumes:
- ./storage/:/usr/src/app/zwift-offline/storage
- ${ZWIFT_WORKOUTS_DIR:-~/Documents/Zwift/Workouts}:/root/Documents/Zwift/Workouts
ports:
- 80:80
- 443:443
@@ -85,6 +86,7 @@ zoffline can be installed on the same machine as Zwift or another local machine.
restart: unless-stopped
```
* In the ``volumes`` tag replace ``./storage/`` before the ``:`` with the directory path you want to use as your local zoffline data store.
* Set ``ZWIFT_WORKOUTS_DIR`` if your local Zwift workouts folder is not ``~/Documents/Zwift/Workouts``.
* If you are not running zoffline on the same PC that Zwift is running: create a ``server-ip.txt`` file in the ``storage`` directory containing the IP address of the PC running zoffline.
* Start zoffline with: ``docker-compose up -d``
</details>
@@ -437,4 +439,3 @@ this project and does not endorse this project.
All product and company names are trademarks of their respective holders. Use of
them does not imply any affiliation with or endorsement by them.
+31
View File
@@ -8,6 +8,7 @@
<div class="col-md-12">
<a href="{{ url_for('settings', username=username) }}" class="btn btn-sm btn-secondary">Back</a>
{% if aid or akey %}
<a href="{{ url_for('intervals_sync', username=username) }}" class="btn btn-sm btn-secondary">Sync today's workout</a>
<a href="/delete/intervals_credentials.bin" class="btn btn-sm btn-danger">Remove credentials</a>
{% endif %}
</div>
@@ -31,6 +32,36 @@
</div>
</div>
</form>
{% if sync_status %}
<ul class="list-group top-buffer">
<li class="list-group-item py-2">
<div class="text-shadow">Active provider: {{ active_provider or 'not selected' }}</div>
</li>
<li class="list-group-item py-2">
{% if sync_status.metadata %}
<div class="text-shadow">Synced workout: {{ sync_status.metadata.name or sync_status.metadata.filename }}</div>
<div class="text-shadow"><small>{{ sync_status.metadata.filename }}</small></div>
{% if sync_status.metadata.start_date_local %}
<div class="text-shadow"><small>{{ sync_status.metadata.start_date_local }}</small></div>
{% endif %}
{% else %}
<div class="text-shadow">No synced Intervals workout is cached right now.</div>
<div class="text-shadow"><small>Use "Sync today's workout" before launching Zwift.</small></div>
{% endif %}
</li>
<li class="list-group-item py-2">
{% if sync_status.local_status %}
<div class="text-shadow">Local Zwift catalog: {{ 'ready for launch' if sync_status.local_status.healthy else 'needs refresh' }}</div>
<div class="text-shadow"><small>File: {{ 'present' if sync_status.local_status.file_exists else 'missing' }} | Manifest: {{ 'present' if sync_status.local_status.manifest_entry_exists else 'missing' }}</small></div>
{% if sync_status.local_status.manifest_entry %}
<div class="text-shadow"><small>Guid {{ sync_status.local_status.manifest_entry.guid }}, checksum {{ sync_status.local_status.manifest_entry.checksum }}</small></div>
{% endif %}
{% else %}
<div class="text-shadow">Local Zwift catalog: waiting for next sync</div>
{% endif %}
</li>
</ul>
{% endif %}
{% with messages = get_flashed_messages() %}
{% if messages %}
<ul class="list-group top-buffer">
+1
View File
@@ -14,6 +14,7 @@
<a href="{{ url_for('profile', username=username) }}" class="btn btn-sm btn-secondary">Zwift</a>
<a href="{{ url_for('strava', username=username) }}" class="btn btn-sm btn-secondary">Strava</a>
<a href="{{ url_for('intervals', username=username) }}" class="btn btn-sm btn-secondary">Intervals</a>
<a href="{{ url_for('trainingpeaks', username=username) }}" class="btn btn-sm btn-secondary">TrainingPeaks</a>
</div>
</div>
{% if files %}
@@ -0,0 +1,49 @@
{% extends "./layout.html" %}
{% block content %}
<h1><div class="text-shadow">TrainingPeaks</div></h1>
{% if username != "zoffline" %}
<h4 class="text-shadow">Logged in as {{ username }}</h4>
{% endif %}
<div class="row">
<div class="col-md-12">
<a href="{{ url_for('settings', username=username) }}" class="btn btn-sm btn-secondary">Back</a>
<a href="{{ url_for('trainingpeaks_sync', username=username) }}" class="btn btn-sm btn-secondary">Import workouts</a>
</div>
</div>
<div class="row">
<div class="col-sm-8 col-md-6 top-buffer">
<div class="list-group">
<div class="list-group-item py-2">
<div class="text-shadow">{{ message }}</div>
</div>
<div class="list-group-item py-2">
<div class="text-shadow">Manual bridge mode: export TrainingPeaks workouts as .zwo files into a local folder, save that folder path here, then click "Import workouts".</div>
</div>
</div>
<form id="trainingpeaks" action="{{ url_for('trainingpeaks', username=username) }}" method="post" class="top-buffer">
<div class="row">
<div class="col-md-12">
<label class="col-form-label col-form-label-sm text-shadow">Bridge folder</label>
<input type="text" id="bridge_folder" name="bridge_folder" value="{{ bridge_folder }}" class="form-control form-control-sm" placeholder="/path/to/TrainingPeaks/exports">
</div>
</div>
<div class="row">
<div class="col-md-12 top-buffer">
<input type="submit" value="Save bridge folder" class="btn btn-sm btn-light">
</div>
</div>
</form>
{% with messages = get_flashed_messages() %}
{% if messages %}
<ul class="list-group top-buffer">
{% for message in messages %}
<li class="list-group-item py-2">
<div class="text-shadow">{{ message }}</div>
</li>
{% endfor %}
</ul>
{% endif %}
{% endwith %}
</div>
</div>
{% endblock %}
+19
View File
@@ -29,6 +29,25 @@
</div>
<div class="row">
<div class="col-sm-6 col-md-5">
{% if workout_sync_status %}
<ul class="list-group top-buffer">
<li class="list-group-item py-2">
<div class="text-shadow">Workout provider: {{ active_workout_provider or 'not selected' }}</div>
{% if workout_sync_status.metadata %}
<div class="text-shadow"><small>{{ workout_sync_status.metadata.name or workout_sync_status.metadata.filename }}</small></div>
{% else %}
<div class="text-shadow"><small>No synced workout cached yet.</small></div>
{% endif %}
</li>
<li class="list-group-item py-2">
{% if workout_sync_status.local_status %}
<div class="text-shadow">Local workout catalog: {{ 'ready for launch' if workout_sync_status.local_status.healthy else 'needs refresh' }}</div>
{% else %}
<div class="text-shadow">Local workout catalog: waiting for next sync</div>
{% endif %}
</li>
</ul>
{% endif %}
<form method="POST" action="/start-zwift" class="top-buffer">
<div class="row">
<div class="col-md-12">
+1
View File
@@ -9,6 +9,7 @@ services:
- TZ=Europe/London
volumes:
- ./storage/:/usr/src/app/zwift-offline/storage
- ${ZWIFT_WORKOUTS_DIR:-~/Documents/Zwift/Workouts}:/root/Documents/Zwift/Workouts
ports:
- 80:80
- 443:443
+110
View File
@@ -0,0 +1,110 @@
# zoffline 本地使用说明
## 当前已完成能力
1. Intervals.icu
- 可在启动 Zwift 前自动拉取当天 workout
- 骑完保存后可自动上传活动回 Intervals.icu
- 若活动匹配当天同步的 workout,会自动带 paired_event_id 并 mark-done
2. Strava
- 已支持 OAuth 授权
- 骑完保存后可自动上传 FIT 到 Strava
3. TrainingPeaks
- 已支持手工桥接模式
- 将 TrainingPeaks 导出的 `.zwo` 文件放入本地目录后,可一键导入 Zwift custom workouts
- 当前不支持官方 API 自动拉取/自动上传,因为需要 TrainingPeaks partner access
4. Provider 互斥模式
- Intervals.icu 和 TrainingPeaks 不会再同时出现在 customworkouts
- 谁最后同步,谁就是当前 active provider
## 启动方式
推荐使用非特权端口本地运行:
```bash
cd /Users/sumulige/Documents/Antigravity/zwift-offline
python3 -m venv .venv
. .venv/bin/activate
pip install -r requirements.txt
env \
ZOFFLINE_CDN_PORT=18080 \
ZOFFLINE_TCP_PORT=13025 \
ZOFFLINE_UDP_PORT=13024 \
ZOFFLINE_API_PORT=18443 \
ZOFFLINE_API_USE_CERT=false \
./.venv/bin/python standalone.py
```
打开页面:
- 主页: `http://127.0.0.1:18443/user/zoffline/`
- Settings: `http://127.0.0.1:18443/settings/zoffline/`
- Intervals: `http://127.0.0.1:18443/intervals/zoffline/`
- Strava: `http://127.0.0.1:18443/strava/zoffline/`
- TrainingPeaks: `http://127.0.0.1:18443/trainingpeaks/zoffline/`
## Intervals.icu 设置
`Settings -> Intervals` 中填写:
- Athlete ID
- API key
然后你可以:
- 点击 `Sync today's workout` 手动同步
- 或点击 `Start Zwift` 时自动同步
同步成功后:
- workout 会写入 `storage/<player_id>/customworkouts/`
- metadata 会写入 provider state 文件
## Strava 设置
`Settings -> Strava` 中填写:
- Client ID
- Client Secret
然后点击 `Authorize` 完成 OAuth。
授权成功后,`storage/<player_id>/strava_token.txt` 会存在。
之后每次活动保存时会自动上传到 Strava。
## TrainingPeaks 手工桥接
1. 在 TrainingPeaks 中导出 workout 为 `.zwo`
2.`.zwo` 文件放到本地某个目录,例如:
- `/Users/yourname/TrainingPeaksExports`
3. 打开 `TrainingPeaks` 页面
4.`Bridge folder` 输入该目录
5. 点击 `Save bridge folder`
6. 点击 `Import workouts`
导入后的文件会写到:
- `storage/<player_id>/customworkouts/trainingpeaks-*.zwo`
## Provider 切换规则
- 点击 `Intervals -> Sync today's workout`
- active provider 切到 `intervals-icu`
- TrainingPeaks workouts 会被清掉
- 点击 `TrainingPeaks -> Import workouts`
- active provider 切到 `trainingpeaks`
- Intervals workouts 会被清掉
- 点击 `Start Zwift` 时,只会同步当前 active provider
## 关键文件
- Intervals provider: `intervals_workouts.py`
- TrainingPeaks bridge/provider scaffold: `trainingpeaks_workouts.py`
- Provider state: `workout_state.py`
- 主服务: `zwift_offline.py`
## Git 提交
本次功能提交:
- commit: `f0418ee`
- message: `feat: add workout provider sync integrations`
+135
View File
@@ -0,0 +1,135 @@
import datetime
import re
from io import BytesIO
from urllib.parse import urlencode
from requests.auth import HTTPBasicAuth
DEFAULT_BASE_URL = "https://intervals.icu/api/v1"
def normalize_events_payload(payload):
if isinstance(payload, list):
return payload
if isinstance(payload, dict):
for key in ("events", "data", "items"):
value = payload.get(key)
if isinstance(value, list):
return value
return []
def parse_datetime(value):
if not value:
return None
if isinstance(value, datetime.datetime):
parsed = value
else:
if value.endswith("Z"):
value = value[:-1] + "+00:00"
try:
parsed = datetime.datetime.fromisoformat(value)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=datetime.timezone.utc)
return parsed
def parse_event_datetime(event):
value = event.get("start_date_local") or event.get("start_date") or event.get("date")
return parse_datetime(value)
def choose_workout_event(events, now=None):
if not events:
return None
now = now or datetime.datetime.now(datetime.timezone.utc)
ordered = sorted(events, key=lambda event: parse_event_datetime(event) or datetime.datetime.max.replace(tzinfo=datetime.timezone.utc))
future = [event for event in ordered if (parse_event_datetime(event) or now) >= now]
if future:
return future[0]
return ordered[-1]
def slugify_filename(value):
value = (value or "").strip().lower()
value = re.sub(r"[^a-z0-9]+", "-", value)
return value.strip("-")
def build_workout_filename(event):
event_id = event.get("id", "workout")
name = slugify_filename(event.get("name") or f"workout-{event_id}")
if not name:
name = f"workout-{event_id}"
return f"intervals-icu-{event_id}-{name}.zwo"
def build_activity_upload_url(base_url, athlete_id, activity_name, paired_event_id=None):
params = {"name": activity_name}
if paired_event_id:
params["paired_event_id"] = paired_event_id
return f"{base_url}/athlete/{athlete_id}/activities?{urlencode(params)}"
def activity_matches_workout(activity, workout_metadata):
if not workout_metadata:
return False
event_dt = parse_datetime(workout_metadata.get("start_date_local") or workout_metadata.get("start_date") or workout_metadata.get("synced_for_date"))
activity_dt = parse_datetime(getattr(activity, "start_date", None) or getattr(activity, "date", None))
if not event_dt or not activity_dt:
return False
return event_dt.date() == activity_dt.date()
def upload_activity(athlete_id, api_key, activity, workout_metadata=None, session=None, base_url=DEFAULT_BASE_URL):
session = session or __import__("requests")
paired_event_id = None
if activity_matches_workout(activity, workout_metadata):
paired_event_id = workout_metadata.get("event_id") or workout_metadata.get("id")
upload_url = build_activity_upload_url(base_url, athlete_id, activity.name, paired_event_id=paired_event_id)
upload_response = session.post(upload_url, files={"file": BytesIO(activity.fit)}, auth=HTTPBasicAuth("API_KEY", api_key), timeout=30)
upload_response.raise_for_status()
if paired_event_id:
mark_done_url = f"{base_url}/athlete/{athlete_id}/events/{paired_event_id}/mark-done"
mark_done_response = session.post(mark_done_url, auth=HTTPBasicAuth("API_KEY", api_key), timeout=30)
mark_done_response.raise_for_status()
return {
"status": "uploaded",
"paired": bool(paired_event_id),
"paired_event_id": paired_event_id,
}
def sync_workout(athlete_id, api_key, store_workout, today=None, now=None, session=None, base_url=DEFAULT_BASE_URL):
today = today or datetime.date.today()
now = now or datetime.datetime.now(datetime.timezone.utc)
session = session or __import__("requests")
query = urlencode({
"oldest": today.isoformat(),
"newest": today.isoformat(),
"category": "WORKOUT",
})
events_url = f"{base_url}/athlete/{athlete_id}/events?{query}"
events_response = session.get(events_url, auth=HTTPBasicAuth("API_KEY", api_key), timeout=30)
events_response.raise_for_status()
events = normalize_events_payload(events_response.json())
event = choose_workout_event(events, now=now)
if not event:
return {"status": "no_workout", "message": f"No Intervals.icu workout found for {today.isoformat()}"}
workout_url = f"{base_url}/athlete/{athlete_id}/events/{event['id']}/download.zwo"
workout_response = session.get(workout_url, auth=HTTPBasicAuth("API_KEY", api_key), timeout=30)
workout_response.raise_for_status()
filename = build_workout_filename(event)
store_workout(filename, workout_response.content, event)
return {
"status": "synced",
"event": event,
"filename": filename,
"message": f"Synced Intervals.icu workout: {event.get('name', filename)}",
}
+168
View File
@@ -0,0 +1,168 @@
import hashlib
import os
import xml.etree.ElementTree as ET
MANIFEST_FILENAME = 'workouts.files'
def workouts_dir(root_dir, player_id):
return os.path.join(root_dir, str(player_id))
def manifest_file(root_dir, player_id):
return os.path.join(workouts_dir(root_dir, player_id), MANIFEST_FILENAME)
def parse_int(value):
try:
return int((value or '0').strip() or '0')
except ValueError:
return 0
def file_checksum(content):
return (-sum(content)) % 256
def file_guid(filename):
digest = hashlib.sha1(filename.encode('utf-8')).digest()
return (int.from_bytes(digest[:4], 'big') & 0x7fffffff) or 1
def load_manifest(root_dir, player_id):
path = manifest_file(root_dir, player_id)
if not os.path.exists(path):
return []
try:
root = ET.parse(path).getroot()
except ET.ParseError:
return []
entries = []
for node in root.findall('custom_file'):
name = (node.findtext('name') or '').strip()
if not name:
continue
entries.append({
'name': name,
'time': parse_int(node.findtext('time')),
'guid': parse_int(node.findtext('guid')),
'checksum': parse_int(node.findtext('checksum')),
'deleted': (node.findtext('deleted') or 'false').strip().lower() == 'true',
})
return entries
def save_manifest(root_dir, player_id, entries):
directory = workouts_dir(root_dir, player_id)
os.makedirs(directory, exist_ok=True)
root = ET.Element('custom_file_directory')
for entry in sorted(entries, key=lambda item: item['name'].lower()):
node = ET.SubElement(root, 'custom_file')
ET.SubElement(node, 'name').text = entry['name']
ET.SubElement(node, 'time').text = str(int(entry['time']))
ET.SubElement(node, 'guid').text = str(int(entry['guid']))
ET.SubElement(node, 'checksum').text = str(int(entry['checksum']))
ET.SubElement(node, 'deleted').text = 'true' if entry.get('deleted') else 'false'
ET.SubElement(root, 'deleted_files')
tree = ET.ElementTree(root)
ET.indent(tree, space=' ')
path = manifest_file(root_dir, player_id)
tree.write(path, encoding='utf-8', xml_declaration=False)
with open(path, 'a', encoding='utf-8') as fd:
fd.write('\n')
return path
def upsert_manifest_entry(root_dir, player_id, filename, content, timestamp=None):
entries = [entry for entry in load_manifest(root_dir, player_id) if entry['name'] != filename]
timestamp = int(timestamp if timestamp is not None else os.path.getmtime(os.path.join(workouts_dir(root_dir, player_id), filename)))
entry = {
'name': filename,
'time': timestamp,
'guid': file_guid(filename),
'checksum': file_checksum(content),
'deleted': False,
}
entries.append(entry)
save_manifest(root_dir, player_id, entries)
return entry
def remove_manifest_entries(root_dir, player_id, names):
names = set(names)
if not names:
return 0
existing = load_manifest(root_dir, player_id)
remaining = [entry for entry in existing if entry['name'] not in names]
removed = len(existing) - len(remaining)
if removed or os.path.exists(manifest_file(root_dir, player_id)):
save_manifest(root_dir, player_id, remaining)
return removed
def export_workout(root_dir, player_id, filename, content):
directory = workouts_dir(root_dir, player_id)
os.makedirs(directory, exist_ok=True)
path = os.path.join(directory, filename)
with open(path, 'wb') as fd:
fd.write(content)
timestamp = int(os.path.getmtime(path))
manifest_entry = upsert_manifest_entry(root_dir, player_id, filename, content, timestamp=timestamp)
return {
'path': path,
'manifest_path': manifest_file(root_dir, player_id),
'manifest_entry': manifest_entry,
}
def remove_prefixed_workouts(root_dir, player_id, prefixes):
directory = workouts_dir(root_dir, player_id)
if not os.path.isdir(directory):
return {'removed_files': 0, 'removed_manifest_entries': 0}
names_to_remove = []
removed_files = 0
for name in os.listdir(directory):
if name == MANIFEST_FILENAME or not any(name.startswith(prefix) for prefix in prefixes):
continue
path = os.path.join(directory, name)
if not os.path.isfile(path):
continue
os.remove(path)
names_to_remove.append(name)
removed_files += 1
removed_manifest_entries = remove_manifest_entries(root_dir, player_id, names_to_remove)
return {
'removed_files': removed_files,
'removed_manifest_entries': removed_manifest_entries,
}
def health_report(root_dir, player_id, filename):
directory = workouts_dir(root_dir, player_id)
path = os.path.join(directory, filename)
entries = {entry['name']: entry for entry in load_manifest(root_dir, player_id)}
entry = entries.get(filename)
report = {
'directory': directory,
'path': path,
'manifest_path': manifest_file(root_dir, player_id),
'file_exists': os.path.exists(path),
'manifest_entry_exists': entry is not None,
'manifest_entry': entry,
}
if report['file_exists']:
with open(path, 'rb') as fd:
content = fd.read()
report['expected_checksum'] = file_checksum(content)
report['expected_guid'] = file_guid(filename)
else:
report['expected_checksum'] = None
report['expected_guid'] = file_guid(filename)
report['healthy'] = (
report['file_exists']
and report['manifest_entry_exists']
and entry.get('checksum') == report['expected_checksum']
and int(entry.get('guid') or 0) > 0
) if entry else False
return report
View File
+152
View File
@@ -0,0 +1,152 @@
import datetime
import unittest
from types import SimpleNamespace
from intervals_workouts import sync_workout, upload_activity
class FakeResponse:
def __init__(self, payload=None, content=b"", status_code=200):
self._payload = payload
self.content = content
self.status_code = status_code
def json(self):
return self._payload
def raise_for_status(self):
if self.status_code >= 400:
raise RuntimeError(f"HTTP {self.status_code}")
class FakeSession:
def __init__(self, responses=None):
self.responses = list(responses or [])
self.calls = []
def _pop(self):
return self.responses.pop(0)
def get(self, url, **kwargs):
self.calls.append(("GET", url, kwargs))
return self._pop()
def post(self, url, **kwargs):
self.calls.append(("POST", url, kwargs))
return self._pop()
class SyncWorkoutTests(unittest.TestCase):
def test_sync_workout_downloads_next_planned_workout_and_saves_it(self):
now = datetime.datetime(2026, 4, 22, 10, 30, tzinfo=datetime.timezone.utc)
session = FakeSession([
FakeResponse(payload={
"events": [
{
"id": 41,
"name": "Morning opener",
"start_date_local": "2026-04-22T08:00:00+00:00",
},
{
"id": 42,
"name": "Threshold Session",
"start_date_local": "2026-04-22T18:00:00+00:00",
},
]
}),
FakeResponse(content=b"<workout_file />"),
])
saved = []
result = sync_workout(
athlete_id="123",
api_key="secret",
store_workout=lambda filename, content, event: saved.append((filename, content, event)),
today=now.date(),
now=now,
session=session,
)
self.assertEqual(result["status"], "synced")
self.assertEqual(result["event"]["id"], 42)
self.assertEqual(saved[0][0], "intervals-icu-42-threshold-session.zwo")
self.assertEqual(saved[0][1], b"<workout_file />")
self.assertEqual(saved[0][2]["id"], 42)
self.assertIn("/athlete/123/events?oldest=2026-04-22", session.calls[0][1])
self.assertTrue(session.calls[1][1].endswith("/athlete/123/events/42/download.zwo"))
def test_sync_workout_returns_no_workout_when_none_found(self):
today = datetime.date(2026, 4, 22)
session = FakeSession([FakeResponse(payload=[])])
saved = []
result = sync_workout(
athlete_id="123",
api_key="secret",
store_workout=lambda filename, content, event: saved.append((filename, content, event)),
today=today,
session=session,
)
self.assertEqual(result["status"], "no_workout")
self.assertEqual(saved, [])
self.assertEqual(len(session.calls), 1)
def test_upload_activity_pairs_synced_workout_and_marks_event_done(self):
activity = SimpleNamespace(
name="VO2 ride",
fit=b"FITDATA",
start_date="2026-04-22T18:05:00Z",
)
metadata = {
"event_id": 42,
"start_date_local": "2026-04-22T18:00:00+00:00",
"filename": "intervals-icu-42-threshold-session.zwo",
}
session = FakeSession([FakeResponse(), FakeResponse()])
result = upload_activity(
athlete_id="123",
api_key="secret",
activity=activity,
workout_metadata=metadata,
session=session,
)
self.assertEqual(result["status"], "uploaded")
self.assertTrue(result["paired"])
self.assertIn("paired_event_id=42", session.calls[0][1])
self.assertEqual(session.calls[0][0], "POST")
self.assertEqual(session.calls[1][1], "https://intervals.icu/api/v1/athlete/123/events/42/mark-done")
uploaded = session.calls[0][2]["files"]["file"]
self.assertEqual(uploaded.read(), b"FITDATA")
def test_upload_activity_without_matching_synced_workout_skips_pairing(self):
activity = SimpleNamespace(
name="VO2 ride",
fit=b"FITDATA",
start_date="2026-04-23T06:00:00Z",
)
metadata = {
"event_id": 42,
"start_date_local": "2026-04-22T18:00:00+00:00",
"filename": "intervals-icu-42-threshold-session.zwo",
}
session = FakeSession([FakeResponse()])
result = upload_activity(
athlete_id="123",
api_key="secret",
activity=activity,
workout_metadata=metadata,
session=session,
)
self.assertEqual(result["status"], "uploaded")
self.assertFalse(result["paired"])
self.assertNotIn("paired_event_id=42", session.calls[0][1])
self.assertEqual(len(session.calls), 1)
if __name__ == "__main__":
unittest.main()
+113
View File
@@ -0,0 +1,113 @@
import os
import tempfile
import unittest
from local_zwift_workouts import (
export_workout,
file_checksum,
file_guid,
health_report,
load_manifest,
manifest_file,
remove_prefixed_workouts,
save_manifest,
workouts_dir,
)
class LocalZwiftWorkoutsTests(unittest.TestCase):
def test_workouts_dir_is_player_scoped(self):
self.assertEqual(workouts_dir('/tmp/zwift', 1), '/tmp/zwift/1')
def test_file_checksum_uses_8bit_twos_complement(self):
self.assertEqual(file_checksum(b'\x01'), 255)
self.assertEqual(file_checksum(b'ABC'), (-ord('A') - ord('B') - ord('C')) % 256)
def test_export_workout_creates_file_and_manifest_entry(self):
with tempfile.TemporaryDirectory() as tmp:
result = export_workout(tmp, 1, 'intervals-icu-test.zwo', b'<workout_file/>')
self.assertTrue(os.path.exists(result['path']))
self.assertTrue(os.path.exists(result['manifest_path']))
entries = load_manifest(tmp, 1)
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0]['name'], 'intervals-icu-test.zwo')
self.assertEqual(entries[0]['guid'], file_guid('intervals-icu-test.zwo'))
self.assertEqual(entries[0]['checksum'], file_checksum(b'<workout_file/>'))
def test_export_workout_preserves_unmanaged_manifest_entries(self):
with tempfile.TemporaryDirectory() as tmp:
save_manifest(tmp, 1, [{
'name': 'custom-other.zwo',
'time': 123,
'guid': 456,
'checksum': 78,
'deleted': False,
}])
export_workout(tmp, 1, 'intervals-icu-test.zwo', b'<workout_file/>')
names = {entry['name'] for entry in load_manifest(tmp, 1)}
self.assertEqual(names, {'custom-other.zwo', 'intervals-icu-test.zwo'})
def test_load_manifest_tolerates_bad_numeric_fields(self):
with tempfile.TemporaryDirectory() as tmp:
os.makedirs(workouts_dir(tmp, 1))
with open(manifest_file(tmp, 1), 'w', encoding='utf-8') as fd:
fd.write(
'<custom_file_directory>'
'<custom_file>'
'<name>intervals-icu-test.zwo</name>'
'<time>bad</time>'
'<guid></guid>'
'<checksum>also-bad</checksum>'
'<deleted>false</deleted>'
'</custom_file>'
'</custom_file_directory>'
)
entries = load_manifest(tmp, 1)
self.assertEqual(entries[0]['time'], 0)
self.assertEqual(entries[0]['guid'], 0)
self.assertEqual(entries[0]['checksum'], 0)
def test_remove_prefixed_workouts_removes_matching_files_and_manifest_entries(self):
with tempfile.TemporaryDirectory() as tmp:
export_workout(tmp, 1, 'intervals-icu-a.zwo', b'a')
export_workout(tmp, 1, 'trainingpeaks-b.zwo', b'b')
export_workout(tmp, 1, 'custom-other.zwo', b'c')
removed = remove_prefixed_workouts(tmp, 1, {'intervals-icu-'})
self.assertEqual(removed['removed_files'], 1)
self.assertFalse(os.path.exists(os.path.join(tmp, '1', 'intervals-icu-a.zwo')))
self.assertTrue(os.path.exists(os.path.join(tmp, '1', 'trainingpeaks-b.zwo')))
self.assertTrue(os.path.exists(os.path.join(tmp, '1', 'custom-other.zwo')))
names = {entry['name'] for entry in load_manifest(tmp, 1)}
self.assertEqual(names, {'trainingpeaks-b.zwo', 'custom-other.zwo'})
def test_health_report_detects_consistent_export(self):
with tempfile.TemporaryDirectory() as tmp:
export_workout(tmp, 1, 'intervals-icu-a.zwo', b'a')
report = health_report(tmp, 1, 'intervals-icu-a.zwo')
self.assertTrue(report['healthy'])
self.assertTrue(report['manifest_entry_exists'])
self.assertEqual(report['manifest_entry']['guid'], file_guid('intervals-icu-a.zwo'))
def test_health_report_detects_manifest_mismatch(self):
with tempfile.TemporaryDirectory() as tmp:
export_workout(tmp, 1, 'intervals-icu-a.zwo', b'a')
save_manifest(tmp, 1, [{
'name': 'intervals-icu-a.zwo',
'time': 123,
'guid': 999,
'checksum': 42,
'deleted': False,
}])
report = health_report(tmp, 1, 'intervals-icu-a.zwo')
self.assertFalse(report['healthy'])
self.assertTrue(os.path.exists(manifest_file(tmp, 1)))
if __name__ == '__main__':
unittest.main()
+55
View File
@@ -0,0 +1,55 @@
import os
import tempfile
import unittest
from types import SimpleNamespace
from trainingpeaks_workouts import (
build_workout_filename,
sync_exported_workouts,
sync_workout,
upload_activity,
)
class TrainingPeaksWorkoutTests(unittest.TestCase):
def test_build_workout_filename_uses_trainingpeaks_prefix(self):
workout = {'id': 51, 'title': 'VO2 Max Builder'}
self.assertEqual(build_workout_filename(workout), 'trainingpeaks-51-vo2-max-builder.zwo')
def test_sync_exported_workouts_imports_zwo_files_only(self):
with tempfile.TemporaryDirectory() as tmp:
with open(os.path.join(tmp, 'VO2 Builder.zwo'), 'wb') as fd:
fd.write(b'<workout_file id="1"/>')
with open(os.path.join(tmp, 'Tempo Ride.zwo'), 'wb') as fd:
fd.write(b'<workout_file id="2"/>')
with open(os.path.join(tmp, 'ignore.txt'), 'w') as fd:
fd.write('ignore me')
saved = []
result = sync_exported_workouts(tmp, lambda filename, content, workout: saved.append((filename, content, workout)))
self.assertEqual(result['status'], 'synced')
self.assertEqual(result['count'], 2)
self.assertEqual([item[0] for item in saved], [
'trainingpeaks-tempo-ride.zwo',
'trainingpeaks-vo2-builder.zwo',
])
def test_sync_exported_workouts_handles_missing_folder(self):
result = sync_exported_workouts('/tmp/does-not-exist-hermes', lambda *args: None)
self.assertEqual(result['status'], 'missing_folder')
def test_sync_workout_reports_partner_access_requirement(self):
result = sync_workout(credentials={'client_id': 'abc'})
self.assertEqual(result['status'], 'unsupported')
self.assertIn('partner', result['message'].lower())
def test_upload_activity_reports_partner_access_requirement(self):
activity = SimpleNamespace(name='Ride', fit=b'FIT')
result = upload_activity(credentials={'access_token': 'abc'}, activity=activity)
self.assertEqual(result['status'], 'unsupported')
self.assertIn('partner', result['message'].lower())
if __name__ == '__main__':
unittest.main()
+73
View File
@@ -0,0 +1,73 @@
import json
import os
import tempfile
import unittest
from workout_state import (
clear_metadata,
load_active_provider,
load_metadata,
metadata_file,
resolve_active_provider,
save_active_provider,
save_metadata,
)
class WorkoutStateTests(unittest.TestCase):
def test_metadata_file_is_provider_specific(self):
path = metadata_file('/tmp/storage', 7, 'intervals-icu')
self.assertEqual(path, '/tmp/storage/7/intervals_icu_workout.json')
def test_save_and_load_metadata_round_trip(self):
with tempfile.TemporaryDirectory() as tmp:
payload = save_metadata(
tmp,
1,
'intervals-icu',
{
'id': 42,
'name': 'Threshold Session',
'start_date_local': '2026-04-22T18:00:00+00:00',
},
'intervals-icu-42-threshold-session.zwo',
)
self.assertEqual(payload['provider'], 'intervals-icu')
self.assertEqual(payload['event_id'], 42)
self.assertEqual(payload['filename'], 'intervals-icu-42-threshold-session.zwo')
loaded = load_metadata(tmp, 1, 'intervals-icu')
self.assertEqual(loaded['event_id'], 42)
self.assertEqual(loaded['provider'], 'intervals-icu')
def test_clear_metadata_removes_only_selected_provider(self):
with tempfile.TemporaryDirectory() as tmp:
save_metadata(tmp, 1, 'intervals-icu', {'id': 42, 'name': 'A'}, 'a.zwo')
save_metadata(tmp, 1, 'trainingpeaks', {'id': 99, 'name': 'B'}, 'b.zwo')
clear_metadata(tmp, 1, 'intervals-icu')
self.assertIsNone(load_metadata(tmp, 1, 'intervals-icu'))
self.assertEqual(load_metadata(tmp, 1, 'trainingpeaks')['event_id'], 99)
def test_save_and_load_active_provider(self):
with tempfile.TemporaryDirectory() as tmp:
self.assertIsNone(load_active_provider(tmp, 1))
save_active_provider(tmp, 1, 'trainingpeaks')
self.assertEqual(load_active_provider(tmp, 1), 'trainingpeaks')
def test_resolve_active_provider_uses_saved_provider_when_available(self):
active = resolve_active_provider('trainingpeaks', {'intervals-icu', 'trainingpeaks'})
self.assertEqual(active, 'trainingpeaks')
def test_resolve_active_provider_falls_back_to_first_available(self):
active = resolve_active_provider('trainingpeaks', {'intervals-icu'})
self.assertEqual(active, 'intervals-icu')
def test_resolve_active_provider_prefers_intervals_when_nothing_saved(self):
active = resolve_active_provider(None, {'trainingpeaks', 'intervals-icu'})
self.assertEqual(active, 'intervals-icu')
if __name__ == '__main__':
unittest.main()
+78
View File
@@ -0,0 +1,78 @@
import os
import re
PARTNER_ACCESS_MESSAGE = (
'TrainingPeaks automatic sync requires approved partner API access and app credentials. '
'This provider is scaffolded but not yet live-configured.'
)
def slugify_filename(value):
value = (value or '').strip().lower()
value = re.sub(r'[^a-z0-9]+', '-', value)
return value.strip('-')
def build_workout_filename(workout):
workout_id = workout.get('id', 'workout')
name = slugify_filename(workout.get('name') or workout.get('title') or f'workout-{workout_id}')
if not name:
name = f'workout-{workout_id}'
return f'trainingpeaks-{workout_id}-{name}.zwo'
def build_exported_filename(path):
base = os.path.splitext(os.path.basename(path))[0]
slug = slugify_filename(base)
if not slug:
slug = 'workout'
return f'trainingpeaks-{slug}.zwo'
def sync_exported_workouts(folder, store_workout):
if not folder or not os.path.isdir(folder):
return {
'status': 'missing_folder',
'message': 'TrainingPeaks bridge folder is missing or unreadable.',
}
entries = []
for name in sorted(os.listdir(folder), key=lambda item: item.lower()):
path = os.path.join(folder, name)
if not os.path.isfile(path) or not name.lower().endswith('.zwo'):
continue
with open(path, 'rb') as fd:
content = fd.read()
filename = build_exported_filename(path)
workout = {'title': os.path.splitext(name)[0], 'source_path': path}
store_workout(filename, content, workout)
entries.append(filename)
if not entries:
return {
'status': 'no_workout',
'message': 'No .zwo workouts were found in the TrainingPeaks bridge folder.',
'count': 0,
}
return {
'status': 'synced',
'message': f'Imported {len(entries)} TrainingPeaks workout(s) from bridge folder.',
'count': len(entries),
'filenames': entries,
}
def sync_workout(credentials=None, **kwargs):
return {
'status': 'unsupported',
'message': PARTNER_ACCESS_MESSAGE,
'credentials_present': bool(credentials),
}
def upload_activity(credentials=None, activity=None, **kwargs):
return {
'status': 'unsupported',
'message': PARTNER_ACCESS_MESSAGE,
'credentials_present': bool(credentials),
'activity_present': activity is not None,
}
+80
View File
@@ -0,0 +1,80 @@
import datetime
import json
import os
import re
def normalize_provider_name(provider):
return re.sub(r'[^a-z0-9]+', '_', provider.lower()).strip('_') or 'provider'
def metadata_file(storage_dir, player_id, provider):
return os.path.join(storage_dir, str(player_id), f'{normalize_provider_name(provider)}_workout.json')
def build_metadata(provider, workout, filename):
return {
'provider': provider,
'event_id': workout.get('event_id', workout.get('id')),
'name': workout.get('name', workout.get('title')),
'filename': filename,
'start_date_local': workout.get('start_date_local'),
'start_date': workout.get('start_date'),
'synced_at': datetime.datetime.now(datetime.timezone.utc).isoformat(),
}
def save_metadata(storage_dir, player_id, provider, workout, filename):
payload = build_metadata(provider, workout, filename)
file = metadata_file(storage_dir, player_id, provider)
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w') as fd:
json.dump(payload, fd)
return payload
def load_metadata(storage_dir, player_id, provider):
file = metadata_file(storage_dir, player_id, provider)
if not os.path.exists(file):
return None
try:
with open(file) as fd:
return json.load(fd)
except Exception:
return None
def clear_metadata(storage_dir, player_id, provider):
file = metadata_file(storage_dir, player_id, provider)
if os.path.exists(file):
os.remove(file)
def active_provider_file(storage_dir, player_id):
return os.path.join(storage_dir, str(player_id), 'active_workout_provider.txt')
def save_active_provider(storage_dir, player_id, provider):
file = active_provider_file(storage_dir, player_id)
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w') as fd:
fd.write((provider or '').strip())
def load_active_provider(storage_dir, player_id):
file = active_provider_file(storage_dir, player_id)
if not os.path.exists(file):
return None
with open(file) as fd:
provider = fd.read().strip()
return provider or None
def resolve_active_provider(saved_provider, available_providers):
available = set(available_providers or set())
if saved_provider and saved_provider in available:
return saved_provider
for candidate in ('intervals-icu', 'trainingpeaks'):
if candidate in available:
return candidate
return None
+314 -7
View File
@@ -62,6 +62,10 @@ import fitness_pb2
import structured_events_pb2
import online_sync
import intervals_workouts
import trainingpeaks_workouts
import workout_state
import local_zwift_workouts
logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
logger = logging.getLogger('zoffline')
@@ -860,6 +864,246 @@ def backup_file(file):
if os.path.isfile(file):
copyfile(file, "%s-%s.bak" % (file, datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d-%H-%M-%S")))
def save_player_zfile(player_id, folder, filename, content):
zfiles_dir = os.path.join(STORAGE_DIR, str(player_id), folder)
if not make_dir(zfiles_dir):
raise IOError("failed to create zfiles directory")
with open(os.path.join(zfiles_dir, filename), 'wb') as fd:
fd.write(content)
timestamp = int(time.time())
row = Zfile.query.filter_by(folder=folder, filename=filename, player_id=player_id).first()
if not row:
row = Zfile(folder=folder, filename=filename, timestamp=timestamp, player_id=player_id)
db.session.add(row)
else:
row.timestamp = timestamp
db.session.commit()
return row
def remove_player_zfiles_by_prefix(player_id, folder, prefix):
rows = Zfile.query.filter_by(folder=folder, player_id=player_id)
removed = False
for row in rows:
if not row.filename.startswith(prefix):
continue
removed = True
try:
os.remove(os.path.join(STORAGE_DIR, str(row.player_id), row.folder, row.filename))
except FileNotFoundError:
pass
except Exception as exc:
logger.warning('remove_player_zfiles_by_prefix: %s' % repr(exc))
db.session.delete(row)
if removed:
db.session.commit()
def load_workout_metadata(player_id, provider):
payload = workout_state.load_metadata(STORAGE_DIR, player_id, provider)
if payload is None:
return None
return payload
def save_workout_metadata(player_id, provider, event, filename):
return workout_state.save_metadata(STORAGE_DIR, player_id, provider, event, filename)
def clear_workout_metadata(player_id, provider):
workout_state.clear_metadata(STORAGE_DIR, player_id, provider)
def load_intervals_workout_metadata(player_id):
return load_workout_metadata(player_id, 'intervals-icu')
def save_intervals_workout_metadata(player_id, event, filename):
return save_workout_metadata(player_id, 'intervals-icu', event, filename)
def clear_intervals_workout_metadata(player_id):
clear_workout_metadata(player_id, 'intervals-icu')
def load_active_workout_provider(player_id):
return workout_state.load_active_provider(STORAGE_DIR, player_id)
def save_active_workout_provider(player_id, provider):
workout_state.save_active_provider(STORAGE_DIR, player_id, provider)
def available_workout_providers_for_player(player_id):
providers = set()
intervals_credentials = '%s/%s/intervals_credentials.bin' % (STORAGE_DIR, player_id)
if os.path.exists(intervals_credentials):
providers.add('intervals-icu')
if load_trainingpeaks_bridge_folder(player_id):
providers.add('trainingpeaks')
return providers
def resolve_workout_provider_for_player(player_id):
return workout_state.resolve_active_provider(load_active_workout_provider(player_id), available_workout_providers_for_player(player_id))
def local_zwift_workouts_root():
return os.path.expanduser('~/Documents/Zwift/Workouts')
def export_local_zwift_workout(player_id, filename, content):
return local_zwift_workouts.export_workout(local_zwift_workouts_root(), player_id, filename, content)
def remove_local_zwift_workouts_by_prefixes(player_id, prefixes):
return local_zwift_workouts.remove_prefixed_workouts(local_zwift_workouts_root(), player_id, prefixes)
def managed_workout_prefixes(provider):
if provider == 'intervals-icu':
return {'intervals-icu-'}
if provider == 'trainingpeaks':
return {'trainingpeaks-'}
return set()
def clear_managed_workouts_for_provider(player_id, provider, clear_metadata=True):
prefixes = managed_workout_prefixes(provider)
for prefix in prefixes:
remove_player_zfiles_by_prefix(player_id, 'customworkouts', prefix)
if prefixes:
remove_local_zwift_workouts_by_prefixes(player_id, prefixes)
if provider == 'intervals-icu' and clear_metadata:
clear_intervals_workout_metadata(player_id)
def current_workout_sync_status(player_id, provider=None):
provider = provider or resolve_workout_provider_for_player(player_id)
if not provider:
return None
metadata = load_workout_metadata(player_id, provider)
if not metadata:
return {
'provider': provider,
'metadata': None,
'server_file_exists': False,
'local_status': None,
}
filename = metadata.get('filename')
server_file = os.path.join(STORAGE_DIR, str(player_id), 'customworkouts', filename) if filename else ''
local_status = local_zwift_workouts.health_report(local_zwift_workouts_root(), player_id, filename) if filename else None
return {
'provider': provider,
'metadata': metadata,
'server_file_exists': bool(filename and os.path.exists(server_file)),
'server_file': server_file,
'local_status': local_status,
}
def activate_workout_provider(player_id, provider):
save_active_workout_provider(player_id, provider)
if provider == 'intervals-icu':
clear_managed_workouts_for_provider(player_id, 'trainingpeaks', clear_metadata=False)
elif provider == 'trainingpeaks':
clear_managed_workouts_for_provider(player_id, 'intervals-icu')
def sync_intervals_workout_for_player(player_id):
intervals_credentials = '%s/%s/intervals_credentials.bin' % (STORAGE_DIR, player_id)
if not os.path.exists(intervals_credentials):
return {"status": "missing_credentials", "message": "Intervals.icu credentials are not configured."}
athlete_id, api_key = decrypt_credentials(intervals_credentials)
if not athlete_id or not api_key:
return {"status": "missing_credentials", "message": "Intervals.icu credentials are incomplete."}
activate_workout_provider(player_id, 'intervals-icu')
stored = {}
def store_workout(filename, content, event):
clear_managed_workouts_for_provider(player_id, 'intervals-icu', clear_metadata=False)
stored['zfile'] = save_player_zfile(player_id, 'customworkouts', filename, content)
stored['local_export'] = export_local_zwift_workout(player_id, filename, content)
stored['metadata'] = save_intervals_workout_metadata(player_id, event, filename)
try:
result = intervals_workouts.sync_workout(athlete_id, api_key, store_workout)
if result['status'] == 'no_workout':
clear_managed_workouts_for_provider(player_id, 'intervals-icu')
return {
**result,
'sync_status': current_workout_sync_status(player_id, 'intervals-icu'),
'message': 'No Intervals.icu workout found for today. Cleared stale managed exports.',
}
if result['status'] == 'synced':
sync_status = current_workout_sync_status(player_id, 'intervals-icu')
return {
**result,
'local_export': stored.get('local_export'),
'sync_status': sync_status,
'message': '%s Local Zwift workout catalog prepared.' % result['message'],
}
return result
except Exception as exc:
logger.warning('sync_intervals_workout_for_player: %s' % repr(exc))
return {"status": "error", "message": "Intervals.icu workout sync failed."}
def trainingpeaks_bridge_folder_file(player_id):
return os.path.join(STORAGE_DIR, str(player_id), 'trainingpeaks_bridge_folder.txt')
def load_trainingpeaks_bridge_folder(player_id):
file = trainingpeaks_bridge_folder_file(player_id)
if not os.path.exists(file):
return ''
with open(file) as fd:
return fd.read().strip()
def save_trainingpeaks_bridge_folder(player_id, folder):
file = trainingpeaks_bridge_folder_file(player_id)
with open(file, 'w') as fd:
fd.write(folder.strip())
def sync_trainingpeaks_workout_for_player(player_id):
folder = load_trainingpeaks_bridge_folder(player_id)
if not folder:
return {
'status': 'missing_folder',
'message': 'Set a TrainingPeaks bridge folder first. Export .zwo workouts there, then sync again.',
}
activate_workout_provider(player_id, 'trainingpeaks')
clear_managed_workouts_for_provider(player_id, 'trainingpeaks', clear_metadata=False)
exports = []
def store_workout(filename, content, workout):
save_player_zfile(player_id, 'customworkouts', filename, content)
exports.append(export_local_zwift_workout(player_id, filename, content))
try:
result = trainingpeaks_workouts.sync_exported_workouts(folder, store_workout)
if result['status'] == 'no_workout':
clear_managed_workouts_for_provider(player_id, 'trainingpeaks', clear_metadata=False)
return {
**result,
'message': 'No .zwo workouts were found in the TrainingPeaks bridge folder. Cleared stale managed exports.',
}
if result['status'] == 'synced':
return {
**result,
'local_exports': exports,
'message': '%s Local Zwift workout catalog prepared.' % result['message'],
}
return result
except Exception as exc:
logger.warning('sync_trainingpeaks_workout_for_player: %s' % repr(exc))
return {'status': 'error', 'message': 'TrainingPeaks bridge sync failed.'}
@app.route("/profile/<username>/", methods=["GET", "POST"])
@login_required
def profile(username):
@@ -953,18 +1197,72 @@ def intervals(username):
if request.method == "POST":
if request.form['athlete_id'] == "" or request.form['api_key'] == "":
flash("Intervals.icu credentials can't be empty.")
return render_template("intervals.html", username=current_user.username)
return render_template(
"intervals.html",
username=current_user.username,
sync_status=current_workout_sync_status(current_user.player_id, 'intervals-icu'),
active_provider=resolve_workout_provider_for_player(current_user.player_id),
)
encrypt_credentials(file, (request.form['athlete_id'], request.form['api_key']))
return redirect(url_for('settings', username=current_user.username))
cred = decrypt_credentials(file)
return render_template("intervals.html", username=current_user.username, aid=cred[0], akey=cred[1])
return render_template(
"intervals.html",
username=current_user.username,
aid=cred[0],
akey=cred[1],
sync_status=current_workout_sync_status(current_user.player_id, 'intervals-icu'),
active_provider=resolve_workout_provider_for_player(current_user.player_id),
)
@app.route("/intervals/<username>/sync", methods=["GET"])
@login_required
def intervals_sync(username):
result = sync_intervals_workout_for_player(current_user.player_id)
flash(result['message'])
return redirect(url_for('intervals', username=current_user.username))
@app.route("/trainingpeaks/<username>/", methods=["GET", "POST"])
@login_required
def trainingpeaks(username):
if request.method == 'POST':
save_trainingpeaks_bridge_folder(current_user.player_id, request.form['bridge_folder'])
flash('TrainingPeaks bridge folder saved.')
return redirect(url_for('trainingpeaks', username=current_user.username))
bridge_folder = load_trainingpeaks_bridge_folder(current_user.player_id)
return render_template(
"trainingpeaks.html",
username=current_user.username,
message=trainingpeaks_workouts.PARTNER_ACCESS_MESSAGE,
bridge_folder=bridge_folder,
)
@app.route("/trainingpeaks/<username>/sync", methods=["GET"])
@login_required
def trainingpeaks_sync(username):
result = sync_trainingpeaks_workout_for_player(current_user.player_id)
flash(result['message'])
return redirect(url_for('trainingpeaks', username=current_user.username))
@app.route("/user/<username>/")
@login_required
def user_home(username):
return render_template("user_home.html", username=current_user.username, enable_ghosts=bool(current_user.enable_ghosts), climbs=CLIMBS,
online=get_online(), is_admin=current_user.is_admin, restarting=restarting, restarting_in_minutes=restarting_in_minutes)
return render_template(
"user_home.html",
username=current_user.username,
enable_ghosts=bool(current_user.enable_ghosts),
climbs=CLIMBS,
online=get_online(),
is_admin=current_user.is_admin,
restarting=restarting,
restarting_in_minutes=restarting_in_minutes,
active_workout_provider=resolve_workout_provider_for_player(current_user.player_id),
workout_sync_status=current_workout_sync_status(current_user.player_id),
)
def enqueue_player_update(player_id, wa_bytes):
if not player_id in player_update_queue:
@@ -2377,10 +2675,11 @@ def intervals_upload(player_id, activity):
logger.info("intervals_credentials.bin missing, skip Intervals.icu activity update")
return
athlete_id, api_key = decrypt_credentials(intervals_credentials)
workout_metadata = load_intervals_workout_metadata(player_id)
try:
from requests.auth import HTTPBasicAuth
url = 'https://intervals.icu/api/v1/athlete/%s/activities?name=%s' % (athlete_id, activity.name)
requests.post(url, files = {"file": BytesIO(activity.fit)}, auth = HTTPBasicAuth('API_KEY', api_key))
result = intervals_workouts.upload_activity(athlete_id, api_key, activity, workout_metadata=workout_metadata)
if result.get('paired'):
clear_intervals_workout_metadata(player_id)
except Exception as exc:
logger.warning("Intervals.icu upload failed. No internet? %s" % repr(exc))
@@ -4446,6 +4745,14 @@ def start_zwift():
selected_climb = request.form['climb']
if selected_climb != 'CALENDAR':
climb_override[request.remote_addr] = selected_climb
provider = resolve_workout_provider_for_player(current_user.player_id)
sync_result = None
if provider == 'intervals-icu':
sync_result = sync_intervals_workout_for_player(current_user.player_id)
elif provider == 'trainingpeaks':
sync_result = sync_trainingpeaks_workout_for_player(current_user.player_id)
if sync_result and sync_result['status'] in ('synced', 'no_workout', 'error'):
flash(sync_result['message'])
return redirect("/ride", 302)