# -*- coding: utf-8 -*-
"""
Carrefour Brasil 产品采集脚本 - mercado.carrefour.com.br
流程: 读取分类配置 -> 请求分类列表 API -> 解析产品链接 -> 写入 products.txt -> 采集产品详情 -> 输出 Shopify CSV
每个产品写入两个 Collection 值:一级分类 + 二级分类
"""
import html
import json
import csv
import os
import sys
import re
import time
import urllib.request
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import threading
import unicodedata
# Force UTF-8 on the Windows console so non-ASCII log output doesn't crash.
try:
    sys.stdout.reconfigure(encoding="utf-8")
except (AttributeError, OSError):
    pass
# Shopify product-import CSV columns. "Collection" is a custom trailing column
# used by this script; writer_thread drops any row key not listed here.
FIELDS = [
    "Handle", "Title", "Body (HTML)", "Vendor", "Type", "Tags", "Published",
    "Option1 Name", "Option1 Value", "Option2 Name", "Option2 Value",
    "Option3 Name", "Option3 Value", "Variant SKU", "Variant Grams",
    "Variant Inventory Tracker", "Variant Inventory Qty", "Variant Inventory Policy",
    "Variant Fulfillment Service", "Variant Price", "Variant Compare At Price",
    "Variant Requires Shipping", "Variant Taxable", "Variant Barcode",
    "Image Src", "Image Position", "Image Alt Text", "Gift Card",
    "SEO Title", "SEO Description", "Google Shopping / Google Product Category",
    "Google Shopping / Gender", "Google Shopping / Age Group",
    "Google Shopping / MPN", "Google Shopping / AdWords Grouping",
    "Google Shopping / AdWords Labels", "Google Shopping / Condition",
    "Google Shopping / Custom Product", "Google Shopping / Custom Label 0",
    "Google Shopping / Custom Label 1", "Google Shopping / Custom Label 2",
    "Google Shopping / Custom Label 3", "Google Shopping / Custom Label 4",
    "Variant Image", "Variant Weight Unit", "Variant Tax Code",
    "Cost per item", "Status", "Collection"
]
BASE_URL = "https://mercado.carrefour.com.br"  # storefront origin for all API calls
CATEGORY_CONFIG = "carrefour_categories.json"  # preferred category config file
PRODUCTS_TXT = "carrefour_products.txt"        # collected product-link list filename
OUTPUT_DIR = "carrefour_output"                # all CSV/txt output lands here
# Hard-coded request headers (no cookie header, to avoid latin-1 encoding issues
# when urllib serializes non-ASCII cookie values).
PAGE_HEADERS = {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "accept-language": "en-US,en;q=0.9,zh;q=0.8",
    "cache-control": "max-age=0",
    "sec-ch-ua": '"Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "document",
    "sec-fetch-mode": "navigate",
    "sec-fetch-site": "none",
    "sec-fetch-user": "?1",
    "upgrade-insecure-requests": "1",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
}
write_queue = Queue()            # CSV rows pending write, consumed by writer_thread
counter_lock = threading.Lock()  # guards processed_count / progress printing
processed_count = 0              # products fully processed so far
total_tasks = 0                  # total products scheduled this run
test_mode = False                # --test: small sample, single output file
MAX_403_RETRIES = 3              # extra attempts after an HTTP 403
RETRY_DELAY = 2                  # base back-off in seconds (scaled linearly)


