新增数据库支持,创建CloudDriveDatabase类以管理网盘驱动和用户网盘信息,更新主程序以实现数据库连接和相关API接口,完善管理员面板功能,更新README文档以描述数据库设计。

This commit is contained in:
dockermen 2025-04-10 17:29:05 +08:00
parent f8da20b638
commit bd0a5ebb2c
6 changed files with 732 additions and 24 deletions

View File

@ -27,3 +27,19 @@ flask
## 用户系统
1. 用户使用外链访问后,页面显示剩余次数,到期时间倒计时
2. 用户每成功登录一次,剩余次数-1
## 数据库设计
1. 网盘驱动表
1. 网盘服务商名称-唯一
2. 环境变量配置-JSON格式
3. 备注
2. 用户网盘表
1. 从#1表里面去重获取
2. 登录配置-JSON格式
3. 备注
3. 外链表
1. 网盘名称: 从#2里获取已有网盘
2. 总额度: 数字
3. 已使用额度: 数字
4. 外链编码: 随机uuid-不重复
5. 备注

BIN
database.db Normal file

Binary file not shown.

123
main.py
View File

@ -1,33 +1,52 @@
import os
import requests
import sqlite3
import json
from flask import g
from flask import Flask, render_template, request, redirect, url_for, session,make_response,send_from_directory,jsonify
from flask.views import MethodView
from werkzeug.routing import BaseConverter
from utils.login import login_quark
from utils.tools import get_cnb_weburl
from utils.detebase import CloudDriveDatabase
from functools import wraps # 导入 wraps
app = Flask(__name__)
#app = Flask(__name__,template_folder='templates') #修改模板目录
app.jinja_env.auto_reload = True
# 数据库配置
DATABASE = 'database.db'
def get_db():
# 从 Flask 的全局对象 g 中获取数据库连接
# getattr 函数用于获取对象的属性,如果属性不存在则返回第三个参数作为默认值
# 这里尝试获取 g._database 属性,如果不存在则返回 None
# 这种模式确保在一个请求中只创建一个数据库连接
db = getattr(g, '_database', None)
if db is None:
db = g._database = sqlite3.connect(DATABASE)
db = g._database = CloudDriveDatabase(DATABASE)
db.row_factory = sqlite3.Row
return db
# 定义数据库注入装饰器
def inject_db(f):
@wraps(f)
def decorated_function(*args, **kwargs):
db = get_db()
# 将 db 作为第一个参数传递
return f(db, *args, **kwargs)
return decorated_function
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
app = Flask(__name__)
#app = Flask(__name__,template_folder='templates') #修改模板目录
app.jinja_env.auto_reload = True
exlink_info = {
"exlink_id":"1",
@ -96,16 +115,96 @@ def qrlink(id):
@app.route('/admin/')
def admin():
return render_template('admin.html')
db = get_db()
providers = db.get_all_drive_providers()
alluser_drives = db.get_all_user_drives()
print(alluser_drives)
return render_template('admin.html',providers=providers,alluser_drives=alluser_drives)
@app.route('/admin/drive_provider/<metfunc>',methods=['POST'])
def drive_provider(metfunc):
db = get_db()
data = {"status":False}
if metfunc == "get":
alldrive= db.get_all_drive_providers()
if len(alldrive) > 0:
data["status"] = True
data['data'] = alldrive
# 使用jsonify确保中文正确显示
elif metfunc == "add":
"""
json样板
body -- {
"config_vars": {
"data": {
"client_id": "532",
"kps_wg": "",
"request_id": "",
"sign_wg": "",
"token": "",
"v": "1.2",
"vcode": ""
},
"kps_wg": "",
"redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
"sign_wg": ""
},
"provider_name": "夸克网盘",
"remarks": "夸克网盘"
}
Return: status
"""
body = request.get_json()
print(data)
status = db.add_drive_provider(body.get("provider_name","测试网盘"),body.get("config_vars"),body.get("remarks","测试网盘"))
if status:
data["status"] = True
data["data"] = body
return data
@app.route('/admin/user_drive/<metfunc>',methods=['POST'])
def user_drive(metfunc):
db = get_db()
data = {"status":False}
if metfunc == "get":
pass
elif metfunc == "add":
body = request.get_json()
print(body)
status = db.add_user_drive(body.get("provider_name","测试网盘"),body.get("config_vars"),body.get("remarks","测试网盘"))
if status:
data["status"] = True
data["data"] = body
return data
class Exlink(MethodView):
def get(self):
db = get_db()
cursor = db.cursor()
cursor.execute('SELECT * FROM exlinks')
exlinks = cursor.fetchall()
return jsonify([dict(row) for row in exlinks])
db.add_drive_provider(
"阿里网盘",
{
"sign_wg": "",
"kps_wg": "",
"redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
"data":{
'client_id': '532',
'v': '1.2',
'request_id': "",
'sign_wg': "",
'kps_wg': "",
'vcode': "",
'token': ""
}
},
"阿里网盘API配置"
)
return jsonify({"status": True, "message": "success"})
def post(self):
data = request.json
@ -126,7 +225,10 @@ class Exlink(MethodView):
app.add_url_rule('/admin/exlink', view_func=Exlink.as_view('exlink'))
if __name__ == "__main__":
weburl = get_cnb_weburl(5000)
print("Run_url:",weburl)
app.config.from_pyfile("config.py")
@ -134,3 +236,4 @@ if __name__ == "__main__":

View File

@ -131,6 +131,18 @@
</thead>
<tbody>
<!-- 账号列表将通过JavaScript动态填充 -->
<tr>
{% for user_drive in alluser_drives %}
<td>{{user_drive.provider_name}}</td>
<td>{{user_drive.provider_name}}</td>
<td><span class="badge bg-success">正常</span></td>
<td>1.5TB / 2TB</td>
<td>
<button class="btn btn-sm btn-info" id="editudrivebtn" data-bs-toggle="modal" data-bs-target="#editudriveModal" tid="{{ user_drive.id }}"><i class="bi bi-pencil"></i></button>
<button class="btn btn-sm btn-danger"><i class="bi bi-trash"></i></button>
</td>
{% endfor %}
</tr>
</tbody>
</table>
</div>
@ -201,23 +213,30 @@
<form id="addAccountForm">
<div class="mb-3">
<label class="form-label">网盘类型</label>
<select class="form-select" name="type" required>
<select class="form-select" name="type" id="driveType" required>
<option value="">请选择网盘类型</option>
<option value="aliyun">阿里云盘</option>
<option value="baidu">百度网盘</option>
{% set provider_options = providers %}
{% for provider in provider_options %}
<option value="{{ provider.provider_name }}" data-needs-api="{{ provider.provider_name }}">{{ provider.provider_name}}</option>
{% endfor %}
</select>
</div>
<div class="mb-3">
<label class="form-label">账号名称</label>
<input type="text" class="form-control" name="name" required>
</div>
<div class="mb-3">
<label class="form-label">API密钥</label>
<input type="password" class="form-control" name="api_key" required>
</div>
<div class="mb-3">
<label class="form-label">访问令牌</label>
<input type="password" class="form-control" name="access_token" required>
<label class="form-label">配置信息 (JSON)</label>
<div class="position-relative">
<div id="jsonEditor" class="border rounded" style="height: 200px; font-family: monospace;"></div>
<textarea class="form-control d-none" name="config_json" id="jsonConfig"></textarea>
<div class="position-absolute top-0 end-0 p-2">
<button class="btn btn-sm btn-outline-secondary" type="button" id="formatJsonBtn">
<i class="bi bi-code-square"></i> 格式化
</button>
</div>
</div>
<div class="d-flex justify-content-between mt-1">
<div class="form-text">请输入有效的JSON格式配置信息</div>
<div class="form-text text-end"><span id="jsonLineCount">0</span> 行 | <span id="jsonCharCount">0</span> 字符</div>
</div>
<div id="jsonError" class="invalid-feedback"></div>
</div>
</form>
</div>
@ -230,6 +249,43 @@
</div>
<!-- 编辑网盘账号模态框 -->
<div class="modal fade" id="editudriveModal" tabindex="-1">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">编辑网盘驱动</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button>
</div>
<div class="modal-body">
<form id="editudriveForm">
<div class="mb-3">
<label class="form-label">配置信息 (JSON)</label>
<div class="position-relative">
<div id="jsonEditor1" class="border rounded" style="height: 200px; font-family: monospace;"></div>
<textarea class="form-control d-none" name="config_json" id="jsonConfig1"></textarea>
<div class="position-absolute top-0 end-0 p-2">
<button class="btn btn-sm btn-outline-secondary" type="button" id="formatJsonBtn">
<i class="bi bi-code-square"></i> 格式化
</button>
</div>
</div>
<div class="d-flex justify-content-between mt-1">
<div class="form-text">请输入有效的JSON格式配置信息</div>
<div class="form-text text-end"><span id="jsonLineCount1">0</span> 行 | <span id="jsonCharCount1">0</span> 字符</div>
</div>
<div id="jsonError" class="invalid-feedback"></div>
</div>
</form>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
<button type="button" class="btn btn-primary" id="saveAccount">保存</button>
</div>
</div>
</div>
</div>
<!-- 添加网盘外链模态框 -->
<div class="modal fade" id="addExlinkModal" tabindex="-1">
<div class="modal-dialog">
@ -279,6 +335,8 @@
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/ace/1.4.12/ace.js" integrity="sha512-GZ1RIgZaSc8rnco/8CXfRdCpDxRCphenIiZ2ztLy3XQfCbQUSCuk8IudvNHxkRA3oUg6q0qejgN/qqyG1duv5Q==" crossorigin="anonymous"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/ace/1.4.12/ext-language_tools.min.js"></script>
<script>
// 导航切换
@ -430,6 +488,162 @@
}]
}
});
// 添加Ace编辑器的CDN引用
function loadAceEditor() {
if (typeof ace === 'undefined') {
// 如果ace未定义直接初始化编辑器
initAceEditor();
} else {
initAceEditor();
}
}
// json编辑器处理
function initAceEditor() {
// 初始化编辑器
const editor = ace.edit("jsonEditor");
const editor1 = ace.edit("jsonEditor1");
// 配置两个编辑器
const configureEditor = function(editor, configId, lineCountId, charCountId) {
editor.setTheme("ace/theme/monokai");
editor.session.setMode("ace/mode/json");
editor.setOptions({
fontSize: "12pt",
showPrintMargin: false,
enableBasicAutocompletion: true,
enableLiveAutocompletion: true
});
// 同步到隐藏的textarea
editor.getSession().on('change', function() {
document.getElementById(configId).value = editor.getSession().getValue();
document.getElementById(lineCountId).textContent = editor.getSession().getLength();
document.getElementById(charCountId).textContent = editor.getSession().getValue().length;
});
};
// 应用配置到两个编辑器
configureEditor(editor, 'jsonConfig', 'jsonLineCount', 'jsonCharCount');
configureEditor(editor1, 'jsonConfig1', 'jsonLineCount1', 'jsonCharCount1');
// 格式化按钮
document.getElementById('formatJsonBtn').addEventListener('click', function() {
const activeEditor = document.activeElement.closest('#jsonEditor') ? editor :
document.activeElement.closest('#jsonEditor1') ? editor1 : editor;
const editorElement = activeEditor === editor ? 'jsonEditor' : 'jsonEditor1';
try {
const json = JSON.parse(activeEditor.getSession().getValue());
activeEditor.setValue(JSON.stringify(json, null, 2), -1);
document.getElementById('jsonError').textContent = '';
document.getElementById(editorElement).classList.remove('border-danger');
} catch (e) {
document.getElementById('jsonError').textContent = '无效的JSON格式: ' + e.message;
document.getElementById('jsonError').style.display = 'block';
document.getElementById(editorElement).classList.add('border-danger');
}
});
}
document.addEventListener('DOMContentLoaded', function() {
loadAceEditor();
});
// 设置默认JSON模板
document.getElementById('driveType').addEventListener('change', function() {
const editor = ace.edit("jsonEditor");
const selectedType = this.value;
if (selectedType) {
let defaultJson = {};
// 获取所有网盘提供商数据
const providers = JSON.parse('{{ providers | tojson }}');
// 查找选中的网盘提供商
const selectedProvider = providers.find(provider => provider.provider_name === selectedType);
if (selectedProvider && selectedProvider.config_vars) {
// 如果找到对应的提供商配置,直接使用
defaultJson = selectedProvider.config_vars;
} else {
// 如果没有找到对应配置,使用默认模板
defaultJson = {
"provider_name": selectedType,
"config": {
"api_key": "",
"secret_key": "",
"redirect_uri": ""
},
"auth": {
"token": "",
"expires_in": 0
}
};
}
// 设置编辑器内容
editor.setValue(JSON.stringify(defaultJson, null, 2), -1);
}
});
// 设置编辑用户驱动JSON模板
document.getElementById('editudrivebtn').addEventListener('click', function() {
const editor = ace.edit("jsonEditor1");
const selectedtid = this.getAttribute('tid');
if (selectedtid) {
let defaultJson = {};
// 获取所有网盘提供商数据
const user_drives = JSON.parse('{{ alluser_drives | tojson }}');
// 查找选中的网盘提供商
console.log(typeof(selectedtid));
const selectedUdrive = user_drives.find(user_drive => user_drive.id == selectedtid);
console.log(selectedtid,selectedUdrive)
if (selectedUdrive && selectedUdrive.login_config) {
// 如果找到对应的提供商配置,直接使用
defaultJson = selectedUdrive.login_config;
} else {
// 如果没有找到对应配置,使用默认模板
defaultJson = {
"provider_name": selectedtid,
"config": {
"api_key": "",
"secret_key": "",
"redirect_uri": ""
},
"auth": {
"token": "",
"expires_in": 0
}
};
}
// 设置编辑器内容
editor.setValue(JSON.stringify(defaultJson, null, 2), -1);
}
});
// 初始化时设置一个空的JSON结构
window.addEventListener('load', function() {
const editor = ace.edit("jsonEditor");
const editor1 = ace.edit("jsonEditor1");
const emptyJson = {
"provider_name": "",
"config": {},
"auth": {}
};
const jsonString = JSON.stringify(emptyJson, null, 2);
// 为两个编辑器设置相同的初始值
editor.setValue(jsonString, -1);
editor1.setValue(jsonString, -1);
document.getElementById('jsonConfig').value = jsonString;
document.getElementById('jsonConfig1').value = jsonString;
});
</script>
</body>
</html>

