Initial commit

This commit is contained in:
doujiang
2026-03-21 23:00:03 +08:00
commit 8f51c3d378
92 changed files with 28858 additions and 0 deletions

110
tests/test_cpa_upload.py Normal file
View File

@@ -0,0 +1,110 @@
from src.core.upload import cpa_upload
class FakeResponse:
def __init__(self, status_code=200, payload=None, text=""):
self.status_code = status_code
self._payload = payload
self.text = text
def json(self):
if self._payload is None:
raise ValueError("no json payload")
return self._payload
class FakeMime:
def __init__(self):
self.parts = []
def addpart(self, **kwargs):
self.parts.append(kwargs)
def test_upload_to_cpa_accepts_management_root_url(monkeypatch):
calls = []
def fake_post(url, **kwargs):
calls.append({"url": url, "kwargs": kwargs})
return FakeResponse(status_code=201)
monkeypatch.setattr(cpa_upload, "CurlMime", FakeMime)
monkeypatch.setattr(cpa_upload.cffi_requests, "post", fake_post)
success, message = cpa_upload.upload_to_cpa(
{"email": "tester@example.com"},
api_url="https://cpa.example.com/v0/management",
api_token="token-123",
)
assert success is True
assert message == "上传成功"
assert calls[0]["url"] == "https://cpa.example.com/v0/management/auth-files"
def test_upload_to_cpa_does_not_double_append_full_endpoint(monkeypatch):
calls = []
def fake_post(url, **kwargs):
calls.append({"url": url, "kwargs": kwargs})
return FakeResponse(status_code=201)
monkeypatch.setattr(cpa_upload, "CurlMime", FakeMime)
monkeypatch.setattr(cpa_upload.cffi_requests, "post", fake_post)
success, _ = cpa_upload.upload_to_cpa(
{"email": "tester@example.com"},
api_url="https://cpa.example.com/v0/management/auth-files",
api_token="token-123",
)
assert success is True
assert calls[0]["url"] == "https://cpa.example.com/v0/management/auth-files"
def test_upload_to_cpa_falls_back_to_raw_json_when_multipart_returns_404(monkeypatch):
calls = []
responses = [
FakeResponse(status_code=404, text="404 page not found"),
FakeResponse(status_code=200, payload={"status": "ok"}),
]
def fake_post(url, **kwargs):
calls.append({"url": url, "kwargs": kwargs})
return responses.pop(0)
monkeypatch.setattr(cpa_upload, "CurlMime", FakeMime)
monkeypatch.setattr(cpa_upload.cffi_requests, "post", fake_post)
success, message = cpa_upload.upload_to_cpa(
{"email": "tester@example.com", "type": "codex"},
api_url="https://cpa.example.com",
api_token="token-123",
)
assert success is True
assert message == "上传成功"
assert calls[0]["kwargs"]["multipart"] is not None
assert calls[1]["url"] == "https://cpa.example.com/v0/management/auth-files?name=tester%40example.com.json"
assert calls[1]["kwargs"]["headers"]["Content-Type"] == "application/json"
assert calls[1]["kwargs"]["data"].startswith(b"{")
def test_test_cpa_connection_uses_get_and_normalized_url(monkeypatch):
calls = []
def fake_get(url, **kwargs):
calls.append({"url": url, "kwargs": kwargs})
return FakeResponse(status_code=200, payload={"files": []})
monkeypatch.setattr(cpa_upload.cffi_requests, "get", fake_get)
success, message = cpa_upload.test_cpa_connection(
"https://cpa.example.com/v0/management",
"token-123",
)
assert success is True
assert message == "CPA 连接测试成功"
assert calls[0]["url"] == "https://cpa.example.com/v0/management/auth-files"
assert calls[0]["kwargs"]["headers"]["Authorization"] == "Bearer token-123"

View File

