关于直接获取鸽游服务器存档信息
zxjwzn opened this issue · 2 comments
zxjwzn commented
我记得有个仓库可以实现
https://github.com/7aGiven/PhigrosLibrary
里面有个py的目录可以获取数据,使用的时候要把difficulty.csv也下载下来
不过程序貌似有点问题,我个人修改了一下
这是PhigrosLibrary.py
import base64
from Crypto.Cipher import AES
from Crypto.Util import Padding
import io
import struct
import zipfile
# Difficulty tier names; readRecord() indexes this list by the bit position
# it tests in a record's flag bytes.
levels = "EZ HD IN AT".split()
# Song id -> list of chart constants (kept as strings); filled in by
# B19Class.read_difficulty().
difficulty = dict()
def getBool(num, index):
    """Return True when bit ``index`` of ``num`` is set (bit 0 is the lowest)."""
    # Parenthesised so the shift-then-mask order is explicit to the reader.
    return bool(num & (1 << index))
def print_hex(data):
    """Print ``data`` as space-separated, two-digit uppercase hex values.

    Debug helper: ``data`` is any iterable of integers (e.g. ``bytes``).
    """
    print(" ".join(f"{byte:02X}" for byte in data))
class ByteReader:
    """Sequential cursor over a decrypted ``gameRecord`` byte string.

    Each ``read*`` method decodes one value starting at ``self.position`` and
    moves ``self.position`` past the bytes it consumed.
    """

    def __init__(self, data: bytes):
        self.data = data
        self.position = 0

    def readVarShort(self):
        """Read a one- or two-byte variable-length unsigned integer.

        A first byte below 128 is the value itself.  Otherwise two bytes are
        consumed: the low seven bits of the first byte are the low bits of the
        value and the second byte supplies the bits above them.
        """
        low = self.data[self.position]
        if low < 128:
            self.position += 1
            return low
        # NOTE(review): 7-bit little-endian grouping follows the upstream
        # PhigrosLibrary reader — confirm against a save with 128+ entries.
        high = self.data[self.position + 1]
        self.position += 2
        return (low & 0x7F) | (high << 7)

    def readString(self):
        """Read a string stored as one length byte followed by its bytes.

        The payload is decoded with ``bytes.decode()`` (UTF-8 by default).
        """
        # NOTE(review): the length is taken from a single byte; if the format
        # uses the variable-length encoding here too, strings of 128+ bytes
        # would be misread — confirm.
        length = self.data[self.position]
        start = self.position + 1
        self.position = start + length
        return self.data[start:self.position].decode()

    def readScoreAcc(self):
        """Read an int32 score followed by a float32 accuracy (8 bytes).

        Returns:
            ``{"score": int, "acc": float}``.
        """
        # Explicit little-endian so the result does not depend on the host
        # CPU's native byte order.
        score, acc = struct.unpack_from("<if", self.data, self.position)
        self.position += 8
        return {"score": score, "acc": acc}

    def readRecord(self, songId):
        """Decode one song's record block into per-level result dicts.

        The block starts with a length byte, then a byte of "record exists"
        flags and a byte of "fc" flags (one bit per level), followed by a
        score/accuracy pair for every level whose "exists" bit is set.

        Args:
            songId: Key into the module-level ``difficulty`` table.

        Returns:
            A list of dicts with keys ``score``, ``acc``, ``level``, ``fc``,
            ``songId``, ``difficulty`` and ``rks``.

        Raises:
            KeyError: If ``songId`` is missing from ``difficulty``.
        """
        # Remember where the block ends so the cursor lands there even when
        # fewer levels are decoded than the block contains.
        end_position = self.position + self.data[self.position] + 1
        exists = self.data[self.position + 1]
        fc = self.data[self.position + 2]
        self.position += 3
        diff = difficulty[songId]
        records = []
        for level in range(len(diff)):
            if not getBool(exists, level):
                continue
            entry = self.readScoreAcc()
            constant = diff[level]
            # Clamp at zero before squaring: without it an accuracy below 55
            # gives a negative base whose square is a positive rks.
            # NOTE(review): the game reportedly awards 0 rks below 70%
            # accuracy — confirm whether the cutoff should be 70.
            base = max(entry["acc"] - 55, 0) / 45
            entry["level"] = levels[level]
            entry["fc"] = getBool(fc, level)
            entry["songId"] = songId
            entry["difficulty"] = constant
            entry["rks"] = base * base * float(constant)
            records.append(entry)
        self.position = end_position
        return records
