# jcpenney-getpid
import requests
import json
import time
from datetime import datetime
def get_product_ids(page, timeout=30):
    """Fetch one page of the men's coats & jackets listing and return its ppIds.

    The decoded JSON response is also written to
    ``jcpenney_response_page{page}.json`` so it can be inspected afterwards.

    Args:
        page: Page number sent as the ``page`` query parameter.
        timeout: Seconds to wait on the server before giving up, so a stalled
            connection cannot hang the caller's pagination loop.

    Returns:
        The list of truthy ``ppId`` values found under
        ``organicZoneInfo.products``. An empty list is returned when that
        section is missing or when any error occurs (errors are printed,
        never raised).
    """
    url = "https://search-api.jcpenney.com/v1/search-service/g/men/mens-coats-jackets"
    params = {
        "productGridView": "medium",
        "id": "cat100290087",
        "page": page,
        "responseType": "organic"
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Connection": "keep-alive",
        "Referer": "https://www.jcpenney.com/",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-site",
        "sec-ch-ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"'
    }
    try:
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
        # Surface 4xx/5xx responses as errors instead of parsing an error body.
        response.raise_for_status()
        data = response.json()
        # Save the response to a file so it can be inspected.
        with open(f"jcpenney_response_page{page}.json", "w", encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        if "organicZoneInfo" not in data:
            print("No organicZoneInfo found in response")
            return []
        # ``or []`` also covers an explicit null "products" value.
        products = data["organicZoneInfo"].get("products") or []
        print(f"\nPage {page} - Found {len(products)} products")
        product_ids = []
        for i, product in enumerate(products):
            ppid = product.get("ppId")
            name = product.get("name")
            if ppid:
                product_ids.append(ppid)
                print(f"{i+1}. {name} - {ppid}")
        print(f"Successfully extracted {len(product_ids)} ppIds")
        return product_ids
    except Exception as e:
        # Best-effort contract: report the problem and let the caller stop paging.
        print(f"Error: {str(e)}")
        return []
def main():
    """Collect ppIds page by page and write them to ``jcpenney_pid.txt``.

    Paging starts at 1 and stops at the first page for which
    ``get_product_ids`` returns an empty list. There is a one-second pause
    after each non-empty page before the next one is requested.
    """
    collected = []
    page_number = 1
    page_ids = get_product_ids(page_number)
    while page_ids:
        collected.extend(page_ids)
        page_number += 1
        time.sleep(1)
        page_ids = get_product_ids(page_number)

    # One ppId per line, in the order the pages returned them.
    with open("jcpenney_pid.txt", "w") as out:
        out.writelines(f"{pid}\n" for pid in collected)
    print(f"\nTotal ppIds saved: {len(collected)}")
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
# jcpenney-getinfo
import json
import csv
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import threading
import unicodedata
from datetime import datetime
import logging
# Set up logging: INFO and above go to a log file whose name carries the
# timestamp at which this module was loaded.
logging.basicConfig(
    filename=f'jcpenney_scraper_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Column names of the standard Shopify CSV format; used as the DictWriter
# field names (and therefore the header row) by writer_thread.
fields = [
    "Handle", "Title", "Body (HTML)", "Vendor", "Type", "Tags", "Published",
    "Option1 Name", "Option1 Value", "Option2 Name", "Option2 Value",
    "Option3 Name", "Option3 Value", "Variant SKU", "Variant Grams",
    "Variant Inventory Tracker", "Variant Inventory Qty", "Variant Inventory Policy",
    "Variant Fulfillment Service", "Variant Price", "Variant Compare At Price",
    "Variant Requires Shipping", "Variant Taxable", "Variant Barcode",
    "Image Src", "Image Position", "Image Alt Text", "Gift Card",
    "SEO Title", "SEO Description", "Google Shopping / Google Product Category",
    "Google Shopping / Gender", "Google Shopping / Age Group",
    "Google Shopping / MPN", "Google Shopping / AdWords Grouping",
    "Google Shopping / AdWords Labels", "Google Shopping / Condition",
    "Google Shopping / Custom Product", "Google Shopping / Custom Label 0",
    "Google Shopping / Custom Label 1", "Google Shopping / Custom Label 2",
    "Google Shopping / Custom Label 3", "Google Shopping / Custom Label 4",
    "Variant Image", "Variant Weight Unit", "Variant Tax Code",
    "Cost per item", "Status"
]
# Thread-safe queue that carries finished CSV rows from the worker threads
# to the writer thread; a None item is the writer's stop signal.
write_queue = Queue()
# Lock guarding updates to the progress counter below.
counter_lock = threading.Lock()
# Number of products fully processed so far (incremented in process_product).
processed_count = 0
# Total number of ppIds to process (set in main).
total_pids = 0
# Characters that NFKD decomposition cannot reduce on its own. Written as
# escapes so the table survives editors/pastes that rewrite typographic
# punctuation. Accented letters such as "ą" or "ż" need no entry here: NFKD
# splits them into a base letter plus a combining mark, which is stripped.
_NORMALIZE_TABLE = str.maketrans({
    "\u0142": "l",  # ł (no Unicode decomposition)
    "\u0141": "L",  # Ł (no Unicode decomposition)
    "\u2013": "-",  # en dash
    "\u2014": "-",  # em dash
    "\u201c": '"',  # left double quotation mark
    "\u201d": '"',  # right double quotation mark
    "\u2018": "'",  # left single quotation mark
    "\u2019": "'",  # right single quotation mark
})


def normalize_text(text):
    """Return *text* with accents and typographic punctuation simplified.

    The value is converted with ``str()``, NFKD-normalized, stripped of
    combining marks, and then passed through a small table that maps
    ``ł``/``Ł``, en/em dashes and curly quotes to plain equivalents.

    Args:
        text: Any value; falsy values (``None``, ``""``, ``0``) yield ``""``.

    Returns:
        The normalized string.
    """
    if not text:
        return ""
    decomposed = unicodedata.normalize("NFKD", str(text))
    without_marks = "".join(
        ch for ch in decomposed if not unicodedata.combining(ch)
    )
    return without_marks.translate(_NORMALIZE_TABLE)
def get_json_data(ppid, timeout=30):
    """Fetch the product-summary JSON for one ppId from the JCPenney browse API.

    Args:
        ppid: Product id sent as the ``ppId`` query parameter.
        timeout: Seconds to wait on the server before giving up, so one
            stalled request cannot block a worker thread indefinitely.

    Returns:
        The decoded JSON object when it contains a truthy ``"response"``
        entry; otherwise ``None``. Request failures and any other error are
        logged and also result in ``None`` (nothing is raised).
    """
    url = "https://browse-api.jcpenney.com/browse-aggregator/v2/product-summaries-aggregator"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    try:
        # Passing the id via ``params`` lets requests URL-encode it.
        response = requests.get(
            url, params={"ppId": ppid}, headers=headers, timeout=timeout
        )
        response.raise_for_status()
        data = response.json()
        if not data.get("response"):
            logging.warning(f"No response data for ppId {ppid}")
            return None
        return data
    except requests.exceptions.RequestException as e:
        logging.error(f"Request failed for ppId {ppid}: {str(e)}")
        return None
    except Exception as e:
        logging.error(f"Error fetching data for ppId {ppid}: {str(e)}")
        return None
# Columns cleared on every image row after a variant's first one, so the
# product/variant details appear only once per variant. "Handle" is kept
# because it ties the extra image rows to their product.
_EXTRA_IMAGE_ROW_BLANKS = (
    "Title", "Body (HTML)", "Vendor", "Type", "Tags",
    "Option1 Name", "Option1 Value", "Option2 Name", "Option2 Value",
    "Variant SKU", "SEO Title", "SEO Description",
)


def _extract_prices(product):
    """Return ``(variant_price, compare_at_price)`` from the root pricing amounts."""
    pricing = (product.get("pricing") or {}).get("root") or {}
    sale_price = ""
    original_price = ""
    for amount in pricing.get("amounts") or []:
        amount_type = amount.get("type")
        if amount_type == "SALE":
            sale_price = amount.get("min", "")
        elif amount_type == "ORIGINAL":
            original_price = amount.get("min", "")
    if sale_price in ("", None):
        # No SALE amount present: use the ORIGINAL amount as the selling price
        # so "Variant Price" is not left empty, and leave compare-at blank.
        return original_price, ""
    return sale_price, original_price


def _collect_color_images(color_dimension, title):
    """Map each color option value to its list of ``src``/``position``/``alt`` dicts."""
    color_images = {}
    if not color_dimension:
        return color_images
    for color_option in color_dimension.get("options") or []:
        alt_text = normalize_text(color_option.get("altText", title))
        images = []
        # Main image first (position 1) ...
        product_image = color_option.get("productImage") or {}
        if product_image.get("url"):
            images.append(
                {"src": product_image.get("url"), "position": 1, "alt": alt_text}
            )
        # ... followed by the alternate images, numbered from 2.
        for position, alt_img in enumerate(color_option.get("altImages") or [], 2):
            if alt_img.get("url"):
                images.append(
                    {"src": alt_img.get("url"), "position": position, "alt": alt_text}
                )
        color_images[color_option.get("value", "")] = images
    return color_images


def _build_base_row(common, size_value="", color_value="", variant_sku="", barcode=""):
    """Return one Shopify CSV row (without image columns) for a single variant."""
    return {
        "Handle": common["handle"],
        "Title": common["title"],
        "Body (HTML)": common["body_html"],
        "Vendor": common["vendor"],
        "Type": common["product_type"],
        "Tags": "",
        "Published": "TRUE",
        "Option1 Name": "Size" if size_value else "",
        "Option1 Value": normalize_text(size_value),
        "Option2 Name": "Color" if color_value else "",
        "Option2 Value": normalize_text(color_value),
        "Variant SKU": variant_sku,
        "Variant Grams": "",
        "Variant Inventory Tracker": "shopify",
        "Variant Inventory Qty": "100",
        "Variant Inventory Policy": "deny",
        "Variant Fulfillment Service": "manual",
        "Variant Price": common["sale_price"],
        "Variant Compare At Price": common["compare_price"],
        "Variant Requires Shipping": "TRUE",
        "Variant Taxable": "TRUE",
        "Variant Barcode": barcode,
        "Gift Card": "FALSE",
        "SEO Title": common["title"],
        "SEO Description": common["body_html"],
        "Google Shopping / Google Product Category": "",
        "Google Shopping / Gender": "Unisex",
        "Google Shopping / Age Group": "Adult",
        "Google Shopping / MPN": common["handle"],
        "Google Shopping / AdWords Grouping": "",
        "Google Shopping / AdWords Labels": "",
        "Google Shopping / Condition": "New",
        "Google Shopping / Custom Product": "FALSE",
        "Google Shopping / Custom Label 0": "",
        "Google Shopping / Custom Label 1": "",
        "Google Shopping / Custom Label 2": "",
        "Google Shopping / Custom Label 3": "",
        "Google Shopping / Custom Label 4": "",
        "Variant Image": "",
        "Variant Weight Unit": "kg",
        "Variant Tax Code": "",
        "Cost per item": "",
        "Status": "active"
    }


def _expand_image_rows(base_row, images):
    """Return one CSV row per image, or the bare variant row when there are none."""
    if not images:
        # Keep the variant in the export even without pictures; the CSV
        # writer fills the missing image columns with empty strings.
        return [dict(base_row)]
    rows = []
    for index, img in enumerate(images):
        row = dict(base_row)
        row["Image Src"] = img["src"]
        row["Image Position"] = img["position"]
        row["Image Alt Text"] = img["alt"]
        # Only the first row of a variant carries the product details. This
        # is keyed on row order, not on the image position, so a variant
        # whose main image is missing still gets one fully populated row.
        if index > 0:
            for column in _EXTRA_IMAGE_ROW_BLANKS:
                row[column] = ""
        rows.append(row)
    return rows


def process_product(ppid):
    """Fetch one product and queue its Shopify CSV rows for the writer thread.

    The product JSON is fetched with ``get_json_data``. Each item of the
    first lot becomes a variant; each variant is expanded into one row per
    image of its color (falling back to the first color's images). Products
    without items produce a single variant whose SKU is the product id.
    Rows are put on ``write_queue`` and ``processed_count`` is incremented
    under ``counter_lock``.

    Args:
        ppid: Product id to fetch and convert.

    Returns:
        None. Missing data is logged as a warning; any exception is logged
        with its traceback and swallowed, so this function does not raise.
    """
    global processed_count
    try:
        logging.info(f"Processing ppId: {ppid}")
        data = get_json_data(ppid)
        if not data or not data.get("response"):
            logging.warning(f"No valid data for ppId {ppid}")
            return

        # Only the first entry of the response list is used.
        product = data["response"][0]
        handle = (product.get("id") or "").strip()
        logging.info(f"Processing product {handle} from ppId {ppid}")

        # Basic product information, taken from the product and its first lot.
        lots = product.get("lots") or []
        first_lot = lots[0] if lots else {}
        title = normalize_text(product.get("name", ""))
        sale_price, compare_price = _extract_prices(product)
        common = {
            "handle": handle,
            "title": title,
            "body_html": normalize_text(first_lot.get("description", "")),
            "vendor": normalize_text((product.get("brand") or {}).get("name", "")),
            "product_type": normalize_text(
                (product.get("category") or {}).get("name", "")
            ),
            "sale_price": sale_price,
            "compare_price": compare_price,
        }

        # Images are grouped per color option of the "color" dimension.
        dimensions = product.get("dimensions") or []
        color_dimension = next(
            (d for d in dimensions if d.get("name") == "color"), None
        )
        color_images = _collect_color_images(color_dimension, title)
        # Fallback used when a variant's own color has no images.
        default_images = next(iter(color_images.values()), [])

        rows = []
        items = first_lot.get("items") or []
        if items:
            for item in items:
                option_values = item.get("optionValues") or []
                size_value = next(
                    (opt.get("value", "") for opt in option_values
                     if opt.get("name") == "size"),
                    "",
                )
                color_value = next(
                    (opt.get("value", "") for opt in option_values
                     if opt.get("name") == "color"),
                    "",
                )
                base_row = _build_base_row(
                    common,
                    size_value=size_value,
                    color_value=color_value,
                    variant_sku=item.get("id", ""),
                    barcode=item.get("primaryBarcode", ""),
                )
                images = color_images.get(color_value) or default_images
                rows.extend(_expand_image_rows(base_row, images))
        else:
            # No variants: emit a single product-level variant.
            base_row = _build_base_row(common, variant_sku=handle)
            rows.extend(_expand_image_rows(base_row, default_images))

        # Hand the finished rows to the writer thread.
        for row in rows:
            write_queue.put(row)

        # Update and report progress.
        with counter_lock:
            processed_count += 1
            msg = f"Progress: {processed_count}/{total_pids} - Processed: {ppid}"
            print(msg)
            logging.info(msg)
    except Exception as e:
        # logging.exception records the message at ERROR level plus the traceback.
        logging.exception(f"Error processing {ppid}: {str(e)}")
def writer_thread(output_file):
    """Drain ``write_queue`` into a CSV file until a ``None`` item arrives.

    The header row is written from ``fields``; every queued dict is then
    written as one row and flushed immediately.

    Args:
        output_file: Destination path. When falsy (for example ``None``), a
            name of the form ``jcpenney_products_YYYYmmdd_HHMMSS.csv`` is
            generated from the current time.
    """
    if not output_file:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f"jcpenney_products_{timestamp}.csv"
    with open(output_file, "w", newline="", encoding="utf-8-sig") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fields)
        writer.writeheader()
        while True:
            row = write_queue.get()
            try:
                if row is None:  # stop signal
                    break
                writer.writerow(row)
                csvfile.flush()  # make sure the row reaches disk right away
            finally:
                # Acknowledge every item, the stop signal included, so that
                # a write_queue.join() elsewhere could complete.
                write_queue.task_done()
def main():
    """Convert every ppId in ``jcpenney_pid.txt`` into rows of a Shopify CSV.

    Reads the ids (one per line, blank lines ignored), starts the CSV writer
    thread, runs ``process_product`` for each id on a pool of five worker
    threads, then stops the writer and logs/prints a summary.

    ``failed_pids`` records only exceptions that escape ``process_product``;
    errors that function handles itself are found in the log instead.
    """
    global total_pids
    start_time = datetime.now()
    logging.info(f"Starting process at {start_time}")

    # Read the product id file.
    with open("jcpenney_pid.txt", "r", encoding="utf-8") as f:
        pids = [line.strip() for line in f if line.strip()]
    total_pids = len(pids)
    logging.info(f"Total PIDs to process: {total_pids}")
    print(f"Total PIDs to process: {total_pids}")

    # Start the CSV writer thread (None lets it pick a timestamped file name).
    writer = threading.Thread(target=writer_thread, args=(None,))
    writer.start()

    failed_pids = []
    try:
        # Process the ids on a thread pool.
        with ThreadPoolExecutor(max_workers=5) as executor:
            future_to_pid = {
                executor.submit(process_product, pid): pid for pid in pids
            }
            for future in as_completed(future_to_pid):
                pid = future_to_pid[future]
                try:
                    future.result()
                except Exception as e:
                    failed_pids.append(pid)
                    logging.error(f"Task failed for pid {pid}: {str(e)}")
                    print(f"Task failed for pid {pid}: {str(e)}")
    finally:
        # Always send the stop signal and wait for the writer; otherwise an
        # error or interrupt above would leave the non-daemon writer thread
        # blocked on the queue and the process would never exit.
        write_queue.put(None)
        writer.join()

    end_time = datetime.now()
    duration = end_time - start_time
    # Final statistics.
    summary = f"""
Process completed at {end_time}
Duration: {duration}
Total PIDs: {total_pids}
Processed: {processed_count}
Failed: {len(failed_pids)}
Failed PIDs: {failed_pids}
"""
    logging.info(summary)
    print(summary)
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()