usual2970/certimate

[Feature] 希望支持多吉云和宝塔部署

Opened this issue · 0 comments

功能描述
希望支持多吉云和宝塔部署

动机
多吉云CDN和宝塔使用范围比较广泛,多吉云官方有支持的API,宝塔也有对应API可用。

替代方案

其他信息

# -*- coding: utf-8 -*-
import hashlib
import time
import requests
import prettytable


# 宝塔面板操作类
class BtPanel:
    __BTURL = ''
    __APIKEY = ''
    __REQ = requests.session()

    # 初始化宝塔面板
    def __init__(self, host: str, apisk: str):
        '''
        :param host: 宝塔面板地址(末尾不加/)
        :param apisk: 宝塔面板API密钥
        '''
        self.__BTURL = host
        self.__APIKEY = apisk

    # 计算MD5
    def __GetMD5(self, s: str):
        '''
        计算字符串的MD5值
        :param s: 待计算的字符串
        :return: MD5值
        '''
        m = hashlib.md5()
        m.update(s.encode('utf-8'))
        return m.hexdigest()

    # 签名计算
    def __GetToken(self):
        request_time = int(time.time())    # 获取请求时间戳
        request_token = self.__GetMD5(str(request_time) + '' + self.__GetMD5(self.__APIKEY)),   # 生成请求签名
        return {'request_time': request_time, 'request_token': request_token}

    # 获取站点列表
    def GetSites(self, showlog=True):
        '''
        获取宝塔面板站点列表
        :showlog: 是否输出结果
        :return: 站点列表数据
        '''
        if showlog:
            print('\n### 获取站点列表...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'p': 1,
            'limit': 100,
            'order': 'id'
        }
        res = self.__REQ.post(url=self.__BTURL + '/data?action=getData&table=sites', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取站点列表失败:', res['msg'])
            return False
        # 使用prettytable输出站点列表
        if showlog:
            tb = prettytable.PrettyTable()
            tb.field_names = ['网站名', '站点类型', '备注', 'SSL域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '状态']
            tb.align['网站名'] = 'l'
            tb.align['备注'] = 'l'
            tb.align['SSL域名'] = 'l'
            tb.align['证书品牌'] = 'l'
            for site in res['data']:
                site_status = '运行' if site['status'] == '1' else '停止'
                if site['ssl'] == -1:
                    sslinfo = {'notAfter': '-', 'endtime': '-', 'subject': '-', 'issuer': '-'}
                else:
                    sslinfo = {'notAfter': site['ssl']['notAfter'], 'endtime': site['ssl']['endtime'], 'subject': site['ssl']['subject'], 'issuer': site['ssl']['issuer']}
                tb.add_row([site['name'], site['project_type'], site['ps'], sslinfo['subject'], sslinfo['notAfter'], sslinfo['endtime'], sslinfo['issuer'], site_status])
            print(tb)
        return res['data']

    # 设置站点SSL证书
    def SetSSL(self, site_name: str, ssl_cert_content: str, ssl_key_content: str):
        '''
        设置站点SSL证书
        :param site_name: 站点名称(域名)
        :param ssl_cert_content: ssl证书内容
        :param ssl_key_content: ssl私钥内容
        :return: 设置结果
        '''
        print('\n### 设置站点SSL证书...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'type': 0,
            'siteName': site_name,
            'key': ssl_key_content,
            'csr': ssl_cert_content
        }
        res = self.__REQ.post(url=self.__BTURL + '/site?action=SetSSL', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 设置站点SSL证书失败:', res['msg'])
            return False
        print('>>>', res['msg'])    # 输出结果
        return res['status']

    # 获取证书夹列表
    def GetCertList(self, showlog=True):
        '''
        获取证书夹列表
        :showlog: 是否输出结果
        :return: 证书夹列表数据
        '''
        if showlog:
            print('\n### 获取证书夹列表...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'force_refresh': 0    # 0:获取本地证书 1:获取云端证书
        }
        res = self.__REQ.post(url=self.__BTURL + '/ssl?action=get_cert_list', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取证书夹列表失败:', res['msg'])
            return False
        # 使用prettytable输出证书夹列表
        if showlog:
            tb = prettytable.PrettyTable()
            tb.field_names = ['域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '可选域名']
            tb.align['域名'] = 'l'
            tb.align['证书品牌'] = 'l'
            tb.align['可选域名'] = 'l'
            for cert in res:
                tb.add_row([cert['subject'], cert['info']['notAfter'], cert['endtime'], cert['info']['issuer'], cert['dns']])
            print(tb)
        return res

    # 删除过期SSL证书
    def DelExpiredSSL(self):
        '''
        删除过期SSL证书
        :showlog: 是否输出结果
        :return: 删除结果
        '''
        print('\n### 删除过期SSL证书...')
        certs = self.GetCertList(showlog=False)    # 获取证书列表
        if not certs:
            return False
        expiredcerts = []    # 过期证书列表
        for cert in certs:
            if cert['endtime'] < 0:    # 证书已过期
                expiredcerts.append(cert['subject'])    # 加入过期证书列表
                tk = self.__GetToken()    # 获取签名
                playload = {
                    'request_time': tk['request_time'],
                    'request_token': tk['request_token'],
                    'local': 1,
                    'ssl_hash': cert['hash']
                }
                res = self.__REQ.post(url=self.__BTURL + '/ssl?action=remove_cloud_cert', data=playload).json()
                # 判断请求是否成功
                if 'status' in res and 'msg' in res and not res['status']:
                    print('>>> 删除过期SSL证书失败:', res['msg'])
                    return False
                print('>>> 过期证书', cert['subject'], res['msg'])
                return res['status']
        if len(expiredcerts) == 0:
            print('>>> 证书夹内未发现过期证书')
        return expiredcerts

    # 重启Nginx
    def RestartNginx(self):
        '''
        重启Nginx
        :return: 重启结果
        '''
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'name': 'nginx',
            'type': 'restart'
        }
        res = self.__REQ.post(url=self.__BTURL + '/system?action=ServiceAdmin', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 重启Nginx失败:', res['msg'])
            return False
        return res.json()['status']

    # 获取系统基本信息
    def GetSystemInfo(self, showlog=True):
        '''
        获取系统基本信息
        :showlog: 是否输出结果
        :return: 系统基本信息
        '''
        if showlog:
            print('\n### 获取系统基本信息...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token']
        }
        res = self.__REQ.post(url=self.__BTURL + '/system?action=GetSystemTotal', data=playload).json()
        # 判断请求是否成功
        # 判断res是否存在status字段,若不存在则说明请求失败
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取系统基本信息失败:', res['msg'])
            return False
        res['memUtilization'] = round(res['memRealUsed'] / res['memTotal'], 2) * 100    # 内存使用率
        if showlog:
            print('>>> 面板地址:', self.__BTURL)
            print('>>> 操作系统:', res['system'])
            print('>>> 面板版本:', res['version'])
            print('>>> 运行时间:', res['time'])
            print('>>> CPU使用率:', res['cpuRealUsed'], '%')
            print('>>> 内存使用率:', res['memUtilization'], '%')
            print('>>> 内存总量:', round(res['memTotal'] / 1024, 2), 'GB')
        return res