def urlopen_with_403_retry(req, timeout=30, desc=""):
    """urlopen with automatic retries on HTTP 403.

    Retries up to MAX_403_RETRIES extra times with a linearly increasing
    delay (RETRY_DELAY * attempt number). Any non-403 HTTPError, or a 403 on
    the final attempt, propagates to the caller unchanged.

    Args:
        req: urllib.request.Request (or URL string) to open.
        timeout: per-request timeout in seconds.
        desc: optional label appended to the retry log line.

    Returns:
        The open response object from urllib.request.urlopen.
    """
    # Fix vs original: dropped the dead `last_err` bookkeeping and the
    # unreachable trailing `raise last_err` — every loop iteration either
    # returns or raises, so the loop can never fall through.
    for attempt in range(MAX_403_RETRIES + 1):
        try:
            return urllib.request.urlopen(req, timeout=timeout)
        except HTTPError as e:
            if e.code != 403 or attempt >= MAX_403_RETRIES:
                raise
            wait = RETRY_DELAY * (attempt + 1)
            print(f" 403 重试 {attempt + 1}/{MAX_403_RETRIES},{wait}s 后重试{f': {desc}' if desc else ''}")
            time.sleep(wait)
def decode_html_entities(text):
    """Unescape HTML entities, applying two passes for double-encoded input.

    Falsy input (None, "") yields the empty string.
    """
    if not text:
        return ""
    # Two unescape passes handle payloads like "&amp;amp;" -> "&amp;" -> "&".
    return html.unescape(html.unescape(str(text)))
def normalize_text(text):
    """Strip diacritics by NFKD-decomposing and dropping combining marks.

    Falsy input (None, "") yields the empty string.
    """
    if not text:
        return ""
    decomposed = unicodedata.normalize("NFKD", str(text))
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))
def load_categories(config_path=CATEGORY_CONFIG):
    """Load the category config, returning [(slug1, slug2, level1, level2), ...].

    When the config file is absent, falls back to scraping category links out
    of carrefourfenlei.json via _parse_categories_from_fenlei_json().
    """
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = json.load(f)
    except FileNotFoundError:
        return _parse_categories_from_fenlei_json()
    return [
        (entry["slug1"], entry["slug2"], entry["level1"], entry["level2"])
        for entry in config.get("categories", [])
    ]