# Base headers for every API request; callers copy this mapping and add an
# "X-LC-Session" entry.  The X-LC-* values are presumably the application's
# id/key pair — confirm before rotating them.
global_headers = dict(
    (
        ("X-LC-Id", "rAK3FfdieFob2Nn8Am"),
        ("X-LC-Key", "Qr9AEqtuoSVS3zeD6iVbM4ZC0AtkJcQ89tywVyi0"),
        ("User-Agent", "LeanCloud-CSharp-SDK/1.0.3"),
        ("Accept", "application/json"),
    )
)
async def readGameRecord(client, url):
    """Download a save archive and return its still-encrypted ``gameRecord``.

    Args:
        client: Object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``read()``; used here the
            way an ``aiohttp.ClientSession`` is.
        url: Location of the save archive, which is read as a zip file.

    Returns:
        The bytes of the ``gameRecord`` zip member that follow its leading
        version byte.

    Raises:
        ValueError: If the member's first byte is not ``0x01``.
    """
    async with client.get(url) as response:
        result = await response.read()
    with zipfile.ZipFile(io.BytesIO(result)) as archive:
        with archive.open("gameRecord") as gameRecord_file:
            if gameRecord_file.read(1) != b"\x01":
                # Must be an exception instance: raising a plain str is
                # itself a TypeError in Python 3 and hides the real message.
                raise ValueError("版本号不正确,可能协议已更新。")
            return gameRecord_file.read()
# Fixed AES key (32 bytes) and CBC initialisation vector (16 bytes) that
# decrypt_gameRecord() passes to AES.new().
iv = base64.standard_b64decode("Kk/wisgNYwcAV8WVGMgyUw==")
key = base64.standard_b64decode("6Jaa0qVAJZuXkZCLiOa/Ax5tIZVu+taKUN1V1nqwkks=")
def decrypt_gameRecord(gameRecord):
    """Decrypt a ``gameRecord`` payload and strip its block padding.

    Uses AES in CBC mode with the module-level ``key`` and ``iv``.  A fresh
    cipher object is created on every call.

    Args:
        gameRecord: Encrypted bytes, as returned by ``readGameRecord``.

    Returns:
        The decrypted bytes with padding removed by ``Padding.unpad``.
    """
    cipher = AES.new(key, AES.MODE_CBC, iv)
    return Padding.unpad(cipher.decrypt(gameRecord), AES.block_size)
def parse_b19(gameRecord):
    """Build the "best phi + best 19" list from a decrypted ``gameRecord``.

    Args:
        gameRecord: Decrypted record bytes: an entry count followed by that
            many (name, record block) pairs.

    Returns:
        A list of record dicts (see ``ByteReader.readRecord``).  When at least
        one record has a score of 1000000, the one with the highest chart
        constant comes first; it is followed by up to 19 records with the
        highest ``rks``.  With no such record the list holds only the
        best-19 part.
    """
    records = []
    reader = ByteReader(gameRecord)
    for _ in range(reader.readVarShort()):
        # The stored name has two trailing characters that the keys of the
        # ``difficulty`` table do not; drop them before the lookup.
        songId = reader.readString()[:-2]
        records.extend(reader.readRecord(songId))
    records.sort(key=lambda record: record["rks"], reverse=True)

    b19 = []
    phis = [record for record in records if record["score"] == 1000000]
    if phis:
        # Constants are stored as strings, so compare them numerically:
        # as text "9.5" would rank above "15.8".
        b19.append(max(phis, key=lambda record: float(record["difficulty"])))
    # No guard existed before: max() on an empty sequence raised ValueError
    # for any player without a 1000000 score.
    b19.extend(records[:19])
    return b19
