搜索结果

×

搜索结果将在这里显示。

🥳 sandro-paris

sandro-paris-get

import requests
from bs4 import BeautifulSoup

# Target URL of the listing page to scrape
url = "https://fr.sandro-paris.com/fr/selection/denim/?start=0&sz=85"

# Request headers that imitate a regular browser visit
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}

# Send the HTTP GET request. requests applies no timeout by default, so an
# explicit one is set to make a stalled connection fail instead of hanging.
response = requests.get(url, headers=headers, timeout=10)

# Only parse the page when the server answered with 200 OK
if response.status_code == 200:
    # Parse the HTML content
    soup = BeautifulSoup(response.text, 'html.parser')

    # Find every <div> carrying the "product" class
    products = soup.find_all('div', class_="product")

    # Write one data-pid value per line to the output file
    with open('sandro-paris.txt', 'w', encoding='utf-8') as file:
        for product in products:
            # Read the data-pid attribute; 'N/A' marks a div without one
            data_pid = product.get('data-pid', 'N/A')
            file.write(f"{data_pid}\n")

    print("数据已成功保存到 sandro-paris.txt 文件中!")
else:
    print(f"请求失败,状态码:{response.status_code}")

sandro-paris

import json
import csv
import requests
import time
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import threading
import unicodedata

# Column names of the Shopify product CSV, in output order
# (used as the DictWriter fieldnames by the CSV writer thread)
fields = [
    "Handle", "Title", "Body (HTML)", "Vendor", "Type", "Tags", "Published",
    "Option1 Name", "Option1 Value", "Option2 Name", "Option2 Value",
    "Option3 Name", "Option3 Value", "Variant SKU", "Variant Grams",
    "Variant Inventory Tracker", "Variant Inventory Qty", "Variant Inventory Policy",
    "Variant Fulfillment Service", "Variant Price", "Variant Compare At Price",
    "Variant Requires Shipping", "Variant Taxable", "Variant Barcode",
    "Image Src", "Image Position", "Image Alt Text", "Gift Card",
    "SEO Title", "SEO Description", "Google Shopping / Google Product Category",
    "Google Shopping / Gender", "Google Shopping / Age Group",
    "Google Shopping / MPN", "Google Shopping / AdWords Grouping",
    "Google Shopping / AdWords Labels", "Google Shopping / Condition",
    "Google Shopping / Custom Product", "Google Shopping / Custom Label 0",
    "Google Shopping / Custom Label 1", "Google Shopping / Custom Label 2",
    "Google Shopping / Custom Label 3", "Google Shopping / Custom Label 4",
    "Variant Image", "Variant Weight Unit", "Variant Tax Code",
    "Cost per item", "Status"
]

# Queue that hands finished rows from the worker threads to the writer thread
write_queue = Queue()
# Lock guarding the progress counter below
counter_lock = threading.Lock()
# Number of products processed so far (incremented while holding counter_lock)
processed_count = 0
# Total number of input lines; set once by main()
total_urls = 0

# Replacements for characters that NFKD normalization cannot reduce to ASCII
# on its own: ligatures, the Polish stroke-l, typographic dashes and quotes.
# Code points are written as escapes so the table survives re-encoding.
_ASCII_FALLBACKS = str.maketrans({
    "\u0153": "oe", "\u0152": "OE",  # oe ligature
    "\u00e6": "ae", "\u00c6": "AE",  # ae ligature
    "\u0142": "l", "\u0141": "L",    # l with stroke
    "\u2013": "-", "\u2014": "-",    # en dash, em dash
    "\u201c": '"', "\u201d": '"',    # curly double quotes
    "\u2018": "'", "\u2019": "'",    # curly single quotes / apostrophe
    "\u00ab": '"', "\u00bb": '"',    # guillemets
})


