# 🌏 https://www.stihlusa.com/special-offers/
import random, csv, requests, logging, time, json, re, threading
from selenium import webdriver
from copy import deepcopy
from time import sleep
from lxml import etree
from DrissionPage import Chromium, ChromiumOptions
class Stihlusa:
    """Scrape STIHL USA special-offer products into a Shopify product-import CSV.

    The main crawl drives a real Chromium via DrissionPage (to get past bot
    checks); a few Selenium helpers remain for one-off page fetches. Output is
    ./stihlusa.csv in Shopify's product CSV column layout, with ./filter.txt
    recording already-scraped product urls so a run can be resumed.
    """

    def __init__(self):
        self.url = 'https://www.stihlusa.com/special-offers/'
        self.headers = {}
        self.id = 0  # running product counter; not written to the CSV
        # Column order of the Shopify product-import CSV.
        self.field_names = ['Handle', 'Title', 'Body (HTML)', 'Vendor', 'Type', 'Tags', 'Published', 'Option1 Name',
                            'Option1 Value', 'Option2 Name', 'Option2 Value', 'Option3 Name', 'Option3 Value',
                            'Variant SKU', 'Variant Grams', 'Variant Inventory Tracker', 'Variant Inventory Qty',
                            'Variant Inventory Policy', 'Variant Fulfillment Service', 'Variant Price',
                            'Variant Compare At Price', 'Variant Requires Shipping', 'Variant Taxable',
                            'Variant Barcode', 'Image Src', 'Image Position', 'Image Alt Text', 'Gift Card',
                            'SEO Title', 'SEO Description', 'Variant Image', 'Status', 'Collection']
        # Row template for a product's primary row; derived from field_names so
        # the three structures can never drift apart (was ~70 copy-pasted lines).
        self.init_data = dict.fromkeys(self.field_names, '')
        self.init_data.update({
            'Published': 'TRUE',
            'Option1 Name': 'Option',
            'Variant Inventory Tracker': 'Shopify',
            'Variant Inventory Qty': '99999',
        })
        # All-empty row template used for the extra image-only rows.
        self.empty_data = dict.fromkeys(self.field_names, '')
        self.file = None      # open CSV file handle
        self.writer = None    # csv.DictWriter over self.file
        self.browser = None   # DrissionPage Chromium instance
        self.tab = None       # current browser tab
        self.cnt = 0          # number of CSV rows written so far
        self.type = 'Shop STIHL Deals'  # Shopify product Type / Collection

    def simulated_smooth_scroll(self, driver, step=1000, interval=0.5, timeout=30):
        """Smoothly scroll a Selenium driver to the bottom of the page.

        Scrolls in `step`-pixel increments, re-reading the document height each
        pass so lazily loaded content keeps extending the target, and gives up
        after `timeout` seconds.
        """
        start_time = time.time()
        last_height = driver.execute_script("return document.documentElement.scrollHeight")
        current_position = 0
        while time.time() - start_time < timeout:
            # Remaining distance; clamp the step so we never overshoot.
            remaining = last_height - current_position
            current_step = min(step, remaining) if remaining > 0 else 0
            if current_step <= 0:
                break
            driver.execute_script(f"window.scrollBy(0, {current_step})")
            current_position += current_step
            # Pause proportionally to the distance actually scrolled.
            time.sleep(interval * (current_step / step))
            # The page may have grown (infinite scroll / lazy images).
            new_height = driver.execute_script(
                "return document.documentElement.scrollHeight"
            )
            if new_height > last_height:
                last_height = new_height

    def get_driver(self, url, xpath_txt=None, is_turn=False):
        """Open *url* in a fresh headless Selenium Chrome and return the driver.

        Retries forever until `xpath_txt` (when given) can be located. With
        `is_turn` the page is first scrolled to the bottom to trigger lazy loads.
        """
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--disable-gpu')
        options.page_load_strategy = "none"
        driver = webdriver.Chrome(options=options)
        driver.implicitly_wait(10)
        driver.maximize_window()
        while True:
            try:
                print('正在获取', url, '的页面数据')
                driver.get(url)
                if is_turn:
                    sleep(1)
                    self.simulated_smooth_scroll(driver)
                if xpath_txt:
                    driver.find_element('xpath', xpath_txt)
                else:
                    self.random_sleep(5)
                break
            except Exception:  # was a bare except; keep Ctrl-C working
                print(url, '没定位到,重新请求...')
        return driver

    def driver_continue(self, driver, url, xpath_txt=None, is_turn=False):
        """Re-navigate an existing Selenium driver to *url*, retrying until
        `xpath_txt` is found."""
        flag = True
        while flag:
            flag = False
            try:
                print('正在获取', url, '的页面数据')
                driver.get(url)
                if is_turn:
                    self.random_sleep()
                    self.simulated_smooth_scroll(driver)
                driver.find_element('xpath', xpath_txt)
            except Exception:  # was a bare except; keep Ctrl-C working
                flag = True
                print(url, '没定位到,重新请求...')

    def get_page_html(self, url, xpath_txt=None, is_turn=False):
        """Fetch *url* with a throwaway Selenium Chrome and return the DOM as
        an lxml tree."""
        driver = self.get_driver(url, xpath_txt, is_turn=is_turn)
        page_source = driver.page_source
        # quit() (not close()) so the chromedriver process is released too.
        driver.quit()
        return etree.HTML(page_source)

    def writer_to_file(self, data, mode, encoding=None):
        """Dump *data* to ./text.html (debug helper).

        Binary modes must not pass an encoding to open(); the original tested
        `'b' in encoding`, which raised TypeError for the default encoding=None.
        """
        if encoding is None or 'b' in mode:
            with open('./text.html', mode) as f:
                f.write(data)
        else:
            with open('./text.html', mode, encoding=encoding) as f:
                f.write(data)
        print('写入文件成功!')

    def driver_click(self, driver, timeout=2):
        """Click a Selenium element, then sleep about *timeout* seconds."""
        driver.click()
        self.random_sleep(timeout)

    def driver_back(self, driver, timeout=2):
        """Go back in the Selenium history, then sleep about *timeout* seconds."""
        driver.back()
        self.random_sleep(timeout)

    def driver_refresh(self, driver, timeout=2):
        """Refresh the Selenium page, then sleep about *timeout* seconds."""
        driver.refresh()
        self.random_sleep(timeout)

    def tab_wait(self, tab, timeout=3):
        """Wait *timeout* seconds on a DrissionPage tab and return it."""
        tab.wait(timeout)
        return tab

    def get_dp_html(self, tab, url, xpath_txt='html', is_turn=False):
        """Navigate a DrissionPage tab to *url* and return the DOM as an lxml
        tree."""
        tab = self.tab_get(tab, url, xpath_txt=xpath_txt)
        res = etree.HTML(tab.html)
        return res

    def random_sleep(self, timeout=2):
        """Sleep *timeout* seconds plus a random jitter in [0, 1)."""
        sleep(random.random() + timeout)

    def save_csv(self, data):
        """Write one CSV row, projecting *data* onto the Shopify columns.

        (Replaces a hand-written 33-key literal that had to mirror
        self.field_names exactly.)
        """
        self.writer.writerow({name: data[name] for name in self.field_names})

    def get_response(self, url):
        """GET *url* with requests, retrying until the request succeeds."""
        print('正在获取', url, '的数据')
        while True:
            try:
                response = requests.get(url, headers=self.headers)
                break
            except Exception:  # was a bare except; keep Ctrl-C working
                print('没有请求到,重新请求')
                self.random_sleep()
        self.random_sleep()
        return response

    def get_html(self, url):
        """GET *url* and return the response body as an lxml tree."""
        response = self.get_response(url)
        return etree.HTML(response.text)

    def tab_run_js(self, tab, js_code, timeout=2):
        """Run *js_code* on a tab, retrying until it does not raise."""
        while True:
            try:
                tab.run_js(js_code)
                break
            except Exception as e:
                print('捕获tab_run_js方法的run_js:', e)
                tab.wait(timeout)
        tab.wait(timeout)

    def ele_click(self, tab, ele, timeout=2):
        """Click *ele* via the tab's action chain, retrying until it works."""
        while True:
            try:
                tab.actions.click(ele)
                break
            except Exception as e:
                print('捕获ele_click方法的actions.click:', e)
                tab.wait(timeout)
        tab.wait(timeout)

    def dp_click_ad(self, tab, xpath_txt):
        """Dismiss an ad/overlay matching *xpath_txt* if one is present."""
        ad_ele = tab.ele(f'x:{xpath_txt}', timeout=2)
        if ad_ele:
            print('有广告:', ad_ele)
            self.ele_click(tab, ad_ele)
            self.tab_wait(tab, 1)

    def infinite_scroll(self, tab, timeout=2):
        """Keep scrolling and clicking "Load More" until the button disappears."""
        turn_cnt = 0
        while True:
            tab.scroll.to_bottom()
            self.tab_wait(tab, timeout)
            # Scroll back up a bit so the button comes into view.
            # Fixed: was self.tab, which broke when a different tab was passed.
            self.tab_run_js(tab, 'window.scrollBy(0, -2600)')
            self.tab_wait(tab, timeout)
            next_button_ele = tab.ele('x://button[./text()="Load More"]')
            if next_button_ele:
                self.ele_click(tab, next_button_ele, timeout)
            else:
                self.tab_wait(tab, 5)
                break
            turn_cnt += 1
            print(f'翻页了{turn_cnt}次')
        self.tab_wait(tab, 5)

    def move_about(self, tab):
        """Wiggle the mouse randomly to look human (run from a worker thread
        while the page loads)."""
        for cnt in range(random.randint(20, 30)):
            row = random.randint(0, 1500)
            col = random.randint(0, 1500)
            tab.actions.move(row, col)
            print(f'鼠标移动到了 ({row}, {col})')

    def tab_get(self, tab, url, xpath_txt='html', backup_xpath_txt='html', timeout=3, ip_timeout=30):
        """Navigate the latest browser tab to *url* and wait for content.

        Retries until `xpath_txt` (or `backup_xpath_txt`) matches. While the
        page loads, a background thread wiggles the mouse. If an "Error"
        (bot-check) page is detected, polls every `ip_timeout` seconds until
        it clears, then re-requests.
        """
        print('正在获取', url, '的数据')
        tab = self.browser.latest_tab
        while True:
            t = threading.Thread(target=self.move_about, args=(tab,))
            t.start()
            tab.get(url)
            t.join()
            self.tab_wait(tab, timeout)
            ele = tab.ele(f'x:{xpath_txt}')
            t_ele = tab.ele(f'x:{backup_xpath_txt}')
            if ele:
                print('第1个xpath找到的')
                break
            elif t_ele:
                t_ele = tab.ele('x://h1[contains(@class, "font-title")]/span')
                if t_ele and ('Error' in t_ele.text):
                    # Bot check / error page: wait for a human to clear it.
                    while True:
                        tt_ele = tab.ele('x://h1[contains(@class, "font-title")]/span')
                        if not tt_ele or ('Error' not in tt_ele.text):
                            break
                        print(f'还没解开人机验证等, {ip_timeout} 秒后尝试')
                        tab.wait(ip_timeout)
                    break
                if backup_xpath_txt != 'html' and t_ele:
                    print('第2个xpath找到的')
                    break
            print('没有请求到元素,重新请求中')
        return tab

    def get_product_img_url_list(self, tab, data):
        """Collect the de-duplicated image urls for the currently selected
        color option; also fills data['Option1 Value'] with the color name."""
        tab.wait(5)
        self.dp_click_ad(tab, '//button[@class="css-2vqtum"]')
        data['Option1 Value'] = tab.ele('x://p[contains(@class, "color-name")]').text
        product_color_img_url_list = []
        # Expected image count: carousel slide index when present, otherwise
        # thumbnail count minus one wrapper div.
        img_temp_ele = tab.ele('x://li[contains(@class, "is-prev")]')
        if img_temp_ele:
            product_color_img_url_cnt = int(img_temp_ele.attr('data-slide-index'))
        else:
            product_color_img_url_cnt = len(tab.eles('x://div[@class="css-1161qt5"]/div')) - 1
        print(data['Option1 Value'], '色的图片个数应为:', product_color_img_url_cnt)
        tab.wait(6)
        raw_product_color_img_ele_list = tab.eles('x://li[contains(@class, "splide__slide")]/div | //li[contains(@class, "splide__slide")]//video/source | //div[@class="css-1161qt5"]/div[@class="css-113el1s"]/div')
        for raw_product_color_img_ele in raw_product_color_img_ele_list:
            raw_product_color_img_url = raw_product_color_img_ele.attr('src')
            if not raw_product_color_img_url:
                continue
            # Strip query params so duplicate images compare equal.
            raw_product_color_img_url = raw_product_color_img_url.split('?')[0]
            if raw_product_color_img_url in product_color_img_url_list:
                continue
            product_color_img_url_list.append(raw_product_color_img_url)
        print(data['Option1 Value'], '色获取到的图片个数为:', len(product_color_img_url_list))
        return product_color_img_url_list

    def init_tab(self):
        """Start a visible Chromium via DrissionPage and maximize its tab."""
        co = ChromiumOptions()
        co.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe')
        co.auto_port()
        # co.headless()
        # co.set_argument('--no-sandbox')
        self.browser = Chromium(co)
        self.tab = self.browser.latest_tab
        self.tab.set.window.max()

    def handle_img_url(self, raw_img_url):
        """Hook for rewriting image urls before saving; currently the identity."""
        return raw_img_url

    def save_all_data(self, product_data_list):
        """Write every prepared row to the CSV, counting rows as we go."""
        for product_data in product_data_list:
            self.cnt += 1
            self.save_csv(product_data)
            print('第', self.cnt, '行保存完成!')

    def cancel_translate(self, url):
        """Switch the page's gt-translate widget off."""
        tab = self.tab_get(self.tab, url, '//div[@class="collection_item overflow-hidden"]')
        tab.actions.move_to(tab.ele('x://div[@class="gt-translate-btn-bg"]'))
        tab.wait(2)
        tab.actions.click(tab.ele('x://div[@class="gt-translate-switch"]'))
        tab.wait(2)

    def click_accept(self, tab=None, url=None, xpath_txt='html'):
        """Dismiss the OneTrust cookie banner (clicks "reject all") if shown."""
        if not tab and url:
            tab = self.tab_get(tab, url, xpath_txt=xpath_txt)
        temp_ele = tab.ele('x://button[@id="onetrust-reject-all-handler"]')
        if temp_ele:
            self.ele_click(tab, temp_ele)

    def test_product(self, url):
        """Scrape a single product page into ./test.csv (debug helper)."""
        self.file = open('./test.csv', 'w', newline='', encoding='utf-8-sig')
        self.writer = csv.DictWriter(self.file, fieldnames=self.field_names)
        self.writer.writeheader()
        logging.captureWarnings(True)
        self.init_tab()
        try:
            self.click_accept(url=url)
            self.id += 1
            print(self.id, '开始')
            product_data_list = self.product_detail_parse(url, self.type)
            self.save_all_data(product_data_list)
            print(self.id, '结束')
        finally:
            # Release the CSV and the browser even when parsing fails.
            if self.file:
                self.file.close()
            if self.browser:
                self.browser.quit()

    def product_detail_parse(self, url, _type):
        """Parse one product page into Shopify CSV rows.

        Returns one row per (de-duplicated) option plus one image-only row per
        additional product image.
        """
        _type = _type.replace('-', ' ').title()
        product_data_list = []
        sku_id = 0
        data = deepcopy(self.init_data)
        data['Type'] = _type
        data['Collection'] = data['Type']
        tab = self.tab_get(self.tab, url, '//div[@class="drawer-body"]/div/div[@class="h-full"]')  # request #2
        data['Title'] = tab.ele('x://h1[contains(@class, "font-title")]').text.strip()
        data['Handle'] = (_type + '-' + data['Title']).replace(' ', '-').lower()
        body_ele_1 = tab.ele('x://div[contains(@class, "relative")]//ul[contains(@class, "list-disc")]')
        body_ele_2 = tab.ele('x://div[@class="lg:px-4"]//div[contains(@class, "border-r")]')
        if body_ele_1:
            data['Body (HTML)'] = body_ele_1.html
        elif body_ele_2:
            data['Body (HTML)'] = body_ele_2.html
        else:
            data['Body (HTML)'] = ''
        # ------------------------------------------------
        # Option texts, their prices, and the option count.
        product_option_txt_list = []
        product_option_txt_filter_list = []
        product_option_price_list = []
        product_option_compare_price_list = []
        product_option_txt_ele_list = tab.eles('x://div[@class="drawer-body"]/div/div[@class="h-full"]//div[contains(@class, "font-title")]')
        for product_option_txt_ele in product_option_txt_ele_list:
            product_option_txt = product_option_txt_ele.ele('x:./div[@data-test="variant-name"]').text.strip()
            # Some products repeat an option name; keep only the first.
            if product_option_txt in product_option_txt_filter_list:
                continue
            product_option_txt_filter_list.append(product_option_txt)
            product_option_txt_list.append(product_option_txt)
            product_option_price = 0
            product_option_compare_price = 0
            price_ele = product_option_txt_ele.ele('x:.//span[@data-test="price"]')
            if price_ele:  # option has a price
                product_option_price = price_ele.text.replace('$', '').replace(',', '').strip()
                compare_price_ele = product_option_txt_ele.ele('x:.//div[contains(@class, "text-black")]/span[contains(@class, "text-sm")]')
                if compare_price_ele:
                    product_option_compare_price = compare_price_ele.text.replace('$', '').replace(',', '').strip()
                else:
                    product_option_compare_price = ''
            else:  # no price shown: use a sentinel price
                product_option_price = '999999'
                product_option_compare_price = ''
            product_option_price_list.append(product_option_price)
            product_option_compare_price_list.append(product_option_compare_price)
        product_option_txt_len = len(product_option_txt_list)
        # ------------------------------------------------
        # ------------------------------------------------
        # Image urls and the image count (videos are skipped).
        product_img_url_list = []
        product_img_url_ele_list = tab.eles('x://div[contains(@class, "swiper-thumbs")]/div[@class="swiper-wrapper"]/div')
        for product_img_url_ele in product_img_url_ele_list:
            if 'video' in product_img_url_ele.attr('class'):
                continue
            product_img_url = product_img_url_ele.ele('x:./img').attr('src').split('?')[0]
            product_img_url_list.append(product_img_url)
        product_img_url_len = len(product_img_url_list)
        # ------------------------------------------------
        for product_option_txt_id in range(product_option_txt_len):
            _data = deepcopy(data)
            sku_id += 1
            _data['Option1 Value'] = product_option_txt_list[product_option_txt_id]
            _data['Variant SKU'] = (_type + '-' + _data['Title'] + '-' + _data['Option1 Value'] + '-' + str(sku_id)).replace(' ', '-').lower()
            _data['Variant Price'] = product_option_price_list[product_option_txt_id]
            _data['Variant Compare At Price'] = product_option_compare_price_list[product_option_txt_id]
            # Guard: some products expose no images at all.
            _data['Image Src'] = product_img_url_list[0] if product_img_url_list else ''
            _data['Image Position'] = 1
            _data['Variant Image'] = _data['Image Src']
            print(f'第{self.id}个产品 {url} 的第{sku_id}个sku的选项个数为{product_option_txt_len},图片个数为:{product_img_url_len}')
            product_data_list.append(_data)
            print(_data)
        # Extra images become image-only rows, emitted once per product.
        for i in range(1, product_img_url_len):
            temp_data = deepcopy(self.empty_data)
            temp_data['Handle'] = data['Handle']
            temp_data['Published'] = 'TRUE'
            temp_data['Image Src'] = product_img_url_list[i]
            temp_data['Image Position'] = i + 1
            product_data_list.append(temp_data)
            print(temp_data)
        return product_data_list

    def parse(self, url):
        """Crawl the listing page, collect every product url, and scrape each
        product that is not yet recorded in ./filter.txt."""
        tab = self.tab_get(self.tab, url, '//div[@data-test="producttilelist-list"]/a')  # request #1
        self.click_accept(tab)
        product_total_num = int(tab.ele('x://div[@class="ais-Stats"]//span[@class="font-bold"]').text.strip())
        print(f'产品的总数量为:{product_total_num}')
        # -------------------------------------------------------
        # Keep paging until every product tile is present.
        product_url_list = []
        product_url_ele_list = []
        product_url_ele_len = 0
        while True:
            self.infinite_scroll(tab)
            product_url_ele_list = tab.eles('x://div[@data-test="producttilelist-list"]/a')
            product_url_ele_len = len(product_url_ele_list)
            if product_url_ele_len == product_total_num:
                break
            print('没有获取到全部的产品url,继续翻页')
        for product_url_ele in product_url_ele_list:
            product_url = product_url_ele.attr('href')
            if 'https' not in product_url:
                product_url = 'https://www.stihlusa.com' + product_url
            product_url_list.append(product_url)
        # -------------------------------------------------------
        for product_url in product_url_list:
            self.id += 1
            # Re-read the progress file each pass: it grows as we scrape.
            with open('./filter.txt', 'r', encoding='utf-8') as f:
                filter_txt = f.read()
            if product_url.split('?')[0] in filter_txt:
                print(self.id, '已完成')
                continue
            print(f'第{self.id}个产品开始')
            product_data_list = self.product_detail_parse(product_url, self.type)
            self.save_all_data(product_data_list)
            print(f'第{self.id}个产品结束')
            # Record success only after the rows are written.
            with open('./filter.txt', 'a', encoding='utf-8') as f:
                f.write(product_url.split('?')[0] + '\n')

    def run(self, is_continue=False):
        """Entry point: crawl the listing and write ./stihlusa.csv.

        With is_continue=True, append to the existing CSV and keep the
        ./filter.txt progress file; otherwise start both from scratch.
        """
        if is_continue:
            self.file = open('./stihlusa.csv', 'a', newline='', encoding='utf-8-sig')
            self.writer = csv.DictWriter(self.file, fieldnames=self.field_names)
        else:
            self.file = open('./stihlusa.csv', 'w', newline='', encoding='utf-8-sig')
            self.writer = csv.DictWriter(self.file, fieldnames=self.field_names)
            self.writer.writeheader()
            # Reset the progress file of already-scraped product urls.
            with open('./filter.txt', 'w', encoding='utf-8') as f:
                f.write('')
        self.init_tab()
        logging.captureWarnings(True)
        try:
            self.parse(self.url)
        finally:
            # Release the CSV and the browser even when the crawl fails.
            if self.file:
                self.file.close()
            if self.browser:
                self.browser.quit()