class B19Class:
    """Reads a player's nickname, save summary and best-19 via the cloud API."""

    def __init__(self, client):
        # HTTP session used for every request; it is used the way an
        # aiohttp.ClientSession is (``async with client.get(...)``).
        self.client = client

    def _session_headers(self, sessionToken):
        """Return a copy of ``global_headers`` carrying the session token."""
        headers = global_headers.copy()
        headers["X-LC-Session"] = sessionToken
        return headers

    def read_difficulty(self, path):
        """Reload the module-level ``difficulty`` table from a CSV file.

        Each row is ``songId,constant[,constant...]``.  The first field
        becomes the key and the remaining fields (kept as strings) the value.
        Rows with no comma, including blank lines, are skipped.

        Args:
            path: Path of the CSV file, read as UTF-8 with undecodable bytes
                replaced.
        """
        difficulty.clear()
        with open(path, encoding="utf-8", errors="replace") as f:
            for line in f:
                # Strip only the line terminator.  Slicing off the last
                # character unconditionally would eat a digit from the final
                # row of a file that does not end with a newline.
                fields = line.rstrip("\r\n").split(",")
                if len(fields) > 1:
                    difficulty[fields[0]] = fields[1:]

    async def get_playerId(self, sessionToken):
        """Return the ``nickname`` field of the account behind ``sessionToken``."""
        headers = self._session_headers(sessionToken)
        async with self.client.get("https://rak3ffdi.cloud.tds1.tapapis.cn/1.1/users/me", headers=headers) as response:
            result = (await response.json())["nickname"]
        return result

    async def get_summary(self, sessionToken):
        """Fetch the first ``_GameSave`` entry and decode its summary blob.

        Returns:
            A dict with ``updateAt``, ``url`` (location of the save archive),
            ``saveVersion``, ``challenge``, ``rks``, ``gameVersion``,
            ``avatar`` and, for each of ``EZ``/``HD``/``IN``/``AT``, a tuple
            of three unsigned shorts.

        Raises:
            IndexError: If the response's ``results`` list is empty.
            struct.error: If the summary length does not match the layout.
        """
        headers = self._session_headers(sessionToken)
        async with self.client.get("https://rak3ffdi.cloud.tds1.tapapis.cn/1.1/classes/_GameSave", headers=headers) as response:
            result = (await response.json())["results"][0]
        updateAt = result["updatedAt"]
        url = result["gameFile"]["url"]
        summary = base64.b64decode(result["summary"])
        # Layout: u8, u16, f32, u8, then a length byte (index 8, skipped by
        # "x" and used to size the string), the avatar bytes, and 12 u16.
        # "<" pins little-endian instead of the host's native byte order.
        # NOTE(review): the three per-level counters are presumably
        # cleared / full-combo / phi counts — confirm.
        summary = struct.unpack("<BHfBx%ds12H" % summary[8], summary)
        return {"updateAt": updateAt, "url": url, "saveVersion": summary[0], "challenge": summary[1], "rks": summary[2], "gameVersion": summary[3], "avatar": summary[4].decode(), "EZ": summary[5:8], "HD": summary[8:11], "IN": summary[11:14], "AT": summary[14:17]}

    async def get_b19(self, url):
        """Download, decrypt and parse the save at ``url`` into a b19 list."""
        gameRecord = await readGameRecord(self.client, url)
        gameRecord = decrypt_gameRecord(gameRecord)
        return parse_b19(gameRecord)
这是test.py
import aiohttp
import asyncio
import sys
from PhigrosLibrary import B19Class
async def main(sessionToken):
    """Print a player's name, save metadata, best-19 entries and overall rks.

    Loads chart constants from ``../difficulty.csv``, queries the API with
    ``sessionToken`` and prints one block per b19 entry, ordered by score
    (highest first).  The final line is the sum of the entries' rks divided
    by 20, regardless of how many entries were returned.
    """
    async with aiohttp.ClientSession() as client:
        b19Class = B19Class(client)
        b19Class.read_difficulty("../difficulty.csv")
        playerId = await b19Class.get_playerId(sessionToken)
        summary = await b19Class.get_summary(sessionToken)
        print("玩家名称:" + playerId)
        print("存档上传日期:" + summary["updateAt"])
        print("存档地址:" + summary["url"])
        b19 = await b19Class.get_b19(summary["url"])
        rks = 0
        for song in sorted(b19, key=lambda x: x["score"], reverse=True):
            rks += song["rks"]
            print(
                "名称:" + song["songId"]
                + "\n分数:" + str(song["score"])
                + "\n准度:" + str(round(song["acc"], 2)) + "%"
                + "\n难度:" + song["level"]
                + "\n定数:" + song["difficulty"]
                + "\n单曲rks:" + str(round(song["rks"], 2))
                + "\n-------------"
            )
        print(rks/20)
# The session token is the sole command-line argument; with any other
# argument count an empty token is used.
s = sys.argv[1] if len(sys.argv) == 2 else ""
asyncio.run(main(s))
s里填的是25位的token,这样就可以直接从鸽游服务器上获取数据
token的获取可以参考以下文章里的账号绑定内容
https://mivik.moe/pgr-bot-help/
zxjwzn commented
我个人没什么能力,加上机器人账号被封,所以心有余而力不足了
希望作者可以整合一下这个功能
SEVEN-6174 commented
十分感谢
之前一直想着获取数据,结果没办法