238 lines
6.4 KiB
Python
238 lines
6.4 KiB
Python
import functools
import json
import os
import sqlite3

import requests
from flask import g
from flask import Flask, render_template, request, redirect, url_for, session,make_response,send_from_directory,jsonify
from flask.views import MethodView
from werkzeug.routing import BaseConverter

from utils.detebase import CloudDriveDatabase
from utils.login import login_quark
from utils.tools import get_cnb_weburl
|
|
|
|
|
|
# Database configuration: the path passed to CloudDriveDatabase.
DATABASE = 'database.db'

# Flask application instance.
# To change the template directory instead:
#   app = Flask(__name__, template_folder='templates')
app = Flask(__name__)
# Let Jinja re-read templates when they change on disk.
app.jinja_env.auto_reload = True
|
|
|
|
def get_db():
    """Return the database handle cached on Flask's ``g``, creating it on demand.

    ``getattr`` with a ``None`` default is used so a missing ``g._database``
    attribute is treated as "not created yet". Caching the handle on ``g``
    means at most one ``CloudDriveDatabase`` is created per ``g`` lifetime.
    """
    cached = getattr(g, '_database', None)
    if cached is not None:
        return cached

    fresh = CloudDriveDatabase(DATABASE)
    g._database = fresh
    # NOTE(review): assumes CloudDriveDatabase honours a ``row_factory``
    # attribute the way sqlite3.Connection does -- confirm in utils.detebase.
    fresh.row_factory = sqlite3.Row
    return fresh
