TenantDrive/main.py

244 lines
6.7 KiB
Python

import functools
import json
import os
import sqlite3

import requests
from flask import g
from flask import Flask, render_template, request, redirect, url_for, session,make_response,send_from_directory,jsonify
from flask.views import MethodView
from werkzeug.routing import BaseConverter

from utils.login import login_quark
from utils.tools import get_cnb_weburl
from utils.detebase import CloudDriveDatabase
# Application object shared by every route and hook in this module.
app = Flask(__name__)
# app = Flask(__name__, template_folder='templates')  # alternative: override the template directory
# Turn on Jinja's auto_reload flag on the application's template environment.
app.jinja_env.auto_reload = True
# Database configuration: value handed to CloudDriveDatabase in get_db().
DATABASE = 'database.db'
def get_db():
    """Return the ``CloudDriveDatabase`` handle cached on Flask's ``g`` object.

    The handle is created on first use and stored as ``g._database`` so that
    later calls made while the same ``g`` is active reuse it instead of
    opening another one.
    """
    if getattr(g, '_database', None) is None:
        # Nothing cached yet: open the database, remember it on ``g`` and
        # ask for sqlite3.Row rows.
        g._database = CloudDriveDatabase(DATABASE)
        g._database.row_factory = sqlite3.Row
    return g._database
# Decorator that injects the database handle into the wrapped callable.
def inject_db(f):
    """Wrap *f* so that it receives the handle from ``get_db()`` as its first argument.

    The wrapper forwards every positional and keyword argument unchanged after
    the injected handle and returns whatever *f* returns.

    ``functools.wraps`` copies ``__name__``/``__doc__`` from *f* onto the
    wrapper. Without it every decorated view would be called
    ``decorated_function``, and Flask derives the default endpoint name from
    the function name, so decorating a second view would collide with the first.
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        # Resolve the handle at call time, not at decoration time.
        return f(get_db(), *args, **kwargs)
    return decorated_function
@app.teardown_appcontext
def close_connection(exception):
    """Close the database handle stored on ``g``, if there is one.

    Registered with ``app.teardown_appcontext``; the ``exception`` argument
    supplied by Flask is not inspected.
    """
    handle = getattr(g, '_database', None)
    if handle is None:
        # No handle was ever opened, so there is nothing to release.
        return
    handle.close()
# Static share-link descriptor; the /exlink/<id> view returns it as-is.
exlink_info = dict(
    exlink_id="1",
    qrlimit=3,
    driver=["quark", "wechat"],
    use_limit=0,
)
class PhoneConverter(BaseConverter):
    """Custom URL converter matching ``1``, one digit in 3-9, then nine more digits.

    Presumably intended for 11-digit mobile phone numbers - confirm with the
    routes that use the ``phone`` converter.
    """

    # Raw string: ``\d`` must reach the regex engine verbatim. In a normal
    # string literal it is an invalid escape sequence that Python warns about.
    regex = r'1[3-9]\d{9}'
class LiConverter(BaseConverter):
    """Custom URL converter that hands the view a list instead of a string."""

    def to_python(self, value):
        """Split the matched URL segment on ``+`` and return the pieces as a list."""
        pieces = value.split("+")
        return pieces
# Make the custom converters usable in route rules as <phone:...> and <li:...>.
app.url_map.converters.update(
    phone=PhoneConverter,
    li=LiConverter,
)
@app.route("/", methods=["GET", "POST"])
def index():
    """Render ``index.html`` for both GET and POST requests to the site root."""
    page = render_template('index.html')
    return page
# @app.route("/<phone:param>")
# def phone(param):
# return param
# @app.route("/<li:param>")
# def uner_info(param):
# return param
# Redirects in Flask
@app.route('/profile/')
def proflie():
    """Show the profile text when a ``name`` query argument is present.

    Requests without a non-empty ``name`` argument are redirected (302) to
    the ``index`` view.
    """
    if not request.args.get('name'):
        return redirect(url_for('index'), code=302)
    return '个人中心页面'
@app.route('/demo2')
def demo2():
    """Return a hand-built response with a custom header and a 404 status line."""
    response = make_response('make response测试')
    response.status = '404 not found'
    response.headers['itbaizhan'] = 'Python'
    return response
@app.route('/login',methods=['POST'])
def login():
    """Pass the ``token`` from the JSON request body to ``login_quark``.

    Returns:
        A JSON response ``{"status": <login_quark result>}``. When the body
        is missing, is not valid JSON, is not a JSON object, or carries no
        non-empty ``token``, the response is
        ``{"status": False, "message": "缺少token参数"}`` instead.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising, so every unusable payload takes the "missing token" path below.
    payload = request.get_json(silent=True)
    # A JSON null, array or scalar has no .get(); treat it as "no token".
    token = payload.get('token') if isinstance(payload, dict) else None
    if not token:
        print('缺少token参数')
        return jsonify({"status": False, "message": "缺少token参数"})
    status = login_quark(token)
    return jsonify({"status": status})
