# https://www.suunto.com/en-us/Product-search/See-all-Sports-Watches/
import random, csv, requests, logging, time, json
from selenium import webdriver
from copy import deepcopy
from time import sleep
from lxml import etree
class Suunto:
    """Scrape Suunto sports-watch products and write them to ``suunto.csv``.

    The product list, per-product details and images are fetched from JSON
    endpoints on ``www.suunto.com``; prices come from a GraphQL endpoint on
    ``store.suunto.com``. One CSV row is written per product, followed by one
    extra row per additional image.

    NOTE(review): the column names look like Shopify's product CSV import
    layout -- confirm against the importer that consumes the file.
    """

    # Seconds to wait on a single HTTP request before `requests` raises a
    # timeout, so that a stalled connection cannot hang the whole run.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Set up endpoint URL, request headers and the CSV row templates."""
        self.products_url = 'https://www.suunto.com/api/search/products?activecategory=3&languageId=en-US&page=1&sortby=&pageid=7465&searchTerm=&sku=&showProductsAndAccessories=false&disablePagination=false'
        self.headers = {
            "Accept": "application/json, text/plain, */*",
            "Content-Type": "application/json",
            "Referer": "",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36"
        }
        # Running count of processed products; it is not written to the CSV.
        self.id = 0
        # Column order of the output file.
        self.field_names = ['Handle', 'Title', 'Body (HTML)', 'Vendor', 'Type', 'Tags', 'Published', 'Option1 Name',
                            'Option1 Value', 'Option2 Name', 'Option2 Value', 'Option3 Name', 'Option3 Value',
                            'Variant SKU', 'Variant Grams', 'Variant Inventory Tracker', 'Variant Inventory Qty',
                            'Variant Inventory Policy', 'Variant Fulfillment Service', 'Variant Price',
                            'Variant Compare At Price', 'Variant Requires Shipping', 'Variant Taxable',
                            'Variant Barcode', 'Image Src', 'Image Position', 'Image Alt Text', 'Gift Card',
                            'SEO Title', 'SEO Description', 'Variant Image', 'Status', 'Collection']
        # Template for the extra image rows: every column blank.
        self.empty_data = dict.fromkeys(self.field_names, '')
        # Template for the main product row: blank except for fixed defaults.
        self.init_data = {
            **self.empty_data,
            'Published': 'TRUE',
            'Option1 Name': 'Color',
            'Option2 Name': 'Size',
            'Variant Inventory Tracker': 'Shopify',
        }
        # Both are populated by run().
        self.file = None
        self.writer = None

    def simulated_smooth_scroll(self, driver, step=1000, interval=0.5, timeout=30):
        """Scroll the page to the bottom in increments.

        Args:
            driver: object exposing ``execute_script`` (a Selenium driver).
            step: maximum number of pixels scrolled per increment.
            interval: seconds to sleep after a full-size increment; shorter
                increments sleep proportionally less.
            timeout: overall time budget in seconds.
        """
        start_time = time.time()
        last_height = driver.execute_script("return document.documentElement.scrollHeight")
        current_position = 0
        while time.time() - start_time < timeout:
            # Distance still to cover, capped at one step.
            remaining = last_height - current_position
            current_step = min(step, remaining) if remaining > 0 else 0
            if current_step <= 0:
                break
            driver.execute_script(f"window.scrollBy(0, {current_step})")
            current_position += current_step
            # Give the scroll (and any content it triggers) time to settle.
            time.sleep(interval * (current_step / step))
            new_height = driver.execute_script(
                "return document.documentElement.scrollHeight"
            )
            # The page may have grown while scrolling; keep going if so.
            if new_height > last_height:
                last_height = new_height

    def get_driver(self, url, xpath_txt, is_turn=False):
        """Start headless Chrome, load ``url`` and return the driver.

        The page is (re)loaded until ``xpath_txt`` can be located; see
        ``driver_continue``. The caller owns the returned driver and must
        quit it.

        Args:
            url: page to open.
            xpath_txt: XPath that must be locatable for the load to count.
            is_turn: when true, scroll to the bottom before locating.
        """
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--disable-gpu')
        options.page_load_strategy = "none"
        driver = webdriver.Chrome(options=options)
        try:
            driver.implicitly_wait(10)
            driver.maximize_window()
            self.driver_continue(driver, url, xpath_txt, is_turn=is_turn)
        except BaseException:
            # Do not leave a browser process behind if loading is aborted
            # (e.g. by Ctrl+C).
            driver.quit()
            raise
        return driver

    def driver_continue(self, driver, url, xpath_txt, is_turn=False):
        """Load ``url`` in an existing driver, retrying until the XPath is found.

        Retries without limit on any ``Exception``. ``KeyboardInterrupt`` and
        ``SystemExit`` are deliberately not caught so the loop can be aborted.

        Args:
            driver: Selenium driver to navigate.
            url: page to open.
            xpath_txt: XPath that must be locatable for the load to count.
            is_turn: when true, scroll to the bottom before locating.
        """
        while True:
            try:
                print('正在获取', url, '的页面数据')
                driver.get(url)
                if is_turn:
                    sleep(1)
                    self.simulated_smooth_scroll(driver)
                driver.find_element('xpath', xpath_txt)
            except Exception:
                print(url, '没定位到,重新请求...')
            else:
                return

    def get_page_html(self, url, xpath_txt, is_turn=False):
        """Render ``url`` in a browser and return it parsed with ``etree.HTML``.

        Args:
            url: page to open.
            xpath_txt: XPath that must be locatable for the load to count.
            is_turn: when true, scroll to the bottom before locating.
        """
        driver = self.get_driver(url, xpath_txt, is_turn=is_turn)
        try:
            page_source = driver.page_source
        finally:
            # quit() ends the whole browser session; close() would only
            # close the current window.
            driver.quit()
        return etree.HTML(page_source)

    def writer_to_file(self, data, mode, encoding=None):
        """Debug helper: dump ``data`` to ``./text.html``.

        Args:
            data: ``str`` for text modes, ``bytes`` for binary modes.
            mode: ``open()`` mode such as ``'w'`` or ``'wb'``.
            encoding: text encoding; ignored when ``mode`` is binary.
        """
        if 'b' in mode:
            # Binary files must be opened without an encoding.
            with open('./text.html', mode) as out:
                out.write(data)
        else:
            with open('./text.html', mode, encoding=encoding) as out:
                out.write(data)
        print('写入文件成功!')

    def random_sleep(self):
        """Pause for a random duration between 0.5 and 1.5 seconds."""
        sleep(random.random() + 0.5)

    def save_csv(self, data):
        """Write one CSV row using the columns listed in ``self.field_names``.

        Args:
            data: mapping holding a value for every output column; keys that
                are not output columns are ignored.

        Raises:
            KeyError: if ``data`` lacks one of the output columns.
        """
        self.writer.writerow({name: data[name] for name in self.field_names})

    def get_json(self, url, Referer):
        """GET ``url`` with the given Referer and return the decoded JSON body.

        Sleeps briefly afterwards (see ``random_sleep``).
        """
        self.headers['Referer'] = Referer
        # NOTE(review): verify=False disables TLS certificate verification;
        # confirm this is still required.
        res = requests.get(
            url, headers=self.headers, verify=False, timeout=self.REQUEST_TIMEOUT
        ).json()
        self.random_sleep()
        return res

    def post_json(self, url, Referer, sku):
        """POST the GraphQL product query for ``sku`` and return the JSON reply.

        Sleeps briefly afterwards (see ``random_sleep``).

        Args:
            url: GraphQL endpoint.
            Referer: value for the Referer request header.
            sku: SKU to look up; sent as the single element of ``skus``.
        """
        self.headers['Referer'] = Referer
        data = {
            "variables": {
                "skus": [
                    sku
                ],
                "countryCode": "us",
                "pageSize": 1
            },
            "query": "query ($skus: [String], $countryCode: String!, $pageSize: Int!) {\n products(\n filter: {sku: {in: $skus}}\n pageSize: $pageSize\n countryCode: $countryCode\n ) {\n items {\n sku\n uid\n name\n salable_qty\n shipping_estimation\n price_catalog {\n price\n is_catalog_price\n discount_price\n formatted_discount_price\n currency_code\n formatted_price\n omnibus_price\n formatted_omnibus_price\n __typename\n }\n __typename\n }\n __typename\n }\n}\n"
        }
        data = json.dumps(data, separators=(',', ':'))
        # NOTE(review): verify=False disables TLS certificate verification;
        # confirm this is still required.
        res = requests.post(
            url, headers=self.headers, data=data, verify=False, timeout=self.REQUEST_TIMEOUT
        ).json()
        self.random_sleep()
        return res

    def parse(self, url):
        """Fetch the product listing at ``url`` and write every product.

        Afterwards reports whether the number of processed products matches
        the ``TotalVariantCount`` announced by the listing.

        Args:
            url: product-search API URL.
        """
        # NOTE(review): this Referer points at suunto.cn while every other
        # request uses suunto.com -- confirm whether that is intentional.
        raw_json = self.get_json(url, 'https://www.suunto.cn/en-us/Product-search/See-all-Sports-Watches/')  # request 1
        total_product_count = raw_json['TotalVariantCount']
        print('产品总数量:', total_product_count)
        type_raw_json_list = raw_json['ProductLines']
        print('款式数量:', len(type_raw_json_list))
        for type_raw_json in type_raw_json_list:
            product_json_list = type_raw_json['Products']
            print(type_raw_json['Name'], '款式的产品数量:', len(product_json_list))
            for product_json in product_json_list:
                self.id += 1
                print(self.id, '开始')
                self._parse_product(product_json)
                print(self.id, '结束')
        if self.id == total_product_count:
            print('全部采集完成!')
        else:
            print('漏掉了', total_product_count - self.id, '个产品')

    def _parse_product(self, product_json):
        """Fetch details, images and price for one product and write its rows."""
        # Start from a fresh template for every product so that no value
        # (notably 'Option2 Value') carries over from the previous one.
        data = deepcopy(self.init_data)
        data['Type'] = 'SPORTS WATCHES'
        data['Collection'] = data['Type']
        data['Variant Image'] = 'https://www.suunto.com' + product_json['MainImagePath']
        product_id = product_json['PageLinkId']
        product_sku = product_json['ReviewSsid']
        product_url = 'https://www.suunto.com' + product_json['Url']

        basic_info_url = f'https://www.suunto.com/api/productbasicinfo/getitem/{product_id}/en-US'
        basic_info = self.get_json(basic_info_url, product_url)  # request 2
        # From here on use the URL reported by the details endpoint.
        product_url = basic_info['Url']
        images_url = f'https://www.suunto.com/api/productimages/getitems/{product_id}/en-US'
        images = self.get_json(images_url, product_url)  # request 3
        price_url = 'https://store.suunto.com/graphql?faction=products&productsSku[]=' + product_sku
        price = self.post_json(price_url, 'https://www.suunto.com/', product_sku)['data']['products']['items'][0]['price_catalog']  # request 4

        data['Handle'] = basic_info['PageName'].replace(' ', '-').lower()
        data['Title'] = basic_info['PageName']
        data['Body (HTML)'] = basic_info['Description']
        data['Option1 Value'] = basic_info['ProductVariantName']
        data['Variant Price'] = price['discount_price']
        data['Variant Compare At Price'] = price['price']

        sizes = basic_info['ProductSizes']
        if sizes is None:
            data['Option2 Value'] = 'one size'
        else:
            # Pick the size whose URL fragment occurs in this product's URL;
            # the value stays blank when none matches.
            for size in sizes:
                if size['Url'] in product_url:
                    data['Option2 Value'] = size['Width']
                    break

        additional_images = images['AdditionalImageUrls']
        footer_images = images['ProductGalleryFooterImages']
        print(data['Title'], '产品的图片数量:', len(additional_images) + len(footer_images))

        # The first additional image goes on the main product row; a product
        # without any leaves the image columns blank instead of failing.
        position = 0
        if additional_images:
            position = 1
            data['Image Src'] = 'https://www.suunto.com' + additional_images[0]['Url']
            data['Image Position'] = position
        data['Variant SKU'] = data['Handle'] + str(data['Option2 Value'])
        self.save_csv(data)
        print(data)

        for image in additional_images[1:]:
            position += 1
            self._save_image_row(data['Handle'], 'https://www.suunto.com' + image['Url'], position)
        for footer in footer_images:
            position += 1
            # Prefer the video URL; fall back to the desktop background image.
            image_src = footer['VideoUrl']
            if image_src is None:
                image_src = 'https://www.suunto.com' + footer['DesktopBackground']
            self._save_image_row(data['Handle'], image_src, position)

    def _save_image_row(self, handle, image_src, position):
        """Write an image-only row attached to the product ``handle``."""
        row = deepcopy(self.empty_data)
        row['Handle'] = handle
        row['Image Src'] = image_src
        row['Image Position'] = position
        row['Published'] = 'TRUE'
        self.save_csv(row)
        print(row)

    def run(self):
        """Create ``suunto.csv``, write the header and scrape all products."""
        # The with-block closes the file even if scraping fails part-way.
        with open('suunto.csv', 'w', newline='', encoding='utf-8-sig') as csv_file:
            self.file = csv_file
            self.writer = csv.DictWriter(csv_file, fieldnames=self.field_names)
            self.writer.writeheader()
            # Route warnings (e.g. those triggered by verify=False) to logging.
            logging.captureWarnings(True)
            self.parse(self.products_url)
if __name__ == '__main__':
    # Run the full scrape only when executed as a script, not on import.
    Suunto().run()