搜索结果

×

搜索结果将在这里显示。

🍕 adidas-nl

from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.common.by import By
import json
import time
import random
import csv
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import threading

def fetch_products(driver, start):
    url = f"https://www.adidas.nl/plp-app/_next/data/yihGhDVNwW3-AondZe8Ef/hoodies.json?path=hoodies&start={start}"
    try:
        print(f"\nTrying to fetch URL: {url}")
        driver.get(url)
        time.sleep(2)

        # 获取页面源码
        page_source = driver.page_source
        print("Page source length:", len(page_source))

        # 尝试找到JSON内容
        if '<pre' in page_source:
            json_content = driver.find_element(By.TAG_NAME, "pre").text
            print("Found pre tag with content length:", len(json_content))
            data = json.loads(json_content)
            print("Successfully parsed JSON")

            # 直接从pageProps获取products
            if 'pageProps' in data and 'products' in data['pageProps']:
                products = data['pageProps']['products']
                print(f"Found {len(products)} products")
                return products
            else:
                print("No products found in pageProps")
                print("Available keys in pageProps:", list(data.get('pageProps', {}).keys()))
                return None
        else:
            print("No pre tag found in page source")
            return None

    except Exception as e:
        print(f"Error fetching data: {str(e)}")
        print("Full error details:", str(e.__class__.__name__))
        return None

def get_product_details(driver, product_id):
    url = f"https://www.adidas.nl/api/products/{product_id}"
    try:
        driver.get(url)
        time.sleep(2)

        # 获取页面源码
        page_source = driver.page_source
        if '<pre' in page_source:
            json_content = driver.find_element(By.TAG_NAME, "pre").text
            return json.loads(json_content)
        return None
    except Exception as e:
        print(f"Error getting product {product_id}: {str(e)}")
        return None

def format_product_data(product_data):
    if not product_data:
        return None

    try:
        # 获取价格信息
        pricing_info = product_data.get('pricing_information', {})
        current_price = pricing_info.get('currentPrice', '')
        standard_price = pricing_info.get('standard_price', '')

        # 获取描述信息
        description = product_data.get('product_description', {})
        title = description.get('title', '')
        text = description.get('text', '')

        # 获取属性列表
        attributes = product_data.get('attribute_list', {})

        # 获取所有图片URL
        view_list = product_data.get('view_list', [])
        images = []
        for idx, view in enumerate(view_list):
            image_url = view.get('image_url', '')
            if image_url:
                images.append({
                    'url': image_url,
                    'position': idx + 1,
                    'alt': title
                })

        # 获取meta数据
        meta_data = product_data.get('meta_data', {})

        # 获取变体列表(尺码)
        variations = product_data.get('variation_list', [])

        # 获取颜色变体
        color_variations = []
        product_links = product_data.get('product_link_list', [])
        for link in product_links:
            if link.get('type') == 'color-variation':
                color_variations.append({
                    'id': link.get('productId', ''),
                    'color': link.get('search_color', ''),
                    'name': link.get('name', '')
                })

        # 为每个尺码创建一个变体记录
        product_variants = []
        for variation in variations:
            size = variation.get('size', '')
            sku = variation.get('sku', '')

            variant = {
                "Handle": product_data.get("id", ""),
                "Title": title,
                "Body (HTML)": text,
                "Vendor": "Adidas",
                "Type": attributes.get("category", ""),
                "Tags": ",".join(attributes.get("productType", [])) if isinstance(attributes.get("productType"), list) else attributes.get("productType", ""),
                "Published": "TRUE",
                "Option1 Name": "Size",
                "Option1 Value": size,
                "Option2 Name": "Color",
                "Option2 Value": attributes.get("color", ""),
                "Option3 Name": "",
                "Option3 Value": "",
                "Variant SKU": sku,
                "Variant Grams": "0",
                "Variant Inventory Tracker": "",
                "Variant Inventory Qty": "1",
                "Variant Inventory Policy": "deny",
                "Variant Fulfillment Service": "manual",
                "Variant Price": str(current_price),
                "Variant Compare At Price": str(standard_price),
                "Variant Requires Shipping": "TRUE",
                "Variant Taxable": "TRUE",
                "Variant Barcode": "",
                "Image Src": images[0]['url'] if images else '',
                "Image Position": "1",
                "Image Alt Text": title,
                "Gift Card": "FALSE",
                "SEO Title": meta_data.get("page_title", ""),
                "SEO Description": meta_data.get("description", ""),
                "Google Shopping / Google Product Category": attributes.get("category", ""),
                "Google Shopping / Gender": attributes.get("gender", ""),
                "Google Shopping / Age Group": "",
                "Google Shopping / MPN": product_data.get("model_number", ""),
                "Google Shopping / AdWords Grouping": "",
                "Google Shopping / AdWords Labels": "",
                "Google Shopping / Condition": "New",
                "Google Shopping / Custom Product": "FALSE",
                "Google Shopping / Custom Label 0": "",
                "Google Shopping / Custom Label 1": "",
                "Google Shopping / Custom Label 2": "",
                "Google Shopping / Custom Label 3": "",
                "Google Shopping / Custom Label 4": "",
                "Variant Image": images[0]['url'] if images else '',
                "Variant Weight Unit": "g",
                "Variant Tax Code": "",
                "Cost per item": "",
                "Status": "active"
            }
            product_variants.append(variant)

        # 添加额外的图片记录
        for image in images[1:]:  # 跳过第一张图片,因为它已经包含在变体中
            image_record = {
                "Handle": product_data.get("id", ""),
                "Title": "",
                "Body (HTML)": "",
                "Vendor": "",
                "Type": "",
                "Tags": "",
                "Published": "",
                "Option1 Name": "",
                "Option1 Value": "",
                "Option2 Name": "",
                "Option2 Value": "",
                "Option3 Name": "",
                "Option3 Value": "",
                "Variant SKU": "",
                "Variant Grams": "",
                "Variant Inventory Tracker": "",
                "Variant Inventory Qty": "",
                "Variant Inventory Policy": "",
                "Variant Fulfillment Service": "",
                "Variant Price": "",
                "Variant Compare At Price": "",
                "Variant Requires Shipping": "",
                "Variant Taxable": "",
                "Variant Barcode": "",
                "Image Src": image['url'],
                "Image Position": str(image['position']),
                "Image Alt Text": image['alt'],
                "Gift Card": "",
                "SEO Title": "",
                "SEO Description": "",
                "Google Shopping / Google Product Category": "",
                "Google Shopping / Gender": "",
                "Google Shopping / Age Group": "",
                "Google Shopping / MPN": "",
                "Google Shopping / AdWords Grouping": "",
                "Google Shopping / AdWords Labels": "",
                "Google Shopping / Condition": "",
                "Google Shopping / Custom Product": "",
                "Google Shopping / Custom Label 0": "",
                "Google Shopping / Custom Label 1": "",
                "Google Shopping / Custom Label 2": "",
                "Google Shopping / Custom Label 3": "",
                "Google Shopping / Custom Label 4": "",
                "Variant Image": "",
                "Variant Weight Unit": "",
                "Variant Tax Code": "",
                "Cost per item": "",
                "Status": ""
            }
            product_variants.append(image_record)

        return product_variants
    except Exception as e:
        print(f"Error formatting product data: {str(e)}")
        return None

