搜索结果

×

搜索结果将在这里显示。

📚 boticario_cj

获取url

# -*- coding: utf-8 -*-
"""
Boticario 产品链接采集脚本
使用 Playwright 浏览器自动化,按 XPath 获取产品链接,翻页直到无更多链接,保存到文本
格式: 产品链接,分类slug
"""
import sys
import time

# Switch stdout to UTF-8 so the non-ASCII progress messages print correctly
# (the original note targets the Windows console). Streams that do not
# support reconfigure() are left untouched.
try:
    sys.stdout.reconfigure(encoding="utf-8")
except (AttributeError, OSError):
    pass

# (category listing URL, category slug) pairs to crawl.
CATEGORIES = [
    ("https://www.boticario.com.br/corpo-e-banho/", "corpo-e-banho"),
    ("https://www.boticario.com.br/para-usar-ja/", "para-usar-ja"),
]

# XPaths of the product-card anchors. Several page layouts are supported;
# _get_product_links tries every entry in order and merges the results.
# NOTE(review): in XPath, `section[*]` is a predicate meaning "a section that
# has at least one child element", not a positional wildcard, so the last
# entry matches a superset of the two fixed-index paths above it.
PRODUCT_LINKS_XPATHS = [
    "/html/body/main/div[2]/section[7]/section/div/div[3]/article/div[1]/a",
    "/html/body/main/div[2]/section[13]/section/div/div[3]/article/div[1]/a",
    "/html/body/main/div[2]/section[*]/section/div/div[3]/article/div[1]/a",
]
# XPaths of the "next page" button (one per supported page layout).
NEXT_PAGE_BUTTON_XPATHS = [
    "/html/body/main/div[2]/section[7]/section/div/div[3]/div/button",
    "/html/body/main/div[2]/section[13]/section/div/div[3]/div/button",
]

# Text file the collected "product_url,category_slug" lines are written to.
OUTPUT_FILE = "boticario_products.txt"

def _accept_cookies(page):
    """Dismiss the cookie-consent banner when it is showing.

    Best effort: if the "Aceitar todos os cookies" button is missing, hidden,
    or any Playwright call fails, the function returns without doing anything.
    """
    try:
        consent = page.get_by_role("button", name="Aceitar todos os cookies")
        if not consent.is_visible():
            return
        consent.click()
        # Short pause after the click before the caller continues.
        time.sleep(0.5)
    except Exception:
        return

def _get_product_links(page):
    """Return the de-duplicated absolute product URLs on the current page.

    Every XPath in PRODUCT_LINKS_XPATHS is evaluated and the results are
    merged in first-seen order. An XPath whose lookup raises is skipped.

    Args:
        page: Playwright (sync API) page showing a category listing.

    Returns:
        list[str]: absolute http(s) URLs, without duplicates.
    """
    from urllib.parse import urljoin

    base_url = "https://www.boticario.com.br/"
    seen = set()
    links = []
    for xpath in PRODUCT_LINKS_XPATHS:
        try:
            loc = page.locator(f"xpath={xpath}")
            for i in range(loc.count()):
                # Strip BEFORE inspecting the prefix: a padded absolute href
                # (" https://...") must not get the site root prepended.
                href = (loc.nth(i).get_attribute("href") or "").strip()
                if not href:
                    continue
                if not href.startswith(("http://", "https://")):
                    # Handles "/path", "path" and protocol-relative "//host/path".
                    href = urljoin(base_url, href)
                if not href.startswith(("http://", "https://")):
                    # javascript:, mailto:, ... are not product pages.
                    continue
                if href not in seen:
                    seen.add(href)
                    links.append(href)
        except Exception:
            # Layout not present on this page (or lookup failed): try the next XPath.
            continue
    return links

def _has_next_page(page):
    """Tell whether a visible, enabled "next page" button exists.

    Each XPath in NEXT_PAGE_BUTTON_XPATHS is probed in order; the first one
    whose first match is both visible and enabled makes the result True.
    A probe that raises counts as "not available".
    """
    def _usable(expr):
        # One probe: does this XPath lead to a clickable button?
        try:
            matches = page.locator(f"xpath={expr}")
            if matches.count() == 0:
                return False
            candidate = matches.first
            if not candidate.is_visible():
                return False
            return bool(candidate.is_enabled())
        except Exception:
            return False

    return any(_usable(expr) for expr in NEXT_PAGE_BUTTON_XPATHS)

