from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.common.by import By
import json
import time
import random
import csv
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import threading
def fetch_products(driver, start):
    """Load one page of the hoodies listing JSON and return its products.

    The listing URL is opened in the given browser driver; the JSON body is
    read from the text of the ``<pre>`` element found in the rendered page.

    Args:
        driver: WebDriver-like object providing ``get``, ``page_source`` and
            ``find_element``.
        start: Value interpolated into the ``start`` query parameter.

    Returns:
        The value of ``pageProps['products']`` from the parsed JSON, or
        ``None`` when the page has no ``<pre`` tag, the products key is
        absent, or any exception is raised along the way.
    """
    listing_url = (
        "https://www.adidas.nl/plp-app/_next/data/yihGhDVNwW3-AondZe8Ef/"
        f"hoodies.json?path=hoodies&start={start}"
    )
    try:
        print(f"\nTrying to fetch URL: {listing_url}")
        driver.get(listing_url)
        time.sleep(2)

        # Grab the rendered page source
        html = driver.page_source
        print("Page source length:", len(html))

        # Without a <pre> element there is no JSON to read
        if '<pre' not in html:
            print("No pre tag found in page source")
            return None

        raw_json = driver.find_element(By.TAG_NAME, "pre").text
        print("Found pre tag with content length:", len(raw_json))
        payload = json.loads(raw_json)
        print("Successfully parsed JSON")

        # Products are read directly from pageProps
        if 'pageProps' not in payload or 'products' not in payload['pageProps']:
            print("No products found in pageProps")
            print("Available keys in pageProps:", list(payload.get('pageProps', {}).keys()))
            return None

        found = payload['pageProps']['products']
        print(f"Found {len(found)} products")
        return found
    except Exception as err:
        print(f"Error fetching data: {err}")
        print("Full error details:", type(err).__name__)
        return None
def get_product_details(driver, product_id):
    """Fetch the product API response for one product id and parse it.

    Args:
        driver: WebDriver-like object providing ``get``, ``page_source`` and
            ``find_element``.
        product_id: Identifier interpolated into the product API URL.

    Returns:
        The parsed JSON taken from the page's ``<pre>`` element, or ``None``
        when no ``<pre`` tag is present or any exception is raised.
    """
    endpoint = f"https://www.adidas.nl/api/products/{product_id}"
    try:
        driver.get(endpoint)
        time.sleep(2)

        # Read the page source; no <pre> tag means there is no JSON to parse
        if '<pre' not in driver.page_source:
            return None

        pre_text = driver.find_element(By.TAG_NAME, "pre").text
        return json.loads(pre_text)
    except Exception as err:
        print(f"Error getting product {product_id}: {err}")
        return None
def format_product_data(product_data):
    """Convert one product-detail payload into Shopify-style CSV row dicts.

    One row is produced per entry in ``variation_list`` (one per size); each
    carries the full product information plus the first image.  Every further
    image gets its own row holding only the handle and the image columns.

    Args:
        product_data: Parsed product JSON as a dict.  Nested sections that
            are missing or null are treated as empty.

    Returns:
        A list of row dicts that all share the same keys in the same order,
        or ``None`` when ``product_data`` is falsy or formatting fails.
    """
    if not product_data:
        return None
    try:
        # `or` (instead of a .get default) also covers keys that are present
        # but null, which would otherwise discard the whole product.
        pricing_info = product_data.get('pricing_information') or {}
        description = product_data.get('product_description') or {}
        attributes = product_data.get('attribute_list') or {}
        meta_data = product_data.get('meta_data') or {}
        view_list = product_data.get('view_list') or []
        variations = product_data.get('variation_list') or []

        handle = product_data.get("id", "")
        title = description.get('title', '')
        text = description.get('text', '')
        current_price = pricing_info.get('currentPrice', '')
        standard_price = pricing_info.get('standard_price', '')

        # Keep only views that actually carry an image URL
        image_urls = [
            url
            for url in (view.get('image_url', '') for view in view_list)
            if url
        ]
        first_image = image_urls[0] if image_urls else ''

        # productType may be a list (joined into a tag string) or a scalar
        product_type = attributes.get("productType", "")
        if isinstance(product_type, list):
            tags = ",".join(str(tag) for tag in product_type)
        else:
            tags = product_type

        # Template holding everything that is identical for all sizes; it
        # also defines the column set (and order) shared by every row.
        base_row = {
            "Handle": handle,
            "Title": title,
            "Body (HTML)": text,
            "Vendor": "Adidas",
            "Type": attributes.get("category", ""),
            "Tags": tags,
            "Published": "TRUE",
            "Option1 Name": "Size",
            "Option1 Value": "",
            "Option2 Name": "Color",
            "Option2 Value": attributes.get("color", ""),
            "Option3 Name": "",
            "Option3 Value": "",
            "Variant SKU": "",
            "Variant Grams": "0",
            "Variant Inventory Tracker": "",
            "Variant Inventory Qty": "1",
            "Variant Inventory Policy": "deny",
            "Variant Fulfillment Service": "manual",
            "Variant Price": str(current_price),
            "Variant Compare At Price": str(standard_price),
            "Variant Requires Shipping": "TRUE",
            "Variant Taxable": "TRUE",
            "Variant Barcode": "",
            "Image Src": first_image,
            "Image Position": "1",
            "Image Alt Text": title,
            "Gift Card": "FALSE",
            "SEO Title": meta_data.get("page_title", ""),
            "SEO Description": meta_data.get("description", ""),
            "Google Shopping / Google Product Category": attributes.get("category", ""),
            "Google Shopping / Gender": attributes.get("gender", ""),
            "Google Shopping / Age Group": "",
            "Google Shopping / MPN": product_data.get("model_number", ""),
            "Google Shopping / AdWords Grouping": "",
            "Google Shopping / AdWords Labels": "",
            "Google Shopping / Condition": "New",
            "Google Shopping / Custom Product": "FALSE",
            "Google Shopping / Custom Label 0": "",
            "Google Shopping / Custom Label 1": "",
            "Google Shopping / Custom Label 2": "",
            "Google Shopping / Custom Label 3": "",
            "Google Shopping / Custom Label 4": "",
            "Variant Image": first_image,
            "Variant Weight Unit": "g",
            "Variant Tax Code": "",
            "Cost per item": "",
            "Status": "active",
        }

        product_variants = []

        # One full row per size variation
        for variation in variations:
            variant = dict(base_row)
            variant["Option1 Value"] = variation.get('size', '')
            variant["Variant SKU"] = variation.get('sku', '')
            product_variants.append(variant)

        # Remaining images: blank rows carrying only handle + image columns.
        # Positions continue from the first image (position 1) without gaps,
        # even when some views had no URL.
        for position, image_url in enumerate(image_urls[1:], start=2):
            image_record = dict.fromkeys(base_row, "")
            image_record["Handle"] = handle
            image_record["Image Src"] = image_url
            image_record["Image Position"] = str(position)
            image_record["Image Alt Text"] = title
            product_variants.append(image_record)

        return product_variants
    except Exception as e:
        print(f"Error formatting product data: {str(e)}")
        return None