def _parse_categories_from_fenlei_json(path="carrefourfenlei.json"):
    """Parse second-level categories (/categoria/xxx/yyy) out of carrefourfenlei.json.

    Returns [(slug1, slug2, level1, level2), ...] with duplicates removed;
    returns [] when the file is missing.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return []

    # Fix vs original: the helper was (re)defined on every loop iteration;
    # hoisted out. Note .title() title-cases every word, so
    # "higiene-e-perfumaria" becomes "Higiene E Perfumaria".
    def slug_to_name(s):
        return s.replace("-", " ").title()

    text = json.dumps(data)
    # Match mercado.carrefour.com.br/categoria/slug1/slug2 followed by ?, /, # or end.
    pattern = r'mercado\.carrefour\.com\.br/categoria/([a-z0-9\-]+)/([a-z0-9\-]+)(?:[?/#]|$)'
    seen = set()
    cats = []
    for m in re.finditer(pattern, text):
        s1, s2 = m.group(1), m.group(2)
        if (s1, s2) in seen:
            continue
        seen.add((s1, s2))
        cats.append((s1, s2, slug_to_name(s1), slug_to_name(s2)))
    return cats
PAGE_SIZE = 60  # products requested per listing page (the API's `count` param)
def category_data_url(slug1, slug2, page=0):
    """Build the category-listing .data API URL.

    The first page (page=0) carries no `page` query param; subsequent pages
    send page=2,3,... (the API is 1-based with page 1 implicit). `count`
    controls the page size.
    """
    routes_param = "_routes=layout%2Fdefault%2Croutes%2Fcategory-search"
    if page == 0:
        query = f"sort=orders_desc&count={PAGE_SIZE}&{routes_param}"
    else:
        query = f"sort=orders_desc&page={page + 1}&count={PAGE_SIZE}&{routes_param}"
    return f"{BASE_URL}/categoria/{slug1}/{slug2}.data?{query}"
def _collect_product_slugs_from_tree(obj, slugs):
    """Recursively collect product slugs (links shaped like /slug/p) into `slugs`.

    Strings are scanned with a broad pattern; dict values under link-like keys
    get an extra, simpler scan (both may fire on the same string). Slugs
    containing any navigation keyword are skipped. Minimum slug length is 5
    characters to reduce false positives without missing short names.
    """
    skip_words = ("categoria", "colecao", "drogaria", "busca", "facets", "layout")
    full_pattern = r'(?:mercado\.carrefour\.com\.br/|/)([a-z0-9][a-z0-9\-]{4,}?)(?:/p\.data|/p)(?=["\'\s?#]|$)'
    if isinstance(obj, str):
        for match in re.finditer(full_pattern, obj):
            candidate = match.group(1).strip()
            if all(word not in candidate for word in skip_words):
                slugs.add(candidate)
    elif isinstance(obj, dict):
        for key, value in obj.items():
            if key in ("link", "href", "url") and isinstance(value, str) and "/p" in value:
                for match in re.finditer(r'/([a-z0-9][a-z0-9\-]{4,}?)(?:/p\.data|/p)', value):
                    candidate = match.group(1)
                    if all(word not in candidate for word in skip_words):
                        slugs.add(candidate)
            _collect_product_slugs_from_tree(value, slugs)
    elif isinstance(obj, list):
        for element in obj:
            _collect_product_slugs_from_tree(element, slugs)
def extract_product_slugs_from_json(data):
    """Extract product slugs from a category API response.

    List payloads get RSC resolution plus a tree walk; every payload
    additionally gets a regex sweep over its serialized form as a fallback.
    Returns the slugs as a list (unordered).
    """
    found = set()
    skip_words = ("categoria", "colecao", "drogaria", "busca", "facets", "layout")
    if isinstance(data, list):
        _collect_product_slugs_from_tree(_resolve_rsc_value(data, 0), found)
    serialized = json.dumps(data) if isinstance(data, (dict, list)) else str(data)
    for match in re.finditer(r'(?:mercado\.carrefour\.com\.br/|["\']/)([a-z0-9][a-z0-9\-]{4,}?)(?:/p\.data|/p)(?=["\'\s?#]|$)', serialized, re.I):
        candidate = match.group(1).strip()
        if all(word not in candidate for word in skip_words):
            found.add(candidate)
    return list(found)
def fetch_category_page(slug1, slug2, page):
    """Fetch one category-listing .data page and decode it as JSON."""
    request = urllib.request.Request(
        category_data_url(slug1, slug2, page), headers=PAGE_HEADERS
    )
    with urlopen_with_403_retry(request, desc=f"分类 page={page}") as resp:
        body = resp.read()
    return json.loads(body.decode("utf-8", errors="replace"))
def fetch_all_products_from_category(slug1, slug2, level1, level2, max_products=None):
    """Page through a category and collect its product slugs.

    Returns [(slug, level1, level2, slug1, slug2), ...].  With
    ``max_products=None`` the whole category is fetched.  Pagination stops on
    an empty page, on a page containing only already-seen slugs, on reaching
    the cap, or on any request/parse error.
    """
    results = []
    seen = set()
    page = 0
    while True:
        try:
            data = fetch_category_page(slug1, slug2, page)
            slugs = extract_product_slugs_from_json(data)
            if not slugs:
                if page == 0:
                    print(f" 警告: 分类 {level1}/{level2} 第 0 页无产品")
                break
            added = 0
            for slug in slugs:
                if slug not in seen:
                    seen.add(slug)
                    results.append((slug, level1, level2, slug1, slug2))
                    added += 1
                    if max_products and len(results) >= max_products:
                        break
            print(f" [{level1}/{level2}] 第 {page} 页: {len(slugs)} 个产品,新增 {added},累计 {len(results)}")
            if max_products and len(results) >= max_products:
                break
            if added == 0 and len(slugs) > 0:
                # A page of pure duplicates means the API is repeating itself.
                print(f" 第 {page} 页全为重复,停止翻页")
                break
            page += 1
            time.sleep(0.5)  # be polite between listing pages
        except (HTTPError, URLError, json.JSONDecodeError) as e:
            print(f" 分类 {level1}/{level2} 第 {page} 页失败: {e}")
            break
    return results
def product_data_url(slug):
    """Build the product-detail .data API URL for `slug`.

    The slug is percent-encoded defensively; collected slugs are already
    restricted to [a-z0-9-], so this is normally a no-op.
    """
    safe_slug = quote(slug, safe="-")
    return f"{BASE_URL}/{safe_slug}/p.data"
def fetch_product_data(slug):
    """Fetch a product's .data API payload and decode it as JSON."""
    request = urllib.request.Request(product_data_url(slug), headers=PAGE_HEADERS)
    with urlopen_with_403_retry(request, desc=slug) as resp:
        raw = resp.read()
    return json.loads(raw.decode("utf-8", errors="replace"))
