Files
ToNav/app.py
OpenClaw Agent c0cdd146b1 feat: upgrade to V1.2 - Tags, Click Stats, and Robust WebDAV
- add Tagging system (backend and frontend)
- add Click count statistics and redirection logic
- add config.example.py
- fix WebDAV MKCOL 405 error and response handling
- fix redirection loop during force password change
- audit SQL queries for security
2026-02-13 07:58:11 +08:00

260 lines
13 KiB
Python

# -*- coding: utf-8 -*-
"""ToNav - 点击统计与标签版"""
import sqlite3
import json
import os
import shutil
import zipfile
import requests
import logging
import re
from logging.handlers import RotatingFileHandler
from datetime import datetime
from flask import Flask, render_template, request, jsonify, session, redirect, url_for, send_file
from config import Config
from utils.auth import authenticate, is_logged_in, hash_password
from utils.health_check import health_worker, check_all_services
from utils.database import init_database, insert_initial_data
# ==================== Configuration & logging ====================
# Rotating file log: 5 MiB per file, three rotated files kept.
handler = RotatingFileHandler(
    Config.LOG_FILE,
    maxBytes=5 * 1024 * 1024,
    backupCount=3,
)
formatter = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s')
handler.setFormatter(formatter)

logger = logging.getLogger('tonav')
logger.addHandler(handler)
logger.setLevel(logging.INFO)

app = Flask(__name__)
app.config.from_object(Config)

# Bootstrap the database only when the database file does not exist yet.
if not os.path.exists(Config.DATABASE_PATH):
    init_database()
    insert_initial_data()

# Started at import time, so it runs however the app is launched.
health_worker.start()
def get_db():
    """Open a new SQLite connection to ``Config.DATABASE_PATH``.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    The caller is responsible for closing the connection.
    """
    connection = sqlite3.connect(Config.DATABASE_PATH)
    connection.row_factory = sqlite3.Row
    return connection
# ==================== 拦截中间件 ====================
@app.before_request
def check_must_change():
    """Block admin pages/APIs while a forced password change is pending.

    Applies only to logged-in sessions carrying a truthy ``must_change``
    flag. Requests to ``/admin*`` or ``/api/admin*`` whose endpoint is not
    allow-listed get a 403 JSON response; everything else passes through.

    NOTE(review): nothing in this file sets ``session['must_change']`` (it
    is only popped after a successful password change) -- presumably it is
    set elsewhere; confirm.
    """
    if not (is_logged_in(session) and session.get('must_change')):
        return None

    allowed = {
        'admin_dashboard', 'admin_logout', 'admin_login',
        'api_admin_change_password', 'api_admin_login_status',
        # Flask names an endpoint after its view function by default, so the
        # change-password and login-status views in this file are actually
        # registered as the two names below. Without them the user could
        # never reach the change-password API and stayed locked out.
        'api_admin_cpw', 'api_admin_status',
        'static',
    }
    if not request.endpoint or request.endpoint in allowed:
        return None
    if request.path.startswith(('/admin', '/api/admin')):
        return jsonify({'error': '请先修改密码', 'must_change': True}), 403
    return None
# ==================== 点击统计转发 ====================
@app.route('/visit/<int:sid>')
def visit_service(sid):
    """Record one click for service ``sid`` and redirect to its URL.

    Unknown ids redirect to the home page without touching any counter.
    The connection is closed in ``finally`` so a failing query cannot leak
    it.
    """
    conn = get_db()
    try:
        row = conn.execute(
            'SELECT url FROM services WHERE id = ?', (sid,)
        ).fetchone()
        if not row:
            return redirect('/')
        conn.execute(
            'UPDATE services SET click_count = click_count + 1 WHERE id = ?',
            (sid,),
        )
        conn.commit()
        return redirect(row['url'])
    finally:
        conn.close()