def _click_next_page(page):
    """Click the first usable "next page" button.

    The XPaths in NEXT_PAGE_BUTTON_XPATHS are tried in order. The first match
    that is visible and enabled is clicked, followed by a 1.5 s pause.

    Returns:
        bool: True when a button was clicked, False when none was usable.
    """
    for expr in NEXT_PAGE_BUTTON_XPATHS:
        try:
            matches = page.locator(f"xpath={expr}")
            if matches.count() > 0:
                target = matches.first
                if not (target.is_visible() and target.is_enabled()):
                    continue
                target.click()
                # Give the page time to react to the click.
                time.sleep(1.5)
                return True
        except Exception:
            # This layout is not usable; fall through to the next XPath.
            pass
    return False

def _crawl_category(page, url, slug):
    """Collect every product link of one category listing.

    Phase 1 opens the listing and keeps clicking "next page" until no usable
    button is left, recording the page URL after each step. Phase 2 then reads
    the links:

    * If pagination never changed the URL, the listing is reloaded and the
      links are harvested after every click.
    * Otherwise each distinct recorded URL is visited and harvested.

    Args:
        page: Playwright (sync API) page used for navigation.
        url: Category listing URL.
        slug: Category slug stored next to every collected link.

    Returns:
        list[tuple[str, str]]: (product_url, slug) pairs in first-seen order,
        without duplicate URLs.
    """
    all_links = []
    seen = set()

    def _collect(links):
        """Append links not collected before; return how many were added."""
        added = 0
        for link in links:
            if link not in seen:
                seen.add(link)
                all_links.append((link, slug))
                added += 1
        return added

    # Phase 1: click through the whole pagination first.
    page.goto(url, wait_until="domcontentloaded", timeout=30000)
    time.sleep(2)
    _accept_cookies(page)
    time.sleep(0.5)

    page_urls = []
    page_num = 0
    while True:
        page_num += 1
        current_url = page.url
        page_urls.append(current_url)
        print(f"  [{slug}] 第 {page_num} 页已加载: {current_url[:60]}...")

        if not _has_next_page(page):
            break
        if not _click_next_page(page):
            break
        time.sleep(1)

    # Phase 2: read the links.
    unique_urls = list(dict.fromkeys(page_urls))
    if len(unique_urls) == 1 and len(page_urls) > 1:
        # Pagination happens in place (URL unchanged): reload and harvest
        # after every click.
        print(f"  [{slug}] 分页不改变 URL,改为边点边采集(每页仅取新增链接)")
        page.goto(url, wait_until="domcontentloaded", timeout=30000)
        time.sleep(2)
        _accept_cookies(page)
        page_num = 0
        while True:
            page_num += 1
            links = _get_product_links(page)
            # Pick new links by membership, not by list position: the link
            # list is ordered per XPath, so freshly loaded items are not
            # guaranteed to be at the tail, and a DOM that replaces (rather
            # than accumulates) products would make a positional slice empty.
            new_links = [u for u in links if u not in seen]
            added = _collect(new_links)
            print(f"  [{slug}] 第 {page_num} 页: 本页 {len(new_links)} 个,新增 {added},累计 {len(all_links)}")
            if not _has_next_page(page):
                break
            if not _click_next_page(page):
                break
            time.sleep(1)
    else:
        # Pagination changes the URL: visit each recorded page and harvest it.
        for i, page_url in enumerate(unique_urls):
            page.goto(page_url, wait_until="domcontentloaded", timeout=30000)
            time.sleep(1.5)
            links = _get_product_links(page)
            added = _collect(links)
            print(f"  [{slug}] 第 {i+1} 页: {len(links)} 个链接,新增 {added},累计 {len(all_links)}")

    return all_links

