[Feature] 希望支持多吉云和宝塔部署
Opened this issue · 0 comments
fcwys commented
功能描述
希望支持多吉云和宝塔部署
动机
多吉云CDN和宝塔使用范围比较广泛,多吉云官方有支持的API,宝塔也有对应API可用。
替代方案
无
其他信息
- 多吉云API:https://docs.dogecloud.com/cdn/api-cert-upload
- 附:个人实现的Python版本宝塔获取站点列表、上传证书、重启Nginx代码,希望能帮上忙:
# -*- coding: utf-8 -*-
import hashlib
import time
import requests
import prettytable
# 宝塔面板操作类
class BtPanel:
__BTURL = ''
__APIKEY = ''
__REQ = requests.session()
# 初始化宝塔面板
def __init__(self, host: str, apisk: str):
'''
:param host: 宝塔面板地址(末尾不加/)
:param apisk: 宝塔面板API密钥
'''
self.__BTURL = host
self.__APIKEY = apisk
# 计算MD5
def __GetMD5(self, s: str):
'''
计算字符串的MD5值
:param s: 待计算的字符串
:return: MD5值
'''
m = hashlib.md5()
m.update(s.encode('utf-8'))
return m.hexdigest()
# 签名计算
def __GetToken(self):
request_time = int(time.time()) # 获取请求时间戳
request_token = self.__GetMD5(str(request_time) + '' + self.__GetMD5(self.__APIKEY)), # 生成请求签名
return {'request_time': request_time, 'request_token': request_token}
# 获取站点列表
def GetSites(self, showlog=True):
'''
获取宝塔面板站点列表
:showlog: 是否输出结果
:return: 站点列表数据
'''
if showlog:
print('\n### 获取站点列表...')
tk = self.__GetToken() # 获取签名
playload = {
'request_time': tk['request_time'],
'request_token': tk['request_token'],
'p': 1,
'limit': 100,
'order': 'id'
}
res = self.__REQ.post(url=self.__BTURL + '/data?action=getData&table=sites', data=playload).json()
# 判断请求是否成功
if 'status' in res and 'msg' in res and not res['status']:
print('>>> 获取站点列表失败:', res['msg'])
return False
# 使用prettytable输出站点列表
if showlog:
tb = prettytable.PrettyTable()
tb.field_names = ['网站名', '站点类型', '备注', 'SSL域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '状态']
tb.align['网站名'] = 'l'
tb.align['备注'] = 'l'
tb.align['SSL域名'] = 'l'
tb.align['证书品牌'] = 'l'
for site in res['data']:
site_status = '运行' if site['status'] == '1' else '停止'
if site['ssl'] == -1:
sslinfo = {'notAfter': '-', 'endtime': '-', 'subject': '-', 'issuer': '-'}
else:
sslinfo = {'notAfter': site['ssl']['notAfter'], 'endtime': site['ssl']['endtime'], 'subject': site['ssl']['subject'], 'issuer': site['ssl']['issuer']}
tb.add_row([site['name'], site['project_type'], site['ps'], sslinfo['subject'], sslinfo['notAfter'], sslinfo['endtime'], sslinfo['issuer'], site_status])
print(tb)
return res['data']
# 设置站点SSL证书
def SetSSL(self, site_name: str, ssl_cert_content: str, ssl_key_content: str):
'''
设置站点SSL证书
:param site_name: 站点名称(域名)
:param ssl_cert_content: ssl证书内容
:param ssl_key_content: ssl私钥内容
:return: 设置结果
'''
print('\n### 设置站点SSL证书...')
tk = self.__GetToken() # 获取签名
playload = {
'request_time': tk['request_time'],
'request_token': tk['request_token'],
'type': 0,
'siteName': site_name,
'key': ssl_key_content,
'csr': ssl_cert_content
}
res = self.__REQ.post(url=self.__BTURL + '/site?action=SetSSL', data=playload).json()
# 判断请求是否成功
if 'status' in res and 'msg' in res and not res['status']:
print('>>> 设置站点SSL证书失败:', res['msg'])
return False
print('>>>', res['msg']) # 输出结果
return res['status']
# 获取证书夹列表
def GetCertList(self, showlog=True):
'''
获取证书夹列表
:showlog: 是否输出结果
:return: 证书夹列表数据
'''
if showlog:
print('\n### 获取证书夹列表...')
tk = self.__GetToken() # 获取签名
playload = {
'request_time': tk['request_time'],
'request_token': tk['request_token'],
'force_refresh': 0 # 0:获取本地证书 1:获取云端证书
}
res = self.__REQ.post(url=self.__BTURL + '/ssl?action=get_cert_list', data=playload).json()
# 判断请求是否成功
if 'status' in res and 'msg' in res and not res['status']:
print('>>> 获取证书夹列表失败:', res['msg'])
return False
# 使用prettytable输出证书夹列表
if showlog:
tb = prettytable.PrettyTable()
tb.field_names = ['域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '可选域名']
tb.align['域名'] = 'l'
tb.align['证书品牌'] = 'l'
tb.align['可选域名'] = 'l'
for cert in res:
tb.add_row([cert['subject'], cert['info']['notAfter'], cert['endtime'], cert['info']['issuer'], cert['dns']])
print(tb)
return res
# 删除过期SSL证书
def DelExpiredSSL(self):
'''
删除过期SSL证书
:showlog: 是否输出结果
:return: 删除结果
'''
print('\n### 删除过期SSL证书...')
certs = self.GetCertList(showlog=False) # 获取证书列表
if not certs:
return False
expiredcerts = [] # 过期证书列表
for cert in certs:
if cert['endtime'] < 0: # 证书已过期
expiredcerts.append(cert['subject']) # 加入过期证书列表
tk = self.__GetToken() # 获取签名
playload = {
'request_time': tk['request_time'],
'request_token': tk['request_token'],
'local': 1,
'ssl_hash': cert['hash']
}
res = self.__REQ.post(url=self.__BTURL + '/ssl?action=remove_cloud_cert', data=playload).json()
# 判断请求是否成功
if 'status' in res and 'msg' in res and not res['status']:
print('>>> 删除过期SSL证书失败:', res['msg'])
return False
print('>>> 过期证书', cert['subject'], res['msg'])
return res['status']
if len(expiredcerts) == 0:
print('>>> 证书夹内未发现过期证书')
return expiredcerts
# 重启Nginx
def RestartNginx(self):
'''
重启Nginx
:return: 重启结果
'''
tk = self.__GetToken() # 获取签名
playload = {
'request_time': tk['request_time'],
'request_token': tk['request_token'],
'name': 'nginx',
'type': 'restart'
}
res = self.__REQ.post(url=self.__BTURL + '/system?action=ServiceAdmin', data=playload).json()
# 判断请求是否成功
if 'status' in res and 'msg' in res and not res['status']:
print('>>> 重启Nginx失败:', res['msg'])
return False
return res.json()['status']
# 获取系统基本信息
def GetSystemInfo(self, showlog=True):
'''
获取系统基本信息
:showlog: 是否输出结果
:return: 系统基本信息
'''
if showlog:
print('\n### 获取系统基本信息...')
tk = self.__GetToken() # 获取签名
playload = {
'request_time': tk['request_time'],
'request_token': tk['request_token']
}
res = self.__REQ.post(url=self.__BTURL + '/system?action=GetSystemTotal', data=playload).json()
# 判断请求是否成功
# 判断res是否存在status字段,若不存在则说明请求失败
if 'status' in res and 'msg' in res and not res['status']:
print('>>> 获取系统基本信息失败:', res['msg'])
return False
res['memUtilization'] = round(res['memRealUsed'] / res['memTotal'], 2) * 100 # 内存使用率
if showlog:
print('>>> 面板地址:', self.__BTURL)
print('>>> 操作系统:', res['system'])
print('>>> 面板版本:', res['version'])
print('>>> 运行时间:', res['time'])
print('>>> CPU使用率:', res['cpuRealUsed'], '%')
print('>>> 内存使用率:', res['memUtilization'], '%')
print('>>> 内存总量:', round(res['memTotal'] / 1024, 2), 'GB')
return res