556 lines
19 KiB
Python
556 lines
19 KiB
Python
import os
|
||
import requests
|
||
import sqlite3
|
||
import json
|
||
from flask import g
|
||
from flask import Flask, render_template, request, redirect, url_for, session,make_response,send_from_directory,jsonify
|
||
from flask.views import MethodView
|
||
from werkzeug.routing import BaseConverter
|
||
from utils.login import login_quark
|
||
from utils.tools import get_cnb_weburl
|
||
from utils.detebase import CloudDriveDatabase
|
||
from datetime import datetime, timezone
|
||
import logging
|
||
from logging.handlers import RotatingFileHandler
|
||
|
||
|
||
app = Flask(__name__)
|
||
#app = Flask(__name__,template_folder='templates') #修改模板目录
|
||
app.jinja_env.auto_reload = True
|
||
|
||
# --- Logging Setup Start ---
|
||
if not os.path.exists('logs'):
|
||
os.mkdir('logs')
|
||
|
||
# Use RotatingFileHandler to prevent log file from growing indefinitely
|
||
file_handler = RotatingFileHandler('logs/app.log', maxBytes=10240, backupCount=10)
|
||
file_handler.setFormatter(logging.Formatter(
|
||
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
|
||
))
|
||
file_handler.setLevel(logging.INFO) # Set level to INFO to capture info, warning, error
|
||
|
||
app.logger.addHandler(file_handler)
|
||
app.logger.setLevel(logging.INFO)
|
||
app.logger.info('Flask App startup')
|
||
# --- Logging Setup End ---
|
||
|
||
# 数据库配置
|
||
DATABASE = 'database.db'
|
||
|
||
def get_db():
|
||
# 从 Flask 的全局对象 g 中获取数据库连接
|
||
# getattr 函数用于获取对象的属性,如果属性不存在则返回第三个参数作为默认值
|
||
# 这里尝试获取 g._database 属性,如果不存在则返回 None
|
||
# 这种模式确保在一个请求中只创建一个数据库连接
|
||
db = getattr(g, '_database', None)
|
||
if db is None:
|
||
db = g._database = CloudDriveDatabase(DATABASE)
|
||
db.row_factory = sqlite3.Row
|
||
return db
|
||
|
||
|
||
# 定义数据库注入装饰器
|
||
def inject_db(f):
|
||
def decorated_function(*args, **kwargs):
|
||
db = get_db()
|
||
# 将 db 作为第一个参数传递
|
||
return f(db, *args, **kwargs)
|
||
return decorated_function
|
||
|
||
|
||
@app.teardown_appcontext
|
||
def close_connection(exception):
|
||
db = getattr(g, '_database', None)
|
||
if db is not None:
|
||
db.close()
|
||
|
||
|
||
exlink_info = {
|
||
"exlink_id":"1",
|
||
"qrlimit":3,
|
||
"driver":["quark","wechat"],
|
||
"use_limit":0
|
||
}
|
||
|
||
class PhoneConverter(BaseConverter):
|
||
# 自定义路由转换器
|
||
regex = '1[3-9]\d{9}'
|
||
|
||
class LiConverter(BaseConverter):
|
||
# 自定义to_python转换器
|
||
def to_python(self, value):
|
||
return value.split("+")
|
||
|
||
app.url_map.converters["phone"] = PhoneConverter
|
||
app.url_map.converters["li"] = LiConverter
|
||
|
||
@app.route("/", methods=["GET", "POST"])
|
||
def index():
|
||
#return render_template("index.html")
|
||
return render_template('index.html')
|
||
|
||
# @app.route("/<phone:param>")
|
||
# def phone(param):
|
||
# return param
|
||
|
||
# @app.route("/<li:param>")
|
||
# def uner_info(param):
|
||
# return param
|
||
|
||
#falsk中重定向
|
||
@app.route('/profile/')
|
||
def proflie():
|
||
if request.args.get('name'):
|
||
return '个人中心页面'
|
||
else:
|
||
# return redirect(url_for('login'))
|
||
return redirect(url_for('index'),code=302)
|
||
|
||
|
||
@app.route('/demo2')
|
||
def demo2():
|
||
resp = make_response('make response测试')
|
||
resp.headers['itbaizhan'] = 'Python'
|
||
resp.status = '404 not found'
|
||
return resp
|
||
|
||
@app.route('/login',methods=['POST'])
|
||
def login():
|
||
# 获取POST请求中的JSON数据
|
||
data = request.get_json()
|
||
token = data.get('token')
|
||
link_uuid = data.get('link_uuid')
|
||
|
||
if not token:
|
||
print('缺少token参数')
|
||
return jsonify({"status": False, "message": "缺少token参数"})
|
||
|
||
|
||
|
||
|
||
# 如果有外链UUID,则更新其使用次数
|
||
if link_uuid:
|
||
db = get_db()
|
||
# 获取当前外链信息
|
||
link_info = db.get_external_link_by_uuid(link_uuid)
|
||
|
||
if link_info:
|
||
# 获取当前已使用次数和总次数
|
||
used_quota = link_info.get('used_quota', 0)
|
||
total_quota = link_info.get('total_quota', 0)
|
||
exdrive_id = link_info.get('drive_id',0)
|
||
|
||
# 确保不超过总次数
|
||
if used_quota < total_quota:
|
||
|
||
# 根据exdrive_id从drive_providers表查询config_vars
|
||
drive_info = db.get_user_drive(exdrive_id)
|
||
print(drive_info)
|
||
if drive_info:
|
||
config_vars = drive_info.get("login_config")
|
||
status = login_quark(token,config_vars)
|
||
|
||
# 增加使用次数
|
||
new_used_quota = used_quota + 1
|
||
update_success = db.update_external_link_quota(link_uuid, new_used_quota)
|
||
if update_success:
|
||
print(f"已更新外链 {link_uuid} 的使用次数: {new_used_quota}/{total_quota}")
|
||
|
||
return jsonify({"status": status})
|
||
|
||
@app.route('/exlink/<string:id>')
|
||
def qrlink(id):
|
||
db = get_db()
|
||
data = {"status": False}
|
||
|
||
# 获取外链信息
|
||
link_info = db.get_external_link_by_uuid(id)
|
||
|
||
if link_info:
|
||
# 检查是否已过期
|
||
expiry_time = link_info.get('expiry_time')
|
||
if expiry_time:
|
||
try:
|
||
# 使用 fromisoformat 解析 ISO 8601 UTC 字符串
|
||
# Python < 3.11 doesn't handle Z directly, remove it.
|
||
if expiry_time.endswith('Z'):
|
||
expiry_time_str = expiry_time[:-1] + '+00:00'
|
||
else:
|
||
expiry_time_str = expiry_time # Assume it might already be offset-aware or naive
|
||
|
||
expiry_datetime = datetime.fromisoformat(expiry_time_str)
|
||
|
||
# Ensure expiry_datetime is offset-aware (UTC)
|
||
if expiry_datetime.tzinfo is None:
|
||
# If parsing resulted in naive, assume it was UTC as intended
|
||
expiry_datetime = expiry_datetime.replace(tzinfo=timezone.utc)
|
||
|
||
# 获取当前 UTC 时间进行比较
|
||
if datetime.now(timezone.utc) > expiry_datetime:
|
||
data["message"] = "此外链已过期"
|
||
return render_template('exlink_error.html', message=data["message"])
|
||
except (ValueError, TypeError) as e:
|
||
# 解析失败,记录错误,并可能视为无效链接
|
||
print(f"Error parsing expiry_time '{expiry_time}': {e}")
|
||
data["message"] = "外链信息有误(无效的过期时间)"
|
||
return render_template('exlink_error.html', message=data["message"])
|
||
|
||
|
||
# 检查使用次数是否超过限制
|
||
used_quota = link_info.get('used_quota', 0)
|
||
total_quota = link_info.get('total_quota', 0)
|
||
|
||
if used_quota < total_quota:
|
||
# 不再自动增加使用次数,而是由扫码登录成功后增加
|
||
# 获取关联的网盘信息
|
||
drive_id = link_info.get('drive_id')
|
||
drive_info = db.get_user_drive(drive_id)
|
||
|
||
if drive_info:
|
||
data["status"] = True
|
||
data["drive_info"] = {
|
||
"provider_name": drive_info.get('provider_name'),
|
||
"login_config": drive_info.get('login_config')
|
||
}
|
||
data["message"] = "外链访问成功"
|
||
data["remaining"] = total_quota - used_quota
|
||
|
||
# 返回页面和网盘信息
|
||
return render_template('exlink_view.html',
|
||
link_info=link_info,
|
||
drive_info=drive_info,
|
||
remaining_count=total_quota - used_quota,
|
||
expiry_time=link_info.get('expiry_time'))
|
||
else:
|
||
data["message"] = "找不到关联的网盘信息"
|
||
else:
|
||
data["message"] = "此外链已达到使用次数限制"
|
||
else:
|
||
data["message"] = "无效的外链ID"
|
||
|
||
# 如果失败,返回错误页面
|
||
return render_template('exlink_error.html', message=data["message"])
|
||
|
||
|
||
@app.route('/admin/')
|
||
def admin():
|
||
db = get_db()
|
||
providers = db.get_all_drive_providers()
|
||
alluser_drives = db.get_all_user_drives()
|
||
return render_template('admin.html',providers=providers,alluser_drives=alluser_drives)
|
||
|
||
|
||
@app.route('/admin/drive_provider/<metfunc>',methods=['POST'])
|
||
def drive_provider(metfunc):
|
||
db = get_db()
|
||
data = {"status":False}
|
||
if metfunc == "get":
|
||
|
||
alldrive= db.get_all_drive_providers()
|
||
if len(alldrive) > 0:
|
||
data["status"] = True
|
||
data['data'] = alldrive
|
||
# 使用jsonify确保中文正确显示
|
||
elif metfunc == "add":
|
||
"""
|
||
json样板
|
||
body -- {
|
||
"config_vars": {
|
||
"data": {
|
||
"client_id": "532",
|
||
"kps_wg": "",
|
||
"request_id": "",
|
||
"sign_wg": "",
|
||
"token": "",
|
||
"v": "1.2",
|
||
"vcode": ""
|
||
},
|
||
"kps_wg": "",
|
||
"redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
|
||
"sign_wg": ""
|
||
},
|
||
"provider_name": "夸克网盘",
|
||
"remarks": "夸克网盘"
|
||
}
|
||
Return: status
|
||
"""
|
||
|
||
body = request.get_json()
|
||
status = db.add_drive_provider(body.get("provider_name","测试网盘"),body.get("config_vars"),body.get("remarks","测试网盘"))
|
||
|
||
if status:
|
||
data["status"] = True
|
||
data["data"] = body
|
||
return data
|
||
|
||
@app.route('/admin/user_drive/<metfunc>',methods=['POST'])
|
||
def user_drive(metfunc):
|
||
db = get_db()
|
||
data = {"status":False}
|
||
if metfunc == "get":
|
||
body = request.get_json()
|
||
# 如果提供了ID,则返回特定驱动的信息
|
||
if body and 'id' in body:
|
||
drive_id = body.get('id')
|
||
user_drive = db.get_user_drive(drive_id)
|
||
if user_drive:
|
||
data["status"] = True
|
||
data["data"] = user_drive
|
||
else:
|
||
data["status"] = False
|
||
data["message"] = "未找到指定的网盘账号"
|
||
# 如果提供了provider_name,则返回该类型的所有账号
|
||
elif body and 'provider_name' in body:
|
||
provider_name = body.get('provider_name')
|
||
provider_drives = db.get_user_drives_by_provider(provider_name)
|
||
data["status"] = True
|
||
data["data"] = provider_drives if provider_drives else []
|
||
else:
|
||
# 否则返回所有驱动的信息
|
||
alluser_drives = db.get_all_user_drives()
|
||
# 即使列表为空也返回成功状态和空数组
|
||
data["status"] = True
|
||
data["data"] = alluser_drives if alluser_drives else []
|
||
elif metfunc == "add":
|
||
body = request.get_json()
|
||
print(body)
|
||
status = db.add_user_drive(body.get("provider_name","测试网盘"),body.get("login_config"),body.get("remarks","测试网盘"))
|
||
if status:
|
||
data["status"] = True
|
||
data["data"] = body
|
||
elif metfunc == "update":
|
||
body = request.get_json()
|
||
print(body)
|
||
print(body.get("id"),body.get("login_config"))
|
||
status = db.update_user_drive(body.get("id"),json.loads(body.get("login_config")),body.get("remarks","测试网盘"))
|
||
if status:
|
||
data["status"] = True
|
||
data["data"] = body
|
||
elif metfunc == "delete":
|
||
body = request.get_json()
|
||
drive_id = body.get("id")
|
||
if drive_id:
|
||
# 检查是否有关联的外链,如果有则不允许删除
|
||
external_links = db.get_external_links_by_drive(drive_id)
|
||
if external_links and len(external_links) > 0:
|
||
data["status"] = False
|
||
data["message"] = "该网盘账号有关联的外链,请先删除外链后再删除账号"
|
||
return data
|
||
|
||
status = db.delete_user_drive(drive_id)
|
||
if status:
|
||
data["status"] = True
|
||
data["message"] = "网盘账号删除成功"
|
||
else:
|
||
data["message"] = "网盘账号删除失败,可能不存在"
|
||
else:
|
||
data["message"] = "缺少必要的ID参数"
|
||
return data
|
||
|
||
|
||
class Exlink(MethodView):
|
||
def demo(self):
|
||
db = get_db()
|
||
db.add_drive_provider(
|
||
"阿里网盘",
|
||
{
|
||
"sign_wg": "",
|
||
"kps_wg": "",
|
||
"redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
|
||
"data":{
|
||
'client_id': '532',
|
||
'v': '1.2',
|
||
'request_id': "",
|
||
'sign_wg': "",
|
||
'kps_wg': "",
|
||
'vcode': "",
|
||
'token': ""
|
||
}
|
||
},
|
||
"阿里网盘API配置"
|
||
)
|
||
return jsonify({"status": True, "message": "success"})
|
||
|
||
def get(self):
|
||
db = get_db()
|
||
data = {"status": False}
|
||
try:
|
||
# 获取所有外链
|
||
external_links = []
|
||
results = db.cursor.execute("SELECT * FROM external_links").fetchall()
|
||
for row in results:
|
||
external_links.append(dict(row))
|
||
|
||
data["status"] = True
|
||
data["data"] = external_links
|
||
except Exception as e:
|
||
data["message"] = f"获取外链列表失败: {str(e)}"
|
||
|
||
return jsonify(data)
|
||
|
||
def post(self):
|
||
"""创建外链"""
|
||
db = get_db()
|
||
data = {"status": False}
|
||
|
||
try:
|
||
body = request.get_json()
|
||
|
||
# 处理前端可能传递的两种数据格式
|
||
# 1. 直接传递drive_id, total_quota, remarks
|
||
# 2. 将这些参数封装在data字段中
|
||
if body.get('data') and isinstance(body.get('data'), dict):
|
||
# 如果参数被封装在data字段中,提取出来
|
||
body_data = body.get('data')
|
||
drive_id = body_data.get('account_id') # 前端传的是account_id
|
||
total_quota = float(body_data.get('total_quota', 1))
|
||
remarks = body_data.get('remarks', '')
|
||
expiry_time = body_data.get('expiry_time')
|
||
else:
|
||
# 直接从body中获取
|
||
drive_id = body.get('drive_id')
|
||
total_quota = float(body.get('total_quota', 1))
|
||
remarks = body.get('remarks', '')
|
||
expiry_time = body.get('expiry_time')
|
||
|
||
if not drive_id:
|
||
data["message"] = "缺少必要的drive_id参数"
|
||
return jsonify(data)
|
||
|
||
# 检查网盘是否存在
|
||
user_drive = db.get_user_drive(drive_id)
|
||
if not user_drive:
|
||
data["message"] = "指定的网盘账号不存在"
|
||
return jsonify(data)
|
||
|
||
# 创建外链
|
||
link_uuid = db.create_external_link(
|
||
drive_id=drive_id,
|
||
total_quota=total_quota,
|
||
remarks=remarks,
|
||
expiry_time=expiry_time
|
||
)
|
||
|
||
if link_uuid:
|
||
data["status"] = True
|
||
data["data"] = {
|
||
"link_uuid": link_uuid,
|
||
"url": f"/exlink/{link_uuid}"
|
||
}
|
||
data["message"] = "外链创建成功"
|
||
else:
|
||
data["message"] = "外链创建失败"
|
||
except Exception as e:
|
||
data["message"] = f"创建外链失败: {str(e)}"
|
||
|
||
return jsonify(data)
|
||
|
||
def delete(self):
|
||
"""删除外链"""
|
||
db = get_db()
|
||
data = {"status": False}
|
||
|
||
try:
|
||
body = request.get_json()
|
||
link_uuid = body.get('link_uuid')
|
||
|
||
if not link_uuid:
|
||
data["message"] = "缺少必要的link_uuid参数"
|
||
return jsonify(data)
|
||
|
||
# 删除外链
|
||
status = db.delete_external_link(link_uuid)
|
||
|
||
if status:
|
||
data["status"] = True
|
||
data["message"] = "外链删除成功"
|
||
else:
|
||
data["message"] = "外链删除失败,可能不存在"
|
||
except Exception as e:
|
||
data["message"] = f"删除外链失败: {str(e)}"
|
||
|
||
return jsonify(data)
|
||
|
||
|
||
app.add_url_rule('/admin/exlink', view_func=Exlink.as_view('exlink'))
|
||
|
||
|
||
# 添加新的URL规则 - 外链管理API
|
||
@app.route('/admin/exlink/get', methods=['POST'])
|
||
def get_external_links():
|
||
return Exlink().get()
|
||
|
||
|
||
@app.route('/admin/exlink/create', methods=['POST'])
|
||
def create_external_link():
|
||
return Exlink().post()
|
||
|
||
|
||
@app.route('/admin/exlink/delete', methods=['POST'])
|
||
def delete_external_link():
|
||
return Exlink().delete()
|
||
|
||
|
||
# 新增:仪表盘数据 API
|
||
@app.route('/admin/dashboard_data', methods=['GET'])
|
||
def get_dashboard_data():
|
||
db = get_db()
|
||
try:
|
||
user_drives_count = db.get_total_user_drives_count()
|
||
active_links_count = db.get_active_external_links_count()
|
||
# 使用外链总数作为"今日访问量"的简化替代
|
||
total_links_count = db.get_total_external_links_count()
|
||
|
||
data = {
|
||
"status": True,
|
||
"data": {
|
||
"user_drives_count": user_drives_count,
|
||
"active_links_count": active_links_count,
|
||
"total_links_count": total_links_count
|
||
}
|
||
}
|
||
except Exception as e:
|
||
print(f"获取仪表盘数据错误: {e}")
|
||
data = {"status": False, "message": "获取仪表盘数据失败"}
|
||
return jsonify(data)
|
||
|
||
|
||
# 新增:统计分析数据 API
|
||
@app.route('/admin/statistics_data', methods=['GET'])
|
||
def get_statistics_data():
|
||
db = get_db()
|
||
try:
|
||
drives_by_provider = db.get_user_drives_count_by_provider()
|
||
# 这里可以添加更多统计数据,例如外链访问趋势 (需要记录访问日志)
|
||
# 暂时只返回网盘分布数据
|
||
|
||
data = {
|
||
"status": True,
|
||
"data": {
|
||
"drives_by_provider": drives_by_provider,
|
||
# "access_trend": [] # 示例:将来可以添加访问趋势数据
|
||
}
|
||
}
|
||
except Exception as e:
|
||
print(f"获取统计数据错误: {e}")
|
||
data = {"status": False, "message": "获取统计数据失败"}
|
||
return jsonify(data)
|
||
|
||
|
||
# -----------------------------
|
||
# 应用程序运行入口
|
||
# -----------------------------
|
||
if __name__ == '__main__':
|
||
# 初始化数据库 (如果需要)
|
||
# init_db()
|
||
|
||
# 启动Flask应用
|
||
weburl = get_cnb_weburl(5000)
|
||
print("Run_url:",weburl)
|
||
app.config.from_pyfile("config.py")
|
||
app.run(host="0.0.0.0")
|
||
|
||
|
||
|
||
|