def main():
    """Crawl every category in CATEGORIES and save the product links.

    Launches a visible Chrome through Playwright's sync API, crawls each
    category with _crawl_category, writes "product_url,category_slug" lines to
    OUTPUT_FILE and waits for Enter before closing the browser.

    Returns:
        int: 1 when Playwright is not installed, otherwise 0.
    """
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        print("请先安装 Playwright: pip install playwright")
        print("然后运行: playwright install chromium  (或使用已安装的 Chrome)")
        return 1

    print("启动浏览器,采集 Boticario 产品链接...")

    with sync_playwright() as p:
        # Use the installed Chrome channel and drop automation hints so the
        # browser looks like a regular one.
        browser = p.chromium.launch(
            channel="chrome",
            headless=False,
            ignore_default_args=["--enable-automation"],  # drop the "controlled by automated software" flag
            args=[
                "--incognito",
                "--disable-blink-features=AutomationControlled",  # hide the AutomationControlled feature
                "--disable-application-cache",
                "--disable-cache",
                "--disable-offline-load-stale-cache",
                "--disable-infobars",  # no "Chrome is being controlled by automated test software" bar
                "--no-sandbox",
                "--disable-dev-shm-usage",
            ],
        )
        # Mimic a regular browser: Brazilian locale/timezone, common resolution, full UA.
        context = browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
            locale="pt-BR",
            timezone_id="America/Sao_Paulo",
            permissions=["geolocation"],
            color_scheme="light",
            extra_http_headers={
                "Accept-Language": "pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7",
                "sec-ch-ua": '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
                "sec-ch-ua-mobile": "?0",
                "sec-ch-ua-platform": '"Windows"',
            },
        )
        # Init script: hide the webdriver flag and fake a few fingerprint
        # properties of a regular browser.
        context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            Object.defineProperty(navigator, 'languages', { get: () => ['pt-BR', 'pt', 'en-US', 'en'] });
            if (!window.chrome) window.chrome = {};
            if (!window.chrome.runtime) window.chrome.runtime = {};
        """)
        context.clear_cookies()
        page = context.new_page()
        # Clear the browser cache and cookies through CDP (best effort).
        try:
            cdp = context.new_cdp_session(page)
            cdp.send("Network.clearBrowserCache")
            cdp.send("Network.clearBrowserCookies")
        except Exception:
            pass

        all_results = []
        for url, slug in CATEGORIES:
            print(f"\n采集分类: {slug} ({url})")
            results = _crawl_category(page, url, slug)
            all_results.extend(results)

        # Save as text, one "product_url,category_slug" per line.
        # NOTE(review): results are written only after every category has
        # finished, so an exception raised while crawling discards the links
        # collected so far.
        if all_results:
            with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
                for link, slug in all_results:
                    f.write(f"{link},{slug}\n")
            print(f"\n✓ 已保存 {len(all_results)} 条到 {OUTPUT_FILE}")
        else:
            print("\n未采集到任何产品链接")

        # Keep the browser window open until the user confirms.
        print("\n按 Enter 关闭浏览器...")
        input()
        browser.close()

    print("完成")
    return 0

# Script entry point: use main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())
# -*- coding: utf-8 -*-
"""
Boticario 产品详情采集脚本 - 参考 carrefour_cj.py
流程: 读取 boticario_products.txt -> 浏览器进入每个产品页 -> 采集 blz.product / JSON-LD / 图片 -> 输出 Shopify CSV
产品描述以 blz.product 为准
"""
import html
import csv
import os
import sys
import re
import unicodedata
import asyncio
from queue import Queue
import threading

# Switch stdout to UTF-8 so the non-ASCII progress messages print correctly
# (the original note targets the Windows console). Streams that do not
# support reconfigure() are left untouched.
try:
    sys.stdout.reconfigure(encoding="utf-8")
except (AttributeError, OSError):
    pass

# Column names of the Shopify product CSV, in output order.
FIELDS = [
    "Handle", "Title", "Body (HTML)", "Vendor", "Type", "Tags", "Published",
    "Option1 Name", "Option1 Value", "Option2 Name", "Option2 Value",
    "Option3 Name", "Option3 Value", "Variant SKU", "Variant Grams",
    "Variant Inventory Tracker", "Variant Inventory Qty", "Variant Inventory Policy",
    "Variant Fulfillment Service", "Variant Price", "Variant Compare At Price",
    "Variant Requires Shipping", "Variant Taxable", "Variant Barcode",
    "Image Src", "Image Position", "Image Alt Text", "Gift Card",
    "SEO Title", "SEO Description", "Google Shopping / Google Product Category",
    "Google Shopping / Gender", "Google Shopping / Age Group",
    "Google Shopping / MPN", "Google Shopping / AdWords Grouping",
    "Google Shopping / AdWords Labels", "Google Shopping / Condition",
    "Google Shopping / Custom Product", "Google Shopping / Custom Label 0",
    "Google Shopping / Custom Label 1", "Google Shopping / Custom Label 2",
    "Google Shopping / Custom Label 3", "Google Shopping / Custom Label 4",
    "Variant Image", "Variant Weight Unit", "Variant Tax Code",
    "Cost per item", "Status", "Collection"
]

# Input list of "product_url,category_slug" lines.
PRODUCTS_TXT = "boticario_products.txt"
# Directory the CSV files are written into.
OUTPUT_DIR = "boticario_output"
# Value of the "Vendor" column.
VENDOR = "O Boticário"
SKIP_COLLECTIONS = ["perfumaria"]  # collections that are not scraped
# Lower-cased copy used for case-insensitive membership tests.
SKIP_COLLECTIONS_SET = {c.lower() for c in SKIP_COLLECTIONS}

# XPaths of the product images (several page layouts are supported).
PRODUCT_IMAGE_XPATHS = [
    "/html/body/main/div[3]/div[2]/div[2]/div[1]/span/img",
    "/html/body/main/div[3]/div[2]/div[2]/div[1]//img",
]

CONCURRENT_WORKERS = 5  # default number of pages scraping in parallel

# Rows travel from the scraping coroutines to writer_thread through this
# queue; a None item tells the writer to stop.
write_queue = Queue()
# Guards processed_count while it is incremented and printed.
counter_lock = threading.Lock()
# Number of product pages handled so far.
processed_count = 0
# Number of products to scrape in this run; set by main().
total_tasks = 0
# True when --test / -t was given; set by main() and read by writer_thread.
test_mode = False

def decode_html_entities(text):
    """Decode HTML entities in *text*, applying the decoder twice.

    The second pass resolves double-escaped input such as ``&amp;amp;``.
    Falsy input yields an empty string; other values are converted with str().
    """
    if not text:
        return ""
    once = html.unescape(str(text))
    return html.unescape(once)

def normalize_text(text):
    """Return *text* NFKD-normalised with all combining marks removed.

    In practice this strips accents ("á" -> "a") and expands compatibility
    characters. Falsy input yields an empty string.
    """
    if not text:
        return ""
    decomposed = unicodedata.normalize("NFKD", str(text))
    # Keep only code points whose combining class is 0 (i.e. base characters).
    return "".join(filter(lambda ch: unicodedata.combining(ch) == 0, decomposed))

def slug_from_title(title):
    """Build a lower-case, hyphen-separated handle from a product title.

    Accents are stripped, characters other than word characters, whitespace
    and hyphens are dropped, and runs of whitespace/hyphens collapse into a
    single hyphen. Leading and trailing hyphens are removed.
    """
    decomposed = unicodedata.normalize("NFKD", str(title))
    unaccented = "".join(ch for ch in decomposed if unicodedata.combining(ch) == 0)
    cleaned = re.sub(r"[^\w\s\-]", "", unaccented)
    hyphenated = re.sub(r"[-\s]+", "-", cleaned)
    return hyphenated.strip("-").lower()

def _coll_to_slug(name):
    """Turn a collection name into a slug that is safe to use in a file name.

    Same transformation as a title handle (accents stripped, punctuation
    removed, whitespace/hyphen runs collapsed, lower-cased). Returns
    "default" when the name is falsy or nothing is left after cleaning.
    """
    if not name:
        return "default"
    decomposed = unicodedata.normalize("NFKD", str(name))
    unaccented = "".join(ch for ch in decomposed if unicodedata.combining(ch) == 0)
    cleaned = re.sub(r"[^\w\s\-]", "", unaccented)
    slug = re.sub(r"[-\s]+", "-", cleaned).strip("-").lower()
    return slug if slug else "default"

def load_products_txt(path=None):
    """Load (url, collection) pairs from the product list file.

    Each non-empty line that does not start with "#" has the form
    ``product_url,collection_slug``; the slug part is optional.

    Args:
        path: File to read; defaults to PRODUCTS_TXT.

    Returns:
        list[tuple[str, str]]: pairs in file order, without duplicate URLs
        and without collections listed in SKIP_COLLECTIONS_SET. An empty list
        is returned when the file does not exist.
    """
    if path is None:
        path = PRODUCTS_TXT
    items = []
    seen = set()
    try:
        # utf-8-sig also reads plain UTF-8 but drops a leading BOM, which
        # would otherwise become part of the first URL / hide a leading "#".
        with open(path, "r", encoding="utf-8-sig") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                # Lines are written as f"{link},{slug}". Slugs contain no
                # comma but URLs may, so split on the LAST comma.
                url, sep, coll = line.rpartition(",")
                if not sep:
                    url, coll = line, ""
                url = url.strip()
                coll = coll.strip()
                if not url:
                    continue
                if coll.lower() in SKIP_COLLECTIONS_SET:
                    continue
                if url in seen:
                    continue
                seen.add(url)
                items.append((url, coll))
    except FileNotFoundError:
        # A missing list is reported by the caller as "no products".
        pass
    return items

async def _extract_blz_product(page):
    """Read selected fields of the page's global ``blz.product`` object.

    Returns:
        dict | None: sku, name, slugName, subTotal, description, quantity,
        buybox, kit and tags as produced by the in-page script, or None when
        ``blz.product`` is absent or the evaluation fails.
    """
    extractor_js = """() => {
            const p = (typeof blz !== 'undefined' && blz && blz.product) ? blz.product : null;
            if (!p) return null;
            return {
                sku: p.sku || '',
                name: p.name || '',
                slugName: p.slugName || '',
                subTotal: p.subTotal != null ? p.subTotal : '',
                description: p.description || '',
                quantity: p.quantity,
                buybox: p.buybox,
                kit: p.kit,
                tags: Array.isArray(p.tags) ? p.tags : []
            };
        }"""
    try:
        return await page.evaluate(extractor_js)
    except Exception:
        return None

async def _extract_json_ld(page):
    """Collect the parsed content of every ``application/ld+json`` script tag.

    Script tags whose content is not valid JSON are skipped inside the page.

    Returns:
        list: parsed JSON-LD values; an empty list when there are none or the
        evaluation fails.
    """
    collector_js = """() => {
            const scripts = document.querySelectorAll('script[type="application/ld+json"]');
            const arr = [];
            scripts.forEach(s => {
                try {
                    const data = JSON.parse(s.innerText);
                    arr.push(data);
                } catch (e) {}
            });
            return arr;
        }"""
    try:
        parsed = await page.evaluate(collector_js)
    except Exception:
        return []
    return parsed or []

async def _extract_images_from_page(page):
    """Return the product image URLs found through PRODUCT_IMAGE_XPATHS.

    The XPaths are tried in order; the search stops after the first one that
    produced at least one image. An XPath whose lookup raises is skipped.

    Args:
        page: Playwright (async API) page showing a product.

    Returns:
        list[str]: absolute http(s) image URLs in page order, without
        duplicates.
    """
    from urllib.parse import urljoin

    base_url = "https://www.boticario.com.br/"
    images = []
    seen = set()
    for xpath in PRODUCT_IMAGE_XPATHS:
        try:
            loc = page.locator(f"xpath={xpath}")
            count = await loc.count()
            for i in range(count):
                # Normalise first, THEN de-duplicate, so that " /a.jpg " and
                # the already-absolute form of the same image count as one.
                src = ((await loc.nth(i).get_attribute("src")) or "").strip()
                if not src:
                    continue
                if not src.startswith(("http://", "https://")):
                    # Handles "/path", "path" and protocol-relative "//host/path".
                    src = urljoin(base_url, src)
                if not src.startswith(("http://", "https://")):
                    # e.g. inline "data:" placeholders: nothing to download.
                    continue
                if src in seen:
                    continue
                seen.add(src)
                images.append(src)
        except Exception:
            continue
        if images:
            break
    return images

def _ld_image_urls(value):
    """Return the absolute http(s) URLs contained in a JSON-LD ``image`` value.

    Accepts a URL string, an ImageObject-style dict ("url" / "contentUrl"),
    or a list of either; anything else yields an empty list.
    """
    if isinstance(value, str):
        return [value] if value.startswith("http") else []
    if isinstance(value, dict):
        return _ld_image_urls(value.get("url") or value.get("contentUrl"))
    if isinstance(value, (list, tuple)):
        urls = []
        for item in value:
            urls.extend(_ld_image_urls(item))
        return urls
    return []


def parse_boticario_product(blz, ld_list, images, url, collection):
    """Convert scraped product data into Shopify CSV row dicts.

    ``blz.product`` is the authoritative source; JSON-LD only fills in a
    missing title, description or image list.

    Args:
        blz: dict extracted from the page's ``blz.product`` (or None).
        ld_list: list of parsed JSON-LD values (or None).
        images: image URLs found on the page (may be empty or None).
        url: product page URL, used as a fallback source for the handle.
        collection: collection name/slug the product belongs to.

    Returns:
        list[dict]: one row per image (at least one row). Every row holds all
        FIELDS keys plus the internal "_coll_slug" key, which the CSV writer
        uses to pick the output file.
    """
    coll_slug = _coll_to_slug(collection)

    # blz.product takes precedence.
    title = ""
    body = ""
    sku = ""
    price = ""
    handle = ""

    if blz and isinstance(blz, dict):
        title = (blz.get("name") or "").strip()
        body = (blz.get("description") or "").strip()
        sku = str(blz.get("sku") or "")
        p = blz.get("subTotal")
        price = str(p) if p is not None else ""
        slug_name = blz.get("slugName") or ""
        handle = slug_name[:100] if slug_name else ""

    # JSON-LD as a supplement for whatever blz.product did not provide.
    # NOTE(review): name/description are taken from any JSON-LD object, not
    # only from Product nodes - confirm this is intended for pages that also
    # embed other JSON-LD types.
    for ld in (ld_list or []):
        if not isinstance(ld, dict):
            continue
        n, d = ld.get("name") or "", ld.get("description") or ""
        graph = ld.get("@graph") or []
        for g in graph if isinstance(graph, list) else []:
            if isinstance(g, dict) and g.get("@type") == "Product":
                n = n or g.get("name", "")
                d = d or g.get("description", "")
                if not images:
                    images = _ld_image_urls(g.get("image"))
                break  # only the first Product node of the graph is used
        if n and not title:
            title = str(n).strip()
        if d and not body:
            body = str(d).strip()
        if not images and ld.get("image"):
            images = _ld_image_urls(ld["image"])

    clean_title = decode_html_entities(normalize_text(title))
    title = clean_title or "Unknown"
    body = decode_html_entities(normalize_text(body))

    # Handle precedence: blz slugName -> slug of the real title -> first path
    # segment of the URL -> "product". The "Unknown" placeholder title is
    # deliberately NOT slugified: doing so gave every title-less product the
    # same handle and made the URL fallback unreachable.
    if not handle and clean_title:
        handle = re.sub(r"[^\w\-]", "-", slug_from_title(clean_title)).strip("-")[:100]
    if not handle and url:
        m = re.search(r"boticario\.com\.br/([^/?#]+)", url)
        if m:
            handle = m.group(1).strip("/")[:100]
    handle = handle or "product"

    base_row = {
        "Handle": handle,
        "Title": title,
        "Body (HTML)": body,
        "Vendor": VENDOR,
        "Type": collection or "",
        "Tags": "",
        "Published": "TRUE",
        "_coll_slug": coll_slug,
        "Option1 Name": "",
        "Option1 Value": "",
        "Option2 Name": "",
        "Option2 Value": "",
        "Option3 Name": "",
        "Option3 Value": "",
        "Variant SKU": sku,
        "Variant Grams": "",
        "Variant Inventory Tracker": "shopify",
        "Variant Inventory Qty": 100,
        "Variant Inventory Policy": "deny",
        "Variant Fulfillment Service": "manual",
        "Variant Price": price,
        "Variant Compare At Price": "",
        "Variant Requires Shipping": "TRUE",
        "Variant Taxable": "TRUE",
        "Variant Barcode": "",
        "Image Src": "",
        "Image Position": 1,
        "Image Alt Text": title,
        "Gift Card": "FALSE",
        "SEO Title": title,
        "SEO Description": (body[:160] if body else ""),
        "Google Shopping / Google Product Category": "",
        "Google Shopping / Gender": "",
        "Google Shopping / Age Group": "",
        "Google Shopping / MPN": handle,
        "Google Shopping / AdWords Grouping": "",
        "Google Shopping / AdWords Labels": "",
        "Google Shopping / Condition": "New",
        "Google Shopping / Custom Product": "FALSE",
        "Google Shopping / Custom Label 0": "",
        "Google Shopping / Custom Label 1": "",
        "Google Shopping / Custom Label 2": "",
        "Google Shopping / Custom Label 3": "",
        "Google Shopping / Custom Label 4": "",
        "Variant Image": "",
        "Variant Weight Unit": "kg",
        "Variant Tax Code": "",
        "Cost per item": "",
        "Status": "active",
        "Collection": collection,
    }
    # Guarantee that every CSV column exists in the row.
    for k in FIELDS:
        base_row.setdefault(k, "")

    if not images:
        return [base_row]

    # One row per image. Rows after the first repeat the handle and carry the
    # image, with the main product/variant columns blanked.
    # NOTE(review): the remaining columns (Type, Published, inventory
    # settings, ...) are still repeated on these rows - confirm the importer
    # accepts that.
    rows = []
    for pos, img_url in enumerate(images, 1):
        row = dict(base_row)
        row["Image Src"] = img_url
        row["Image Position"] = pos
        if pos > 1:
            for column in ("Title", "Body (HTML)", "Vendor", "Variant Price",
                           "Variant Compare At Price", "Variant SKU"):
                row[column] = ""
        rows.append(row)
    return rows

def writer_thread():
    """Consume rows from write_queue and append them to CSV files.

    Runs until a None sentinel is received. Outside test mode there is one
    file per collection slug (``boticario_<slug>.csv``); in test mode all rows
    go to ``boticario_test.csv``. Files are created in OUTPUT_DIR, start with
    a header row and are flushed after every row. All opened files are closed
    when the thread ends, including when a write fails.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    files = {}
    writers = {}
    single_file = "boticario_test.csv" if test_mode else None

    try:
        while True:
            row = write_queue.get()
            try:
                if row is None:
                    break
                coll = "_single" if single_file else (row.get("_coll_slug", "") or "default")
                if coll not in writers:
                    fname = single_file if single_file else f"boticario_{coll}.csv"
                    fpath = os.path.join(OUTPUT_DIR, fname)
                    f = open(fpath, "w", newline="", encoding="utf-8")
                    # Register the handle before writing so that it is closed
                    # even if writing the header fails.
                    files[coll] = f
                    # extrasaction="ignore" drops helper keys such as
                    # "_coll_slug" that are not CSV columns.
                    w = csv.DictWriter(f, fieldnames=FIELDS, extrasaction="ignore")
                    w.writeheader()
                    writers[coll] = w
                writers[coll].writerow(row)
                # Flush per row so an interrupted run keeps what was scraped.
                files[coll].flush()
            finally:
                # Mark every fetched item (including the sentinel) as done so
                # that write_queue.join() cannot block forever.
                write_queue.task_done()
    finally:
        for f in files.values():
            f.close()