def normalize_text(text):
    """Reduce *text* to a plain-ASCII approximation.

    Steps, in order:
      1. decode HTML entities (``&eacute;`` -> ``é``, ``&nbsp;`` -> NBSP);
      2. NFKD-normalize and drop combining marks, which strips accents
         (``é`` -> ``e``) and folds compatibility forms (NBSP -> space);
      3. replace the few characters NFKD cannot decompose (ligatures,
         ``ł``, typographic dashes and quotes) with ASCII equivalents;
      4. drop any character that is still outside ASCII;
      5. strip leading and trailing whitespace.

    Entities are decoded first so that the characters they produce go
    through the same accent stripping as literal characters.

    Args:
        text: The string to clean. Any falsy value (``None``, ``""``)
            yields an empty string.

    Returns:
        The ASCII-only, whitespace-trimmed string.
    """
    if not text:
        return ""

    # Local import keeps this function self-contained; html is stdlib.
    from html import unescape

    decoded = unescape(text)
    decomposed = unicodedata.normalize('NFKD', decoded)
    without_marks = ''.join(
        ch for ch in decomposed if not unicodedata.combining(ch)
    )
    replaced = without_marks.translate(_ASCII_FALLBACKS)

    # Whatever is still non-ASCII has no sensible replacement: drop it.
    ascii_only = ''.join(ch for ch in replaced if ord(ch) < 128)
    return ascii_only.strip()

def get_json_data(url):
    """Fetch the Product-Variation JSON for one product.

    Despite its name, ``url`` is not a full URL: it must be an identifier of
    the form ``<pid>_<color>`` (exactly one underscore). The two halves are
    used to build the query parameters of the Product-Variation endpoint.

    The request is attempted up to three times with a two-second pause
    between attempts. A malformed identifier or an HTTP 404 answer is not
    retried, because repeating it cannot produce a different outcome.

    Args:
        url: Product identifier such as ``"<pid>_<color>"``.

    Returns:
        The decoded JSON object (a dict containing a ``"product"`` key), or
        ``None`` when the identifier is malformed, the server answers 404,
        or every attempt failed.
    """
    max_retries = 3
    retry_delay = 2  # seconds

    # Validate the identifier once, up front: a malformed value fails the
    # same way on every attempt, so it must not enter the retry loop.
    parts = url.split("_")
    if len(parts) < 2:
        print(f"Invalid URL format (no color code): {url}")
        return None
    if len(parts) != 2:
        print(f"Invalid URL format: {url}")
        return None
    pid, color = parts

    base_url = "https://fr.sandro-paris.com/on/demandware.store/Sites-Sandro-FR-Site/fr_FR/Product-Variation"
    params = {
        f"dwvar_{pid}__{color}_color": color,
        # NOTE(review): the size value is hard-coded; presumably any valid
        # size returns the full variation data - confirm against the API.
        f"dwvar_{pid}__{color}_size": "1_40",
        "pid": f"{pid}_{color}",
        "quantity": "1",
        "isQuickView": "false"
    }
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'application/json',
        'Accept-Language': 'fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7',
        'Referer': 'https://fr.sandro-paris.com/',
        'Connection': 'keep-alive'
    }
    # Unencoded rendering of the request, used for logging only; the real
    # request lets requests encode the parameters.
    full_url = f"{base_url}?{'&'.join(f'{k}={v}' for k, v in params.items())}"

    for attempt in range(max_retries):
        try:
            print(f"\n{'='*50}")
            print(f"Processing URL: {url}")
            print(f"Attempt {attempt + 1}/{max_retries}")
            print(f"Extracted PID: {pid}, Color: {color}")
            print(f"Full URL: {full_url}")

            print("Sending request...")
            response = requests.get(
                base_url,
                params=params,
                headers=request_headers,
                timeout=10
            )

            print(f"Response Status Code: {response.status_code}")
            print(f"Response Headers: {dict(response.headers)}")

            # A missing product is a final answer, not a transient failure.
            if response.status_code == 404:
                print(f"Product not found: {url}")
                return None

            response.raise_for_status()

            try:
                print("Parsing JSON response...")
                data = response.json()
                print(f"Response data keys: {list(data.keys()) if isinstance(data, dict) else 'Not a dict'}")
            except ValueError:
                # Every JSON decode error requests can raise (json or
                # simplejson based) is a ValueError subclass.
                print(f"Invalid JSON response for {url}")
                print(f"Response content (first 500 chars): {response.text[:500]}")
                raise

            if not data or not isinstance(data, dict):
                print(f"Invalid response structure. Data type: {type(data)}")
                raise ValueError(f"Invalid JSON response structure: {data}")

            if "product" not in data:
                print(f"Missing 'product' key in response. Available keys: {list(data.keys())}")
                raise ValueError("Missing product data in response")

            print(f"Successfully retrieved data for {url}")
            return data

        except requests.RequestException as e:
            print(f"\nRequest error for {url} (attempt {attempt + 1}/{max_retries})")
            print(f"Error type: {type(e).__name__}")
            print(f"Error message: {str(e)}")
            # Compare against None explicitly: a Response object is falsy
            # for 4xx/5xx statuses, which are exactly the ones to log here.
            failed = getattr(e, 'response', None)
            if failed is not None:
                print(f"Response status code: {failed.status_code}")
                print(f"Response headers: {dict(failed.headers)}")
                print(f"Response text: {failed.text[:500]}")
            else:
                print("Response status code: No response")
                print("Response headers: No headers")
                print("Response text: No response text")
        except ValueError as e:
            print(f"\nValue error for {url} (attempt {attempt + 1}/{max_retries})")
            print(f"Error message: {str(e)}")
        except Exception as e:
            print(f"\nUnexpected error for {url} (attempt {attempt + 1}/{max_retries})")
            print(f"Error type: {type(e).__name__}")
            print(f"Error message: {str(e)}")
            import traceback
            print(f"Traceback: {traceback.format_exc()}")

        if attempt < max_retries - 1:
            print(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)

    print(f"\nFailed to fetch data after {max_retries} attempts for {url}")
    return None