def _resolve_rsc_value(arr, idx, visited=None):
    """Resolve one node of an RSC-style flat payload into a concrete value.

    ``arr`` is a flat array; a dict entry of the form {"_N": M} means
    key = arr[N], value = arr[M].  ``visited`` holds indices already on the
    current resolution path to guard against reference cycles.

    Returns the materialized dict/list/scalar, or None for out-of-range or
    already-visited indices.
    """
    if visited is None:
        visited = set()
    if idx in visited or idx < 0 or idx >= len(arr):
        return None
    visited.add(idx)
    val = arr[idx]
    if isinstance(val, dict):
        result = {}
        for k, v in val.items():
            if k.startswith("_") and isinstance(v, int):
                try:
                    key_idx = int(k[1:])
                except (ValueError, TypeError):
                    continue
                if 0 <= key_idx < len(arr):
                    key_val = arr[key_idx]
                    # visited.copy() per edge lets the same node be reached via
                    # different paths (diamond references) while still breaking
                    # cycles along any single path.
                    val_resolved = _resolve_rsc_value(arr, v, visited.copy())
                    # Only string keys become dict keys; others are dropped.
                    if isinstance(key_val, str):
                        result[key_val] = val_resolved
        return result
    elif isinstance(val, list):
        result = []
        for i in val:
            # List elements that are in-range ints are treated as references;
            # anything else is silently skipped.
            if isinstance(i, int) and 0 <= i < len(arr):
                result.append(_resolve_rsc_value(arr, i, visited.copy()))
        return result
    return val
def _get_product_from_resolved(root):
    """Locate the product dict (routes.$productSlug.p.data.product) in a resolved tree.

    Depth-first search returning the first dict found under a "product" key,
    or None when the tree holds no such dict.
    """
    def _search(node):
        if isinstance(node, dict):
            candidate = node.get("product")
            if isinstance(candidate, dict):
                return candidate
            for child in node.values():
                found = _search(child)
                if found:
                    return found
        elif isinstance(node, list):
            for child in node:
                found = _search(child)
                if found:
                    return found
        return None

    return _search(root or {})