# ==================== 备份管理相关 ====================
def ensure_webdav_dir(url, auth):
    """Best-effort creation of every collection along a WebDAV URL.

    Issues one ``MKCOL`` per path segment, from the shallowest to the
    deepest. The HTTP status is deliberately not inspected; only transport
    errors are caught, and they are logged instead of being silently
    dropped.

    Args:
        url: Absolute URL such as ``https://host/dav/tonav/``.
        auth: Credentials passed straight through to ``requests``.

    Returns:
        None. URLs without a ``scheme://host`` prefix (for example an
        unconfigured, empty setting) are ignored instead of raising
        ``IndexError``.
    """
    parts = url.rstrip('/').split('/')
    # "scheme://host/..." splits into ['scheme:', '', 'host', ...].
    if len(parts) < 3 or not parts[2]:
        return
    current_path = parts[0] + "//" + parts[2]
    for part in parts[3:]:
        current_path += "/" + part
        try:
            requests.request('MKCOL', current_path + "/", auth=auth, timeout=5)
        except requests.RequestException as exc:
            # Stay best-effort: the subsequent upload reports real failures.
            logger.warning('WebDAV MKCOL failed for %s: %s', current_path, exc)
def create_backup_zip():
    """Write a timestamped backup archive under ``/tmp``.

    The archive holds the database (stored as ``tonav.db``) and the
    ``config.py`` that sits next to this module.

    Returns:
        tuple: ``(path, name)`` -- full path of the archive and its file name.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    name = f"tonav_backup_{stamp}.zip"
    path = os.path.join('/tmp', name)
    with zipfile.ZipFile(path, 'w') as archive:
        archive.write(Config.DATABASE_PATH, 'tonav.db')
        config_file = os.path.join(os.path.dirname(__file__), 'config.py')
        archive.write(config_file, 'config.py')
    return path, name
# ==================== API 路由 (全补全) ====================
@app.route('/')
def index():
    """Render the public navigation page."""
    return render_template('index.html')
@app.route('/api/services')
def api_public_services():
    """Return all enabled services as JSON, highest ``sort_order`` first."""
    # Assembled from pieces so the SQL text stays identical to the original
    # triple-quoted literal, including its leading and trailing newlines.
    query = (
        "\n"
        "SELECT s.*, COALESCE(c.name, s.category) as category\n"
        "FROM services s LEFT JOIN categories c ON s.category = c.name\n"
        "WHERE s.is_enabled = 1 ORDER BY s.sort_order DESC\n"
    )
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute(query)
    services = [dict(record) for record in cursor.fetchall()]
    conn.close()
    return jsonify(services)
@app.route('/api/categories')
def api_public_categories():
    """Return every category as JSON, highest ``sort_order`` first."""
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM categories ORDER BY sort_order DESC')
    categories = [dict(record) for record in cursor.fetchall()]
    conn.close()
    return jsonify(categories)
@app.route('/admin')
def admin_dashboard():
    """Render the admin dashboard; anonymous visitors go to the login page."""
    if is_logged_in(session):
        return render_template('admin/dashboard.html')
    return redirect(url_for('admin_login'))
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Show the login form (GET) or process submitted credentials (POST).

    On success the user's id and username are stored in the session and the
    browser is redirected to the dashboard; on failure the form is rendered
    again with an error message.
    """
    if request.method == 'GET':
        return render_template('admin/login.html')

    username = request.form.get('username')
    password = request.form.get('password')
    user = authenticate(username, password)
    if not user:
        return render_template('admin/login.html', error='账号或密码错误')

    session['user_id'] = user['id']
    session['username'] = user['username']
    return redirect(url_for('admin_dashboard'))
@app.route('/admin/logout')
def admin_logout():
    """Drop the whole session and send the browser back to the login page."""
    session.clear()
    return redirect(url_for('admin_login'))
@app.route('/admin/services')
def admin_services():
    """Render the service management page for logged-in admins."""
    if is_logged_in(session):
        return render_template('admin/services.html')
    return redirect(url_for('admin_login'))