def collect_ids():
# 设置Edge WebDriver
service = Service("msedgedriver.exe")
options = webdriver.EdgeOptions()
# 添加更多选项来模拟真实浏览器
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_argument('--disable-extensions')
options.add_experimental_option('useAutomationExtension', False)
options.add_experimental_option("excludeSwitches", ["enable-automation"])
driver = webdriver.Edge(service=service, options=options)
# 设置一些JavaScript变量来避免检测
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
try:
with open('adidas-nl.txt', 'w', encoding='utf-8') as f:
for i in range(12):
start = i * 48
print(f"\nFetching data for start={start}")
products = fetch_products(driver, start)
if not products:
print(f"No products returned for start={start}")
time.sleep(5)
continue
for product in products:
# 获取主产品ID
product_id = product.get('id', '')
if product_id:
f.write(f"{product_id}\n")
print(f"Saved product ID: {product_id}")
Weight Unit",
"Variant Tax Code", "Cost per item", "Status"]
# 创建结果队列
result_queue = Queue()
# 创建CSV文件并保持打开
with open('adidas-nl.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
# 创建线程池
with ThreadPoolExecutor(max_workers=10) as executor:
# 提交所有任务
future_to_id = {executor.submit(process_product, product_id, result_queue): product_id
for product_id in product_ids}
# 处理完成的任务
completed = 0
total = len(product_ids)
# 创建写入线程
def write_results():
while completed < total:
try:
product_id, formatted_data = result_queue.get(timeout=1)
for variant in formatted_data:
writer.writerow(variant)
f.flush() # 立即写入文件
print(f"Saved product {product_id} to CSV")
except:
continue
# 启动写入线程
write_thread = threading.Thread(target=write_results)
write_thread.start()
# 等待所有任务完成
for future in future_to_id:
try:
success = future.result()
completed += 1
print(f"Progress: {completed}/{total}")
except Exception as e:
print(f"Task error: {str(e)}")
completed += 1
# 等待写入线程完成
write_thread.join()
print("All products processed")
except Exception as e:
print(f"Error in collect_products: {str(e)}")
def main():
    """Show the console menu in a loop and dispatch the chosen action.

    Choice "1" runs ``collect_ids``, choice "2" runs ``collect_products``,
    choice "3" leaves the loop; any other input prints an invalid-choice
    message and the menu is shown again.
    """
    menu_lines = (
        "\n=== Adidas NL Scraper ===",
        "1. 采集产品ID",
        "2. 采集产品详细信息 (10线程)",
        "3. 退出",
    )
    while True:
        for line in menu_lines:
            print(line)
        selection = input("请选择功能 (1-3): ")

        if selection == "3":
            print("\n程序退出")
            return
        if selection == "1":
            print("\n开始采集产品ID...")
            collect_ids()
            continue
        if selection == "2":
            print("\n开始采集产品详细信息 (10线程)...")
            collect_products()
            continue
        print("\n无效的选择,请重试")


# Run the menu only when executed as a script, not when imported
if __name__ == "__main__":
    main()