def parse_carrefour_product(data, slug="", collection1="", collection2="", coll_slug=""):
    """Parse a Carrefour .data API response into Shopify CSV row dicts.

    Uses RSC resolution; the product dict is expected at
    routes.$productSlug.p.data.product in the resolved tree.

    Args:
        data: decoded JSON payload (an RSC flat array when well-formed).
        slug: product URL slug, reused as the Shopify Handle (truncated to 100).
        collection1: level-1 category display name.
        collection2: level-2 category display name.
        coll_slug: precomputed "<slug1>_<slug2>" routing key for per-collection
            output files; derived from the names when empty.

    Returns:
        A list of row dicts — one per image (the first carries full product
        data, later ones only the image), or a single row when there are no
        images.  On parse failure, a single placeholder row titled
        "Parse Error" so the failure is visible in the CSV output.
    """
    # Both category levels are written comma-separated when available.
    collection = f"{collection1},{collection2}" if collection1 and collection2 else (collection1 or collection2 or "")
    product = None
    if isinstance(data, list):
        root = _resolve_rsc_value(data, 0)
        product = _get_product_from_resolved(root)
    if not product or not isinstance(product, dict):
        cs = coll_slug or f"{_coll_to_slug(collection1)}_{_coll_to_slug(collection2)}"
        return [{"Handle": slug[:100] or "product", "Title": "Parse Error", "Collection": collection, "_coll_slug": cs}]
    title = (product.get("productName") or product.get("name") or "").strip()
    title = decode_html_entities(normalize_text(title)) or "Unknown"
    body = (product.get("description") or "").strip()
    body = decode_html_entities(normalize_text(body))
    vendor = (product.get("brand") or "").strip()
    vendor = decode_html_entities(normalize_text(vendor))
    price = ""
    compare_price = ""
    sku = ""
    # SKU and pricing come from the first item's first seller offer
    # (VTEX "commertialOffer" — the misspelling is the platform's own key).
    items = product.get("items") or []
    if items:
        item = items[0]
        sku = str(item.get("itemId") or product.get("productId") or "")
        sellers = item.get("sellers") or []
        if sellers:
            offer = sellers[0].get("commertialOffer") or {}
            p = offer.get("Price") or offer.get("calculatedPrice")
            if p is not None:
                price = str(p)
            lp = offer.get("ListPrice") or offer.get("calculatedListPrice")
            if lp is not None:
                compare_price = str(lp)
    def _collect_image_urls(obj, out):
        # Walk the product subtree collecting vtexassets image URLs, unwrapping
        # src="..." attributes embedded in imageTag-style HTML snippets.
        if isinstance(obj, dict):
            for key in ("imageUrl", "imageTag", "url", "src"):
                val = obj.get(key)
                if isinstance(val, str):
                    u = val
                    if "src=" in u:
                        mm = re.search(r'src=["\']([^"\']+)["\']', u)
                        if mm:
                            u = mm.group(1)
                    if "vtexassets" in u and u not in out:
                        out.append(u)
            for v in obj.values():
                _collect_image_urls(v, out)
        elif isinstance(obj, list):
            for x in obj:
                _collect_image_urls(x, out)
    images = []
    _collect_image_urls(product, images)
    if not images:
        # Fallback: regex-scan the serialized product for any vtexassets URL
        # that looks like an image asset.
        text = json.dumps(product)
        for m in re.finditer(r'https?://[^"\'?\s]*vtexassets[^"\'?\s]+', text):
            url = m.group(0).rstrip("\\")
            if url not in images and ("arquivos" in url or "images" in url or any(ext in url for ext in [".jpg", ".jpeg", ".png", ".webp"])):
                images.append(url)
    # Prefer the URL slug as the handle; otherwise derive one from the title.
    handle = slug[:100] if slug else (re.sub(r"[^\w\-]", "-", slug_from_title(title)).strip("-")[:100] if title else "product")
    base_row = {
        "Handle": handle or "product",
        "Title": title,
        "Body (HTML)": body,
        "Vendor": vendor,
        "Type": collection2 or collection1 or "",
        "Tags": "",
        "Published": "TRUE",
        "_coll_slug": coll_slug or f"{_coll_to_slug(collection1)}_{_coll_to_slug(collection2)}",
        "Option1 Name": "",
        "Option1 Value": "",
        "Option2 Name": "",
        "Option2 Value": "",
        "Option3 Name": "",
        "Option3 Value": "",
        "Variant SKU": sku,
        "Variant Grams": "",
        "Variant Inventory Tracker": "shopify",
        "Variant Inventory Qty": 100,
        "Variant Inventory Policy": "deny",
        "Variant Fulfillment Service": "manual",
        "Variant Price": price,
        "Variant Compare At Price": compare_price,
        "Variant Requires Shipping": "TRUE",
        "Variant Taxable": "TRUE",
        "Variant Barcode": "",
        "Image Src": "",
        "Image Position": 1,
        "Image Alt Text": title,
        "Gift Card": "FALSE",
        "SEO Title": title,
        "SEO Description": (body[:160] if body else ""),
        "Google Shopping / Google Product Category": "",
        "Google Shopping / Gender": "",
        "Google Shopping / Age Group": "",
        "Google Shopping / MPN": handle,
        "Google Shopping / AdWords Grouping": "",
        "Google Shopping / AdWords Labels": "",
        "Google Shopping / Condition": "New",
        "Google Shopping / Custom Product": "FALSE",
        "Google Shopping / Custom Label 0": "",
        "Google Shopping / Custom Label 1": "",
        "Google Shopping / Custom Label 2": "",
        "Google Shopping / Custom Label 3": "",
        "Google Shopping / Custom Label 4": "",
        "Variant Image": "",
        "Variant Weight Unit": "kg",
        "Variant Tax Code": "",
        "Cost per item": "",
        "Status": "active",
        "Collection": collection,
    }
    # Guarantee every CSV column exists even if FIELDS gains new entries.
    for k in FIELDS:
        if k not in base_row:
            base_row[k] = ""
    rows = []
    if images:
        # Shopify convention: one row per image; rows after the first keep the
        # Handle but blank the product-level and variant-level fields.
        for pos, url in enumerate(images, 1):
            row = dict(base_row)
            row["Image Src"] = url
            row["Image Position"] = pos
            if pos > 1:
                row["Title"] = ""
                row["Body (HTML)"] = ""
                row["Vendor"] = ""
                row["Variant Price"] = ""
                row["Variant Compare At Price"] = ""
                row["Variant SKU"] = ""
            rows.append(row)
    else:
        rows.append(base_row)
    return rows