async def _accept_cookies(page):
    """Dismiss the cookie-consent banner when it is showing (async API).

    Best effort: if the "Aceitar todos os cookies" button is missing, hidden,
    or any Playwright call fails, the coroutine finishes without doing
    anything.
    """
    try:
        consent = page.get_by_role("button", name="Aceitar todos os cookies")
        if not await consent.is_visible():
            return
        await consent.click()
        # Short pause after the click before the caller continues.
        await asyncio.sleep(0.5)
    except Exception:
        return

async def process_one_async(page, url, collection):
    """Scrape one product page and queue its CSV rows.

    Navigates to *url*, dismisses the cookie banner, waits up to 5 s for the
    page's ``blz.product`` object, extracts blz data, JSON-LD and image URLs,
    converts them with parse_boticario_product and puts every resulting row on
    write_queue. Any exception is caught and reported as a skipped URL, so one
    failing page does not stop the worker.

    Args:
        page: Playwright (async API) page reused for navigation.
        url: Product page URL.
        collection: Collection the product belongs to.
    """
    global processed_count
    try:
        await page.goto(url, wait_until="domcontentloaded", timeout=30000)
        await asyncio.sleep(1.5)
        await _accept_cookies(page)
        await asyncio.sleep(0.5)
        # blz.product may not exist yet at this point; wait for it but carry
        # on regardless - the extractors below cope with its absence.
        try:
            await page.wait_for_function("() => typeof blz !== 'undefined' && blz && blz.product", timeout=5000)
        except Exception:
            pass

        blz = await _extract_blz_product(page)
        ld_list = await _extract_json_ld(page)
        images = await _extract_images_from_page(page)

        # NOTE(review): rows are produced even when blz and ld_list are both
        # empty, in which case the product is written with the title
        # "Unknown" - confirm that is wanted.
        rows = parse_boticario_product(blz, ld_list, images, url, collection)
        for row in rows:
            write_queue.put(row)

        # The counter only advances for pages that were processed without error.
        with counter_lock:
            processed_count += 1
            print(f"Progress: {processed_count}/{total_tasks} - {url[:60]}...")
    except Exception as e:
        print(f"跳过 {url[:50]}... ({e})")

