feat: Initial commit for contraband ranking system
This commit is contained in:
226
update_security_ranking_v2.py
Normal file
226
update_security_ranking_v2.py
Normal file
@@ -0,0 +1,226 @@
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import requests
|
||||
import sys
|
||||
import subprocess
|
||||
|
||||
# ================= 配置区 =================
|
||||
VERSION = "v2.3.8"
|
||||
WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=579c755e-24d2-47ae-9ca1-3ede6998327a"
|
||||
DATA_FILE = "security_data_v2.json"
|
||||
BEIJING_TZ = timezone(timedelta(hours=8))
|
||||
|
||||
# WebDAV 配置
|
||||
WEBDAV_BASE = "https://chfs.ouaone.top/webdav/openclaw/backup/security/"
|
||||
WEBDAV_AUTH = "openclaw:Khh13579"
|
||||
|
||||
# ================= 核心工具 =================
|
||||
|
||||
def get_now():
|
||||
return datetime.now(BEIJING_TZ)
|
||||
|
||||
def upload_to_webdav(local_path, remote_sub_path):
|
||||
remote_url = f"{WEBDAV_BASE}{remote_sub_path}"
|
||||
if "/" in remote_sub_path:
|
||||
parent_dir = "/".join(remote_sub_path.split("/")[:-1])
|
||||
mkdir_cmd = f"curl -u '{WEBDAV_AUTH}' -X MKCOL '{WEBDAV_BASE}{parent_dir}/' >/dev/null 2>&1"
|
||||
subprocess.run(mkdir_cmd, shell=True)
|
||||
cmd = f"curl -u '{WEBDAV_AUTH}' -T '{local_path}' '{remote_url}'"
|
||||
subprocess.Popen(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||
|
||||
def load_data():
|
||||
data = {"meta": {"script_version": VERSION}, "members": {}, "logs": [], "last_id": -1}
|
||||
if os.path.exists(DATA_FILE):
|
||||
with open(DATA_FILE, 'r', encoding='utf-8') as f:
|
||||
loaded = json.load(f)
|
||||
if "meta" not in loaded: loaded["meta"] = data["meta"]
|
||||
return loaded
|
||||
return data
|
||||
|
||||
def save_data(data, action_text="系统自动备份"):
|
||||
if "meta" not in data: data["meta"] = {}
|
||||
data["meta"]["script_version"] = VERSION
|
||||
data["meta"]["last_update"] = get_now().isoformat()
|
||||
with open(DATA_FILE, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
# 异步备份
|
||||
upload_to_webdav(DATA_FILE, "latest_security_v2.json")
|
||||
upload_to_webdav(__file__, f"script_history/script_{VERSION}.py")
|
||||
# 触发新版快报推送
|
||||
send_notification(action_text, data)
|
||||
|
||||
def format_name(name):
|
||||
"""两个字姓名中间加4个空格以对齐三字姓名"""
|
||||
return f"{name[0]} {name[1]}" if len(name) == 2 else name
|
||||
|
||||
def send_notification(action_text, data):
|
||||
"""发送企业微信通知 - v2.3.8 对齐增强与删除功能 + 新业务日期"""
|
||||
if not WEBHOOK_URL: return
|
||||
now = get_now().strftime("%Y-%m-%d %H:%M")
|
||||
biz_month = get_now().strftime("%Y-%m")
|
||||
|
||||
# 1. 统计当月排名 (过滤0票)
|
||||
stats = {}
|
||||
for log in data.get("logs", []):
|
||||
# 只有未删除的记录才参与排名
|
||||
if log.get("biz_month") == biz_month and not log.get("is_deleted", False):
|
||||
n = log["name"]
|
||||
if n in data["members"] and data["members"][n]["status"] == "active":
|
||||
stats[n] = stats.get(n, 0) + 1
|
||||
|
||||
ranking = sorted(stats.items(), key=lambda x: x[1], reverse=True)
|
||||
# 此处调用 format_name 进行名字对齐
|
||||
ranking_text = "\n".join([f"{i+1}. {format_name(m[0])}: {m[1]} 票({data['members'][m[0]].get('shift', '未知')})" for i, m in enumerate(ranking)])
|
||||
|
||||
# 2. 获取最近动态 (只显示未删除的)
|
||||
recent_records = [l for l in data.get('logs', []) if not l.get('is_deleted', False)][-3:]
|
||||
recent_records.reverse()
|
||||
recent_text = ""
|
||||
for r in recent_records:
|
||||
r_time = datetime.fromisoformat(r['actual_time']).astimezone(BEIJING_TZ).strftime("%m-%d %H:%M")
|
||||
# 此处也应用对齐逻辑
|
||||
recent_text += f"- [{r_time}] {format_name(r['name'])}: {r['content']}\n"
|
||||
|
||||
content = f"""安检团队查获快报
|
||||
更新时间:{now}
|
||||
最新操作: {action_text}
|
||||
|
||||
排名明细:
|
||||
{ranking_text if ranking_text else "暂无记录"}
|
||||
(仅显示已查获成员,0 记录成员已隐藏)
|
||||
|
||||
最近动态:
|
||||
{recent_text if recent_text else "暂无记录"}"""
|
||||
|
||||
payload = {"msgtype": "text", "text": {"content": content}}
|
||||
try: requests.post(WEBHOOK_URL, json=payload, timeout=5)
|
||||
except: pass
|
||||
|
||||
def add_member(name_with_shift):
|
||||
m = re.match(r"^(.*?)\((.*?)\)$", name_with_shift)
|
||||
if not m: return "错误:格式应为 '新增 姓名(班次)'"
|
||||
name, shift = m.groups()
|
||||
data = load_data()
|
||||
if name in data["members"] and data["members"][name]["status"] == "active":
|
||||
return f"ℹ️ 人员 {name} 已存在且为活跃状态。"
|
||||
data["members"][name] = {"shift": shift, "status": "active"}
|
||||
action_text = f"新增成员 {format_name(name)} ({shift})"
|
||||
save_data(data, action_text)
|
||||
return f"✅ 已录入人员:{name} ({shift})"
|
||||
|
||||
def remove_member(name):
|
||||
data = load_data()
|
||||
if name not in data["members"] or data["members"][name]["status"] == "offboarded":
|
||||
return f"ℹ️ 未找到活跃人员 '{name}' 或已离职。"
|
||||
data["members"][name]["status"] = "offboarded"
|
||||
action_text = f"去除成员 {format_name(name)}"
|
||||
save_data(data, action_text)
|
||||
return f"✅ 已去除人员:{name} (已标记为离职,不参与排名)"
|
||||
|
||||
def record_entry(name, content, biz_date_manual=None):
|
||||
data = load_data()
|
||||
if name not in data["members"] or data["members"][name]["status"] == "offboarded":
|
||||
return f"错误:未找到活跃人员 '{name}' 或该人员已离职。"
|
||||
|
||||
now = get_now()
|
||||
if biz_date_manual:
|
||||
year = now.year
|
||||
dt = datetime.strptime(f"{year}{biz_date_manual.replace('-','')}", "%Y%m%d").replace(tzinfo=BEIJING_TZ)
|
||||
info = {"biz_date": dt.strftime("%Y-%m-%d"), "biz_month": dt.strftime("%Y-%m"), "is_manual": True}
|
||||
else:
|
||||
# 修正业务日期判定逻辑:00:00-05:00 归属到前一个日历日
|
||||
# 重点:判断当前小时是否在 0 到 5 之间(包含 0 和 5)
|
||||
biz_dt = now - timedelta(days=1) if 0 <= now.hour <= 5 else now
|
||||
info = {"biz_date": biz_dt.strftime("%Y-%m-%d"), "biz_month": biz_dt.strftime("%Y-%m"), "is_manual": False}
|
||||
|
||||
data["last_id"] += 1
|
||||
new_id = data["last_id"]
|
||||
log = {"id": new_id, "name": name, "content": content, "actual_time": now.isoformat(), "biz_date": info["biz_date"], "biz_month": info["biz_month"]}
|
||||
data["logs"].append(log)
|
||||
|
||||
tag = "[补录] " if info.get("is_manual") else ""
|
||||
action_text = f"{tag}新增 {format_name(name)} {content}"
|
||||
save_data(data, action_text)
|
||||
|
||||
daily_count = sum(1 for l in data["logs"] if l["name"] == name and l["biz_date"] == info["biz_date"] and not l.get('is_deleted', False))
|
||||
return f"[已录入] {name} - {content} (当日第{daily_count}票, 索引 #{new_id})"
|
||||
|
||||
def delete_entry(log_id):
|
||||
data = load_data()
|
||||
found = False
|
||||
for log in data["logs"]:
|
||||
if log['id'] == log_id:
|
||||
log['is_deleted'] = True # 标记为删除,而不是真正移除
|
||||
found = True
|
||||
break
|
||||
if found:
|
||||
action_text = f"删除记录 # {log_id}"
|
||||
save_data(data, action_text)
|
||||
return f"✅ 已删除记录:#{log_id}"
|
||||
return f"❌ 未找到索引为 #{log_id} 的记录。"
|
||||
|
||||
def query_logs(mode, value):
|
||||
data = load_data()
|
||||
results = [l for l in data["logs"] if (l["name"] == value if mode=="name" else l["biz_date"].endswith(value)) and not l.get('is_deleted', False)]
|
||||
if not results: return "🔍 暂无记录。"
|
||||
res = f"🔍 查询结果 ({value})\n----------------\n"
|
||||
for l in results:
|
||||
time_str = datetime.fromisoformat(l['actual_time']).astimezone(BEIJING_TZ).strftime("%m-%d %H:%M")
|
||||
res += f"#{l['id']} - {l['content']} (录入时间: {time_str})\n"
|
||||
return res
|
||||
|
||||
def get_ranking(target_month=None):
|
||||
data = load_data()
|
||||
now = get_now()
|
||||
if not target_month:
|
||||
target_month = (now.replace(day=1)-timedelta(days=1)).strftime("%Y-%m") if 0<=now.hour<6 else now.strftime("%Y-%m")
|
||||
elif len(target_month) <= 3:
|
||||
target_month = f"{now.year}-{target_month.replace('月','').zfill(2)}"
|
||||
|
||||
stats = {}
|
||||
for log in data.get("logs", []):
|
||||
if log.get("biz_month") == target_month and not log.get('is_deleted', False):
|
||||
n = log["name"]
|
||||
if n in data["members"] and data["members"][n]["status"] == "active":
|
||||
stats[n] = stats.get(n, 0) + 1
|
||||
|
||||
sorted_ranking = sorted(stats.items(), key=lambda x: x[1], reverse=True)
|
||||
res = f"🏆 安检排行榜 ({target_month}) [v{VERSION}]\n----------------\n"
|
||||
if not sorted_ranking: return res + "暂无记录。"
|
||||
for i, (name, count) in enumerate(sorted_ranking):
|
||||
res += f"{i+1}. {format_name(name)}: {count} 票 ({data['members'][name]['shift']})\n"
|
||||
return res
|
||||
|
||||
if __name__ == "__main__":
|
||||
args = sys.argv[1:]
|
||||
# print(f"DEBUG: Raw args received: {args}") # 再次调试打印
|
||||
|
||||
if not args: print(f"安检系统 {VERSION}"); sys.exit(0)
|
||||
|
||||
# 优先检查去除成员命令
|
||||
if len(args) >= 2 and args[0] == "去除":
|
||||
print(remove_member(args[1]))
|
||||
# 优先检查新增成员命令
|
||||
elif len(args) >= 2 and args[0] == "新增":
|
||||
print(add_member(" ".join(args[1:])))
|
||||
# 优先检查删除命令,直接从 args 列表解析
|
||||
elif len(args) >= 2 and args[0] == "删除" and args[1].startswith("#"):
|
||||
try:
|
||||
log_id = int(args[1][1:].strip())
|
||||
print(delete_entry(log_id))
|
||||
except ValueError: print("错误:删除指令格式为 '删除 #索引号'")
|
||||
# 检查排行榜命令
|
||||
elif "排行榜" in " ".join(args):
|
||||
m = re.search(r"排行榜\s*(.*)", " ".join(args))
|
||||
print(get_ranking(m.group(1).strip() if m.group(1) else None))
|
||||
# 检查违禁品查询命令
|
||||
elif len(args) >= 2 and args[0].startswith("违禁品"):
|
||||
val = " ".join(args[1:]).strip()
|
||||
print(query_logs("date", val) if re.match(r"^\d{2}-?\d{2}$", val) else query_logs("name", val))
|
||||
# 默认处理记录录入
|
||||
elif len(args) >= 2:
|
||||
print(record_entry(args[0], " ".join(args[1:])))
|
||||
else:
|
||||
print("错误:格式不识别")
|
||||
Reference in New Issue
Block a user