|
|
|
|
|
|
# 定义数据库注入装饰器
|
|
def inject_db(f):
    """Decorator that supplies the database handle as the first argument.

    The wrapped callable ``f`` is invoked as ``f(db, *args, **kwargs)`` where
    ``db`` comes from ``get_db()`` at call time.

    ``functools.wraps`` preserves ``f``'s ``__name__`` and docstring. Without
    it every decorated view would be called ``decorated_function``, and Flask,
    which derives the default endpoint name from the view's ``__name__``,
    would reject the second decorated route as a duplicate endpoint.

    Args:
        f: Callable expecting the database handle as its first positional
            argument.

    Returns:
        A wrapper with the same metadata as ``f`` that no longer needs the
        database argument from its caller.
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        # Resolve the handle lazily, at call time rather than decoration time.
        return f(get_db(), *args, **kwargs)

    return decorated_function
|
|
|
|
|
|
@app.teardown_appcontext
def close_connection(exception):
    """Close the database handle cached on ``g`` when the app context ends.

    Args:
        exception: Value passed in by Flask's teardown hook; not used here.
    """
    handle = getattr(g, '_database', None)
    if handle is None:
        # No handle was created while this context was active.
        return
    handle.close()
|
|
|
|
|
|
# Static sample record returned by the /exlink/<id> route.
exlink_info = dict(
    exlink_id="1",
    qrlimit=3,
    driver=["quark", "wechat"],
    use_limit=0,
)
|
|
|
|
class PhoneConverter(BaseConverter):
    """Custom URL converter matching an 11-digit number.

    The pattern accepts a leading ``1``, a second digit in ``3``-``9`` and
    nine further digits.
    """

    # Raw string: ``\d`` in a normal string literal is an invalid escape
    # sequence and triggers a warning on current Python versions. The
    # resulting pattern text is unchanged.
    regex = r'1[3-9]\d{9}'
|
|
|
|
class LiConverter(BaseConverter):
    """Custom URL converter that turns a ``+``-separated segment into a list."""

    def to_python(self, value):
        """Split the matched URL segment on ``+`` and return the parts."""
        separator = "+"
        return value.split(separator)
|
|
|
|
# Register the custom converters under the names used in route patterns
# (``<phone:...>`` and ``<li:...>``).
app.url_map.converters.update({
    "phone": PhoneConverter,
    "li": LiConverter,
})
|
|
|
|
@app.route("/", methods=["GET", "POST"])
def index():
    """Render ``index.html`` for GET and POST requests to the site root."""
    template_name = 'index.html'
    return render_template(template_name)
|
|
|
|
# @app.route("/<phone:param>")
|
|
# def phone(param):
|
|
# return param
|
|
|
|
# @app.route("/<li:param>")
|
|
# def uner_info(param):
|
|
# return param
|
|
|
|
# Redirects in Flask
|
|
@app.route('/profile/')
def proflie():
    """Show the profile text when a ``name`` query argument is present.

    Requests without a non-empty ``name`` argument are redirected (302) to
    the ``index`` endpoint.
    """
    if not request.args.get('name'):
        # An alternative target considered by the author: url_for('login').
        return redirect(url_for('index'), code=302)
    return '个人中心页面'
|
|
|
|
|
|
@app.route('/demo2')
def demo2():
    """Demonstrate ``make_response``: custom body, header and status line."""
    response = make_response('make response测试')
    response.status = '404 not found'
    response.headers['itbaizhan'] = 'Python'
    return response
|
|
|
|
@app.route('/login', methods=['POST'])
def login():
    """Log in with the ``token`` field of the POSTed JSON body.

    Returns:
        A JSON response ``{"status": <result of login_quark(token)>}``, or
        ``{"status": False, "message": ...}`` when no usable token was sent.
    """
    # silent=True returns None instead of raising on a missing or malformed
    # JSON body. A JSON array or scalar has no .get(), so anything that is
    # not an object is handled as "token missing" rather than crashing with
    # AttributeError.
    payload = request.get_json(silent=True)
    token = payload.get('token') if isinstance(payload, dict) else None

    if not token:
        print('缺少token参数')
        return jsonify({"status": False, "message": "缺少token参数"})

    status = login_quark(token)
    return jsonify({"status": status})
|
|
|
|
@app.route('/exlink/<string:id>')
def qrlink(id):
    """Return the module-level ``exlink_info`` mapping.

    Args:
        id: URL segment captured by the route; it is not consulted here.
    """
    return exlink_info
|
|
|
|
@app.route('/admin/')
def admin():
    """Render ``admin.html`` with all drive providers and all user drives."""
    db = get_db()
    context = {"providers": db.get_all_drive_providers()}
    context["alluser_drives"] = db.get_all_user_drives()
    # Debug output of the user-drive rows.
    print(context["alluser_drives"])
    return render_template('admin.html', **context)
|
|
|
|
|
|
@app.route('/admin/drive_provider/<metfunc>', methods=['POST'])
def drive_provider(metfunc):
    """Handle drive-provider admin actions selected by ``metfunc``.

    * ``get`` -- return every provider from ``db.get_all_drive_providers()``.
    * ``add`` -- create a provider from the JSON request body.

    Sample JSON body for ``add``::

        {
            "config_vars": {
                "data": {
                    "client_id": "532",
                    "kps_wg": "",
                    "request_id": "",
                    "sign_wg": "",
                    "token": "",
                    "v": "1.2",
                    "vcode": ""
                },
                "kps_wg": "",
                "redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
                "sign_wg": ""
            },
            "provider_name": "夸克网盘",
            "remarks": "夸克网盘"
        }

    Args:
        metfunc: Action name taken from the URL; any other value leaves the
            result at ``{"status": False}``.

    Returns:
        A dict with ``"status"`` and, on success, ``"data"`` (the provider
        list for ``get``, the submitted body for ``add``).
    """
    db = get_db()
    data = {"status": False}

    if metfunc == "get":
        alldrive = db.get_all_drive_providers()
        # Truthiness also covers a None result, on which len() would raise.
        if alldrive:
            data["status"] = True
            data['data'] = alldrive
        # Original author's note: use jsonify so Chinese text displays correctly.
    elif metfunc == "add":
        # A missing, malformed or non-object JSON body has no .get(); report
        # failure instead of raising AttributeError.
        body = request.get_json(silent=True)
        if not isinstance(body, dict):
            return data
        # Log the submitted body (the original printed the response dict here).
        print(body)
        status = db.add_drive_provider(
            body.get("provider_name", "测试网盘"),
            body.get("config_vars"),
            body.get("remarks", "测试网盘"),
        )
        if status:
            data["status"] = True
            data["data"] = body

    return data
|
|
|
|
@app.route('/admin/user_drive/<metfunc>', methods=['POST'])
def user_drive(metfunc):
    """Handle user-drive admin actions selected by ``metfunc``.

    * ``get`` -- not implemented; the result stays ``{"status": False}``.
    * ``add`` -- create a user drive from the JSON request body via
      ``db.add_user_drive``.

    Args:
        metfunc: Action name taken from the URL.

    Returns:
        A dict with ``"status"`` and, after a successful ``add``, ``"data"``
        holding the submitted body.
    """
    db = get_db()
    data = {"status": False}

    if metfunc == "add":
        # A missing, malformed or non-object JSON body has no .get(); report
        # failure instead of raising AttributeError.
        body = request.get_json(silent=True)
        if not isinstance(body, dict):
            return data
        print(body)
        # NOTE(review): the fields read here are the same as for drive
        # providers -- confirm they match add_user_drive's parameters.
        status = db.add_user_drive(
            body.get("provider_name", "测试网盘"),
            body.get("config_vars"),
            body.get("remarks", "测试网盘"),
        )
        if status:
            data["status"] = True
            data["data"] = body

    return data
|
|
|
|
|
|
class Exlink(MethodView):
    """Class-based view with a GET and a POST handler for exlink records."""

    def get(self):
        """Insert a hard-coded provider record and report success.

        NOTE(review): the return value of ``add_drive_provider`` is ignored,
        so the response says success even if the insert failed; confirm
        whether that is intended.
        """
        db = get_db()
        provider_config = {
            "sign_wg": "",
            "kps_wg": "",
            "redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
            "data": {
                'client_id': '532',
                'v': '1.2',
                'request_id': "",
                'sign_wg': "",
                'kps_wg': "",
                'vcode': "",
                'token': "",
            },
        }
        db.add_drive_provider("阿里网盘", provider_config, "阿里网盘API配置")
        return jsonify({"status": True, "message": "success"})

    def post(self):
        """Insert a row into ``exlinks`` from the JSON request body.

        Missing fields fall back to ``qr_limit=3``,
        ``drivers='["quark","wechat"]'`` and ``use_limit=0``.

        Returns:
            A JSON success response, or a 400 JSON response when the body is
            not a JSON object.
        """
        # A missing, malformed or non-object body has no .get(); answer with
        # a client error instead of raising AttributeError.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"status": False, "message": "invalid JSON body"}), 400

        drivers = data.get('drivers', '["quark","wechat"]')
        # The default is JSON text, so a client-supplied list/dict is stored
        # as JSON text too. NOTE(review): assumes db.cursor() is a sqlite3
        # cursor, which cannot bind list/dict parameters -- confirm.
        if isinstance(drivers, (list, tuple, dict)):
            drivers = json.dumps(drivers)

        db = get_db()
        cursor = db.cursor()
        cursor.execute(
            '''
            INSERT INTO exlinks (exlink_id, qr_limit, drivers, use_limit)
            VALUES (?, ?, ?, ?)
            ''',
            (
                data.get('exlink_id'),
                data.get('qr_limit', 3),
                drivers,
                data.get('use_limit', 0),
            ),
        )
        db.commit()
        return jsonify({"status": True, "message": "success"})
|
|
|
|
|
|
# Bind the Exlink class-based view to its URL; 'exlink' is the endpoint name.
app.add_url_rule(
    '/admin/exlink',
    view_func=Exlink.as_view('exlink'),
)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Resolve and print the URL reported by get_cnb_weburl for port 5000.
    weburl = get_cnb_weburl(5000)
    print("Run_url:", weburl)

    # Load settings from config.py before starting the server.
    app.config.from_pyfile("config.py")
    # Listen on all network interfaces.
    app.run(host="0.0.0.0")
|
|
|
|
|
|
|
|
|