From 12b8cb259d4b9f6ddece47f94b499769d23d099a Mon Sep 17 00:00:00 2001 From: Debug Assistant Date: Tue, 17 Feb 2026 20:56:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E4=B8=AAbug?= =?UTF-8?q?=E5=B9=B6=E5=A2=9E=E5=BC=BA=E8=B0=83=E8=AF=95=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复的bug: 1. email_service.py: 验证码提取失败 - 从邮件Subject提取验证码(格式: XXX-XXX xAI confirmation code) 2. nsfw_service.py: enable_unhinged cookie错误 - sso-rw被错误设置为sso的值 3. grok.py: URL路径错误 - 分离base_url和site_url,修复重复/sign-up 4. api_solver.py: Turnstile JS语法错误 - return语句在全局作用域非法,包装为IIFE 增强功能: - 添加详细调试日志,便于定位问题 - 改进Turnstile solver等待逻辑,检测API可用性 - 添加更多错误处理和状态输出 --- api_solver.py | 148 +++++++++++++++++++++++++++++------- g/email_service.py | 46 +++++++++-- g/nsfw_service.py | 3 +- grok.py | 185 ++++++++++++++++++++++++++++++++++++--------- 4 files changed, 311 insertions(+), 71 deletions(-) diff --git a/api_solver.py b/api_solver.py index 16c408d..ad04902 100644 --- a/api_solver.py +++ b/api_solver.py @@ -449,17 +449,69 @@ class TurnstileAPIServer: return False async def _inject_captcha_directly(self, page, websiteKey: str, action: str = '', cdata: str = '', index: int = 0): - """Inject CAPTCHA directly into the target website""" + """Inject CAPTCHA directly into the target website or use existing one""" script = f""" - // Remove any existing turnstile widgets first + (function() {{ + // Check if there's already a turnstile widget on the page with matching sitekey + const existingWidgets = document.querySelectorAll('.cf-turnstile, [data-sitekey]'); + console.log('Turnstile Debug: Found ' + existingWidgets.length + ' potential widgets'); + let useExisting = false; + let foundSitekey = null; + + for (const widget of existingWidgets) {{ + const widgetSitekey = widget.getAttribute('data-sitekey'); + console.log('Turnstile Debug: Checking widget with sitekey:', widgetSitekey); + if (widgetSitekey === '{websiteKey}') {{ + useExisting = true; + foundSitekey = widgetSitekey; + console.log('Turnstile Debug: Found existing turnstile widget with matching sitekey'); + break; + }} + }} + + // Also check for iframe-based turnstile + const turnstileIframes = document.querySelectorAll('iframe[src*="turnstile"], iframe[src*="challenges.cloudflare"]'); + console.log('Turnstile Debug: Found ' + turnstileIframes.length + ' turnstile iframes'); + + // Create hidden input for token if not exists + let tokenInput = document.querySelector('input[name="cf-turnstile-response"]'); + if (!tokenInput) {{ + tokenInput = document.createElement('input'); + tokenInput.type = 'hidden'; + tokenInput.name = 'cf-turnstile-response'; + document.body.appendChild(tokenInput); + }} + + // Setup token capture callback + window._turnstileTokenCallback = function(token) {{ + console.log('Turnstile token captured:', token); + tokenInput.value = token; + }}; + + // If existing widget found, try to hook into it + if (useExisting) {{ + console.log('Using existing turnstile widget'); + // Try to override the existing callback or add our own + const originalCallback = window.turnstileCallback; + window.turnstileCallback = function(token) {{ + window._turnstileTokenCallback(token); + if (originalCallback) originalCallback(token); + }}; + // Also try to set up a mutation observer to capture token changes + return 'existing'; + }} + + // Remove any existing turnstile widgets that don't match our sitekey document.querySelectorAll('.cf-turnstile').forEach(el => el.remove()); - document.querySelectorAll('[data-sitekey]').forEach(el => el.remove()); + document.querySelectorAll('[data-sitekey]').forEach(el => {{ + if (el.getAttribute('data-sitekey') !== '{websiteKey}') el.remove(); + }}); // Create turnstile widget directly on the page const captchaDiv = document.createElement('div'); captchaDiv.className = 'cf-turnstile'; captchaDiv.setAttribute('data-sitekey', '{websiteKey}'); - captchaDiv.setAttribute('data-callback', 'onTurnstileCallback'); + captchaDiv.setAttribute('data-callback', '_turnstileTokenCallback'); {f'captchaDiv.setAttribute("data-action", "{action}");' if action else ''} {f'captchaDiv.setAttribute("data-cdata", "{cdata}");' if cdata else ''} captchaDiv.style.position = 'fixed'; @@ -493,15 +545,7 @@ class TurnstileAPIServer: {f'cdata: "{cdata}",' if cdata else ''} callback: function(token) {{ console.log('Turnstile solved with token:', token); - // Create hidden input for token - let tokenInput = document.querySelector('input[name="cf-turnstile-response"]'); - if (!tokenInput) {{ - tokenInput = document.createElement('input'); - tokenInput.type = 'hidden'; - tokenInput.name = 'cf-turnstile-response'; - document.body.appendChild(tokenInput); - }} - tokenInput.value = token; + window._turnstileTokenCallback(token); }}, 'error-callback': function(error) {{ console.log('Turnstile error:', error); @@ -531,14 +575,7 @@ class TurnstileAPIServer: {f'cdata: "{cdata}",' if cdata else ''} callback: function(token) {{ console.log('Turnstile solved with token:', token); - let tokenInput = document.querySelector('input[name="cf-turnstile-response"]'); - if (!tokenInput) {{ - tokenInput = document.createElement('input'); - tokenInput.type = 'hidden'; - tokenInput.name = 'cf-turnstile-response'; - document.body.appendChild(tokenInput); - }} - tokenInput.value = token; + window._turnstileTokenCallback(token); }}, 'error-callback': function(error) {{ console.log('Turnstile error:', error); @@ -556,11 +593,18 @@ class TurnstileAPIServer: window.onTurnstileCallback = function(token) {{ console.log('Global turnstile callback executed:', token); }}; + + return 'injected'; + }})(); """ - await page.evaluate(script) + result = await page.evaluate(script) if self.debug: - logger.debug(f"Browser {index}: Injected CAPTCHA directly into website with sitekey: {websiteKey}") + if result == 'existing': + logger.debug(f"Browser {index}: Detected existing turnstile widget with matching sitekey") + else: + logger.debug(f"Browser {index}: Injected new CAPTCHA widget with sitekey: {websiteKey}") + return result async def _solve_turnstile(self, task_id: str, url: str, sitekey: str, action: Optional[str] = None, cdata: Optional[str] = None): """Solve the Turnstile challenge.""" @@ -717,15 +761,62 @@ class TurnstileAPIServer: if self.debug: logger.debug(f"Browser {index}: Loading real website directly: {url}") - await page.goto(url, wait_until='domcontentloaded', timeout=30000) + await page.goto(url, wait_until='networkidle', timeout=30000) await self._unblock_rendering(page) - + + # Wait for turnstile to load (it may be lazy loaded) + if self.debug: + logger.debug(f"Browser {index}: Waiting for turnstile to appear...") + + # Try to wait for turnstile widget to appear + turnstile_found = False + for wait_attempt in range(15): + widget_count = await page.locator('.cf-turnstile, [data-sitekey]').count() + iframe_count = await page.locator('iframe[src*="turnstile"], iframe[src*="challenges.cloudflare"]').count() + + # Also check if turnstile API is available + turnstile_ready = await page.evaluate("""() => { + return typeof window.turnstile !== 'undefined'; + }""") + + if self.debug and wait_attempt % 3 == 0: + logger.debug(f"Browser {index}: Wait {wait_attempt + 1}s - {widget_count} widgets, {iframe_count} iframes, turnstile API: {turnstile_ready}") + + if widget_count > 0 or iframe_count > 0 or turnstile_ready: + if self.debug: + logger.debug(f"Browser {index}: Turnstile ready after {wait_attempt + 1}s (widgets: {widget_count}, iframes: {iframe_count}, API: {turnstile_ready})") + turnstile_found = True + break + await asyncio.sleep(1) + + if not turnstile_found: + if self.debug: + logger.debug(f"Browser {index}: Turnstile not found naturally, will inject our own") + # Сразу инъектируем виджет Turnstile на целевой сайт if self.debug: logger.debug(f"Browser {index}: Injecting Turnstile widget directly into target site") - await self._inject_captcha_directly(page, sitekey, action or '', cdata or '', index) + # Debug: Check page state before injection + if self.debug: + widget_count = await page.locator('.cf-turnstile, [data-sitekey]').count() + iframe_count = await page.locator('iframe[src*="turnstile"], iframe[src*="challenges.cloudflare"]').count() + logger.debug(f"Browser {index}: Before injection - {widget_count} widgets, {iframe_count} iframes") + # Check for sitekey attribute + try: + sitekey_elem = await page.locator('[data-sitekey]').get_attribute('data-sitekey') + logger.debug(f"Browser {index}: Found sitekey: {sitekey_elem}") + except: + pass + + inject_result = await self._inject_captcha_directly(page, sitekey, action or '', cdata or '', index) + + if self.debug: + if inject_result == 'existing': + logger.debug(f"Browser {index}: Using existing turnstile widget on page") + else: + logger.debug(f"Browser {index}: Injected new turnstile widget") # Ждем время для загрузки и рендеринга виджета await asyncio.sleep(3) @@ -748,6 +839,11 @@ class TurnstileAPIServer: if count == 0: if self.debug and attempt % 5 == 0: logger.debug(f"Browser {index}: No token elements found on attempt {attempt + 1}") + # Also check if turnstile widget exists on page + if self.debug and attempt == 0: + widget_count = await page.locator('.cf-turnstile, [data-sitekey]').count() + iframe_count = await page.locator('iframe[src*="turnstile"], iframe[src*="challenges.cloudflare"]').count() + logger.debug(f"Browser {index}: Page has {widget_count} turnstile widgets and {iframe_count} iframes") elif count == 1: # Если только один элемент, проверяем его токен try: diff --git a/g/email_service.py b/g/email_service.py index 71c72fd..795b7f4 100644 --- a/g/email_service.py +++ b/g/email_service.py @@ -32,24 +32,56 @@ class EmailService: print(f"[-] 创建邮箱失败: {e}") return None, None - def fetch_verification_code(self, email, max_attempts=30): + def fetch_verification_code(self, email, max_attempts=30, debug=False): """轮询获取验证码 GET /api/emails?mailbox=xxx""" - for _ in range(max_attempts): + if debug: + print(f"[DEBUG] 开始轮询获取验证码,邮箱: {email}") + for i in range(max_attempts): try: + if debug: + print(f"[DEBUG] 第 {i+1}/{max_attempts} 次轮询...") res = requests.get( f"{self.base_url}/api/emails", params={"mailbox": email}, headers=self.headers, timeout=10 ) + if debug: + print(f"[DEBUG] 响应状态: {res.status_code}") if res.status_code == 200: emails = res.json() - if emails and emails[0].get("verification_code"): - code = emails[0]["verification_code"] - return code.replace("-", "") - except: - pass + if debug: + print(f"[DEBUG] 响应数据: {emails}") + if emails and len(emails) > 0: + # 检查邮件字段 + first_email = emails[0] + if debug: + print(f"[DEBUG] 第一封邮件字段: {first_email.keys()}") + # 可能的验证码字段名 + code = first_email.get("verification_code") or first_email.get("code") or first_email.get("verify_code") + if code: + if debug: + print(f"[DEBUG] 获取到验证码: {code}") + return code.replace("-", "") + else: + # 从 subject 提取验证码(格式: "XXX-XXX xAI confirmation code") + subject = first_email.get("subject", "") + if debug: + print(f"[DEBUG] Subject: {subject}") + # 提取前 7 位(含横杠)或匹配 XXX-XXX 模式 + import re + match = re.search(r'^([A-Z0-9]{3}-[A-Z0-9]{3})', subject) + if match: + code = match.group(1) + if debug: + print(f"[DEBUG] 从 Subject 提取验证码: {code}") + return code.replace("-", "") + except Exception as e: + if debug: + print(f"[DEBUG] 轮询异常: {e}") time.sleep(1) + if debug: + print(f"[DEBUG] 轮询结束,未获取到验证码") return None def delete_email(self, address): diff --git a/g/nsfw_service.py b/g/nsfw_service.py index b161502..86eb1eb 100644 --- a/g/nsfw_service.py +++ b/g/nsfw_service.py @@ -120,6 +120,7 @@ class NsfwSettingsService: def enable_unhinged( self, sso: str, + sso_rw: str = "", impersonate: str = "chrome120", user_agent: Optional[str] = None, timeout: int = 30, @@ -139,7 +140,7 @@ class NsfwSettingsService: "user-agent": user_agent or DEFAULT_USER_AGENT, "x-grpc-web": "1", "x-user-agent": "connect-es/2.1.1", - "cookie": f"sso={sso}; sso-rw={sso}" + "cookie": f"sso={sso}; sso-rw={sso_rw or sso}" } payload = bytes([0x08, 0x01, 0x10, 0x01]) diff --git a/grok.py b/grok.py index be94a1d..c214b6b 100644 --- a/grok.py +++ b/grok.py @@ -1,6 +1,8 @@ import os, json, random, string, time, re, struct import threading import concurrent.futures +import argparse +import traceback from urllib.parse import urljoin, urlparse from curl_cffi import requests from bs4 import BeautifulSoup @@ -8,7 +10,10 @@ from bs4 import BeautifulSoup from g import EmailService, TurnstileService, UserAgreementService, NsfwSettingsService # 基础配置 -site_url = "https://accounts.x.ai" +# 基础 URL(用于 API 请求和 Solver) +base_url = "https://accounts.x.ai" +# 注册页面 URL(用于初始化扫描) +site_url = f"{base_url}/sign-up" DEFAULT_IMPERSONATE = "chrome120" CHROME_PROFILES = [ {"impersonate": "chrome110", "version": "110.0.0.0", "brand": "chrome"}, @@ -74,42 +79,57 @@ def encode_grpc_message_verify(email, code): payload = p1 + p2 return b'\x00' + struct.pack('>I', len(payload)) + payload -def send_email_code_grpc(session, email): - url = f"{site_url}/auth_mgmt.AuthManagement/CreateEmailValidationCode" +def send_email_code_grpc(session, email, debug_mode=False): + url = f"{base_url}/auth_mgmt.AuthManagement/CreateEmailValidationCode" data = encode_grpc_message(1, email) - headers = {"content-type": "application/grpc-web+proto", "x-grpc-web": "1", "x-user-agent": "connect-es/2.1.1", "origin": site_url, "referer": f"{site_url}/sign-up?redirect=grok-com"} + headers = {"content-type": "application/grpc-web+proto", "x-grpc-web": "1", "x-user-agent": "connect-es/2.1.1", "origin": base_url, "referer": f"{site_url}?redirect=grok-com"} try: - # print(f"[debug] {email} 正在发送验证码请求...") + if debug_mode: + print(f"[DEBUG] [{email}] 正在发送验证码请求...") res = session.post(url, data=data, headers=headers, timeout=15) - # print(f"[debug] {email} 请求结束,状态码: {res.status_code}") + if debug_mode: + print(f"[DEBUG] [{email}] 发送验证码响应: status={res.status_code}, len={len(res.content)}") return res.status_code == 200 except Exception as e: print(f"[-] {email} 发送验证码异常: {e}") return False -def verify_email_code_grpc(session, email, code): - url = f"{site_url}/auth_mgmt.AuthManagement/VerifyEmailValidationCode" +def verify_email_code_grpc(session, email, code, debug_mode=False): + url = f"{base_url}/auth_mgmt.AuthManagement/VerifyEmailValidationCode" data = encode_grpc_message_verify(email, code) - headers = {"content-type": "application/grpc-web+proto", "x-grpc-web": "1", "x-user-agent": "connect-es/2.1.1", "origin": site_url, "referer": f"{site_url}/sign-up?redirect=grok-com"} + headers = {"content-type": "application/grpc-web+proto", "x-grpc-web": "1", "x-user-agent": "connect-es/2.1.1", "origin": base_url, "referer": f"{site_url}?redirect=grok-com"} try: + if debug_mode: + print(f"[DEBUG] [{email}] 正在验证验证码: {code}") res = session.post(url, data=data, headers=headers, timeout=15) - # print(f"[debug] {email} 验证响应状态: {res.status_code}, 内容长度: {len(res.content)}") + if debug_mode: + print(f"[DEBUG] [{email}] 验证验证码响应: status={res.status_code}, len={len(res.content)}") return res.status_code == 200 except Exception as e: print(f"[-] {email} 验证验证码异常: {e}") return False -def register_single_thread(): +def register_single_thread(debug_mode=False, single_run=False): + thread_id = threading.current_thread().name # 错峰启动,防止瞬时并发过高 - time.sleep(random.uniform(0, 5)) + sleep_time = random.uniform(0, 2) + if debug_mode: + print(f"[DEBUG] [{thread_id}] 线程启动,错峰等待 {sleep_time:.2f}s") + time.sleep(sleep_time) try: + if debug_mode: + print(f"[DEBUG] [{thread_id}] 正在初始化服务...") email_service = EmailService() turnstile_service = TurnstileService() user_agreement_service = UserAgreementService() nsfw_service = NsfwSettingsService() + if debug_mode: + print(f"[DEBUG] [{thread_id}] 服务初始化成功") except Exception as e: - print(f"[-] 服务初始化失败: {e}") + print(f"[-] [{thread_id}] 服务初始化失败: {e}") + if debug_mode: + traceback.print_exc() return # 修正:直接从 config 获取 @@ -130,19 +150,26 @@ def register_single_thread(): impersonate_fingerprint, account_user_agent = get_random_chrome_profile() with requests.Session(impersonate=impersonate_fingerprint, proxies=PROXIES) as session: # 预热连接 - try: session.get(site_url, timeout=10) + try: session.get(base_url, timeout=10) except: pass password = generate_random_string() try: + if debug_mode: + print(f"[DEBUG] [{thread_id}] 正在创建临时邮箱...") jwt, email = email_service.create_email() current_email = email + if debug_mode: + print(f"[DEBUG] [{thread_id}] 邮箱创建成功: {email}") except Exception as e: - print(f"[-] 邮箱服务抛出异常: {e}") + print(f"[-] [{thread_id}] 邮箱服务抛出异常: {e}") + if debug_mode: + traceback.print_exc() jwt, email, current_email = None, None, None if not email: + print(f"[-] [{thread_id}] 邮箱创建失败,5秒后重试...") time.sleep(5); continue if stop_event.is_set(): @@ -150,35 +177,60 @@ def register_single_thread(): current_email = None return - print(f"[*] 开始注册: {email}") + print(f"[*] [{thread_id}] 开始注册: {email}") # Step 1: 发送验证码 - if not send_email_code_grpc(session, email): + if debug_mode: + print(f"[DEBUG] [{thread_id}] Step 1: 发送验证码...") + if not send_email_code_grpc(session, email, debug_mode): + print(f"[-] [{thread_id}] 发送验证码失败,删除邮箱: {email}") email_service.delete_email(email) current_email = None time.sleep(5); continue + if debug_mode: + print(f"[DEBUG] [{thread_id}] 验证码发送成功") # Step 2: 获取验证码 - verify_code = email_service.fetch_verification_code(email) + if debug_mode: + print(f"[DEBUG] [{thread_id}] Step 2: 获取验证码...") + verify_code = email_service.fetch_verification_code(email, debug=debug_mode) + if debug_mode: + print(f"[DEBUG] [{thread_id}] 获取到验证码: {verify_code}") if not verify_code: + print(f"[-] [{thread_id}] 获取验证码失败,删除邮箱: {email}") email_service.delete_email(email) current_email = None continue # Step 3: 验证验证码 - if not verify_email_code_grpc(session, email, verify_code): + if debug_mode: + print(f"[DEBUG] [{thread_id}] Step 3: 验证验证码...") + if not verify_email_code_grpc(session, email, verify_code, debug_mode): + print(f"[-] [{thread_id}] 验证验证码失败,删除邮箱: {email}") email_service.delete_email(email) current_email = None continue + if debug_mode: + print(f"[DEBUG] [{thread_id}] 验证码验证成功") # Step 4: 注册重试循环 + if debug_mode: + print(f"[DEBUG] [{thread_id}] Step 4: 开始注册流程...") for attempt in range(3): + if debug_mode: + print(f"[DEBUG] [{thread_id}] 注册尝试 {attempt + 1}/3") if stop_event.is_set(): email_service.delete_email(email) current_email = None return + if debug_mode: + print(f"[DEBUG] [{thread_id}] 创建 Turnstile 任务...") task_id = turnstile_service.create_task(site_url, config["site_key"]) + if debug_mode: + print(f"[DEBUG] [{thread_id}] Task ID: {task_id}") token = turnstile_service.get_response(task_id) + if debug_mode: + print(f"[DEBUG] [{thread_id}] 获取到 Token: {'成功' if token and token != 'CAPTCHA_FAIL' else '失败'} (len={len(token) if token else 0})") if not token or token == "CAPTCHA_FAIL": continue @@ -198,11 +250,16 @@ def register_single_thread(): }] with post_lock: + if debug_mode: + print(f"[DEBUG] [{thread_id}] 发送注册请求...") res = session.post(f"{site_url}/sign-up", json=payload, headers=headers) + if debug_mode: + print(f"[DEBUG] [{thread_id}] 注册响应: status={res.status_code}, len={len(res.text)}") if res.status_code == 200: match = re.search(r'(https://[^" \s]+set-cookie\?q=[^:" \s]+)1:', res.text) if not match: + print(f"[-] [{thread_id}] 未找到 verify_url,响应: {res.text[:200]}...") email_service.delete_email(email) current_email = None break @@ -211,11 +268,16 @@ def register_single_thread(): session.get(verify_url, allow_redirects=True) sso = session.cookies.get("sso") sso_rw = session.cookies.get("sso-rw") + if debug_mode: + print(f"[DEBUG] [{thread_id}] SSO: {sso[:20] if sso else None}..., sso-rw: {'存在' if sso_rw else '无'}") if not sso: + print(f"[-] [{thread_id}] 未获取到 SSO cookie") email_service.delete_email(email) current_email = None break + if debug_mode: + print(f"[DEBUG] [{thread_id}] 接受用户协议...") tos_result = user_agreement_service.accept_tos_version( sso=sso, sso_rw=sso_rw or "", @@ -223,11 +285,16 @@ def register_single_thread(): user_agent=account_user_agent, ) tos_hex = tos_result.get("hex_reply") or "" + if debug_mode: + print(f"[DEBUG] [{thread_id}] TOS 结果: ok={tos_result.get('ok')}, hex={tos_hex[:20] if tos_hex else None}...") if not tos_result.get("ok") or not tos_hex: + print(f"[-] [{thread_id}] TOS 接受失败") email_service.delete_email(email) current_email = None break + if debug_mode: + print(f"[DEBUG] [{thread_id}] 启用 NSFW...") nsfw_result = nsfw_service.enable_nsfw( sso=sso, sso_rw=sso_rw or "", @@ -235,14 +302,21 @@ def register_single_thread(): user_agent=account_user_agent, ) nsfw_hex = nsfw_result.get("hex_reply") or "" + if debug_mode: + print(f"[DEBUG] [{thread_id}] NSFW 结果: ok={nsfw_result.get('ok')}, hex={nsfw_hex[:20] if nsfw_hex else None}...") if not nsfw_result.get("ok") or not nsfw_hex: + print(f"[-] [{thread_id}] NSFW 启用失败") email_service.delete_email(email) current_email = None break # 立即进行二次验证 (enable_unhinged) - unhinged_result = nsfw_service.enable_unhinged(sso) + if debug_mode: + print(f"[DEBUG] [{thread_id}] 启用 Unhinged...") + unhinged_result = nsfw_service.enable_unhinged(sso, sso_rw or "") unhinged_ok = unhinged_result.get("ok", False) + if debug_mode: + print(f"[DEBUG] [{thread_id}] Unhinged 结果: ok={unhinged_ok}") with file_lock: global success_count @@ -279,36 +353,59 @@ def register_single_thread(): time.sleep(5) except Exception as e: - print(f"[-] 异常: {str(e)[:50]}") + print(f"[-] [{thread_id}] 异常: {str(e)[:100]}") + if debug_mode: + traceback.print_exc() # 异常时确保删除邮箱 if current_email: try: email_service.delete_email(current_email) - except: - pass + except Exception as del_err: + if debug_mode: + print(f"[DEBUG] [{thread_id}] 删除邮箱失败: {del_err}") current_email = None + if single_run: + raise # debug模式下单次运行就抛出 time.sleep(5) def main(): + parser = argparse.ArgumentParser(description='Grok 注册机') + parser.add_argument('-t', '--threads', type=int, default=1, help='并发数 (默认1)') + parser.add_argument('-n', '--number', type=int, default=1, help='注册数量 (默认1)') + parser.add_argument('--debug', action='store_true', help='调试模式:显示详细错误堆栈') + parser.add_argument('--single', action='store_true', help='单线程单次运行模式:出错立即抛出') + parser.add_argument('--no-input', action='store_true', help='非交互模式,使用默认参数') + args = parser.parse_args() + print("=" * 60 + "\nGrok 注册机\n" + "=" * 60) # 1. 扫描参数 print("[*] 正在初始化...") - start_url = f"{site_url}/sign-up" + start_url = site_url + print(f"[DEBUG] 请求 URL: {start_url}") with requests.Session(impersonate=DEFAULT_IMPERSONATE) as s: try: - html = s.get(start_url).text + print("[DEBUG] 正在获取页面...") + html = s.get(start_url, timeout=30).text + print(f"[DEBUG] 页面获取成功,长度: {len(html)}") # Key key_match = re.search(r'sitekey":"(0x4[a-zA-Z0-9_-]+)"', html) - if key_match: config["site_key"] = key_match.group(1) + if key_match: + config["site_key"] = key_match.group(1) + print(f"[DEBUG] Site Key: {config['site_key']}") # Tree tree_match = re.search(r'next-router-state-tree":"([^"]+)"', html) - if tree_match: config["state_tree"] = tree_match.group(1) + if tree_match: + config["state_tree"] = tree_match.group(1) + print(f"[DEBUG] State Tree 已获取") # Action ID + print("[DEBUG] 正在解析 JS 文件...") soup = BeautifulSoup(html, 'html.parser') js_urls = [urljoin(start_url, script['src']) for script in soup.find_all('script', src=True) if '_next/static' in script['src']] + print(f"[DEBUG] 找到 {len(js_urls)} 个 JS 文件") for js_url in js_urls: - js_content = s.get(js_url).text + print(f"[DEBUG] 正在请求 JS: {js_url}") + js_content = s.get(js_url, timeout=30).text match = re.search(r'7f[a-fA-F0-9]{40}', js_content) if match: config["action_id"] = match.group(0) @@ -316,6 +413,8 @@ def main(): break except Exception as e: print(f"[-] 初始化扫描失败: {e}") + if args.debug: + traceback.print_exc() return if not config["action_id"]: @@ -323,13 +422,18 @@ def main(): return # 2. 启动 - try: - t = int(input("\n并发数 (默认8): ").strip() or 8) - except: t = 8 + if args.no_input or args.threads or args.number: + t = args.threads + total = args.number + else: + # 交互式输入(保留原来的逻辑作为fallback) + try: + t = int(input("\n并发数 (默认8): ").strip() or 8) + except: t = 8 - try: - total = int(input("注册数量 (默认100): ").strip() or 100) - except: total = 100 + try: + total = int(input("注册数量 (默认100): ").strip() or 100) + except: total = 100 global target_count, output_file target_count = max(1, total) @@ -341,9 +445,16 @@ def main(): print(f"[*] 启动 {t} 个线程,目标 {target_count} 个") print(f"[*] 输出: {output_file}") - with concurrent.futures.ThreadPoolExecutor(max_workers=t) as executor: - futures = [executor.submit(register_single_thread) for _ in range(t)] - concurrent.futures.wait(futures) + print(f"[*] 调试模式: {'开启' if args.debug else '关闭'}") + + if args.single: + # 单线程单次运行模式 + print("[*] 单线程单次运行模式") + register_single_thread(debug_mode=args.debug, single_run=True) + else: + with concurrent.futures.ThreadPoolExecutor(max_workers=t) as executor: + futures = [executor.submit(register_single_thread, args.debug, args.single) for _ in range(t)] + concurrent.futures.wait(futures) if __name__ == "__main__": main() \ No newline at end of file