@app.route('/admin/categories')
def admin_categories():
    """Render the category management page for logged-in admins."""
    if is_logged_in(session):
        return render_template('admin/categories.html')
    return redirect(url_for('admin_login'))
@app.route('/api/admin/services', methods=['GET', 'POST'])
def api_admin_services_handler():
    """List all services (GET) or create one (POST). Admin only.

    POST expects a JSON object with at least ``name`` and ``url``; the other
    columns fall back to defaults. A malformed body now yields a 400
    response instead of an unhandled ``KeyError``/``TypeError``, and the
    connection is always closed.
    """
    if not is_logged_in(session):
        return jsonify({'error': 'Unauthorized'}), 401

    conn = get_db()
    try:
        if request.method == 'POST':
            data = request.get_json(silent=True)
            if not isinstance(data, dict) or not data.get('name') or not data.get('url'):
                return jsonify({'error': 'name and url are required'}), 400
            conn.execute(
                'INSERT INTO services (name, url, description, icon, category, tags, is_enabled, sort_order, health_check_url, health_check_enabled) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
                (
                    data['name'],
                    data['url'],
                    data.get('description', ''),
                    data.get('icon', ''),
                    data.get('category', '默认'),
                    data.get('tags', ''),
                    1 if data.get('is_enabled', True) else 0,
                    data.get('sort_order', 0),
                    data.get('health_check_url', ''),
                    1 if data.get('health_check_enabled', False) else 0,
                ),
            )
            conn.commit()
            return jsonify({'message': 'OK'})

        rows = conn.execute(
            'SELECT s.*, COALESCE(c.name, s.category) as category FROM services s LEFT JOIN categories c ON s.category = c.name ORDER BY s.sort_order DESC'
        ).fetchall()
        return jsonify([dict(r) for r in rows])
    finally:
        conn.close()
@app.route('/api/admin/services/<int:sid>', methods=['PUT', 'DELETE', 'POST'])
def api_admin_service_item(sid):
    """Delete (DELETE), toggle enabled (POST) or update (PUT) one service.

    PUT accepts a JSON object and only updates the allow-listed columns it
    contains. Column names come from the fixed tuple below, never from the
    request, so the dynamically built SQL stays injection-safe. A PUT with
    no recognised field used to generate ``SET , updated_at=...`` (a SQL
    syntax error); it now returns 400.
    """
    if not is_logged_in(session):
        return jsonify({'error': 'Unauthorized'}), 401

    conn = get_db()
    try:
        if request.method == 'DELETE':
            conn.execute('DELETE FROM services WHERE id=?', (sid,))
        elif request.method == 'POST':  # Toggle
            conn.execute('UPDATE services SET is_enabled = 1 - is_enabled WHERE id=?', (sid,))
        else:  # PUT
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                return jsonify({'error': 'Invalid JSON body'}), 400
            updatable = (
                'name', 'url', 'description', 'icon', 'category', 'tags',
                'is_enabled', 'sort_order', 'health_check_url',
                'health_check_enabled',
            )
            present = [col for col in updatable if col in data]
            if not present:
                return jsonify({'error': 'No updatable fields supplied'}), 400
            fields = [f"{col}=?" for col in present]
            values = [data[col] for col in present]
            values.append(sid)
            conn.execute(
                f"UPDATE services SET {', '.join(fields)}, updated_at=CURRENT_TIMESTAMP WHERE id=?",
                values,
            )
        conn.commit()
        return jsonify({'message': 'OK'})
    finally:
        conn.close()