@app.route('/exlink/<string:id>')
def qrlink(id):
    """Return the module-level ``exlink_info`` mapping.

    ``id`` is bound from the URL but is not used yet; the same mapping is
    returned for every value.
    """
    return exlink_info
@app.route('/admin/')
def admin():
    """Render ``admin.html`` with every drive provider and every user drive."""
    database = get_db()
    # Dict entries are evaluated top to bottom, so providers are still
    # fetched before user drives.
    context = {
        "providers": database.get_all_drive_providers(),
        "alluser_drives": database.get_all_user_drives(),
    }
    return render_template('admin.html', **context)
@app.route('/admin/drive_provider/<metfunc>',methods=['POST'])
def drive_provider(metfunc):
    """List or create drive providers, selected by the ``metfunc`` URL segment.

    ``metfunc == "get"``
        Fetch all providers. When at least one exists the result is
        ``{"status": True, "data": <providers>}``.

    ``metfunc == "add"``
        Create a provider from the JSON object in the request body and, on
        success, echo that object back under ``data``. Sample body::

            {
                "config_vars": {
                    "data": {
                        "client_id": "532",
                        "kps_wg": "",
                        "request_id": "",
                        "sign_wg": "",
                        "token": "",
                        "v": "1.2",
                        "vcode": ""
                    },
                    "kps_wg": "",
                    "redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
                    "sign_wg": ""
                },
                "provider_name": "夸克网盘",
                "remarks": "夸克网盘"
            }

    Any other ``metfunc``, an unusable body, or a falsy result from the
    database layer yields ``{"status": False}``.

    Returns:
        dict: the result mapping described above, returned as the view result.
    """
    db = get_db()
    data = {"status": False}
    if metfunc == "get":
        alldrive = db.get_all_drive_providers()
        # Truthiness covers both an empty collection and None.
        if alldrive:
            data["status"] = True
            data['data'] = alldrive
    elif metfunc == "add":
        # silent=True returns None for a missing/malformed JSON body rather
        # than raising; anything that is not a JSON object is rejected here
        # instead of failing on .get() below.
        body = request.get_json(silent=True)
        if not isinstance(body, dict):
            return data
        status = db.add_drive_provider(
            body.get("provider_name", "测试网盘"),
            body.get("config_vars"),
            body.get("remarks", "测试网盘"),
        )
        if status:
            data["status"] = True
            data["data"] = body
    return data
