Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions src/services/browser_captcha_personal.py
Original file line number Diff line number Diff line change
Expand Up @@ -11036,6 +11036,33 @@ def get_last_fingerprint(self) -> Optional[Dict[str, Any]]:
return None
return dict(self._last_fingerprint)

async def get_current_user_agent(self) -> Optional[str]:
"""获取当前浏览器实例的真实 User-Agent。

按优先级依次尝试:
1) 最近一次打码指纹中的 user_agent
2) 初始化时构建的 runtime_surface_profile 中的 userAgent
3) 通过 CDP 从运行态浏览器实时获取
"""
# 1) 从已缓存的浏览器指纹获取
fingerprint = self.get_last_fingerprint()
if isinstance(fingerprint, dict) and fingerprint.get("user_agent"):
return fingerprint["user_agent"]

# 2) 从初始化时已构建的 runtime_surface_profile 获取(无需 CDP 调用)
runtime_profile = self._get_runtime_surface_profile()
if isinstance(runtime_profile, dict):
user_agent = runtime_profile.get("userAgent")
if user_agent:
return user_agent

# 3) 通过 CDP 直接从运行态浏览器获取
live_ua, _ = await self._get_live_browser_runtime_identity()
if live_ua:
return live_ua

return None

async def _clear_browser_cache(self):
"""清理浏览器全部缓存"""
if not self.browser:
Expand Down Expand Up @@ -13246,6 +13273,36 @@ def get_last_fingerprint(self) -> Optional[Dict[str, Any]]:
return fingerprint
return None

async def get_current_user_agent(self) -> Optional[str]:
"""获取当前浏览器实例的真实 User-Agent。

按优先级依次从 worker 中尝试:
1) 最近一次打码指纹中的 user_agent
2) 初始化时构建的 runtime_surface_profile 中的 userAgent
3) 通过 CDP 从运行态浏览器实时获取
"""
# 1) 从已缓存的浏览器指纹获取
fingerprint = self.get_last_fingerprint()
if isinstance(fingerprint, dict) and fingerprint.get("user_agent"):
return fingerprint["user_agent"]

# 2) 从已初始化的 worker 的 runtime_surface_profile 获取
for worker in self._workers:
runtime_profile = worker._get_runtime_surface_profile()
if isinstance(runtime_profile, dict):
user_agent = runtime_profile.get("userAgent")
if user_agent:
return user_agent

# 3) 通过 CDP 从有活跃浏览器的 worker 实时获取
for worker in self._workers:
if worker.browser:
live_ua, _ = await worker._get_live_browser_runtime_identity()
if live_ua:
return live_ua

return None

async def get_custom_token(
self,
website_url: str,
Expand Down
156 changes: 150 additions & 6 deletions src/services/flow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,23 @@ def __init__(self, proxy_manager, db=None):
)
self._remote_browser_prefill_last_sent: Dict[str, float] = {}

# Default "real browser" headers (macOS Chrome Desktop) to reduce upstream 4xx/5xx instability.
# Default "real browser" headers (macOS Chrome Desktop) to reduce upstream 4xx/5xx instability.
# These will be applied as defaults (won't override caller-provided headers).
# NOTE: Must match the UA platform (macOS) generated by _generate_user_agent.
# NOTE: Platform headers are auto-synced from real browser fingerprint in _make_request.
self._default_client_headers = {
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"macOS\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "cross-site",
"x-browser-channel": "stable",
"x-browser-copyright": "Copyright 2026 Google LLC. All Rights reserved.",
"x-browser-copyright": "Copyright 2026 Google LLC. All Rights Reserved.",
"x-browser-year": "2026",
}
# 发车策略改为请求到就发
# 发车策略改为"请求到就发"
# 不在 flow2api 本地对提交做批次整形或排队,避免把同批请求打成阶梯。


def _generate_user_agent(self, account_id: str = None) -> str:
"""基于账号ID生成固定的 User-Agent

Expand Down Expand Up @@ -175,11 +176,28 @@ async def _make_request(
# 通用请求头 - 优先使用打码浏览器指纹中的 UA
fingerprint_user_agent = None
if isinstance(fingerprint, dict):
debug_logger.log_info(f"[FINGERPRINT] 当前请求链路绑定的浏览器指纹: {fingerprint}")
fingerprint_user_agent = fingerprint.get("user_agent")

elif getattr(config, "captcha_method", "") == "personal":
debug_logger.log_info("[FINGERPRINT] captcha_method=personal,尝试从内置浏览器获取真实浏览器指纹作为请求头")
try:
from .browser_captcha_personal import BrowserCaptchaService
service = await BrowserCaptchaService.get_instance(self.db)
if service:
browser_fingerprint = service.get_last_fingerprint()
if isinstance(browser_fingerprint, dict) and browser_fingerprint.get("user_agent"):
fingerprint = browser_fingerprint
fingerprint_user_agent = fingerprint["user_agent"]
else:
fingerprint_user_agent = await service.get_current_user_agent()
except Exception as e:
debug_logger.log_warning(f"[FINGERPRINT] 从内置浏览器获取指纹失败: {e}")
else:
debug_logger.log_info("[FINGERPRINT] 未检测到浏览器指纹上下文,使用基于账号ID生成的固定 User-Agent 作为请求头")
fingerprint_user_agent = self._generate_user_agent(account_id)
headers.update({
"Content-Type": "application/json",
"User-Agent": fingerprint_user_agent or self._generate_user_agent(account_id)
"User-Agent": fingerprint_user_agent
})

# 若存在打码浏览器指纹,覆盖关键客户端提示头,保证提交请求与打码时一致。
Expand Down Expand Up @@ -2428,6 +2446,132 @@ async def check_video_status(self, at: str, operations: List[Dict]) -> dict:
raise last_error
raise RuntimeError("视频状态查询失败")

async def get_media_url_redirect(
self,
st: str,
media_name: str,
) -> str:
"""通过 labs.google trpc 端点拿到真实视频 CDN URL(2026-05 新增)。