async def _worker(page, queue):
    """Process queued (url, collection) tasks on *page* until none are left.

    Each task is handled by process_one_async, followed by a 0.3 s pause.
    """
    # There is no await between the emptiness check and get_nowait(), so no
    # other coroutine can take the item in between.
    while not queue.empty():
        task_url, task_collection = queue.get_nowait()
        await process_one_async(page, task_url, task_collection)
        await asyncio.sleep(0.3)

async def _create_context(browser):
    """Open a new browser context with the scraper's standard settings.

    Uses a 1920x1080 viewport, a desktop Chrome user agent, the pt-BR locale
    and the America/Sao_Paulo timezone.
    """
    settings = {
        "viewport": {"width": 1920, "height": 1080},
        "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        "locale": "pt-BR",
        "timezone_id": "America/Sao_Paulo",
    }
    context = await browser.new_context(**settings)
    return context

async def run_async(workers, items):
    """Scrape all *items* concurrently with up to *workers* browser pages.

    Launches Chrome (headless unless --show / -s is on the command line),
    fills a task queue, opens one context and page per worker and runs the
    workers until the queue is empty. The browser is closed afterwards, also
    when an error occurs.

    Args:
        workers: Requested number of concurrent pages; values below 1 are
            raised to 1 and no more pages than tasks are opened.
        items: Iterable of (url, collection) pairs.
    """
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            channel="chrome",
            headless=not ("--show" in sys.argv or "-s" in sys.argv),
            ignore_default_args=["--enable-automation"],
            args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-infobars",
                "--no-sandbox",
                "--disable-dev-shm-usage",
            ],
        )
        try:
            queue = asyncio.Queue()
            for url, coll in items:
                queue.put_nowait((url, coll))

            # Never fewer than one page, never more pages than tasks.
            worker_count = max(1, min(workers, queue.qsize()))

            pages = []
            for _ in range(worker_count):
                ctx = await _create_context(browser)
                # add_init_script is a coroutine in the async API: without
                # "await" the script is never installed.
                await ctx.add_init_script("""
                Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
                if (!window.chrome) window.chrome = {};
                if (!window.chrome.runtime) window.chrome.runtime = {};
            """)
                pages.append(await ctx.new_page())

            await asyncio.gather(*[_worker(page, queue) for page in pages])
        finally:
            await browser.close()