@app.route('/api/admin/backup/webdav', methods=['POST'])
def api_admin_backup_webdav():
    """Create a backup archive and upload it to ``<webdav_url>/tonav/``.

    Returns 400 when no WebDAV URL is configured and 500 with an error
    message when the upload fails, including transport errors that
    previously escaped as unhandled exceptions. The local archive is
    removed in every case.
    """
    if not is_logged_in(session):
        return jsonify({'error': 'Unauthorized'}), 401

    conn = get_db()
    try:
        settings = {r['key']: r['value'] for r in conn.execute('SELECT * FROM settings').fetchall()}
    finally:
        conn.close()

    # `or ''` also covers a NULL value stored in the settings table.
    webdav_url = (settings.get('webdav_url') or '').strip()
    if not webdav_url:
        return jsonify({'error': '未配置 WebDAV 地址'}), 400

    base_url = webdav_url.rstrip('/') + '/tonav/'
    auth = (settings.get('webdav_user'), settings.get('webdav_password'))
    path, name = create_backup_zip()
    try:
        ensure_webdav_dir(base_url, auth)
        with open(path, 'rb') as f:
            res = requests.put(base_url + name, data=f, auth=auth, timeout=30)
    except requests.RequestException as exc:
        logger.error('WebDAV backup upload failed: %s', exc)
        return jsonify({'error': f'上传失败: {exc}'}), 500
    finally:
        if os.path.exists(path):
            os.remove(path)

    if 200 <= res.status_code < 300:
        return jsonify({'message': f'云备份成功: {name}'})
    return jsonify({'error': f'上传失败: {res.status_code}'}), 500
@app.route('/api/admin/backup/list', methods=['GET'])
def api_admin_backup_list():
    """List backup archive names found in ``<webdav_url>/tonav/``.

    Sends a depth-1 ``PROPFIND`` and scrapes ``tonav_backup_*.zip`` names
    from the response body. Any transport failure yields an empty list, as
    before, but is now logged instead of being swallowed by a bare
    ``except``.
    """
    if not is_logged_in(session):
        return jsonify({'error': 'Unauthorized'}), 401

    conn = get_db()
    try:
        settings = {r['key']: r['value'] for r in conn.execute('SELECT * FROM settings').fetchall()}
    finally:
        conn.close()

    # `or ''` also covers a NULL value stored in the settings table.
    url = (settings.get('webdav_url') or '').rstrip('/') + '/tonav/'
    auth = (settings.get('webdav_user'), settings.get('webdav_password'))
    try:
        res = requests.request('PROPFIND', url, auth=auth, headers={'Depth': '1'}, timeout=10)
    except requests.RequestException as exc:
        logger.warning('WebDAV PROPFIND failed for %s: %s', url, exc)
        return jsonify({'files': []})

    files = re.findall(r'[<>](tonav_backup_.*?\.zip)[<>]', res.text)
    # The same name can appear in several XML elements, so de-duplicate.
    return jsonify({'files': sorted(set(files), reverse=True)})
@app.route('/api/admin/backup/restore', methods=['POST'])
def api_admin_backup_restore():
    """Download a backup from WebDAV and restore ``tonav.db`` from it.

    Expects JSON ``{"filename": "tonav_backup_....zip"}``. The current
    database is copied to ``<DATABASE_PATH>.bak`` before being overwritten.

    Fixes over the previous version:
      * ``filename`` is validated, so it can no longer contain path
        separators and escape ``/tmp`` or rewrite the download URL;
      * a non-2xx download is reported instead of being saved as a zip;
      * the archive is checked for ``tonav.db`` before the health worker is
        stopped;
      * the health worker is restarted even when the restore fails;
      * the downloaded temp file is always removed.

    NOTE(review): the member is extracted as ``tonav.db`` into the database
    directory; this only replaces the live database if
    ``Config.DATABASE_PATH`` is itself named ``tonav.db`` -- confirm.
    """
    if not is_logged_in(session):
        return jsonify({'error': 'Unauthorized'}), 401

    payload = request.get_json(silent=True)
    fn = payload.get('filename') if isinstance(payload, dict) else None
    # The character class excludes '/' and '\\', so no traversal is possible.
    if not isinstance(fn, str) or not re.fullmatch(r'tonav_backup_[A-Za-z0-9_.-]+\.zip', fn):
        return jsonify({'error': '无效的备份文件名'}), 400

    conn = get_db()
    try:
        settings = {r['key']: r['value'] for r in conn.execute('SELECT * FROM settings').fetchall()}
    finally:
        conn.close()

    # `or ''` also covers a NULL value stored in the settings table.
    url = (settings.get('webdav_url') or '').rstrip('/') + '/tonav/' + fn
    auth = (settings.get('webdav_user'), settings.get('webdav_password'))
    path = os.path.join('/tmp', fn)
    try:
        res = requests.get(url, auth=auth, timeout=60)
        res.raise_for_status()
        with open(path, 'wb') as f:
            f.write(res.content)
        with zipfile.ZipFile(path, 'r') as z:
            # Raises KeyError early if the archive has no database member.
            z.getinfo('tonav.db')
            health_worker.stop()
            try:
                shutil.copy2(Config.DATABASE_PATH, Config.DATABASE_PATH + ".bak")
                z.extract('tonav.db', os.path.dirname(Config.DATABASE_PATH))
            finally:
                health_worker.start()
        return jsonify({'message': '恢复成功'})
    except Exception as e:
        logger.error('Backup restore failed for %s: %s', fn, e)
        return jsonify({'error': str(e)}), 500
    finally:
        if os.path.exists(path):
            os.remove(path)
