Initial commit: ToNav Personal Navigation Page

- Flask + SQLite 个人导航页系统
- 前台导航页(分类Tab、卡片展示)
- 管理后台(服务管理、分类管理、健康检测)
- 响应式设计
- Systemd 服务配置
This commit is contained in:
OpenClaw Agent
2026-02-12 21:57:15 +08:00
commit 872526505e
22 changed files with 3424 additions and 0 deletions

111
utils/health_check.py Normal file
View File

@@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
"""健康检查工具"""
import requests
import sqlite3
import threading
import time
from config import Config
def check_service_health(service_id):
"""检查单个服务健康状态"""
conn = sqlite3.connect(Config.DATABASE_PATH)
cursor = conn.cursor()
cursor.execute('''
SELECT name, url, health_check_url, health_check_enabled
FROM services
WHERE id = ?
''', (service_id,))
service = cursor.fetchone()
if not service:
conn.close()
return
name, url, check_url, check_enabled = service
check_url = check_url or url # 如果没有单独配置使用主url
status = 'offline'
status_code = None
error = None
if check_enabled:
try:
response = requests.get(check_url, timeout=Config.HEALTH_CHECK_TIMEOUT)
status_code = response.status_code
if response.status_code < 500:
status = 'online'
except requests.exceptions.Timeout:
error = 'timeout'
except requests.exceptions.ConnectionError:
error = 'connection_error'
except Exception as e:
error = str(e)
# 这里可以添加状态记录表,暂时只检查
conn.close()
return {
'id': service_id,
'name': name,
'status': status,
'status_code': status_code,
'error': error
}
def check_all_services():
"""检查所有启用健康检测的服务"""
conn = sqlite3.connect(Config.DATABASE_PATH)
cursor = conn.cursor()
cursor.execute('''
SELECT id FROM services
WHERE is_enabled = 1 AND health_check_enabled = 1
ORDER BY sort_order DESC
''')
services = cursor.fetchall()
conn.close()
results = []
for (service_id,) in services:
result = check_service_health(service_id)
if result:
results.append(result)
return results
class HealthCheckWorker:
"""后台健康检查工作线程"""
def __init__(self):
self.running = False
self.thread = None
def start(self):
"""启动后台检查"""
if not self.running:
self.running = True
self.thread = threading.Thread(target=self._run, daemon=True)
self.thread.start()
print("健康检查线程已启动")
def stop(self):
"""停止后台检查"""
self.running = False
if self.thread:
self.thread.join()
print("健康检查线程已停止")
def _run(self):
"""检查循环"""
while self.running:
results = check_all_services()
# 记录日志
for result in results:
print(f"[HealthCheck] {result['name']}: {result['status']}")
time.sleep(Config.HEALTH_CHECK_INTERVAL)
# 全局工作线程
health_worker = HealthCheckWorker()