<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:atom="http://www.w3.org/2005/Atom"
>
<channel>
<title><![CDATA[ztx]]></title> 
<atom:link href="https://docs.colyoy.cn/rss.php" rel="self" type="application/rss+xml" />
<description><![CDATA[ztx文档系统是公司内部的知识与文档管理平台，主要用于团队内部资料共享和协作。支持权限管理、全文检索和版本回溯，方便员工随时查阅、编辑和沉淀文档，提高工作效率。(AI写的)]]></description>
<link>https://docs.colyoy.cn/</link>
<language>zh-cn</language>
<generator>emlog</generator>

<item>
    <title>xshop解析教程</title>
    <link>https://docs.colyoy.cn/?post=118</link>
    <description><![CDATA[<p><a href="https://www.name.com/zh-cn/account/domain/details/espacobem.com/nameservers">https://www.name.com/zh-cn/account/domain/details/espacobem.com/nameservers</a></p>
<p>espacobem.com这个是域名，<br />
<img src="https://docs.colyoy.cn/content/uploadfile/202603/fa441773721727.png" alt="" /><br />
全部删除，然后把xshop的值复制过来保存就好了</p>
<p><img src="https://docs.colyoy.cn/content/uploadfile/202603/87771773721771.png" alt="" /></p>
<p>输入完保存，这个值生效慢</p>
<p><img src="https://docs.colyoy.cn/content/uploadfile/202603/a7af1773721815.png" alt="" /></p>]]></description>
    <pubDate>Tue, 17 Mar 2026 12:28:18 +0800</pubDate>
    <dc:creator>emer</dc:creator>
    <guid>https://docs.colyoy.cn/?post=118</guid>
</item>
<item>
    <title>cloak接入</title>
    <link>https://docs.colyoy.cn/?post=117</link>
    <description><![CDATA[<h2>一、认证方式</h2>
<p>多数需要鉴权的接口采用 <strong>userId + apiKey</strong> 校验，由 <code>CloakService.validateUserAndApiKey(userId, apiKey)</code> 或 <code>UserService.findByUserIdAndApiKey(userId, apiKey)</code> 完成。</p>
<p>API Key 获取途径：</p>
<ul>
<li>注册接口返回</li>
<li>登录接口返回</li>
<li>通过 <code>GET /api/cloak/api-key/{userId}?apiKey=xxx</code> 查询（需已知当前 apiKey）</li>
</ul>
<hr />
<h2>二、验证相关接口</h2>
<h3>1. Cloak 访问验证（userId + apiKey）</h3>
<p><strong>接口：</strong> <code>POST /api/cloak/validate-client-ip</code></p>
<p><strong>请求体：</strong></p>
<pre><code class="language-json">{
  "userId": 1,
  "apiKey": "your-api-key",
  "clientIP": "1.2.3.4",
  "userAgent": "Mozilla/5.0...",
  "language": "zh-CN",
  "utmSource": "google",
  "utmMedium": "cpc",
  "utmCampaign": "spring",
  "requestUrl": "https://xxx.com/page",
  "referer": "https://google.com"
}</code></pre>
<p><strong>说明：</strong> 使用该 userId 对应的 Cloak 配置，根据 IP、语言、UTM 等规则判断是否允许访问。</p>
<hr />
<h3>2. Session 登录验证（userId + apiKey）</h3>
<p><strong>接口：</strong> <code>POST /api/auth/login-with-apikey</code></p>
<p><strong>请求体：</strong></p>
<pre><code class="language-json">{
  "userId": 1,
  "apiKey": "your-api-key"
}</code></pre>
<p><strong>说明：</strong> 校验通过后写入 session，后续可调用 <code>GET /api/auth/current-user</code> 获取当前用户。</p>
<hr />
<h2>三、配置获取与修改</h2>
<h3>1. 获取 Cloak 配置（无需 apiKey）</h3>
<table>
<thead>
<tr>
<th>方法</th>
<th>路径</th>
<th>说明</th>
</tr>
</thead>
<tbody>
<tr>
<td>GET</td>
<td><code>/api/cloak/config/{userId}</code></td>
<td>获取用户 Cloak 配置</td>
</tr>
</tbody>
</table>
<p>若用户无配置则自动创建默认配置。</p>
<hr />
<h3>2. 更新 Cloak 配置（无需 apiKey）</h3>
<table>
<thead>
<tr>
<th>方法</th>
<th>路径</th>
<th>说明</th>
</tr>
</thead>
<tbody>
<tr>
<td>POST</td>
<td><code>/api/cloak/config/{userId}</code></td>
<td>更新 Cloak 配置</td>
</tr>
</tbody>
</table>
<p><strong>请求体参数（CloakConfigRequest）：</strong></p>
<table>
<thead>
<tr>
<th>参数</th>
<th>类型</th>
<th>必填</th>
<th>说明</th>
</tr>
</thead>
<tbody>
<tr>
<td>language</td>
<td>string</td>
<td>否</td>
<td>允许的语言，如 zh-CN、en-US</td>
</tr>
<tr>
<td>utmSource</td>
<td>string</td>
<td>否</td>
<td>UTM 来源</td>
</tr>
<tr>
<td>utmMedium</td>
<td>string</td>
<td>否</td>
<td>UTM 媒介</td>
</tr>
<tr>
<td>utmCampaign</td>
<td>string</td>
<td>否</td>
<td>UTM 活动</td>
</tr>
<tr>
<td>whiteCountries</td>
<td>array</td>
<td>否</td>
<td>白名单国家代码，如 [&quot;US&quot;,&quot;GB&quot;]，映射到 allowedCountries</td>
</tr>
<tr>
<td>blackCountries</td>
<td>array</td>
<td>否</td>
<td>黑名单国家代码，如 [&quot;CN&quot;,&quot;RU&quot;]，映射到 blockedCountries</td>
</tr>
<tr>
<td>allowMobile</td>
<td>boolean</td>
<td>否</td>
<td>是否允许移动端</td>
</tr>
<tr>
<td>allowDesktop</td>
<td>boolean</td>
<td>否</td>
<td>是否允许桌面端</td>
</tr>
<tr>
<td>forceValidation</td>
<td>boolean</td>
<td>否</td>
<td>强制校验模式</td>
</tr>
<tr>
<td>normalValidation</td>
<td>boolean</td>
<td>否</td>
<td>普通校验模式</td>
</tr>
<tr>
<td>enableWhitelist</td>
<td>boolean</td>
<td>否</td>
<td>是否启用白名单（true=白名单，false=黑名单）</td>
</tr>
<tr>
<td>redirectUrl</td>
<td>string</td>
<td>否</td>
<td>拒绝跳转 URL，格式为 xxx.myshoplaza.com（存于 store_proxy_configs）</td>
</tr>
<tr>
<td>validationRules</td>
<td>string</td>
<td>否</td>
<td>校验规则，JSON 字符串</td>
</tr>
<tr>
<td>customHeaders</td>
<td>string</td>
<td>否</td>
<td>自定义请求头</td>
</tr>
<tr>
<td>enableProxy</td>
<td>boolean</td>
<td>否</td>
<td>是否启用代理检测</td>
</tr>
<tr>
<td>proxyRules</td>
<td>string</td>
<td>否</td>
<td>代理规则，JSON 字符串</td>
</tr>
<tr>
<td>enableIpInfo</td>
<td>boolean</td>
<td>否</td>
<td>是否启用 IPinfo 代理检测</td>
</tr>
</tbody>
</table>
<p><strong>请求示例：</strong></p>
<pre><code class="language-json">{
  "language": "zh-CN,en-US",
  "utmSource": "google",
  "utmMedium": "cpc",
  "utmCampaign": "spring",
  "whiteCountries": ["US", "GB", "CA"],
  "blackCountries": ["CN", "RU"],
  "allowMobile": true,
  "allowDesktop": true,
  "forceValidation": false,
  "normalValidation": true,
  "enableWhitelist": false,
  "redirectUrl": "puhuo001.myshoplaza.com",
  "enableProxy": true,
  "enableIpInfo": true
}</code></pre>
<hr />
<h2>四、日志与统计</h2>
<table>
<thead>
<tr>
<th>方法</th>
<th>路径</th>
<th>认证</th>
<th>说明</th>
</tr>
</thead>
<tbody>
<tr>
<td>GET</td>
<td><code>/api/cloak/logs/{userId}</code></td>
<td>无</td>
<td>访问日志，支持分页与筛选</td>
</tr>
<tr>
<td>GET</td>
<td><code>/api/cloak/stats/{userId}</code></td>
<td>apiKey 查询参数</td>
<td>访问统计数据</td>
</tr>
<tr>
<td>GET</td>
<td><code>/api/cloak/analysis/{userId}</code></td>
<td>无</td>
<td>流量分析（支持 startDate/endDate）</td>
</tr>
<tr>
<td>GET</td>
<td><code>/api/cloak/utm-details/{userId}</code></td>
<td>无</td>
<td>UTM 详细日志</td>
</tr>
</tbody>
</table>
<p><strong>示例：</strong></p>
<pre><code class="language-text">GET /api/cloak/stats/1?apiKey=key_xxx
GET /api/cloak/analysis/1?startDate=2025-03-01&amp;endDate=2025-03-02</code></pre>
<hr />
<h2>五、API Key 管理</h2>
<table>
<thead>
<tr>
<th>方法</th>
<th>路径</th>
<th>说明</th>
</tr>
</thead>
<tbody>
<tr>
<td>GET</td>
<td><code>/api/cloak/api-key/{userId}?apiKey=xxx</code></td>
<td>获取当前 apiKey</td>
</tr>
<tr>
<td>POST</td>
<td><code>/api/cloak/api-key/{userId}/regenerate?apiKey=xxx</code></td>
<td>重新生成 apiKey</td>
</tr>
</tbody>
</table>
<p>需使用当前有效 apiKey 作为查询参数完成校验。</p>
<hr />
<hr />
<h2>六、错误响应</h2>
<table>
<thead>
<tr>
<th>HTTP</th>
<th>说明</th>
</tr>
</thead>
<tbody>
<tr>
<td>400</td>
<td>参数错误或格式不正确</td>
</tr>
<tr>
<td>401</td>
<td>未登录或 userId/apiKey 校验失败</td>
</tr>
<tr>
<td>404</td>
<td>用户不存在</td>
</tr>
</tbody>
</table>
<hr />]]></description>
    <pubDate>Tue, 10 Mar 2026 20:42:27 +0800</pubDate>
    <dc:creator>emer</dc:creator>
    <guid>https://docs.colyoy.cn/?post=117</guid>
</item>
<item>
    <title>tiktok账号登录</title>
    <link>https://docs.colyoy.cn/?post=116</link>
    <description><![CDATA[<p>比如拿到得是 12319c9f3ad05a@ar-bill.com Abc123123<br />
空格是分隔符，签名邮箱后面密码<br />
<a href="https://tiktok-email-worker.yanaputova75.workers.dev/code?email=12319c9f3ad05a@ar-bill.com">https://tiktok-email-worker.yanaputova75.workers.dev/code?email=12319c9f3ad05a@ar-bill.com</a></p>
<p>这个是获取邮箱验证码连接，把12319c9f3ad05a@ar-bill.com替换为12319c9f3ad05a@ar-bill.com<br />
就变成了https://tiktok-email-worker.yanaputova75.workers.dev/code?email=12319c9f3ad05a@ar-bill.com</p>
<p>访问链接，有时候要十几秒才发过来，<br />
<img src="https://docs.colyoy.cn/content/uploadfile/202602/a0e01772198239.png" alt="" /><br />
这个里面得code就是验证码</p>]]></description>
    <pubDate>Fri, 27 Feb 2026 21:15:52 +0800</pubDate>
    <dc:creator>emer</dc:creator>
    <guid>https://docs.colyoy.cn/?post=116</guid>