375
utils/detebase.py Normal file
View File

@ -0,0 +1,375 @@
import sqlite3
import json
import uuid
from typing import Dict, Any, Optional
class CloudDriveDatabase:
def __init__(self, db_path: str = "cloud_drive.db"):
"""初始化数据库连接"""
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor()
self._create_tables()
def _create_tables(self):
"""创建所需的数据库表"""
# 1. 网盘驱动表
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS drive_providers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
provider_name TEXT UNIQUE NOT NULL,
config_vars TEXT NOT NULL,
remarks TEXT
)
''')
# 2. 用户网盘表
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS user_drives (
id INTEGER PRIMARY KEY AUTOINCREMENT,
provider_name TEXT NOT NULL,
login_config TEXT NOT NULL,
remarks TEXT,
FOREIGN KEY (provider_name) REFERENCES drive_providers (provider_name)
)
''')
# 3. 外链表
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS external_links (
id INTEGER PRIMARY KEY AUTOINCREMENT,
drive_id INTEGER NOT NULL,
total_quota REAL NOT NULL,
used_quota REAL NOT NULL DEFAULT 0,
link_uuid TEXT UNIQUE NOT NULL,
remarks TEXT,
FOREIGN KEY (drive_id) REFERENCES user_drives (id)
)
''')
self.conn.commit()
# 网盘驱动表操作
def add_drive_provider(self, provider_name: str, config_vars: Dict[str, Any], remarks: Optional[str] = None) -> bool:
"""添加网盘服务商"""
try:
self.cursor.execute(
"INSERT INTO drive_providers (provider_name, config_vars, remarks) VALUES (?, ?, ?)",
(provider_name, json.dumps(config_vars, ensure_ascii=False), remarks)
)
self.conn.commit()
return True
except sqlite3.IntegrityError:
# 服务商名称已存在
return False
def get_drive_provider(self, provider_name: str) -> Optional[Dict[str, Any]]:
"""获取网盘服务商信息"""
self.cursor.execute("SELECT * FROM drive_providers WHERE provider_name = ?", (provider_name,))
result = self.cursor.fetchone()
if result:
result_dict = dict(result)
result_dict['config_vars'] = json.loads(result_dict['config_vars'])
return result_dict
return None
def get_all_drive_providers(self) -> list:
"""获取所有网盘服务商"""
self.cursor.execute("SELECT * FROM drive_providers")
results = self.cursor.fetchall()
providers = []
for row in results:
provider = dict(row)
provider['config_vars'] = json.loads(provider['config_vars'])
providers.append(provider)
return providers
def update_drive_provider(self, provider_name: str, config_vars: Dict[str, Any] = None, remarks: str = None) -> bool:
"""更新网盘服务商信息"""
try:
current = self.get_drive_provider(provider_name)
if not current:
return False
if config_vars is not None:
config_vars_json = json.dumps(config_vars, ensure_ascii=False)
else:
config_vars_json = json.dumps(current['config_vars'], ensure_ascii=False)
if remarks is None:
remarks = current['remarks']
self.cursor.execute(
"UPDATE drive_providers SET config_vars = ?, remarks = ? WHERE provider_name = ?",
(config_vars_json, remarks, provider_name)
)
self.conn.commit()
return True
except Exception:
return False
def delete_drive_provider(self, provider_name: str) -> bool:
"""删除网盘服务商"""
try:
self.cursor.execute("DELETE FROM drive_providers WHERE provider_name = ?", (provider_name,))
self.conn.commit()
return self.cursor.rowcount > 0
except Exception:
return False
# 用户网盘表操作
def add_user_drive(self, provider_name: str, login_config: Dict[str, Any], remarks: Optional[str] = None) -> Optional[int]:
"""添加用户网盘"""
try:
# 检查服务商是否存在
if not self.get_drive_provider(provider_name):
return None
self.cursor.execute(
"INSERT INTO user_drives (provider_name, login_config, remarks) VALUES (?, ?, ?)",
(provider_name, json.dumps(login_config, ensure_ascii=False), remarks)
)
self.conn.commit()
return self.cursor.lastrowid
except Exception:
return None
def get_user_drive(self, drive_id: int) -> Optional[Dict[str, Any]]:
"""获取用户网盘信息"""
self.cursor.execute("SELECT * FROM user_drives WHERE id = ?", (drive_id,))
result = self.cursor.fetchone()
if result:
result_dict = dict(result)
result_dict['login_config'] = json.loads(result_dict['login_config'])
return result_dict
return None
def get_user_drives_by_provider(self, provider_name: str) -> list:
"""获取指定服务商的所有用户网盘"""
self.cursor.execute("SELECT * FROM user_drives WHERE provider_name = ?", (provider_name,))
results = self.cursor.fetchall()
drives = []
for row in results:
drive = dict(row)
drive['login_config'] = json.loads(drive['login_config'])
drives.append(drive)
return drives
def get_all_user_drives(self) -> list:
"""获取所有用户网盘"""
self.cursor.execute("SELECT * FROM user_drives")
results = self.cursor.fetchall()
drives = []
for row in results:
drive = dict(row)
drive['login_config'] = json.loads(drive['login_config'])
drives.append(drive)
return drives
def update_user_drive(self, drive_id: int, login_config: Dict[str, Any] = None, remarks: str = None) -> bool:
"""更新用户网盘信息"""
try:
current = self.get_user_drive(drive_id)
if not current:
return False
if login_config is not None:
login_config_json = json.dumps(login_config, ensure_ascii=False)
else:
login_config_json = json.dumps(current['login_config'], ensure_ascii=False)
if remarks is None:
remarks = current['remarks']
self.cursor.execute(
"UPDATE user_drives SET login_config = ?, remarks = ? WHERE id = ?",
(login_config_json, remarks, drive_id)
)
self.conn.commit()
return True
except Exception:
return False
def delete_user_drive(self, drive_id: int) -> bool:
"""删除用户网盘"""
try:
self.cursor.execute("DELETE FROM user_drives WHERE id = ?", (drive_id,))
self.conn.commit()
return self.cursor.rowcount > 0
except Exception:
return False
# 外链表操作
def create_external_link(self, drive_id: int, total_quota: float, remarks: Optional[str] = None) -> Optional[str]:
"""创建外链"""
try:
# 检查用户网盘是否存在
if not self.get_user_drive(drive_id):
return None
# 生成不重复的UUID
link_uuid = str(uuid.uuid4())
while self.get_external_link_by_uuid(link_uuid):
link_uuid = str(uuid.uuid4())
self.cursor.execute(
"INSERT INTO external_links (drive_id, total_quota, used_quota, link_uuid, remarks) VALUES (?, ?, 0, ?, ?)",
(drive_id, total_quota, link_uuid, remarks)
)
self.conn.commit()
return link_uuid
except Exception as e:
print(f"创建外链错误: {e}")
return None
def get_external_link(self, link_id: int) -> Optional[Dict[str, Any]]:
"""获取外链信息"""
self.cursor.execute("SELECT * FROM external_links WHERE id = ?", (link_id,))
result = self.cursor.fetchone()
if result:
return dict(result)
return None
def get_external_link_by_uuid(self, link_uuid: str) -> Optional[Dict[str, Any]]:
"""通过UUID获取外链信息"""
self.cursor.execute("SELECT * FROM external_links WHERE link_uuid = ?", (link_uuid,))
result = self.cursor.fetchone()
if result:
return dict(result)
return None
def get_external_links_by_drive(self, drive_id: int) -> list:
"""获取指定用户网盘的所有外链"""
self.cursor.execute("SELECT * FROM external_links WHERE drive_id = ?", (drive_id,))
return [dict(row) for row in self.cursor.fetchall()]
def update_external_link_quota(self, link_uuid: str, used_quota: float) -> bool:
"""更新外链已使用配额"""
try:
link = self.get_external_link_by_uuid(link_uuid)
if not link:
return False
# 确保不超过总配额
if used_quota > link['total_quota']:
return False
self.cursor.execute(
"UPDATE external_links SET used_quota = ? WHERE link_uuid = ?",
(used_quota, link_uuid)
)
self.conn.commit()
return True
except Exception:
return False
def update_external_link(self, link_uuid: str, total_quota: float = None, remarks: str = None) -> bool:
"""更新外链信息"""
try:
link = self.get_external_link_by_uuid(link_uuid)
if not link:
return False
if total_quota is None:
total_quota = link['total_quota']
if remarks is None:
remarks = link['remarks']
# 确保新的总配额不小于已使用配额
if total_quota < link['used_quota']:
return False
self.cursor.execute(
"UPDATE external_links SET total_quota = ?, remarks = ? WHERE link_uuid = ?",
(total_quota, remarks, link_uuid)
)
self.conn.commit()
return True
except Exception:
return False
def delete_external_link(self, link_uuid: str) -> bool:
"""删除外链"""
try:
self.cursor.execute("DELETE FROM external_links WHERE link_uuid = ?", (link_uuid,))
self.conn.commit()
return self.cursor.rowcount > 0
except Exception:
return False
def close(self):
"""关闭数据库连接"""
if self.conn:
self.conn.close()
# 使用示例
if __name__ == "__main__":
# 创建数据库实例
db = CloudDriveDatabase("cloud_drive.db")
# 添加一个网盘服务商
db.add_drive_provider(
"夸克网盘",
{
"sign_wg": "",
"kps_wg": "",
"redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
"data":{
'client_id': '532',
'v': '1.2',
'request_id': "",
'sign_wg': "",
'kps_wg': "",
'vcode': "",
'token': ""
}
},
"夸克网盘API配置"
)
# 添加用户网盘
drive_id = db.add_user_drive(
"夸克网盘",
{
"sign_wg": "AAQHaE4ww2nnIPvofH2SfMv3N6OplcPRjxlgScTZozm/ZCMfQP74bsMLyKW883hZCGY=",
"kps_wg": "AARWcp9UM71t5VzV9i5pBJ4dLXjJ7EZL5a9qz2QVVQtkkmcqS4wQGYtk38CRzW6HH4+5c7qsB9/EtUgkWcd8x/k7h9+PmAHUDvxKHUWnX7iL3h2fH86XJ4cEqwvUnQ77QGs=",
"redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
"data":{
'client_id': '532',
'v': '1.2',
'request_id': "",
'sign_wg': "",
'kps_wg': "",
'vcode': "",
'token': ""
}
},
"张三的百度网盘"
)
# 创建外链
if drive_id:
link_uuid = db.create_external_link(
drive_id,
3,
"测试外链"
)
print(f"创建的外链UUID: {link_uuid}")
# 获取外链信息
link_info = db.get_external_link_by_uuid(link_uuid)
print(f"外链信息: {link_info}")
# 更新已使用配额
db.update_external_link_quota(link_uuid, 512.0) # 使用了512MB
# 重新获取外链信息
link_info = db.get_external_link_by_uuid(link_uuid)
print(f"更新后的外链信息: {link_info}")
# 关闭数据库连接
db.close()

View File

@ -22,7 +22,7 @@ def login_quark(token):
'sec-ch-ua-mobile': '?1',
'sec-ch-ua-platform': '"Android"'
}
sign_wg = "AAQHaE4ww2nnIPvofH2SfMv3N6OplcPRjxlgScTZozm/ZCMfQP74bsMLyKW883hZCGY=";
sign_wg = "AAQHaE4ww2nnIPvofH2SfMv3N6OplcPRjxlgScTZozm/ZCMfQP74bsMLyKW883hZCGY="
kps_wg = "AARWcp9UM71t5VzV9i5pBJ4dLXjJ7EZL5a9qz2QVVQtkkmcqS4wQGYtk38CRzW6HH4+5c7qsB9/EtUgkWcd8x/k7h9+PmAHUDvxKHUWnX7iL3h2fH86XJ4cEqwvUnQ77QGs=";
vcode = int(time.time() * 1000) # 相当于JavaScript中的Date.now(),返回当前时间的毫秒数
request_id = vcode + 5