def process_product(url):
    """Process the data for a single product.

    Fetches the product JSON through get_json_data(), builds one CSV row
    per size, puts the rows on write_queue and increments processed_count
    while holding counter_lock. Returns None in every case; failures are
    printed rather than raised.
    """
    global processed_count

    print(f"\nStarting to process product: {url}")
    data = get_json_data(url)

    # Nothing usable came back: report what was received and give up.
    if not data or not isinstance(data, dict) or "product" not in data:
        print(f"Skipping {url} due to invalid data")
        print(f"Data type: {type(data)}")
        if isinstance(data, dict):
            print(f"Available keys: {list(data.keys())}")
        return

    try:
        # NOTE(review): the source is truncated at this point. The code that
        # should define product, handle, title, vendor, product_type,
        # published, color_name, color_value, sale_price, list_price, images
        # and sizes is missing, and the next line is the tail of a cut-off
        # statement. Restore it from the original script before running.
s found, using default size")
            sizes = [{"name": "Size", "value": "ONE SIZE", "stock": 100}]

        rows = []
        for idx, size in enumerate(sizes):
            variant_sku = f"{handle}-{size['value']}"

            # Build the data for each row. Product-level columns (Title,
            # Vendor, Type, Published, SEO Title) are filled on the first
            # row only and left empty on the following rows.
            row = {
                "Handle": handle,
                "Title": title if idx == 0 else "",
                # Both branches are empty: no description is exported.
                "Body (HTML)": "" if idx == 0 else "",
                "Vendor": vendor if idx == 0 else "",
                "Type": product_type if idx == 0 else "",
                "Tags": "" if idx == 0 else "",
                "Published": published if idx == 0 else "",
                "Option1 Name": size["name"],
                "Option1 Value": size["value"],
                "Option2 Name": color_name,
                "Option2 Value": color_value,
                "Variant SKU": variant_sku,
                "Variant Grams": "",
                "Variant Inventory Tracker": "shopify",
                "Variant Inventory Qty": size["stock"],
                "Variant Inventory Policy": "deny",
                "Variant Fulfillment Service": "manual",
                "Variant Price": sale_price,
                # Compare-at price is only written when it differs from the sale price.
                "Variant Compare At Price": list_price if list_price != sale_price else "",
                "Variant Requires Shipping": "TRUE",
                "Variant Taxable": "TRUE",
                "Variant Barcode": "",
                # Images are assigned round-robin over the size rows.
                "Image Src": images[idx % len(images)]["src"] if images else "",
                "Image Position": idx + 1,
                "Image Alt Text": images[idx % len(images)]["alt"] if images else title,
                "Gift Card": "FALSE",
                "SEO Title": title if idx == 0 else "",
                "SEO Description": "" if idx == 0 else "",
                "Google Shopping / Google Product Category": "",
                "Google Shopping / Gender": "Unisex",
                "Google Shopping / Age Group": "Adult",
                "Google Shopping / MPN": handle,
                "Google Shopping / AdWords Grouping": "",
                "Google Shopping / AdWords Labels": "",
                "Google Shopping / Condition": "New",
                "Google Shopping / Custom Product": "FALSE",
                "Google Shopping / Custom Label 0": "",
                "Google Shopping / Custom Label 1": "",
                "Google Shopping / Custom Label 2": "",
                "Google Shopping / Custom Label 3": "",
                "Google Shopping / Custom Label 4": "",
                "Variant Image": "",
                "Variant Weight Unit": "kg",
                "Variant Tax Code": "",
                "Cost per item": "",
                "Status": "active"
            }
            rows.append(row)

        # Put the finished rows on the write queue
        for row in rows:
            write_queue.put(row)

        # Update the progress counter under the lock
        with counter_lock:
            global processed_count
            processed_count += 1
            print(f"Progress: {processed_count}/{total_urls} - Processed: {url}")

    except KeyError as e:
        print(f"Missing key in data for {url}: {e}")
        # NOTE(review): `product` is presumably bound in the missing section above - confirm.
        print(f"Available keys: {list(product.keys()) if product else 'No product data'}")
    except Exception as e:
        print(f"Error processing {url}: {e}")
        import traceback
        print(f"Traceback: {traceback.format_exc()}")

