搜索结果

×

搜索结果将在这里显示。

🕹️ stoneisland

stoneisland-getpid

from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import json
import time
import random
import threading
from queue import Queue
import os

# 输出文件
output_file = "stoneisland-pid.txt"

# 基础 URL
base_url = "https://www.stoneisland.com/on/demandware.store/Sites-StoneEU-Site/it_IT/SearchApi-Search"

# 线程数
NUM_THREADS = 10

# 创建一个队列用于存储待处理的start值
task_queue = Queue()

# 创建一个集合用于存储所有产品ID,使用threading.Lock确保线程安全
product_ids = set()
product_ids_lock = threading.Lock()

# 创建一个计数器记录总产品数,使用threading.Lock确保线程安全
total_products = 0
total_products_lock = threading.Lock()

# 创建文件锁
file_lock = threading.Lock()

def worker(thread_id):
    global total_products

    # 配置 Edge WebDriver
    service = Service('./msedgedriver.exe')
    options = webdriver.EdgeOptions()
    options.add_argument('--disable-gpu')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--no-sandbox')

    try:
        # 初始化浏览器
        driver = webdriver.Edge(service=service, options=options)
        wait = WebDriverWait(driver, 30)

        while True:
            try:
                # 从队列获取start值
                start = task_queue.get_nowait()
            except:
                break

            try:
                # 构建URL
                url = f"{base_url}?cgid=salesstoneisland-viewallsales&srule=Out%20of%20Stock_GO_Live_26.07&sz=20&start={start}"
                print(f"\n线程 {thread_id} 正在访问: {url}")

                driver.get(url)
                time.sleep(random.uniform(2, 4))

                try:
                    pre_element = wait.until(
                        EC.presence_of_element_located((By.TAG_NAME, "pre"))
                    )

                    # 获取JSON数据
                    json_text = pre_element.text
                    data = json.loads(json_text)

                    # 从data中获取products数组
                    products = data.get("data", {}).get("products", [])

                    if not products:
                        print(f"线程 {thread_id}: 没有更多产品,结束获取 (start={start})")
                        task_queue.task_done()
                        continue

                    # 遍历每个产品
                    for product in products:
                        product_id = product.get("id")
                        if product_id:
                            with product_ids_lock:
                                if product_id not in product_ids:
                                    product_ids.add(product_id)
                                    with file_lock:
                                        with open(output_file, "a", encoding="utf-8") as file:
                                            file.write(f"{product_id}\n")
                                    with total_products_lock:
                                        global total_products
                                        total_products += 1
                                        print(f"线程 {thread_id} 找到产品ID: {product_id}, 总计: {total_products}")

                    task_queue.task_done()

                except Exception as e:
                    print(f"线程 {thread_id} 等待页面加载超时或解析数据出错: {e}")
                    time.sleep(random.uniform(5, 8))
                    task_queue.put(start)  # 将失败的任务放回队列
                    task_queue.task_done()
                    continue

            except Exception as e:
                print(f"线程 {thread_id} 处理页面时出错: {e}")
                time.sleep(random.uniform(5, 8))
                task_queue.put(start)  # 将失败的任务放回队列
                task_queue.task_done()
                continue

    except Exception as e:
        print(f"线程 {thread_id} 运行过程中发生错误: {e}")

    finally:
        try:
            driver.quit()
        except:
            pass

def main():
    # 清空输出文件
    with open(output_file, "w", encoding="utf-8") as file:
        file.write("")

    # 初始化任务队列
    for start in range(0, 1000, 20):  # 假设最多1000个商品
        task_queue.put(start)

    # 创建并启动线程
    threads = []
    for i in range(NUM_THREADS):
        thread = threading.Thread(target=worker, args=(i+1,))
        threads.append(thread)
        thread.start()

    # 等待所有线程完成
    for thread in threads:
        thread.join()

    print(f"\n抓取完成,共获取 {total_products} 个唯一产品ID")
    print(f"结果已保存到 {output_file}")

if __name__ == "__main__":
    main()

stoneisland

import json
import csv
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import threading
import unicodedata
from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException

# Shopify标准CSV格式字段
fields = [
    "Handle", "Title", "Body (HTML)", "Vendor", "Type", "Tags", "Published",
    "Option1 Name", "Option1 Value", "Option2 Name", "Option2 Value",
    "Option3 Name", "Option3 Value", "Variant SKU", "Variant Grams",
    "Variant Inventory Tracker", "Variant Inventory Qty", "Variant Inventory Policy",
    "Variant Fulfillment Service", "Variant Price", "Variant Compare At Price",
    "Variant Requires Shipping", "Variant Taxable", "Variant Barcode",
    "Image Src", "Image Position", "Image Alt Text", "Gift Card",
    "SEO Title", "SEO Description", "Google Shopping / Google Product Category",
    "Google Shopping / Gender", "Google Shopping / Age Group",
    "Google Shopping / MPN", "Google Shopping / AdWords Grouping",
    "Google Shopping / AdWords Labels", "Google Shopping / Condition",
    "Google Shopping / Custom Product", "Google Shopping / Custom Label 0",
    "Google Shopping / Custom Label 1", "Google Shopping / Custom Label 2",
    "Google Shopping / Custom Label 3", "Google Shopping / Custom Label 4",
    "Variant Image", "Variant Weight Unit", "Variant Tax Code",
    "Cost per item", "Status"
]

# 创建线程安全的写入队列和计数器
write_queue = Queue()
counter_lock = threading.Lock()
processed_count = 0
total_urls = 0

def normalize_text(text):
    """处理特殊字符和乱码"""
    if not text:
        return ""
    # 将特殊字符转换为基本拉丁字符
    normalized = unicodedata.normalize('NFKD', text)
    # 移除组合字符
    normalized = ''.join(c for c in normalized if not unicodedata.combining(c))
    # 特殊字符映射
    char_map = {
        # 意大利语特殊字符
        'à': 'a', 'á': 'a', 'â': 'a', 'ã': 'a', 'ä': 'a', 'å': 'a',
        'è': 'e', 'é': 'e', 'ê': 'e', 'ë': 'e',
        'ì': 'i', 'í': 'i', 'î': 'i', 'ï': 'i',
        'ò': 'o', 'ó': 'o', 'ô': 'o', 'õ': 'o', 'ö': 'o',
        'ù': 'u', 'ú': 'u', 'û': 'u', 'ü': 'u',
        'ý': 'y', 'ÿ': 'y',
        'ñ': 'n',
        'ç': 'c',
    }

    # 应用字符映射
    for old, new in char_map.items():
        normalized = normalized.replace(old, new)

    # 移除任何剩余的非ASCII字符
    normalized = ''.join(c for c in normalized if ord(c) < 128)

    return normalized.strip()

def get_json_data(pid):
    """从Stone Island API获取JSON数据"""
    max_retries = 5
    retry_delay = 3

    for attempt in range(max_retries):
        driver = None
        try:
            print(f"\n{'='*50}")
            print(f"Processing PID: {pid}")
            print(f"Attempt {attempt + 1}/{max_retries}")

            # 设置Edge WebDriver
            service = Service('msedgedriver.exe')
            options = webdriver.EdgeOptions()
            options.add_argument('--headless')  # 无头模式
            options.add_argument('--disable-gpu')
            options.add_argument('--no-sandbox')
            options.add_argument('--disable-dev-shm-usage')
            options.add_argument('--window-size=1920,1080')
            options.add_argument('--disable-blink-features=AutomationControlled')

            # 添加自定义请求头
            options.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')

            driver = webdriver.Edge(service=service, options=options)

            # 直接访问API URL
            api_url = f"https://www.stoneisland.com/on/demandware.store/Sites-StoneEU-Site/it_IT/Api-ProductTiles?pid={pid}"
            print(f"Accessing API URL: {api_url}")

            driver.get(api_url)

            # 等待页面加载完成
            wait = WebDriverWait(driver, 15)
            wait.until(lambda d: d.execute_script("return document.readyState") == "complete")

            # 获取页面内容
            json_text = None
            try:
                # 等待pre标签出现
                pre_element = wait.until(EC.presence_of_element_located((By.TAG_NAME, "pre")))
                json_text = pre_element.text
                print("Successfully found pre element with JSON data")
            except Exception as e:
                print(f"Failed to find pre element: {str(e)}")
                # 如果找不到pre标签,尝试获取整个页面源码
                json_text = driver.page_source
                # 提取JSON部分
                import re
                match = re.search(r'<pre.*?>(.*?)</pre>', json_text, re.DOTALL)
                if match:
                    json_text = match.group(1).strip()
                    print("Extracted JSON from page source")

            if not json_text:
                print("No JSON data found in the page")
                if attempt < max_retries - 1:
                    continue
                return None

            # 清理JSON文本