@app.route('/api/admin/settings', methods=['GET', 'POST'])
def api_admin_settings():
    """Read (GET) or update (POST) the WebDAV settings. Admin only.

    This endpoint previously had no login check, so anyone could read the
    stored WebDAV credentials or redirect backups to another server. It now
    returns 401 for anonymous callers, like the other admin APIs.

    POST accepts a JSON object; only ``webdav_url``, ``webdav_user`` and
    ``webdav_password`` are stored.
    """
    if not is_logged_in(session):
        return jsonify({'error': 'Unauthorized'}), 401

    conn = get_db()
    try:
        if request.method == 'POST':
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                return jsonify({'error': 'Invalid JSON body'}), 400
            for k in ('webdav_url', 'webdav_user', 'webdav_password'):
                if k in data:
                    conn.execute(
                        'INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)',
                        (k, data[k]),
                    )
            conn.commit()
            return jsonify({'message': 'OK'})

        rows = conn.execute('SELECT * FROM settings').fetchall()
        return jsonify({r['key']: r['value'] for r in rows})
    finally:
        conn.close()
@app.route('/api/admin/login/status')
def api_admin_status():
    """Report login state, username and whether a password change is due."""
    if not is_logged_in(session):
        return jsonify({'logged_in': False})

    conn = get_db()
    record = conn.execute(
        'SELECT must_change_password FROM users WHERE id=?',
        (session['user_id'],),
    ).fetchone()
    conn.close()

    # True only when the user row exists and its flag equals 1.
    must_change = bool(record and record[0] == 1)
    return jsonify({
        'logged_in': True,
        'username': session.get('username'),
        'must_change': must_change,
    })
@app.route('/api/admin/change-password', methods=['POST'])
def api_admin_cpw():
    """Change the logged-in user's password.

    Expects JSON ``{"old_password": ..., "new_password": ...}``. On success
    the stored hash is replaced, ``must_change_password`` is cleared and the
    session's ``must_change`` flag is dropped.

    Fixes: anonymous callers now get 401 instead of a ``KeyError`` on
    ``session['user_id']``; a missing body or empty passwords are rejected
    with 400 instead of being hashed; the connection is always closed.
    """
    if not is_logged_in(session):
        return jsonify({'error': 'Unauthorized'}), 401

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    old_password = data.get('old_password')
    new_password = data.get('new_password')
    if not isinstance(old_password, str) or not old_password:
        return jsonify({'error': '旧密码错误'}), 400
    if not isinstance(new_password, str) or not new_password:
        return jsonify({'error': '新密码不能为空'}), 400

    conn = get_db()
    try:
        row = conn.execute(
            'SELECT password_hash FROM users WHERE id=?', (session['user_id'],)
        ).fetchone()
        if not row or row[0] != hash_password(old_password):
            return jsonify({'error': '旧密码错误'}), 400
        conn.execute(
            'UPDATE users SET password_hash=?, must_change_password=0 WHERE id=?',
            (hash_password(new_password), session['user_id']),
        )
        conn.commit()
    finally:
        conn.close()

    session.pop('must_change', None)
    return jsonify({'message': 'OK'})
