# jellycat-getpid
import requests
import csv
import time
# Step 1 of the Jellycat export: page through the Searchspring search API and
# store every product's uid together with its categories in a CSV file.

# Output file
output_file = "jellycat.csv"
# Base URL
base_url = "https://bmcyq0.a.searchspring.io/api/search/search.json"
# Base query parameters; 'page' is added per request inside the loop below.
params = {
    "lastViewed": "GO3AT,A4SCLB,BARM3BEE",
    "userId": "39defc28-ec7e-49c4-a996-40ac03e0ffaa",
    "domain": "https://us.jellycat.com/shop-all",
    "sessionId": "8cdc5c9d-d60a-42c3-a36d-ababe3ae5072",
    "pageLoadId": "fb444866-0bfc-4a77-a213-847e8fbe29ad",
    "siteId": "bmcyq0",
    "bgfilter.categories_hierarchy": "Explore All",
    "bgfilter.ss_visibility": "visible",
    "bgfilter.ss_is_retired": "0",
    "redirectResponse": "full",
    "ajaxCatalog": "Snap",
    "resultsFormat": "native"
}
# Request headers never change between pages, so build them once.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
# Seconds to wait for the server; without a timeout requests can block forever.
REQUEST_TIMEOUT = 30

# Create the CSV file and write the header row
with open(output_file, 'w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(['uid', 'categories'])

page = 1
while True:
    try:
        print(f"正在处理第 {page} 页")
        params['page'] = page
        # Send the request
        response = requests.get(
            base_url, params=params, headers=headers, timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
        data = response.json()
        # An empty/missing 'results' list means we ran past the last page
        if not data.get('results'):
            print("没有更多数据,结束爬取")
            break
        # Append this page's rows; re-opening per page keeps finished pages
        # on disk even if a later page fails.
        with open(output_file, 'a', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            for item in data['results']:
                uid = item.get('uid', '')
                # 'categories' may be absent or null; coerce entries to str so
                # one odd product does not abort the whole crawl.
                categories = ','.join(map(str, item.get('categories') or []))
                writer.writerow([uid, categories])
        print(f"第 {page} 页处理完成")
        page += 1
        # Pause between pages to avoid hammering the API
        time.sleep(1)
    except Exception as e:
        # Top-level boundary of the script: report the failure and stop.
        print(f"处理第 {page} 页时出错:{e}")
        break
print(f"所有页面处理完成,结果已保存到 {output_file}")
# jellycat
import json
import csv
import requests
import time
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import threading
import unicodedata
# Column names of the Shopify product CSV, in the order they are written.
fields = [
    # Product-level columns
    "Handle",
    "Title",
    "Body (HTML)",
    "Vendor",
    "Type",
    "Tags",
    "Published",
    # Option columns
    "Option1 Name",
    "Option1 Value",
    "Option2 Name",
    "Option2 Value",
    "Option3 Name",
    "Option3 Value",
    # Variant columns
    "Variant SKU",
    "Variant Grams",
    "Variant Inventory Tracker",
    "Variant Inventory Qty",
    "Variant Inventory Policy",
    "Variant Fulfillment Service",
    "Variant Price",
    "Variant Compare At Price",
    "Variant Requires Shipping",
    "Variant Taxable",
    "Variant Barcode",
    # Image columns
    "Image Src",
    "Image Position",
    "Image Alt Text",
    "Gift Card",
    # SEO columns
    "SEO Title",
    "SEO Description",
    # Google Shopping columns
    "Google Shopping / Google Product Category",
    "Google Shopping / Gender",
    "Google Shopping / Age Group",
    "Google Shopping / MPN",
    "Google Shopping / AdWords Grouping",
    "Google Shopping / AdWords Labels",
    "Google Shopping / Condition",
    "Google Shopping / Custom Product",
    "Google Shopping / Custom Label 0",
    "Google Shopping / Custom Label 1",
    "Google Shopping / Custom Label 2",
    "Google Shopping / Custom Label 3",
    "Google Shopping / Custom Label 4",
    # Remaining variant / status columns
    "Variant Image",
    "Variant Weight Unit",
    "Variant Tax Code",
    "Cost per item",
    "Status",
]
# Progress counters shared by the worker threads; guarded by counter_lock.
processed_count = 0
total_urls = 0
counter_lock = threading.Lock()
# Queue through which workers hand finished rows to the CSV writer thread.
write_queue = Queue()
# Characters that survive NFKD decomposition and therefore need an explicit
# ASCII replacement. Written as escapes so the table cannot be silently
# corrupted by an editor or paste that "straightens" typographic quotes.
_NORMALIZE_TRANSLATION = str.maketrans({
    "\u0142": "l",   # l with stroke (lowercase)
    "\u0141": "L",   # L with stroke (uppercase)
    "\u2013": "-",   # en dash
    "\u2014": "-",   # em dash
    "\u201c": '"',   # left double quotation mark
    "\u201d": '"',   # right double quotation mark
    "\u2018": "'",   # left single quotation mark
    "\u2019": "'",   # right single quotation mark
})


def normalize_text(text):
    """Fold accented and typographic characters in *text* to plain equivalents.

    Args:
        text: The string to clean. Any falsy value (``None``, ``""``) yields
            an empty string.

    Returns:
        The text after NFKD decomposition with combining marks removed, and
        with stroked L, en/em dashes and curly quotes replaced by ASCII
        characters.
    """
    if not text:
        return ""
    # Decompose so that e.g. an accented letter becomes base letter + mark ...
    decomposed = unicodedata.normalize('NFKD', text)
    # ... then drop the combining marks, which leaves the bare base letters.
    stripped = ''.join(c for c in decomposed if not unicodedata.combining(c))
    # Single pass for the characters that have no Unicode decomposition.
    return stripped.translate(_NORMALIZE_TRANSLATION)
def get_json_data(url, timeout=30):
    """Fetch *url* with a browser-like User-Agent and return the parsed JSON.

    Args:
        url: Address to request with HTTP GET.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the calling thread
            indefinitely.

    Returns:
        The decoded JSON body, or ``None`` if the request failed, returned an
        HTTP error status, or the body was not valid JSON. The failure is
        printed rather than raised.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        # Best-effort fetch: callers treat None as "skip this URL".
        print(f"Error fetching data from {url}: {str(e)}")
        return None
def get_product_data(pid, timeout=30):
    """Request product details for one product id from the Swym relay API.

    Args:
        pid: Product id; it is sent as the single element of the
            ``productids`` form field.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the run.

    Returns:
        The decoded JSON response, or ``None`` if the request failed, returned
        an HTTP error status, or the body was not valid JSON. The failure is
        printed rather than raised.
    """
    url = "https://swymstore-v3pro-01.swymrelay.com/api/v2/provider/getPlatformProducts"
    # NOTE(review): the query 'pid', 'regid' and 'sessionid' values look like
    # captured store/session identifiers and may expire — confirm before reuse.
    params = {
        "pid": "x6brv90zUZdp4etUf11CpCBVljhpJagHbBu1QBjBe9A="
    }
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    data = {
        'productids': f'[{pid}]',
        'regid': 'YVYb_z2Z4XeNGM1BCLvoRr1yMcxxaz4PyChBQpUy2_ERGa7PRf3jDDxJAd1JCe6zFiJrSQn_LyDgN2cqMeaoRhX_oJ6ognBrnHFu-RilwwzIFXZ19s7Iz0h4H-DSQo-RkqXe4uJN_pVieDYHIhS8hUNROTLfD0funig1tlHOEj8',
        'sessionid': 'bi3t015ssbaj8tphc45abfxvnfmq2pmbss7qw312fjitggrp0yt2a7bmy992q8ho'
    }
    try:
        response = requests.post(
            url, params=params, headers=headers, data=data, timeout=timeout
        )
        # Diagnostic dump of the exchange, printed before the status check so
        # that error responses are visible too.
        print(f"\nRequest URL: {response.url}")
        print(f"Request Headers: {headers}")
        print(f"Request Data: {data}")
        print(f"Response Status Code: {response.status_code}")
        print(f"Response Body: {response.text}")
        response.raise_for_status()
        return response.json()
    except Exception as e:
        # Best-effort fetch: callers treat None as "no data for this pid".
        print(f"Error fetching data for PID {pid}: {str(e)}")
        return None
# Columns cleared on every image row after the first one of a variant, so the
# product/variant details appear only once per variant.
_BLANK_ON_EXTRA_IMAGE_ROWS = (
    "Title", "Body (HTML)", "Vendor", "Type", "Tags",
    "Option1 Name", "Option1 Value", "Option2 Name", "Option2 Value",
    "Variant SKU", "SEO Title", "SEO Description",
)


def _pick_image_url(resized_urls):
    """Return the 'sw=2000' rendition if present, else the first URL, else ''."""
    for candidate in resized_urls:
        if "sw=2000" in candidate:
            return candidate
    return resized_urls[0] if resized_urls else ""


def _collect_images(product, alt_text):
    """Build the ordered image list (src, 1-based position, alt) of a product."""
    images = []
    for img in product.get("images", {}).get("large", []):
        image_url = _pick_image_url(img.get("resizedUrls", []))
        if image_url:
            images.append({
                "src": image_url,
                "position": len(images) + 1,
                "alt": alt_text,
            })
    return images


def _build_base_row(handle, title, body_html, vendor, product_type, published,
                    option1_name, option1_value, colour, variant_sku,
                    variant_price):
    """Return the Shopify CSV row shared by all image rows of one variant."""
    return {
        "Handle": handle,
        "Title": title,
        "Body (HTML)": body_html,
        "Vendor": vendor,
        "Type": product_type,
        "Tags": "",
        "Published": published,
        "Option1 Name": option1_name,
        "Option1 Value": option1_value,
        "Option2 Name": "Color" if colour else "",
        "Option2 Value": colour,
        "Variant SKU": variant_sku,
        "Variant Grams": "",
        "Variant Inventory Tracker": "shopify",
        "Variant Inventory Qty": 100,  # default stock level
        "Variant Inventory Policy": "deny",
        "Variant Fulfillment Service": "manual",
        "Variant Price": variant_price,
        "Variant Compare At Price": "",
        "Variant Requires Shipping": "TRUE",
        "Variant Taxable": "TRUE",
        "Variant Barcode": "",
        "Gift Card": "FALSE",
        "SEO Title": title,
        "SEO Description": body_html,
        "Google Shopping / Google Product Category": "",
        "Google Shopping / Gender": "Unisex",
        "Google Shopping / Age Group": "Adult",
        "Google Shopping / MPN": handle,
        "Google Shopping / AdWords Grouping": "",
        "Google Shopping / AdWords Labels": "",
        "Google Shopping / Condition": "New",
        "Google Shopping / Custom Product": "FALSE",
        "Google Shopping / Custom Label 0": "",
        "Google Shopping / Custom Label 1": "",
        "Google Shopping / Custom Label 2": "",
        "Google Shopping / Custom Label 3": "",
        "Google Shopping / Custom Label 4": "",
        "Variant Image": "",
        "Variant Weight Unit": "kg",
        "Variant Tax Code": "",
        "Cost per item": "",
        "Status": "active",
    }


def _expand_image_rows(base_row, images):
    """Return one CSV row per image, or the bare variant row if there are none."""
    if not images:
        # Keep image-less products in the export instead of dropping them.
        return [dict(base_row)]
    rows = []
    for img in images:
        row = base_row.copy()
        row["Image Src"] = img["src"]
        row["Image Position"] = img["position"]
        row["Image Alt Text"] = img["alt"]
        # Only the first image row carries the product/variant details.
        if img["position"] > 1:
            for column in _BLANK_ON_EXTRA_IMAGE_ROWS:
                row[column] = ""
        rows.append(row)
    return rows


def process_product(url):
    """Fetch one product JSON document and queue its Shopify CSV rows.

    The product is downloaded with ``get_json_data``, converted to one row per
    (size variant, image) pair and each row is put on ``write_queue`` for the
    writer thread. On success the shared ``processed_count`` is incremented
    under ``counter_lock`` and a progress line is printed.

    Args:
        url: Address of the product JSON document.

    Returns:
        None. Fetch failures and documents without a ``product`` entry are
        skipped silently; processing errors are printed, not raised.
    """
    global processed_count
    data = get_json_data(url)
    if not data:
        return
    try:
        product = data.get("product", {})
        if not product:
            return

        # Basic product information
        handle = product.get("id", "").strip()
        title = normalize_text(product.get("productName", ""))
        body_html = ""  # Tezenis doesn't provide detailed description in this API
        vendor = "Tezenis"
        product_type = normalize_text(product.get("categoryName", ""))
        published = "TRUE"

        # Price information
        variant_price = product.get("price", {}).get("sales", {}).get("value", "")

        # Colour comes from the 'color' attribute; any other attribute that has
        # a display name is treated as the size attribute (the last one wins).
        colour = ""
        size_name = ""
        size_values = []
        for attr in product.get("variationAttributes", []):
            if attr.get("attributeId") == "color":
                values = attr.get("values", [])
                if values:
                    colour = normalize_text(values[0].get("displayValue", ""))
            elif attr.get("displayName"):
                size_name = normalize_text(attr.get("displayName", ""))
                size_values = attr.get("values", [])

        images = _collect_images(product, title)

        # Values identical for every variant of this product.
        common = {
            "handle": handle,
            "title": title,
            "body_html": body_html,
            "vendor": vendor,
            "product_type": product_type,
            "published": published,
            "colour": colour,
            "variant_price": variant_price,
        }

        rows = []
        if size_values:
            # One variant per size value.
            for size_data in size_values:
                variant_size = normalize_text(size_data.get("id", ""))
                # Fall back to a synthetic SKU when the API gives none.
                variant_sku = (size_data.get("sku", "").strip()
                               or f"{handle}-{variant_size}")
                base_row = _build_base_row(
                    option1_name=size_name,
                    option1_value=variant_size,
                    variant_sku=variant_sku,
                    **common,
                )
                rows.extend(_expand_image_rows(base_row, images))
        else:
            # No size variants: a single product entry keyed by the handle.
            base_row = _build_base_row(
                option1_name="Size" if size_name else "",
                option1_value=size_name,
                variant_sku=handle,
                **common,
            )
            rows.extend(_expand_image_rows(base_row, images))

        # Hand the finished rows to the writer thread
        for row in rows:
            write_queue.put(row)

        # Update progress; the lock keeps the counter and its message in step.
        with counter_lock:
            processed_count += 1
            print(f"Progress: {processed_count}/{total_urls} - Processed: {url}")
    except KeyError as e:
        print(f"Missing key in data for {url}: {e}")
    except Exception as e:
        print(f"Error processing {url}: {e}")
def writer_thread(output_file):
    """Drain ``write_queue`` into a CSV file until a ``None`` sentinel arrives.

    Args:
        output_file: Path of the CSV file to create; it is overwritten and
            starts with a header row built from ``fields``.
    """
    with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
        sink = csv.DictWriter(csvfile, fieldnames=fields)
        sink.writeheader()
        # iter(callable, sentinel) keeps calling get() until it returns None,
        # which is the shutdown signal.
        for queued_row in iter(write_queue.get, None):
            sink.writerow(queued_row)
            csvfile.flush()  # push each row to disk immediately
            write_queue.task_done()
# Product-level columns cleared on every image row after the first.
_JELLYCAT_BLANK_ON_EXTRA_IMAGE_ROWS = ("Title", "Body (HTML)", "Vendor", "Type")


def _load_pid_categories(csv_path):
    """Read the product-id -> categories mapping from the collector CSV."""
    mapping = {}
    with open(csv_path, 'r', encoding='utf-8') as f:
        for row in csv.DictReader(f):
            # This function used to require a 'pid' column, but the collector
            # script in this file writes the id under 'uid'; accept either.
            pid = row.get('pid') or row.get('uid')
            if pid:
                mapping[pid] = row.get('categories') or ''
    return mapping


def _jellycat_rows(pid, categories, product_data):
    """Build the Shopify CSV rows (one per image) for a single product."""
    base_row = {
        "Handle": pid,
        "Title": product_data.get('name', ''),
        "Body (HTML)": product_data.get('description', ''),
        "Vendor": "Jellycat",
        "Type": categories,  # categories come from the collector CSV
        "Published": "TRUE",
        "Status": "active",
        "Variant Inventory Tracker": "shopify",
        "Variant Inventory Policy": "deny",
        "Variant Fulfillment Service": "manual",
        "Variant Requires Shipping": "TRUE",
        "Variant Taxable": "TRUE",
        "Gift Card": "FALSE",
        "Variant Weight Unit": "kg",
    }

    # Only the first variant is exported.
    variants = product_data.get('variants', [])
    if variants:
        variant = variants[0]
        base_row.update({
            "Variant SKU": variant.get('sku', ''),
            "Variant Price": variant.get('price', ''),
            "Variant Inventory Qty": variant.get('inventory_level', 0),
            "Variant Barcode": variant.get('gtin', ''),
        })

    images = product_data.get('images', [])
    if not images:
        # No images: still emit one row with the basic information.
        return [base_row]

    rows = []
    for idx, img in enumerate(images, 1):
        row = base_row.copy()
        row.update({
            "Image Src": img.get('url_standard', ''),
            "Image Position": idx,
            "Image Alt Text": img.get('description', ''),
        })
        # Product details are shown on the first image row only.
        if idx > 1:
            for key in _JELLYCAT_BLANK_ON_EXTRA_IMAGE_ROWS:
                row[key] = ""
        rows.append(row)
    return rows


def process_products(input_file='jellycat.csv',
                     output_file='jellycat_processed.csv'):
    """Convert the collected Jellycat product ids into a Shopify product CSV.

    Reads product ids and categories from *input_file*, fetches each product
    with ``get_product_data`` and writes the resulting rows to *output_file*
    using the module-level ``fields`` column order. Sleeps one second after
    each product to throttle requests.

    Args:
        input_file: CSV with a ``pid`` (or ``uid``) column and a
            ``categories`` column.
        output_file: CSV file to create; it is overwritten.
    """
    products_data = _load_pid_categories(input_file)

    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()

        for pid, categories in products_data.items():
            print(f"Processing PID: {pid}")
            data = get_product_data(pid)
            # The response is indexed as a list of per-product entries; guard
            # against an empty list or an unexpected shape before indexing.
            entry = data[0] if isinstance(data, list) and data else None
            if (isinstance(entry, dict) and entry.get('status')
                    and entry.get('productdata')):
                writer.writerows(
                    _jellycat_rows(pid, categories, entry['productdata'])
                )
            else:
                print(f"No data found for PID: {pid}")
            # Pause between products to avoid hammering the API
            time.sleep(1)
def main():
    """Run the threaded export: read URLs, process them, write the CSV.

    URLs are read from ``tezenis.txt`` (one per line, blanks ignored), each is
    handled by ``process_product`` on a 20-worker thread pool, and the rows
    they queue are written to ``shopify_products.csv`` by a dedicated writer
    thread.
    """
    global total_urls
    output_file = "shopify_products.csv"

    # Read the list of product URLs
    with open("tezenis.txt", "r", encoding="utf-8") as f:
        urls = [line.strip() for line in f if line.strip()]
    total_urls = len(urls)
    print(f"Total URLs to process: {total_urls}")

    # Start the CSV writer thread
    writer = threading.Thread(target=writer_thread, args=(output_file,))
    writer.start()
    try:
        # Process the URLs on a thread pool
        with ThreadPoolExecutor(max_workers=20) as executor:
            futures = [executor.submit(process_product, url) for url in urls]
            for future in as_completed(futures):
                try:
                    future.result()  # re-raises anything the task raised
                except Exception as e:
                    print(f"Task failed: {e}")
    finally:
        # Always send the stop sentinel: the writer thread is non-daemon, so
        # without it an interrupted run would block forever on queue.get().
        write_queue.put(None)
        writer.join()

    print(f"All products data has been written to {output_file}")
    print(f"Total processed: {processed_count}/{total_urls}")
# Script entry point: when executed directly, run the Jellycat export.
# NOTE(review): main() is defined in this file but is not called here —
# confirm that only process_products() is meant to run.
if __name__ == "__main__":
    process_products()