@@ -0,0 +1,143 @@
from src.services.duck_mail import DuckMailService
class FakeResponse:
def __init__(self, status_code=200, payload=None, text=""):
self.status_code = status_code
self._payload = payload
self.text = text
self.headers = {}
def json(self):
if self._payload is None:
raise ValueError("no json payload")
return self._payload
class FakeHTTPClient:
def __init__(self, responses):
self.responses = list(responses)
self.calls = []
def request(self, method, url, **kwargs):
self.calls.append({
"method": method,
"url": url,
"kwargs": kwargs,
})
if not self.responses:
raise AssertionError(f"未准备响应: {method} {url}")
return self.responses.pop(0)
def test_create_email_creates_account_and_fetches_token():
service = DuckMailService({
"base_url": "https://api.duckmail.test",
"default_domain": "duckmail.sbs",
"api_key": "dk_test_key",
"password_length": 10,
})
fake_client = FakeHTTPClient([
FakeResponse(
status_code=201,
payload={
"id": "account-1",
"address": "tester@duckmail.sbs",
"authType": "email",
},
),
FakeResponse(
payload={
"id": "account-1",
"token": "token-123",
}
),
])
service.http_client = fake_client
email_info = service.create_email()
assert email_info["email"] == "tester@duckmail.sbs"
assert email_info["service_id"] == "account-1"
assert email_info["account_id"] == "account-1"
assert email_info["token"] == "token-123"
create_call = fake_client.calls[0]
assert create_call["method"] == "POST"
assert create_call["url"] == "https://api.duckmail.test/accounts"
assert create_call["kwargs"]["json"]["address"].endswith("@duckmail.sbs")
assert len(create_call["kwargs"]["json"]["password"]) == 10
assert create_call["kwargs"]["headers"]["Authorization"] == "Bearer dk_test_key"
token_call = fake_client.calls[1]
assert token_call["method"] == "POST"
assert token_call["url"] == "https://api.duckmail.test/token"
assert token_call["kwargs"]["json"] == {
"address": "tester@duckmail.sbs",
"password": email_info["password"],
}
def test_get_verification_code_reads_message_detail_and_extracts_code():
service = DuckMailService({
"base_url": "https://api.duckmail.test",
"default_domain": "duckmail.sbs",
})
fake_client = FakeHTTPClient([
FakeResponse(
status_code=201,
payload={
"id": "account-1",
"address": "tester@duckmail.sbs",
"authType": "email",
},
),
FakeResponse(
payload={
"id": "account-1",
"token": "token-123",
}
),
FakeResponse(
payload={
"hydra:member": [
{
"id": "msg-1",
"from": {
"name": "OpenAI",
"address": "noreply@openai.com",
},
"subject": "Your verification code",
"createdAt": "2026-03-19T10:00:00Z",
}
]
}
),
FakeResponse(
payload={
"id": "msg-1",
"text": "Your OpenAI verification code is 654321",
"html": [],
}
),
])
service.http_client = fake_client
email_info = service.create_email()
code = service.get_verification_code(
email=email_info["email"],
email_id=email_info["service_id"],
timeout=1,
)
assert code == "654321"
messages_call = fake_client.calls[2]
assert messages_call["method"] == "GET"
assert messages_call["url"] == "https://api.duckmail.test/messages"
assert messages_call["kwargs"]["headers"]["Authorization"] == "Bearer token-123"
detail_call = fake_client.calls[3]
assert detail_call["method"] == "GET"
assert detail_call["url"] == "https://api.duckmail.test/messages/msg-1"
assert detail_call["kwargs"]["headers"]["Authorization"] == "Bearer token-123"

View File