def writer_thread(output_file):
    """Drain ``write_queue`` into a CSV file until a ``None`` sentinel arrives.

    Intended to run as the single writer thread: it writes the header built
    from ``fields``, then blocks on the queue and writes each row dict as it
    arrives. Receiving ``None`` ends the loop and closes the file.

    Args:
        output_file: Path of the CSV file to create (overwritten if present).
    """
    with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fields)
        writer.writeheader()

        while True:
            row = write_queue.get()
            if row is None:  # shutdown signal
                # Acknowledge the sentinel too, so Queue.join() cannot hang.
                write_queue.task_done()
                break
            writer.writerow(row)
            # Flush per row so finished rows survive an aborted run.
            csvfile.flush()
            write_queue.task_done()

def main():
    """Export every product listed in ``sandro-paris.txt`` to a Shopify CSV.

    Reads one product identifier per non-empty line, starts the CSV writer
    thread, processes the identifiers on a pool of five worker threads and
    finally signals the writer to stop and waits for it.
    """
    global total_urls

    output_file = "shopify_products.csv"

    # Read the identifier file, ignoring blank lines
    with open("sandro-paris.txt", "r", encoding="utf-8") as f:
        urls = [line.strip() for line in f if line.strip()]

    total_urls = len(urls)
    print(f"Total URLs to process: {total_urls}")

    # Start the CSV writer thread
    writer = threading.Thread(target=writer_thread, args=(output_file,))
    writer.start()

    try:
        # Process the identifiers on a thread pool
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(process_product, url) for url in urls]

            # Wait for every task; result() re-raises a task's exception here
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    print(f"Task failed: {e}")
    finally:
        # Always stop the writer, even when the pool section is interrupted;
        # otherwise the non-daemon thread would block on the queue forever
        # and keep the process from exiting.
        write_queue.put(None)
        writer.join()

    print(f"All products data has been written to {output_file}")
    print(f"Total processed: {processed_count}/{total_urls}")


if __name__ == "__main__":
    main()