def main():
    """Scrape the details of every listed product into Shopify CSV files.

    Command-line switches read from sys.argv:
        --test / -t    only scrape the first 3 products, into one CSV file
        --workers N    number of concurrent pages (default CONCURRENT_WORKERS)
        --show / -s    run the browser with a visible window (see run_async)

    Returns:
        int: 1 when there is nothing to scrape or Playwright is missing,
        otherwise 0.
    """
    global total_tasks, test_mode
    test_mode = "--test" in sys.argv or "-t" in sys.argv

    workers = CONCURRENT_WORKERS
    if "--workers" in sys.argv:
        value_index = sys.argv.index("--workers") + 1
        if value_index < len(sys.argv):
            try:
                workers = int(sys.argv[value_index])
            except ValueError:
                pass  # keep the default on a non-numeric value
    # Zero or negative workers would scrape nothing.
    workers = max(1, workers)

    items = load_products_txt()
    if not items:
        print(f"无产品。请确保 {PRODUCTS_TXT} 存在且有内容(格式: 产品链接,分类slug)")
        return 1

    if test_mode:
        items = items[:3]
        print("[--test] 测试模式:仅采集前 3 个产品")

    total_tasks = len(items)
    print(f"开始采集产品详情,共 {total_tasks} 个,{workers} 个并发(asyncio)")

    try:
        import playwright.async_api
    except ImportError:
        print("请先安装 Playwright: pip install playwright")
        return 1

    writer = threading.Thread(target=writer_thread)
    writer.start()

    try:
        asyncio.run(run_async(workers, items))
    finally:
        # Always stop the writer: it is a non-daemon thread blocked on the
        # queue, so without the sentinel a failed run would never exit.
        write_queue.put(None)
        writer.join()

    print(f"完成,处理 {processed_count}/{total_tasks},已写入 {OUTPUT_DIR}/")
    return 0

# Script entry point: exit with main()'s return value, treating None as 0.
if __name__ == "__main__":
    sys.exit(main() or 0)