json_text = json_text.strip()
            if json_text.startswith('<pre>'):
                json_text = json_text[5:]
            if json_text.endswith('</pre>'):
                json_text = json_text[:-6]

            # 验证JSON数据
            try:
                data = json.loads(json_text)
            except json.JSONDecodeError as e:
                print(f"JSON decode error: {str(e)}")
                print(f"JSON text (first 500 chars): {json_text[:500]}")
                if attempt < max_retries - 1:
                    continue
                raise

            # 验证数据完整性
            if not data:
                print("Empty JSON data")
                if attempt < max_retries - 1:
                    continue
                return None

            if "products" not in data:
                print("No 'products' key in JSON data")
                print(f"Available keys: {list(data.keys())}")
                if attempt < max_retries - 1:
                    continue
                return None

            if not data["products"]:
                print("Empty products list")
                if attempt < max_retries - 1:
                    continue
                return None

            # 验证产品数据完整性
            product = data["products"][0]
            required_fields = ["id", "productName", "variationAttributes", "price", "imgs"]
            missing_fields = [field for field in required_fields if field not in product]

            if missing_fields:
                print(f"Missing required fields: {missing_fields}")
                print(f"Available fields: {list(product.keys())}")
                if attempt < max_retries - 1:
                    continue
                return None

            print(f"Successfully retrieved complete data for {pid}")
            return data

        except TimeoutException as e:
            print(f"Timeout error for {pid}: {str(e)}")
        except WebDriverException as e:
            print(f"WebDriver error for {pid}: {str(e)}")
        except Exception as e:
            print(f"Unexpected error for {pid}: {str(e)}")
            import traceback
            print(f"Traceback: {traceback.format_exc()}")

        finally:
            # 确保关闭浏览器
            if driver:
                try:
                    driver.quit()
                except:
                    pass

        if attempt < max_retries - 1:
            print(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)

    print(f"Failed to fetch data after {max_retries} attempts for {pid}")
    return None

def process_product(pid):
    """处理单个商品数据"""
    global processed_count
    success = False

    print(f"\nStarting to process product: {pid}")
    data = get_json_data(pid)

    if not data or "products" not in data or not data["products"]:
        print(f"Skipping {pid} due to invalid data")
        return success

    try:
        # 获取产品数据
        product = data["products"][0]
        print(f"Processing product data...")
        print(f"Product ID: {product.get('id', '')}")
        print(f"Product Name: {product.get('productName', '')}")

        # 商品基础信息
        handle = pid  # 使用输入的pid作为handle
        title = normalize_text(product.get("productName", ""))
        description = normalize_text(product.get("shortDescription", ""))
        vendor = "Stone Island"
        product_type = product.get("analyticsAttributes", {}).get("item_category2", "")
        published = "TRUE"

        # 获取变体属性
        variation_attributes = product.get("variationAttributes", [])
        color_data = next((attr for attr in variation_attributes if attr["attributeId"] == "color"), None)
        size_data = next((attr for attr in variation_attributes if attr["attributeId"] == "size"), None)

        # 获取颜色信息
        colors = []
        if color_data:
            for color in color_data.get("values", []):
                if color.get("selectable", False):
                    colors.append({
                        "name": normalize_text(color_data["displayName"]),
                        "value": normalize_text(color["displayValue"]),
                        "id": color["id"]
                    })

        # 获取尺码信息
        sizes = []
        if size_data:
            for size in size_data.get("values", []):
                if size.get("selectable", False):
                    sizes.append({
                        "name": normalize_text(size_data["displayName"]),
                        "value": normalize_text(size["displayValue"]),
                        "stock": size.get("ATS", 0)
                    })

        # 价格信息
        price_data = product.get("price", {})
        sales_data = price_data.get("sales", {})
        sale_price = sales_data.get("value", "")

        # 图片信息
        images = []
        imgs_data = product.get("imgs", {})
        if imgs_data and "urls" in imgs_data:
            for idx, img_url in enumerate(imgs_data["urls"]):
                images.append({
                    "src": img_url,
                    "alt": imgs_data.get("alt", title)
                })

        # 生成Shopify格式的数据
        rows = []
        first_row = True

        # 为每个颜色和尺码组合创建变体
        for color in colors:
            for size in sizes:
                variant_sku = f"{handle}-{color['id']}-{size['value']}"

                # 构造每一行数据
                row = {
                    "Handle": handle,
                    "Title": title if first_row else "",
                    "Body (HTML)": description if first_row else "",
                    "Vendor": vendor if first_row else "",
                    "Type": product_type if first_row else "",
                    "Tags": "",
                    "Published": published if first_row else "",
                    "Option1 Name": "Color" if first_row else "",
                    "Option1 Value": color['value'],
                    "Option2 Name": "Size" if first_row else "",
                    "Option2 Value": size['value'],
                    "Variant SKU": variant_sku,
                    "Variant Grams": "",
                    "Variant Inventory Tracker": "shopify",
                    "Variant Inventory Qty": size["stock"],
                    "Variant Inventory Policy": "deny",
                    "Variant Fulfillment Service": "manual",
                    "Variant Price": sale_price,
                    "Variant Compare At Price": "",
                    "Variant Requires Shipping": "TRUE",
                    "Variant Taxable": "TRUE",
                    "Variant Barcode": "",
                    "Image Src": images[0]["src"] if images and first_row else "",
                    "Image Position": 1 if first_row else "",
                    "Image Alt Text": images[0]["alt"] if images and first_row else "",
                    "Gift Card": "FALSE" if first_row else "",
                    "SEO Title": title if first_row else "",
                    "SEO Description": description if first_row else "",
                    "Google Shopping / Google Product Category": "",
                    "Google Shopping / Gender": product.get("analyticsAttributes", {}).get("item_category4", "") if first_row else "",
                    "Google Shopping / Age Group": "Adult" if first_row else "",
                    "Google Shopping / MPN": handle if first_row else "",
                    "Google Shopping / AdWords Grouping": "",
                    "Google Shopping / AdWords Labels": "",
                    "Google Shopping / Condition": "New" if first_row else "",
                    "Google Shopping / Custom Product": "FALSE" if first_row else "",
                    "Google Shopping / Custom Label 0": "",
                    "Google Shopping / Custom Label 1": "",
                    "Google Shopping / Custom Label 2": "",
                    "Google Shopping / Custom Label 3": "",
                    "Google Shopping / Custom Label 4": "",
                    "Variant Image": "",
                    "Variant Weight Unit": "kg" if first_row else "",
                    "Variant Tax Code": "",
                    "Cost per item": "",
                    "Status": "active" if first_row else ""
                }
                rows.append(row)
                first_row = False

        # 添加额外的图片行
        if images and len(images) > 1:
            for idx, img in enumerate(images[1:], start=2):
                image_row = {field: "" for field in fields}
                image_row["Handle"] = handle
                image_row["Image Src"] = img["src"]
                image_row["Image Position"] = idx
                image_row["Image Alt Text"] = img["alt"]
                image_row["Status"] = "active"
                rows.append(image_row)

        # 将处理好的行数据放入写入队列
        for row in rows:
            write_queue.put(row)

        # 在所有行写入后,发送PID用于删除
        write_queue.put(pid)

        # 更新进度
        with counter_lock:
            global processed_count
            processed_count += 1
            print(f"Progress: {processed_count}/{total_urls} - Processed: {pid}")

        success = True

    except Exception as e:
        print(f"Error processing {pid}: {e}")
        import traceback
        print(f"Traceback: {traceback.format_exc()}")

    return success