def slug_from_title(title):
    """Derive a URL handle from a product title: ASCII-fold, hyphenate, lowercase."""
    folded = unicodedata.normalize("NFKD", str(title))
    folded = "".join(ch for ch in folded if not unicodedata.combining(ch))
    cleaned = re.sub(r"[^\w\s\-]", "", folded)
    return re.sub(r"[-\s]+", "-", cleaned).strip("-").lower()
def _coll_to_slug(name):
    """Collection display name -> filename slug, e.g. "Higiene e Perfumaria" -> "higiene-e-perfumaria".

    Empty input (or input that reduces to nothing) maps to "default".
    """
    if not name:
        return "default"
    folded = unicodedata.normalize("NFKD", str(name))
    folded = "".join(ch for ch in folded if not unicodedata.combining(ch))
    folded = re.sub(r"[^\w\s\-]", "", folded)
    folded = re.sub(r"[-\s]+", "-", folded).strip("-").lower()
    return folded if folded else "default"
def save_products_txt(tuples, path=None):
    """Persist (slug, level1, level2, slug1, slug2) tuples as comma-joined lines.

    Deduplicates on (slug, level1, level2); tuples missing the trailing slug
    columns get them derived from the display names via _coll_to_slug.  When
    no path is given, writes to OUTPUT_DIR/PRODUCTS_TXT (creating the dir).
    """
    if path is None:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        path = os.path.join(OUTPUT_DIR, PRODUCTS_TXT)
    written = set()
    out_lines = []
    for entry in tuples:
        slug, level1, level2 = entry[0], entry[1], entry[2]
        slug_a = entry[3] if len(entry) > 3 else _coll_to_slug(level1)
        slug_b = entry[4] if len(entry) > 4 else _coll_to_slug(level2)
        dedup_key = (slug, level1, level2)
        if dedup_key not in written:
            written.add(dedup_key)
            out_lines.append(f"{slug},{level1},{level2},{slug_a},{slug_b}\n")
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(out_lines)
    print(f"已写入 {len(out_lines)} 条到 {path}")