</item>
<item>
    <title>boticario_cj</title>
    <link>https://docs.colyoy.cn/?post=115</link>
    <description><![CDATA[<p>获取url </p>
<pre><code># -*- coding: utf-8 -*-
"""
Boticario 产品链接采集脚本
使用 Playwright 浏览器自动化，按 XPath 获取产品链接，翻页直到无更多链接，保存到文本
格式: 产品链接,分类slug
"""
import sys
import time

# Windows 控制台 UTF-8
try:
    sys.stdout.reconfigure(encoding="utf-8")
except (AttributeError, OSError):
    pass

# 分类 URL 与 slug 对应
CATEGORIES = [
    ("https://www.boticario.com.br/corpo-e-banho/", "corpo-e-banho"),
    ("https://www.boticario.com.br/para-usar-ja/", "para-usar-ja"),
]

# XPath（产品链接支持多种结构，按顺序尝试）
PRODUCT_LINKS_XPATHS = [
    "/html/body/main/div[2]/section[7]/section/div/div[3]/article/div[1]/a",
    "/html/body/main/div[2]/section[13]/section/div/div[3]/article/div[1]/a",
    "/html/body/main/div[2]/section[*]/section/div/div[3]/article/div[1]/a",
]
# 下一页按钮 XPath（兼容多种结构）
NEXT_PAGE_BUTTON_XPATHS = [
    "/html/body/main/div[2]/section[7]/section/div/div[3]/div/button",
    "/html/body/main/div[2]/section[13]/section/div/div[3]/div/button",
]

OUTPUT_FILE = "boticario_products.txt"

def _accept_cookies(page):
    """如有 Cookie 同意横幅，点击接受"""
    try:
        btn = page.get_by_role("button", name="Aceitar todos os cookies")
        if btn.is_visible():
            btn.click()
            time.sleep(0.5)
    except Exception:
        pass

def _get_product_links(page):
    """获取当前页产品链接，兼容多种 XPath 结构，合并去重"""
    seen = set()
    links = []
    for xpath in PRODUCT_LINKS_XPATHS:
        try:
            loc = page.locator(f"xpath={xpath}")
            for i in range(loc.count()):
                href = loc.nth(i).get_attribute("href")
                if href and href.strip():
                    if not href.startswith("http"):
                        href = "https://www.boticario.com.br" + (href if href.startswith("/") else "/" + href)
                    href = href.strip()
                    if href not in seen:
                        seen.add(href)
                        links.append(href)
        except Exception:
            continue
    return links

def _has_next_page(page):
    """检查是否有下一页按钮且可点击，兼容多种 XPath"""
    for xpath in NEXT_PAGE_BUTTON_XPATHS:
        try:
            btn = page.locator(f"xpath={xpath}")
            if btn.count() == 0:
                continue
            first = btn.first
            if not first.is_visible():
                continue
            if first.is_enabled():
                return True
        except Exception:
            continue
    return False

def _click_next_page(page):
    """点击下一页，按 XPath 顺序尝试"""
    for xpath in NEXT_PAGE_BUTTON_XPATHS:
        try:
            loc = page.locator(f"xpath={xpath}")
            if loc.count() == 0:
                continue
            first = loc.first
            if first.is_visible() and first.is_enabled():
                first.click()
                time.sleep(1.5)
                return True
        except Exception:
            continue
    return False

def _crawl_category(page, url, slug):
    """采集单个分类：先把页面点完，再读取链接"""
    all_links = []
    seen = set()

    # 阶段1：先把页面点完（点击下一页直到无更多）
    page.goto(url, wait_until="domcontentloaded", timeout=30000)
    time.sleep(2)
    _accept_cookies(page)
    time.sleep(0.5)

    page_urls = []
    page_num = 0
    while True:
        page_num += 1
        current_url = page.url
        page_urls.append(current_url)
        print(f"  [{slug}] 第 {page_num} 页已加载: {current_url[:60]}...")

        if not _has_next_page(page):
            break
        if not _click_next_page(page):
            break
        time.sleep(1)

    # 阶段2：再读取链接（逐页访问并采集）
    unique_urls = list(dict.fromkeys(page_urls))
    if len(unique_urls) == 1 and len(page_urls) &gt; 1:
        # 分页不改变 URL，DOM 会累积产品，只采集本页新增的（每页约 36 个）
        print(f"  [{slug}] 分页不改变 URL，改为边点边采集（每页仅取新增链接）")
        page.goto(url, wait_until="domcontentloaded", timeout=30000)
        time.sleep(2)
        _accept_cookies(page)
        page_num = 0
        prev_count = 0
        while True:
            page_num += 1
            links = _get_product_links(page)
            # 只取本页新增的链接（DOM 累积，新增的在末尾）
            new_links = links[prev_count:]
            prev_count = len(links)
            added = 0
            for u in new_links:
                if u not in seen:
                    seen.add(u)
                    all_links.append((u, slug))
                    added += 1
            print(f"  [{slug}] 第 {page_num} 页: 本页 {len(new_links)} 个，新增 {added}，累计 {len(all_links)}")
            if not _has_next_page(page):
                break
            if not _click_next_page(page):
                break
            time.sleep(1)
    else:
        # 分页改变 URL，逐页访问采集
        for i, page_url in enumerate(unique_urls):
            page.goto(page_url, wait_until="domcontentloaded", timeout=30000)
            time.sleep(1.5)
            links = _get_product_links(page)
            added = 0
            for u in links:
                if u not in seen:
                    seen.add(u)
                    all_links.append((u, slug))
                    added += 1
            print(f"  [{slug}] 第 {i+1} 页: {len(links)} 个链接，新增 {added}，累计 {len(all_links)}")

    return all_links

def main():
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        print("请先安装 Playwright: pip install playwright")
        print("然后运行: playwright install chromium  (或使用已安装的 Chrome)")
        return 1

    print("启动浏览器，采集 Boticario 产品链接...")

    with sync_playwright() as p:
        # 使用 Chrome 浏览器，移除自动化标识让浏览器更真实
        browser = p.chromium.launch(
            channel="chrome",
            headless=False,
            ignore_default_args=["--enable-automation"],  # 移除"受自动化软件控制"标识
            args=[
                "--incognito",
                "--disable-blink-features=AutomationControlled",  # 隐藏 AutomationControlled
                "--disable-application-cache",
                "--disable-cache",
                "--disable-offline-load-stale-cache",
                "--disable-infobars",  # 不显示"Chrome 正受到自动化测试软件的控制"
                "--no-sandbox",
                "--disable-dev-shm-usage",
            ],
        )
        # 模拟真实浏览器：巴西地区、常见分辨率、完整 UA
        context = browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
            locale="pt-BR",
            timezone_id="America/Sao_Paulo",
            permissions=["geolocation"],
            color_scheme="light",
            extra_http_headers={
                "Accept-Language": "pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7",
                "sec-ch-ua": '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
                "sec-ch-ua-mobile": "?0",
                "sec-ch-ua-platform": '"Windows"',
            },
        )
        # 注入脚本：隐藏 webdriver 标识，模拟真实浏览器指纹
        context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', { get: () =&gt; undefined });
            Object.defineProperty(navigator, 'languages', { get: () =&gt; ['pt-BR', 'pt', 'en-US', 'en'] });
            if (!window.chrome) window.chrome = {};
            if (!window.chrome.runtime) window.chrome.runtime = {};
        """)
        context.clear_cookies()
        page = context.new_page()
        # 通过 CDP 清空浏览器缓存
        try:
            cdp = context.new_cdp_session(page)
            cdp.send("Network.clearBrowserCache")
            cdp.send("Network.clearBrowserCookies")
        except Exception:
            pass

        all_results = []
        for url, slug in CATEGORIES:
            print(f"\n采集分类: {slug} ({url})")
            results = _crawl_category(page, url, slug)
            all_results.extend(results)

        # 保存到文本: 产品链接,分类slug
        if all_results:
            with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
                for link, slug in all_results:
                    f.write(f"{link},{slug}\n")
            print(f"\n✓ 已保存 {len(all_results)} 条到 {OUTPUT_FILE}")
        else:
            print("\n未采集到任何产品链接")

        print("\n按 Enter 关闭浏览器...")
        input()
        browser.close()

    print("完成")
    return 0

if __name__ == "__main__":
    sys.exit(main())
</code></pre>
<pre><code># -*- coding: utf-8 -*-
"""
Boticario 产品详情采集脚本 - 参考 carrefour_cj.py
流程: 读取 boticario_products.txt -&gt; 浏览器进入每个产品页 -&gt; 采集 blz.product / JSON-LD / 图片 -&gt; 输出 Shopify CSV
产品描述以 blz.product 为准
"""
import html
import csv
import os
import sys
import re
import unicodedata
import asyncio
from queue import Queue
import threading

# Windows 控制台 UTF-8
try:
    sys.stdout.reconfigure(encoding="utf-8")
except (AttributeError, OSError):
    pass

# Shopify CSV 字段
FIELDS = [
    "Handle", "Title", "Body (HTML)", "Vendor", "Type", "Tags", "Published",
    "Option1 Name", "Option1 Value", "Option2 Name", "Option2 Value",
    "Option3 Name", "Option3 Value", "Variant SKU", "Variant Grams",
    "Variant Inventory Tracker", "Variant Inventory Qty", "Variant Inventory Policy",
    "Variant Fulfillment Service", "Variant Price", "Variant Compare At Price",
    "Variant Requires Shipping", "Variant Taxable", "Variant Barcode",
    "Image Src", "Image Position", "Image Alt Text", "Gift Card",
    "SEO Title", "SEO Description", "Google Shopping / Google Product Category",
    "Google Shopping / Gender", "Google Shopping / Age Group",
    "Google Shopping / MPN", "Google Shopping / AdWords Grouping",
    "Google Shopping / AdWords Labels", "Google Shopping / Condition",
    "Google Shopping / Custom Product", "Google Shopping / Custom Label 0",
    "Google Shopping / Custom Label 1", "Google Shopping / Custom Label 2",
    "Google Shopping / Custom Label 3", "Google Shopping / Custom Label 4",
    "Variant Image", "Variant Weight Unit", "Variant Tax Code",
    "Cost per item", "Status", "Collection"
]

PRODUCTS_TXT = "boticario_products.txt"
OUTPUT_DIR = "boticario_output"
VENDOR = "O Boticário"
SKIP_COLLECTIONS = ["perfumaria"]  # 不采集的分类
SKIP_COLLECTIONS_SET = {c.lower() for c in SKIP_COLLECTIONS}

# 产品图片 XPath（兼容多种结构）
PRODUCT_IMAGE_XPATHS = [
    "/html/body/main/div[3]/div[2]/div[2]/div[1]/span/img",
    "/html/body/main/div[3]/div[2]/div[2]/div[1]//img",
]

CONCURRENT_WORKERS = 5  # 同时启动的采集数量

write_queue = Queue()
counter_lock = threading.Lock()
processed_count = 0
total_tasks = 0
test_mode = False

def decode_html_entities(text):
    """解码 HTML 实体"""
    if not text:
        return ""
    s = str(text)
    for _ in range(2):
        s = html.unescape(s)
    return s

def normalize_text(text):
    """处理特殊字符"""
    if not text:
        return ""
    normalized = unicodedata.normalize("NFKD", str(text))
    normalized = "".join(c for c in normalized if not unicodedata.combining(c))
    return normalized

def slug_from_title(title):
    """从标题生成 handle"""
    s = unicodedata.normalize("NFKD", str(title))
    s = "".join(c for c in s if not unicodedata.combining(c))
    s = re.sub(r"[^\w\s\-]", "", s)
    return re.sub(r"[-\s]+", "-", s).strip("-").lower()

def _coll_to_slug(name):
    """分类名转文件名用 slug"""
    if not name:
        return "default"
    s = unicodedata.normalize("NFKD", str(name))
    s = "".join(c for c in s if not unicodedata.combining(c))
    s = re.sub(r"[^\w\s\-]", "", s)
    s = re.sub(r"[-\s]+", "-", s).strip("-").lower()
    return s or "default"

def load_products_txt(path=None):
    """从 txt 加载 (url, collection)，格式: 产品链接,分类slug"""
    if path is None:
        path = PRODUCTS_TXT
    items = []
    seen = set()
    try:
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                parts = [x.strip() for x in line.split(",", 1)]
                url = parts[0]
                coll = parts[1] if len(parts) &gt; 1 else ""
                if coll.lower() in SKIP_COLLECTIONS_SET:
                    continue
                if url in seen:
                    continue
                seen.add(url)
                items.append((url, coll))
    except FileNotFoundError:
        pass
    return items

async def _extract_blz_product(page):
    """从页面提取 blz.product"""
    try:
        result = await page.evaluate("""() =&gt; {
            const p = (typeof blz !== 'undefined' &amp;&amp; blz &amp;&amp; blz.product) ? blz.product : null;
            if (!p) return null;
            return {
                sku: p.sku || '',
                name: p.name || '',
                slugName: p.slugName || '',
                subTotal: p.subTotal != null ? p.subTotal : '',
                description: p.description || '',
                quantity: p.quantity,
                buybox: p.buybox,
                kit: p.kit,
                tags: Array.isArray(p.tags) ? p.tags : []
            };
        }""")
        return result
    except Exception:
        return None

async def _extract_json_ld(page):
    """从页面提取 application/ld+json"""
    try:
        result = await page.evaluate("""() =&gt; {
            const scripts = document.querySelectorAll('script[type="application/ld+json"]');
            const arr = [];
            scripts.forEach(s =&gt; {
                try {
                    const data = JSON.parse(s.innerText);
                    arr.push(data);
                } catch (e) {}
            });
            return arr;
        }""")
        return result if result else []
    except Exception:
        return []

async def _extract_images_from_page(page):
    """从页面 XPath 提取产品图片 URL"""
    images = []
    seen = set()
    for xpath in PRODUCT_IMAGE_XPATHS:
        try:
            loc = page.locator(f"xpath={xpath}")
            count = await loc.count()
            for i in range(count):
                src = await loc.nth(i).get_attribute("src")
                if src and src.strip() and src not in seen:
                    if not src.startswith("http"):
                        src = "https://www.boticario.com.br" + (src if src.startswith("/") else "/" + src)
                    seen.add(src)
                    images.append(src.strip())
        except Exception:
            continue
        if images:
            break
    return images

def parse_boticario_product(blz, ld_list, images, url, collection):
    """从 blz.product、JSON-LD、图片 解析为 Shopify CSV 行"""
    coll_slug = _coll_to_slug(collection)

    # 以 blz.product 为准
    title = ""
    body = ""
    sku = ""
    price = ""
    handle = ""

    if blz and isinstance(blz, dict):
        title = (blz.get("name") or "").strip()
        body = (blz.get("description") or "").strip()
        sku = str(blz.get("sku") or "")
        p = blz.get("subTotal")
        price = str(p) if p is not None else ""
        slug_name = blz.get("slugName") or ""
        handle = slug_name[:100] if slug_name else ""

    # JSON-LD 作为补充（若 blz 缺失）
    for ld in (ld_list or []):
        if not isinstance(ld, dict):
            continue
        n, d = ld.get("name") or "", ld.get("description") or ""
        graph = ld.get("@graph") or []
        for g in graph if isinstance(graph, list) else []:
            if isinstance(g, dict) and g.get("@type") == "Product":
                n = n or g.get("name", "")
                d = d or g.get("description", "")
                if not images:
                    img = g.get("image")
                    if isinstance(img, str) and img.startswith("http"):
                        images = [img]
                    elif isinstance(img, list):
                        images = [x for x in img if isinstance(x, str) and x.startswith("http")]
                break
        if n and not title:
            title = str(n).strip()
        if d and not body:
            body = str(d).strip()
        if not images and ld.get("image"):
            img = ld["image"]
            images = [img] if isinstance(img, str) and img.startswith("http") else [x for x in (img or []) if isinstance(x, str) and x.startswith("http")]

    title = decode_html_entities(normalize_text(title)) or "Unknown"
    body = decode_html_entities(normalize_text(body))
    handle = handle or (re.sub(r"[^\w\-]", "-", slug_from_title(title)).strip("-")[:100] or "product")

    # 从 URL 提取 handle 备用
    if not handle and url:
        m = re.search(r"boticario\.com\.br/([^/?]+)", url)
        if m:
            handle = m.group(1).strip("/")[:100]

    base_row = {
        "Handle": handle or "product",
        "Title": title,
        "Body (HTML)": body,
        "Vendor": VENDOR,
        "Type": collection or "",
        "Tags": "",
        "Published": "TRUE",
        "_coll_slug": coll_slug,
        "Option1 Name": "",
        "Option1 Value": "",
        "Option2 Name": "",
        "Option2 Value": "",
        "Option3 Name": "",
        "Option3 Value": "",
        "Variant SKU": sku,
        "Variant Grams": "",
        "Variant Inventory Tracker": "shopify",
        "Variant Inventory Qty": 100,
        "Variant Inventory Policy": "deny",
        "Variant Fulfillment Service": "manual",
        "Variant Price": price,
        "Variant Compare At Price": "",
        "Variant Requires Shipping": "TRUE",
        "Variant Taxable": "TRUE",
        "Variant Barcode": "",
        "Image Src": "",
        "Image Position": 1,
        "Image Alt Text": title,
        "Gift Card": "FALSE",
        "SEO Title": title,
        "SEO Description": (body[:160] if body else ""),
        "Google Shopping / Google Product Category": "",
        "Google Shopping / Gender": "",
        "Google Shopping / Age Group": "",
        "Google Shopping / MPN": handle,
        "Google Shopping / AdWords Grouping": "",
        "Google Shopping / AdWords Labels": "",
        "Google Shopping / Condition": "New",
        "Google Shopping / Custom Product": "FALSE",
        "Google Shopping / Custom Label 0": "",
        "Google Shopping / Custom Label 1": "",
        "Google Shopping / Custom Label 2": "",
        "Google Shopping / Custom Label 3": "",
        "Google Shopping / Custom Label 4": "",
        "Variant Image": "",
        "Variant Weight Unit": "kg",
        "Variant Tax Code": "",
        "Cost per item": "",
        "Status": "active",
        "Collection": collection,
    }
    for k in FIELDS:
        if k not in base_row:
            base_row[k] = ""

    rows = []
    if images:
        for pos, img_url in enumerate(images, 1):
            row = dict(base_row)
            row["Image Src"] = img_url
            row["Image Position"] = pos
            if pos &gt; 1:
                row["Title"] = ""
                row["Body (HTML)"] = ""
                row["Vendor"] = ""
                row["Variant Price"] = ""
                row["Variant Compare At Price"] = ""
                row["Variant SKU"] = ""
            rows.append(row)
    else:
        rows.append(base_row)
    return rows

def writer_thread():
    """CSV 写入线程"""
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    files = {}
    writers = {}
    single_file = "boticario_test.csv" if test_mode else None

    while True:
        row = write_queue.get()
        if row is None:
            break
        coll = "_single" if single_file else (row.get("_coll_slug", "") or "default")
        if coll not in writers:
            fname = single_file if single_file else f"boticario_{coll}.csv"
            fpath = os.path.join(OUTPUT_DIR, fname)
            f = open(fpath, "w", newline="", encoding="utf-8")
            w = csv.DictWriter(f, fieldnames=FIELDS)
            w.writeheader()
            files[coll] = f
            writers[coll] = w
        out_row = {k: v for k, v in row.items() if k in FIELDS}
        writers[coll].writerow(out_row)
        files[coll].flush()
        write_queue.task_done()
    for f in files.values():
        f.close()

async def _accept_cookies(page):
    """如有 Cookie 同意横幅，点击接受"""
    try:
        btn = page.get_by_role("button", name="Aceitar todos os cookies")
        if await btn.is_visible():
            await btn.click()
            await asyncio.sleep(0.5)
    except Exception:
        pass

async def process_one_async(page, url, collection):
    """处理单个产品页（异步）"""
    global processed_count
    try:
        await page.goto(url, wait_until="domcontentloaded", timeout=30000)
        await asyncio.sleep(1.5)
        await _accept_cookies(page)
        await asyncio.sleep(0.5)
        try:
            await page.wait_for_function("() =&gt; typeof blz !== 'undefined' &amp;&amp; blz &amp;&amp; blz.product", timeout=5000)
        except Exception:
            pass

        blz = await _extract_blz_product(page)
        ld_list = await _extract_json_ld(page)
        images = await _extract_images_from_page(page)

        rows = parse_boticario_product(blz, ld_list, images, url, collection)
        for row in rows:
            write_queue.put(row)

        with counter_lock:
            processed_count += 1
            print(f"Progress: {processed_count}/{total_tasks} - {url[:60]}...")
    except Exception as e:
        print(f"跳过 {url[:50]}... ({e})")

async def _worker(page, queue):
    """异步 worker：从 queue 取任务并处理"""
    while True:
        try:
            url, collection = queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        await process_one_async(page, url, collection)
        await asyncio.sleep(0.3)

async def _create_context(browser):
    """创建单个浏览器 context"""
    return await browser.new_context(
        viewport={"width": 1920, "height": 1080},
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        locale="pt-BR",
        timezone_id="America/Sao_Paulo",
    )

async def run_async(workers, items):
    """异步主流程"""
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            channel="chrome",
            headless=not ("--show" in sys.argv or "-s" in sys.argv),
            ignore_default_args=["--enable-automation"],
            args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-infobars",
                "--no-sandbox",
                "--disable-dev-shm-usage",
            ],
        )
        queue = asyncio.Queue()
        for url, coll in items:
            queue.put_nowait((url, coll))

        pages = []
        for _ in range(workers):
            ctx = await _create_context(browser)
            ctx.add_init_script("""
                Object.defineProperty(navigator, 'webdriver', { get: () =&gt; undefined });
                if (!window.chrome) window.chrome = {};
                if (!window.chrome.runtime) window.chrome.runtime = {};
            """)
            pages.append(await ctx.new_page())

        await asyncio.gather(*[_worker(page, queue) for page in pages])
        await browser.close()

def main():
    global total_tasks, test_mode
    test_mode = "--test" in sys.argv or "-t" in sys.argv
    workers = CONCURRENT_WORKERS
    if "--workers" in sys.argv:
        for i, arg in enumerate(sys.argv):
            if arg == "--workers" and i + 1 &lt; len(sys.argv):
                try:
                    workers = int(sys.argv[i + 1])
                except ValueError:
                    pass
                break

    items = load_products_txt()
    if not items:
        print(f"无产品。请确保 {PRODUCTS_TXT} 存在且有内容（格式: 产品链接,分类slug）")
        return 1

    if test_mode:
        items = items[:3]
        print(f"[--test] 测试模式：仅采集前 3 个产品")

    total_tasks = len(items)
    print(f"开始采集产品详情，共 {total_tasks} 个，{workers} 个并发（asyncio）")

    try:
        import playwright.async_api
    except ImportError:
        print("请先安装 Playwright: pip install playwright")
        return 1

    writer = threading.Thread(target=writer_thread)
    writer.start()

    asyncio.run(run_async(workers, items))

    write_queue.put(None)
    writer.join()
    print(f"完成，处理 {processed_count}/{total_tasks}，已写入 {OUTPUT_DIR}/")

if __name__ == "__main__":
    sys.exit(main() or 0)
</code></pre>]]></description>
    <pubDate>Fri, 27 Feb 2026 18:40:03 +0800</pubDate>
    <dc:creator>emer</dc:creator>
    <guid>https://docs.colyoy.cn/?post=115</guid>
</item>
<item>
    <title>centauro.py</title>
    <link>https://docs.colyoy.cn/?post=114</link>
    <description><![CDATA[<h1>-<em>- coding: utf-8 -</em>-</h1>
<p>&quot;&quot;&quot;<br />
Centauro 产品采集脚本 - 直接采集列表页产品并写入 Shopify 格式 CSV<br />
流程: 获取列表 -&gt; 写入 products.txt（去重）-&gt; 从 products.txt 读取 -&gt; 采集<br />
支持: 1) 自动请求列表页  2) 从 next_data.json/coll.json 加载  3) 从 products.txt 加载 (productId,colorId)<br />
用法: python centauro.py           # 完整流程<br />
python centauro.py -f        # 直接从 products.txt 读取并采集（跳过列表获取）<br />
python centauro.py -f xxx.txt  # 从指定文件读取并采集<br />
&quot;&quot;&quot;<br />
import argparse<br />
import html<br />
import json<br />
import csv<br />
import sys<br />
import re<br />
import time<br />
import urllib.request<br />
from urllib.error import HTTPError, URLError<br />
from urllib.parse import urlparse<br />
from concurrent.futures import ThreadPoolExecutor, as_completed<br />
from queue import Queue<br />
import threading<br />
import unicodedata</p>
<p>import requests</p>
<h1>Windows 控制台 UTF-8</h1>
<p>try:<br />
sys.stdout.reconfigure(encoding=&quot;utf-8&quot;)<br />
except (AttributeError, OSError):<br />
pass</p>
<h1>Shopify CSV 字段</h1>
<p>FIELDS = [<br />
&quot;Handle&quot;, &quot;Title&quot;, &quot;Body (HTML)&quot;, &quot;Vendor&quot;, &quot;Type&quot;, &quot;Tags&quot;, &quot;Published&quot;,<br />
&quot;Option1 Name&quot;, &quot;Option1 Value&quot;, &quot;Option2 Name&quot;, &quot;Option2 Value&quot;,<br />
&quot;Option3 Name&quot;, &quot;Option3 Value&quot;, &quot;Variant SKU&quot;, &quot;Variant Grams&quot;,<br />
&quot;Variant Inventory Tracker&quot;, &quot;Variant Inventory Qty&quot;, &quot;Variant Inventory Policy&quot;,<br />
&quot;Variant Fulfillment Service&quot;, &quot;Variant Price&quot;, &quot;Variant Compare At Price&quot;,<br />
&quot;Variant Requires Shipping&quot;, &quot;Variant Taxable&quot;, &quot;Variant Barcode&quot;,<br />
&quot;Image Src&quot;, &quot;Image Position&quot;, &quot;Image Alt Text&quot;, &quot;Gift Card&quot;,<br />
&quot;SEO Title&quot;, &quot;SEO Description&quot;, &quot;Google Shopping / Google Product Category&quot;,<br />
&quot;Google Shopping / Gender&quot;, &quot;Google Shopping / Age Group&quot;,<br />
&quot;Google Shopping / MPN&quot;, &quot;Google Shopping / AdWords Grouping&quot;,<br />
&quot;Google Shopping / AdWords Labels&quot;, &quot;Google Shopping / Condition&quot;,<br />
&quot;Google Shopping / Custom Product&quot;, &quot;Google Shopping / Custom Label 0&quot;,<br />
&quot;Google Shopping / Custom Label 1&quot;, &quot;Google Shopping / Custom Label 2&quot;,<br />
&quot;Google Shopping / Custom Label 3&quot;, &quot;Google Shopping / Custom Label 4&quot;,<br />
&quot;Variant Image&quot;, &quot;Variant Weight Unit&quot;, &quot;Variant Tax Code&quot;,<br />
&quot;Cost per item&quot;, &quot;Status&quot;, &quot;Collection&quot;<br />
]</p>
<h1>代理配置（青果隧道 HTTP 代理，账密模式），按 <a href="https://www.qg.net/doc/1879.html">https://www.qg.net/doc/1879.html</a></h1>
<h1>403 时重试会换连接以触发隧道换 IP；置空则直连</h1>
<p>PROXY_ADDR = &quot;overseas-us.tunnel.qg.net:12907&quot;<br />
PROXY_AUTH_KEY = &quot;KT8DL5OM&quot;<br />
PROXY_PASSWORD = &quot;F7BBE43B9A2C&quot;<br />
PROXIES = None<br />
if PROXY_ADDR:<br />
from urllib.parse import quote</p>
<h1>青果文档格式: <a href="http://user:password@hostname:port，账密需">http://user:password@hostname:port，账密需</a> URL 编码</h1>
<pre><code>safe_user = quote(PROXY_AUTH_KEY, safe="")
safe_pass = quote(PROXY_PASSWORD, safe="")
PROXY_URL = f"http://{safe_user}:{safe_pass}@{PROXY_ADDR}"
PROXIES = {"http": PROXY_URL, "https": PROXY_URL}
# 使用 HTTP 协议访问目标，避免 HTTPS CONNECT 隧道（部分代理对 HTTP 转发更稳定）
USE_HTTP = True</code></pre>
<p>else:<br />
USE_HTTP = False</p>
<h1>要采集的集合 URL，Collection 名取路径最后一段（如 /nav/marca/newbalance -&gt; newbalance）</h1>
<p>COLLECTION_URLS = [<br />
&quot;<a href="https://www.centauro.com.br/busca/nike-air-max-90">https://www.centauro.com.br/busca/nike-air-max-90</a>&quot;,<br />
&quot;<a href="https://www.centauro.com.br/nav/categorias/vestuario">https://www.centauro.com.br/nav/categorias/vestuario</a>&quot;,<br />
&quot;<a href="https://www.centauro.com.br/nav/esportes/futebol">https://www.centauro.com.br/nav/esportes/futebol</a>&quot;,<br />
]</p>
<p>API_HEADERS = {<br />
&quot;accept&quot;: &quot;<em>/</em>&quot;,<br />
&quot;accept-language&quot;: &quot;en-US,en;q=0.9&quot;,<br />
&quot;origin&quot;: &quot;<a href="https://www.centauro.com.br">https://www.centauro.com.br</a>&quot;,<br />
&quot;referer&quot;: &quot;<a href="https://www.centauro.com.br/">https://www.centauro.com.br/</a>&quot;,<br />
&quot;sec-ch-ua&quot;: '&quot;Google Chrome&quot;;v=&quot;129&quot;, &quot;Not=A?Brand&quot;;v=&quot;8&quot;, &quot;Chromium&quot;;v=&quot;129&quot;',<br />
&quot;sec-ch-ua-mobile&quot;: &quot;?0&quot;,<br />
&quot;sec-ch-ua-platform&quot;: '&quot;Windows&quot;',<br />
&quot;sec-fetch-dest&quot;: &quot;empty&quot;,<br />
&quot;sec-fetch-mode&quot;: &quot;cors&quot;,<br />
&quot;sec-fetch-site&quot;: &quot;same-site&quot;,<br />
&quot;user-agent&quot;: &quot;Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36&quot;<br />
}</p>
<p>PAGE_HEADERS = {<br />
&quot;User-Agent&quot;: API_HEADERS[&quot;user-agent&quot;],<br />
&quot;Accept&quot;: &quot;text/html,application/xhtml+xml,application/xml;q=0.9,<em>/</em>;q=0.8&quot;,<br />
&quot;Accept-Language&quot;: &quot;en-US,en;q=0.9&quot;,<br />
}</p>
<h1>Next.js JSON 基础路径：<a href="https://www.centauro.com.br/_next/data/1.98.1{path}.json">https://www.centauro.com.br/_next/data/1.98.1{path}.json</a></h1>
<p>NEXT_DATA_VERSION = &quot;1.98.2&quot;</p>
<h1>硬编码请求头（来自 curl，attr_datetime 已替换为非 ASCII 部分避免编码错误）</h1>
<h1>若存在 cookies.txt，优先从中加载（需替换 attr_datetime 中非 ASCII 避免 latin-1 编码错误）</h1>
<p>COOKIE_FILE = &quot;cookies.txt&quot;<br />
LIST_JSON_ACCEPT = &quot;<em>/</em>&quot;<br />
LIST_JSON_ACCEPT_LANG = &quot;en-US,en;q=0.9,zh;q=0.8,de;q=0.7,fi;q=0.6,pl;q=0.5,as;q=0.4,dv;q=0.3,sq;q=0.2,zh-CN;q=0.1&quot;<br />
LIST_JSON_COOKIE = (<br />
&quot;Secure; Secure; experiment-uuid=2695c48f-60c3-4291-bacb-0f514c5cafad; deviceId=744396c0-895a-4536-9834-6be34f847c1e; &quot;<br />
&quot;<strong>Host-next-auth.csrf-token-auth=ddce3cc6556dc57422006163935f2f146b420bc7902552d59ed9a065d2c34009%7Cd30c382906626c815ab97cd911b527af640dcc8e6b4c0ff78546ee35813b055c; &quot;<br />
&quot;</strong>Secure-next-auth.callback-url-auth=https%3A%2F%2Fwww.centauro.com.br; attr_utm_source=(direct); attr_utm_medium=(none); AwinChannelCookie=direct; &quot;<br />
&quot;_ga=GA1.1.506227375.1772099113; anonymousid=ca51ba45-4f66-4278-8208-1dc28f75c92a; _fbp=fb.2.1772099113121.1561054124; _tt_enable_cookie=1; <em>ttp=01KJCND4YHAV0GZKHYW3EYAA3D</em>.tt.2; &quot;<br />
&quot;mParticleId=-8067566568244070344; _pm_id=872601772099114766; AdoptVisitorId=GwTgDAZgHCAsCGBaAzAEwEwFZGwOzrESl0xEXQGNUxUR4pkaBTIA; _gcl_au=1.1.1065313785.1772099126; &quot;<br />
&quot;_hjSessionUser_540838=eyJpZCI6IjFkN2IxNDNjLWRjZmMtNWYyMS1hZWJkLTJhZTk1ZjZmNmUyOCIsImNyZWF0ZWQiOjE3NzIwOTkxMTM5ODcsImV4aXN0aW5nIjp0cnVlfQ==; &quot;<br />
&quot;bm_ss=ab8e18ef4e; _pm_sid=823141772172195795; bm_mi=E63575EC751B279EF6E01A0BF1D6127E~YAAQmmAXAsS8Z3WcAQAAh8axnR6PuIi7dfwGt3RbuN9gK1LqsOvxrJNDS5JTJS98xZ1wBut+iFCxHRUVsXB1zzDldyRy3ABZ8y4P2lRCLw+jycCgx23sfrVmls4J0FRHw+rr5yXSoyf0oLfLa+qE72yROUNbaGux5XI+fLC0BvebFmDxsW/uu0+hbp8G8KuP4qglyhBRUZMQJkOu4sf8hXXZwCFTUSdQCCvJDZP710uE/v34+05G1SyDkjvLBHFGFQhnMJCSLtDLWtApq+OKAZArKLyfFd0WZquEtuy1y29hKahfl8uAmRKfqbeazkF1B4HyEj4ICGE34f56fr9qmj4NtyFbuyjX3RXMypF/eY0oV7SwdoCplvD28mGHN621HR0gcl5Xxa9YvebLbCqOyDNO6yvn8w==~1; &quot;<br />
&quot;_abck=188FE50D1D84B7CBD554DDEA6159D7ED~0~YAAQmmAXAgq/Z3WcAQAAStGxnQ/GGktQa9o58pTqniSd9Akvv8qV0mSxAz+3GbMdGNHgznhGOOfUdZoOpVAmrNad2erAmbhbuxFbsIvuJ1zIPVG6NLwVzW1OQFlH9xQgN3IBE+oKYaqcP9WSe6BKgJBnEK6gC4qCgn5RLc1CUwQFwuuHqz/qy+pP5SchXCrsmHbnbU1DAJVj2umwsVsXePmIyNz7hFDJxWryGTty3bmDJYCh86soFSTwHs8O/r7lcnHtJVLsZWrj3QfeHjoGEA54kzkFZXcauy+RhaqtQaxVfqR8iWLMDLZWXYmFP5VxPFzTr9q/c/pokwfEqfLatO0elwo7p9GqGA6+Z7CexkwYjMRH7b+cwHap3UcbtNxAWE57Ot5vRBTqCTi02OWHOmFuUm4++mFNzZ51YeDZQGCRr5BnyFC1mVsF3T/mMBrOSO9MF+gDYbtPWsdKtM5JvUC5gYHkdWs7tzNN6TAry5XHSAdrSznIMcJDoPgkKgwmWerxeCAFe5s8dyPTt2tumPtRiKqfONXUD//FTTmLIZ96nxh/ZhYunjEdBvDGlFZV2yhQ3x7DK7jLuSvavN+v4zlcvvKVsjVTZxcnAwey/25ENxWGFmeik5s7B+DwINPK01yOF9oK4w==~-1~-1~-1~AAQAAAAF%2f%2f%2f%2f%2f5SKUbSBG4gueBtYnTr+znGJ+XXuJNKQiLRJUV+cCiQkilZ%2fLQtNkyhgxOeYMONNpVT26a8g1Ch4BriMGcBul0vZN5QcvoQfcTzz~-1; &quot;<br />
&quot;ak_bmsc=CC008C86C78213DB6BF41F4284B81441~000000000000000000000000000000~YAAQmmAXAjXPZ3WcAQAA6huynR5Dcs076x905sRqDZtlEESkTgM2MdnV8LvKYoPhtlzWBaoiWLuqJMbN/RgnfQS+SxtQEj6fmBPRL+5opHRt35b+bE8hjtpH0wkM9QbTjGawksbBZKbRyyg1ikEW/5osWtYnns7K62banOBIGaqsweR1kZWEWVyDcFM26fvrvat9svRKxM2Ur90kZ8unN4aWsWNRlh556T/14SZeOkOVXbDE3vTmpTifz1iTOoizZ81GQDal3tyadSCJCT/JZzvPAYgQ2BuQ9g/upv9KuUWCnRgrWCT64qihFskX95L93eevJ4IvvVuundI+iVrFrs44nvtlu/rkaNMjPSZFDFDj/qotDiZID0D0Jt4ARpE+lozwoA0ZyD6pANHF5cuzo9PGlMTpESELDJpr94bdCDmonX5+liW5iovPKBq4Z0K9oCpb2BlMVS/zcEV4lndOlgmjqs0UJ9EODuFFGhPtD8t8N7RGzbtmMd1E3SxdtOZYUcLOPErfO+7sXwbc8TP2; &quot;<br />
&quot;bm_so=20BC669BFB79EB118777E0FF6428174DC2AEAE856C50BBD266AD8A6056E6232D~YAAQBXHKF+PPwXCcAQAAzj2znQbeGPUK61ED+jntmvxtaOJN8S7z9O/E4lfWL6UGb/LjbrVwrojlYqXNFZdqNyKro/aOuFLT0OM85tdVUNHgxPcfnPik+Fa28IoQK6tddkZhMn1tk6er5O/ffumFZ7yYnOgM/u/GrkdJqXcp4xavlUKZjTSZe+eEobjPD5347+LwVmouHmiGuRgoI84swWXLT9A8KdbICzvhAClOVVvu4EseniHsywzGU+igZSJbfpOfDHThom3Ojb4wJO7d1OEf6/ir5MEmx0nptLWWN/Fe5gnHxRC438AVl9K6+cKQoXnjvX8DlU8uAfkHh2Z5Oaa+M8mpU2duvBw0AqWvoMYv6OlQo1PVgNtc/yBqOSjbbaiJWqMBrs3s82E3pjKoSuZUCbk2MwxHoyV/mLphpX/qQ/CJNwnj6KSGjvMyaKE92ob0rz8esTb8hknTQqVdPFaA0A==; &quot;<br />
&quot;bm_sz=20CCA8B637D1913BEB909074D87D83C3~YAAQBXHKF+XPwXCcAQAAzj2znR7r2Y5TUDSF6DEoagfG3gTU0X/6G7aVh1WPGCCt1A02QpXYfsPmP0ivNKWcPeecpLthyAmGam65Wm6UA3lgh1fEyfSuhKwHGxBpRQKjnB4vFw9/di1ym6MBsET9TE03OGvlHlITFLQ1tYEpUXl5eGfe4VbEw0ON1QNXg7qYhl5nWZGccO8LDwg7veMD0daqvIwwW0eXCxqngoIlC6TGzbRXFK7x2BGX9QM3ijAswDhgTtzRhmiH4JxT4KZM0nWd3PdqqEjLmDsECBWO7aOZDoQ9A4tyoIrT4zcxmAXpFGtPRw5mKsrj/dVksePkFo3jU9IA78rAi7yPVwCObjijY6APd89F18pt8oxdSBcVF8SDxkIpOKcwI1vlAOSidcownL5ZrJ73LjDhBnzbJQPihHB7ai4ChPCbMsoLcUsSH6BjP5T2ThDdL70F8CgfatN2KlFWqkY7YPENBBhd0ofSBGEU8R7uL4/7FUeZ/89PSqYe/aOAWf/wHPH6yme+6yUTqMCbuA==~3228215~4272184; &quot;<br />
&quot;rp_geo_location=%7B%22latitude%22%3A34.9592083%2C%22longitude%22%3A-116.419389%7D; &quot;<br />
&quot;_hjSession_540838=eyJpZCI6IjI5ZGE0NzI1LWUyODYtNDUwYy1hZTFmLTRmMjdjZWQ0NzgyZCIsImMiOjE3NzIxNzIzMDEzMDYsInMiOjAsInIiOjAsInNiIjowLCJzciI6MCwic2UiOjAsImZzIjowLCJzcCI6MH0=; &quot;<br />
&quot;<strong>rtbh.lid=%7B%22eventType%22%3A%22lid%22%2C%22id%22%3A%222bRHwfFonGNZ7HgB2s6B%22%2C%22expiryDate%22%3A%222027-02-27T06%3A05%3A01.443Z%22%7D; &quot;<br />
&quot;cto_bundle=OjY6dV9HZ2FDdUQ3TFJQJTJGbm14clNhTUJ5REFwTUlWU3RCayUyQmE2SUtGdSUyRnRyckYyems1SEZoNHB4YVNPMTNDciUyRmFQVGpnYWpFcEtMdDZOVzNBb0MlMkJDWDNVNEpsV2o5RnVuZ1dkbDM2RE5PR0xvbWNFOTVIV0JpSGl4ZXczQ2k3UUFHaXpHV2w4SkV0QU1rVXdDVlZiYUkxRmJnJTNEJTNE; &quot;<br />
&quot;bm_lso=20BC669BFB79EB118777E0FF6428174DC2AEAE856C50BBD266AD8A6056E6232D~YAAQBXHKF+PPwXCcAQAAzj2znQbeGPUK61ED+jntmvxtaOJN8S7z9O/E4lfWL6UGb/LjbrVwrojlYqXNFZdqNyKro/aOuFLT0OM85tdVUNHgxPcfnPik+Fa28IoQK6tddkZhMn1tk6er5O/ffumFZ7yYnOgM/u/GrkdJqXcp4xavlUKZjTSZe+eEobjPD5347+LwVmouHmiGuRgoI84swWXLT9A8KdbICzvhAClOVVvu4EseniHsywzGU+igZSJbfpOfDHThom3Ojb4wJO7d1OEf6/ir5MEmx0nptLWWN/Fe5gnHxRC438AVl9K6+cKQoXnjvX8DlU8uAfkHh2Z5Oaa+M8mpU2duvBw0AqWvoMYv6OlQo1PVgNtc/yBqOSjbbaiJWqMBrs3s82E3pjKoSuZUCbk2MwxHoyV/mLphpX/qQ/CJNwnj6KSGjvMyaKE92ob0rz8esTb8hknTQqVdPFaA0A==~1772172303609; &quot;<br />
&quot;AdoptConsent=N4Ig7gpgRgzglgFwgSQCIgFwgEwE4DMEAZvgCakC0ALNgBwCG1VAjPhVFVLRbrVxADYqVUviJQQAGhAA3OPAQB7AE7JSmEANwAGIrVxVGZbAFZqAdmzaKtcydwVsAY1LbSuerXxuIUkAiciAGUEZTgAOwBzTHCAVwAbeOlFAAcEZHCAFXpImEwAbRAARQoKAGEADXoATxgUgFk/ABkAR2QmgCsOgH0AOXoACz8ggEFacNwALyblAHEh6QBpWO1F+rKilPMADz94gFsEWkVexQBVJuQ/MDBsAAV6bTAAJXwyvyJqxTAYZlRJ6qZPzKMoAIQgFCcFQA1r0JNJmAA1dbbbpUMDmADqfgQZQAmrNUAAJeJlDq9ZqLEwtABSRCI7gqfiKRJkRDgpBM9DgAj8cGYjEiHix2j8sTGUCCUG6iwgvmkFQA8t0UrF6kUghVFn5FkU8QhZpkTERFB0/B16GBmDIAGK9XA09DSXoGs6ggTdUg2ybNRHdSI25AmEYUaLSGDmSYIcLbfakbojPwCSKRM69JoyBAUBYgRQweIyaqzJwZxPSKDPZD1RSzFqTeI0vwjOD0EwuHJUABafnwiIQLSoAzx1SIZZAkTAsWUoKJNMW2Gefmh+2QRAqMAgFTguD8MiakV6ynik162h90jgVEmyhgVDgePovOkynqI0W8DA+0WoukRQGJmYL88RgJoihAABdZI0kVWIEGyXICkgkAnEUcIN3CdJ1CwTE7hSZARnMPwULQiAMMRCAbzgVDMFYaRYhSUh6CQUgRgQDQrGwAQKG0bBHHMTJtAEDBtBMDAWAAOiEARuwAXyAA===; &quot;<br />
&quot;</strong>gads=ID=af364d250ce9110e:T=1772100304:RT=1772172344:S=ALNI_Mb6eD4y_gGwSotM44Sy0pbELFSj8A; &quot;<br />
&quot;<strong>gpi=UID=00001398476daf89:T=1772100304:RT=1772172344:S=ALNI_MZxB0R9J6HNoQ6vf_NELz5Yd9Kixw; </strong>eoi=ID=f3c07e9e2678c378:T=1772100304:RT=1772172344:S=AA-AfjaD5SrKsBWsEUT5s7wzABjG; &quot;<br />
&quot;_uetsid=d964d3f012f711f18ec5f386ba12c45b; _uetvid=d9651aa012f711f1b2e1578d0725c64f; &quot;<br />
&quot;bm_s=YAAQBXHKF402wnCcAQAA2gS0nQSetQLjamhPx2j5zFaIS/sY77p6YLLhOBJUL29/lc6AhzebKD+GD62pNhs9lHbpsrPCU32UIrYHwUkT1sNOsDDwVOfviLteVaWmJNMV4dfQyD68RNlycK5e4z/1HB89r2VzJLz4lMYeuobvuDAMGex0RLCE9GS1wktTYzuOh40sP1e/NfGW1uGTJiNhPwXjvDwEGB3FUwNszlGXrxl16DjEBPd50r9NFWPfYjzBqPO0BTsH4MIqZ5dQhS+MH55VpvK46584XwGb1I5pJXYqGW3zwBljEBdqQPUdlXJ7zEhBxpe5rqgMnUVmCThjcsBk/QTUTEDsi9To9YrrbHDemCB0HrncdSIWSnJQjWAXJ7kEyfUO4umNd4B0skrvN2xCsIAm8Olpnwl1Fr1FZFTElPtRMWJ/vaTBrdmO9bNvzR3ts/CM3z538eK1jTLtCMz5RpEwmeVmt2RTdSpfvNsU8WZpm//Qnlqhnswm/ul6BUQggsbCfOrqYQDrLFnjyeFzi3RnTn1xa/s59q483nbqSENGHxjtJW/SEvkEtykB1hkR2gqx7wxnbONI3pzq81ik2ia8u7dUQzhyREm/SJpIB6rB+vLRMB9gYFRFe4TygYzy+ilQyvZ0UWEThCjfFiQA5+PNB9eCBwMZqTfNdmI2mJOd9Tc+2ySyjQ8hnkR6IQpveOqZxaqwYsprXZNIU4kqCVT4XroqxP+1MtLJJE9mKgiRQa3+18uEQN4nrKrW6Sc; &quot;<br />
&quot;bm_sv=E56DCE94E55014C714B84D30993C8726~YAAQBXHKF442wnCcAQAA2gS0nR6uy1H+Q1QPxTy52VT5ZVth011mn37lYT1joG5FRfNicmSotYKtVM+u9tz5GKepSI7YSlLTV1H1Qua1a3QRBpFfW6TWs2nbfJ5/v4zbRpa1odNmcdK7+0GjLdZGAsU6nfFUq84SDLY0USY0Y5/76+MVjArR8FI4fQTgsEBOICD7WUgMRv4921PSmEWO5KFGeGFj4XTj2pni6sy/eSUqFNB5zVgZfO40r/ujAY9T7pWVL4u~1; &quot;<br />
&quot;_ga_T9CHK2M2XW=GS2.1.s1772172153$o3$g1$t1772172349$j60$l0$h0; &quot;<br />
&quot;__rtbh.uid=%7B%22eventType%22%3A%22uid%22%2C%22id%22%3A%22unknown%22%2C%22expiryDate%22%3A%222027-02-27T06%3A05%3A49.616Z%22%7D; &quot;<br />
&quot;attr_datetime=Fri Feb 27 2026 14:06:26 GMT+0800 (GMT+0800); &quot;<br />
&quot;_ga_3RYKQ4MBLH=GS2.1.s1772172153$o3$g1$t1772172386$j17$l0$h1215551984; &quot;<br />
&quot;ttcsid=1772172174321::gz_cAtJhHUxbO8k1CZ_A.3.1772172390362.0; ttcsid_CMJUDB3C77U705JFSVT0=1772172174321::fHGvTeH6Uet53_wlbB_i.3.1772172390362.1; &quot;<br />
&quot;bm_sz=20CCA8B637D1913BEB909074D87D83C3~YAAQBXHKF+XPwXCcAQAAzj2znR7r2Y5TUDSF6DEoagfG3gTU0X/6G7aVh1WPGCCt1A02QpXYfsPmP0ivNKWcPeecpLthyAmGam65Wm6UA3lgh1fEyfSuhKwHGxBpRQKjnB4vFw9/di1ym6MBsET9TE03OGvlHlITFLQ1tYEpUXl5eGfe4VbEw0ON1QNXg7qYhl5nWZGccO8LDwg7veMD0daqvIwwW0eXCxqngoIlC6TGzbRXFK7x2BGX9QM3ijAswDhgTtzRhmiH4JxT4KZM0nWd3PdqqEjLmDsECBWO7aOZDoQ9A4tyoIrT4zcxmAXpFGtPRw5mKsrj/dVksePkFo3jU9IA78rAi7yPVwCObjijY6APd89F18pt8oxdSBcVF8SDxkIpOKcwI1vlAOSidcownL5ZrJ73LjDhBnzbJQPihHB7ai4ChPCbMsoLcUsSH6BjP5T2ThDdL70F8CgfatN2KlFWqkY7YPENBBhd0ofSBGEU8R7uL4/7FUeZ/89PSqYe/aOAWf/wHPH6yme+6yUTqMCbuA==~3228215~4272184&quot;<br />
)</p>
<p>def _get_list_cookie():<br />
&quot;&quot;&quot;获取列表请求用的 cookie，优先从 cookies.txt 加载并替换非 ASCII&quot;&quot;&quot;<br />
try:<br />
with open(COOKIE_FILE, &quot;r&quot;, encoding=&quot;utf-8&quot;) as f:<br />
cookie = f.read().strip()<br />
if not cookie:<br />
return LIST_JSON_COOKIE</p>
<h1>attr_datetime 中括号内可能含中文等非 ASCII，替换为 (GMT+0800) 避免 latin-1 编码错误</h1>
<pre><code>    cookie = re.sub(r"(attr_datetime=[^;]*)\s*\([^)]*\)", r"\1 (GMT+0800)", cookie)
    # 移除其余非 ASCII（HTTP 头需 latin-1）
    cookie = "".join(c for c in cookie if ord(c) &lt; 256)
    print(f"已从 {COOKIE_FILE} 加载 cookie")
    return cookie
except FileNotFoundError:
    return LIST_JSON_COOKIE</code></pre>
<p>API_BASE = &quot;<a href="https://apigateway.centauro.com.br/centauro-bff/products">https://apigateway.centauro.com.br/centauro-bff/products</a>&quot;</p>
<p>def _maybe_http(url):<br />
&quot;&quot;&quot;使用代理且 USE_HTTP 时，改用 HTTP 协议避免 CONNECT 隧道&quot;&quot;&quot;<br />
if USE_HTTP and url.startswith(&quot;https://&quot;):<br />
return &quot;http://&quot; + url[8:]<br />
return url</p>
<p>def page_url_to_json_base(page_url):<br />
&quot;&quot;&quot;页面 URL 转 JSON 接口 base，如 /nav/marca/newbalance -&gt; .../nav/marca/newbalance.json&quot;&quot;&quot;<br />
from urllib.parse import urlparse<br />
parsed = urlparse(page_url)<br />
path = parsed.path.rstrip(&quot;/&quot;)<br />
return f&quot;<a href="https://www.centauro.com.br/_next/data/{NEXT_DATA_VERSION}{path}.json">https://www.centauro.com.br/_next/data/{NEXT_DATA_VERSION}{path}.json</a>&quot;</p>
<p>def _path_to_nav_slug_params(path):<br />
&quot;&quot;&quot;从路径提取 navSlug 参数，如 /nav/esportes/basquete -&gt; navSlug=esportes&amp;navSlug=basquete&quot;&quot;&quot;<br />
from urllib.parse import quote<br />
segments = [s for s in path.strip(&quot;/&quot;).split(&quot;/&quot;) if s]<br />
if not segments:<br />
return &quot;&quot;</p>
<h1>若首段为 nav，取后续段；否则取全部</h1>
<pre><code>slugs = segments[1:] if segments[0] == "nav" else segments
if not slugs:
    return ""
return "&amp;".join(f"navSlug={quote(s)}" for s in slugs)</code></pre>
<p>def url_to_collection_name(page_url):<br />
&quot;&quot;&quot;从 URL 提取 Collection 名，取路径最后一段&quot;&quot;&quot;<br />
from urllib.parse import urlparse<br />
parsed = urlparse(page_url)<br />
path = parsed.path.rstrip(&quot;/&quot;)<br />
return path.split(&quot;/&quot;)[-1] if path else &quot;&quot;</p>
<p>write_queue = Queue()<br />
counter_lock = threading.Lock()<br />
processed_count = 0<br />
total_tasks = 0</p>
<h1>403 重试配置（换连接以触发隧道换 IP）</h1>
<p>MAX_403_RETRIES = 5<br />
RETRY_DELAY = 2</p>
<h1>线程本地 Session，每个线程独立连接（独立 IP）</h1>
<p>_thread_local = threading.local()</p>
<p>def _get_thread_session():<br />
&quot;&quot;&quot;每个线程独立 Session（独立 IP），403 后需调用 _reset_thread_session 换 IP&quot;&quot;&quot;<br />
if not hasattr(_thread_local, &quot;session&quot;) or _thread_local.session is None:<br />
_thread_local.session = requests.Session()<br />
if PROXIES:<br />
_thread_local.session.proxies.update(PROXIES)<br />
return _thread_local.session</p>
<p>def _reset_thread_session():<br />
&quot;&quot;&quot;重置线程 Session，下次请求将获取新 IP&quot;&quot;&quot;<br />
_thread_local.session = None</p>
<p>def _req_to_url_headers(req):<br />
&quot;&quot;&quot;从 urllib Request 提取 url 和 headers 字典&quot;&quot;&quot;<br />
url = req.full_url if hasattr(req, &quot;full_url&quot;) else req.get_full_url()<br />
headers = dict(req.header_items()) if hasattr(req, &quot;header_items&quot;) else {}<br />
return url, headers</p>
<p>class _ResponseWrapper:<br />
&quot;&quot;&quot;将 requests.Response 包装成类似 urlopen 返回的对象，支持 read() 和 with 语句&quot;&quot;&quot;</p>
<pre><code>def __init__(self, resp):
    self._resp = resp
    self._content = resp.content

def read(self):
    return self._content

def decode(self, encoding="utf-8", errors="replace"):
    return self._content.decode(encoding, errors)

def __enter__(self):
    return self

def __exit__(self, *args):
    pass</code></pre>
<p>def verify_proxy():<br />
&quot;&quot;&quot;启动前验证代理是否可用，USE_HTTP 时用 HTTP 避免 CONNECT 隧道 406&quot;&quot;&quot;<br />
if not PROXIES:<br />
return True<br />
print(&quot;验证代理中...&quot;)<br />
for url in (_maybe_http(&quot;<a href="https://httpbin.org/ip">https://httpbin.org/ip</a>&quot;), _maybe_http(&quot;<a href="https://www.centauro.com.br/">https://www.centauro.com.br/</a>&quot;)):<br />
try:<br />
kw = {&quot;timeout&quot;: 15, &quot;headers&quot;: {&quot;User-Agent&quot;: API_HEADERS[&quot;user-agent&quot;]}}<br />
if PROXIES:<br />
kw[&quot;proxies&quot;] = PROXIES<br />
r = requests.get(url, **kw)</p>
<h1>代理不支持 HTTPS CONNECT 时用 HTTP，406 表示代理已转发（部分代理会返回）</h1>
<pre><code>        if r.status_code in (200, 301, 302, 406):
            print("代理验证通过")
            return True
        r.raise_for_status()
    except Exception as e:
        print(f"  尝试 {url} 失败: {e}")
print("代理验证失败")
return False</code></pre>
<p>def urlopen_with_403_retry(req, timeout=30, desc=&quot;&quot;):<br />
&quot;&quot;&quot;带 403 重试的 HTTP 请求（使用 requests 按青果文档接入 HTTP 代理）&quot;&quot;&quot;<br />
url, headers = _req_to_url_headers(req)<br />
url = _maybe_http(url)<br />
last_err = None<br />
for attempt in range(MAX_403_RETRIES + 1):<br />
try:<br />
session = _get_thread_session()<br />
r = session.get(url, headers=headers, timeout=timeout)<br />
r.raise_for_status()<br />
return _ResponseWrapper(r)<br />
except requests.exceptions.HTTPError as e:<br />
last_err = e<br />
if e.response.status_code == 403 and attempt &lt; MAX_403_RETRIES:<br />
_reset_thread_session()<br />
wait = RETRY_DELAY <em> (attempt + 1)<br />
print(f&quot;  403 换 IP 重试 {attempt + 1}/{MAX_403_RETRIES}，{wait}s 后{f': {desc}' if desc else ''}&quot;)<br />
time.sleep(wait)<br />
else:<br />
raise HTTPError(url, e.response.status_code, e.response.reason, e.response.headers, None)<br />
except requests.exceptions.RequestException as e:<br />
last_err = e<br />
if attempt &lt; MAX_403_RETRIES:<br />
_reset_thread_session()<br />
wait = RETRY_DELAY </em> (attempt + 1)<br />
print(f&quot;  连接失败重试 {attempt + 1}/{MAX_403_RETRIES}，{wait}s 后{f': {desc}' if desc else ''}&quot;)<br />
time.sleep(wait)<br />
else:<br />
raise URLError(str(e))<br />
raise last_err</p>
<p>def decode_html<em>entities(text):<br />
&quot;&quot;&quot;解码 HTML 实体，双重解码处理 &amp;ccedil; -&gt; ç&quot;&quot;&quot;<br />
if not text:<br />
return &quot;&quot;<br />
s = str(text)<br />
for </em> in range(2):<br />
s = html.unescape(s)<br />
return s</p>
<p>def normalize_text(text):<br />
&quot;&quot;&quot;处理特殊字符&quot;&quot;&quot;<br />
if not text:<br />
return &quot;&quot;<br />
normalized = unicodedata.normalize(&quot;NFKD&quot;, str(text))<br />
normalized = &quot;&quot;.join(c for c in normalized if not unicodedata.combining(c))<br />
return normalized</p>
<p>def is_valid_image_url(url):<br />
&quot;&quot;&quot;过滤无效图片 URL，如 <a href="https://3rx8ammbpzw/">https://3rx8ammbpzw/</a> 这种短格式&quot;&quot;&quot;<br />
if not url or len(url) &lt; 20:<br />
return False<br />
try:</p>
<h1>提取 host，无效格式通常无 TLD（无点号）</h1>
<pre><code>    after = url.split("//", 1)[-1].split("/")[0].split(":")[0]
    if "." not in after:
        return False
    return True
except Exception:
    return False</code></pre>
<p>def _find_color_ids(obj, found):<br />
&quot;&quot;&quot;递归查找 colorId / colorCode&quot;&quot;&quot;<br />
if isinstance(obj, dict):<br />
cid = obj.get(&quot;colorId&quot;) or obj.get(&quot;colorCode&quot;)<br />
if cid is not None:<br />
found.add(str(cid))<br />
for v in obj.values():<br />
_find_color_ids(v, found)<br />
elif isinstance(obj, list):<br />
for v in obj:<br />
_find_color_ids(v, found)</p>
<p>def extract_product_color_pairs(products):<br />
&quot;&quot;&quot;<br />
从 products 列表提取 (productId, colorId) 对<br />
支持 coll.json 格式: seo.mpn + details.colorId<br />
支持其他: id+colorId、code+colorVariations 等嵌套结构<br />
&quot;&quot;&quot;<br />
pairs = []<br />
seen = set()<br />
for p in products:</p>
<h1>coll.json 格式: seo.mpn 为 productId, details.colorId 为 colorId</h1>
<pre><code>    details = p.get("details", {})
    seo = p.get("seo", {})
    cid = details.get("colorId") or seo.get("colorId") or p.get("colorId")
    pid = seo.get("mpn") or p.get("code") or p.get("productId") or p.get("id")

    if pid and cid:
        pid, cid = str(pid).strip(), str(cid).strip()
        key = (pid, cid)
        if key not in seen:
            seen.add(key)
            pairs.append((pid, cid))
        continue

    # 兼容旧格式
    pid = p.get("id") or p.get("code") or p.get("productId")
    if not pid:
        continue
    pid = str(pid).strip()

    colors = set()
    if cid is not None:
        colors.add(str(cid))
    if "colorId" in p:
        colors.add(str(p["colorId"]))
    if "colorCode" in p:
        colors.add(str(p["colorCode"]))
    if "colorVariations" in p:
        for cv in p["colorVariations"]:
            c = cv.get("colorId") or cv.get("colorCode")
            if c is not None:
                colors.add(str(c))
    if "colors" in p:
        for c in p["colors"]:
            cid = c.get("colorId") or c.get("id") or c.get("code")
            if cid is not None:
                colors.add(str(cid))
    if not colors:
        _find_color_ids(p, colors)

    if colors:
        for cid in colors:
            base_id = pid
            if pid.endswith(cid):
                base_id = pid[: -len(cid)]
            key = (base_id, cid)
            if key not in seen:
                seen.add(key)
                pairs.append((base_id, cid))
    else:
        key = (pid, "")
        if key not in seen:
            seen.add(key)
            pairs.append((pid, ""))
return pairs</code></pre>
<p>def _get_list_json_headers(json_base, page=1, page_url=None):<br />
&quot;&quot;&quot;获取列表请求头（硬编码，来自 curl）&quot;&quot;&quot;<br />
if page_url is None:<br />
page_url = json_base.replace(f&quot;/_next/data/{NEXT_DATA_VERSION}&quot;, &quot;&quot;).replace(&quot;.json&quot;, &quot;&quot;)<br />
h = {<br />
&quot;accept&quot;: LIST_JSON_ACCEPT,<br />
&quot;accept-language&quot;: LIST_JSON_ACCEPT_LANG,<br />
&quot;baggage&quot;: &quot;sentry-environment=production,sentry-release=v1.98.2,sentry-public_key=f32efb2aa98343a2855c60442e10a23e,sentry-trace_id=ac4175f497b9437590923ae240481840&quot;,<br />
&quot;cookie&quot;: _get_list_cookie(),<br />
&quot;priority&quot;: &quot;u=1, i&quot;,<br />
&quot;referer&quot;: f&quot;{page_url}?page={page - 1}&quot; if page &gt; 1 else page_url,<br />
&quot;sec-ch-ua&quot;: '&quot;Google Chrome&quot;;v=&quot;129&quot;, &quot;Not=A?Brand&quot;;v=&quot;8&quot;, &quot;Chromium&quot;;v=&quot;129&quot;',<br />
&quot;sec-ch-ua-mobile&quot;: &quot;?0&quot;,<br />
&quot;sec-ch-ua-platform&quot;: '&quot;Windows&quot;',<br />
&quot;sec-fetch-dest&quot;: &quot;empty&quot;,<br />
&quot;sec-fetch-mode&quot;: &quot;cors&quot;,<br />
&quot;sec-fetch-site&quot;: &quot;same-origin&quot;,<br />
&quot;sentry-trace&quot;: &quot;ac4175f497b9437590923ae240481840-869537b6fa5f6e1e&quot;,<br />
&quot;user-agent&quot;: &quot;Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36&quot;,<br />
&quot;x-nextjs-data&quot;: &quot;1&quot;,<br />
}<br />
return h</p>
<p>def _parse_products_from_json_response(data):<br />
&quot;&quot;&quot;从 JSON 响应解析 products&quot;&quot;&quot;<br />
page_props = data.get(&quot;pageProps&quot;) or data.get(&quot;props&quot;, {}).get(&quot;pageProps&quot;, {})<br />
fallback = page_props.get(&quot;fallback&quot;, {})<br />
if not fallback:<br />
return []<br />
first = list(fallback.values())[0]<br />
return first.get(&quot;products&quot;, [])</p>
<p>def fetch_list_json_api(page_url):<br />
&quot;&quot;&quot;<br />
请求 Next.js JSON 接口获取产品列表，支持分页<br />
page_url 如 <a href="https://www.centauro.com.br/nav/marca/newbalance">https://www.centauro.com.br/nav/marca/newbalance</a><br />
转为 <a href="https://www.centauro.com.br/_next/data/1.98.2/nav/marca/newbalance.json?navSlug=...&amp;page=N">https://www.centauro.com.br/_next/data/1.98.2/nav/marca/newbalance.json?navSlug=...&amp;page=N</a><br />
&quot;&quot;&quot;<br />
from urllib.parse import urlparse<br />
json_base = page_url_to_json_base(page_url)<br />
path = urlparse(page_url).path.rstrip(&quot;/&quot;)<br />
nav_slug_qs = _path_to_nav_slug_params(path)<br />
all_products = []<br />
page = 1<br />
total_pages = 1<br />
while page &lt;= total_pages:<br />
if nav_slug_qs:<br />
url = f&quot;{json_base}?{nav_slug_qs}&amp;page={page}&quot; if page &gt; 1 else f&quot;{json_base}?{nav_slug_qs}&quot;<br />
else:<br />
url = f&quot;{json_base}?page={page}&quot; if page &gt; 1 else json_base<br />
print(f&quot;请求第 {page} 页: {url}&quot;)<br />
req = urllib.request.Request(url, headers=_get_list_json_headers(json_base, page, page_url))<br />
try:<br />
with urlopen_with_403_retry(req, desc=f&quot;第{page}页&quot;) as r:<br />
data = json.loads(r.read().decode())<br />
except HTTPError as e:<br />
if all_products and e.code == 404:<br />
print(f&quot;  第 {page} 页 404，返回已获取的 {len(all_products)} 个产品&quot;)<br />
return all_products<br />
raise<br />
products = _parse_products_from_json_response(data)<br />
if not products:<br />
print(f&quot;  第 {page} 页无产品，停止&quot;)<br />
break<br />
all_products.extend(products)<br />
print(f&quot;  第 {page} 页获取 {len(products)} 个产品，累计 {len(all_products)}&quot;)<br />
if page == 1:<br />
fallback = data.get(&quot;pageProps&quot;, {}).get(&quot;fallback&quot;, {})<br />
first = list(fallback.values())[0] if fallback else {}<br />
qty = first.get(&quot;quantity&quot;, 0)<br />
per_page = first.get(&quot;productsPerPage&quot;, 36)<br />
total_pages = max(1, (qty + per_page - 1) // per_page)<br />
print(f&quot;共 {qty} 个产品，{total_pages} 页&quot;)<br />
page += 1<br />
if page &lt;= total_pages:<br />
time.sleep(0.5)<br />
return all_products</p>
<p>def fetch_list_page(page_url):<br />
&quot;&quot;&quot;请求 HTML 列表页并解析 <strong>NEXT_DATA__&quot;&quot;&quot;<br />
req = urllib.request.Request(page_url, headers=PAGE_HEADERS)<br />
with urlopen_with_403_retry(req, desc=&quot;列表页&quot;) as r:<br />
html = r.read().decode(&quot;utf-8&quot;, errors=&quot;replace&quot;)<br />
match = re.search(r'&lt;script id=&quot;</strong>NEXT_DATA<strong>&quot;[^&gt;]*&gt;([^&lt;]+)</script>', html)<br />
if not match:<br />
raise ValueError(&quot;未找到 </strong>NEXT_DATA__&quot;)<br />
data = json.loads(match.group(1))<br />
products = _parse_products_from_json_response(data)<br />
if not products:<br />
raise ValueError(&quot;products 为空&quot;)<br />
return products</p>
<p>def load_products_from_json(path=&quot;next_data.json&quot;):<br />
&quot;&quot;&quot;从 JSON 文件加载 products，支持 next_data.json 和 coll.json&quot;&quot;&quot;<br />
with open(path, &quot;r&quot;, encoding=&quot;utf-8&quot;) as f:<br />
data = json.load(f)</p>
<h1>coll.json: pageProps 在顶层</h1>
<pre><code>page_props = data.get("pageProps") or data.get("props", {}).get("pageProps", {})
fallback = page_props.get("fallback", {})
if fallback:
    first = list(fallback.values())[0]
    return first.get("products", [])
return data.get("products", data) if isinstance(data, dict) else []</code></pre>
<p>def save_pairs_to_txt(triples, path=&quot;products.txt&quot;):<br />
&quot;&quot;&quot;写入 (productId,colorId,collection) 到文本，去重后写入&quot;&quot;&quot;<br />
seen = set()<br />
lines = []<br />
for item in triples:<br />
pid = str(item[0]).strip()<br />
cid = str(item[1]).strip() if len(item) &gt; 1 and item[1] else &quot;&quot;<br />
coll = str(item[2]).strip() if len(item) &gt; 2 and item[2] else &quot;&quot;<br />
key = (pid, cid, coll)<br />
if key in seen:<br />
continue<br />
seen.add(key)<br />
lines.append(f&quot;{pid},{cid},{coll}\n&quot;)<br />
with open(path, &quot;w&quot;, encoding=&quot;utf-8&quot;) as f:<br />
f.writelines(lines)<br />
print(f&quot;已写入 {len(lines)} 条（去重后）到 {path}&quot;)</p>
<p>def load_products_from_txt(path=&quot;products.txt&quot;):<br />
&quot;&quot;&quot;从 txt 加载，每行: productId,colorId,collection，加载时去重&quot;&quot;&quot;<br />
triples = []<br />
seen = set()<br />
with open(path, &quot;r&quot;, encoding=&quot;utf-8&quot;) as f:<br />
for line in f:<br />
line = line.strip()<br />
if not line or line.startswith(&quot;#&quot;):<br />
continue<br />
parts = [x.strip() for x in line.split(&quot;,&quot;, 2)]<br />
pid = parts[0]<br />
cid = parts[1] if len(parts) &gt; 1 else &quot;&quot;<br />
coll = parts[2] if len(parts) &gt; 2 else &quot;&quot;<br />
key = (pid, cid, coll)<br />
if key in seen:<br />
continue<br />
seen.add(key)<br />
triples.append((pid, cid, coll))<br />
return triples</p>
<p>def fetch_product_api(product_id, color_id):<br />
&quot;&quot;&quot;请求产品详情 API，403 时自动重试&quot;&quot;&quot;<br />
url = f&quot;{API_BASE}/{product_id}?color={color_id}&quot; if color_id else f&quot;{API_BASE}/{product_id}&quot;<br />
req = urllib.request.Request(url, headers=API_HEADERS)<br />
with urlopen_with_403_retry(req, desc=f&quot;{product_id}?color={color_id}&quot;) as r:<br />
return json.loads(r.read().decode())</p>
<p>def centauro_to_shopify_rows(api_data, collection=&quot;&quot;):<br />
&quot;&quot;&quot;将 Centauro API 响应转为 Shopify CSV 行&quot;&quot;&quot;<br />
prod = api_data.get(&quot;product&quot;, {})<br />
if not prod:<br />
return []</p>
<pre><code>handle = prod.get("code", "")
title = normalize_text(prod.get("name", ""))
body = ""
for attr in api_data.get("attributes", []):
    if attr.get("htmlContent"):
        body = attr["htmlContent"]
        break
if not body:
    body = prod.get("description", "")
body = decode_html_entities(body)
body = normalize_text(body)
vendor = normalize_text(prod.get("brand", ""))
product_type = normalize_text(prod.get("category", ""))
tags = normalize_text(prod.get("collection", ""))
published = "TRUE" if prod.get("isAvailable") else "FALSE"
colour = normalize_text(prod.get("colorInfo", {}).get("description", ""))

images = []
for m in prod.get("visualMedias", []):
    u = m.get("url", "")
    if u and not u.startswith("http"):
        u = "https:" + u
    if u and is_valid_image_url(u):
        images.append(u)
if not images and prod.get("colorVariations"):
    u = prod["colorVariations"][0].get("photoUrl", "")
    if u:
        u = "https:" + u if not u.startswith("http") else u
        if is_valid_image_url(u):
            images.append(u)

sizes = prod.get("sizes", [])
rows = []
for idx, sz in enumerate(sizes):
    desc = sz.get("description", "")
    sku = sz.get("sku", "")
    pi = sz.get("priceInfos", {})
    price = pi.get("promotionalPrice") or pi.get("price") or ""
    if price != "":
        price = str(price)

    row = {
        "Handle": handle,
        "Title": title if idx == 0 else "",
        "Body (HTML)": body if idx == 0 else "",
        "Vendor": vendor if idx == 0 else "",
        "Type": product_type if idx == 0 else "",
        "Tags": tags if idx == 0 else "",
        "Published": published if idx == 0 else "",
        "Option1 Name": "Size",
        "Option1 Value": normalize_text(desc),
        "Option2 Name": "Color",
        "Option2 Value": colour,
        "Variant SKU": sku,
        "Variant Grams": "",
        "Variant Inventory Tracker": "shopify",
        "Variant Inventory Qty": 100,
        "Variant Inventory Policy": "deny",
        "Variant Fulfillment Service": "manual",
        "Variant Price": price,
        "Variant Compare At Price": "",
        "Variant Requires Shipping": "TRUE",
        "Variant Taxable": "TRUE",
        "Variant Barcode": sz.get("ean", ""),
        "Image Src": images[idx % len(images)] if images else "",
        "Image Position": idx + 1,
        "Image Alt Text": title,
        "Gift Card": "FALSE",
        "SEO Title": title if idx == 0 else "",
        "SEO Description": body[:160] if idx == 0 else "",
        "Google Shopping / Google Product Category": "",
        "Google Shopping / Gender": prod.get("gender", "Unisex"),
        "Google Shopping / Age Group": prod.get("ageGroup", "Adult"),
        "Google Shopping / MPN": handle,
        "Google Shopping / AdWords Grouping": "",
        "Google Shopping / AdWords Labels": "",
        "Google Shopping / Condition": "New",
        "Google Shopping / Custom Product": "FALSE",
        "Google Shopping / Custom Label 0": "",
        "Google Shopping / Custom Label 1": "",
        "Google Shopping / Custom Label 2": "",
        "Google Shopping / Custom Label 3": "",
        "Google Shopping / Custom Label 4": "",
        "Variant Image": "",
        "Variant Weight Unit": "kg",
        "Variant Tax Code": "",
        "Cost per item": "",
        "Status": "active",
        "Collection": collection,
    }
    for k in FIELDS:
        if k not in row:
            row[k] = ""
    rows.append(row)
return rows</code></pre>
<p>def process_one(args):<br />
&quot;&quot;&quot;处理单个 (productId, colorId, collection)&quot;&quot;&quot;<br />
global processed_count<br />
product_id, color_id, collection = args[0], args[1], args[2] if len(args) &gt; 2 else &quot;&quot;<br />
try:<br />
if not color_id:<br />
color_id = &quot;&quot;<br />
data = fetch_product_api(product_id, color_id)<br />
rows = centauro_to_shopify_rows(data, collection)<br />
for row in rows:<br />
write_queue.put(row)<br />
with counter_lock:<br />
processed_count += 1<br />
print(f&quot;Progress: {processed_count}/{total_tasks} - {product_id}?color={color_id}&quot;)<br />
except HTTPError as e:<br />
print(f&quot;HTTP Error {product_id}?color={color_id}: {e.code}&quot;)<br />
except Exception as e:<br />
print(f&quot;Error {product_id}?color={color_id}: {e}&quot;)</p>
<p>def writer_thread():<br />
&quot;&quot;&quot;CSV 写入线程，按 Collection 分文件写入：{collection}.csv&quot;&quot;&quot;<br />
files = {}<br />
writers = {}<br />
while True:<br />
row = write_queue.get()<br />
if row is None:<br />
break<br />
coll = row.get(&quot;Collection&quot;, &quot;&quot;) or &quot;_default&quot;<br />
if coll not in writers:<br />
f = open(f&quot;{coll}.csv&quot;, &quot;w&quot;, newline=&quot;&quot;, encoding=&quot;utf-8&quot;)<br />
w = csv.DictWriter(f, fieldnames=FIELDS)<br />
w.writeheader()<br />
files[coll] = f<br />
writers[coll] = w<br />
writers[coll].writerow(row)<br />
files[coll].flush()<br />
write_queue.task_done()<br />
for f in files.values():<br />
f.close()</p>
<p>def _fetch_one_collection(page_url):<br />
&quot;&quot;&quot;获取单个集合的列表，返回 [(pid, cid, coll), ...]&quot;&quot;&quot;<br />
coll = url_to_collection_name(page_url)<br />
try:<br />
print(f&quot;\n正在采集集合 [{coll}]: {page_url}&quot;)<br />
products = fetch_list_json_api(page_url)<br />
pairs = extract_product_color_pairs(products)<br />
if pairs:<br />
print(f&quot;  [{coll}] 解析到 {len(pairs)} 个组合&quot;)<br />
return [(pid, cid, coll) for pid, cid in pairs]<br />
except (HTTPError, URLError, ValueError) as e:<br />
print(f&quot;  [{coll}] 获取失败: {e}&quot;)<br />
return []</p>
<p>def main():<br />
global total_tasks<br />
parser = argparse.ArgumentParser(description=&quot;Centauro 产品采集&quot;)<br />
parser.add_argument(&quot;-f&quot;, &quot;--from-file&quot;, nargs=&quot;?&quot;, const=&quot;products.txt&quot;, metavar=&quot;FILE&quot;,<br />
help=&quot;直接从文本文件读取并采集，跳过列表获取（默认 products.txt）&quot;)<br />
parser.add_argument(&quot;-n&quot;, &quot;--no-verify-proxy&quot;, action=&quot;store_true&quot;, help=&quot;跳过代理验证（验证失败时可使用）&quot;)<br />
args = parser.parse_args()</p>
<pre><code>if PROXIES:
    print(f"使用代理: {PROXY_ADDR}，每线程独立 IP，403 时自动换连接重试")
    if not args.no_verify_proxy and not verify_proxy():
        print("请检查代理配置，或使用 -n 跳过验证后重试")
        return
    elif args.no_verify_proxy:
        print("已跳过代理验证")

list_file = args.from_file if args.from_file else "products.txt"
triples = []

if args.from_file:
    # 直接从文本读取并采集
    try:
        triples = load_products_from_txt(list_file)
        print(f"从 {list_file} 加载 {len(triples)} 个组合，直接开始采集")
    except FileNotFoundError:
        print(f"文件不存在: {list_file}")
        return
else:
    # 1. 多线程获取各集合列表
    list_workers = min(30, len(COLLECTION_URLS))
    with ThreadPoolExecutor(max_workers=list_workers) as ex:
        for future in as_completed([ex.submit(_fetch_one_collection, url) for url in COLLECTION_URLS]):
            triples.extend(future.result())

    # 2. 若无则尝试 coll.json / next_data.json（collection 为空）
    if not triples:
        for name in ("coll.json", "next_data.json"):
            try:
                products = load_products_from_json(name)
                pairs = extract_product_color_pairs(products)
                if pairs:
                    for pid, cid in pairs:
                        triples.append((pid, cid, ""))
                    print(f"从 {name} 解析到 {len(pairs)} 个组合")
                    break
            except FileNotFoundError:
                continue
            except Exception as e:
                print(f"{name} 解析失败: {e}")

    if triples:
        save_pairs_to_txt(triples, list_file)

    # 3. 从 products.txt 加载
    try:
        triples = load_products_from_txt(list_file)
        print(f"\n从 {list_file} 加载 {len(triples)} 个组合用于采集")
    except FileNotFoundError:
        if not triples:
            pass
        else:
            raise

if not triples:
    print("无产品可采集。请确保网络可访问 www.centauro.com.br")
    return

total_tasks = len(triples)
print(f"开始采集，共 {total_tasks} 个任务，每个集合输出单独 CSV 文件")

writer = threading.Thread(target=writer_thread)
writer.start()

with ThreadPoolExecutor(max_workers=30) as ex:
    for future in as_completed([ex.submit(process_one, t) for t in triples]):
        try:
            future.result()
        except Exception as e:
            print(f"Task error: {e}")
    time.sleep(0.5)

write_queue.put(None)
writer.join()
print(f"完成，处理 {processed_count}/{total_tasks}，已按 Collection 分文件写入")</code></pre>
<p>if <strong>name</strong> == &quot;<strong>main</strong>&quot;:<br />
main()</p>]]></description>
    <pubDate>Fri, 27 Feb 2026 18:39:33 +0800</pubDate>
    <dc:creator>emer</dc:creator>
    <guid>https://docs.colyoy.cn/?post=114</guid>
</item>
<item>
    <title>carrefour_cj</title>
    <link>https://docs.colyoy.cn/?post=113</link>
    <description><![CDATA[<pre><code># -*- coding: utf-8 -*-
"""
Carrefour Brasil 产品采集脚本 - mercado.carrefour.com.br
流程: 读取分类配置 -&gt; 请求分类列表 API -&gt; 解析产品链接 -&gt; 写入 products.txt -&gt; 采集产品详情 -&gt; 输出 Shopify CSV
每个产品写入两个 Collection 值：一级分类 + 二级分类
"""
import html
import json
import csv
import os
import sys
import re
import time
import urllib.request
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import threading
import unicodedata

# Windows 控制台 UTF-8
try:
    sys.stdout.reconfigure(encoding="utf-8")
except (AttributeError, OSError):
    pass

# Shopify CSV 字段
FIELDS = [
    "Handle", "Title", "Body (HTML)", "Vendor", "Type", "Tags", "Published",
    "Option1 Name", "Option1 Value", "Option2 Name", "Option2 Value",
    "Option3 Name", "Option3 Value", "Variant SKU", "Variant Grams",
    "Variant Inventory Tracker", "Variant Inventory Qty", "Variant Inventory Policy",
    "Variant Fulfillment Service", "Variant Price", "Variant Compare At Price",
    "Variant Requires Shipping", "Variant Taxable", "Variant Barcode",
    "Image Src", "Image Position", "Image Alt Text", "Gift Card",
    "SEO Title", "SEO Description", "Google Shopping / Google Product Category",
    "Google Shopping / Gender", "Google Shopping / Age Group",
    "Google Shopping / MPN", "Google Shopping / AdWords Grouping",
    "Google Shopping / AdWords Labels", "Google Shopping / Condition",
    "Google Shopping / Custom Product", "Google Shopping / Custom Label 0",
    "Google Shopping / Custom Label 1", "Google Shopping / Custom Label 2",
    "Google Shopping / Custom Label 3", "Google Shopping / Custom Label 4",
    "Variant Image", "Variant Weight Unit", "Variant Tax Code",
    "Cost per item", "Status", "Collection"
]

BASE_URL = "https://mercado.carrefour.com.br"
CATEGORY_CONFIG = "carrefour_categories.json"
PRODUCTS_TXT = "carrefour_products.txt"
OUTPUT_DIR = "carrefour_output"

# 硬编码请求头（不含 cookie，避免 latin-1 编码问题）
PAGE_HEADERS = {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "accept-language": "en-US,en;q=0.9,zh;q=0.8",
    "cache-control": "max-age=0",
    "sec-ch-ua": '"Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "document",
    "sec-fetch-mode": "navigate",
    "sec-fetch-site": "none",
    "sec-fetch-user": "?1",
    "upgrade-insecure-requests": "1",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
}

write_queue = Queue()
counter_lock = threading.Lock()
processed_count = 0
total_tasks = 0
test_mode = False

MAX_403_RETRIES = 3
RETRY_DELAY = 2

def urlopen_with_403_retry(req, timeout=30, desc=""):
    """带 403 重试的 urlopen"""
    last_err = None
    for attempt in range(MAX_403_RETRIES + 1):
        try:
            return urllib.request.urlopen(req, timeout=timeout)
        except HTTPError as e:
            last_err = e
            if e.code == 403 and attempt &lt; MAX_403_RETRIES:
                wait = RETRY_DELAY * (attempt + 1)
                print(f"  403 重试 {attempt + 1}/{MAX_403_RETRIES}，{wait}s 后重试{f': {desc}' if desc else ''}")
                time.sleep(wait)
            else:
                raise
    raise last_err

def decode_html_entities(text):
    """解码 HTML 实体"""
    if not text:
        return ""
    s = str(text)
    for _ in range(2):
        s = html.unescape(s)
    return s

def normalize_text(text):
    """处理特殊字符"""
    if not text:
        return ""
    normalized = unicodedata.normalize("NFKD", str(text))
    normalized = "".join(c for c in normalized if not unicodedata.combining(c))
    return normalized

def load_categories(config_path=CATEGORY_CONFIG):
    """加载分类配置，返回 [(slug1, slug2, level1, level2), ...]"""
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return _parse_categories_from_fenlei_json()

    cats = []
    for c in data.get("categories", []):
        cats.append((
            c["slug1"],
            c["slug2"],
            c["level1"],
            c["level2"]
        ))
    return cats

def _parse_categories_from_fenlei_json(path="carrefourfenlei.json"):
    """
    从 carrefourfenlei.json 解析 /categoria/xxx/yyy 格式的二级分类
    返回 [(slug1, slug2, level1, level2), ...]
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return []

    text = json.dumps(data)
    # 匹配 /categoria/slug1/slug2 或 /categoria/slug1/slug2/...
    pattern = r'mercado\.carrefour\.com\.br/categoria/([a-z0-9\-]+)/([a-z0-9\-]+)(?:[?/#]|$)'
    seen = set()
    cats = []
    for m in re.finditer(pattern, text):
        s1, s2 = m.group(1), m.group(2)
        if (s1, s2) in seen:
            continue
        seen.add((s1, s2))
        # slug 转显示名: higiene-e-perfumaria -&gt; Higiene e Perfumaria
        def slug_to_name(s):
            return s.replace("-", " ").title()
        cats.append((s1, s2, slug_to_name(s1), slug_to_name(s2)))
    return cats

PAGE_SIZE = 60  # 每页产品数

def category_data_url(slug1, slug2, page=0):
    """分类列表 .data API URL。第一页不带 page，第二页起 page=2,3,...；count 控制每页数量"""
    path = f"/categoria/{slug1}/{slug2}.data"
    routes = "_routes=layout%2Fdefault%2Croutes%2Fcategory-search"
    if page == 0:
        params = f"sort=orders_desc&amp;count={PAGE_SIZE}&amp;{routes}"
    else:
        params = f"sort=orders_desc&amp;page={page + 1}&amp;count={PAGE_SIZE}&amp;{routes}"
    return f"{BASE_URL}{path}?{params}"

def _collect_product_slugs_from_tree(obj, slugs):
    """从解析后的树中递归收集产品 slug（link 中 /slug/p 格式），放宽最小长度以捕获更多"""
    exclude = ("categoria", "colecao", "drogaria", "busca", "facets", "layout")
    # 最小 5 字符，避免漏抓短 slug
    slug_pat = r'(?:mercado\.carrefour\.com\.br/|/)([a-z0-9][a-z0-9\-]{4,}?)(?:/p\.data|/p)(?=["\'\s?#]|$)'
    if isinstance(obj, str):
        for m in re.finditer(slug_pat, obj):
            s = m.group(1).strip()
            if not any(e in s for e in exclude):
                slugs.add(s)
        return
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k in ("link", "href", "url") and isinstance(v, str) and "/p" in v:
                for m in re.finditer(r'/([a-z0-9][a-z0-9\-]{4,}?)(?:/p\.data|/p)', v):
                    s = m.group(1)
                    if not any(e in s for e in exclude):
                        slugs.add(s)
            _collect_product_slugs_from_tree(v, slugs)
    elif isinstance(obj, list):
        for item in obj:
            _collect_product_slugs_from_tree(item, slugs)

def extract_product_slugs_from_json(data):
    """
    从分类 API 返回的 JSON 中提取产品 slug
    支持 RSC 解析 + 正则兜底
    """
    slugs = set()
    exclude = ("categoria", "colecao", "drogaria", "busca", "facets", "layout")

    if isinstance(data, list):
        root = _resolve_rsc_value(data, 0)
        _collect_product_slugs_from_tree(root, slugs)

    text = json.dumps(data) if isinstance(data, (dict, list)) else str(data)
    for m in re.finditer(r'(?:mercado\.carrefour\.com\.br/|["\']/)([a-z0-9][a-z0-9\-]{4,}?)(?:/p\.data|/p)(?=["\'\s?#]|$)', text, re.I):
        s = m.group(1).strip()
        if not any(e in s for e in exclude):
            slugs.add(s)

    return list(slugs)

def fetch_category_page(slug1, slug2, page):
    """请求分类列表页 .data API"""
    url = category_data_url(slug1, slug2, page)
    req = urllib.request.Request(url, headers=PAGE_HEADERS)
    with urlopen_with_403_retry(req, desc=f"分类 page={page}") as r:
        return json.loads(r.read().decode("utf-8", errors="replace"))

def fetch_all_products_from_category(slug1, slug2, level1, level2, max_products=None):
    """获取分类下产品 slug，返回 [(slug, coll1, coll2, slug1, slug2), ...]。max_products 为 None 时获取全部"""
    results = []
    seen = set()
    page = 0

    while True:
        try:
            data = fetch_category_page(slug1, slug2, page)
            slugs = extract_product_slugs_from_json(data)
            if not slugs:
                if page == 0:
                    print(f"  警告: 分类 {level1}/{level2} 第 0 页无产品")
                break

            added = 0
            for slug in slugs:
                if slug not in seen:
                    seen.add(slug)
                    results.append((slug, level1, level2, slug1, slug2))
                    added += 1
                    if max_products and len(results) &gt;= max_products:
                        break
            print(f"  [{level1}/{level2}] 第 {page} 页: {len(slugs)} 个产品，新增 {added}，累计 {len(results)}")
            if max_products and len(results) &gt;= max_products:
                break
            if added == 0 and len(slugs) &gt; 0:
                print(f"  第 {page} 页全为重复，停止翻页")
                break
            page += 1
            time.sleep(0.5)

        except (HTTPError, URLError, json.JSONDecodeError) as e:
            print(f"  分类 {level1}/{level2} 第 {page} 页失败: {e}")
            break

    return results

def product_data_url(slug):
    """产品详情 .data API URL"""
    # slug 可能已编码，确保正确
    encoded = quote(slug, safe="-")
    return f"{BASE_URL}/{encoded}/p.data"

def fetch_product_data(slug):
    """请求产品详情 .data API"""
    url = product_data_url(slug)
    req = urllib.request.Request(url, headers=PAGE_HEADERS)
    with urlopen_with_403_retry(req, desc=slug) as r:
        return json.loads(r.read().decode("utf-8", errors="replace"))

def _resolve_rsc_value(arr, idx, visited=None):
    """
    RSC 格式解析：arr 为扁平数组，dict {"_N": M} 表示 key=arr[N], value=arr[M]
    """
    if visited is None:
        visited = set()
    if idx in visited or idx &lt; 0 or idx &gt;= len(arr):
        return None
    visited.add(idx)
    val = arr[idx]

    if isinstance(val, dict):
        result = {}
        for k, v in val.items():
            if k.startswith("_") and isinstance(v, int):
                try:
                    key_idx = int(k[1:])
                except (ValueError, TypeError):
                    continue
                if 0 &lt;= key_idx &lt; len(arr):
                    key_val = arr[key_idx]
                    val_resolved = _resolve_rsc_value(arr, v, visited.copy())
                    if isinstance(key_val, str):
                        result[key_val] = val_resolved
        return result
    elif isinstance(val, list):
        result = []
        for i in val:
            if isinstance(i, int) and 0 &lt;= i &lt; len(arr):
                result.append(_resolve_rsc_value(arr, i, visited.copy()))
        return result
    return val

def _get_product_from_resolved(root):
    """从解析后的树中获取 product 对象：routes.$productSlug.p.data.product"""
    def find_product(obj):
        if isinstance(obj, dict):
            if "product" in obj and isinstance(obj["product"], dict):
                return obj["product"]
            for v in obj.values():
                r = find_product(v)
                if r:
                    return r
        elif isinstance(obj, list):
            for item in obj:
                r = find_product(item)
                if r:
                    return r
        return None

    return find_product(root or {})

def parse_carrefour_product(data, slug="", collection1="", collection2="", coll_slug=""):
    """
    从 Carrefour .data API 响应解析产品，转为 Shopify CSV 行
    使用 RSC 解析：product 在 routes.$productSlug.p.data.product
    """
    collection = f"{collection1},{collection2}" if collection1 and collection2 else (collection1 or collection2 or "")

    product = None
    if isinstance(data, list):
        root = _resolve_rsc_value(data, 0)
        product = _get_product_from_resolved(root)

    if not product or not isinstance(product, dict):
        cs = coll_slug or f"{_coll_to_slug(collection1)}_{_coll_to_slug(collection2)}"
        return [{"Handle": slug[:100] or "product", "Title": "Parse Error", "Collection": collection, "_coll_slug": cs}]

    title = (product.get("productName") or product.get("name") or "").strip()
    title = decode_html_entities(normalize_text(title)) or "Unknown"

    body = (product.get("description") or "").strip()
    body = decode_html_entities(normalize_text(body))

    vendor = (product.get("brand") or "").strip()
    vendor = decode_html_entities(normalize_text(vendor))

    price = ""
    compare_price = ""
    sku = ""
    items = product.get("items") or []
    if items:
        item = items[0]
        sku = str(item.get("itemId") or product.get("productId") or "")
        sellers = item.get("sellers") or []
        if sellers:
            offer = sellers[0].get("commertialOffer") or {}
            p = offer.get("Price") or offer.get("calculatedPrice")
            if p is not None:
                price = str(p)
            lp = offer.get("ListPrice") or offer.get("calculatedListPrice")
            if lp is not None:
                compare_price = str(lp)

    def _collect_image_urls(obj, out):
        if isinstance(obj, dict):
            for key in ("imageUrl", "imageTag", "url", "src"):
                val = obj.get(key)
                if isinstance(val, str):
                    u = val
                    if "src=" in u:
                        mm = re.search(r'src=["\']([^"\']+)["\']', u)
                        if mm:
                            u = mm.group(1)
                    if "vtexassets" in u and u not in out:
                        out.append(u)
            for v in obj.values():
                _collect_image_urls(v, out)
        elif isinstance(obj, list):
            for x in obj:
                _collect_image_urls(x, out)

    images = []
    _collect_image_urls(product, images)

    if not images:
        text = json.dumps(product)
        for m in re.finditer(r'https?://[^"\'?\s]*vtexassets[^"\'?\s]+', text):
            url = m.group(0).rstrip("\\")
            if url not in images and ("arquivos" in url or "images" in url or any(ext in url for ext in [".jpg", ".jpeg", ".png", ".webp"])):
                images.append(url)

    handle = slug[:100] if slug else (re.sub(r"[^\w\-]", "-", slug_from_title(title)).strip("-")[:100] if title else "product")

    base_row = {
        "Handle": handle or "product",
        "Title": title,
        "Body (HTML)": body,
        "Vendor": vendor,
        "Type": collection2 or collection1 or "",
        "Tags": "",
        "Published": "TRUE",
        "_coll_slug": coll_slug or f"{_coll_to_slug(collection1)}_{_coll_to_slug(collection2)}",
        "Option1 Name": "",
        "Option1 Value": "",
        "Option2 Name": "",
        "Option2 Value": "",
        "Option3 Name": "",
        "Option3 Value": "",
        "Variant SKU": sku,
        "Variant Grams": "",
        "Variant Inventory Tracker": "shopify",
        "Variant Inventory Qty": 100,
        "Variant Inventory Policy": "deny",
        "Variant Fulfillment Service": "manual",
        "Variant Price": price,
        "Variant Compare At Price": compare_price,
        "Variant Requires Shipping": "TRUE",
        "Variant Taxable": "TRUE",
        "Variant Barcode": "",
        "Image Src": "",
        "Image Position": 1,
        "Image Alt Text": title,
        "Gift Card": "FALSE",
        "SEO Title": title,
        "SEO Description": (body[:160] if body else ""),
        "Google Shopping / Google Product Category": "",
        "Google Shopping / Gender": "",
        "Google Shopping / Age Group": "",
        "Google Shopping / MPN": handle,
        "Google Shopping / AdWords Grouping": "",
        "Google Shopping / AdWords Labels": "",
        "Google Shopping / Condition": "New",
        "Google Shopping / Custom Product": "FALSE",
        "Google Shopping / Custom Label 0": "",
        "Google Shopping / Custom Label 1": "",
        "Google Shopping / Custom Label 2": "",
        "Google Shopping / Custom Label 3": "",
        "Google Shopping / Custom Label 4": "",
        "Variant Image": "",
        "Variant Weight Unit": "kg",
        "Variant Tax Code": "",
        "Cost per item": "",
        "Status": "active",
        "Collection": collection,
    }
    for k in FIELDS:
        if k not in base_row:
            base_row[k] = ""

    rows = []
    if images:
        for pos, url in enumerate(images, 1):
            row = dict(base_row)
            row["Image Src"] = url
            row["Image Position"] = pos
            if pos &gt; 1:
                row["Title"] = ""
                row["Body (HTML)"] = ""
                row["Vendor"] = ""
                row["Variant Price"] = ""
                row["Variant Compare At Price"] = ""
                row["Variant SKU"] = ""
            rows.append(row)
    else:
        rows.append(base_row)
    return rows

def slug_from_title(title):
    """从标题生成 handle"""
    s = unicodedata.normalize("NFKD", str(title))
    s = "".join(c for c in s if not unicodedata.combining(c))
    s = re.sub(r"[^\w\s\-]", "", s)
    return re.sub(r"[-\s]+", "-", s).strip("-").lower()

def _coll_to_slug(name):
    """分类名转文件名用 slug：Higiene e Perfumaria -&gt; higiene-e-perfumaria"""
    if not name:
        return "default"
    s = unicodedata.normalize("NFKD", str(name))
    s = "".join(c for c in s if not unicodedata.combining(c))
    s = re.sub(r"[^\w\s\-]", "", s)
    s = re.sub(r"[-\s]+", "-", s).strip("-").lower()
    return s or "default"

def save_products_txt(tuples, path=None):
    """写入 (slug, coll1, coll2, slug1, slug2) 到文本"""
    if path is None:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        path = os.path.join(OUTPUT_DIR, PRODUCTS_TXT)
    seen = set()
    lines = []
    for t in tuples:
        slug, c1, c2 = t[0], t[1], t[2]
        cs1 = t[3] if len(t) &gt; 3 else _coll_to_slug(c1)
        cs2 = t[4] if len(t) &gt; 4 else _coll_to_slug(c2)
        key = (slug, c1, c2)
        if key in seen:
            continue
        seen.add(key)
        lines.append(f"{slug},{c1},{c2},{cs1},{cs2}\n")
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(lines)
    print(f"已写入 {len(lines)} 条到 {path}")

def load_products_txt(path=None):
    """从 txt 加载 (slug, coll1, coll2, slug1, slug2)，兼容旧格式"""
    if path is None:
        path = os.path.join(OUTPUT_DIR, PRODUCTS_TXT)
    triples = []
    seen = set()
    try:
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                parts = [x.strip() for x in line.split(",", 4)]
                slug = parts[0]
                c1 = parts[1] if len(parts) &gt; 1 else ""
                c2 = parts[2] if len(parts) &gt; 2 else ""
                cs1 = parts[3] if len(parts) &gt; 3 else _coll_to_slug(c1)
                cs2 = parts[4] if len(parts) &gt; 4 else _coll_to_slug(c2)
                key = (slug, c1, c2)
                if key in seen:
                    continue
                seen.add(key)
                triples.append((slug, c1, c2, cs1, cs2))
    except FileNotFoundError:
        pass
    return triples

def process_one(args):
    """处理单个产品 (slug, coll1, coll2, slug1, slug2)。404 或异常状态码时跳过"""
    global processed_count
    slug = args[0]
    coll1 = args[1] if len(args) &gt; 1 else ""
    coll2 = args[2] if len(args) &gt; 2 else ""
    cs1 = args[3] if len(args) &gt; 3 else _coll_to_slug(coll1)
    cs2 = args[4] if len(args) &gt; 4 else _coll_to_slug(coll2)
    try:
        data = fetch_product_data(slug)
        rows = parse_carrefour_product(data, slug=slug, collection1=coll1, collection2=coll2, coll_slug=f"{cs1}_{cs2}")
        for row in rows:
            write_queue.put(row)
        with counter_lock:
            processed_count += 1
            print(f"Progress: {processed_count}/{total_tasks} - {slug[:50]}")
    except HTTPError as e:
        print(f"跳过 {slug[:50]} (HTTP {e.code})")
    except URLError as e:
        print(f"跳过 {slug[:50]} (网络异常)")
    except Exception as e:
        print(f"跳过 {slug[:50]} ({e})")

def writer_thread():
    """CSV 写入线程。测试模式：全部写入一个文件；否则按 coll_slug 分文件。所有文件输出到 OUTPUT_DIR"""
    global test_mode
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    files = {}
    writers = {}
    single_file = "carrefour_test.csv" if test_mode else None

    while True:
        row = write_queue.get()
        if row is None:
            break
        if single_file:
            coll = "_single"
        else:
            coll = row.get("_coll_slug", "") or "default"
        if coll not in writers:
            fname = single_file if single_file else f"carrefour_{coll}.csv"
            fpath = os.path.join(OUTPUT_DIR, fname)
            f = open(fpath, "w", newline="", encoding="utf-8")
            w = csv.DictWriter(f, fieldnames=FIELDS)
            w.writeheader()
            files[coll] = f
            writers[coll] = w
        out_row = {k: v for k, v in row.items() if k in FIELDS}
        writers[coll].writerow(out_row)
        files[coll].flush()
        write_queue.task_done()
    for f in files.values():
        f.close()

def main():
    global total_tasks, test_mode
    list_only = "--list-only" in sys.argv or "-l" in sys.argv
    test_mode = "--test" in sys.argv or "-t" in sys.argv
    from_file = "--from-file" in sys.argv or "-f" in sys.argv
    per_category = 2 if test_mode else None

    # --only Corpo 只处理指定二级分类
    only_cat = None
    for i, arg in enumerate(sys.argv):
        if arg in ("--only", "-o") and i + 1 &lt; len(sys.argv):
            only_cat = sys.argv[i + 1].strip()
            break

    triples = []
    all_products = []

    if from_file:
        triples = load_products_txt()
        if not triples:
            triples = load_products_txt(path=PRODUCTS_TXT)
        if not triples:
            print(f"无产品。请确保 {OUTPUT_DIR}/{PRODUCTS_TXT} 或 {PRODUCTS_TXT} 存在且有内容。")
            return
        if only_cat:
            triples = [t for t in triples if (t[2] if len(t) &gt; 2 else "").lower() == only_cat.lower()]
            if not triples:
                print(f"文本中无 {only_cat} 分类的产品")
                return
            print(f"从文本加载 {len(triples)} 个 {only_cat} 产品，直接采集")
        else:
            print(f"从文本加载 {len(triples)} 个产品，直接采集")
    else:
        categories = load_categories()
        if only_cat:
            categories = [(s1, s2, l1, l2) for s1, s2, l1, l2 in categories if l2.lower() == only_cat.lower()]
            if not categories:
                print(f"未找到分类: {only_cat}")
                return
            print(f"仅处理: {only_cat} ({len(categories)} 个)")
        else:
            print(f"加载 {len(categories)} 个分类")
        if test_mode:
            print("[--test] 测试模式：每个分类采集 2 个产品，输出到 carrefour_output/carrefour_test.csv")

        for slug1, slug2, level1, level2 in categories:
            print(f"\n采集分类: {level1} / {level2}")
            products = fetch_all_products_from_category(slug1, slug2, level1, level2, max_products=per_category)
            all_products.extend(products)

        if all_products:
            save_products_txt(all_products)

        if list_only:
            print(f"\n[--list-only] 仅获取列表，已保存到 {OUTPUT_DIR}/{PRODUCTS_TXT}，共 {len(all_products)} 条")
            return

        triples = load_products_txt()
        if not triples and all_products:
            triples = all_products

    if not triples:
        print("无产品可采集。请检查分类配置或网络。")
        return

    total_tasks = len(triples)
    print(f"\n开始采集产品详情，共 {total_tasks} 个，每个产品写入两个 Collection（一级+二级）")

    writer = threading.Thread(target=writer_thread)
    writer.start()

    with ThreadPoolExecutor(max_workers=20) as ex:
        for future in as_completed([ex.submit(process_one, t) for t in triples]):
            try:
                future.result()
            except Exception as e:
                print(f"Task error: {e}")
        time.sleep(0.5)

    write_queue.put(None)
    writer.join()
    if test_mode:
        print(f"完成，处理 {processed_count}/{total_tasks}，已写入 carrefour_output/")
    else:
        print(f"完成，处理 {processed_count}/{total_tasks}，已按 Collection 分文件写入 carrefour_output/")

if __name__ == "__main__":
    main()
</code></pre>]]></description>
    <pubDate>Fri, 27 Feb 2026 18:34:15 +0800</pubDate>
    <dc:creator>emer</dc:creator>
    <guid>https://docs.colyoy.cn/?post=113</guid>