if __name__ == '__main__':
    # Run the full special-offers crawl into ./stihlusa.csv.
    stihlusa = Stihlusa()
    stihlusa.run()
    # Sample product pages for spot-checking via test_product():
    url1 = 'https://www.stihlusa.com/products/lawn-mowers/zero-turn-mowers/rz560/?aqid=692600ba1f9c000f3d4121842717e916'  # no price shown
    url2 = 'https://www.stihlusa.com/products/trimmers-and-brushcutters/battery-trimmers/fsa120/?aqid=7057359af849e3567373a029189e50e4'  # multiple options, discounted (discount/images are tricky)
    url3 = 'https://www.stihlusa.com/products/lawn-mowers/push-mowers/rma765v/?aqid=7057359af849e3567373a029189e50e4'  # single option, no sale price
    url4 = 'https://www.stihlusa.com/products/lawn-mowers/push-mowers/rma460/?aqid=dd59fcad2241e6e2b09e21e2876a09b2'  # multiple options, discounted, one option unavailable
    url5 = 'https://www.stihlusa.com/products/chain-saws/battery-saws/msa300co/?aqid=7659379d482513412ed82215aac08a84'  # duplicate option names must be filtered
    # stihlusa.test_product(url1)
    # https://www.stihlusa.com/special-offers/
    # Options only ever change the price, never the images.
    # To adapt for another site, change: the url, the output file name,
    # the product type, and the init_data field names.