"""
https://www.camper.com/eshop-api/api/v1/products/list?profile=en_US&targetId=W&family=1&filter.collection=tose&order=default&pager.offset=160&pager.limit=40&square=true
products_list = response["products"]
mark = products["id"]
url = products["url"]
color = products["color"]
"""
import re
import os
import json
import threading
from queue import Queue, Empty
import requests
from lxml import etree
from fake_useragent import UserAgent
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from tenacity import retry, stop_after_attempt, wait_exponential
# Work queues.
first_queue = Queue() # category-page URLs (NOTE(review): appears unused in this file)
second_queue = Queue() # detail-page items, produced by get_classify(), consumed by parse_data()
# Logging: rotating file sink plus a plain stdout echo.
logger.remove()
logger.add("Record_url.log", rotation="500 MB", level="INFO")
logger.add(lambda msg: print(msg, end=""), level="INFO")
# Random User-Agent generator for request headers.
ua = UserAgent()
# Output file and the lock that serializes writes to it across worker threads.
OUTPUT_FILE = "all_products.json"
file_lock = threading.Lock()
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def fetch_response(target_url):
    """Fetch *target_url* and return the ``requests.Response``.

    Sends a random User-Agent, refuses redirects, times out after 10s,
    and raises for non-2xx statuses; tenacity retries up to 3 times
    with exponential backoff on any raised exception.
    """
    resp = requests.get(
        target_url,
        headers={"User-Agent": ua.random},
        allow_redirects=False,
        timeout=10,
    )
    resp.raise_for_status()
    return resp
def save_to_json(product_data):
    """Thread-safely append one product record to ``OUTPUT_FILE``.

    Loads the existing JSON array (a missing or corrupt file is treated
    as an empty list), appends ``product_data``, and rewrites the file.
    ``file_lock`` serializes the read-modify-write across worker threads.

    NOTE: rewriting the whole file per product is O(n^2) overall — fine
    for a small crawl; switch to JSON-lines if volume grows.
    """
    with file_lock:
        # The except clause already covers a missing file, so the
        # original exists-check + empty-list pre-write was redundant.
        try:
            with open(OUTPUT_FILE, "r", encoding="utf-8") as f:
                existing_products = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            existing_products = []
        existing_products.append(product_data)
        with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
            json.dump(existing_products, f, ensure_ascii=False, indent=2)
        logger.info(f"Saved product: {product_data['Title']} (URL: {product_data['URL']})")
def parse_data():
    """Worker: drain detail-page items from ``second_queue`` and persist them.

    Each queue item is a dict with ``url``/``category``/``mark``/``color``.
    Title, price, description and images come from the page's JSON-LD
    block; features and sizes are scraped with regexes. The worker exits
    after the queue stays empty for 5 seconds.
    """
    # Compile once, outside the per-item loop. Raw strings fix the
    # invalid '\+' escape; re.S lets the embedded JSON span lines
    # (without it, multi-line <script> blocks silently failed to match).
    ld_json_re = re.compile(r'<script type="application/ld\+json">(.*?)</script>', re.S)
    features_re = re.compile(r'"features":(.*?),"productDescription"', re.S)
    size_re = re.compile(r'"value":"(.*?)","quantity"')
    while True:
        try:
            item = second_queue.get(timeout=5)
        except Empty:
            logger.info("- - - - - - - - - - - - - 详细队列为空,退出解析线程")
            break
        url, category, mark, color = item["url"], item["category"], item["mark"], item["color"]
        try:
            html = fetch_response(url).text
            json_data = json.loads(ld_json_re.findall(html)[0])
            title = json_data.get("name", f"{url} - No Title")
            price = json_data.get("offers", {}).get("price", "N/A")
            description = json_data.get('description', '')
            try:
                features_raw = features_re.findall(html)[0]
                # Fixed: the original comprehension's loop variable shadowed
                # its own iterable ("features" over "features").
                features = "".join(feature["name"] for feature in json.loads(features_raw))
            except IndexError:
                features = ""
            body = description + features
            img_list = json_data.get("image")
            if not img_list or price == "N/A":
                # Missing images or price makes the record useless — skip it.
                logger.warning(f"Parse Data Error Images and Price:{item}")
                continue
            size_list = size_re.findall(html) or ["One Size"]
            product_data = {
                "Mark": mark,
                "Title": title,
                "Price": price,
                "Color": color,
                "Body": body,
                "Img_list": img_list,
                "Size_list": size_list,
                "Other_link": [],
                "Category": category,
                "variant_image": img_list[0],
                "URL": url
            }
            save_to_json(product_data)
        except Exception as e:
            # Broad by design: a bad page must not kill the worker thread.
            logger.error(f'Parse Data Error:{item} -{e}')
        finally:
            second_queue.task_done()
def get_classify():
    """Paginate the Camper product-list API and enqueue detail-page items.

    Walks pages of 40 products until the API returns an empty or missing
    ``products`` list, pushing ``{mark, url, category, color}`` dicts
    onto ``second_queue`` for the parser workers.
    """
    offset = 0
    while True:  # idiomatic replacement for `while 1`
        url = f"https://www.camper.com/eshop-api/api/v1/products/list?profile=en_US&targetId=W&family=1&filter.collection=tose&order=default&pager.offset={offset}&pager.limit=40&square=true"
        response = fetch_response(url).json()
        products_list = response.get("products")
        if not products_list:
            break
        for products in products_list:
            second_queue.put({
                "mark": products["id"],
                "url": products["url"],
                "category": "",
                "color": products["color"],
            })
        # Use the module logger (its stdout sink still echoes) instead of
        # a bare print, consistent with the rest of the file.
        logger.info(f"{url} --- 完成,Items:{len(products_list)}")
        offset += 40
def main():
    """Entry point: fill the detail queue, then parse it with a thread pool."""
    # Stage 1: collect every detail-page item from the list API.
    get_classify()
    # Stage 2: run 12 parser workers concurrently; result() re-raises
    # any exception that escaped a worker.
    with ThreadPoolExecutor(max_workers=12) as executor:
        workers = [executor.submit(parse_data) for _ in range(12)]
        for worker in workers:
            worker.result()
    print("- - - - - - - - - - - - - - Crawling completed - - - - - - - - - - - - - -")


if __name__ == "__main__":
    main()