新增.gitignore和.cursorindexingignore文件,更新config.py以包含Flask配置和日志设置,完善README文档,添加LICENSE文件,优化main.py中的数据库导入和日志记录,更新requirements.txt以反映依赖项变更,改进admin.html的样式和结构,确保项目结构清晰且符合开源标准。
This commit is contained in:
7
utils/__init__.py
Normal file
7
utils/__init__.py
Normal file
@ -0,0 +1,7 @@
|
||||
"""
|
||||
工具模块包
|
||||
"""
|
||||
|
||||
from .database import CloudDriveDatabase
|
||||
from .login import login_quark
|
||||
from .tools import get_cnb_weburl
|
BIN
utils/__pycache__/detebase.cpython-313.pyc
Normal file
BIN
utils/__pycache__/detebase.cpython-313.pyc
Normal file
Binary file not shown.
BIN
utils/__pycache__/login.cpython-313.pyc
Normal file
BIN
utils/__pycache__/login.cpython-313.pyc
Normal file
Binary file not shown.
BIN
utils/__pycache__/tools.cpython-313.pyc
Normal file
BIN
utils/__pycache__/tools.cpython-313.pyc
Normal file
Binary file not shown.
@ -2,10 +2,19 @@ import sqlite3
|
||||
import json
|
||||
import uuid
|
||||
from typing import Dict, Any, Optional
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
class CloudDriveDatabase:
|
||||
"""网盘数据库管理类"""
|
||||
|
||||
def __init__(self, db_path: str = "cloud_drive.db"):
|
||||
"""初始化数据库连接"""
|
||||
"""
|
||||
初始化数据库连接
|
||||
|
||||
参数:
|
||||
db_path: 数据库文件路径
|
||||
"""
|
||||
self.db_path = db_path
|
||||
self.conn = sqlite3.connect(db_path)
|
||||
self.conn.row_factory = sqlite3.Row
|
||||
@ -54,7 +63,14 @@ class CloudDriveDatabase:
|
||||
self.conn.commit()
|
||||
|
||||
def _add_column_if_not_exists(self, table_name: str, column_name: str, column_type: str):
|
||||
"""检查表是否存在指定列,如果不存在则添加"""
|
||||
"""
|
||||
检查表是否存在指定列,如果不存在则添加
|
||||
|
||||
参数:
|
||||
table_name: 表名
|
||||
column_name: 列名
|
||||
column_type: 列类型
|
||||
"""
|
||||
self.cursor.execute(f"PRAGMA table_info({table_name})")
|
||||
columns = [column[1] for column in self.cursor.fetchall()]
|
||||
if column_name not in columns:
|
||||
@ -67,7 +83,17 @@ class CloudDriveDatabase:
|
||||
|
||||
# 网盘驱动表操作
|
||||
def add_drive_provider(self, provider_name: str, config_vars: Dict[str, Any], remarks: Optional[str] = None) -> bool:
|
||||
"""添加网盘服务商"""
|
||||
"""
|
||||
添加网盘服务商
|
||||
|
||||
参数:
|
||||
provider_name: 服务商名称
|
||||
config_vars: 配置参数
|
||||
remarks: 备注说明
|
||||
|
||||
返回:
|
||||
bool: 是否添加成功
|
||||
"""
|
||||
try:
|
||||
self.cursor.execute(
|
||||
"INSERT INTO drive_providers (provider_name, config_vars, remarks) VALUES (?, ?, ?)",
|
||||
@ -80,8 +106,15 @@ class CloudDriveDatabase:
|
||||
return False
|
||||
|
||||
def get_drive_provider(self, provider_name: str) -> Optional[Dict[str, Any]]:
|
||||
"""获取网盘服务商信息"""
|
||||
"""
|
||||
获取网盘服务商信息
|
||||
|
||||
参数:
|
||||
provider_name: 服务商名称
|
||||
|
||||
返回:
|
||||
Dict: 服务商信息,不存在时返回None
|
||||
"""
|
||||
self.cursor.execute("SELECT * FROM drive_providers WHERE provider_name = ?", (provider_name,))
|
||||
result = self.cursor.fetchone()
|
||||
if result:
|
||||
@ -91,7 +124,12 @@ class CloudDriveDatabase:
|
||||
return None
|
||||
|
||||
def get_all_drive_providers(self) -> list:
|
||||
"""获取所有网盘服务商"""
|
||||
"""
|
||||
获取所有网盘服务商
|
||||
|
||||
返回:
|
||||
List: 所有服务商信息列表
|
||||
"""
|
||||
self.cursor.execute("SELECT * FROM drive_providers")
|
||||
results = self.cursor.fetchall()
|
||||
providers = []
|
||||
@ -102,7 +140,17 @@ class CloudDriveDatabase:
|
||||
return providers
|
||||
|
||||
def update_drive_provider(self, provider_name: str, config_vars: Dict[str, Any] = None, remarks: str = None) -> bool:
|
||||
"""更新网盘服务商信息"""
|
||||
"""
|
||||
更新网盘服务商信息
|
||||
|
||||
参数:
|
||||
provider_name: 服务商名称
|
||||
config_vars: 配置参数,可选
|
||||
remarks: 备注说明,可选
|
||||
|
||||
返回:
|
||||
bool: 是否更新成功
|
||||
"""
|
||||
try:
|
||||
current = self.get_drive_provider(provider_name)
|
||||
if not current:
|
||||
@ -126,7 +174,15 @@ class CloudDriveDatabase:
|
||||
return False
|
||||
|
||||
def delete_drive_provider(self, provider_name: str) -> bool:
|
||||
"""删除网盘服务商"""
|
||||
"""
|
||||
删除网盘服务商
|
||||
|
||||
参数:
|
||||
provider_name: 服务商名称
|
||||
|
||||
返回:
|
||||
bool: 是否删除成功
|
||||
"""
|
||||
try:
|
||||
self.cursor.execute("DELETE FROM drive_providers WHERE provider_name = ?", (provider_name,))
|
||||
self.conn.commit()
|
||||
@ -136,7 +192,17 @@ class CloudDriveDatabase:
|
||||
|
||||
# 用户网盘表操作
|
||||
def add_user_drive(self, provider_name: str, login_config: Dict[str, Any], remarks: Optional[str] = None) -> Optional[int]:
|
||||
"""添加用户网盘"""
|
||||
"""
|
||||
添加用户网盘
|
||||
|
||||
参数:
|
||||
provider_name: 服务商名称
|
||||
login_config: 登录配置
|
||||
remarks: 备注说明
|
||||
|
||||
返回:
|
||||
int: 新添加的用户网盘ID,失败时返回None
|
||||
"""
|
||||
try:
|
||||
# 检查服务商是否存在
|
||||
if not self.get_drive_provider(provider_name):
|
||||
@ -152,7 +218,15 @@ class CloudDriveDatabase:
|
||||
return None
|
||||
|
||||
def get_user_drive(self, drive_id: int) -> Optional[Dict[str, Any]]:
|
||||
"""获取用户网盘信息"""
|
||||
"""
|
||||
获取用户网盘信息
|
||||
|
||||
参数:
|
||||
drive_id: 网盘ID
|
||||
|
||||
返回:
|
||||
Dict: 用户网盘信息,不存在时返回None
|
||||
"""
|
||||
self.cursor.execute("SELECT * FROM user_drives WHERE id = ?", (drive_id,))
|
||||
result = self.cursor.fetchone()
|
||||
if result:
|
||||
@ -162,7 +236,15 @@ class CloudDriveDatabase:
|
||||
return None
|
||||
|
||||
def get_user_drives_by_provider(self, provider_name: str) -> list:
|
||||
"""获取指定服务商的所有用户网盘"""
|
||||
"""
|
||||
获取指定服务商的所有用户网盘
|
||||
|
||||
参数:
|
||||
provider_name: 服务商名称
|
||||
|
||||
返回:
|
||||
List: 用户网盘信息列表
|
||||
"""
|
||||
self.cursor.execute("SELECT * FROM user_drives WHERE provider_name = ?", (provider_name,))
|
||||
results = self.cursor.fetchall()
|
||||
drives = []
|
||||
@ -173,7 +255,12 @@ class CloudDriveDatabase:
|
||||
return drives
|
||||
|
||||
def get_all_user_drives(self) -> list:
|
||||
"""获取所有用户网盘"""
|
||||
"""
|
||||
获取所有用户网盘
|
||||
|
||||
返回:
|
||||
List: 所有用户网盘信息列表
|
||||
"""
|
||||
self.cursor.execute("SELECT * FROM user_drives")
|
||||
results = self.cursor.fetchall()
|
||||
drives = []
|
||||
@ -184,7 +271,17 @@ class CloudDriveDatabase:
|
||||
return drives
|
||||
|
||||
def update_user_drive(self, drive_id: int, login_config: Dict[str, Any] = None, remarks: str = None) -> bool:
|
||||
"""更新用户网盘信息"""
|
||||
"""
|
||||
更新用户网盘信息
|
||||
|
||||
参数:
|
||||
drive_id: 网盘ID
|
||||
login_config: 登录配置,可选
|
||||
remarks: 备注说明,可选
|
||||
|
||||
返回:
|
||||
bool: 是否更新成功
|
||||
"""
|
||||
try:
|
||||
current = self.get_user_drive(drive_id)
|
||||
if not current:
|
||||
@ -208,7 +305,15 @@ class CloudDriveDatabase:
|
||||
return False
|
||||
|
||||
def delete_user_drive(self, drive_id: int) -> bool:
|
||||
"""删除用户网盘"""
|
||||
"""
|
||||
删除用户网盘
|
||||
|
||||
参数:
|
||||
drive_id: 网盘ID
|
||||
|
||||
返回:
|
||||
bool: 是否删除成功
|
||||
"""
|
||||
try:
|
||||
self.cursor.execute("DELETE FROM user_drives WHERE id = ?", (drive_id,))
|
||||
self.conn.commit()
|
||||
@ -218,7 +323,18 @@ class CloudDriveDatabase:
|
||||
|
||||
# 外链表操作
|
||||
def create_external_link(self, drive_id: int, total_quota: float, remarks: Optional[str] = None, expiry_time: str = None) -> Optional[str]:
|
||||
"""创建外链"""
|
||||
"""
|
||||
创建外链
|
||||
|
||||
参数:
|
||||
drive_id: 网盘ID
|
||||
total_quota: 总配额(使用次数)
|
||||
remarks: 备注说明
|
||||
expiry_time: 过期时间,格式为ISO 8601
|
||||
|
||||
返回:
|
||||
str: 外链UUID,失败时返回None
|
||||
"""
|
||||
try:
|
||||
# 检查用户网盘是否存在
|
||||
if not self.get_user_drive(drive_id):
|
||||
@ -231,8 +347,7 @@ class CloudDriveDatabase:
|
||||
|
||||
# 如果没有指定到期时间,默认为24小时后
|
||||
if not expiry_time:
|
||||
from datetime import datetime, timedelta
|
||||
expiry_time = (datetime.now() + timedelta(hours=24)).strftime('%Y-%m-%d %H:%M:%S')
|
||||
expiry_time = (datetime.now() + timedelta(hours=24)).isoformat()
|
||||
|
||||
self.cursor.execute(
|
||||
"INSERT INTO external_links (drive_id, total_quota, used_quota, link_uuid, remarks, expiry_time) VALUES (?, ?, 0, ?, ?, ?)",
|
||||
@ -245,7 +360,15 @@ class CloudDriveDatabase:
|
||||
return None
|
||||
|
||||
def get_external_link(self, link_id: int) -> Optional[Dict[str, Any]]:
|
||||
"""获取外链信息"""
|
||||
"""
|
||||
获取外链信息
|
||||
|
||||
参数:
|
||||
link_id: 外链ID
|
||||
|
||||
返回:
|
||||
Dict: 外链信息,不存在时返回None
|
||||
"""
|
||||
self.cursor.execute("SELECT * FROM external_links WHERE id = ?", (link_id,))
|
||||
result = self.cursor.fetchone()
|
||||
if result:
|
||||
@ -253,7 +376,15 @@ class CloudDriveDatabase:
|
||||
return None
|
||||
|
||||
def get_external_link_by_uuid(self, link_uuid: str) -> Optional[Dict[str, Any]]:
|
||||
"""通过UUID获取外链信息"""
|
||||
"""
|
||||
通过UUID获取外链信息
|
||||
|
||||
参数:
|
||||
link_uuid: 外链UUID
|
||||
|
||||
返回:
|
||||
Dict: 外链信息,不存在时返回None
|
||||
"""
|
||||
self.cursor.execute("SELECT * FROM external_links WHERE link_uuid = ?", (link_uuid,))
|
||||
result = self.cursor.fetchone()
|
||||
if result:
|
||||
@ -261,12 +392,29 @@ class CloudDriveDatabase:
|
||||
return None
|
||||
|
||||
def get_external_links_by_drive(self, drive_id: int) -> list:
|
||||
"""获取指定用户网盘的所有外链"""
|
||||
"""
|
||||
获取指定用户网盘的所有外链
|
||||
|
||||
参数:
|
||||
drive_id: 网盘ID
|
||||
|
||||
返回:
|
||||
List: 外链信息列表
|
||||
"""
|
||||
self.cursor.execute("SELECT * FROM external_links WHERE drive_id = ?", (drive_id,))
|
||||
return [dict(row) for row in self.cursor.fetchall()]
|
||||
|
||||
def update_external_link_quota(self, link_uuid: str, used_quota: float) -> bool:
|
||||
"""更新外链已使用配额"""
|
||||
"""
|
||||
更新外链已使用配额
|
||||
|
||||
参数:
|
||||
link_uuid: 外链UUID
|
||||
used_quota: 已使用配额
|
||||
|
||||
返回:
|
||||
bool: 是否更新成功
|
||||
"""
|
||||
try:
|
||||
link = self.get_external_link_by_uuid(link_uuid)
|
||||
if not link:
|
||||
@ -286,7 +434,17 @@ class CloudDriveDatabase:
|
||||
return False
|
||||
|
||||
def update_external_link(self, link_uuid: str, total_quota: float = None, remarks: str = None) -> bool:
|
||||
"""更新外链信息"""
|
||||
"""
|
||||
更新外链信息
|
||||
|
||||
参数:
|
||||
link_uuid: 外链UUID
|
||||
total_quota: 总配额,可选
|
||||
remarks: 备注说明,可选
|
||||
|
||||
返回:
|
||||
bool: 是否更新成功
|
||||
"""
|
||||
try:
|
||||
link = self.get_external_link_by_uuid(link_uuid)
|
||||
if not link:
|
||||
@ -312,7 +470,15 @@ class CloudDriveDatabase:
|
||||
return False
|
||||
|
||||
def delete_external_link(self, link_uuid: str) -> bool:
|
||||
"""删除外链"""
|
||||
"""
|
||||
删除外链
|
||||
|
||||
参数:
|
||||
link_uuid: 外链UUID
|
||||
|
||||
返回:
|
||||
bool: 是否删除成功
|
||||
"""
|
||||
try:
|
||||
self.cursor.execute("DELETE FROM external_links WHERE link_uuid = ?", (link_uuid,))
|
||||
self.conn.commit()
|
||||
@ -320,9 +486,13 @@ class CloudDriveDatabase:
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
# 新增统计方法
|
||||
def get_total_user_drives_count(self) -> int:
|
||||
"""获取用户网盘总数"""
|
||||
"""
|
||||
获取用户网盘总数
|
||||
|
||||
返回:
|
||||
int: 用户网盘总数
|
||||
"""
|
||||
try:
|
||||
self.cursor.execute("SELECT COUNT(*) FROM user_drives")
|
||||
result = self.cursor.fetchone()
|
||||
@ -330,51 +500,36 @@ class CloudDriveDatabase:
|
||||
except Exception as e:
|
||||
print(f"获取用户网盘总数错误: {e}")
|
||||
return 0
|
||||
|
||||
|
||||
def get_active_external_links_count(self) -> int:
|
||||
"""获取活跃外链数量 (未过期且有剩余次数)"""
|
||||
"""
|
||||
获取活跃外链数量
|
||||
|
||||
返回:
|
||||
int: 活跃外链数量
|
||||
"""
|
||||
try:
|
||||
from datetime import datetime, timezone
|
||||
now_utc_iso = datetime.now(timezone.utc).isoformat()
|
||||
# 获取当前时间的ISO格式
|
||||
now = datetime.now().isoformat()
|
||||
|
||||
# 注意:SQLite 不直接支持 ISO 8601 比较,此查询可能需要调整或在 Python 中过滤
|
||||
# 简单起见,我们先只检查次数和时间是否存在
|
||||
# 更精确的查询可能需要 DATETIME 函数,或在 Python 中处理
|
||||
# 查询未过期且使用次数未达到上限的外链
|
||||
self.cursor.execute(
|
||||
"""
|
||||
SELECT COUNT(*) FROM external_links
|
||||
WHERE (used_quota < total_quota)
|
||||
AND (expiry_time IS NOT NULL AND expiry_time > ?)
|
||||
""",
|
||||
(now_utc_iso,) # 这个比较可能不适用于所有 SQLite 版本/配置,后续可能需要调整
|
||||
"SELECT COUNT(*) FROM external_links WHERE (expiry_time IS NULL OR expiry_time > ?) AND used_quota < total_quota",
|
||||
(now,)
|
||||
)
|
||||
# 备选(更兼容但效率低):获取所有链接在 Python 中过滤
|
||||
# self.cursor.execute("SELECT link_uuid, expiry_time, used_quota, total_quota FROM external_links")
|
||||
# links = self.cursor.fetchall()
|
||||
# count = 0
|
||||
# for link in links:
|
||||
# is_active = False
|
||||
# if link['used_quota'] < link['total_quota']:
|
||||
# if link['expiry_time']:
|
||||
# try:
|
||||
# expiry_dt = datetime.fromisoformat(link['expiry_time'].replace('Z', '+00:00'))
|
||||
# if datetime.now(timezone.utc) < expiry_dt:
|
||||
# is_active = True
|
||||
# except: pass # Ignore parsing errors
|
||||
# else: # No expiry time means active if quota is available
|
||||
# is_active = True
|
||||
# if is_active:
|
||||
# count += 1
|
||||
# return count
|
||||
|
||||
result = self.cursor.fetchone()
|
||||
return result[0] if result else 0
|
||||
except Exception as e:
|
||||
print(f"获取活跃外链数量错误: {e}")
|
||||
return 0 # 返回 0 或其他错误指示
|
||||
return 0 # 返回0或其他错误指示
|
||||
|
||||
def get_total_external_links_count(self) -> int:
|
||||
"""获取外链总数"""
|
||||
"""
|
||||
获取外链总数
|
||||
|
||||
返回:
|
||||
int: 外链总数
|
||||
"""
|
||||
try:
|
||||
self.cursor.execute("SELECT COUNT(*) FROM external_links")
|
||||
result = self.cursor.fetchone()
|
||||
@ -384,7 +539,12 @@ class CloudDriveDatabase:
|
||||
return 0
|
||||
|
||||
def get_user_drives_count_by_provider(self) -> Dict[str, int]:
|
||||
"""按提供商统计用户网盘数量"""
|
||||
"""
|
||||
按提供商统计用户网盘数量
|
||||
|
||||
返回:
|
||||
Dict: 按提供商分类的网盘数量
|
||||
"""
|
||||
try:
|
||||
self.cursor.execute("SELECT provider_name, COUNT(*) as count FROM user_drives GROUP BY provider_name")
|
||||
results = self.cursor.fetchall()
|
||||
@ -396,73 +556,4 @@ class CloudDriveDatabase:
|
||||
def close(self):
|
||||
"""关闭数据库连接"""
|
||||
if self.conn:
|
||||
self.conn.close()
|
||||
|
||||
# 使用示例
|
||||
if __name__ == "__main__":
|
||||
# 创建数据库实例
|
||||
db = CloudDriveDatabase("cloud_drive.db")
|
||||
|
||||
# 添加一个网盘服务商
|
||||
db.add_drive_provider(
|
||||
"夸克网盘",
|
||||
{
|
||||
"sign_wg": "",
|
||||
"kps_wg": "",
|
||||
"redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
|
||||
"data":{
|
||||
'client_id': '532',
|
||||
'v': '1.2',
|
||||
'request_id': "",
|
||||
'sign_wg': "",
|
||||
'kps_wg': "",
|
||||
'vcode': "",
|
||||
'token': ""
|
||||
}
|
||||
},
|
||||
"夸克网盘API配置"
|
||||
)
|
||||
|
||||
# 添加用户网盘
|
||||
drive_id = db.add_user_drive(
|
||||
"夸克网盘",
|
||||
{
|
||||
"sign_wg": "AAQHaE4ww2nnIPvofH2SfMv3N6OplcPRjxlgScTZozm/ZCMfQP74bsMLyKW883hZCGY=",
|
||||
"kps_wg": "AARWcp9UM71t5VzV9i5pBJ4dLXjJ7EZL5a9qz2QVVQtkkmcqS4wQGYtk38CRzW6HH4+5c7qsB9/EtUgkWcd8x/k7h9+PmAHUDvxKHUWnX7iL3h2fH86XJ4cEqwvUnQ77QGs=",
|
||||
"redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
|
||||
"data":{
|
||||
'client_id': '532',
|
||||
'v': '1.2',
|
||||
'request_id': "",
|
||||
'sign_wg': "",
|
||||
'kps_wg': "",
|
||||
'vcode': "",
|
||||
'token': ""
|
||||
}
|
||||
},
|
||||
"张三的百度网盘"
|
||||
)
|
||||
|
||||
# 创建外链
|
||||
if drive_id:
|
||||
link_uuid = db.create_external_link(
|
||||
drive_id,
|
||||
3,
|
||||
"测试外链"
|
||||
)
|
||||
|
||||
print(f"创建的外链UUID: {link_uuid}")
|
||||
|
||||
# 获取外链信息
|
||||
link_info = db.get_external_link_by_uuid(link_uuid)
|
||||
print(f"外链信息: {link_info}")
|
||||
|
||||
# 更新已使用配额
|
||||
db.update_external_link_quota(link_uuid, 512.0) # 使用了512MB
|
||||
|
||||
# 重新获取外链信息
|
||||
link_info = db.get_external_link_by_uuid(link_uuid)
|
||||
print(f"更新后的外链信息: {link_info}")
|
||||
|
||||
# 关闭数据库连接
|
||||
db.close()
|
||||
self.conn.close()
|
@ -1,50 +1,45 @@
|
||||
import requests
|
||||
import time
|
||||
|
||||
import json
|
||||
|
||||
|
||||
def login_quark(token, config_vars):
|
||||
"""
|
||||
夸克网盘登录
|
||||
|
||||
def login_quark(token,config_vars):
|
||||
参数:
|
||||
token: 登录token
|
||||
config_vars: 配置参数
|
||||
|
||||
返回:
|
||||
bool: 是否登录成功
|
||||
"""
|
||||
夸克登录
|
||||
:param token: 登录token
|
||||
:return: 是否登录成功
|
||||
"""
|
||||
aa = {
|
||||
'data': {
|
||||
'client_id': '515',
|
||||
'kps_wg': '',
|
||||
'request_id': '',
|
||||
'sign_wg': '',
|
||||
'token': '',
|
||||
'v': '1.2',
|
||||
'vcode': ''
|
||||
},
|
||||
'kps_wg': '',
|
||||
'redirect_uri': 'https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken',
|
||||
'sign_wg': ''
|
||||
}
|
||||
if len(config_vars) > 0:
|
||||
_config_vars = config_vars.get("data")
|
||||
|
||||
s = requests.Session()
|
||||
s.headers = {
|
||||
'Accept': 'application/json, text/plain, */*',
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
'Origin': 'https://b.quark.cn',
|
||||
'Referer': 'https://b.quark.cn/',
|
||||
'User-Agent': 'Mozilla/5.0 (Linux; U; Android 15; zh-CN; 2312DRA50C Build/AQ3A.240912.001) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/123.0.6312.80 Quark/7.9.2.771 Mobile Safari/537.36',
|
||||
'X-Requested-With': 'com.quark.browser',
|
||||
'sec-ch-ua': '"Android WebView";v="123", "Not:A-Brand";v="8", "Chromium";v="123"',
|
||||
'sec-ch-ua-mobile': '?1',
|
||||
'sec-ch-ua-platform': '"Android"'
|
||||
}
|
||||
# sign_wg = "AAQHaE4ww2nnIPvofH2SfMv3N6OplcPRjxlgScTZozm/ZCMfQP74bsMLyKW883hZCGY="
|
||||
# kps_wg = "AARWcp9UM71t5VzV9i5pBJ4dLXjJ7EZL5a9qz2QVVQtkkmcqS4wQGYtk38CRzW6HH4+5c7qsB9/EtUgkWcd8x/k7h9+PmAHUDvxKHUWnX7iL3h2fH86XJ4cEqwvUnQ77QGs="
|
||||
vcode = int(time.time() * 1000) # 相当于JavaScript中的Date.now(),返回当前时间的毫秒数
|
||||
'Accept': 'application/json, text/plain, */*',
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
'Origin': 'https://b.quark.cn',
|
||||
'Referer': 'https://b.quark.cn/',
|
||||
'User-Agent': 'Mozilla/5.0 (Linux; U; Android 15; zh-CN; 2312DRA50C Build/AQ3A.240912.001) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/123.0.6312.80 Quark/7.9.2.771 Mobile Safari/537.36',
|
||||
'X-Requested-With': 'com.quark.browser',
|
||||
'sec-ch-ua': '"Android WebView";v="123", "Not:A-Brand";v="8", "Chromium";v="123"',
|
||||
'sec-ch-ua-mobile': '?1',
|
||||
'sec-ch-ua-platform': '"Android"'
|
||||
}
|
||||
|
||||
# 生成时间戳,用于请求参数
|
||||
vcode = int(time.time() * 1000) # 获取当前时间的毫秒数
|
||||
request_id = vcode + 5
|
||||
is_login = False
|
||||
|
||||
# 构建请求URL和参数
|
||||
url = 'https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken'
|
||||
queryParams = config_vars.get("queryParams") + str(int(time.time() * 1000))
|
||||
|
||||
# 构建请求数据
|
||||
data = {
|
||||
'client_id': _config_vars.get("client_id"),
|
||||
'v': _config_vars.get("v"),
|
||||
@ -54,9 +49,14 @@ def login_quark(token,config_vars):
|
||||
'vcode': vcode,
|
||||
'token': token
|
||||
}
|
||||
|
||||
# 发送登录请求
|
||||
print(data)
|
||||
res =s.post(url, data=data, params=queryParams)
|
||||
res = s.post(url, data=data, params=queryParams)
|
||||
print(res.json())
|
||||
|
||||
# 检查登录结果
|
||||
if res.json().get('status') == 2000000:
|
||||
is_login = True
|
||||
|
||||
return is_login
|
@ -5,13 +5,27 @@ import re
|
||||
s = requests.Session()
|
||||
|
||||
def get_cnb_weburl(port):
|
||||
s.headers= {"Authorization": "2hk178fffIx8tdXLD9YjYJot0gA"}
|
||||
"""
|
||||
获取CNB项目的Web URL
|
||||
|
||||
参数:
|
||||
port: 应用运行端口
|
||||
|
||||
返回:
|
||||
str: 外部可访问的URL
|
||||
"""
|
||||
# 设置认证头
|
||||
s.headers = {"Authorization": "2hk178fffIx8tdXLD9YjYJot0gA"}
|
||||
|
||||
# 获取工作区列表
|
||||
res = s.get("https://api.cnb.cool/workspace/list?branch=main&status=running").json()
|
||||
info = res["list"][0]
|
||||
|
||||
# 获取工作区详细信息
|
||||
urlinfo = s.get(f"https://api.cnb.cool/{info['slug']}/-/workspace/detail/{info['sn']}").json()
|
||||
print(urlinfo)
|
||||
#re提取cnb-id
|
||||
|
||||
# 使用正则表达式提取cnb-id
|
||||
pattern = r'cnb-[a-z0-9]+-[a-z0-9]+-\d+'
|
||||
match = re.search(pattern, urlinfo["webide"])
|
||||
if match:
|
||||
@ -19,4 +33,5 @@ def get_cnb_weburl(port):
|
||||
else:
|
||||
cnb_id = None
|
||||
|
||||
# 返回格式化的URL
|
||||
return f"https://{cnb_id}-{port}.cnb.run/"
|
Reference in New Issue
Block a user