@app.route('/api/admin/categories', methods=['GET', 'POST'])
def api_admin_cat_h():
    """List categories (GET) or create one (POST). Admin only.

    POST expects a JSON object with a non-empty ``name`` and an optional
    ``sort_order`` (default 0). A missing body or name now yields 400
    instead of an unhandled exception, and the connection is always closed.
    """
    if not is_logged_in(session):
        return jsonify({'error': 'Unauthorized'}), 401

    conn = get_db()
    try:
        if request.method == 'POST':
            data = request.get_json(silent=True)
            if not isinstance(data, dict) or not data.get('name'):
                return jsonify({'error': 'name is required'}), 400
            conn.execute(
                'INSERT INTO categories (name, sort_order) VALUES (?, ?)',
                (data['name'], data.get('sort_order', 0)),
            )
            conn.commit()
            return jsonify({'message': 'OK'})

        rows = conn.execute('SELECT * FROM categories ORDER BY sort_order DESC').fetchall()
        return jsonify([dict(r) for r in rows])
    finally:
        conn.close()
@app.route('/api/admin/categories/<int:cid>', methods=['PUT', 'DELETE'])
def api_admin_cat_i(cid):
    """Delete (DELETE) or update (PUT) one category. Admin only.

    On PUT, a rename is propagated to ``services.category``, which refers to
    categories by name.

    Fixes: an unknown ``cid`` on PUT returns 404 instead of failing on
    ``fetchone()[0]``; an invalid body returns 400; a missing or empty
    ``name`` keeps the current name rather than writing NULL and rewriting
    services; the connection is always closed.
    """
    if not is_logged_in(session):
        return jsonify({'error': 'Unauthorized'}), 401

    conn = get_db()
    try:
        if request.method == 'DELETE':
            conn.execute('DELETE FROM categories WHERE id=?', (cid,))
        else:
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                return jsonify({'error': 'Invalid JSON body'}), 400
            existing = conn.execute(
                'SELECT name FROM categories WHERE id=?', (cid,)
            ).fetchone()
            if existing is None:
                return jsonify({'error': 'Category not found'}), 404
            old = existing[0]
            new = data.get('name') or old
            conn.execute(
                'UPDATE categories SET name=?, sort_order=? WHERE id=?',
                (new, data.get('sort_order', 0), cid),
            )
            if old != new:
                conn.execute('UPDATE services SET category=? WHERE category=?', (new, old))
        conn.commit()
        return jsonify({'message': 'OK'})
    finally:
        conn.close()
@app.route('/api/admin/backup/local', methods=['GET'])
def api_admin_blocal():
    """Build a backup archive and send it as a file download. Admin only.

    The archive contains the database and ``config.py``. This endpoint
    previously had no login check, so anyone could download both; anonymous
    callers now get 401 like the other admin APIs.

    NOTE(review): the archive written under ``/tmp`` is not removed after
    being sent.
    """
    if not is_logged_in(session):
        return jsonify({'error': 'Unauthorized'}), 401
    path, name = create_backup_zip()
    return send_file(path, as_attachment=True, download_name=name)
@app.route('/api/admin/health-check', methods=['POST'])
def api_admin_hc():
    """Run a health check of all services and return the results.

    Admin only: the login check was missing, which let anonymous callers
    trigger ``check_all_services()`` at will.
    """
    if not is_logged_in(session):
        return jsonify({'error': 'Unauthorized'}), 401
    return jsonify({'results': check_all_services()})
if __name__ == '__main__':
    # Development entry point; host, port and debug flag come from Config.
    app.run(
        host=Config.HOST,
        port=Config.PORT,
        debug=Config.DEBUG,
    )