新版 schema 下,batchCheckAsyncVideoGenerationStatus 即使 SUCCESSFUL
也不再直接返回 fifeUrl,必须通过 labs.google 上的
/fx/api/trpc/media.getMediaUrlRedirect 端点(用 ST cookie 鉴权)拿到
302 重定向 Location。

Args:
st: Session Token (__Secure-next-auth.session-token cookie 值)
media_name: 媒体 ID,即 batchCheckAsync... 返回的 media[].name

Returns:
真实的视频 CDN URL(3xx Location 头里的值)。

Raises:
ValueError: media_name 或 st 为空。
RuntimeError: 网络错误、未返回 3xx,或缺少 Location 头。
"""
if not media_name:
raise ValueError("get_media_url_redirect: media_name 为空")
if not st:
raise ValueError("get_media_url_redirect: 缺少 ST token")

url = (
f"{self.labs_base_url}/trpc/media.getMediaUrlRedirect"
f"?name={media_name}"
)

# 真实浏览器抓包(FINDINGS/T2V_04_FINDING_the_redirect_result.md)的
# Accept / Range 头复刻;不能用通用 _make_request 因为它会自动跟随 302。
headers = {
"accept": (
"video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,"
"audio/*;q=0.6,*/*;q=0.5"
),
"accept-language": "en-US,en;q=0.9",
"accept-encoding": "identity",
"Range": "bytes=0-",
"referer": f"{self.labs_base_url}/fx/tools/flow",
"Sec-Fetch-Dest": "video",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Priority": "u=0",
"pragma": "no-cache",
"cache-control": "no-cache",
"cookie": f"__Secure-next-auth.session-token={st}",
}

# 通过代理(如有)发请求,复用 _make_request 的代理解析逻辑。
proxy_url = None
try:
if self.proxy_manager:
if hasattr(self.proxy_manager, "get_request_proxy_url"):
proxy_url = await self.proxy_manager.get_request_proxy_url()
else:
proxy_url = await self.proxy_manager.get_proxy_url()
except Exception:
proxy_url = None

# Debug: 记录请求
if config.debug_enabled:
debug_logger.log_request(
method="GET",
url=url,
headers=headers,
proxy=proxy_url
)

start_time = time.time()

try:
async with AsyncSession(trust_env=False) as session:
response = await session.get(
url,
headers=headers,
allow_redirects=False,
timeout=30,
proxy=proxy_url,
impersonate="chrome124",
)
except Exception as e:
debug_logger.log_error(f"[MEDIA REDIRECT] 请求失败: media={media_name}, error={e}")
raise RuntimeError(
f"getMediaUrlRedirect 请求失败 (media={media_name}): {e}"
) from e

duration_ms = (time.time() - start_time) * 1000
status_code = getattr(response, "status_code", 0)
resp_headers = getattr(response, "headers", {}) or {}

# Debug: 记录响应
if config.debug_enabled:
debug_logger.log_response(
status_code=status_code,
headers=dict(resp_headers) if resp_headers else {},
body=f"Location: {resp_headers.get('Location') or resp_headers.get('location', 'N/A')}",
duration_ms=duration_ms
)

location = None
try:
location = resp_headers.get("Location") or resp_headers.get("location")
except Exception:
try:
location = dict(resp_headers).get("Location")
except Exception:
location = None

if status_code not in (301, 302, 303, 307, 308) or not location:
debug_logger.log_error(
f"[MEDIA REDIRECT] 未返回重定向: status={status_code}, "
f"media={media_name}, location={location}"
)
raise RuntimeError(
f"getMediaUrlRedirect 未返回重定向 "
f"(status={status_code}, media={media_name})"
)

debug_logger.log_info(f"[MEDIA REDIRECT] 成功: media={media_name}, location={str(location)[:200]}")
return str(location)

# ========== 媒体删除 (使用ST) ==========

async def delete_media(self, st: str, media_names: List[str]):
Expand Down
24 changes: 23 additions & 1 deletion src/services/generation_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2022,9 +2022,31 @@ async def _poll_video_result(
# NOT the CAUS base64 mediaGenerationId from video_info
import re as _re
_uuid_match = _re.search(r'/video/([0-9a-f-]{36})', video_url or '')
video_media_id = _uuid_match.group(1) if _uuid_match else video_info.get("mediaGenerationId", "")
media_name = (
operation.get("mediaName")
or operation["operation"].get("name")
or ""
)
video_media_id = (
_uuid_match.group(1) if _uuid_match
else video_info.get("mediaGenerationId") or media_name
)
aspect_ratio = video_info.get("aspectRatio", "VIDEO_ASPECT_RATIO_LANDSCAPE")

# New schema: fifeUrl absent → fetch CDN URL via labs.google
# /trpc/media.getMediaUrlRedirect (returns 3xx with Location).
if not video_url and media_name:
try:
video_url = await self.flow_client.get_media_url_redirect(
token.st, media_name
)
except Exception as e:
error_msg = f"视频生成失败: 获取视频 URL 失败: {e}"
await self._fail_video_task(checked_operations, error_msg)
self._mark_generation_failed(generation_result, error_msg)
yield self._create_error_response(error_msg, status_code=502)
return

if not video_url:
error_msg = "视频生成失败: 视频URL为空"
await self._fail_video_task(checked_operations, error_msg)
Expand Down