def load_products_txt(path=None):
    """Load (slug, level1, level2, slug1, slug2) tuples from the products txt.

    Tolerates older files with fewer columns: missing display names default to
    "" and missing slug columns are derived via _coll_to_slug.  Blank lines
    and '#' comments are skipped; duplicates (by slug + names) are dropped.
    Returns [] when the file does not exist.
    """
    if path is None:
        path = os.path.join(OUTPUT_DIR, PRODUCTS_TXT)
    records = []
    seen_keys = set()
    try:
        with open(path, "r", encoding="utf-8") as f:
            for raw in f:
                raw = raw.strip()
                if not raw or raw.startswith("#"):
                    continue
                fields = [p.strip() for p in raw.split(",", 4)]
                slug = fields[0]
                level1 = fields[1] if len(fields) > 1 else ""
                level2 = fields[2] if len(fields) > 2 else ""
                slug_a = fields[3] if len(fields) > 3 else _coll_to_slug(level1)
                slug_b = fields[4] if len(fields) > 4 else _coll_to_slug(level2)
                key = (slug, level1, level2)
                if key in seen_keys:
                    continue
                seen_keys.add(key)
                records.append((slug, level1, level2, slug_a, slug_b))
    except FileNotFoundError:
        pass
    return records
def process_one(args):
    """Fetch and parse one product, enqueueing its CSV rows for the writer.

    ``args`` is (slug, level1, level2, slug1, slug2); shorter tuples are
    tolerated with derived/empty fallbacks.  HTTP errors (e.g. 404), network
    failures, and any other exception skip the product rather than aborting
    the run; only successful products advance the progress counter.
    """
    global processed_count
    slug = args[0]
    coll1 = args[1] if len(args) > 1 else ""
    coll2 = args[2] if len(args) > 2 else ""
    cs1 = args[3] if len(args) > 3 else _coll_to_slug(coll1)
    cs2 = args[4] if len(args) > 4 else _coll_to_slug(coll2)
    try:
        data = fetch_product_data(slug)
        rows = parse_carrefour_product(data, slug=slug, collection1=coll1, collection2=coll2, coll_slug=f"{cs1}_{cs2}")
        for row in rows:
            write_queue.put(row)
        # Counter and progress line under the lock so output stays coherent
        # across worker threads.
        with counter_lock:
            processed_count += 1
            print(f"Progress: {processed_count}/{total_tasks} - {slug[:50]}")
    except HTTPError as e:
        print(f"跳过 {slug[:50]} (HTTP {e.code})")
    except URLError as e:
        print(f"跳过 {slug[:50]} (网络异常)")
    except Exception as e:
        print(f"跳过 {slug[:50]} ({e})")
def writer_thread():
    """Consume rows from write_queue and write Shopify CSVs until a None sentinel.

    In test mode everything goes into one file (carrefour_test.csv);
    otherwise rows are routed into one file per "_coll_slug".  All files are
    created under OUTPUT_DIR, each gets its header on first open, and output
    is flushed per row so partial results survive a crash.
    """
    global test_mode
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    files = {}    # routing key -> open file handle
    writers = {}  # routing key -> csv.DictWriter
    single_file = "carrefour_test.csv" if test_mode else None
    while True:
        row = write_queue.get()
        if row is None:  # shutdown sentinel from main()
            break
        if single_file:
            coll = "_single"
        else:
            coll = row.get("_coll_slug", "") or "default"
        if coll not in writers:
            # Lazily open one CSV per collection and emit the header once.
            fname = single_file if single_file else f"carrefour_{coll}.csv"
            fpath = os.path.join(OUTPUT_DIR, fname)
            f = open(fpath, "w", newline="", encoding="utf-8")
            w = csv.DictWriter(f, fieldnames=FIELDS)
            w.writeheader()
            files[coll] = f
            writers[coll] = w
        # Drop internal keys (e.g. "_coll_slug") that are not CSV columns.
        out_row = {k: v for k, v in row.items() if k in FIELDS}
        writers[coll].writerow(out_row)
        files[coll].flush()
        write_queue.task_done()
    for f in files.values():
        f.close()
