mirror of
https://github.com/zoffline/zwift-offline.git
synced 2026-06-12 11:01:32 -07:00
Add scripts
This commit is contained in:
@@ -0,0 +1,130 @@
|
||||
#!/usr/bin/env python
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
import sys
|
||||
import getpass
|
||||
|
||||
def post_credentials(session, username, password):
|
||||
# Credentials POSTing and tokens retrieval
|
||||
# POST https://secure.zwift.com/auth/realms/zwift/tokens/access/codes
|
||||
try:
|
||||
response = session.post(
|
||||
url="https://secure.zwift.com/auth/realms/zwift/protocol/openid-connect/token",
|
||||
headers={
|
||||
"Accept": "*/*",
|
||||
"Accept-Encoding": "gzip, deflate",
|
||||
"Connection": "keep-alive",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"Host": "secure.zwift.com",
|
||||
"User-Agent": "Zwift/1.5 (iPhone; iOS 9.0.2; Scale/2.00)",
|
||||
"Accept-Language": "en-US;q=1",
|
||||
},
|
||||
data={
|
||||
"client_id": "Zwift_Mobile_Link",
|
||||
"username": username,
|
||||
"password": password,
|
||||
"grant_type": "password",
|
||||
},
|
||||
allow_redirects=False,
|
||||
verify=True,
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
print('Response HTTP Status Code: {status_code}'.format(
|
||||
status_code=response.status_code))
|
||||
print('Response HTTP Response Body: {content}'.format(
|
||||
content=response.content))
|
||||
|
||||
json_dict = json.loads(response.content)
|
||||
|
||||
return (json_dict["access_token"], json_dict["refresh_token"], json_dict["expires_in"])
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print('HTTP Request failed: %s' % e)
|
||||
|
||||
except KeyError as e:
|
||||
print('Invalid uname and/or password')
|
||||
exit(-1)
|
||||
|
||||
def get_game_info(session, access_token):
|
||||
try:
|
||||
response = session.get(
|
||||
url="https://us-or-rly101.zwift.com/api/game_info",
|
||||
headers={
|
||||
"Accept": "*/*",
|
||||
"Connection": "keep-alive",
|
||||
"Host": "us-or-rly101.zwift.com",
|
||||
"User-Agent": "Zwift/115 CFNetwork/758.0.2 Darwin/15.0.0",
|
||||
"Authorization": "Bearer %s" % access_token,
|
||||
"Accept-Language": "en-us",
|
||||
"Zwift-Api-Version": "2.6"
|
||||
},
|
||||
verify=True,
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
print('Response HTTP Status Code: {status_code}'.format(
|
||||
status_code=response.status_code))
|
||||
|
||||
return response.content
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print('HTTP Request failed: %s' % e)
|
||||
|
||||
def logout(session, refresh_token):
|
||||
# Logout
|
||||
# POST https://secure.zwift.com/auth/realms/zwift/tokens/logout
|
||||
try:
|
||||
response = session.post(
|
||||
url="https://secure.zwift.com/auth/realms/zwift/tokens/logout",
|
||||
headers={
|
||||
"Accept": "*/*",
|
||||
"Accept-Encoding": "gzip, deflate",
|
||||
"Connection": "keep-alive",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"Host": "secure.zwift.com",
|
||||
"User-Agent": "Zwift/1.5 (iPhone; iOS 9.0.2; Scale/2.00)",
|
||||
"Accept-Language": "en-US;q=1",
|
||||
},
|
||||
data={
|
||||
"client_id": "Zwift_Mobile_Link",
|
||||
"refresh_token": refresh_token,
|
||||
},
|
||||
verify=True,
|
||||
)
|
||||
if response.status_code != 204:
|
||||
print('Response HTTP Status Code: {status_code}'.format(
|
||||
status_code=response.status_code))
|
||||
print('Response HTTP Response Body: {content}'.format(
|
||||
content=response.content))
|
||||
except requests.exceptions.RequestException as e:
|
||||
print('HTTP Request failed: %s' % e)
|
||||
|
||||
def main(argv):
|
||||
username = input("Enter Zwift login (e-mail): ")
|
||||
if not sys.stdin.isatty(): # This terminal cannot support input without displaying text
|
||||
print(f'*WARNING* The current shell ({os.name}) cannot support hidden text entry.')
|
||||
print(f'Your password entry WILL BE VISIBLE.')
|
||||
print(f'If you are running a bash shell under windows, try executing this program via winpty:')
|
||||
print(f'>winpty python {argv[0]}')
|
||||
password = input("Enter password (will be shown):")
|
||||
else:
|
||||
password = getpass.getpass("Enter password: ")
|
||||
|
||||
session = requests.session()
|
||||
access_token, refresh_token, expired_in = post_credentials(session, username, password)
|
||||
game_info = get_game_info(session, access_token).decode('utf-8')
|
||||
with open('game_info.txt', 'wb') as f:
|
||||
f.write(game_info.encode('utf-8-sig'))
|
||||
|
||||
logout(session, refresh_token)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main(sys.argv)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except SystemExit as se:
|
||||
print("ERROR:", se)
|
||||
@@ -0,0 +1,59 @@
|
||||
import os
|
||||
import time
|
||||
import math
|
||||
import signal
|
||||
import threading
|
||||
import xml.etree.ElementTree as ET
|
||||
from urllib3 import PoolManager
|
||||
from binascii import crc32
|
||||
|
||||
def sigint_handler(num, frame):
|
||||
os._exit(0)
|
||||
|
||||
signal.signal(signal.SIGINT, sigint_handler)
|
||||
|
||||
def download(files, folder):
|
||||
global downloaded
|
||||
manager = PoolManager()
|
||||
for file in files:
|
||||
path = file.find('path').text
|
||||
length = int(file.find('length').text)
|
||||
checksum = int(file.find('checksum').text) % (1 << 32)
|
||||
file_name = os.path.join(local_path, folder, path.replace('\\', os.sep))
|
||||
dir_name = os.path.dirname(file_name)
|
||||
if not os.path.isdir(dir_name):
|
||||
os.makedirs(dir_name)
|
||||
while not os.path.isfile(file_name) or os.path.getsize(file_name) != length or (crc32(open(file_name, 'rb').read()) != checksum and checksum != 4294967295):
|
||||
open(file_name, 'wb').write(manager.request('GET', '%s%s/%s' % (base_url, folder, path.replace('\\', '/'))).data)
|
||||
downloaded += 1
|
||||
|
||||
base_url = 'http://cdn.zwift.com/gameassets/Zwift_Updates_Root/'
|
||||
local_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'cdn', 'gameassets', 'Zwift_Updates_Root')
|
||||
for file in ['Zwift_ver_cur.xml', 'ZwiftMac_ver_cur.xml']:
|
||||
tree = ET.parse(os.path.join(local_path, file))
|
||||
root = tree.getroot()
|
||||
manifest = root.get('manifest')
|
||||
manifest_checksum = int(root.get('manifest_checksum')) % (1 << 32)
|
||||
manifest_file = os.path.join(local_path, manifest)
|
||||
while not os.path.isfile(manifest_file) or crc32(open(manifest_file, 'rb').read()) != manifest_checksum:
|
||||
open(manifest_file, 'wb').write(PoolManager().request('GET', base_url + manifest).data)
|
||||
tree = ET.parse(manifest_file)
|
||||
root = tree.getroot()
|
||||
folder = root.get('folder')
|
||||
all_files = list(root.iter('file'))
|
||||
total = len(all_files)
|
||||
downloaded = 0
|
||||
threads = 5
|
||||
c = math.ceil(total / threads)
|
||||
for i in range(0, threads):
|
||||
files = all_files[i * c:i * c + c]
|
||||
thread = threading.Thread(target=download, args=(files, folder))
|
||||
thread.start()
|
||||
print("Downloading files from %s" % manifest)
|
||||
while True:
|
||||
time.sleep(1)
|
||||
completed = 50 * downloaded // total
|
||||
print('\r[%s] %s%% (%s of %s)' % ('#' * completed + '.' * (50 - completed), round(100 * downloaded / total, 1), downloaded, total), end='', flush=True)
|
||||
if downloaded == total:
|
||||
break
|
||||
print()
|
||||
@@ -0,0 +1,30 @@
|
||||
import json
|
||||
import protobuf.variants_pb2 as variants_pb2
|
||||
from google.protobuf.json_format import MessageToDict
|
||||
|
||||
variants = variants_pb2.FeatureResponse()
|
||||
|
||||
with open("variant", "rb") as f:
|
||||
variants.ParseFromString(f.read())
|
||||
|
||||
vs = []
|
||||
|
||||
with open("variants.txt") as f:
|
||||
j = json.load(f)
|
||||
vs = j['variants']
|
||||
|
||||
for variant in variants.variants:
|
||||
d = MessageToDict(variant)
|
||||
v = {}
|
||||
v['name'] = d['name']
|
||||
if 'value' in d:
|
||||
v['value'] = d['value']
|
||||
d['values'] = dict(d['values'])
|
||||
for f in d['values']:
|
||||
d['values'][f] = dict(sorted(d['values'][f].items()))
|
||||
v['values'] = d['values']
|
||||
vs[:] = [d for d in vs if d.get('name') != v['name']]
|
||||
vs.append(v)
|
||||
|
||||
with open("variants.txt", "w") as f:
|
||||
json.dump({'variants': sorted(vs, key=lambda x: x['name'])}, f, indent=2)
|
||||
Reference in New Issue
Block a user