# 🦊 https://platinumshoecare.com/collections/nike _ 1
import random, csv, requests, logging, time, json, re
from selenium import webdriver
from copy import deepcopy
from time import sleep
from lxml import etree
class Platinumshoecare:
    """Scrape the Nike collection of platinumshoecare.com into a CSV file.

    The crawler walks the paginated collection listing, opens every product
    page, and writes one CSV row per size option plus one extra row per
    additional product image.  The column layout is the one listed in
    ``field_names``.

    NOTE(review): the original source had lost its indentation; the nesting
    of ``parse`` was reconstructed on the assumption that a full set of rows
    is written for every size option.  Confirm against a known-good output.
    """

    BASE_URL = 'https://platinumshoecare.com'

    # Extracts the numeric UK size from an option label such as "UK 9.5".
    _UK_SIZE_RE = re.compile(r'\bUK\s+(\d+\.?\d*)')

    def __init__(self):
        """Set up the start URL, request headers and CSV row templates."""
        self.url = 'https://platinumshoecare.com/collections/nike?page=1'
        self.headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "accept-language": "zh-CN,zh;q=0.9",
            "cache-control": "no-cache",
            "pragma": "no-cache",
            "priority": "u=0, i",
            # "referer": "https://platinumshoecare.com/collections/nike",
            "sec-ch-ua": "\"Google Chrome\";v=\"135\", \"Not-A.Brand\";v=\"8\", \"Chromium\";v=\"135\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"Windows\"",
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "same-origin",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36"
        }
        # Running product counter used for progress output only; it is not
        # a CSV column.
        self.id = 0
        # CSV column order; also the single source of truth for the row
        # templates below and for save_csv().
        self.field_names = ['Handle', 'Title', 'Body (HTML)', 'Vendor', 'Type', 'Tags', 'Published', 'Option1 Name',
                            'Option1 Value', 'Option2 Name', 'Option2 Value', 'Option3 Name', 'Option3 Value',
                            'Variant SKU', 'Variant Grams', 'Variant Inventory Tracker', 'Variant Inventory Qty',
                            'Variant Inventory Policy', 'Variant Fulfillment Service', 'Variant Price',
                            'Variant Compare At Price', 'Variant Requires Shipping', 'Variant Taxable',
                            'Variant Barcode', 'Image Src', 'Image Position', 'Image Alt Text', 'Gift Card',
                            'SEO Title', 'SEO Description', 'Variant Image', 'Status', 'Collection']
        # Template for image-only rows: every column blank.
        self.empty_data = dict.fromkeys(self.field_names, '')
        # Template for variant rows: blank columns plus the fixed defaults.
        self.init_data = {
            **self.empty_data,
            'Published': 'TRUE',
            'Option1 Name': 'Color',
            'Option2 Name': 'Size',
            'Variant Inventory Tracker': 'Shopify',
            'Variant Inventory Qty': '99999',
        }
        self.file = None
        self.writer = None

    def simulated_smooth_scroll(self, driver, step=1000, interval=0.5, timeout=30):
        """Scroll the page down in steps until the bottom is reached.

        Args:
            driver: object exposing ``execute_script`` (a Selenium driver).
            step: maximum number of pixels scrolled per iteration.
            interval: pause in seconds after a full-size step; shorter steps
                pause proportionally less.
            timeout: overall time budget in seconds.
        """
        height_script = "return document.documentElement.scrollHeight"
        start_time = time.time()
        last_height = driver.execute_script(height_script)
        current_position = 0
        while time.time() - start_time < timeout:
            # Distance left to the currently known bottom of the page.
            remaining = last_height - current_position
            # Never scroll past the bottom: shrink the final step.
            current_step = min(step, remaining) if remaining > 0 else 0
            if current_step <= 0:
                break
            driver.execute_script(f"window.scrollBy(0, {current_step})")
            current_position += current_step
            # Give the scroll (and any content it triggers) time to settle.
            time.sleep(interval * (current_step / step))
            # The page may have grown while scrolling; track the new bottom.
            new_height = driver.execute_script(height_script)
            if new_height > last_height:
                last_height = new_height

    def _load_until_located(self, driver, url, xpath_txt, is_turn, pre_scroll_pause):
        """Load ``url`` in ``driver``, retrying until the page is usable.

        The page counts as usable once ``xpath_txt`` can be located, or,
        when no XPath is given, after a fixed randomized wait.  Retries are
        unbounded.  Only ``Exception`` subclasses trigger a retry, so
        ``KeyboardInterrupt`` and ``SystemExit`` still stop the script.
        """
        while True:
            try:
                print('正在获取', url, '的页面数据')
                driver.get(url)
                if is_turn:
                    pre_scroll_pause()
                    self.simulated_smooth_scroll(driver)
                if xpath_txt:
                    driver.find_element('xpath', xpath_txt)
                else:
                    # Nothing to wait on explicitly, so just give the page time.
                    self.random_sleep(5)
            except Exception:
                print(url, '没定位到,重新请求...')
            else:
                return

    def get_driver(self, url, xpath_txt=None, is_turn=False):
        """Start a headless Chrome driver and load ``url`` in it.

        Args:
            url: page to open.
            xpath_txt: optional XPath that must be locatable before the page
                is considered loaded.
            is_turn: when true, scroll to the bottom of the page after
                loading.

        Returns:
            The live driver; the caller is responsible for quitting it.
        """
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--disable-gpu')
        options.page_load_strategy = "none"
        driver = webdriver.Chrome(options=options)
        try:
            driver.implicitly_wait(10)
            driver.maximize_window()
            self._load_until_located(driver, url, xpath_txt, is_turn, lambda: sleep(1))
        except BaseException:
            # Do not leave a browser process behind if setup is aborted.
            driver.quit()
            raise
        return driver

    def driver_continue(self, driver, url, xpath_txt=None, is_turn=False):
        """Load another ``url`` in an already running ``driver``.

        Behaves like ``get_driver`` without creating a new browser.  When
        ``xpath_txt`` is omitted a fixed wait is used instead of an element
        lookup (a lookup with ``None`` could never succeed).
        """
        self._load_until_located(driver, url, xpath_txt, is_turn, self.random_sleep)

    def get_page_html(self, url, xpath_txt=None, is_turn=False):
        """Render ``url`` in a throw-away browser and return the parsed HTML tree."""
        driver = self.get_driver(url, xpath_txt, is_turn=is_turn)
        try:
            page_source = driver.page_source
        finally:
            # quit() ends the whole browser session; close() would only
            # close the window.
            driver.quit()
        return etree.HTML(page_source)

    def writer_to_file(self, data, mode, encoding=None):
        """Dump ``data`` to ``./text.html`` (debugging aid).

        Args:
            data: ``str`` for text modes, ``bytes`` for binary modes.
            mode: file mode passed to ``open``; a mode containing ``'b'``
                selects binary output and ``encoding`` is then ignored.
            encoding: text encoding for non-binary modes.
        """
        if 'b' in mode:
            with open('./text.html', mode) as fh:
                fh.write(data)
        else:
            with open('./text.html', mode, encoding=encoding) as fh:
                fh.write(data)
        print('写入文件成功!')

    def driver_click(self, driver, timeout=2):
        """Call ``click()`` on ``driver`` and pause for about ``timeout`` seconds."""
        driver.click()
        self.random_sleep(timeout)

    def driver_back(self, driver, timeout=2):
        """Navigate back and pause for about ``timeout`` seconds."""
        driver.back()
        self.random_sleep(timeout)

    def driver_refresh(self, driver, timeout=2):
        """Reload the current page and pause for about ``timeout`` seconds."""
        driver.refresh()
        self.random_sleep(timeout)

    def random_sleep(self, timeout=1):
        """Sleep for ``timeout`` seconds plus a random fraction of a second."""
        sleep(random.random() + timeout)

    def save_csv(self, data):
        """Write one row to the open CSV writer.

        Only the columns in ``field_names`` are written; extra keys in
        ``data`` are ignored and a missing column raises ``KeyError``.
        """
        self.writer.writerow({name: data[name] for name in self.field_names})

    def get_response(self, url, timeout=30):
        """GET ``url`` with the configured headers, retrying until it succeeds.

        Args:
            url: address to fetch.
            timeout: per-request timeout in seconds, so a stalled connection
                is retried instead of hanging forever.

        Returns:
            The ``requests`` response.  The HTTP status is not checked.
        """
        print('正在获取', url, '的数据')
        while True:
            try:
                response = requests.get(url, headers=self.headers, timeout=timeout)
            except requests.RequestException:
                print('没有请求到,重新请求')
                self.random_sleep()
            else:
                break
        # Pause after every request, successful or not.
        self.random_sleep()
        return response

    def get_html(self, url):
        """Fetch ``url`` and return its body parsed as an lxml HTML tree."""
        response = self.get_response(url)
        return etree.HTML(response.text)

    @classmethod
    def _shift_uk_size(cls, label):
        """Return the CSV size value for a size option label.

        The number following ``UK`` in ``label`` is increased by one and
        rendered without a trailing ``.0``.
        NOTE(review): the +1 presumably converts UK to US sizing - confirm.

        Raises:
            ValueError: if ``label`` contains no ``UK <number>`` part.
        """
        match = cls._UK_SIZE_RE.search(label)
        if not match:
            raise ValueError('没解析到尺寸')
        shifted = float(match.group(1)) + 1
        return str(int(shifted)) if shifted.is_integer() else str(shifted)

    def _parse_product(self, product_url, data):
        """Scrape one product page and write its CSV rows.

        ``data`` is the shared variant-row template; its product-specific
        columns are overwritten in place.  An ``IndexError`` is raised if
        the title, description, price or image elements are missing.
        """
        product_html = self.get_html(product_url)  # request 2: product page
        data['Handle'] = product_url.split('/')[-1]
        data['Title'] = product_html.xpath('//h1[@class="h2 product-single__title"]/text()')[0]
        data['Body (HTML)'] = etree.tostring(
            product_html.xpath('//div[@class="rte"]')[0], pretty_print=True, method='html'
        ).decode('utf-8')
        data['Variant Price'] = product_html.xpath('//span[@class="product__price"]/text()')[0].strip().strip('$')
        size_labels = product_html.xpath('//div[@class="variant-input-wrap"]/select/option/text()')
        print('尺寸的个数为:', len(size_labels))
        # The image list is the same for every size, so look it up once.
        image_paths = product_html.xpath('//div[@class="product__thumb-item"]/a/@href')
        image_count = len(image_paths)
        for size_label in size_labels:
            data['Option2 Value'] = self._shift_uk_size(size_label)
            print('图片个数为:', image_count)
            # The variant row carries the first image ...
            data['Image Src'] = 'https:' + image_paths[0]
            data['Image Position'] = 1
            data['Variant Image'] = data['Image Src']
            self.save_csv(data)
            print(data)
            # ... and each remaining image gets its own image-only row.
            for position, image_path in enumerate(image_paths[1:], start=2):
                image_row = deepcopy(self.empty_data)
                image_row['Handle'] = data['Handle']
                image_row['Published'] = 'TRUE'
                image_row['Image Src'] = 'https:' + image_path
                image_row['Image Position'] = position
                self.save_csv(image_row)
                print(image_row)

    def parse(self, url):
        """Crawl the collection starting at ``url`` and write all product rows.

        Follows the "next" pagination link until a listing page has none.
        """
        data = deepcopy(self.init_data)
        data['Type'] = 'Nike'
        data['Collection'] = data['Type']
        next_url = url
        while True:
            index_html = self.get_html(next_url)  # request 1: listing page
            for product_path in index_html.xpath('//a[@class="grid-product__link"]/@href'):
                self.id += 1
                print(self.id, '开始')
                self._parse_product(self.BASE_URL + product_path, data)
                print(self.id, '结束')
            next_paths = index_html.xpath('//span[@class="next"]/a/@href')
            if not next_paths:
                # No "next" link: this was the last listing page.
                break
            next_url = self.BASE_URL + next_paths[0]

    def run(self):
        """Create ``./platinumshoecare.csv`` and fill it by crawling ``self.url``."""
        # utf-8-sig writes a BOM at the start of the file.
        self.file = open('./platinumshoecare.csv', 'w', newline='', encoding='utf-8-sig')
        try:
            self.writer = csv.DictWriter(self.file, fieldnames=self.field_names)
            self.writer.writeheader()
            logging.captureWarnings(True)
            self.parse(self.url)
        finally:
            # Close the file even when scraping aborts, so written rows are flushed.
            self.file.close()
if __name__ == '__main__':
    # Script entry point: build the scraper and run a full crawl.
    Platinumshoecare().run()