From 0c008f4a2d93eb9a8b2831693ffe624fdb289100 Mon Sep 17 00:00:00 2001 From: hehe <87370940+wonendieee@users.noreply.github.com> Date: Wed, 29 Apr 2026 15:44:09 +0800 Subject: [PATCH 1/2] Add QCC 5-year company report generator --- QCC_REPORT_USAGE.md | 71 ++++++++++++++++ qcc_report_generator.py | 180 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 251 insertions(+) create mode 100644 QCC_REPORT_USAGE.md create mode 100644 qcc_report_generator.py diff --git a/QCC_REPORT_USAGE.md b/QCC_REPORT_USAGE.md new file mode 100644 index 0000000..42c9537 --- /dev/null +++ b/QCC_REPORT_USAGE.md @@ -0,0 +1,71 @@ +# 企查查近五年企业情报采集脚本 + +这个仓库新增了 `qcc_report_generator.py`,用于输入一个或多个公司名称,调用你有权限的企查查接口,并输出统一的 Markdown 报告,供后续大模型分析。 + +## 1) 准备 + +1. 安装依赖: + +```bash +pip install requests +``` + +2. 配置 Token(推荐环境变量方式): + +```bash +export QCC_BEARER_TOKEN='你的Bearer Token' +``` + +## 2) 运行示例 + +```bash +python qcc_report_generator.py "腾讯科技(深圳)有限公司" "阿里巴巴(中国)有限公司" --output-dir reports +``` + +执行后会在 `reports/` 下生成: + +- `腾讯科技_深圳_有限公司_5y_report.md` +- `阿里巴巴_中国_有限公司_5y_report.md` + +## 3) 参数说明 + +- `companies`: 一个或多个公司名称(必填) +- `--token`: 企查查 Bearer Token(不传则读取 `QCC_BEARER_TOKEN`) +- `--output-dir`: 报告输出目录,默认 `reports` +- `--lookback-years`: 回溯年限,默认 `5` +- `--timeout`: 单次请求超时秒数,默认 `30` +- `--endpoints`: 只拉取指定模块,可选: + - `qcc-company` + - `qcc-risk` + - `qcc-ipr` + - `qcc-operation` + - `qcc-executive` + +示例(只拉风险与知识产权): + +```bash +python qcc_report_generator.py "京东科技控股股份有限公司" --endpoints qcc-risk qcc-ipr +``` + +## 4) 报告结构 + +每个公司生成一个 Markdown,包含: + +1. 基本元信息(生成时间、时间窗口起始) +2. 各模块原始记录数与近五年记录数对比表 +3. 各模块筛选后的 JSON 内容(代码块形式) + +## 5) 二次加工建议(给大模型) + +后续可把 Markdown 作为上下文,让模型执行: + +- 风险事件时间线提取 +- 法务/经营异常聚类 +- 高管与对外投资关系梳理 +- 同行业公司横向对比 + +## 6) 注意事项 + +- 不同接口返回字段可能不一致,脚本使用“自动识别日期字段”的泛化方式进行近五年筛选。 +- 如果某些字段没有日期,会尽可能保留,避免误删关键信息。 +- 生产环境建议:增加重试、分页、限流、断点续跑、日志落库。 diff --git a/qcc_report_generator.py b/qcc_report_generator.py new file mode 100644 index 0000000..6fb7d96 --- /dev/null +++ b/qcc_report_generator.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +"""Generate 5-year company intelligence markdown reports from QCC MCP endpoints.""" + +from __future__ import annotations + +import argparse +import datetime as dt +import json +import os +import pathlib +import re +from dataclasses import dataclass +from typing import Any, Iterable + +import requests + +DEFAULT_SERVERS = { + "qcc-company": "https://agent.qcc.com/mcp/company/stream", + "qcc-risk": "https://agent.qcc.com/mcp/risk/stream", + "qcc-ipr": "https://agent.qcc.com/mcp/ipr/stream", + "qcc-operation": "https://agent.qcc.com/mcp/operation/stream", + "qcc-executive": "https://agent.qcc.com/mcp/executive/stream", +} + +DATE_PATTERNS = [ + re.compile(r"(19|20)\d{2}[-/.](0[1-9]|1[0-2])[-/.](0[1-9]|[12]\d|3[01])"), + re.compile(r"(19|20)\d{2}年(0?[1-9]|1[0-2])月(0?[1-9]|[12]\d|3[01])日?"), + re.compile(r"(19|20)\d{2}[-/.](0[1-9]|1[0-2])"), +] + + +@dataclass +class EndpointResult: + endpoint: str + total_records: int + filtered_records: int + payload: Any + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Generate company report markdown from QCC MCP APIs") + parser.add_argument("companies", nargs="+", help="One or more company names") + parser.add_argument("--token", default=os.getenv("QCC_BEARER_TOKEN"), help="QCC bearer token") + parser.add_argument("--output-dir", default="reports", help="Directory for markdown reports") + parser.add_argument("--lookback-years", type=int, default=5, help="How many years to keep") + parser.add_argument("--timeout", type=int, default=30, help="Request timeout seconds") + parser.add_argument( + "--endpoints", + nargs="*", + default=list(DEFAULT_SERVERS.keys()), + choices=list(DEFAULT_SERVERS.keys()), + help="Subset of endpoints to query", + ) + return parser.parse_args() + + +def extract_date(value: Any) -> dt.date | None: + if value is None: + return None + text = str(value) + for p in DATE_PATTERNS: + m = p.search(text) + if not m: + continue + raw = m.group(0) + raw = raw.replace("年", "-").replace("月", "-").replace("日", "") + raw = raw.replace("/", "-").replace(".", "-") + if len(raw) == 7: + raw = f"{raw}-01" + try: + return dt.date.fromisoformat(raw) + except ValueError: + continue + return None + + +def within_years(item: Any, cutoff: dt.date) -> bool: + if isinstance(item, dict): + dates = [extract_date(v) for v in item.values()] + dates = [d for d in dates if d is not None] + if dates: + return max(dates) >= cutoff + return any(within_years(v, cutoff) for v in item.values()) + if isinstance(item, list): + return any(within_years(v, cutoff) for v in item) + parsed = extract_date(item) + return parsed is None or parsed >= cutoff + + +def filter_payload(payload: Any, cutoff: dt.date) -> tuple[Any, int, int]: + if isinstance(payload, list): + filtered = [x for x in payload if within_years(x, cutoff)] + return filtered, len(payload), len(filtered) + if isinstance(payload, dict): + filtered_obj = {} + total = 0 + kept = 0 + for k, v in payload.items(): + if isinstance(v, list): + new_v = [x for x in v if within_years(x, cutoff)] + total += len(v) + kept += len(new_v) + filtered_obj[k] = new_v + else: + filtered_obj[k] = v + return filtered_obj, total, kept + return payload, 1, 1 + + +def query_endpoint(server_url: str, token: str, company: str, timeout: int) -> Any: + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + } + body = { + "companyName": company, + "keyword": company, + } + resp = requests.post(server_url, headers=headers, json=body, timeout=timeout) + resp.raise_for_status() + try: + return resp.json() + except Exception: + return {"raw": resp.text} + + +def to_markdown(company: str, results: Iterable[EndpointResult], cutoff: dt.date) -> str: + lines = [ + f"# {company} 近五年企业情报汇总", + "", + f"- 生成时间(UTC):{dt.datetime.utcnow().isoformat(timespec='seconds')}Z", + f"- 时间窗口起始:{cutoff.isoformat()}", + "", + "## 数据覆盖概览", + "", + "| 模块 | 原始记录数 | 近五年记录数 |", + "|---|---:|---:|", + ] + for r in results: + lines.append(f"| {r.endpoint} | {r.total_records} | {r.filtered_records} |") + + for r in results: + lines.extend([ + "", + f"## {r.endpoint}", + "", + "```json", + json.dumps(r.payload, ensure_ascii=False, indent=2), + "```", + ]) + return "\n".join(lines) + + +def main() -> int: + args = parse_args() + if not args.token: + raise SystemExit("Missing token. Provide --token or set QCC_BEARER_TOKEN.") + + output_dir = pathlib.Path(args.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + cutoff = dt.date.today() - dt.timedelta(days=365 * args.lookback_years) + + for company in args.companies: + results: list[EndpointResult] = [] + for endpoint in args.endpoints: + payload = query_endpoint(DEFAULT_SERVERS[endpoint], args.token, company, args.timeout) + filtered_payload, total, kept = filter_payload(payload, cutoff) + results.append(EndpointResult(endpoint, total, kept, filtered_payload)) + + md = to_markdown(company, results, cutoff) + filename = re.sub(r"[^\w\-\u4e00-\u9fff]+", "_", company).strip("_") + path = output_dir / f"{filename}_5y_report.md" + path.write_text(md, encoding="utf-8") + print(f"Generated: {path}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) From 8091be82c440e367f2455b9eeccc937af03cdab6 Mon Sep 17 00:00:00 2001 From: hehe <87370940+wonendieee@users.noreply.github.com> Date: Wed, 29 Apr 2026 15:56:39 +0800 Subject: [PATCH 2/2] Restructure QCC report output to required sections --- QCC_REPORT_USAGE.md | 73 ++++++++---------- qcc_report_generator.py | 165 +++++++++++++++++++++++++++++++--------- 2 files changed, 161 insertions(+), 77 deletions(-) diff --git a/QCC_REPORT_USAGE.md b/QCC_REPORT_USAGE.md index 42c9537..45d7ec4 100644 --- a/QCC_REPORT_USAGE.md +++ b/QCC_REPORT_USAGE.md @@ -1,18 +1,23 @@ -# 企查查近五年企业情报采集脚本 +# 企查查近五年企业情报采集脚本(结构化输出版) -这个仓库新增了 `qcc_report_generator.py`,用于输入一个或多个公司名称,调用你有权限的企查查接口,并输出统一的 Markdown 报告,供后续大模型分析。 +脚本:`qcc_report_generator.py` -## 1) 准备 +目标:输入一个或多个公司名称,调用你当前有权限的企查查接口,输出**固定结构** Markdown,尽可能覆盖以下内容: -1. 安装依赖: +- 股权关联信息 +- 基本信息(法人、出资人、注册资本、员工、成立日期、工商信息、对外投资、股东信息、社保) +- 集团风险提示 +- 图:股权穿透图、股权结构 +- 法律诉讼情况(全部) +- 经营风险(全部) +- 经营信息(资质证书、纳税人资质、招投标、行政许可) -```bash -pip install requests -``` +如果接口未返回或当前权限无法获取,会明确标注:`无法获取(接口未返回或权限不足)`。 -2. 配置 Token(推荐环境变量方式): +## 1) 安装与配置 ```bash +pip install requests export QCC_BEARER_TOKEN='你的Bearer Token' ``` @@ -22,50 +27,38 @@ export QCC_BEARER_TOKEN='你的Bearer Token' python qcc_report_generator.py "腾讯科技(深圳)有限公司" "阿里巴巴(中国)有限公司" --output-dir reports ``` -执行后会在 `reports/` 下生成: - -- `腾讯科技_深圳_有限公司_5y_report.md` -- `阿里巴巴_中国_有限公司_5y_report.md` +## 3) 可选参数 -## 3) 参数说明 - -- `companies`: 一个或多个公司名称(必填) -- `--token`: 企查查 Bearer Token(不传则读取 `QCC_BEARER_TOKEN`) -- `--output-dir`: 报告输出目录,默认 `reports` -- `--lookback-years`: 回溯年限,默认 `5` -- `--timeout`: 单次请求超时秒数,默认 `30` -- `--endpoints`: 只拉取指定模块,可选: +- `--lookback-years`:近几年,默认 `5` +- `--endpoints`:默认全部,可选子集: - `qcc-company` - `qcc-risk` - `qcc-ipr` - `qcc-operation` - `qcc-executive` +- `--timeout`:请求超时秒数,默认 `30` -示例(只拉风险与知识产权): +只拉法律和经营风险相关模块示例: ```bash -python qcc_report_generator.py "京东科技控股股份有限公司" --endpoints qcc-risk qcc-ipr +python qcc_report_generator.py "某某公司" --endpoints qcc-risk qcc-operation ``` -## 4) 报告结构 - -每个公司生成一个 Markdown,包含: - -1. 基本元信息(生成时间、时间窗口起始) -2. 各模块原始记录数与近五年记录数对比表 -3. 各模块筛选后的 JSON 内容(代码块形式) - -## 5) 二次加工建议(给大模型) +## 4) 输出说明 -后续可把 Markdown 作为上下文,让模型执行: +每个公司生成一个 `*_5y_report.md`,结构固定为: -- 风险事件时间线提取 -- 法务/经营异常聚类 -- 高管与对外投资关系梳理 -- 同行业公司横向对比 +1. 数据覆盖概览(原始记录数/近五年记录数) +2. 股权关联信息 +3. 基本信息(九项) +4. 集团风险提示 +5. 图谱信息(股权穿透图、股权结构图) +6. 法律诉讼情况(全部) +7. 经营风险(全部) +8. 经营信息(资质证书、纳税人资质、招投标、行政许可) +9. 原始接口数据(过滤后 JSON) -## 6) 注意事项 +## 5) 已知限制 -- 不同接口返回字段可能不一致,脚本使用“自动识别日期字段”的泛化方式进行近五年筛选。 -- 如果某些字段没有日期,会尽可能保留,避免误删关键信息。 -- 生产环境建议:增加重试、分页、限流、断点续跑、日志落库。 +- “股权穿透图/股权结构图”通常需要图谱专用接口或图片 URL,当前这 5 个接口不一定提供;脚本会保留该栏目并标注无法获取。 +- 不同企业、不同账号权限下字段差异很大;脚本通过关键词尽可能提取,但建议保留原始 JSON 做复核。 diff --git a/qcc_report_generator.py b/qcc_report_generator.py index 6fb7d96..b031282 100644 --- a/qcc_report_generator.py +++ b/qcc_report_generator.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -"""Generate 5-year company intelligence markdown reports from QCC MCP endpoints.""" +"""Generate structured 5-year company intelligence markdown reports from QCC MCP endpoints.""" from __future__ import annotations @@ -28,6 +28,8 @@ re.compile(r"(19|20)\d{2}[-/.](0[1-9]|1[0-2])"), ] +UNAVAILABLE = "无法获取(接口未返回或权限不足)" + @dataclass class EndpointResult: @@ -38,7 +40,7 @@ class EndpointResult: def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Generate company report markdown from QCC MCP APIs") + parser = argparse.ArgumentParser(description="Generate structured company report markdown from QCC MCP APIs") parser.add_argument("companies", nargs="+", help="One or more company names") parser.add_argument("--token", default=os.getenv("QCC_BEARER_TOKEN"), help="QCC bearer token") parser.add_argument("--output-dir", default="reports", help="Directory for markdown reports") @@ -58,12 +60,11 @@ def extract_date(value: Any) -> dt.date | None: if value is None: return None text = str(value) - for p in DATE_PATTERNS: - m = p.search(text) - if not m: + for pattern in DATE_PATTERNS: + match = pattern.search(text) + if not match: continue - raw = m.group(0) - raw = raw.replace("年", "-").replace("月", "-").replace("日", "") + raw = match.group(0).replace("年", "-").replace("月", "-").replace("日", "") raw = raw.replace("/", "-").replace(".", "-") if len(raw) == 7: raw = f"{raw}-01" @@ -76,7 +77,7 @@ def extract_date(value: Any) -> dt.date | None: def within_years(item: Any, cutoff: dt.date) -> bool: if isinstance(item, dict): - dates = [extract_date(v) for v in item.values()] + dates = [extract_date(v) for v in item.values() if not isinstance(v, (dict, list))] dates = [d for d in dates if d is not None] if dates: return max(dates) >= cutoff @@ -92,39 +93,104 @@ def filter_payload(payload: Any, cutoff: dt.date) -> tuple[Any, int, int]: filtered = [x for x in payload if within_years(x, cutoff)] return filtered, len(payload), len(filtered) if isinstance(payload, dict): - filtered_obj = {} + filtered_obj: dict[str, Any] = {} total = 0 kept = 0 - for k, v in payload.items(): - if isinstance(v, list): - new_v = [x for x in v if within_years(x, cutoff)] - total += len(v) - kept += len(new_v) - filtered_obj[k] = new_v + for key, value in payload.items(): + if isinstance(value, list): + new_value = [x for x in value if within_years(x, cutoff)] + total += len(value) + kept += len(new_value) + filtered_obj[key] = new_value else: - filtered_obj[k] = v + filtered_obj[key] = value return filtered_obj, total, kept return payload, 1, 1 def query_endpoint(server_url: str, token: str, company: str, timeout: int) -> Any: - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": "application/json", - } - body = { - "companyName": company, - "keyword": company, - } - resp = requests.post(server_url, headers=headers, json=body, timeout=timeout) - resp.raise_for_status() + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + body = {"companyName": company, "keyword": company} + response = requests.post(server_url, headers=headers, json=body, timeout=timeout) + response.raise_for_status() try: - return resp.json() + return response.json() except Exception: - return {"raw": resp.text} + return {"raw": response.text} + + +def flatten_records(data: Any, path: str = "") -> list[tuple[str, Any]]: + out: list[tuple[str, Any]] = [] + if isinstance(data, dict): + for key, value in data.items(): + p = f"{path}.{key}" if path else key + out.extend(flatten_records(value, p)) + elif isinstance(data, list): + for idx, value in enumerate(data): + p = f"{path}[{idx}]" + out.extend(flatten_records(value, p)) + else: + out.append((path, data)) + return out + + +def find_values_by_keywords(data: Any, keywords: list[str], limit: int = 10) -> list[str]: + result: list[str] = [] + lower_keywords = [k.lower() for k in keywords] + for path, value in flatten_records(data): + path_low = path.lower() + if any(k in path_low for k in lower_keywords): + text = str(value).strip() + if text: + result.append(f"`{path}`: {text}") + if len(result) >= limit: + break + return result + + +def pick_or_unavailable(values: list[str]) -> list[str]: + return values if values else [UNAVAILABLE] + + +def extract_sections(results_map: dict[str, Any]) -> dict[str, list[str]]: + merged = {"all": results_map} + + sections = { + "法人": pick_or_unavailable(find_values_by_keywords(merged, ["法人", "法定代表", "legal", "representative"])), + "出资人": pick_or_unavailable(find_values_by_keywords(merged, ["出资", "investor", "认缴", "实缴"])), + "注册资本": pick_or_unavailable(find_values_by_keywords(merged, ["注册资本", "capital", "regcap"])), + "员工": pick_or_unavailable(find_values_by_keywords(merged, ["员工", "人员", "staff", "employee"])), + "成立日期": pick_or_unavailable(find_values_by_keywords(merged, ["成立", "成立日期", "establish", "found"])), + "工商信息": pick_or_unavailable(find_values_by_keywords(merged, ["工商", "登记", "信用代码", "注册号", "business", "license"])), + "对外投资情况": pick_or_unavailable(find_values_by_keywords(merged, ["对外投资", "invest", "投资企业", "被投资"])), + "股东信息": pick_or_unavailable(find_values_by_keywords(merged, ["股东", "shareholder", "持股", "股权"])), + "社保情况": pick_or_unavailable(find_values_by_keywords(merged, ["社保", "保险", "social", "security"])), + "股权关联信息": pick_or_unavailable(find_values_by_keywords(merged, ["股权", "控制", "穿透", "beneficial", "ownership"])), + "集团风险提示": pick_or_unavailable(find_values_by_keywords(merged, ["风险", "异常", "处罚", "失信", "执行", "冻结"])), + "股权穿透图": [UNAVAILABLE], + "股权结构图": [UNAVAILABLE], + "法律诉讼情况(全部)": pick_or_unavailable(find_values_by_keywords(merged, ["诉讼", "法院", "判决", "开庭", "案件", "legalcase"], limit=30)), + "经营风险(全部)": pick_or_unavailable(find_values_by_keywords(merged, ["经营风险", "行政处罚", "欠税", "环保", "risk"], limit=30)), + "资质证书": pick_or_unavailable(find_values_by_keywords(merged, ["资质", "证书", "cert"])), + "纳税人资质": pick_or_unavailable(find_values_by_keywords(merged, ["纳税", "一般纳税人", "taxpayer", "tax"])), + "招投标信息": pick_or_unavailable(find_values_by_keywords(merged, ["招投标", "中标", "投标", "bid", "tender"])), + "行政许可": pick_or_unavailable(find_values_by_keywords(merged, ["行政许可", "许可", "permit", "license"])), + } + return sections + + +def render_bullet_section(lines: list[str], title: str, items: list[str]) -> None: + lines.append(f"### {title}") + for item in items: + lines.append(f"- {item}") + lines.append("") def to_markdown(company: str, results: Iterable[EndpointResult], cutoff: dt.date) -> str: + results_list = list(results) + results_map = {r.endpoint: r.payload for r in results_list} + sections = extract_sections(results_map) + lines = [ f"# {company} 近五年企业情报汇总", "", @@ -136,18 +202,43 @@ def to_markdown(company: str, results: Iterable[EndpointResult], cutoff: dt.date "| 模块 | 原始记录数 | 近五年记录数 |", "|---|---:|---:|", ] - for r in results: - lines.append(f"| {r.endpoint} | {r.total_records} | {r.filtered_records} |") + for result in results_list: + lines.append(f"| {result.endpoint} | {result.total_records} | {result.filtered_records} |") + + lines.extend(["", "## 一、股权关联信息", ""]) + render_bullet_section(lines, "股权关联信息", sections["股权关联信息"]) + + lines.extend(["## 二、基本信息", ""]) + for key in ["法人", "出资人", "注册资本", "员工", "成立日期", "工商信息", "对外投资情况", "股东信息", "社保情况"]: + render_bullet_section(lines, key, sections[key]) + + lines.extend(["## 三、集团风险提示", ""]) + render_bullet_section(lines, "集团风险提示", sections["集团风险提示"]) + + lines.extend(["## 四、图谱信息", ""]) + render_bullet_section(lines, "股权穿透图", sections["股权穿透图"]) + render_bullet_section(lines, "股权结构图", sections["股权结构图"]) + + lines.extend(["## 五、法律诉讼情况(全部)", ""]) + render_bullet_section(lines, "法律诉讼情况(全部)", sections["法律诉讼情况(全部)"]) - for r in results: + lines.extend(["## 六、经营风险(全部)", ""]) + render_bullet_section(lines, "经营风险(全部)", sections["经营风险(全部)"]) + + lines.extend(["## 七、经营信息", ""]) + for key in ["资质证书", "纳税人资质", "招投标信息", "行政许可"]: + render_bullet_section(lines, key, sections[key]) + + lines.extend(["## 八、原始接口数据(近五年过滤后)", ""]) + for result in results_list: lines.extend([ - "", - f"## {r.endpoint}", - "", + f"### {result.endpoint}", "```json", - json.dumps(r.payload, ensure_ascii=False, indent=2), + json.dumps(result.payload, ensure_ascii=False, indent=2), "```", + "", ]) + return "\n".join(lines) @@ -167,10 +258,10 @@ def main() -> int: filtered_payload, total, kept = filter_payload(payload, cutoff) results.append(EndpointResult(endpoint, total, kept, filtered_payload)) - md = to_markdown(company, results, cutoff) + markdown = to_markdown(company, results, cutoff) filename = re.sub(r"[^\w\-\u4e00-\u9fff]+", "_", company).strip("_") path = output_dir / f"{filename}_5y_report.md" - path.write_text(md, encoding="utf-8") + path.write_text(markdown, encoding="utf-8") print(f"Generated: {path}") return 0