def collect_ids():
    # 设置Edge WebDriver
    service = Service("msedgedriver.exe")
    options = webdriver.EdgeOptions()
    # 添加更多选项来模拟真实浏览器
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_argument('--disable-extensions')
    options.add_experimental_option('useAutomationExtension', False)
    options.add_experimental_option("excludeSwitches", ["enable-automation"])

    driver = webdriver.Edge(service=service, options=options)
    # 设置一些JavaScript变量来避免检测
    driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")

    try:
        with open('adidas-nl.txt', 'w', encoding='utf-8') as f:
            for i in range(12):
                start = i * 48
                print(f"\nFetching data for start={start}")

                products = fetch_products(driver, start)
                if not products:
                    print(f"No products returned for start={start}")
                    time.sleep(5)
                    continue

                for product in products:
                    # 获取主产品ID
                    product_id = product.get('id', '')
                    if product_id:
                        f.write(f"{product_id}\n")
                        print(f"Saved product ID: {product_id}")

Weight Unit",
                     "Variant Tax Code", "Cost per item", "Status"]

        # 创建结果队列
        result_queue = Queue()

        # 创建CSV文件并保持打开
        with open('adidas-nl.csv', 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()

            # 创建线程池
            with ThreadPoolExecutor(max_workers=10) as executor:
                # 提交所有任务
                future_to_id = {executor.submit(process_product, product_id, result_queue): product_id 
                              for product_id in product_ids}

                # 处理完成的任务
                completed = 0
                total = len(product_ids)

                # 创建写入线程
                def write_results():
                    while completed < total:
                        try:
                            product_id, formatted_data = result_queue.get(timeout=1)
                            for variant in formatted_data:
                                writer.writerow(variant)
                            f.flush()  # 立即写入文件
                            print(f"Saved product {product_id} to CSV")
                        except:
                            continue

                # 启动写入线程
                write_thread = threading.Thread(target=write_results)
                write_thread.start()

                # 等待所有任务完成
                for future in future_to_id:
                    try:
                        success = future.result()
                        completed += 1
                        print(f"Progress: {completed}/{total}")
                    except Exception as e:
                        print(f"Task error: {str(e)}")
                        completed += 1

                # 等待写入线程完成
                write_thread.join()

        print("All products processed")

    except Exception as e:
        print(f"Error in collect_products: {str(e)}")

def main():
    while True:
        print("\n=== Adidas NL Scraper ===")
        print("1. 采集产品ID")
        print("2. 采集产品详细信息 (10线程)")
        print("3. 退出")
        choice = input("请选择功能 (1-3): ")

        if choice == "1":
            print("\n开始采集产品ID...")
            collect_ids()
        elif choice == "2":
            print("\n开始采集产品详细信息 (10线程)...")
            collect_products()
        elif choice == "3":
            print("\n程序退出")
            break
        else:
            print("\n无效的选择,请重试")

if __name__ == "__main__":
    main()