@@ -0,0 +1,94 @@
import asyncio
from contextlib import contextmanager
from pathlib import Path
from src.config.constants import EmailServiceType
from src.database.models import Base, EmailService
from src.database.session import DatabaseSessionManager
from src.services.base import EmailServiceFactory
from src.web.routes import email as email_routes
from src.web.routes import registration as registration_routes
class DummySettings:
custom_domain_base_url = ""
custom_domain_api_key = None
def test_duck_mail_service_registered():
service_type = EmailServiceType("duck_mail")
service_class = EmailServiceFactory.get_service_class(service_type)
assert service_class is not None
assert service_class.__name__ == "DuckMailService"
def test_email_service_types_include_duck_mail():
result = asyncio.run(email_routes.get_service_types())
duckmail_type = next(item for item in result["types"] if item["value"] == "duck_mail")
assert duckmail_type["label"] == "DuckMail"
field_names = [field["name"] for field in duckmail_type["config_fields"]]
assert "base_url" in field_names
assert "default_domain" in field_names
assert "api_key" in field_names
def test_filter_sensitive_config_marks_duckmail_api_key():
filtered = email_routes.filter_sensitive_config({
"base_url": "https://api.duckmail.test",
"api_key": "dk_test_key",
"default_domain": "duckmail.sbs",
})
assert filtered["base_url"] == "https://api.duckmail.test"
assert filtered["default_domain"] == "duckmail.sbs"
assert filtered["has_api_key"] is True
assert "api_key" not in filtered
def test_registration_available_services_include_duck_mail(monkeypatch):
runtime_dir = Path("tests_runtime")
runtime_dir.mkdir(exist_ok=True)
db_path = runtime_dir / "duckmail_routes.db"
if db_path.exists():
db_path.unlink()
manager = DatabaseSessionManager(f"sqlite:///{db_path}")
Base.metadata.create_all(bind=manager.engine)
with manager.session_scope() as session:
session.add(
EmailService(
service_type="duck_mail",
name="DuckMail 主服务",
config={
"base_url": "https://api.duckmail.test",
"default_domain": "duckmail.sbs",
"api_key": "dk_test_key",
},
enabled=True,
priority=0,
)
)
@contextmanager
def fake_get_db():
session = manager.SessionLocal()
try:
yield session
finally:
session.close()
monkeypatch.setattr(registration_routes, "get_db", fake_get_db)
import src.config.settings as settings_module
monkeypatch.setattr(settings_module, "get_settings", lambda: DummySettings())
result = asyncio.run(registration_routes.get_available_email_services())
assert result["duck_mail"]["available"] is True
assert result["duck_mail"]["count"] == 1
assert result["duck_mail"]["services"][0]["name"] == "DuckMail 主服务"
assert result["duck_mail"]["services"][0]["type"] == "duck_mail"
assert result["duck_mail"]["services"][0]["default_domain"] == "duckmail.sbs"

View File

