Files
toworker/smsreceiver-worker/worker/migrate_sqlite_to_d1.py
2026-03-23 12:58:51 +08:00

57 lines
1.6 KiB
Python

#!/usr/bin/env python3
import os
import json
import sqlite3
import time
import requests
# --- Settings taken from the environment ------------------------------------
# Cloudflare account, API token and target D1 database; all three are required.
D1_DATABASE_ID = os.environ.get('D1_DATABASE_ID', '')
CF_API_TOKEN = os.environ.get('CF_API_TOKEN', '')
CF_ACCOUNT_ID = os.environ.get('CF_ACCOUNT_ID', '')
# Local SQLite file that is read from.
SQLITE_PATH = os.environ.get('SQLITE_PATH', 'sms_receiver_go.db')
# Number of rows sent between two throttling pauses in migrate_table.
BATCH_SIZE = int(os.environ.get('BATCH_SIZE', '50'))

# Abort early when any of the required Cloudflare settings is empty.
if not (D1_DATABASE_ID and CF_API_TOKEN and CF_ACCOUNT_ID):
    raise SystemExit('Missing env: D1_DATABASE_ID / CF_API_TOKEN / CF_ACCOUNT_ID')

# Query endpoint of the D1 database and the headers sent with every request.
api = f"https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT_ID}/d1/database/{D1_DATABASE_ID}/query"
headers = {
    "Authorization": f"Bearer {CF_API_TOKEN}",
    "Content-Type": "application/json",
}

# Source database; sqlite3.Row lets rows be addressed by column name.
conn = sqlite3.connect(SQLITE_PATH)
conn.row_factory = sqlite3.Row

# Tables to copy, in this order.
TABLES = ['sms_messages', 'receive_logs']
def post_query(sql, params=None, timeout=30):
    """Send one SQL statement to the D1 query endpoint.

    The statement is POSTed as a JSON body to the module-level ``api`` URL
    using the module-level ``headers``.

    Args:
        sql: SQL text to execute.
        params: Optional list of values sent as the statement's ``params``.
            The key is omitted from the request body when this is None.
        timeout: Seconds to wait for the server before giving up. Requests
            applies no timeout by default, so without it a stalled connection
            would block the migration indefinitely.

    Raises:
        requests.HTTPError: The API answered with a 4xx/5xx status.
        requests.Timeout: The server did not respond within ``timeout``.
    """
    payload = {"sql": sql}
    if params is not None:
        payload["params"] = params
    r = requests.post(
        api,
        headers=headers,
        data=json.dumps(payload),
        timeout=timeout,
    )
    # Surface HTTP-level failures instead of silently continuing.
    r.raise_for_status()
def migrate_table(table):
    """Copy every row of a local SQLite table into the same-named D1 table.

    Rows are read through the module-level ``conn`` and sent one INSERT per
    row via ``post_query``. After every ``BATCH_SIZE`` rows the loop pauses
    for 0.2 seconds to throttle the requests.

    Args:
        table: Name of the table to copy. It is interpolated into the SQL
            text unquoted, so it must be a trusted identifier (the script
            only passes entries of ``TABLES``).
    """
    cur = conn.cursor()
    cur.execute(f"SELECT * FROM {table}")
    rows = cur.fetchall()
    print(f"{table}: {len(rows)} rows")

    # The column list is the same for every row, so build the statement once
    # instead of once per row. Column names are double-quoted (embedded quotes
    # doubled) so names that collide with SQL keywords, such as "from" or
    # "to", do not produce a syntax error in the INSERT.
    cols = [desc[0] for desc in cur.description]
    quoted_cols = ','.join('"' + c.replace('"', '""') + '"' for c in cols)
    placeholders = ','.join(['?'] * len(cols))
    sql = f"INSERT INTO {table} ({quoted_cols}) VALUES ({placeholders})"

    batch = 0
    for row in rows:
        # Iterating a row yields its values in column order.
        post_query(sql, list(row))
        batch += 1
        if batch >= BATCH_SIZE:
            time.sleep(0.2)
            batch = 0
# Migrate each configured table in order. The SQLite connection is closed
# whether or not a migration fails; 'done' is printed only on success.
try:
    for table in TABLES:
        migrate_table(table)
finally:
    conn.close()
print('done')