import json
import csv
import requests
import time
import re
import os
import hashlib
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import threading
import unicodedata
from lxml import html
# Shopify 标准 CSV 格式字段
fields = [
"Handle", "Title", "Body (HTML)", "Vendor", "Type", "Tags", "Published",
"Option1 Name", "Option1 Value", "Option2 Name", "Option2 Value",
"Option3 Name", "Option3 Value", "Variant SKU", "Variant Grams",
"Variant Inventory Tracker", "Variant Inventory Qty", "Variant Inventory Policy",
"Variant Fulfillment Service", "Variant Price", "Variant Compare At Price",
"Variant Requires Shipping", "Variant Taxable", "Variant Barcode",
"Image Src", "Image Position", "Image Alt Text", "Gift Card",
"SEO Title", "SEO Description", "Google Shopping / Google Product Category",
"Google Shopping / Gender", "Google Shopping / Age Group",
"Google Shopping / MPN", "Google Shopping / AdWords Grouping",
"Google Shopping / AdWords Labels", "Google Shopping / Condition",
"Google Shopping / Custom Product", "Google Shopping / Custom Label 0",
"Google Shopping / Custom Label 1", "Google Shopping / Custom Label 2",
"Google Shopping / Custom Label 3", "Google Shopping / Custom Label 4",
"Variant Image", "Variant Weight Unit", "Variant Tax Code",
"Cost per item", "Status"
]
# 创建一个线程安全的写入队列
write_queue = Queue()
# 创建一个线程安全的计数器
counter_lock = threading.Lock()
processed_count = 0
total_urls = 0
# 缓存目录
CACHE_DIR = "html_cache"
# 确保缓存目录存在
if not os.path.exists(CACHE_DIR):
os.makedirs(CACHE_DIR)
def normalize_text(text):
"""处理特殊字符和乱码"""
if not text:
return ""
# 将特殊字符转换为基本拉丁字符
normalized = unicodedata.normalize('NFKD', text)
# 移除组合字符
normalized = ''.join(c for c in normalized if not unicodedata.combining(c))
# 特殊字符映射
char_map = {
'ł': 'l', 'Ł': 'L',
'ą': 'a', 'Ą': 'A',
'ć': 'c', 'Ć': 'C',
'ę': 'e', 'Ę': 'E',
'ń': 'n', 'Ń': 'N',
'ó': 'o', 'Ó': 'O',
'ś': 's', 'Ś': 'S',
'ź': 'z', 'Ź': 'Z',
'ż': 'z', 'Ż': 'Z',
'–': '-', '—': '-',
'"': '"', '"': '"',
''': "'", ''': "'",
}
for old, new in char_map.items():
normalized = normalized.replace(old, new)
return normalized
def get_json_data(url):
"""从URL获取JSON数据(如果URL是JSON API端点)
注意:对于普通的产品页面URL(如 /products/xxx),通常返回HTML而不是JSON,
这种情况下会直接返回None,不尝试解析。
"""
# 如果URL看起来是普通产品页面,不尝试获取JSON(通常是HTML页面)
if '/products/' in url and not url.endswith('.json'):
return None
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'application/json'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
# 检查响应内容类型
content_type = response.headers.get('Content-Type', '').lower()
if 'application/json' not in content_type:
# 如果Content-Type明确不是JSON(比如是HTML),直接返回None
if 'text/html' in content_type:
return None
# 否则尝试解析看看
try:
return response.json()
except (ValueError, json.JSONDecodeError):
# 如果解析失败,说明不是JSON格式,返回None
return None
return response.json()
except (ValueError, json.JSONDecodeError):
# JSON解析错误,说明返回的不是JSON格式,这是正常的(可能是HTML页面)
return None
except Exception as e:
# 其他错误才打印
print(f"Error fetching JSON data from {url}: {str(e)}")
return None
def get_cache_filename(url):
"""根据URL生成缓存文件名"""
# 使用URL的hash值作为文件名
url_hash = hashlib.md5(url.encode('utf-8')).hexdigest()
return os.path.join(CACHE_DIR, f"{url_hash}.html")
def save_html_cache(url, html_content):
"""保存HTML内容到缓存"""
try:
cache_file = get_cache_filename(url)
with open(cache_file, 'w', encoding='utf-8') as f:
f.write(html_content)
return True
except Exception as e:
print(f"Error saving cache for {url}: {str(e)}")
return False
def load_html_cache(url):
"""从缓存加载HTML内容"""
try:
cache_file = get_cache_filename(url)
if os.path.exists(cache_file):
with open(cache_file, 'r', encoding='utf-8') as f:
return f.read()
return None
except Exception as e:
print(f"Error loading cache for {url}: {str(e)}")
return None
def get_html_data(url, use_cache=True, silent=False):
"""从URL获取HTML数据,支持缓存
Args:
url: 要获取的URL
use_cache: 是否使用缓存
silent: 是否静默模式(不打印缓存信息)
"""
# 先尝试从缓存加载
if use_cache:
cached_content = load_html_cache(url)
if cached_content:
if not silent:
print(f"[缓存] {url[:80]}...")
return cached_content
# 缓存不存在或禁用缓存,从网络获取
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
html_content = response.text
# 保存到缓存
if use_cache and html_content:
save_html_cache(url, html_content)
if not silent:
print(f"[获取] {url[:80]}...")
return html_content
except Exception as e:
print(f"Error fetching HTML from {url}: {str(e)}")
return None
def extract_product_links_from_list(html_content, base_url=""):
"""从列表页提取商品链接"""
try:
tree = html.fromstring(html_content)
# XPath路径 - 用户提供的路径: /html/body/div[1]/div[4]/div[4]/collection-filter/div/section/div[6]/div/div/div[1]/div[*]/div[*]/div[1]/a
# 转换为lxml支持的XPath格式,使用//来匹配任意层级的div
# 精确匹配用户提供的路径结构
link_xpath_exact = '/html/body/div[1]/div[4]/div[4]/collection-filter/div/section/div[6]/div/div/div[1]//div//div//div[1]/a'
# 备用路径:更通用的匹配
link_xpath_alt1 = '//body/div[1]/div[4]/div[4]//collection-filter//section//div[6]//div//div//div[1]//div//div//div[1]//a'
# 备用路径:通过class或标签名匹配
link_xpath_alt2 = '//collection-filter//section//div[6]//div//div//div[1]//div//div//div[1]//a'
# 备用路径:匹配所有包含/products/的链接
link_xpath_alt3 = '//a[contains(@href, "/products/")]'
# 尝试多个XPath路径,按优先级顺序
links = []
for xpath in [link_xpath_exact, link_xpath_alt1, link_xpath_alt2, link_xpath_alt3]:
try:
elements = tree.xpath(xpath)
if elements:
print(f"使用XPath找到 {len(elements)} 个链接元素: {xpath[:50]}...")
for elem in elements:
href = elem.get('href', '')
if href:
# 处理相对URL
if href.startswith('/'):
if base_url:
from urllib.parse import urljoin
full_url = urljoin(base_url, href)
else:
# 如果没有base_url,尝试从原始URL构建
if 'us.st-dupont.com' in base_url or not base_url:
full_url = f"https://us.st-dupont.com{href}"
else:
full_url = href
elif href.startswith('http'):
full_url = href
else:
continue
# 只保留商品页面链接
if '/products/' in full_url and full_url not in links:
links.append(full_url)
if links:
print(f"成功提取到 {len(links)} 个商品链接")
break
except Exception as e:
print(f"XPath执行错误 {xpath[:50]}...: {str(e)}")
continue
return links
except Exception as e:
print(f"Error extracting product links from list page: {str(e)}")
import traceback
traceback.print_exc()
return []
def extract_data_from_html(html_content):
"""使用XPath从HTML中提取数据"""
try:
tree = html.fromstring(html_content)
# XPath路径
detail_xpath = '/html/body/div[5]/div[2]/div[3]/div/div[2]/div/div/div/div/div[2]/div[10]/div[1]/div[1]/div/div/div'
title_xpath = '/html/body/div[4]/div[2]/div[2]/div/div/div/nav/text()'
price_xpath = '/html/body/div[4]/div[2]/div[3]/div/div[2]/div/div/div/div/div[2]/div[2]/p/span'
color_xpath = '/html/body/div[5]/div[2]/div[3]/div/div[2]/div/div/div/div/div[2]/div[6]/div/p'
image_xpath = '/html/body/div[5]/div[2]/div[3]/div/div[1]/div[2]/div/div/div[*]/a/img'
# 提取详情(保留HTML格式)
# 根据用户提供的CSS选择器路径转换
# #shopify-section... > div.product-content__tabs > div.product-content__tabs > div:nth-child(1) > div > div
# 实际结构:div.product-content__tabs > div.product-content__tabs > div.accordion > div.accordion__item > div.accordion__body > div.accordion__content > div.accordion__entry
detail_xpaths = [
'//div[@class="product-content__tabs"]/div[@class="product-content__tabs"]/div[1]/div/div', # 用户指定的路径(精确匹配)
'//div[contains(@class, "product-content__tabs")]/div[contains(@class, "product-content__tabs")]/div[1]/div/div', # 更通用的匹配
'//div[@class="product-content__tabs"]/div[@class="product-content__tabs"]//div[@class="accordion__entry"]', # 通过完整路径匹配
'//div[@class="accordion__entry"]', # 通过class匹配(最直接)
'//div[contains(@class, "accordion__entry")]', # 更通用的匹配
detail_xpath, # 原始路径
'/html/body/div[5]/div[2]/div[3]/div/div[2]/div/div/div/div/div[2]/div[10]/div[1]/div[1]/div/div/div', # 备用路径
]
detail = ""
for xpath in detail_xpaths:
try:
detail_elements = tree.xpath(xpath)
if detail_elements:
# 保留HTML格式,使用html.tostring转换为HTML字符串
detail_parts = []
for elem in detail_elements:
if hasattr(elem, 'text_content'):
# 获取元素的HTML内容
html_str = html.tostring(elem, encoding='unicode', method='html')
detail_parts.append(html_str)
else:
detail_parts.append(str(elem))
detail = ' '.join(detail_parts)
# 清理多余的空白字符,但保留HTML标签
detail = re.sub(r'\s+', ' ', detail).strip()
if detail:
break
except Exception as e:
continue
# 调试信息:检查详情提取情况
if not detail:
# 尝试更通用的方法
try:
# 查找所有包含描述的div
desc_elements = tree.xpath('//div[contains(@class, "accordion")]//div[contains(@class, "entry")]')
if desc_elements:
detail_parts = []
for elem in desc_elements:
html_str = html.tostring(elem, encoding='unicode', method='html')
detail_parts.append(html_str)
detail = ' '.join(detail_parts)
detail = re.sub(r'\s+', ' ', detail).strip()
except Exception:
pass
# 提取标题(尝试多个XPath路径)
title = ""
title_xpaths = [
title_xpath, # 原始路径
'/html/body/div[4]/div[2]/div[2]/div/div/div/nav/text()', # 备用路径1
'//h1[@class="title-product-collection"]/text()', # 通过class匹配
'//div[@class="product-content__title"]//h1/text()', # 通过父元素匹配
'//h1[contains(@class, "title")]/text()', # 更通用的匹配
]
for xpath in title_xpaths:
try:
title_elements = tree.xpath(xpath)
if title_elements:
title = normalize_text(' '.join([str(elem).strip() for elem in title_elements if elem]))
if title:
break
except Exception:
continue
# 提取价格(从span元素直接提取)
price_elements = tree.xpath(price_xpath)
price = ""
if price_elements:
for elem in price_elements:
# 获取span元素的文本内容
price_text = elem.text_content() if hasattr(elem, 'text_content') else str(elem)
if price_text:
# 提取价格(匹配 $数字 或 数字 格式)
price_match = re.search(r'[\$€£]?\s*([\d,]+\.?\d*)', price_text)
if price_match:
price = normalize_text(price_match.group(1).replace(',', '').strip())
break
# 提取颜色(尝试多个XPath路径)
color = ""
color_xpaths = [
color_xpath, # 原始路径
'/html/body/div[5]/div[2]/div[3]/div/div[2]/div/div/div/div/div[2]/div[6]/div/p', # 备用路径1
'//h2[@class="title-product-name"]/text()', # 通过class匹配(实际结构)
'//h2[contains(@class, "title-product-name")]/text()', # 更通用的匹配
'//div[@class="product-content__title"]//h2[@class="title-product-name"]/text()', # 通过父元素匹配
]
for xpath in color_xpaths:
try:
color_elements = tree.xpath(xpath)
if color_elements:
# 过滤掉空文本,取最后一个非空的(通常是颜色)
for elem in reversed(color_elements):
if hasattr(elem, 'strip'):
color_text = str(elem).strip()
else:
color_text = elem.text_content() if hasattr(elem, 'text_content') else str(elem)
color_text = color_text.strip()
if color_text and color_text.lower() not in ['', 'guilloche lighter', 'lighter']:
color = normalize_text(color_text)
break
if color:
break
except Exception:
continue
# 如果还没找到,尝试从所有title-product-name中取最后一个非空的
if not color:
try:
all_titles = tree.xpath('//h2[@class="title-product-name"]/text()')
for title in reversed(all_titles):
title_text = str(title).strip() if hasattr(title, 'strip') else str(title).strip()
if title_text and title_text.lower() not in ['', 'guilloche lighter', 'lighter']:
color = normalize_text(title_text)
break
except Exception:
pass
# 提取图片
# 根据实际HTML结构,图片在product-carousel中
image_xpaths = [
'//div[@class="product-main__images"]//img[@class="product-carousel__image"]', # 最精确的路径
'//div[contains(@class, "product-carousel")]//img', # 通过class匹配
'//img[contains(@class, "product-carousel__image")]', # 直接匹配图片class
'//div[@class="product-main__images"]//img', # 产品图片区域的所有img
'/html/body/div[5]/div[2]/div[3]/div/div[1]/div[2]//img', # 备用路径
]
images = []
image_elements = []
# 尝试每个XPath路径
for xpath in image_xpaths:
try:
elements = tree.xpath(xpath)
if elements:
image_elements = elements
break
except Exception:
continue
if image_elements:
for elem in image_elements:
# 优先获取src属性,如果没有则尝试data-src(懒加载)
img_src = elem.get('src', '') or elem.get('data-src', '') or elem.get('data-lazy-src', '')
if not img_src:
# 如果都没有,尝试从父元素<a>获取href
if hasattr(elem, 'getparent'):
parent = elem.getparent()
if parent is not None and parent.tag == 'a':
href = parent.get('href', '')
if href and href.startswith('http'):
img_src = href
if img_src:
# 处理相对URL
if img_src.startswith('//'):
img_src = 'https:' + img_src
elif img_src.startswith('/'):
img_src = 'https://us.st-dupont.com' + img_src
elif not img_src.startswith('http'):
# 如果是相对路径,尝试构建完整URL
from urllib.parse import urljoin
img_src = urljoin('https://us.st-dupont.com', img_src)
# 保留URL的查询参数(如?v=...),这些参数通常用于缓存控制
# 不清理URL,保持原始格式
if img_src and img_src not in images:
images.append(img_src)
return {
'detail': detail,
'title': title,
'price': price,
'color': color,
'images': images
}
except Exception as e:
print(f"Error extracting data from HTML: {str(e)}")
return {
'detail': '',
'title': '',
'price': '',
'color': '',
'images': []
}
def process_product(url):
"""处理单个商品数据 - 仅使用HTML XPath提取"""
global processed_count
# 从HTML提取数据
html_content = get_html_data(url)
if not html_content:
print(f"无法获取HTML内容: {url}")
return
html_data = extract_data_from_html(html_content)
# 检查是否成功提取到标题
if not html_data.get('title'):
print(f"警告: 无法从HTML提取标题: {url}")
return
# 使用HTML数据创建产品记录
handle = url.split('/')[-1] if '/' in url else url
title = html_data.get('title', '')
body_html = html_data.get('detail', '')
vendor = ""
product_type = ""
tags = ""
published = "TRUE"
seo_title = title
seo_description = body_html
variant_price = html_data.get('price', '')
color = html_data.get('color', '')
images = html_data.get('images', [])
# 调试信息:检查图片提取情况
if not images:
print(f"提示: 未提取到图片: {url}")
else:
print(f"提取到 {len(images)} 张图片: {url}")
# 打印第一张图片URL用于调试
if images and images[0]:
print(f" 第一张图片: {images[0][:80]}...")
# 调试信息:检查颜色提取情况
if color:
print(f"提取到颜色: {color} - {url}")
# 调试信息:检查详情提取情况
if not body_html:
print(f"提示: 未提取到详情: {url}")
else:
print(f"提取到详情 ({len(body_html)} 字符): {url[:60]}...")
# 如果没有图片,至少创建一个空行
if not images:
images = [""]
# 为每个图片创建一行(Shopify格式支持多图片)
rows = []
for idx, image_url in enumerate(images):
# 确保image_url是字符串
image_url_str = str(image_url) if image_url else ""
row = {
"Handle": handle,
"Title": title if idx == 0 else "", # 只在第一行显示标题
"Body (HTML)": body_html if idx == 0 else "", # 只在第一行显示详情
"Vendor": vendor if idx == 0 else "",
"Type": product_type if idx == 0 else "",
"Tags": tags if idx == 0 else "",
"Published": published if idx == 0 else "",
"Option1 Name": "Color",
"Option1 Value": color if idx == 0 else "", # 只在第一行显示颜色
"Option2 Name": "",
"Option2 Value": "",
"Variant SKU": handle,
"Variant Grams": "",
"Variant Inventory Tracker": "shopify",
"Variant Inventory Qty": 100,
"Variant Inventory Policy": "deny",
"Variant Fulfillment Service": "manual",
"Variant Price": variant_price,
"Variant Compare At Price": "",
"Variant Requires Shipping": "TRUE",
"Variant Taxable": "TRUE",
"Variant Barcode": "",
"Image Src": image_url_str,
"Image Position": idx + 1,
"Image Alt Text": title,
"Gift Card": "FALSE",
"SEO Title": seo_title if idx == 0 else "",
"SEO Description": seo_description if idx == 0 else "",
"Google Shopping / Google Product Category": "",
"Google Shopping / Gender": "Unisex",
"Google Shopping / Age Group": "Adult",
"Google Shopping / MPN": handle,
"Google Shopping / AdWords Grouping": "",
"Google Shopping / AdWords Labels": "",
"Google Shopping / Condition": "New",
"Google Shopping / Custom Product": "FALSE",
"Google Shopping / Custom Label 0": "",
"Google Shopping / Custom Label 1": "",
"Google Shopping / Custom Label 2": "",
"Google Shopping / Custom Label 3": "",
"Google Shopping / Custom Label 4": "",
"Variant Image": "",
"Variant Weight Unit": "kg",
"Variant Tax Code": "",
"Cost per item": "",
"Status": "active"
}
rows.append(row)
# 将所有行放入写入队列
for row in rows:
write_queue.put(row)
with counter_lock:
processed_count += 1
print(f"Progress: {processed_count}/{total_urls} - Processed: {url}")
def writer_thread(output_file):
"""CSV写入线程"""
row_count = 0
try:
with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fields)
writer.writeheader()
print(f"CSV文件已创建: {output_file}")
while True:
row = write_queue.get()
if row is None: # 结束信号
break
writer.writerow(row)
csvfile.flush() # 确保立即写入磁盘
row_count += 1
if row_count % 10 == 0:
print(f"已写入 {row_count} 行到CSV文件")
write_queue.task_done()
print(f"CSV写入完成,共写入 {row_count} 行数据到 {output_file}")
except Exception as e:
print(f"CSV写入错误: {str(e)}")
import traceback
traceback.print_exc()
def get_cache_stats():
"""获取缓存统计信息"""
try:
if not os.path.exists(CACHE_DIR):
return 0, 0
cache_files = [f for f in os.listdir(CACHE_DIR) if f.endswith('.html')]
total_size = sum(os.path.getsize(os.path.join(CACHE_DIR, f)) for f in cache_files)
return len(cache_files), total_size
except Exception:
return 0, 0
def scrape_list_pages(base_url, start_page=1, end_page=8):
"""从列表页提取所有商品链接"""
all_product_urls = []
# 清理base_url,移除page参数
from urllib.parse import urlparse, urlencode, parse_qs, urlunparse
parsed = urlparse(base_url)
query_params = parse_qs(parsed.query)
# 移除page参数
if 'page' in query_params:
del query_params['page']
# 重建基础URL(不含page参数)
clean_query = urlencode(query_params, doseq=True)
base_url_clean = urlunparse((
parsed.scheme,
parsed.netloc,
parsed.path,
parsed.params,
clean_query,
parsed.fragment
)).rstrip('?').rstrip('&')
for page in range(start_page, end_page + 1):
# 构建每页的URL
if page == 1:
list_url = base_url_clean
else:
# 添加page参数
separator = '&' if '?' in base_url_clean else '?'
list_url = f"{base_url_clean}{separator}page={page}"
print(f"正在爬取列表页 {page}/{end_page}: {list_url}")
html_content = get_html_data(list_url)
if html_content:
product_links = extract_product_links_from_list(html_content, list_url)
all_product_urls.extend(product_links)
print(f"从第 {page} 页提取到 {len(product_links)} 个商品链接")
else:
print(f"无法获取第 {page} 页的内容,可能已到最后一页")
# 如果连续两页都获取不到内容,停止爬取
if page > start_page:
break
# 添加延迟避免请求过快
time.sleep(1)
# 去重
unique_urls = list(dict.fromkeys(all_product_urls))
print(f"总共提取到 {len(unique_urls)} 个唯一商品链接")
return unique_urls
def main():
output_file = "shopify_products.csv"
# 显示缓存统计信息
cache_count, cache_size = get_cache_stats()
if cache_count > 0:
cache_size_mb = cache_size / (1024 * 1024)
print(f"缓存信息: {cache_count} 个已缓存页面 ({cache_size_mb:.2f} MB)")
else:
print("缓存信息: 无缓存")
print("-" * 60)
# 检查是否有列表页URL需要爬取
list_urls = []
product_urls = []
# 读取链接文件
try:
with open("1.txt", "r", encoding="utf-8") as f:
urls = [line.strip() for line in f if line.strip()]
# 分离列表页URL和商品页URL
for url in urls:
# 判断是否为列表页URL
is_list_page = False
if 'collections' in url:
# 包含collections且包含page参数,或者是collections结尾
if 'page=' in url or url.rstrip('/').endswith('collections'):
is_list_page = True
# 或者URL格式类似 /collections/xxx
elif '/collections/' in url and '/products/' not in url:
is_list_page = True
if is_list_page:
list_urls.append(url)
else:
product_urls.append(url)
except FileNotFoundError:
print("未找到 1.txt 文件,将使用默认列表页URL")
# 如果没有文件,使用用户提供的URL
list_urls = ["https://us.st-dupont.com/collections/cigar-cutters"]
# 如果存在列表页URL,先爬取列表页获取商品链接
if list_urls:
for list_url in list_urls:
print(f"开始爬取列表页: {list_url}")
extracted_urls = scrape_list_pages(list_url, start_page=1, end_page=2)
product_urls.extend(extracted_urls)
# 去重所有商品URL
all_product_urls = list(dict.fromkeys(product_urls))
if not all_product_urls:
print("没有找到任何商品链接,程序退出")
return
global total_urls
total_urls = len(all_product_urls)
print(f"总共需要处理 {total_urls} 个商品链接")
# 启动CSV写入线程
writer = threading.Thread(target=writer_thread, args=(output_file,))
writer.start()
# 使用线程池处理URLs
with ThreadPoolExecutor(max_workers=10) as executor:
# 提交所有任务
futures = [executor.submit(process_product, url) for url in all_product_urls]
# 等待所有任务完成
for future in as_completed(futures):
try:
future.result() # 获取任务结果(如果有异常会在这里抛出)
except Exception as e:
print(f"Task failed: {e}")
# 等待所有任务完成并确保数据已写入队列
print("等待所有任务完成...")
# 发送结束信号给写入线程
write_queue.put(None)
# 等待写入线程完成
writer.join()
# 检查文件是否存在
if os.path.exists(output_file):
file_size = os.path.getsize(output_file)
print(f"\n✓ CSV文件已保存: {output_file}")
print(f"✓ 文件大小: {file_size} 字节")
else:
print(f"\n✗ 警告: CSV文件未创建: {output_file}")
print(f"✓ 总共处理: {processed_count}/{total_urls} 个商品")
if __name__ == "__main__":
main()