🌸 camper

"""
https://www.camper.com/eshop-api/api/v1/products/list?profile=en_US&targetId=W&family=1&filter.collection=tose&order=default&pager.offset=160&pager.limit=40&square=true

products_list = response["products"]

mark = products["id"]
url = products["url"]
color = products["color"]

"""
import re
import os
import json
import threading
from queue import Queue, Empty
import requests
from lxml import etree
from fake_useragent import UserAgent
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from tenacity import retry, stop_after_attempt, wait_exponential

# Queue of detail-page work items (filled by get_classify, drained by parse_data)
second_queue = Queue()

# Configure logging: rotating file plus console echo
logger.remove()
logger.add("Record_url.log", rotation="500 MB", level="INFO")
logger.add(lambda msg: print(msg, end=""), level="INFO")

# Random User-Agent generator
ua = UserAgent()

# Output file and a lock for thread-safe writes
OUTPUT_FILE = "all_products.json"
file_lock = threading.Lock()

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def fetch_response(target_url):
    """获取网页响应,失败时重试3次"""
    headers = {"User-Agent": ua.random}
    response = requests.get(target_url, headers=headers, allow_redirects=False, timeout=10)
    response.raise_for_status()
    return response
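
# Note on fetch_response: allow_redirects=False returns 3xx responses as-is
# (raise_for_status does not treat them as errors), so callers may receive a
# redirect page rather than the final content. Retries: up to 3 attempts with
# exponential backoff bounded between 4 and 10 seconds.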

def save_to_json(product_data):
    """线程安全地将产品数据保存到JSON文件"""
    with file_lock:
        if not os.path.exists(OUTPUT_FILE):
            with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
                json.dump([], f)

        try:
            with open(OUTPUT_FILE, "r", encoding="utf-8") as f:
                existing_products = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            existing_products = []

        existing_products.append(product_data)  # Append directly; assumes the file is cleared between runs
        with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
            json.dump(existing_products, f, ensure_ascii=False, indent=2)
        logger.info(f"Saved product: {product_data['Title']} (URL: {product_data['URL']})")
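
# Design note: save_to_json re-reads and rewrites the whole file per product,
# which is O(n) for each save. An append-only JSON Lines file would avoid the
# re-read; plain JSON is kept here to match the original output format.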

def parse_data():
    """解析详情页数据"""
    while True:
        try:
            item = second_queue.get(timeout=5)
            url, category, mark, color = item["url"], item["category"], item["mark"], item["color"]
            try:
                response = fetch_response(url).text
                matches = re.findall(r'<script type="application/ld\+json">(.*?)</script>', response, re.S)
                json_data = json.loads(matches[0])

                title = json_data.get("name", f"{url} - No Title")
                price = json_data.get("offers", {}).get("price", "N/A")
                description = json_data.get('description', '')
                try:
                    features_list = re.findall(r'"features":(.*?),"productDescription"', response)[0]
                    features = "".join(feature["name"] for feature in json.loads(features_list))
                except IndexError:
                    features = ""
                body = description + features
                img_list = json_data.get("image")
                if not img_list or price == "N/A":
                    logger.warning(f"Missing images or price, skipping: {item}")
                    continue
                size_list = re.findall('"value":"(.*?)","quantity"', response) or ["One Size"]
                other_links = []
                product_data = {
                    "Mark": mark,
                    "Title": title,
                    "Price": price,
                    "Color": color,
                    "Body": body,
                    "Img_list": img_list,
                    "Size_list": size_list,
                    "Other_link": other_links,
                    "Category": category,
                    "variant_image": img_list[0],
                    "URL": url
                }
                save_to_json(product_data)
            except Exception as e:
                logger.error(f"Parse data error: {item} - {e}")
            finally:
                second_queue.task_done()

        except Empty:
            logger.info("- - - - - - - - - - - - - Detail queue empty, exiting parser thread")
            break
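
# Worker lifecycle: each parser thread exits once the queue has been empty for
# 5 seconds. That is safe here because get_classify() fills the queue before
# the workers start; with a concurrent producer, sentinel items would be safer.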

def get_classify():
    """Page through the category listing API and enqueue detail-page URLs."""
    offset = 0
    while True:
        url = f"https://www.camper.com/eshop-api/api/v1/products/list?profile=en_US&targetId=W&family=1&filter.collection=tose&order=default&pager.offset={offset}&pager.limit=40&square=true"
        response = fetch_response(url).json()

        products_list = response.get("products")
        if not products_list:
            break
        for products in products_list:
            mark = products["id"]
            detail_url = products["url"]
            color = products["color"]
            data = {"mark": mark, "url": detail_url, "category": "", "color": color}
            second_queue.put(data)
        print(f"{url} --- 完成,Items:{len(products_list)}")
        offset += 40
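
# Pagination note: get_classify stops when the API returns no "products";
# the offset advances by the page size (pager.limit=40) on each request.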

def main():
    """主函数"""
    get_classify()

    # Parse detail pages with a pool of 12 worker threads
    with ThreadPoolExecutor(max_workers=12) as executor:
        parse_futures = [executor.submit(parse_data) for _ in range(12)]
        for future in parse_futures:
            future.result()
    print("- - - - - - - - - - - - - - Crawling completed - - - - - - - - - - - - - -")

if __name__ == "__main__":
    main()