@@ -0,0 +1,296 @@
import base64
import json
from src.config.constants import EmailServiceType, OPENAI_API_ENDPOINTS, OPENAI_PAGE_TYPES
from src.core.http_client import OpenAIHTTPClient
from src.core.openai.oauth import OAuthStart
from src.core.register import RegistrationEngine
from src.services.base import BaseEmailService
class DummyResponse:
def __init__(self, status_code=200, payload=None, text="", headers=None, on_return=None):
self.status_code = status_code
self._payload = payload
self.text = text
self.headers = headers or {}
self.on_return = on_return
def json(self):
if self._payload is None:
raise ValueError("no json payload")
return self._payload
class QueueSession:
def __init__(self, steps):
self.steps = list(steps)
self.calls = []
self.cookies = {}
def get(self, url, **kwargs):
return self._request("GET", url, **kwargs)
def post(self, url, **kwargs):
return self._request("POST", url, **kwargs)
def request(self, method, url, **kwargs):
return self._request(method.upper(), url, **kwargs)
def close(self):
return None
def _request(self, method, url, **kwargs):
self.calls.append({
"method": method,
"url": url,
"kwargs": kwargs,
})
if not self.steps:
raise AssertionError(f"unexpected request: {method} {url}")
expected_method, expected_url, response = self.steps.pop(0)
assert method == expected_method
assert url == expected_url
if callable(response):
response = response(self)
if response.on_return:
response.on_return(self)
return response
class FakeEmailService(BaseEmailService):
def __init__(self, codes):
super().__init__(EmailServiceType.TEMPMAIL)
self.codes = list(codes)
self.otp_requests = []
def create_email(self, config=None):
return {
"email": "tester@example.com",
"service_id": "mailbox-1",
}
def get_verification_code(self, email, email_id=None, timeout=120, pattern=r"(?<!\d)(\d{6})(?!\d)", otp_sent_at=None):
self.otp_requests.append({
"email": email,
"email_id": email_id,
"otp_sent_at": otp_sent_at,
})
if not self.codes:
raise AssertionError("no verification code queued")
return self.codes.pop(0)
def list_emails(self, **kwargs):
return []
def delete_email(self, email_id):
return True
def check_health(self):
return True
class FakeOAuthManager:
def __init__(self):
self.start_calls = 0
self.callback_calls = []
def start_oauth(self):
self.start_calls += 1
return OAuthStart(
auth_url=f"https://auth.example.test/flow/{self.start_calls}",
state=f"state-{self.start_calls}",
code_verifier=f"verifier-{self.start_calls}",
redirect_uri="http://localhost:1455/auth/callback",
)
def handle_callback(self, callback_url, expected_state, code_verifier):
self.callback_calls.append({
"callback_url": callback_url,
"expected_state": expected_state,
"code_verifier": code_verifier,
})
return {
"account_id": "acct-1",
"access_token": "access-1",
"refresh_token": "refresh-1",
"id_token": "id-1",
}
class FakeOpenAIClient:
def __init__(self, sessions, sentinel_tokens):
self._sessions = list(sessions)
self._session_index = 0
self._session = self._sessions[0]
self._sentinel_tokens = list(sentinel_tokens)
@property
def session(self):
return self._session
def check_ip_location(self):
return True, "US"
def check_sentinel(self, did):
if not self._sentinel_tokens:
raise AssertionError("no sentinel token queued")
return self._sentinel_tokens.pop(0)
def close(self):
if self._session_index + 1 < len(self._sessions):
self._session_index += 1
self._session = self._sessions[self._session_index]
def _workspace_cookie(workspace_id):
payload = base64.urlsafe_b64encode(
json.dumps({"workspaces": [{"id": workspace_id}]}).encode("utf-8")
).decode("ascii").rstrip("=")
return f"{payload}.sig"
def _response_with_did(did):
return DummyResponse(
status_code=200,
text="ok",
on_return=lambda session: session.cookies.__setitem__("oai-did", did),
)
def _response_with_login_cookies(workspace_id="ws-1", session_token="session-1"):
def setter(session):
session.cookies["oai-client-auth-session"] = _workspace_cookie(workspace_id)
session.cookies["__Secure-next-auth.session-token"] = session_token
return DummyResponse(status_code=200, payload={}, on_return=setter)
def test_check_sentinel_sends_non_empty_pow(monkeypatch):
session = QueueSession([
("POST", OPENAI_API_ENDPOINTS["sentinel"], DummyResponse(payload={"token": "sentinel-token"})),
])
client = OpenAIHTTPClient()
client._session = session
monkeypatch.setattr(
"src.core.http_client.build_sentinel_pow_token",
lambda user_agent: "gAAAAACpow-token",
)
token = client.check_sentinel("device-1")
assert token == "sentinel-token"
body = json.loads(session.calls[0]["kwargs"]["data"])
assert body["id"] == "device-1"
assert body["flow"] == "authorize_continue"
assert body["p"] == "gAAAAACpow-token"
def test_run_registers_then_relogs_to_fetch_token():
session_one = QueueSession([
("GET", "https://auth.example.test/flow/1", _response_with_did("did-1")),
(
"POST",
OPENAI_API_ENDPOINTS["signup"],
DummyResponse(payload={"page": {"type": OPENAI_PAGE_TYPES["PASSWORD_REGISTRATION"]}}),
),
("POST", OPENAI_API_ENDPOINTS["register"], DummyResponse(payload={})),
("GET", OPENAI_API_ENDPOINTS["send_otp"], DummyResponse(payload={})),
("POST", OPENAI_API_ENDPOINTS["validate_otp"], DummyResponse(payload={})),
("POST", OPENAI_API_ENDPOINTS["create_account"], DummyResponse(payload={})),
])
session_two = QueueSession([
("GET", "https://auth.example.test/flow/2", _response_with_did("did-2")),
(
"POST",
OPENAI_API_ENDPOINTS["signup"],
DummyResponse(payload={"page": {"type": OPENAI_PAGE_TYPES["LOGIN_PASSWORD"]}}),
),
(
"POST",
OPENAI_API_ENDPOINTS["password_verify"],
DummyResponse(payload={"page": {"type": OPENAI_PAGE_TYPES["EMAIL_OTP_VERIFICATION"]}}),
),
("POST", OPENAI_API_ENDPOINTS["validate_otp"], _response_with_login_cookies()),
(
"POST",
OPENAI_API_ENDPOINTS["select_workspace"],
DummyResponse(payload={"continue_url": "https://auth.example.test/continue"}),
),
(
"GET",
"https://auth.example.test/continue",
DummyResponse(
status_code=302,
headers={"Location": "http://localhost:1455/auth/callback?code=code-2&state=state-2"},
),
),
])
email_service = FakeEmailService(["123456", "654321"])
engine = RegistrationEngine(email_service)
fake_oauth = FakeOAuthManager()
engine.http_client = FakeOpenAIClient([session_one, session_two], ["sentinel-1", "sentinel-2"])
engine.oauth_manager = fake_oauth
result = engine.run()
assert result.success is True
assert result.source == "register"
assert result.workspace_id == "ws-1"
assert result.session_token == "session-1"
assert fake_oauth.start_calls == 2
assert len(email_service.otp_requests) == 2
assert all(item["otp_sent_at"] is not None for item in email_service.otp_requests)
assert sum(1 for call in session_one.calls if call["url"] == OPENAI_API_ENDPOINTS["send_otp"]) == 1
assert sum(1 for call in session_two.calls if call["url"] == OPENAI_API_ENDPOINTS["send_otp"]) == 0
assert sum(1 for call in session_one.calls if call["url"] == OPENAI_API_ENDPOINTS["select_workspace"]) == 0
assert sum(1 for call in session_two.calls if call["url"] == OPENAI_API_ENDPOINTS["select_workspace"]) == 1
relogin_start_body = json.loads(session_two.calls[1]["kwargs"]["data"])
assert relogin_start_body["screen_hint"] == "login"
assert relogin_start_body["username"]["value"] == "tester@example.com"
password_verify_body = json.loads(session_two.calls[2]["kwargs"]["data"])
assert password_verify_body == {"password": result.password}
assert result.metadata["token_acquired_via_relogin"] is True
def test_existing_account_login_uses_auto_sent_otp_without_manual_send():
session = QueueSession([
("GET", "https://auth.example.test/flow/1", _response_with_did("did-1")),
(
"POST",
OPENAI_API_ENDPOINTS["signup"],
DummyResponse(payload={"page": {"type": OPENAI_PAGE_TYPES["EMAIL_OTP_VERIFICATION"]}}),
),
("POST", OPENAI_API_ENDPOINTS["validate_otp"], _response_with_login_cookies("ws-existing", "session-existing")),
(
"POST",
OPENAI_API_ENDPOINTS["select_workspace"],
DummyResponse(payload={"continue_url": "https://auth.example.test/continue-existing"}),
),
(
"GET",
"https://auth.example.test/continue-existing",
DummyResponse(
status_code=302,
headers={"Location": "http://localhost:1455/auth/callback?code=code-1&state=state-1"},
),
),
])
email_service = FakeEmailService(["246810"])
engine = RegistrationEngine(email_service)
fake_oauth = FakeOAuthManager()
engine.http_client = FakeOpenAIClient([session], ["sentinel-1"])
engine.oauth_manager = fake_oauth
result = engine.run()
assert result.success is True
assert result.source == "login"
assert fake_oauth.start_calls == 1
assert sum(1 for call in session.calls if call["url"] == OPENAI_API_ENDPOINTS["send_otp"]) == 0
assert len(email_service.otp_requests) == 1
assert email_service.otp_requests[0]["otp_sent_at"] is not None
assert result.metadata["token_acquired_via_relogin"] is False

View File

@@ -0,0 +1,28 @@
from pathlib import Path
import importlib
web_app = importlib.import_module("src.web.app")
def test_static_asset_version_is_non_empty_string():
version = web_app._build_static_asset_version(web_app.STATIC_DIR)
assert isinstance(version, str)
assert version
assert version.isdigit()
def test_email_services_template_uses_versioned_static_assets():
template = Path("templates/email_services.html").read_text(encoding="utf-8")
assert '/static/css/style.css?v={{ static_version }}' in template
assert '/static/js/utils.js?v={{ static_version }}' in template
assert '/static/js/email_services.js?v={{ static_version }}' in template
def test_index_template_uses_versioned_static_assets():
template = Path("templates/index.html").read_text(encoding="utf-8")
assert '/static/css/style.css?v={{ static_version }}' in template
assert '/static/js/utils.js?v={{ static_version }}' in template
assert '/static/js/app.js?v={{ static_version }}' in template