新增仪表盘和统计分析数据API,优化外链过期时间处理,完善日志记录功能,更新前端样式以提升用户体验,确保数据统计准确性和可视化展示。

This commit is contained in:
dockermen
2025-04-11 19:38:57 +08:00
parent aad2182d0c
commit c4ce2fc5a4
5 changed files with 463 additions and 45 deletions

View File

@ -320,6 +320,79 @@ class CloudDriveDatabase:
except Exception:
return False
# 新增统计方法
def get_total_user_drives_count(self) -> int:
"""获取用户网盘总数"""
try:
self.cursor.execute("SELECT COUNT(*) FROM user_drives")
result = self.cursor.fetchone()
return result[0] if result else 0
except Exception as e:
print(f"获取用户网盘总数错误: {e}")
return 0
def get_active_external_links_count(self) -> int:
"""获取活跃外链数量 (未过期且有剩余次数)"""
try:
from datetime import datetime, timezone
now_utc_iso = datetime.now(timezone.utc).isoformat()
# 注意SQLite 不直接支持 ISO 8601 比较,此查询可能需要调整或在 Python 中过滤
# 简单起见,我们先只检查次数和时间是否存在
# 更精确的查询可能需要 DATETIME 函数,或在 Python 中处理
self.cursor.execute(
"""
SELECT COUNT(*) FROM external_links
WHERE (used_quota < total_quota)
AND (expiry_time IS NOT NULL AND expiry_time > ?)
""",
(now_utc_iso,) # 这个比较可能不适用于所有 SQLite 版本/配置,后续可能需要调整
)
# 备选(更兼容但效率低):获取所有链接在 Python 中过滤
# self.cursor.execute("SELECT link_uuid, expiry_time, used_quota, total_quota FROM external_links")
# links = self.cursor.fetchall()
# count = 0
# for link in links:
# is_active = False
# if link['used_quota'] < link['total_quota']:
# if link['expiry_time']:
# try:
# expiry_dt = datetime.fromisoformat(link['expiry_time'].replace('Z', '+00:00'))
# if datetime.now(timezone.utc) < expiry_dt:
# is_active = True
# except: pass # Ignore parsing errors
# else: # No expiry time means active if quota is available
# is_active = True
# if is_active:
# count += 1
# return count
result = self.cursor.fetchone()
return result[0] if result else 0
except Exception as e:
print(f"获取活跃外链数量错误: {e}")
return 0 # 返回 0 或其他错误指示
def get_total_external_links_count(self) -> int:
"""获取外链总数"""
try:
self.cursor.execute("SELECT COUNT(*) FROM external_links")
result = self.cursor.fetchone()
return result[0] if result else 0
except Exception as e:
print(f"获取外链总数错误: {e}")
return 0
def get_user_drives_count_by_provider(self) -> Dict[str, int]:
"""按提供商统计用户网盘数量"""
try:
self.cursor.execute("SELECT provider_name, COUNT(*) as count FROM user_drives GROUP BY provider_name")
results = self.cursor.fetchall()
return {row['provider_name']: row['count'] for row in results}
except Exception as e:
print(f"按提供商统计用户网盘数量错误: {e}")
return {}
def close(self):
"""关闭数据库连接"""
if self.conn: