搜索结果

×

搜索结果将在这里显示。

🕹️ lacoste

lacoste-getpid

from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import logging
import time
import random
import os
import sys

# One record layout shared by every handler of this scraper.
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

# File handler: mode='w' truncates the log file each time the script starts.
file_handler = logging.FileHandler("lacoste_scraper.log", mode='w', encoding='utf-8')

# Console handler: the same records are echoed to stdout.
console_handler = logging.StreamHandler(sys.stdout)

# Dedicated named logger for the URL-collection script.
logger = logging.getLogger('lacoste_scraper')
logger.setLevel(logging.INFO)

# Configure both handlers identically and attach them (file first, then console).
for _handler in (file_handler, console_handler):
    _handler.setLevel(logging.INFO)
    _handler.setFormatter(formatter)
    logger.addHandler(_handler)
del _handler

def setup_driver():
    """Start an Edge WebDriver configured to look less like an automated browser.

    The browser is started from ``msedgedriver.exe`` (resolved relative to the
    current working directory / PATH) with the automation switches disabled.
    No headless flag is set, so a browser window is opened.

    Returns:
        The started ``webdriver.Edge`` instance. The caller owns it and must
        call ``quit()`` when done.
    """
    edge_options = webdriver.EdgeOptions()
    edge_options.add_argument('--disable-blink-features=AutomationControlled')
    edge_options.add_argument('--disable-extensions')
    edge_options.add_experimental_option('useAutomationExtension', False)
    edge_options.add_experimental_option("excludeSwitches", ["enable-automation"])

    service = Service("msedgedriver.exe")
    driver = webdriver.Edge(service=service, options=edge_options)

    # Hide navigator.webdriver from page scripts.
    hide_webdriver_js = "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"

    # execute_script only patches the document that is loaded right now; the
    # override is discarded on the next navigation. Registering the script via
    # CDP makes the browser re-apply it to every new document.
    try:
        driver.execute_cdp_cmd(
            "Page.addScriptToEvaluateOnNewDocument",
            {"source": hide_webdriver_js},
        )
    except Exception as e:
        # Best effort: continue with the per-document override only.
        logger.warning(f"Could not register navigator.webdriver override via CDP: {str(e)}")

    # Also patch the currently loaded (initial) document, as before.
    driver.execute_script(hide_webdriver_js)
    return driver

def fetch_urls(base_url, output_file):
    """Walk the paginated product listing and save one JSON-endpoint path per product colour.

    Pages ``base_url + "page=N"`` are loaded for N = 1, 2, ... Every product
    tile link that contains ``?color=`` is converted into a
    ``Product-PartialsData`` URL path. Crawling stops when the product links
    do not appear within the wait timeout, when a page yields no links, when a
    page contributes no product that was not already collected, or when any
    other error occurs on a page.

    Args:
        base_url: Listing URL that already ends with ``?`` (or ``&``) so that
            ``page=N`` can be appended directly.
        output_file: Path of the text file that receives the collected URL
            paths, sorted, one per line. It is written even when the crawl
            stops early because of an error.
    """
    product_link_xpath = "//div[contains(@class, 'product-tile')]//a[contains(@href, '/es/lacoste/')]"

    driver = setup_driver()
    page = 1
    extracted_urls = set()

    try:
        while True:
            url = f"{base_url}page={page}"
            logger.info(f"Fetching page {page}: {url}")

            try:
                driver.get(url)
                logger.info("Waiting for page to load...")
                time.sleep(5)  # fixed pause so the page is basically loaded before polling

                # Wait (up to 30 s) for the product links to be present.
                try:
                    elements = WebDriverWait(driver, 30).until(
                        EC.presence_of_all_elements_located((By.XPATH, product_link_xpath))
                    )
                    logger.info(f"Found {len(elements)} elements on page")
                except Exception as e:
                    logger.error(f"Timeout waiting for elements: {str(e)}")
                    break

                # Re-query so hrefs are read from the current DOM state.
                links = [
                    element.get_attribute('href')
                    for element in driver.find_elements(By.XPATH, product_link_xpath)
                ]
                logger.info(f"Extracted {len(links)} links from page")

                # Log the first few links to help debugging.
                for i, link in enumerate(links[:3]):
                    logger.info(f"Sample link {i + 1}: {link}")

                if not links:
                    logger.info(f"No more links found on page {page}. Stopping.")
                    break

                new_on_page = 0
                for link in links:
                    if link and "?color=" in link:
                        pid = link.split("/")[-1].split("?")[0]
                        # Keep only the colour value itself: drop any further
                        # query parameters or a URL fragment that follow it.
                        color = link.split("color=")[-1].split("&")[0].split("#")[0]
                        json_url = f"/on/demandware.store/Sites-ES-Site/es/Product-PartialsData?pid={pid}&color={color}&size=2&full=true&format=json"

                        if json_url not in extracted_urls:
                            extracted_urls.add(json_url)
                            new_on_page += 1
                            logger.info(f"Found new product: {pid} - Color: {color}")

                # Guard against endless pagination: if the site keeps serving
                # already-seen products for out-of-range page numbers, the
                # loop would otherwise never terminate.
                if new_on_page == 0:
                    logger.info(f"No new products found on page {page}. Stopping.")
                    break

            except Exception as e:
                logger.error(f"Error on page {page}: {str(e)}")
                break

            page += 1
            time.sleep(random.uniform(3, 6))  # randomized pause between pages

    finally:
        # A failure while closing the browser must not lose the collected URLs.
        try:
            driver.quit()
        except Exception as e:
            logger.warning(f"Failed to close the browser cleanly: {str(e)}")

        with open(output_file, "w", encoding='utf-8') as f:
            f.writelines(f"{saved_url}\n" for saved_url in sorted(extracted_urls))

        logger.info(f"Finished fetching URLs. Total URLs saved: {len(extracted_urls)}")

if __name__ == "__main__":
    logger.info("Starting Lacoste scraper...")

    # The listing URL ends with "?" because fetch_urls appends "page=N" to it as-is.
    BASE_URL = "https://www.lacoste.com/es/lacoste/?"
    # Text file that receives one product JSON URL path per line.
    OUTPUT_FILE = "lacoste-pid.txt"

    fetch_urls(base_url=BASE_URL, output_file=OUTPUT_FILE)

lacoste

import json
import csv
import time
from urllib.parse import urlparse
from queue import Queue
import threading
import unicodedata
import logging
from datetime import datetime
import sys
from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging.handlers import RotatingFileHandler

# Logging configuration: a size-rotated log file plus stdout, both on the root logger.
log_file = 'lacoste_scraper.log'
max_bytes = 10 * 1024 * 1024  # rotate once the file reaches 10 MB
backup_count = 3  # keep three rotated backup files

# Record layout shared by both handlers.
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# Rotating file handler.
file_handler = RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count, encoding='utf-8')
file_handler.setFormatter(formatter)

# Console handler.
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)

# Attach both handlers to the root logger at INFO level.
logging.root.setLevel(logging.INFO)
for handler in (file_handler, console_handler):
    logging.root.addHandler(handler)

# Remove any plain (non-rotating) FileHandler that is registered on the root
# logger; iterate over a copy because the handler list is modified in the loop.
for handler in list(logging.root.handlers):
    if not isinstance(handler, logging.FileHandler) or isinstance(handler, RotatingFileHandler):
        continue
    logging.root.removeHandler(handler)

# Column names of the Shopify standard product CSV format, in output order.
fields = [
    # Product-level columns
    "Handle", "Title", "Body (HTML)", "Vendor", "Type", "Tags", "Published",
    # Option columns
    "Option1 Name", "Option1 Value",
    "Option2 Name", "Option2 Value",
    "Option3 Name", "Option3 Value",
    # Variant columns
    "Variant SKU", "Variant Grams",
    "Variant Inventory Tracker", "Variant Inventory Qty",
    "Variant Inventory Policy", "Variant Fulfillment Service",
    "Variant Price", "Variant Compare At Price",
    "Variant Requires Shipping", "Variant Taxable", "Variant Barcode",
    # Image columns
    "Image Src", "Image Position", "Image Alt Text",
    "Gift Card",
    # SEO columns
    "SEO Title", "SEO Description",
    # Google Shopping columns
    "Google Shopping / Google Product Category",
    "Google Shopping / Gender",
    "Google Shopping / Age Group",
    "Google Shopping / MPN",
    "Google Shopping / AdWords Grouping",
    "Google Shopping / AdWords Labels",
    "Google Shopping / Condition",
    "Google Shopping / Custom Product",
    "Google Shopping / Custom Label 0",
    "Google Shopping / Custom Label 1",
    "Google Shopping / Custom Label 2",
    "Google Shopping / Custom Label 3",
    "Google Shopping / Custom Label 4",
    # Remaining variant / product columns
    "Variant Image", "Variant Weight Unit", "Variant Tax Code",
    "Cost per item", "Status",
]

# Queue intended for handing rows to a writer (queue.Queue is safe to share
# between threads); it is not referenced by any other code visible in this file.
write_queue = Queue()

# Lock used when the shared progress counters are updated from worker threads.
counter_lock = threading.Lock()

# Run-wide counters, all starting at zero.
processed_count = total_urls = success_count = error_count = 0

def create_driver():
    """Create a headless Edge WebDriver instance.

    The driver executable ``msedgedriver.exe`` must be located in the current
    working directory. Page-load and script timeouts are set to 30 seconds and
    the window size to 1920x1080.

    Returns:
        The configured ``webdriver.Edge`` instance, or ``None`` when the
        driver executable is missing or the browser could not be started or
        configured (the error is logged, never raised).
    """
    driver = None
    try:
        edge_options = webdriver.EdgeOptions()
        edge_options.add_argument('--headless')
        edge_options.add_argument('--disable-gpu')
        edge_options.add_argument('--no-sandbox')
        edge_options.add_argument('--disable-dev-shm-usage')
        edge_options.add_argument('--disable-blink-features=AutomationControlled')
        edge_options.add_argument('--disable-extensions')
        edge_options.add_argument('--disable-notifications')
        edge_options.add_argument('--ignore-certificate-errors')
        edge_options.add_argument('--log-level=3')
        edge_options.add_experimental_option('excludeSwitches', ['enable-logging'])

        # Present a regular desktop browser user agent.
        edge_options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')

        # Use msedgedriver.exe from the current working directory.
        driver_path = os.path.join(os.getcwd(), 'msedgedriver.exe')
        if not os.path.exists(driver_path):
            logging.error(f"msedgedriver.exe not found in {driver_path}")
            return None

        service = Service(driver_path)
        driver = webdriver.Edge(service=service, options=edge_options)

        # Bound page loads and script execution.
        driver.set_page_load_timeout(30)
        driver.set_script_timeout(30)

        # Fixed window size.
        driver.set_window_size(1920, 1080)

        return driver
    except Exception as e:
        logging.error(f"Error creating WebDriver: {str(e)}")
        # If the browser was started but configuring it failed, shut it down;
        # otherwise the browser/driver processes would be leaked because the
        # caller only ever receives None.
        if driver is not None:
            try:
                driver.quit()
            except Exception as quit_error:
                logging.warning(f"Error closing partially created WebDriver: {str(quit_error)}")
        return None

# Characters that survive NFKD decomposition + combining-mark removal and
# therefore need an explicit replacement. Written as escapes so the table
# cannot be corrupted by editors/pages that "straighten" typographic quotes.
# Accented letters such as a-ogonek, c-acute or o-acute need no entry: NFKD
# splits them into base letter + combining mark, and the mark is stripped.
_UNDECOMPOSABLE_CHAR_TABLE = str.maketrans({
    '\u0142': 'l', '\u0141': 'L',  # Polish l / L with stroke
    '\u2013': '-', '\u2014': '-',  # en dash, em dash
    '\u201c': '"', '\u201d': '"',  # left / right double quotation mark
    '\u2018': "'", '\u2019': "'",  # left / right single quotation mark
})


def normalize_text(text):
    """Reduce text to basic Latin characters and plain ASCII punctuation.

    The text is NFKD-normalized, combining marks (accents) are removed, and
    the few characters NFKD cannot decompose (l with stroke, dashes and
    typographic quotes) are replaced by their ASCII counterparts.

    Args:
        text: Text to clean. Falsy values (``None``, ``""``, ``0``) yield
            ``""``; other non-string values are converted with ``str()``.

    Returns:
        The normalized string.
    """
    if not text:
        return ""
    if not isinstance(text, str):
        text = str(text)
    # Decompose accented characters into base character + combining mark(s).
    decomposed = unicodedata.normalize('NFKD', text)
    # Drop the combining marks, keeping only the base characters.
    without_marks = ''.join(ch for ch in decomposed if not unicodedata.combining(ch))
    # Single pass over the string for the remaining special characters.
    return without_marks.translate(_UNDECOMPOSABLE_CHAR_TABLE)

def get_json_data(driver, url, index):
    """Load ``url`` in the given WebDriver and return the JSON document it serves.

    The page text is taken from a ``<pre>`` element when one appears within
    5 seconds, otherwise from the page source. If the text is not valid JSON
    as a whole, the span from the first ``{`` to the last ``}`` is parsed
    instead. Each URL is attempted up to three times with a 2 second pause
    between attempts.

    Args:
        driver: Selenium WebDriver used to load the URL.
        url: Absolute URL of the JSON endpoint.
        index: Position of this URL in the overall run; used only in log messages.

    Returns:
        The parsed JSON value, or ``None`` when all attempts failed. Failures
        are logged and counted in ``error_count``; successes are counted in
        ``success_count``.
    """
    global success_count, error_count
    max_retries = 3
    try:
        logging.info(f"[{index}/{total_urls}] Fetching: {url}")

        for retry in range(max_retries):
            try:
                driver.get(url)
                # Wait until the document reports that loading has finished.
                WebDriverWait(driver, 10).until(
                    lambda d: d.execute_script('return document.readyState') == 'complete'
                )

                try:
                    # Prefer the <pre> element when the page provides one.
                    pre_element = WebDriverWait(driver, 5).until(
                        EC.presence_of_element_located((By.TAG_NAME, "pre"))
                    )
                    response_text = pre_element.text
                except Exception:
                    # No usable <pre> element: fall back to the raw page source.
                    # (Exception rather than a bare except, so KeyboardInterrupt
                    # and SystemExit are not swallowed.)
                    response_text = driver.page_source

                try:
                    data = json.loads(response_text)
                    # Record the raw data structure.
                    logging.info(f"[{index}/{total_urls}] Raw JSON data: {json.dumps(data, indent=2)}")
                except json.JSONDecodeError:
                    # Not valid JSON as a whole: try the outermost {...} span.
                    start_idx = response_text.find('{')
                    end_idx = response_text.rfind('}') + 1
                    if start_idx != -1 and end_idx != 0:
                        json_str = response_text[start_idx:end_idx]
                        data = json.loads(json_str)
                        # Record the raw data structure.
                        logging.info(f"[{index}/{total_urls}] Raw JSON data (extracted): {json.dumps(data, indent=2)}")
                    else:
                        raise Exception("No valid JSON found in response")

                # This function runs in several worker threads at once, so the
                # shared counter is only updated while holding the lock.
                with counter_lock:
                    success_count += 1
                logging.info(f"[{index}/{total_urls}] Successfully fetched data from {url}")
                return data

            except Exception as e:
                # Last attempt: hand the error to the outer handler.
                if retry >= max_retries - 1:
                    raise
                if isinstance(e, TimeoutException):
                    logging.warning(f"[{index}/{total_urls}] Timeout, retrying {retry + 1}/{max_retries}")
                else:
                    logging.warning(f"[{index}/{total_urls}] Error, retrying {retry + 1}/{max_retries}: {str(e)}")
                time.sleep(2)

    except Exception as e:
        # TimeoutException is checked first because it is a subclass of
        # WebDriverException.
        if isinstance(e, TimeoutException):
            message = f"[{index}/{total_urls}] Timeout fetching {url}"
        elif isinstance(e, WebDriverException):
            message = f"[{index}/{total_urls}] WebDriver error for {url}: {str(e)}"
        else:
            message = f"[{index}/{total_urls}] Unexpected error for {url}: {str(e)}"
        with counter_lock:
            error_count += 1
        logging.error(message)
        return None

# Serializes appends to the shared output CSV. process_product runs in several
# worker threads that all write to the same file, and the rows belonging to
# one product must stay together.
_csv_write_lock = threading.Lock()


def _build_body_html(product):
    """Build the HTML body from the texts and feature list of the "main" description block."""
    description_blocks = product.get("description", {}).get("blocks", [])
    main_block = next((block for block in description_blocks if block.get("type") == "main"), {})

    texts = main_block.get("texts", [])
    features_list = main_block.get("list", [])

    body_html = ""
    if texts:
        body_html += "<div>" + "<br>".join(normalize_text(text) for text in texts) + "</div>"
    if features_list:
        body_html += "<br><ul>" + "".join(f"<li>{normalize_text(item)}</li>" for item in features_list) + "</ul>"
    return body_html


def _resolve_product_type(data, index):
    """Join the ecommerce item categories into the product type, defaulting to "Clothing"."""
    context_data = data.get("product", {}).get("context", {})
    if not context_data:
        # No context inside "product": fall back to the root-level context.
        context_data = data.get("context", {})

    logging.info(f"[{index}/{total_urls}] Context data: {json.dumps(context_data, indent=2)}")

    ecommerce_items = context_data.get("data", {}).get("ecommerce", {}).get("items", [{}])
    logging.info(f"[{index}/{total_urls}] Ecommerce items: {json.dumps(ecommerce_items, indent=2)}")

    product_type = "Clothing"  # default value
    if ecommerce_items:
        item_data = ecommerce_items[0]
        logging.info(f"[{index}/{total_urls}] Item data: {json.dumps(item_data, indent=2)}")

        item_category = item_data.get("item_category", "")
        item_category2 = item_data.get("item_category2", "")
        item_category3 = item_data.get("item_category3", "")
        item_category4 = item_data.get("item_category4", "")
        categories = [item_category, item_category2, item_category3, item_category4]

        logging.info(f"[{index}/{total_urls}] Categories: {item_category}, {item_category2}, {item_category3}, {item_category4}")

        if any(categories):
            product_type = ", ".join(filter(None, categories))
            logging.info(f"[{index}/{total_urls}] Final product type: {product_type}")
        else:
            logging.info(f"[{index}/{total_urls}] Using default product type: {product_type}")
    return product_type


def process_product(driver, url, index, output_file):
    """Fetch one product's JSON and append its rows to the Shopify CSV.

    One row is produced per size variation. Product-level columns (title,
    body, vendor, type, ...) are filled only on the first row; additional
    gallery images follow the first row as image-only rows. All rows of the
    product are built first and then appended in a single locked write, so a
    failure while building leaves no partial product in the file and rows of
    products processed by other threads cannot be interleaved.

    Args:
        driver: Selenium WebDriver used to fetch the JSON.
        url: Absolute URL of the product JSON endpoint.
        index: Position of this URL in the overall run; used only in log messages.
        output_file: Path of the CSV file to append to (header already written).

    Returns:
        None. Errors are logged and never raised.
    """
    global processed_count

    data = get_json_data(driver, url, index)
    if not data:
        return

    try:
        product = data.get("product", {})
        if not product:
            logging.error(f"[{index}/{total_urls}] No product data found in response")
            return

        # Basic product information.
        handle = product.get("masterProductId", "")
        title = normalize_text(product.get("name", ""))
        body_html = _build_body_html(product)
        vendor = "Lacoste"
        product_type = _resolve_product_type(data, index)
        published = "TRUE"

        # Colour of this product variant group.
        colour = normalize_text(product.get("color", {}).get("label", ""))

        # Gallery images; a single empty entry keeps images[0] valid.
        gallery = product.get("gallery", {})
        images = [img.get("desktopUrl", "") for img in gallery.get("images", [])] or [""]

        # Prices.
        pricing = product.get("pricing", {})
        price = pricing.get("salesPrice", {}).get("value", "")
        compare_price = pricing.get("standardPrice", {}).get("value", "")

        # Size variations: one CSV row each.
        size_info = product.get("variations", {}).get("size", {}).get("list", [])

        rows = []
        for idx, size in enumerate(size_info):
            is_first = idx == 0
            size_value = normalize_text(size.get("label", ""))
            variant_sku = size.get("ean", "")

            rows.append({
                "Handle": handle,
                "Title": title if is_first else "",
                "Body (HTML)": body_html if is_first else "",
                "Vendor": vendor if is_first else "",
                "Type": product_type if is_first else "",
                "Tags": "",
                "Published": published if is_first else "",
                "Option1 Name": "Size",
                "Option1 Value": size_value,
                "Option2 Name": "Color",
                "Option2 Value": colour,
                "Variant SKU": variant_sku,
                "Variant Grams": "",
                "Variant Inventory Tracker": "shopify",
                "Variant Inventory Qty": 100 if not size.get("unavailable") else 0,
                "Variant Inventory Policy": "deny",
                "Variant Fulfillment Service": "manual",
                "Variant Price": price,
                "Variant Compare At Price": compare_price,
                "Variant Requires Shipping": "TRUE",
                "Variant Taxable": "TRUE",
                "Variant Barcode": variant_sku,
                "Image Src": images[0],
                "Image Position": 1 if is_first else "",
                "Image Alt Text": title,
                "Gift Card": "FALSE",
                "SEO Title": title if is_first else "",
                "SEO Description": body_html if is_first else "",
                "Google Shopping / Google Product Category": "",
                "Google Shopping / Gender": "Unisex",
                "Google Shopping / Age Group": "Adult",
                "Google Shopping / MPN": handle,
                "Google Shopping / AdWords Grouping": "",
                "Google Shopping / AdWords Labels": "",
                "Google Shopping / Condition": "New",
                "Google Shopping / Custom Product": "FALSE",
                "Google Shopping / Custom Label 0": "",
                "Google Shopping / Custom Label 1": "",
                "Google Shopping / Custom Label 2": "",
                "Google Shopping / Custom Label 3": "",
                "Google Shopping / Custom Label 4": "",
                "Variant Image": "",
                "Variant Weight Unit": "kg",
                "Variant Tax Code": "",
                "Cost per item": "",
                "Status": "active"
            })

            # Extra gallery images go right after the first variant row,
            # numbered from position 2.
            if is_first:
                for img_idx, img_url in enumerate(images[1:], 2):
                    image_row = dict.fromkeys(fields, "")
                    image_row["Handle"] = handle
                    image_row["Image Src"] = img_url
                    image_row["Image Position"] = img_idx
                    image_row["Image Alt Text"] = title
                    rows.append(image_row)

        if rows:
            # Columns missing from a row (e.g. Option3) are written as empty
            # strings by DictWriter. The file is closed (and thus flushed)
            # before the lock is released.
            with _csv_write_lock:
                with open(output_file, "a", newline="", encoding="utf-8") as csvfile:
                    csv.DictWriter(csvfile, fieldnames=fields).writerows(rows)
        else:
            logging.warning(f"[{index}/{total_urls}] No size variations found for {url}; no rows written")

        # Update progress.
        with counter_lock:
            processed_count += 1
            logging.info(f"[{processed_count}/{total_urls}] Successfully processed: {url}")

    except Exception as e:
        logging.error(f"[{index}/{total_urls}] Error processing {url}: {str(e)}")

def process_urls_batch(urls, start_idx, batch_size, output_file):
    """Process one batch of product URLs with a dedicated WebDriver.

    A driver is created for the batch and always quit afterwards. If creating
    the driver fails, or an exception escapes while processing, the batch is
    retried (up to three attempts in total, 5 seconds apart) with a fresh
    driver. A retry resumes at the URL that failed instead of starting the
    batch over, so products that were already written are not duplicated.

    Args:
        urls: URLs of this batch.
        start_idx: 1-based position of the first URL of the batch in the
            overall run; used for progress logging.
        batch_size: Nominal batch size. Accepted for interface compatibility;
            the function does not use it.
        output_file: Path of the CSV file the product rows are appended to.
    """
    urls = list(urls)
    max_retries = 3
    next_pos = 0  # position in `urls` of the first URL not yet processed

    for retry in range(max_retries):
        driver = create_driver()
        if not driver:
            if retry < max_retries - 1:
                logging.error(f"Failed to create driver (attempt {retry + 1}/{max_retries}), retrying...")
                time.sleep(5)
                continue
            logging.error("Failed to create driver after all retries")
            break

        try:
            while next_pos < len(urls):
                url = urls[next_pos]
                # Make sure the URL asks for the full JSON payload.
                if "full=true" not in url:
                    separator = "&" if "?" in url else "?"
                    url += f"{separator}full=true&format=json"
                process_product(driver, url, start_idx + next_pos, output_file)
                next_pos += 1
            break  # whole batch done, no retry needed
        except Exception as e:
            logging.error(f"Batch processing error (attempt {retry + 1}/{max_retries}): {str(e)}")
            if retry < max_retries - 1:
                time.sleep(5)  # wait before retrying with a fresh driver
        finally:
            # Closing the browser is best effort; a failure here must not
            # mask the outcome of the batch.
            try:
                driver.quit()
            except Exception as quit_error:
                logging.debug(f"Ignoring error while closing driver: {str(quit_error)}")

def main():
    """Scrape every product listed in ``lacoste-pid.txt`` into a timestamped Shopify CSV.

    Each non-blank line of ``lacoste-pid.txt`` is a URL path that is appended
    to ``https://www.lacoste.com``. The URLs are split into at most five
    batches, each handled by its own worker thread (and its own browser).
    A summary is logged at the end.

    If the URL list cannot be read, the error is logged and the function
    returns without creating an output file.
    """
    global total_urls

    start_time = datetime.now()
    logging.info("Starting Lacoste data scraping...")

    # Read the URL list first, so no header-only CSV is left behind when the
    # list is missing or unreadable.
    try:
        with open("lacoste-pid.txt", "r", encoding="utf-8") as f:
            urls = [f"https://www.lacoste.com{line.strip()}" for line in f if line.strip()]

        total_urls = len(urls)
        logging.info(f"Loaded {total_urls} URLs from lacoste-pid.txt")
    except (OSError, UnicodeError) as e:
        logging.error(f"Error reading lacoste-pid.txt: {str(e)}")
        return

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = f"lacoste_products_{timestamp}.csv"

    # Create the CSV file and write the header row.
    with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fields)
        writer.writeheader()

    # Split the URLs into at most `worker_count` batches. Ceiling division is
    # used because floor division would create an extra, sixth batch whenever
    # the number of URLs is not a multiple of five.
    worker_count = 5
    batch_size = max(1, -(-total_urls // worker_count))

    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = [
            executor.submit(process_urls_batch, urls[i:i + batch_size], i + 1, batch_size, output_file)
            for i in range(0, total_urls, batch_size)
        ]

        # Wait for all batches to finish.
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logging.error(f"Batch processing failed: {str(e)}")

    end_time = datetime.now()
    duration = end_time - start_time

    logging.info(f"Scraping completed in {duration}")
    logging.info(f"Total URLs processed: {processed_count}/{total_urls}")
    logging.info(f"Successful requests: {success_count}")
    logging.info(f"Failed requests: {error_count}")
    logging.info(f"Results saved to: {output_file}")

# Run the scraper only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()