</item>
<item>
    <title>命令</title>
    <link>https://docs.colyoy.cn/?post=112</link>
    <description><![CDATA[<p>[该文章已设置加密]</p>]]></description>
    <pubDate>Tue, 20 Jan 2026 18:02:51 +0800</pubDate>
    <dc:creator>emer</dc:creator>
    <guid>https://docs.colyoy.cn/?post=112</guid>
</item>
<item>
    <title>ipapi-local</title>
    <link>https://docs.colyoy.cn/?post=111</link>
    <description><![CDATA[<p>修改配置步骤<br />
编辑配置文件：<br />
sudo nano /opt/ip-api-proxy/config<br />
重启服务使配置生效：<br />
sudo systemctl restart ip-api-proxy<br />
检查服务状态：<br />
sudo systemctl status ip-api-proxy<br />
详细文档</p>]]></description>
    <pubDate>Thu, 18 Dec 2025 21:04:18 +0800</pubDate>
    <dc:creator>emer</dc:creator>
    <guid>https://docs.colyoy.cn/?post=111</guid>
</item>
<item>
    <title>店匠斗篷cloak-xkt使用教程</title>
    <link>https://docs.colyoy.cn/?post=110</link>
    <description><![CDATA[<h1>安装连接</h1>
<p>:::note<br />
/admin/smart_apps/angora/app_store/plugins/156513?client_id=Av5Rd-f-Ejkx7d_smGiz-R40124QNIIl0X7bYTODzzA&amp;scope=read_shop&amp;redirect_uri=<a href="https://cloakfront.xktsystem.com/oauth_sdk/redirect_uri&amp;response_type=code">https://cloakfront.xktsystem.com/oauth_sdk/redirect_uri&amp;response_type=code</a><br />
:::<br />
比如店铺是</p>
<hr />
<p><a href="https://xxxvip.myshoplaza.com/">https://xxxvip.myshoplaza.com/</a></p>
<hr />
<p>那么安装链接就是：<a href="https://xxxvip.myshoplaza.com/admin/smart_apps/angora/app_store/plugins/156513?client_id=Av5Rd-f-Ejkx7d_smGiz-R40124QNIIl0X7bYTODzzA&amp;scope=read_shop&amp;redirect_uri=https://cloakfront.xktsystem.com/oauth_sdk/redirect_uri&amp;response_type=code">https://xxxvip.myshoplaza.com/admin/smart_apps/angora/app_store/plugins/156513?client_id=Av5Rd-f-Ejkx7d_smGiz-R40124QNIIl0X7bYTODzzA&amp;scope=read_shop&amp;redirect_uri=https://cloakfront.xktsystem.com/oauth_sdk/redirect_uri&amp;response_type=code</a></p>
<hr />
<h1>配置教程</h1>
<p>安装好了之后进入斗篷，确保如下图是开启状态，没事不要关闭！</p>
<p><img src="https://docs.colyoy.cn/content/uploadfile/202512/d9be1765605653.png" alt="" /></p>
<p>白名单是允许哪些国家访问，黑名单是禁止哪些国家访问.<br />
<img src="https://docs.colyoy.cn/content/uploadfile/202512/ddc41765605696.png" alt="" /><br />
:::warning<br />
拒绝跳转url是店匠的域名,是ph站的域名，默认斗篷是安装在fp站的，比如puhuoxxx.myshoplaza.com，这种不要带https还有后面的/<br />
:::<br />
<img src="https://docs.colyoy.cn/content/uploadfile/202512/6c041765605832.png" alt="" /></p>
<p><img src="https://docs.colyoy.cn/content/uploadfile/202512/6b9d1765605855.png" alt="" /></p>]]></description>
    <pubDate>Sat, 13 Dec 2025 13:56:15 +0800</pubDate>
    <dc:creator>emer</dc:creator>
    <guid>https://docs.colyoy.cn/?post=110</guid>
