---
# Workflow: restore CPA usage statistics from a backup archive through the
# management HTTP API, then read the usage endpoint twice (immediately and
# after 10 seconds) to check that the totals are present.
#
# Input:
#   backup_id - base name (without ".tar.gz") of an archive located in
#               /root/backups/cpa-runtime-daily/. Archives written by the
#               pre_backup step below are named
#               hwsg_usage_realtime_<YYYY-MM-DD_HHMMSS>.
#
# NOTE(review): backup_id is substituted textually into shell commands in
# find_backup and extract_backup. Pass only trusted, plain file names;
# confirm whether the workflow engine escapes input values.
version: 1
name: cpa_usage_restore
description: 从备份包恢复 usage(公网管理接口,双重校验)
inputs:
  - backup_id
steps:
  # Safety net: export the current usage data and archive it before anything
  # is overwritten. "set -e" and "curl -f" make the step fail (and, through
  # on_fail: stop, abort the restore) when the export cannot be fetched,
  # instead of archiving an HTTP error body or an older export file.
  # The step prints the export path followed by the metadata file content.
  - id: pre_backup
    action: shell.exec
    on_fail: stop
    with:
      command: |
        set -e
        CPA_TOKEN=${env.cpa_management_token}
        CPA_BASE=https://cpa.pao.xx.kg/v0/management
        ts=$(date +%F_%H%M%S)
        out=/root/cliproxyapi/usage_export_${ts}.json
        curl -sS -f -H "Authorization: Bearer ${CPA_TOKEN}" "${CPA_BASE}/usage/export" -o "${out}"
        echo "${out}"
        latest=$(ls -1t /root/cliproxyapi/usage_export_*.json | head -n 1)
        ts=$(date +%Y-%m-%d_%H%M%S)
        out=/root/backups/cpa-runtime-daily/hwsg_usage_realtime_${ts}.tar.gz
        meta=/root/backups/cpa-runtime-daily/hwsg_usage_realtime_${ts}.meta.txt
        mkdir -p /root/backups/cpa-runtime-daily
        tar -czf "${out}" "${latest}"
        sha=$(sha256sum "${out}" | awk '{print $1}')
        size=$(du -h "${out}" | awk '{print $1}')
        req=$(python3 -c "import json; data=json.load(open('${latest}','r',encoding='utf-8')); u=data.get('usage',{}); print(u.get('total_requests', data.get('total_requests','unknown')))" )
        tok=$(python3 -c "import json; data=json.load(open('${latest}','r',encoding='utf-8')); u=data.get('usage',{}); print(u.get('total_tokens', data.get('total_tokens','unknown')))" )
        {
          echo "time=$(date '+%F %T %z')"
          echo "source=${latest}"
          echo "backup=${out}"
          echo "sha256=${sha}"
          echo "size=${size}"
          echo "total_requests=${req}"
          echo "total_tokens=${tok}"
        } > "${meta}"
        cat "${meta}"

  # Fail early when the requested archive does not exist.
  - id: find_backup
    action: shell.exec
    on_fail: stop
    with:
      command: "ls -1 /root/backups/cpa-runtime-daily/${inputs.backup_id}.tar.gz"

  # Unpack the archive into a scratch directory. The directory is removed
  # first so that files left over from an earlier restore cannot be picked up
  # by import_usage, which takes the first matching file name.
  - id: extract_backup
    action: shell.exec
    on_fail: stop
    with:
      command: "rm -rf /tmp/cpa-restore && mkdir -p /tmp/cpa-restore && tar -xzf /root/backups/cpa-runtime-daily/${inputs.backup_id}.tar.gz -C /tmp/cpa-restore"

  # Locate the extracted usage file (usage_export_*.json, falling back to
  # stats_persistence-*.json), check that it parses as JSON, POST it to the
  # import endpoint and fail when the JSON response carries a truthy "error".
  # "set -e" makes a failed JSON check stop the script before the upload.
  # pipefail is deliberately not enabled: the "ls | head" lookups must be
  # allowed to come back empty so the fallback and the explicit error apply.
  - id: import_usage
    action: shell.exec
    on_fail: stop
    with:
      command: |
        set -e
        CPA_TOKEN=${env.cpa_management_token}
        CPA_BASE=https://cpa.pao.xx.kg/v0/management
        latest=$(ls -1 /tmp/cpa-restore/root/cliproxyapi/usage_export_*.json 2>/dev/null | head -n 1)
        if [ -z "$latest" ]; then
          latest=$(ls -1 /tmp/cpa-restore/root/cliproxyapi/stats_persistence-*.json 2>/dev/null | head -n 1)
        fi
        if [ -z "$latest" ]; then
          echo "no usage file found"; exit 1
        fi
        python3 -c "import json; json.load(open('${latest}','r',encoding='utf-8')); print('json_ok')"
        resp=$(curl -sS -H "Authorization: Bearer ${CPA_TOKEN}" -H "Content-Type: application/json" --data @"${latest}" "${CPA_BASE}/usage/import")
        echo "$resp"
        python3 -c "import json,sys; r=json.loads(sys.argv[1]); sys.exit(str(r['error'])) if isinstance(r,dict) and r.get('error') else print('import_ok')" "$resp"

  # First check: read the usage endpoint right after the import.
  - id: verify_now
    action: shell.exec
    on_fail: stop
    with:
      command: |
        CPA_TOKEN=${env.cpa_management_token}
        CPA_BASE=https://cpa.pao.xx.kg/v0/management
        curl -sS -H "Authorization: Bearer ${CPA_TOKEN}" ${CPA_BASE}/usage

  # The response from verify_now must contain both usage totals.
  - id: verify_now_assert
    action: assert.json
    on_fail: stop
    with:
      source_step: verify_now
      required_paths:
        - "usage.total_requests"
        - "usage.total_tokens"

  # Pause 10 seconds between the two checks; a failure of the pause itself
  # does not abort the workflow (on_fail: continue).
  - id: wait_10s
    action: sleep
    on_fail: continue
    with:
      ms: 10000

  # Second check: read the usage endpoint again after the pause.
  - id: verify_later
    action: shell.exec
    on_fail: stop
    with:
      command: |
        CPA_TOKEN=${env.cpa_management_token}
        CPA_BASE=https://cpa.pao.xx.kg/v0/management
        curl -sS -H "Authorization: Bearer ${CPA_TOKEN}" ${CPA_BASE}/usage

  # The response from verify_later must contain both usage totals.
  - id: verify_later_assert
    action: assert.json
    on_fail: stop
    with:
      source_step: verify_later
      required_paths:
        - "usage.total_requests"
        - "usage.total_tokens"