def remove_successful_pid(pid):
    """从PID文件中删除成功采集的PID"""
    try:
        with open("stoneisland-pid.txt", "r", encoding="utf-8") as f:
            pids = f.readlines()

        # 过滤掉成功的PID
        pids = [p.strip() for p in pids if p.strip() != pid]

        # 写回文件
        with open("stoneisland-pid.txt", "w", encoding="utf-8") as f:
            f.write("\n".join(pids))

        print(f"Successfully removed PID {pid} from file")
    except Exception as e:
        print(f"Error removing PID {pid} from file: {e}")

def writer_thread(output_file):
    """CSV写入线程"""
    with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fields)
        writer.writeheader()

        while True:
            item = write_queue.get()
            if item is None:  # 结束信号
                break
            if isinstance(item, dict):  # 数据行
                writer.writerow(item)
                csvfile.flush()  # 确保立即写入磁盘
            elif isinstance(item, str):  # PID信号
                try:
                    remove_successful_pid(item)
                except Exception as e:
                    print(f"Error removing PID {item}: {e}")
            write_queue.task_done()

def main():
    output_file = "stoneisland_products.csv"

    # 读取PID文件
    with open("stoneisland-pid.txt", "r", encoding="utf-8") as f:
        pids = [line.strip() for line in f if line.strip()]

    global total_urls
    total_urls = len(pids)
    print(f"Total PIDs to process: {total_urls}")

    # 启动CSV写入线程
    writer = threading.Thread(target=writer_thread, args=(output_file,))
    writer.start()

    # 使用10线程的线程池处理PIDs
    with ThreadPoolExecutor(max_workers=10) as executor:
        # 提交所有任务并获取结果
        future_to_pid = {executor.submit(process_product, pid): pid for pid in pids}

        # 等待所有任务完成并处理结果
        for future in as_completed(future_to_pid):
            pid = future_to_pid[future]
            try:
                success = future.result()
            except Exception as e:
                print(f"Task failed for PID {pid}: {e}")

    # 发送结束信号给写入线程
    write_queue.put(None)
    # 等待写入线程完成
    writer.join()

    print(f"All products data has been written to {output_file}")
    print(f"Total processed: {processed_count}/{total_urls}")

if __name__ == "__main__":
    main()