@app.route('/admin/user_drive/<metfunc>',methods=['POST'])
def user_drive(metfunc):
    """Create or update user drives, selected by the ``metfunc`` URL segment.

    ``metfunc == "get"``
        Not implemented; returns ``{"status": False}``.

    ``metfunc == "add"``
        Calls ``db.add_user_drive`` with ``provider_name``, ``config_vars``
        and ``remarks`` taken from the JSON body.

    ``metfunc == "update"``
        Calls ``db.update_user_drive`` with ``id``, the decoded
        ``login_config`` and ``remarks`` taken from the JSON body.
        ``login_config`` may be a JSON-encoded string or an already decoded
        JSON value.

    On success the request body is echoed back under ``data`` with
    ``status`` set to True. Any other ``metfunc``, an unusable body, a
    missing or undecodable ``login_config``, or a falsy result from the
    database layer yields ``{"status": False}``.

    Returns:
        dict: the result mapping described above, returned as the view result.
    """
    db = get_db()
    data = {"status": False}
    if metfunc == "get":
        pass
    elif metfunc == "add":
        # silent=True returns None for a missing/malformed JSON body rather
        # than raising; non-object payloads are rejected before .get() is used.
        body = request.get_json(silent=True)
        if not isinstance(body, dict):
            return data
        # NOTE(review): these are the same keys the drive-provider "add"
        # route reads - confirm they match add_user_drive's parameters.
        status = db.add_user_drive(
            body.get("provider_name", "测试网盘"),
            body.get("config_vars"),
            body.get("remarks", "测试网盘"),
        )
        if status:
            data["status"] = True
            data["data"] = body
    elif metfunc == "update":
        body = request.get_json(silent=True)
        if not isinstance(body, dict):
            return data
        login_config = body.get("login_config")
        if login_config is None:
            # Nothing to store; json.loads(None) would raise TypeError.
            return data
        if isinstance(login_config, str):
            # Clients may send the configuration as a JSON-encoded string.
            try:
                login_config = json.loads(login_config)
            except json.JSONDecodeError:
                return data
        status = db.update_user_drive(
            body.get("id"),
            login_config,
            body.get("remarks", "测试网盘"),
        )
        if status:
            data["status"] = True
            data["data"] = body
    return data
class Exlink(MethodView):
    """Class-based view handling GET and POST for share-link administration."""

    def get(self):
        """Insert a fixed drive-provider record and report success.

        NOTE(review): the provider is named "阿里网盘" while ``redirect_uri``
        points at ``uop.quark.cn``, and the result of ``add_drive_provider``
        is not checked - this looks like seed/test code; confirm before
        relying on it.
        """
        db = get_db()
        db.add_drive_provider(
            "阿里网盘",
            {
                "sign_wg": "",
                "kps_wg": "",
                "redirect_uri": "https://uop.quark.cn/cas/ajax/loginWithKpsAndQrcodeToken",
                "data": {
                    'client_id': '532',
                    'v': '1.2',
                    'request_id': "",
                    'sign_wg': "",
                    'kps_wg': "",
                    'vcode': "",
                    'token': ""
                }
            },
            "阿里网盘API配置"
        )
        return jsonify({"status": True, "message": "success"})

    def post(self):
        """Insert one row into ``exlinks`` from the JSON object in the request body.

        Reads ``exlink_id``, ``qr_limit`` (default 3), ``drivers`` (default
        ``'["quark","wechat"]'``) and ``use_limit`` (default 0). A ``drivers``
        value sent as a JSON array or object is stored as JSON text.

        Returns:
            A JSON response ``{"status": True, "message": "success"}`` after
            the insert is committed, or
            ``{"status": False, "message": "invalid JSON body"}`` when the
            body is missing, malformed or not a JSON object.
        """
        # silent=True returns None for a missing/malformed body instead of raising.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"status": False, "message": "invalid JSON body"})

        drivers = data.get('drivers', '["quark","wechat"]')
        if isinstance(drivers, (list, dict)):
            # sqlite3 cannot bind list/dict parameters; serialise them in the
            # same compact form as the default value.
            drivers = json.dumps(drivers, ensure_ascii=False, separators=(",", ":"))

        db = get_db()
        # NOTE(review): assumes CloudDriveDatabase exposes cursor() and
        # commit() like a sqlite3 connection - confirm.
        cursor = db.cursor()
        # Values are bound with ? placeholders, never interpolated into the SQL.
        cursor.execute('''
            INSERT INTO exlinks (exlink_id, qr_limit, drivers, use_limit)
            VALUES (?, ?, ?, ?)
            ''', (
            data.get('exlink_id'),
            data.get('qr_limit', 3),
            drivers,
            data.get('use_limit', 0)
        ))
        db.commit()
        return jsonify({"status": True, "message": "success"})
# Expose the Exlink class-based view at /admin/exlink under the endpoint name 'exlink'.
app.add_url_rule('/admin/exlink', view_func=Exlink.as_view('exlink'))
if __name__ == "__main__":
    # Script entry point: print the URL reported by get_cnb_weburl for port
    # 5000, load config.py into the app config, then serve on all interfaces.
    print("Run_url:", get_cnb_weburl(5000))
    app.config.from_pyfile("config.py")
    app.run(host="0.0.0.0")