</item>
<item>
    <title>worldsxinpay直连支付格式</title>
    <link>https://docs.colyoy.cn/?post=108</link>
    <description><![CDATA[<h2>基本信息</h2>
<ul>
<li><strong>接口地址</strong>：<code>/api/payment</code></li>
<li><strong>请求方式</strong>：<code>POST</code></li>
</ul>
<h2>📌 请求头（Headers）说明</h2>
<p>支付接口请求时必须在 HTTP 头中携带以下参数，每个字段都很关键，用于安全校验和身份验证。</p>
<table>
<thead>
<tr>
<th>参数名</th>
<th>说明</th>
<th>必须</th>
<th>格式示例</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Content-Type</strong></td>
<td>请求内容类型，固定为 <code>application/json</code>，表示请求体是 JSON 格式</td>
<td>是</td>
<td><code>application/json</code></td>
</tr>
<tr>
<td><strong>X-API-Key</strong></td>
<td>商户 API 密钥，由支付平台分配。用于标识商户身份并进行接口权限验证。</td>
<td>是</td>
<td><code>ABCD1234EFGH5678</code></td>
</tr>
<tr>
<td><strong>X-User-ID</strong></td>
<td>商户系统在支付平台注册的用户 ID，用于区分不同商户。</td>
<td>是</td>
<td><code>100001</code></td>
</tr>
<tr>
<td><strong>X-Signature</strong></td>
<td>请求签名，用于校验请求的完整性和防篡改。必须使用 HMAC-SHA256 算法结合商户 API Key 生成。</td>
<td>是</td>
<td><code>3f1c2b4a5e6d7f8g9h0i...</code></td>
</tr>
</tbody>
</table>
<hr />
<h3>⚡ 请求头详细说明</h3>
<ol>
<li>
<p><strong>Content-Type</strong></p>
<ul>
<li>必须为 <code>application/json</code></li>
<li>如果填写错误（如 <code>text/plain</code>），服务器会拒绝请求</li>
</ul>
</li>
<li>
<p><strong>X-API-Key</strong></p>
<ul>
<li>商户唯一标识</li>
<li>请妥善保管，不要泄露</li>
<li>与商户后台生成的密钥对应</li>
</ul>
</li>
<li>
<p><strong>X-User-ID</strong></p>
<ul>
<li>与商户账户绑定，用于识别操作请求来源</li>
<li>需要和后台注册信息一致，否则会报 “用户不存在” 错误</li>
</ul>
</li>
<li>
<p><strong>X-Signature</strong></p>
<ul>
<li>签名算法：
<ol>
<li>将请求参数按字母升序排列</li>
<li>转为 JSON 字符串</li>
<li>使用 <strong>HMAC-SHA256</strong> 结合 API Key 生成签名</li>
<li>放入请求头 <code>X-Signature</code></li>
</ol></li>
<li>示例生成（Node.js）：
<pre><code class="language-javascript">const crypto = require('crypto');
const apiKey = 'YOUR_API_KEY';
const payload = JSON.stringify({ order_no: '123', amount: '29.90' });
const signature = crypto.createHmac('sha256', apiKey).update(payload).digest('hex');
console.log(signature);</code></pre></li>
</ul>
</li>
</ol>
<h2>请求参数（JSON Body）</h2>
<table>
<thead>
<tr>
<th>参数名</th>
<th>类型</th>
<th>必须</th>
<th>详细说明</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>order_no</strong></td>
<td>string</td>
<td>是</td>
<td>商户系统生成的唯一订单号，需在商户系统内保持唯一性，用于识别此次支付交易。例如：<code>"ORD202512120001"</code></td>
</tr>
<tr>
<td><strong>amount</strong></td>
<td>string</td>
<td>是</td>
<td>商品总金额，需等于所有商品 price × quantity 的总和。格式建议为字符串并保留两位小数，如 <code>"29.90"</code>。</td>
</tr>
<tr>
<td><strong>other</strong></td>
<td>string</td>
<td>否</td>
<td>SDK 自动生成的加密参数，用于风控校验和额外安全验证。调用 PaymentSDK 时会自动生成，无需手动填写。</td>
</tr>
<tr>
<td><strong>card_no</strong></td>
<td>string</td>
<td>是</td>
<td>信用卡卡号，需提供完整数字，不含空格，例如 <code>"4111111111111111"</code>。</td>
</tr>
<tr>
<td><strong>exp_month</strong></td>
<td>string</td>
<td>是</td>
<td>信用卡有效期月份，格式为两位数字：<code>"01"</code> ~ <code>"12"</code>。</td>
</tr>
<tr>
<td><strong>exp_year</strong></td>
<td>string</td>
<td>是</td>
<td>信用卡有效期年份，4位数字，如 <code>"2025"</code>。</td>
</tr>
<tr>
<td><strong>cvv</strong></td>
<td>string</td>
<td>是</td>
<td>信用卡安全码（Visa/Master 为卡片背面3位，Amex为前面4位）。例如 <code>"123"</code>。</td>
</tr>
<tr>
<td><strong>bill_full_name</strong></td>
<td>string</td>
<td>是</td>
<td>持卡人账单姓名，需与信用卡账单信息一致，如 <code>"James Harpool"</code>。</td>
</tr>
<tr>
<td><strong>bill_phone</strong></td>
<td>string</td>
<td>是</td>
<td>持卡人账单电话，一般为国际规范手机号，如 <code>"9132316070"</code>。</td>
</tr>
<tr>
<td><strong>bill_email</strong></td>
<td>string</td>
<td>是</td>
<td>持卡人账单邮箱，用于支付验证和通知，如 <code>"test@example.com"</code>。</td>
</tr>
<tr>
<td><strong>bill_country</strong></td>
<td>string</td>
<td>是</td>
<td>国家代码（ISO 3166-1 alpha-2），如美国 <code>"US"</code>，中国 <code>"CN"</code>，加拿大 <code>"CA"</code> 等。</td>
</tr>
<tr>
<td><strong>bill_state</strong></td>
<td>string</td>
<td>是</td>
<td>州 / 省，例如 <code>"New Mexico"</code> 或 <code>"CA"</code> 或 <code>"广东"</code>。与账单地址一致即可。</td>
</tr>
<tr>
<td><strong>bill_city</strong></td>
<td>string</td>
<td>是</td>
<td>城市名称，如 <code>"Los Angeles"</code>、<code>"上海"</code>。</td>
</tr>
<tr>
<td><strong>bill_address1</strong></td>
<td>string</td>
<td>是</td>
<td>持卡人账单地址第一行，通常为详细街道地址，如 <code>"316 Don Fernando Street"</code>。</td>
</tr>
<tr>
<td><strong>bill_address2</strong></td>
<td>string</td>
<td>否</td>
<td>账单地址第二行，可选，如公寓号、单元号等，例如 <code>"Unit B"</code>。</td>
</tr>
<tr>
<td><strong>bill_zip</strong></td>
<td>string</td>
<td>是</td>
<td>邮政编码，例如 <code>"87571"</code>、<code>"100000"</code>。需与账单国家对应。</td>
</tr>
<tr>
<td><strong>return_url</strong></td>
<td>string</td>
<td>是</td>
<td>支付完成后同步跳转地址，用户完成支付后浏览器会跳转到此 URL。</td>
</tr>
<tr>
<td><strong>notify_url</strong></td>
<td>string</td>
<td>是</td>
<td>支付结果服务器回调地址（异步通知）。系统会以 POST 请求发送支付结果，请确保地址可访问且能接收 JSON。</td>
</tr>
<tr>
<td><strong>ip_address</strong></td>
<td>string</td>
<td>是</td>
<td>客户端真实 IP 地址，例如 <code>"128.123.56.14"</code>。若服务器代发请求也需传用户 IP。</td>
</tr>
<tr>
<td><strong>GoodsJson</strong></td>
<td>string</td>
<td>是</td>
<td>商品列表 JSON 字符串，包含商品名称、价格、数量。必须与 amount 金额一致。示例：<br><code>"{\"goodsInfo\":[{\"goodsName\":\"Product 1\",\"goodsPrice\":\"19.90\",\"quantity\":1}]}"</code></td>
</tr>
</tbody>
</table>
<h2>商品信息格式（GoodsJson）</h2>
<pre><code class="language-json">{
  "goodsInfo": [
    {
      "goodsName": "商品名称",
      "goodsPrice": "商品价格",
      "quantity": 1
    }
  ]
}</code></pre>
<h2>SDK 自动生成 other 参数示例</h2>
<pre><code class="language-html">&lt;script src="https://js.worldsxinpay.com/payment_sdk_real_jm.js"&gt;&lt;/script&gt;
&lt;script&gt;
async function makePayment() {
    const orderData = {
        "order_no": "TEST123456789",
        "amount": "29.90",
        "return_url": "https://www.example.com/return",
        "notify_url": "https://www.example.com/notify",
        "card_no": "4111111111111111",
        "exp_month": "12",
        "exp_year": "2025",
        "cvv": "123",
        "bill_full_name": "James Harpool",
        "bill_phone": "9132316070",
        "bill_email": "test@example.com",
        "bill_country": "US",
        "bill_state": "New Mexico",
        "bill_city": "Taos",
        "bill_address1": "316 Don Fernando Street",
        "bill_address2": "Unit B",
        "bill_zip": "87571",
        "ip_address": "128.123.56.14",
        "GoodsJson": "{'goodsInfo':[{'goodsName':'Product 1','goodsPrice':'19.90','quantity':1}]}"
    };
    const result = await PaymentSDK.processPayment(orderData);
}
&lt;/script&gt;
</code></pre>
<h2>完整示例请求</h2>
<pre><code class="language-json">{
  "order_no": "TEST123456789",
  "amount": "29.90",
  "return_url": "https://www.example.com/return",
  "notify_url": "https://www.example.com/notify",
  "card_no": "4111111111111111",
  "exp_month": "12",
  "exp_year": "2025",
  "cvv": "123",
  "bill_full_name": "James Harpool",
  "bill_phone": "9132316070",
  "bill_email": "test@example.com",
  "bill_country": "US",
  "bill_state": "New Mexico",
  "bill_city": "Taos",
  "bill_address1": "316 Don Fernando Street",
  "bill_address2": "Unit B",
  "bill_zip": "87571",
  "ip_address": "128.123.56.14",
  "GoodsJson": "{\"goodsInfo\":[{\"goodsName\":\"Product 1\",\"goodsPrice\":\"19.90\",\"quantity\":1},{\"goodsName\":\"Product 2\",\"goodsPrice\":\"10.00\",\"quantity\":1}]}"
}
</code></pre>
<h3>成功</h3>
<pre><code class="language-json">{
  "status": "success",
  "data": {
    "status": "success"
  }
}</code></pre>
<h3>失败</h3>
<pre><code class="language-json">{
  "status": "failed",
  "ErroMsg": "Payment failed",
  "data": {
    "status": "failed",
    "ErroMsg": "Payment failed"
  }
}</code></pre>
<h2>签名算法（Signature）</h2>
<p>将所有请求参数按字母顺序排序</p>
<p>转为 JSON 字符串</p>
<p>使用 HMAC-SHA256 + API Key 生成签名</p>
<p>将签名放入请求头 X-Signature</p>]]></description>
    <pubDate>Fri, 12 Dec 2025 14:29:40 +0800</pubDate>
    <dc:creator>emer</dc:creator>
    <guid>https://docs.colyoy.cn/?post=108</guid>
</item>
</channel>
</rss>