# centauro.py

# -*- coding: utf-8 -*-
"""
Centauro product scraper - pulls products straight from listing pages and writes a Shopify-format CSV.
Flow: fetch listings -> write products.txt (deduplicated) -> read products.txt back -> scrape details
Sources: 1) request listing pages directly 2) load from next_data.json/coll.json 3) load from products.txt (productId,colorId)
Usage: python centauro.py            # full pipeline
       python centauro.py -f         # read products.txt and scrape directly (skip the listing fetch)
       python centauro.py -f xxx.txt # read the given file and scrape
"""
import argparse
import html
import json
import csv
import sys
import re
import time
import urllib.request
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import threading
import unicodedata

import requests

# Force UTF-8 output on the Windows console
try:
    sys.stdout.reconfigure(encoding="utf-8")
except (AttributeError, OSError):
    pass

# Shopify CSV fields

FIELDS = [
"Handle", "Title", "Body (HTML)", "Vendor", "Type", "Tags", "Published",
"Option1 Name", "Option1 Value", "Option2 Name", "Option2 Value",
"Option3 Name", "Option3 Value", "Variant SKU", "Variant Grams",
"Variant Inventory Tracker", "Variant Inventory Qty", "Variant Inventory Policy",
"Variant Fulfillment Service", "Variant Price", "Variant Compare At Price",
"Variant Requires Shipping", "Variant Taxable", "Variant Barcode",
"Image Src", "Image Position", "Image Alt Text", "Gift Card",
"SEO Title", "SEO Description", "Google Shopping / Google Product Category",
"Google Shopping / Gender", "Google Shopping / Age Group",
"Google Shopping / MPN", "Google Shopping / AdWords Grouping",
"Google Shopping / AdWords Labels", "Google Shopping / Condition",
"Google Shopping / Custom Product", "Google Shopping / Custom Label 0",
"Google Shopping / Custom Label 1", "Google Shopping / Custom Label 2",
"Google Shopping / Custom Label 3", "Google Shopping / Custom Label 4",
"Variant Image", "Variant Weight Unit", "Variant Tax Code",
"Cost per item", "Status", "Collection"
]

# Proxy settings (Qingguo tunnel HTTP proxy, username/password mode), per https://www.qg.net/doc/1879.html
# On 403 the retry opens a new connection to trigger a tunnel IP change; leave PROXY_ADDR empty for a direct connection

PROXY_ADDR = "overseas-us.tunnel.qg.net:12907"
PROXY_AUTH_KEY = "KT8DL5OM"
PROXY_PASSWORD = "F7BBE43B9A2C"
PROXIES = None
if PROXY_ADDR:
    from urllib.parse import quote

    # Format documented by Qingguo: http://user:password@hostname:port, credentials URL-encoded
    safe_user = quote(PROXY_AUTH_KEY, safe="")
    safe_pass = quote(PROXY_PASSWORD, safe="")
    PROXY_URL = f"http://{safe_user}:{safe_pass}@{PROXY_ADDR}"
    PROXIES = {"http": PROXY_URL, "https": PROXY_URL}
    # Reach the target over plain HTTP to avoid the HTTPS CONNECT tunnel (some proxies forward HTTP more reliably)
    USE_HTTP = True
else:
    USE_HTTP = False

# Collection URLs to scrape; the Collection name is the last path segment (e.g. /nav/marca/newbalance -> newbalance)

COLLECTION_URLS = [
"https://www.centauro.com.br/busca/nike-air-max-90",
"https://www.centauro.com.br/nav/categorias/vestuario",
"https://www.centauro.com.br/nav/esportes/futebol",
]
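
# For the URLs above, url_to_collection_name() below produces e.g.
#   https://www.centauro.com.br/busca/nike-air-max-90 -> "nike-air-max-90"
#   https://www.centauro.com.br/nav/esportes/futebol  -> "futebol"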

API_HEADERS = {
"accept": "*/*",
"accept-language": "en-US,en;q=0.9",
"origin": "https://www.centauro.com.br",
"referer": "https://www.centauro.com.br/",
"sec-ch-ua": '"Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36"
}

PAGE_HEADERS = {
"User-Agent": API_HEADERS["user-agent"],
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}

# Next.js data JSON base path: https://www.centauro.com.br/_next/data/{NEXT_DATA_VERSION}{path}.json

NEXT_DATA_VERSION = "1.98.2"

# Hardcoded request headers (captured from curl; the non-ASCII part of attr_datetime was replaced to avoid encoding errors).
# If cookies.txt exists, the cookie is loaded from it instead (non-ASCII in attr_datetime must still be replaced to avoid latin-1 encoding errors).

COOKIE_FILE = "cookies.txt"
LIST_JSON_ACCEPT = "*/*"
LIST_JSON_ACCEPT_LANG = "en-US,en;q=0.9,zh;q=0.8,de;q=0.7,fi;q=0.6,pl;q=0.5,as;q=0.4,dv;q=0.3,sq;q=0.2,zh-CN;q=0.1"
LIST_JSON_COOKIE = (
"Secure; Secure; experiment-uuid=2695c48f-60c3-4291-bacb-0f514c5cafad; deviceId=744396c0-895a-4536-9834-6be34f847c1e; "
"__Host-next-auth.csrf-token-auth=ddce3cc6556dc57422006163935f2f146b420bc7902552d59ed9a065d2c34009%7Cd30c382906626c815ab97cd911b527af640dcc8e6b4c0ff78546ee35813b055c; "
"__Secure-next-auth.callback-url-auth=https%3A%2F%2Fwww.centauro.com.br; attr_utm_source=(direct); attr_utm_medium=(none); AwinChannelCookie=direct; "
"_ga=GA1.1.506227375.1772099113; anonymousid=ca51ba45-4f66-4278-8208-1dc28f75c92a; _fbp=fb.2.1772099113121.1561054124; _tt_enable_cookie=1; ttp=01KJCND4YHAV0GZKHYW3EYAA3D.tt.2; "
"mParticleId=-8067566568244070344; _pm_id=872601772099114766; AdoptVisitorId=GwTgDAZgHCAsCGBaAzAEwEwFZGwOzrESl0xEXQGNUxUR4pkaBTIA; _gcl_au=1.1.1065313785.1772099126; "
"_hjSessionUser_540838=eyJpZCI6IjFkN2IxNDNjLWRjZmMtNWYyMS1hZWJkLTJhZTk1ZjZmNmUyOCIsImNyZWF0ZWQiOjE3NzIwOTkxMTM5ODcsImV4aXN0aW5nIjp0cnVlfQ==; "
"bm_ss=ab8e18ef4e; _pm_sid=823141772172195795; bm_mi=E63575EC751B279EF6E01A0BF1D6127E~YAAQmmAXAsS8Z3WcAQAAh8axnR6PuIi7dfwGt3RbuN9gK1LqsOvxrJNDS5JTJS98xZ1wBut+iFCxHRUVsXB1zzDldyRy3ABZ8y4P2lRCLw+jycCgx23sfrVmls4J0FRHw+rr5yXSoyf0oLfLa+qE72yROUNbaGux5XI+fLC0BvebFmDxsW/uu0+hbp8G8KuP4qglyhBRUZMQJkOu4sf8hXXZwCFTUSdQCCvJDZP710uE/v34+05G1SyDkjvLBHFGFQhnMJCSLtDLWtApq+OKAZArKLyfFd0WZquEtuy1y29hKahfl8uAmRKfqbeazkF1B4HyEj4ICGE34f56fr9qmj4NtyFbuyjX3RXMypF/eY0oV7SwdoCplvD28mGHN621HR0gcl5Xxa9YvebLbCqOyDNO6yvn8w==~1; "
"_abck=188FE50D1D84B7CBD554DDEA6159D7ED~0~YAAQmmAXAgq/Z3WcAQAAStGxnQ/GGktQa9o58pTqniSd9Akvv8qV0mSxAz+3GbMdGNHgznhGOOfUdZoOpVAmrNad2erAmbhbuxFbsIvuJ1zIPVG6NLwVzW1OQFlH9xQgN3IBE+oKYaqcP9WSe6BKgJBnEK6gC4qCgn5RLc1CUwQFwuuHqz/qy+pP5SchXCrsmHbnbU1DAJVj2umwsVsXePmIyNz7hFDJxWryGTty3bmDJYCh86soFSTwHs8O/r7lcnHtJVLsZWrj3QfeHjoGEA54kzkFZXcauy+RhaqtQaxVfqR8iWLMDLZWXYmFP5VxPFzTr9q/c/pokwfEqfLatO0elwo7p9GqGA6+Z7CexkwYjMRH7b+cwHap3UcbtNxAWE57Ot5vRBTqCTi02OWHOmFuUm4++mFNzZ51YeDZQGCRr5BnyFC1mVsF3T/mMBrOSO9MF+gDYbtPWsdKtM5JvUC5gYHkdWs7tzNN6TAry5XHSAdrSznIMcJDoPgkKgwmWerxeCAFe5s8dyPTt2tumPtRiKqfONXUD//FTTmLIZ96nxh/ZhYunjEdBvDGlFZV2yhQ3x7DK7jLuSvavN+v4zlcvvKVsjVTZxcnAwey/25ENxWGFmeik5s7B+DwINPK01yOF9oK4w==~-1~-1~-1~AAQAAAAF%2f%2f%2f%2f%2f5SKUbSBG4gueBtYnTr+znGJ+XXuJNKQiLRJUV+cCiQkilZ%2fLQtNkyhgxOeYMONNpVT26a8g1Ch4BriMGcBul0vZN5QcvoQfcTzz~-1; "
"ak_bmsc=CC008C86C78213DB6BF41F4284B81441~000000000000000000000000000000~YAAQmmAXAjXPZ3WcAQAA6huynR5Dcs076x905sRqDZtlEESkTgM2MdnV8LvKYoPhtlzWBaoiWLuqJMbN/RgnfQS+SxtQEj6fmBPRL+5opHRt35b+bE8hjtpH0wkM9QbTjGawksbBZKbRyyg1ikEW/5osWtYnns7K62banOBIGaqsweR1kZWEWVyDcFM26fvrvat9svRKxM2Ur90kZ8unN4aWsWNRlh556T/14SZeOkOVXbDE3vTmpTifz1iTOoizZ81GQDal3tyadSCJCT/JZzvPAYgQ2BuQ9g/upv9KuUWCnRgrWCT64qihFskX95L93eevJ4IvvVuundI+iVrFrs44nvtlu/rkaNMjPSZFDFDj/qotDiZID0D0Jt4ARpE+lozwoA0ZyD6pANHF5cuzo9PGlMTpESELDJpr94bdCDmonX5+liW5iovPKBq4Z0K9oCpb2BlMVS/zcEV4lndOlgmjqs0UJ9EODuFFGhPtD8t8N7RGzbtmMd1E3SxdtOZYUcLOPErfO+7sXwbc8TP2; "
"bm_so=20BC669BFB79EB118777E0FF6428174DC2AEAE856C50BBD266AD8A6056E6232D~YAAQBXHKF+PPwXCcAQAAzj2znQbeGPUK61ED+jntmvxtaOJN8S7z9O/E4lfWL6UGb/LjbrVwrojlYqXNFZdqNyKro/aOuFLT0OM85tdVUNHgxPcfnPik+Fa28IoQK6tddkZhMn1tk6er5O/ffumFZ7yYnOgM/u/GrkdJqXcp4xavlUKZjTSZe+eEobjPD5347+LwVmouHmiGuRgoI84swWXLT9A8KdbICzvhAClOVVvu4EseniHsywzGU+igZSJbfpOfDHThom3Ojb4wJO7d1OEf6/ir5MEmx0nptLWWN/Fe5gnHxRC438AVl9K6+cKQoXnjvX8DlU8uAfkHh2Z5Oaa+M8mpU2duvBw0AqWvoMYv6OlQo1PVgNtc/yBqOSjbbaiJWqMBrs3s82E3pjKoSuZUCbk2MwxHoyV/mLphpX/qQ/CJNwnj6KSGjvMyaKE92ob0rz8esTb8hknTQqVdPFaA0A==; "
"bm_sz=20CCA8B637D1913BEB909074D87D83C3~YAAQBXHKF+XPwXCcAQAAzj2znR7r2Y5TUDSF6DEoagfG3gTU0X/6G7aVh1WPGCCt1A02QpXYfsPmP0ivNKWcPeecpLthyAmGam65Wm6UA3lgh1fEyfSuhKwHGxBpRQKjnB4vFw9/di1ym6MBsET9TE03OGvlHlITFLQ1tYEpUXl5eGfe4VbEw0ON1QNXg7qYhl5nWZGccO8LDwg7veMD0daqvIwwW0eXCxqngoIlC6TGzbRXFK7x2BGX9QM3ijAswDhgTtzRhmiH4JxT4KZM0nWd3PdqqEjLmDsECBWO7aOZDoQ9A4tyoIrT4zcxmAXpFGtPRw5mKsrj/dVksePkFo3jU9IA78rAi7yPVwCObjijY6APd89F18pt8oxdSBcVF8SDxkIpOKcwI1vlAOSidcownL5ZrJ73LjDhBnzbJQPihHB7ai4ChPCbMsoLcUsSH6BjP5T2ThDdL70F8CgfatN2KlFWqkY7YPENBBhd0ofSBGEU8R7uL4/7FUeZ/89PSqYe/aOAWf/wHPH6yme+6yUTqMCbuA==~3228215~4272184; "
"rp_geo_location=%7B%22latitude%22%3A34.9592083%2C%22longitude%22%3A-116.419389%7D; "
"_hjSession_540838=eyJpZCI6IjI5ZGE0NzI1LWUyODYtNDUwYy1hZTFmLTRmMjdjZWQ0NzgyZCIsImMiOjE3NzIxNzIzMDEzMDYsInMiOjAsInIiOjAsInNiIjowLCJzciI6MCwic2UiOjAsImZzIjowLCJzcCI6MH0=; "
"rtbh.lid=%7B%22eventType%22%3A%22lid%22%2C%22id%22%3A%222bRHwfFonGNZ7HgB2s6B%22%2C%22expiryDate%22%3A%222027-02-27T06%3A05%3A01.443Z%22%7D; "
"cto_bundle=OjY6dV9HZ2FDdUQ3TFJQJTJGbm14clNhTUJ5REFwTUlWU3RCayUyQmE2SUtGdSUyRnRyckYyems1SEZoNHB4YVNPMTNDciUyRmFQVGpnYWpFcEtMdDZOVzNBb0MlMkJDWDNVNEpsV2o5RnVuZ1dkbDM2RE5PR0xvbWNFOTVIV0JpSGl4ZXczQ2k3UUFHaXpHV2w4SkV0QU1rVXdDVlZiYUkxRmJnJTNEJTNE; "
"bm_lso=20BC669BFB79EB118777E0FF6428174DC2AEAE856C50BBD266AD8A6056E6232D~YAAQBXHKF+PPwXCcAQAAzj2znQbeGPUK61ED+jntmvxtaOJN8S7z9O/E4lfWL6UGb/LjbrVwrojlYqXNFZdqNyKro/aOuFLT0OM85tdVUNHgxPcfnPik+Fa28IoQK6tddkZhMn1tk6er5O/ffumFZ7yYnOgM/u/GrkdJqXcp4xavlUKZjTSZe+eEobjPD5347+LwVmouHmiGuRgoI84swWXLT9A8KdbICzvhAClOVVvu4EseniHsywzGU+igZSJbfpOfDHThom3Ojb4wJO7d1OEf6/ir5MEmx0nptLWWN/Fe5gnHxRC438AVl9K6+cKQoXnjvX8DlU8uAfkHh2Z5Oaa+M8mpU2duvBw0AqWvoMYv6OlQo1PVgNtc/yBqOSjbbaiJWqMBrs3s82E3pjKoSuZUCbk2MwxHoyV/mLphpX/qQ/CJNwnj6KSGjvMyaKE92ob0rz8esTb8hknTQqVdPFaA0A==~1772172303609; "
"AdoptConsent=N4Ig7gpgRgzglgFwgSQCIgFwgEwE4DMEAZvgCakC0ALNgBwCG1VAjPhVFVLRbrVxADYqVUviJQQAGhAA3OPAQB7AE7JSmEANwAGIrVxVGZbAFZqAdmzaKtcydwVsAY1LbSuerXxuIUkAiciAGUEZTgAOwBzTHCAVwAbeOlFAAcEZHCAFXpImEwAbRAARQoKAGEADXoATxgUgFk/ABkAR2QmgCsOgH0AOXoACz8ggEFacNwALyblAHEh6QBpWO1F+rKilPMADz94gFsEWkVexQBVJuQ/MDBsAAV6bTAAJXwyvyJqxTAYZlRJ6qZPzKMoAIQgFCcFQA1r0JNJmAA1dbbbpUMDmADqfgQZQAmrNUAAJeJlDq9ZqLEwtABSRCI7gqfiKRJkRDgpBM9DgAj8cGYjEiHix2j8sTGUCCUG6iwgvmkFQA8t0UrF6kUghVFn5FkU8QhZpkTERFB0/B16GBmDIAGK9XA09DSXoGs6ggTdUg2ybNRHdSI25AmEYUaLSGDmSYIcLbfakbojPwCSKRM69JoyBAUBYgRQweIyaqzJwZxPSKDPZD1RSzFqTeI0vwjOD0EwuHJUABafnwiIQLSoAzx1SIZZAkTAsWUoKJNMW2Gefmh+2QRAqMAgFTguD8MiakV6ynik162h90jgVEmyhgVDgePovOkynqI0W8DA+0WoukRQGJmYL88RgJoihAABdZI0kVWIEGyXICkgkAnEUcIN3CdJ1CwTE7hSZARnMPwULQiAMMRCAbzgVDMFYaRYhSUh6CQUgRgQDQrGwAQKG0bBHHMTJtAEDBtBMDAWAAOiEARuwAXyAA===; "
"
gads=ID=af364d250ce9110e:T=1772100304:RT=1772172344:S=ALNI_Mb6eD4y_gGwSotM44Sy0pbELFSj8A; "
"gpi=UID=00001398476daf89:T=1772100304:RT=1772172344:S=ALNI_MZxB0R9J6HNoQ6vf_NELz5Yd9Kixw; eoi=ID=f3c07e9e2678c378:T=1772100304:RT=1772172344:S=AA-AfjaD5SrKsBWsEUT5s7wzABjG; "
"_uetsid=d964d3f012f711f18ec5f386ba12c45b; _uetvid=d9651aa012f711f1b2e1578d0725c64f; "
"bm_s=YAAQBXHKF402wnCcAQAA2gS0nQSetQLjamhPx2j5zFaIS/sY77p6YLLhOBJUL29/lc6AhzebKD+GD62pNhs9lHbpsrPCU32UIrYHwUkT1sNOsDDwVOfviLteVaWmJNMV4dfQyD68RNlycK5e4z/1HB89r2VzJLz4lMYeuobvuDAMGex0RLCE9GS1wktTYzuOh40sP1e/NfGW1uGTJiNhPwXjvDwEGB3FUwNszlGXrxl16DjEBPd50r9NFWPfYjzBqPO0BTsH4MIqZ5dQhS+MH55VpvK46584XwGb1I5pJXYqGW3zwBljEBdqQPUdlXJ7zEhBxpe5rqgMnUVmCThjcsBk/QTUTEDsi9To9YrrbHDemCB0HrncdSIWSnJQjWAXJ7kEyfUO4umNd4B0skrvN2xCsIAm8Olpnwl1Fr1FZFTElPtRMWJ/vaTBrdmO9bNvzR3ts/CM3z538eK1jTLtCMz5RpEwmeVmt2RTdSpfvNsU8WZpm//Qnlqhnswm/ul6BUQggsbCfOrqYQDrLFnjyeFzi3RnTn1xa/s59q483nbqSENGHxjtJW/SEvkEtykB1hkR2gqx7wxnbONI3pzq81ik2ia8u7dUQzhyREm/SJpIB6rB+vLRMB9gYFRFe4TygYzy+ilQyvZ0UWEThCjfFiQA5+PNB9eCBwMZqTfNdmI2mJOd9Tc+2ySyjQ8hnkR6IQpveOqZxaqwYsprXZNIU4kqCVT4XroqxP+1MtLJJE9mKgiRQa3+18uEQN4nrKrW6Sc; "
"bm_sv=E56DCE94E55014C714B84D30993C8726~YAAQBXHKF442wnCcAQAA2gS0nR6uy1H+Q1QPxTy52VT5ZVth011mn37lYT1joG5FRfNicmSotYKtVM+u9tz5GKepSI7YSlLTV1H1Qua1a3QRBpFfW6TWs2nbfJ5/v4zbRpa1odNmcdK7+0GjLdZGAsU6nfFUq84SDLY0USY0Y5/76+MVjArR8FI4fQTgsEBOICD7WUgMRv4921PSmEWO5KFGeGFj4XTj2pni6sy/eSUqFNB5zVgZfO40r/ujAY9T7pWVL4u~1; "
"_ga_T9CHK2M2XW=GS2.1.s1772172153$o3$g1$t1772172349$j60$l0$h0; "
"__rtbh.uid=%7B%22eventType%22%3A%22uid%22%2C%22id%22%3A%22unknown%22%2C%22expiryDate%22%3A%222027-02-27T06%3A05%3A49.616Z%22%7D; "
"attr_datetime=Fri Feb 27 2026 14:06:26 GMT+0800 (GMT+0800); "
"_ga_3RYKQ4MBLH=GS2.1.s1772172153$o3$g1$t1772172386$j17$l0$h1215551984; "
"ttcsid=1772172174321::gz_cAtJhHUxbO8k1CZ_A.3.1772172390362.0; ttcsid_CMJUDB3C77U705JFSVT0=1772172174321::fHGvTeH6Uet53_wlbB_i.3.1772172390362.1; "
"bm_sz=20CCA8B637D1913BEB909074D87D83C3~YAAQBXHKF+XPwXCcAQAAzj2znR7r2Y5TUDSF6DEoagfG3gTU0X/6G7aVh1WPGCCt1A02QpXYfsPmP0ivNKWcPeecpLthyAmGam65Wm6UA3lgh1fEyfSuhKwHGxBpRQKjnB4vFw9/di1ym6MBsET9TE03OGvlHlITFLQ1tYEpUXl5eGfe4VbEw0ON1QNXg7qYhl5nWZGccO8LDwg7veMD0daqvIwwW0eXCxqngoIlC6TGzbRXFK7x2BGX9QM3ijAswDhgTtzRhmiH4JxT4KZM0nWd3PdqqEjLmDsECBWO7aOZDoQ9A4tyoIrT4zcxmAXpFGtPRw5mKsrj/dVksePkFo3jU9IA78rAi7yPVwCObjijY6APd89F18pt8oxdSBcVF8SDxkIpOKcwI1vlAOSidcownL5ZrJ73LjDhBnzbJQPihHB7ai4ChPCbMsoLcUsSH6BjP5T2ThDdL70F8CgfatN2KlFWqkY7YPENBBhd0ofSBGEU8R7uL4/7FUeZ/89PSqYe/aOAWf/wHPH6yme+6yUTqMCbuA==~3228215~4272184"
)

def _get_list_cookie():
    """Return the cookie for list requests, preferring cookies.txt and stripping non-ASCII."""
    try:
        with open(COOKIE_FILE, "r", encoding="utf-8") as f:
            cookie = f.read().strip()
        if not cookie:
            return LIST_JSON_COOKIE
        # The parenthesised part of attr_datetime may contain non-ASCII text; replace it with (GMT+0800) to avoid latin-1 encoding errors
        cookie = re.sub(r"(attr_datetime=[^;]*)\s*\([^)]*\)", r"\1 (GMT+0800)", cookie)
        # Drop any remaining characters outside latin-1 (HTTP headers must be latin-1 encodable)
        cookie = "".join(c for c in cookie if ord(c) < 256)
        print(f"Loaded cookie from {COOKIE_FILE}")
        return cookie
    except FileNotFoundError:
        return LIST_JSON_COOKIE
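
# A note on cookies.txt (an assumption based on _get_list_cookie above): the file is read whole and
# stripped, so it should hold the raw value of the Cookie request header on a single line, e.g. as
# copied from the browser's devtools.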

API_BASE = "https://apigateway.centauro.com.br/centauro-bff/products"

def _maybe_http(url):
    """When the proxy is in use and USE_HTTP is set, switch to plain HTTP to avoid the CONNECT tunnel."""
    if USE_HTTP and url.startswith("https://"):
        return "http://" + url[8:]
    return url

def page_url_to_json_base(page_url):
    """Turn a page URL into the JSON endpoint base, e.g. /nav/marca/newbalance -> .../nav/marca/newbalance.json"""
    from urllib.parse import urlparse
    parsed = urlparse(page_url)
    path = parsed.path.rstrip("/")
    return f"https://www.centauro.com.br/_next/data/{NEXT_DATA_VERSION}{path}.json"

def _path_to_nav_slug_params(path):
    """Extract navSlug params from a path, e.g. /nav/esportes/basquete -> navSlug=esportes&navSlug=basquete"""
    from urllib.parse import quote
    segments = [s for s in path.strip("/").split("/") if s]
    if not segments:
        return ""
    # If the first segment is "nav", use the remaining segments; otherwise use them all
    slugs = segments[1:] if segments[0] == "nav" else segments
    if not slugs:
        return ""
    return "&".join(f"navSlug={quote(s)}" for s in slugs)

def url_to_collection_name(page_url):
    """Extract the Collection name from a URL: the last path segment."""
    from urllib.parse import urlparse
    parsed = urlparse(page_url)
    path = parsed.path.rstrip("/")
    return path.split("/")[-1] if path else ""

write_queue = Queue()
counter_lock = threading.Lock()
processed_count = 0
total_tasks = 0

# 403 retry settings (a new connection triggers a tunnel IP change)

MAX_403_RETRIES = 5
RETRY_DELAY = 2

# Thread-local Session: each thread keeps its own connection (and therefore its own IP)

_thread_local = threading.local()

def _get_thread_session():
    """One Session per thread (one IP per thread); call _reset_thread_session after a 403 to switch IP."""
    if not hasattr(_thread_local, "session") or _thread_local.session is None:
        _thread_local.session = requests.Session()
        if PROXIES:
            _thread_local.session.proxies.update(PROXIES)
    return _thread_local.session

def _reset_thread_session():
    """Reset the thread's Session so the next request gets a new IP."""
    _thread_local.session = None

def _req_to_url_headers(req):
    """Extract the url and a headers dict from a urllib Request."""
    url = req.full_url if hasattr(req, "full_url") else req.get_full_url()
    headers = dict(req.header_items()) if hasattr(req, "header_items") else {}
    return url, headers

class _ResponseWrapper:
    """Wraps a requests.Response to look like a urlopen() result: supports read() and the with statement."""

    def __init__(self, resp):
        self._resp = resp
        self._content = resp.content

    def read(self):
        return self._content

    def decode(self, encoding="utf-8", errors="replace"):
        return self._content.decode(encoding, errors)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

def verify_proxy():
    """Check that the proxy works before starting; with USE_HTTP use plain HTTP to avoid CONNECT-tunnel 406s."""
    if not PROXIES:
        return True
    print("Verifying proxy...")
    for url in (_maybe_http("https://httpbin.org/ip"), _maybe_http("https://www.centauro.com.br/")):
        try:
            kw = {"timeout": 15, "headers": {"User-Agent": API_HEADERS["user-agent"]}}
            if PROXIES:
                kw["proxies"] = PROXIES
            r = requests.get(url, **kw)
            # When the proxy cannot do HTTPS CONNECT we use HTTP; a 406 means the proxy did forward the request (some proxies return it)
            if r.status_code in (200, 301, 302, 406):
                print("Proxy check passed")
                return True
            r.raise_for_status()
        except Exception as e:
            print(f"  Attempt {url} failed: {e}")
    print("Proxy check failed")
    return False

def urlopen_with_403_retry(req, timeout=30, desc=""):
    """HTTP request with 403 retry (uses requests with the Qingguo HTTP proxy per their docs)."""
    url, headers = _req_to_url_headers(req)
    url = _maybe_http(url)
    last_err = None
    for attempt in range(MAX_403_RETRIES + 1):
        try:
            session = _get_thread_session()
            r = session.get(url, headers=headers, timeout=timeout)
            r.raise_for_status()
            return _ResponseWrapper(r)
        except requests.exceptions.HTTPError as e:
            last_err = e
            if e.response.status_code == 403 and attempt < MAX_403_RETRIES:
                _reset_thread_session()
                wait = RETRY_DELAY * (attempt + 1)
                print(f"  403, switching IP, retry {attempt + 1}/{MAX_403_RETRIES} in {wait}s{f': {desc}' if desc else ''}")
                time.sleep(wait)
            else:
                raise HTTPError(url, e.response.status_code, e.response.reason, e.response.headers, None)
        except requests.exceptions.RequestException as e:
            last_err = e
            if attempt < MAX_403_RETRIES:
                _reset_thread_session()
                wait = RETRY_DELAY * (attempt + 1)
                print(f"  Connection failed, retry {attempt + 1}/{MAX_403_RETRIES} in {wait}s{f': {desc}' if desc else ''}")
                time.sleep(wait)
            else:
                raise URLError(str(e))
    raise last_err
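
# With the defaults above (RETRY_DELAY = 2, MAX_403_RETRIES = 5) the back-off before each retry
# grows linearly: 2 s, 4 s, 6 s, 8 s and finally 10 s before giving up.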

def decode_html_entities(text):
    """Decode HTML entities; unescaping twice also handles double-encoded entities such as &amp;ccedil; -> ç."""
    if not text:
        return ""
    s = str(text)
    for _ in range(2):
        s = html.unescape(s)
    return s
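
# Worked example of the double unescape: html.unescape("&amp;ccedil;") yields "&ccedil;", and the
# second pass turns that into "ç"; text that is already decoded passes through unchanged.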

def normalize_text(text):
    """Strip accents and other combining marks from special characters."""
    if not text:
        return ""
    normalized = unicodedata.normalize("NFKD", str(text))
    normalized = "".join(c for c in normalized if not unicodedata.combining(c))
    return normalized
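
# For example, normalize_text("Tênis Preto") returns "Tenis Preto": NFKD splits "ê" into "e" plus a
# combining circumflex, and the combining mark is then dropped.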

def is_valid_image_url(url):
    """Filter out invalid image URLs such as the short https://3rx8ammbpzw/ form."""
    if not url or len(url) < 20:
        return False
    try:
        # Extract the host; the invalid short form usually has no TLD (no dot)
        after = url.split("//", 1)[-1].split("/")[0].split(":")[0]
        if "." not in after:
            return False
        return True
    except Exception:
        return False

def _find_color_ids(obj, found):
    """Recursively look for colorId / colorCode values."""
    if isinstance(obj, dict):
        cid = obj.get("colorId") or obj.get("colorCode")
        if cid is not None:
            found.add(str(cid))
        for v in obj.values():
            _find_color_ids(v, found)
    elif isinstance(obj, list):
        for v in obj:
            _find_color_ids(v, found)

def extract_product_color_pairs(products):
    """
    Extract (productId, colorId) pairs from a products list.
    Supports the coll.json shape: seo.mpn + details.colorId.
    Also supports other shapes: id+colorId, code+colorVariations and similar nested structures.
    """
    pairs = []
    seen = set()
    for p in products:
        # coll.json shape: seo.mpn is the productId, details.colorId is the colorId
        details = p.get("details", {})
        seo = p.get("seo", {})
        cid = details.get("colorId") or seo.get("colorId") or p.get("colorId")
        pid = seo.get("mpn") or p.get("code") or p.get("productId") or p.get("id")

        if pid and cid:
            pid, cid = str(pid).strip(), str(cid).strip()
            key = (pid, cid)
            if key not in seen:
                seen.add(key)
                pairs.append((pid, cid))
            continue

        # Fall back to older formats
        pid = p.get("id") or p.get("code") or p.get("productId")
        if not pid:
            continue
        pid = str(pid).strip()

        colors = set()
        if cid is not None:
            colors.add(str(cid))
        if "colorId" in p:
            colors.add(str(p["colorId"]))
        if "colorCode" in p:
            colors.add(str(p["colorCode"]))
        if "colorVariations" in p:
            for cv in p["colorVariations"]:
                c = cv.get("colorId") or cv.get("colorCode")
                if c is not None:
                    colors.add(str(c))
        if "colors" in p:
            for c in p["colors"]:
                cid = c.get("colorId") or c.get("id") or c.get("code")
                if cid is not None:
                    colors.add(str(cid))
        if not colors:
            _find_color_ids(p, colors)

        if colors:
            for cid in colors:
                base_id = pid
                if pid.endswith(cid):
                    base_id = pid[: -len(cid)]
                key = (base_id, cid)
                if key not in seen:
                    seen.add(key)
                    pairs.append((base_id, cid))
        else:
            key = (pid, "")
            if key not in seen:
                seen.add(key)
                pairs.append((pid, ""))
    return pairs
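
# Illustrative input/output for extract_product_color_pairs (the values are made up):
#   [{"seo": {"mpn": "987654"}, "details": {"colorId": "02"}}]  ->  [("987654", "02")]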

def _get_list_json_headers(json_base, page=1, page_url=None):
    """Headers for the list request (hardcoded, captured from curl)."""
    if page_url is None:
        page_url = json_base.replace(f"/_next/data/{NEXT_DATA_VERSION}", "").replace(".json", "")
    h = {
        "accept": LIST_JSON_ACCEPT,
        "accept-language": LIST_JSON_ACCEPT_LANG,
        "baggage": "sentry-environment=production,sentry-release=v1.98.2,sentry-public_key=f32efb2aa98343a2855c60442e10a23e,sentry-trace_id=ac4175f497b9437590923ae240481840",
        "cookie": _get_list_cookie(),
        "priority": "u=1, i",
        "referer": f"{page_url}?page={page - 1}" if page > 1 else page_url,
        "sec-ch-ua": '"Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "sentry-trace": "ac4175f497b9437590923ae240481840-869537b6fa5f6e1e",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
        "x-nextjs-data": "1",
    }
    return h

def _parse_products_from_json_response(data):
    """Parse products out of a JSON response."""
    page_props = data.get("pageProps") or data.get("props", {}).get("pageProps", {})
    fallback = page_props.get("fallback", {})
    if not fallback:
        return []
    first = list(fallback.values())[0]
    return first.get("products", [])

def fetch_list_json_api(page_url):
    """
    Fetch the product list from the Next.js JSON endpoint, following pagination.
    page_url is e.g. https://www.centauro.com.br/nav/marca/newbalance, which maps to
    https://www.centauro.com.br/_next/data/1.98.2/nav/marca/newbalance.json?navSlug=...&page=N
    """
    from urllib.parse import urlparse
    json_base = page_url_to_json_base(page_url)
    path = urlparse(page_url).path.rstrip("/")
    nav_slug_qs = _path_to_nav_slug_params(path)
    all_products = []
    page = 1
    total_pages = 1
    while page <= total_pages:
        if nav_slug_qs:
            url = f"{json_base}?{nav_slug_qs}&page={page}" if page > 1 else f"{json_base}?{nav_slug_qs}"
        else:
            url = f"{json_base}?page={page}" if page > 1 else json_base
        print(f"Requesting page {page}: {url}")
        req = urllib.request.Request(url, headers=_get_list_json_headers(json_base, page, page_url))
        try:
            with urlopen_with_403_retry(req, desc=f"page {page}") as r:
                data = json.loads(r.read().decode())
        except HTTPError as e:
            if all_products and e.code == 404:
                print(f"  Page {page} returned 404, returning the {len(all_products)} products fetched so far")
                return all_products
            raise
        products = _parse_products_from_json_response(data)
        if not products:
            print(f"  Page {page} has no products, stopping")
            break
        all_products.extend(products)
        print(f"  Page {page}: {len(products)} products, {len(all_products)} total")
        if page == 1:
            fallback = data.get("pageProps", {}).get("fallback", {})
            first = list(fallback.values())[0] if fallback else {}
            qty = first.get("quantity", 0)
            per_page = first.get("productsPerPage", 36)
            total_pages = max(1, (qty + per_page - 1) // per_page)
            print(f"{qty} products across {total_pages} pages")
        page += 1
        if page <= total_pages:
            time.sleep(0.5)
    return all_products
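
# The page count above is a plain ceiling division; for example quantity=75 with productsPerPage=36
# gives (75 + 36 - 1) // 36 = 3 pages.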

def fetch_list_page(page_url):
    """Fetch the HTML listing page and parse __NEXT_DATA__."""
    req = urllib.request.Request(page_url, headers=PAGE_HEADERS)
    with urlopen_with_403_retry(req, desc="listing page") as r:
        html_text = r.read().decode("utf-8", errors="replace")
    match = re.search(r'<script id="__NEXT_DATA__"[^>]*>([^<]+)', html_text)
    if not match:
        raise ValueError("__NEXT_DATA__ not found")
    data = json.loads(match.group(1))
    products = _parse_products_from_json_response(data)
    if not products:
        raise ValueError("products is empty")
    return products

def load_products_from_json(path="next_data.json"):
    """Load products from a JSON file; supports both next_data.json and coll.json."""
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    # coll.json keeps pageProps at the top level
    page_props = data.get("pageProps") or data.get("props", {}).get("pageProps", {})
    fallback = page_props.get("fallback", {})
    if fallback:
        first = list(fallback.values())[0]
        return first.get("products", [])
    return data.get("products", data) if isinstance(data, dict) else []

def save_pairs_to_txt(triples, path="products.txt"):
    """Write (productId, colorId, collection) triples to a text file, deduplicated."""
    seen = set()
    lines = []
    for item in triples:
        pid = str(item[0]).strip()
        cid = str(item[1]).strip() if len(item) > 1 and item[1] else ""
        coll = str(item[2]).strip() if len(item) > 2 and item[2] else ""
        key = (pid, cid, coll)
        if key in seen:
            continue
        seen.add(key)
        lines.append(f"{pid},{cid},{coll}\n")
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(lines)
    print(f"Wrote {len(lines)} deduplicated entries to {path}")

def load_products_from_txt(path="products.txt"):
    """Load from txt, one "productId,colorId,collection" per line, deduplicating as it reads."""
    triples = []
    seen = set()
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            parts = [x.strip() for x in line.split(",", 2)]
            pid = parts[0]
            cid = parts[1] if len(parts) > 1 else ""
            coll = parts[2] if len(parts) > 2 else ""
            key = (pid, cid, coll)
            if key in seen:
                continue
            seen.add(key)
            triples.append((pid, cid, coll))
    return triples
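
# Illustrative products.txt contents (the IDs below are made up): one "productId,colorId,collection"
# triple per line, as parsed above; colorId and collection may be empty, and lines starting with "#"
# are skipped:
#   987654,02,newbalance
#   912345,,futebol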

def fetch_product_api(product_id, color_id):
    """Request the product detail API, retrying automatically on 403."""
    url = f"{API_BASE}/{product_id}?color={color_id}" if color_id else f"{API_BASE}/{product_id}"
    req = urllib.request.Request(url, headers=API_HEADERS)
    with urlopen_with_403_retry(req, desc=f"{product_id}?color={color_id}") as r:
        return json.loads(r.read().decode())

def centauro_to_shopify_rows(api_data, collection=""):
    """Convert a Centauro API response into Shopify CSV rows."""
    prod = api_data.get("product", {})
    if not prod:
        return []

    handle = prod.get("code", "")
    title = normalize_text(prod.get("name", ""))
    body = ""
    for attr in api_data.get("attributes", []):
        if attr.get("htmlContent"):
            body = attr["htmlContent"]
            break
    if not body:
        body = prod.get("description", "")
    body = decode_html_entities(body)
    body = normalize_text(body)
    vendor = normalize_text(prod.get("brand", ""))
    product_type = normalize_text(prod.get("category", ""))
    tags = normalize_text(prod.get("collection", ""))
    published = "TRUE" if prod.get("isAvailable") else "FALSE"
    colour = normalize_text(prod.get("colorInfo", {}).get("description", ""))

    images = []
    for m in prod.get("visualMedias", []):
        u = m.get("url", "")
        if u and not u.startswith("http"):
            u = "https:" + u
        if u and is_valid_image_url(u):
            images.append(u)
    if not images and prod.get("colorVariations"):
        u = prod["colorVariations"][0].get("photoUrl", "")
        if u:
            u = "https:" + u if not u.startswith("http") else u
            if is_valid_image_url(u):
                images.append(u)

    sizes = prod.get("sizes", [])
    rows = []
    for idx, sz in enumerate(sizes):
        desc = sz.get("description", "")
        sku = sz.get("sku", "")
        pi = sz.get("priceInfos", {})
        price = pi.get("promotionalPrice") or pi.get("price") or ""
        if price != "":
            price = str(price)

        row = {
        "Handle": handle,
        "Title": title if idx == 0 else "",
        "Body (HTML)": body if idx == 0 else "",
        "Vendor": vendor if idx == 0 else "",
        "Type": product_type if idx == 0 else "",
        "Tags": tags if idx == 0 else "",
        "Published": published if idx == 0 else "",
        "Option1 Name": "Size",
        "Option1 Value": normalize_text(desc),
        "Option2 Name": "Color",
        "Option2 Value": colour,
        "Variant SKU": sku,
        "Variant Grams": "",
        "Variant Inventory Tracker": "shopify",
        "Variant Inventory Qty": 100,
        "Variant Inventory Policy": "deny",
        "Variant Fulfillment Service": "manual",
        "Variant Price": price,
        "Variant Compare At Price": "",
        "Variant Requires Shipping": "TRUE",
        "Variant Taxable": "TRUE",
        "Variant Barcode": sz.get("ean", ""),
        "Image Src": images[idx % len(images)] if images else "",
        "Image Position": idx + 1,
        "Image Alt Text": title,
        "Gift Card": "FALSE",
        "SEO Title": title if idx == 0 else "",
        "SEO Description": body[:160] if idx == 0 else "",
        "Google Shopping / Google Product Category": "",
        "Google Shopping / Gender": prod.get("gender", "Unisex"),
        "Google Shopping / Age Group": prod.get("ageGroup", "Adult"),
        "Google Shopping / MPN": handle,
        "Google Shopping / AdWords Grouping": "",
        "Google Shopping / AdWords Labels": "",
        "Google Shopping / Condition": "New",
        "Google Shopping / Custom Product": "FALSE",
        "Google Shopping / Custom Label 0": "",
        "Google Shopping / Custom Label 1": "",
        "Google Shopping / Custom Label 2": "",
        "Google Shopping / Custom Label 3": "",
        "Google Shopping / Custom Label 4": "",
        "Variant Image": "",
        "Variant Weight Unit": "kg",
        "Variant Tax Code": "",
        "Cost per item": "",
        "Status": "active",
        "Collection": collection,
    }
        for k in FIELDS:
            if k not in row:
                row[k] = ""
        rows.append(row)
    return rows
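
# Note on the rows above: every size becomes its own CSV row sharing the same Handle, and the
# product-level fields (Title, Body, Vendor, ...) are filled only on the first row, which is how a
# Shopify product CSV expresses multiple variants of one product.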

def process_one(args):
    """Process a single (productId, colorId, collection) task."""
    global processed_count
    product_id, color_id, collection = args[0], args[1], args[2] if len(args) > 2 else ""
    try:
        if not color_id:
            color_id = ""
        data = fetch_product_api(product_id, color_id)
        rows = centauro_to_shopify_rows(data, collection)
        for row in rows:
            write_queue.put(row)
        with counter_lock:
            processed_count += 1
            print(f"Progress: {processed_count}/{total_tasks} - {product_id}?color={color_id}")
    except HTTPError as e:
        print(f"HTTP Error {product_id}?color={color_id}: {e.code}")
    except Exception as e:
        print(f"Error {product_id}?color={color_id}: {e}")

def writer_thread():
    """CSV writer thread; writes one file per Collection: {collection}.csv."""
    files = {}
    writers = {}
    while True:
        row = write_queue.get()
        if row is None:
            break
        coll = row.get("Collection", "") or "_default"
        if coll not in writers:
            f = open(f"{coll}.csv", "w", newline="", encoding="utf-8")
            w = csv.DictWriter(f, fieldnames=FIELDS)
            w.writeheader()
            files[coll] = f
            writers[coll] = w
        writers[coll].writerow(row)
        files[coll].flush()
        write_queue.task_done()
    for f in files.values():
        f.close()

def _fetch_one_collection(page_url):
    """Fetch the listing for one collection; returns [(pid, cid, coll), ...]."""
    coll = url_to_collection_name(page_url)
    try:
        print(f"\nFetching collection [{coll}]: {page_url}")
        products = fetch_list_json_api(page_url)
        pairs = extract_product_color_pairs(products)
        if pairs:
            print(f"  [{coll}] parsed {len(pairs)} product/color combinations")
            return [(pid, cid, coll) for pid, cid in pairs]
    except (HTTPError, URLError, ValueError) as e:
        print(f"  [{coll}] fetch failed: {e}")
    return []

def main():
    global total_tasks
    parser = argparse.ArgumentParser(description="Centauro product scraper")
    parser.add_argument("-f", "--from-file", nargs="?", const="products.txt", metavar="FILE",
                        help="read the product list from a text file and scrape directly, skipping the listing fetch (default products.txt)")
    parser.add_argument("-n", "--no-verify-proxy", action="store_true", help="skip proxy verification (useful when verification fails)")
    args = parser.parse_args()

    if PROXIES:
        print(f"Using proxy: {PROXY_ADDR}; one IP per thread; reconnecting automatically on 403")
        if not args.no_verify_proxy and not verify_proxy():
            print("Check the proxy settings, or retry with -n to skip verification")
            return
        elif args.no_verify_proxy:
            print("Proxy verification skipped")

    list_file = args.from_file if args.from_file else "products.txt"
    triples = []

    if args.from_file:
        # Read straight from the text file and scrape
        try:
            triples = load_products_from_txt(list_file)
            print(f"Loaded {len(triples)} combinations from {list_file}, scraping directly")
        except FileNotFoundError:
            print(f"File not found: {list_file}")
            return
    else:
        # 1. Fetch every collection's listing in parallel
        list_workers = min(30, len(COLLECTION_URLS))
        with ThreadPoolExecutor(max_workers=list_workers) as ex:
            for future in as_completed([ex.submit(_fetch_one_collection, url) for url in COLLECTION_URLS]):
                triples.extend(future.result())

        # 2. If nothing came back, try coll.json / next_data.json (collection left empty)
        if not triples:
            for name in ("coll.json", "next_data.json"):
                try:
                    products = load_products_from_json(name)
                    pairs = extract_product_color_pairs(products)
                    if pairs:
                        for pid, cid in pairs:
                            triples.append((pid, cid, ""))
                        print(f"Parsed {len(pairs)} combinations from {name}")
                        break
                except FileNotFoundError:
                    continue
                except Exception as e:
                    print(f"Failed to parse {name}: {e}")

        if triples:
            save_pairs_to_txt(triples, list_file)

        # 3. Load back from products.txt
        try:
            triples = load_products_from_txt(list_file)
            print(f"\nLoaded {len(triples)} combinations from {list_file} for scraping")
        except FileNotFoundError:
            if not triples:
                pass
            else:
                raise

    if not triples:
        print("No products to scrape. Make sure www.centauro.com.br is reachable from this network")
        return

    total_tasks = len(triples)
    print(f"Starting scrape: {total_tasks} tasks; each collection gets its own CSV file")

    writer = threading.Thread(target=writer_thread)
    writer.start()

    with ThreadPoolExecutor(max_workers=30) as ex:
        for future in as_completed([ex.submit(process_one, t) for t in triples]):
            try:
                future.result()
            except Exception as e:
                print(f"Task error: {e}")
        time.sleep(0.5)

    write_queue.put(None)
    writer.join()
    print(f"Done: processed {processed_count}/{total_tasks}; output written per Collection")

if __name__ == "__main__":
    main()