SEVEN-6174/zhenxun_plugin_phigros

关于直接获取鸽游服务器存档信息

zxjwzn opened this issue · 2 comments

zxjwzn commented

我记得有个仓库可以实现
https://github.com/7aGiven/PhigrosLibrary
里面有个py的目录可以获取数据,使用的时候要把difficulty.csv也下载下来
不过程序貌似有点问题,我个人修改了一下
这是PhigrosLibrary.py

import base64
from Crypto.Cipher import AES
from Crypto.Util import Padding
import io
import struct
import zipfile

levels = ["EZ", "HD", "IN", "AT"]
difficulty = {}

def getBool(num, index):
        return bool(num & 1 << index)

def print_hex(data):
    hex_data = ' '.join(format(byte, '02X') for byte in data)
    print(hex_data)

class ByteReader:
    def __init__(self, data:bytes):
        self.data = data
        self.position = 0
    
    def readVarShort(self):
        num = self.data[self.position]
        if  num < 128:
            self.position += 1
        else:
            self.position += 2
        return num
    
    def readString(self):
        length = self.data[self.position]
        self.position += length + 1
        return self.data[self.position-length:self.position].decode()
    
    def readScoreAcc(self):
        self.position += 8
        scoreAcc = struct.unpack("if", self.data[self.position-8:self.position])
        return {"score": scoreAcc[0], "acc": scoreAcc[1]}

    
    def readRecord(self, songId):
        end_position = self.position + self.data[self.position] + 1
        self.position += 1
        exists = self.data[self.position]
        self.position += 1
        fc = self.data[self.position]
        self.position += 1
        diff = difficulty[songId]
        records = []
        for level in range(len(diff)):
            if getBool(exists, level):
                scoreAcc = self.readScoreAcc()
                scoreAcc["level"] = levels[level]
                scoreAcc["fc"] = getBool(fc, level)
                scoreAcc["songId"] = songId
                scoreAcc["difficulty"] = diff[level]
                scoreAcc["rks"] = (scoreAcc["acc"]-55)/45
                scoreAcc["rks"] = float(scoreAcc["rks"]) * float(scoreAcc["rks"]) * float(scoreAcc["difficulty"])
                records.append(scoreAcc)
        self.position = end_position
        return records 

global_headers = {
    "X-LC-Id": "rAK3FfdieFob2Nn8Am",
    "X-LC-Key": "Qr9AEqtuoSVS3zeD6iVbM4ZC0AtkJcQ89tywVyi0",
    "User-Agent": "LeanCloud-CSharp-SDK/1.0.3",
    "Accept": "application/json"
}

async def readGameRecord(client, url):
    async with client.get(url) as response:
        result = await response.read()
    with zipfile.ZipFile(io.BytesIO(result)) as zip:
        with zip.open("gameRecord") as gameRecord_file:
            if gameRecord_file.read(1) != b"\x01":
                raise "版本号不正确,可能协议已更新。"
            return gameRecord_file.read()

key = base64.b64decode("6Jaa0qVAJZuXkZCLiOa/Ax5tIZVu+taKUN1V1nqwkks=")
iv = base64.b64decode("Kk/wisgNYwcAV8WVGMgyUw==")

def decrypt_gameRecord(gameRecord):
    gameRecord = AES.new(key, AES.MODE_CBC, iv).decrypt(gameRecord)
    #print(Padding.unpad(gameRecord, AES.block_size))
    return Padding.unpad(gameRecord, AES.block_size)

def parse_b19(gameRecord):
    records = []
    reader = ByteReader(gameRecord)
    #print_hex(gameRecord)
    for i in range(reader.readVarShort()):
        songId = reader.readString()[:-2]
        #print(songId)
        record = reader.readRecord(songId)
        records.extend(record)
    records.sort(key=lambda x:x["rks"], reverse=True)
    b19 = [max(filter(lambda x:x["score"] == 1000000, records), key=lambda x:x["difficulty"])]
    b19.extend(records[:19])
    return b19

class B19Class:
    def __init__(self, client):
        self.client = client
    
    def read_difficulty(self, path):
        difficulty.clear()
        with open(path, encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
        for line in lines:
            line = line[:-1].split(",")
            #print(line)
            for i in range(1, len(line)):
                difficulty[line[0]]=line[1:]
        #print(difficulty)

    async def get_playerId(self, sessionToken):
        headers = global_headers.copy()
        headers["X-LC-Session"] = sessionToken
        async with self.client.get("https://rak3ffdi.cloud.tds1.tapapis.cn/1.1/users/me", headers=headers) as response:
            result = (await response.json())["nickname"]
        return result

    async def get_summary(self, sessionToken):
        headers = global_headers.copy()
        headers["X-LC-Session"] = sessionToken
        async with self.client.get("https://rak3ffdi.cloud.tds1.tapapis.cn/1.1/classes/_GameSave", headers=headers) as response:
            result = (await response.json())["results"][0]
        updateAt = result["updatedAt"]
        url = result["gameFile"]["url"]
        summary = base64.b64decode(result["summary"])
        summary = struct.unpack("=BHfBx%ds12H" % summary[8], summary)
        return {"updateAt": updateAt, "url": url, "saveVersion": summary[0], "challenge": summary[1], "rks": summary[2], "gameVersion": summary[3], "avatar": summary[4].decode(), "EZ": summary[5:8], "HD": summary[8:11], "IN": summary[11:14], "AT": summary[14:17]}
    
    async def get_b19(self, url):
        gameRecord = await readGameRecord(self.client, url)
        gameRecord = decrypt_gameRecord(gameRecord)
        return parse_b19(gameRecord)

这是test.py

import aiohttp
import asyncio
import sys
from PhigrosLibrary import B19Class

async def main(sessionToken):
    async with aiohttp.ClientSession() as client:
        b19Class = B19Class(client)
        b19Class.read_difficulty("../difficulty.csv")
        playerId = await b19Class.get_playerId(sessionToken)        
        summary = await b19Class.get_summary(sessionToken)
        print("玩家名称:" + playerId)
        print("存档上传日期:" + summary["updateAt"])
        print("存档地址:" + summary["url"])
        #print(difficulty)
        b19 = await b19Class.get_b19(summary["url"])
    rks = 0
    b19 = sorted(b19, key=lambda x: x["score"], reverse=True)
    for song in b19:
        rks += song["rks"]
        print("名称:" + song["songId"] + "\n分数:" + str(song["score"]) + "\n准度:" + str(round(song["acc"], 2)) + "%" + "\n难度:" +song["level"] + "\n定数:" + song["difficulty"] + "\n单曲rks:" + str(round(song["rks"], 2)) + "\n-------------")
    print(rks/20)

s = ""
if len(sys.argv) == 2:
    s = sys.argv[1]
asyncio.run(main(s))

s里填的是25位的token,这样就可以直接往鸽游服务器上获取数据
token的获取可以参考以下文章里的账号绑定内容
https://mivik.moe/pgr-bot-help/

zxjwzn commented

我个人没什么能力,加上机器人账号被封,所以心有余力不足了
希望作者可以整合一下这个功能

十分感谢
之前一直想着获取数据的结果没办法