def main():
    """CLI entry point: collect product links, then scrape details into CSVs.

    Flags:
        --list-only / -l : only collect product links into the products txt.
        --test / -t      : sample 2 products per category, single CSV output.
        --from-file / -f : skip listing; read slugs from the products txt.
        --only / -o NAME : restrict to one level-2 category (case-insensitive).
    """
    global total_tasks, test_mode
    list_only = "--list-only" in sys.argv or "-l" in sys.argv
    test_mode = "--test" in sys.argv or "-t" in sys.argv
    from_file = "--from-file" in sys.argv or "-f" in sys.argv
    per_category = 2 if test_mode else None
    # --only NAME: process only the named level-2 category.
    only_cat = None
    for i, arg in enumerate(sys.argv):
        if arg in ("--only", "-o") and i + 1 < len(sys.argv):
            only_cat = sys.argv[i + 1].strip()
            break
    triples = []
    all_products = []
    if from_file:
        triples = load_products_txt()
        if not triples:
            # Legacy fallback: products txt in the working directory.
            triples = load_products_txt(path=PRODUCTS_TXT)
        if not triples:
            print(f"无产品。请确保 {OUTPUT_DIR}/{PRODUCTS_TXT} 或 {PRODUCTS_TXT} 存在且有内容。")
            return
        if only_cat:
            triples = [t for t in triples if (t[2] if len(t) > 2 else "").lower() == only_cat.lower()]
            if not triples:
                print(f"文本中无 {only_cat} 分类的产品")
                return
            print(f"从文本加载 {len(triples)} 个 {only_cat} 产品,直接采集")
        else:
            print(f"从文本加载 {len(triples)} 个产品,直接采集")
    else:
        categories = load_categories()
        if only_cat:
            categories = [(s1, s2, l1, l2) for s1, s2, l1, l2 in categories if l2.lower() == only_cat.lower()]
            if not categories:
                print(f"未找到分类: {only_cat}")
                return
            print(f"仅处理: {only_cat} ({len(categories)} 个)")
        else:
            print(f"加载 {len(categories)} 个分类")
        if test_mode:
            print("[--test] 测试模式:每个分类采集 2 个产品,输出到 carrefour_output/carrefour_test.csv")
        for slug1, slug2, level1, level2 in categories:
            print(f"\n采集分类: {level1} / {level2}")
            products = fetch_all_products_from_category(slug1, slug2, level1, level2, max_products=per_category)
            all_products.extend(products)
        if all_products:
            save_products_txt(all_products)
        if list_only:
            print(f"\n[--list-only] 仅获取列表,已保存到 {OUTPUT_DIR}/{PRODUCTS_TXT},共 {len(all_products)} 条")
            return
        # Re-read what was just saved so the detail pass always works from the
        # deduplicated file; fall back to the in-memory list if the read fails.
        triples = load_products_txt()
        if not triples and all_products:
            triples = all_products
    if not triples:
        print("无产品可采集。请检查分类配置或网络。")
        return
    total_tasks = len(triples)
    print(f"\n开始采集产品详情,共 {total_tasks} 个,每个产品写入两个 Collection(一级+二级)")
    # A single writer thread serializes all CSV output from the worker pool.
    writer = threading.Thread(target=writer_thread)
    writer.start()
    with ThreadPoolExecutor(max_workers=20) as ex:
        for future in as_completed([ex.submit(process_one, t) for t in triples]):
            try:
                future.result()
            except Exception as e:
                print(f"Task error: {e}")
    time.sleep(0.5)
    write_queue.put(None)  # sentinel: tell writer_thread to finish and close files
    writer.join()
    if test_mode:
        print(f"完成,处理 {processed_count}/{total_tasks},已写入 carrefour_output/")
    else:
        print(f"完成,处理 {processed_count}/{total_tasks},已按 Collection 分文件写入 carrefour_output/")
# Allow use both as an importable module and as a standalone script.
if __name__ == "__main__":
    main()