"""
https://www.birkenstock.com/us/styles/boston/?start=24&format=page-element&sz=24&loadMoreProducts=true

href_list = tree.xpath('//*[@class="color-swatch mobile-selectable "]/@href')
"""
import re
import os
import json
import threading
from queue import Queue, Empty
import requests
from lxml import etree
from fake_useragent import UserAgent
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from tenacity import retry, stop_after_attempt, wait_exponential, before_sleep_log

# Initialize queues
first_queue = Queue()  # holds category/listing page URLs
second_queue = Queue()  # holds product detail page URLs

# Configure logging
logger.remove()
logger.add("Record_url.log", rotation="500 MB", level="INFO")
logger.add(lambda msg: print(msg, end=""), level="INFO")

# User-Agent generator
ua = UserAgent()

# Thread-safe temporary storage
temp_file_lock = threading.Lock()
TEMP_FILE = "temp_urls.jsonl"  # JSONL file for temporary product URL storage

# Output file and its lock
OUTPUT_FILE = "all_products.json"
file_lock = threading.Lock()

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10),
       before_sleep=before_sleep_log(logger, logger.level("INFO").no))
def fetch_response(target_url):
    """获取网页响应,失败时重试3次"""
    headers = {"User-Agent": ua.random}
    response = requests.get(target_url, headers=headers, allow_redirects=False, timeout=10)
    response.raise_for_status()
    return response

def save_to_json(product_data):
    """线程安全地将产品数据保存到JSON文件"""
    with file_lock:
        if not os.path.exists(OUTPUT_FILE):
            with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
                json.dump([], f)
        try:
            with open(OUTPUT_FILE, "r", encoding="utf-8") as f:
                existing_products = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            existing_products = []

        existing_products.append(product_data)  # append directly; assumes the file is cleared before each run
        with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
            json.dump(existing_products, f, ensure_ascii=False, indent=2)
        logger.info(f"Saved product: {product_data['Title']} (URL: {product_data['URL']})")

def parse_data():
    """解析详情页数据"""
    while True:
        try:
            item = second_queue.get(timeout=5)
            url, category, mark = item["url"], item["category"], item["mark"]
            try:
                response = fetch_response(url).text
                tree = etree.HTML(response)

                title = (re.findall('Product","name":"(.*?)","image"', response) or [f"{url} - No Title"])[0].strip()
                description = (re.findall('"description":"(.*?)","sku"', response) or [""])[0]
                price = (tree.xpath('//*[@class="price-standard"]/text()') or ["N/A"])[-1].strip()
                color = (tree.xpath('//*[@class="selection-text"]/text()') or ["As Show"])[-1].strip()
                img_list = tree.xpath('//*[@class="product-image-thumbnail-slick initially-hidden"]/img/@data-src')
                if not img_list or price == "N/A":
                    logger.warning(f"Parse Data Error Images and Price:{item}")
                    continue
                Size_list = tree.xpath('//*[@class="size-bottom"]/text()') or ["One Size"]
                size_list = [size.strip() for size in Size_list]
                other_links = []
                product_data = {
                    "Mark": mark,
                    "Title": title,
                    "Price": price,
                    "Color": color,
                    "Body": description,
                    "Img_list": img_list,
                    "Size_list": size_list,
                    "Other_link": other_links,
                    "Category": category,
                    "variant_image": img_list[0],
                    "URL": url
                }
                save_to_json(product_data)
            except Exception as e:
                logger.error(f'Parse Data Error:{item} -Exception:{e}')
            finally:
                second_queue.task_done()

        except Empty:
            logger.info("- - - - - - - - - - - - - Detail queue is empty, exiting parser thread")
            break
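
# Each line of TEMP_FILE is a standalone JSON object written by get_classify(), e.g.
# (illustrative, truncated values):
#   {"mark": "https://www.birkenstock.com/us/...", "url": "https://www.birkenstock.com/us/...", "category": "Boston"}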

def deduplicate_and_queue():
    """从临时文件中去重并放入队列"""
    url_data = {}
    with open(TEMP_FILE, "r", encoding="utf-8") as f:
        for line in f:
            data = json.loads(line.strip())
            mark = data["mark"]
            if mark not in url_data:
                url_data[mark] = {"url": data["url"], "categories": set()}
            url_data[mark]["categories"].add(data["category"])

    for mark, data in url_data.items():
        second_queue.put({"url": data["url"], "category": ", ".join(data["categories"]), "mark": mark})
    print(f"- - - - - - - - - - - - - - 重复数据删除后的唯一产品总数: {len(url_data)}")
    # os.remove(TEMP_FILE)  # delete the temp file

def get_classify():
    """Page through the Boston category listing and append product URLs to the temp JSONL file"""
    page_start = 0
    while True:
        url = f"https://www.birkenstock.com/us/styles/boston/?start={page_start}&format=page-element&sz=24&loadMoreProducts=true"
        response = fetch_response(url).text
        tree = etree.HTML(response)
        href_list = tree.xpath(
            '//*[@class="color-swatch mobile-selectable "]/@href | //a[contains(@class,"product-tile has-hover-effect")]/@href')
        if not href_list:
            break
        with open(TEMP_FILE, "a", encoding="utf-8") as f:
            for href in href_list:
                if "https" not in href:
                    href = "https://www.birkenstock.com" + href
                data = {"mark": href, "url": href, "category": "Boston"}
                f.write(json.dumps(data) + "\n")  # JSONL format
        print(f"{url}  --- done, items: {len(href_list)}")
        page_start += 24

def main():
    get_classify()
    deduplicate_and_queue()
    with ThreadPoolExecutor(max_workers=12) as executor:
        parse_futures = [executor.submit(parse_data) for _ in range(12)]
        for future in parse_futures:
            future.result()
    print("- - - - - - - - - - - - - - Crawling completed - - - - - - - - - - - - - -")

if __name__ == "__main__":
    main()
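
# Quick post-run sanity check (a minimal sketch; assumes the run finished and
# all_products.json exists in the working directory):
#
#   import json
#   with open("all_products.json", encoding="utf-8") as f:
#       products = json.load(f)
#   print(f"{len(products)} products saved")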