搜索结果

×

搜索结果将在这里显示。

🚑 https://www.coach.com/shop/new/womens-new-arrivals/view-all

import random, csv, requests, logging, time, json, re
from selenium import webdriver
from copy import deepcopy
from time import sleep
from lxml import etree
from DrissionPage import Chromium, ChromiumOptions
from DrissionPage.common import Keys

# 挂掉之后,main函数的run传一个参数True
# 要考虑图片数量可能达不到需要点击、加载等
# 一开始滚动的时候手动点掉广告

class Coach:
    def __init__(self):
        self.url = 'https://www.coach.com/shop/new/womens-new-arrivals/view-all'
        self.headers = {}

        self.id = 0  # 保存到csv文件不用id字段
        self.init_data = {
            'Handle': '',
            'Title': '',
            'Body (HTML)': '',
            'Vendor': '',
            'Type': '',
            'Tags': '',
            'Published': 'TRUE',
            'Option1 Name': 'Color',
            'Option1 Value': '',
            'Option2 Name': 'Size',
            'Option2 Value': '',
            'Option3 Name': '',
            'Option3 Value': '',
            'Variant SKU': '',
            'Variant Grams': '',
            'Variant Inventory Tracker': 'Shopify',
            'Variant Inventory Qty': '99999',
            'Variant Inventory Policy': '',
            'Variant Fulfillment Service': '',
            'Variant Price': '',
            'Variant Compare At Price': '',
            'Variant Requires Shipping': '',
            'Variant Taxable': '',
            'Variant Barcode': '',
            'Image Src': '',
            'Image Position': '',
            'Image Alt Text': '',
            'Gift Card': '',
            'SEO Title': '',
            'SEO Description': '',
            'Variant Image': '',
            'Status': '',
            'Collection': '',
        }
        self.empty_data = {
            'Handle': '',
            'Title': '',
            'Body (HTML)': '',
            'Vendor': '',
            'Type': '',
            'Tags': '',
            'Published': '',
            'Option1 Name': '',
            'Option1 Value': '',
            'Option2 Name': '',
            'Option2 Value': '',
            'Option3 Name': '',
            'Option3 Value': '',
            'Variant SKU': '',
            'Variant Grams': '',
            'Variant Inventory Tracker': '',
            'Variant Inventory Qty': '',
            'Variant Inventory Policy': '',
            'Variant Fulfillment Service': '',
            'Variant Price': '',
            'Variant Compare At Price': '',
            'Variant Requires Shipping': '',
            'Variant Taxable': '',
            'Variant Barcode': '',
            'Image Src': '',
            'Image Position': '',
            'Image Alt Text': '',
            'Gift Card': '',
            'SEO Title': '',
            'SEO Description': '',
            'Variant Image': '',
            'Status': '',
            'Collection': '',
        }
        self.field_names = ['Handle', 'Title', 'Body (HTML)', 'Vendor', 'Type', 'Tags', 'Published', 'Option1 Name',
                            'Option1 Value', 'Option2 Name', 'Option2 Value', 'Option3 Name', 'Option3 Value',
                            'Variant SKU', 'Variant Grams', 'Variant Inventory Tracker', 'Variant Inventory Qty',
                            'Variant Inventory Policy', 'Variant Fulfillment Service', 'Variant Price',
                            'Variant Compare At Price', 'Variant Requires Shipping', 'Variant Taxable',
                            'Variant Barcode', 'Image Src', 'Image Position', 'Image Alt Text', 'Gift Card',
                            'SEO Title', 'SEO Description', 'Variant Image', 'Status', 'Collection']
        self.file = None
        self.writer = None

        self.browser = None
        self.tab = None

        self.cnt = 0

    def simulated_smooth_scroll(self, driver, step=1000, interval=0.5, timeout=30):
        # 平滑移动到底部

        start_time = time.time()
        last_height = driver.execute_script("return document.documentElement.scrollHeight")
        current_position = 0

        while time.time() - start_time < timeout:
            # 计算剩余滚动距离
            remaining = last_height - current_position

            # 动态调整步长
            current_step = min(step, remaining) if remaining > 0 else 0

            if current_step <= 0:
                break

            # 执行分步滚动
            driver.execute_script(f"window.scrollBy(0, {current_step})")
            current_position += current_step

            # 等待滚动和内容加载
            time.sleep(interval * (current_step / step))  # 动态间隔

            # 检查新高度
            new_height = driver.execute_script(
                "return document.documentElement.scrollHeight"
            )

            # 更新高度(处理动态加载)
            if new_height > last_height:
                last_height = new_height

    def get_driver(self, url, xpath_txt=None, is_turn=False):
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--disable-gpu')
        options.page_load_strategy = "none"

        driver = webdriver.Chrome(options=options)

        driver.implicitly_wait(10)
        driver.maximize_window()

        while True:
            try:
                print('正在获取', url, '的页面数据')
                driver.get(url)

                if is_turn:
                    sleep(1)
                    self.simulated_smooth_scroll(driver)

                if xpath_txt:
                    driver.find_element('xpath', xpath_txt)
                else:
                    self.random_sleep(5)

                break

            except:
                print(url, '没定位到,重新请求...')

        # self.writer_to_file(driver.page_source, 'w', 'utf-8')

        return driver

    def driver_continue(self, driver, url, xpath_txt=None, is_turn=False):
        flag = True
        while flag:
            flag = False
            try:
                print('正在获取', url, '的页面数据')
                driver.get(url)

                if is_turn:
                    self.random_sleep()
                    self.simulated_smooth_scroll(driver)

                driver.find_element('xpath', xpath_txt)

            except:
                flag = True
                print(url, '没定位到,重新请求...')

        # self.writer_to_file(driver.page_source, 'w', 'utf-8')

    def get_page_html(self, url, xpath_txt=None, is_turn=False):
        driver = self.get_driver(url, xpath_txt, is_turn=is_turn)

        page_source = driver.page_source

        driver.close()

        return etree.HTML(page_source)

    def writer_to_file(self, data, mode, encoding=None):
        if 'b' in encoding:
            open('./text.html',mode).write(data)
        else:
            open('./text.html', mode, encoding=encoding).write(data)

        print('写入文件成功!')

    def driver_click(self, driver, timeout=2):
        driver.click()
        self.random_sleep(timeout)

    def driver_back(self, driver, timeout=2):
        driver.back()
        self.random_sleep(timeout)

    def driver_refresh(self, driver, timeout=2):
        driver.refresh()
        self.random_sleep(timeout)

    def tab_wait(self, tab, timeout=3):
        tab.wait(timeout)
        return tab

    def get_dp_html(self, tab, url, xpath_txt='html', is_turn=False):
        print('正在获取', url, '的数据')

        while True:
            tab.get(url)
            self.tab_wait(tab)

            if is_turn:
                tab.scroll.to_bottom()
                self.tab_wait(tab)

            ele = tab.ele(f'x:{xpath_txt}')
            if ele:
                break

            print('没有请求到元素,重新请求中')
            tab.wait(2)

        res = etree.HTML(tab.html)

        return res

    def random_sleep(self, timeout=2):
        sleep(random.random() + timeout)

    def save_csv(self, data):
        self.writer.writerow({
            'Handle': data['Handle'],
            'Title': data['Title'],
            'Body (HTML)': data['Body (HTML)'],
            'Vendor': data['Vendor'],
            'Type': data['Type'],
            'Tags': data['Tags'],
            'Published': data['Published'],
            'Option1 Name': data['Option1 Name'],
            'Option1 Value': data['Option1 Value'],
            'Option2 Name': data['Option2 Name'],
            'Option2 Value': data['Option2 Value'],
            'Option3 Name': data['Option3 Name'],
            'Option3 Value': data['Option3 Value'],
            'Variant SKU': data['Variant SKU'],
            'Variant Grams': data['Variant Grams'],
            'Variant Inventory Tracker': data['Variant Inventory Tracker'],
            'Variant Inventory Qty': data['Variant Inventory Qty'],
            'Variant Inventory Policy': data['Variant Inventory Policy'],
            'Variant Fulfillment Service': data['Variant Fulfillment Service'],
            'Variant Price': data['Variant Price'],
            'Variant Compare At Price': data['Variant Compare At Price'],
            'Variant Requires Shipping': data['Variant Requires Shipping'],
            'Variant Taxable': data['Variant Taxable'],
            'Variant Barcode': data['Variant Barcode'],
            'Image Src': data['Image Src'],
            'Image Position': data['Image Position'],
            'Image Alt Text': data['Image Alt Text'],
            'Gift Card': data['Gift Card'],
            'SEO Title': data['SEO Title'],
            'SEO Description': data['SEO Description'],
            'Variant Image': data['Variant Image'],
            'Status': data['Status'],
            'Collection': data['Collection']
        })

    def get_response(self, url):
        print('正在获取', url, '的数据')

        while True:
            try:
                response = requests.get(url, headers=self.headers)
                break
            except:
                print('没有请求到,重新请求')
                self.random_sleep()

        self.random_sleep()

        return response

    def get_html(self, url):
        response = self.get_response(url)
        return etree.HTML(response.text)

    def tab_run_js(self, tab, js_code):
        while True:
            try:
                tab.run_js(js_code)
                break
            except Exception as e:
print('捕获tab_run_js方法的run_js:', e)
                tab.wait(1)

    def ele_click(self, tab, ele):
        while True:
            try:
                tab.actions.click(ele)
                break
            except Exception as e:
                print('捕获ele_click方法的actions.click:', e)
                tab.wait(2)

    def dp_click_ad(self, tab, xpath_txt):
        ad_ele = tab.ele(f'x:{xpath_txt}', timeout=2)
        if ad_ele:
            print('有广告:', ad_ele)
            self.ele_click(tab, ad_ele)
            self.tab_wait(tab, 1)

    def save_all_data(self, product_data_list):
        for product_data in product_data_list:
            self.cnt += 1
            self.save_csv(product_data)
            print('第', self.cnt, '行保存完成!')

    def infinite_scroll(self, tab):
        product_url_list = []
        w_cnt = 0
        turn_cnt = 0
        is_bottom = False

        while True:
            self.dp_click_ad(tab, '//button[@class="css-2vqtum"]')
            self.dp_click_ad(tab, '//button[@class="css-71t821"]')

            if is_bottom:
                break

            tab.scroll.to_bottom()
            self.tab_wait(tab, 1)
            self.tab_run_js(self.tab, 'window.scrollBy(0, -1000)')

            turn_cnt += 1
            print(f'翻页了{turn_cnt}次')

            n_cnt = 0

            self.tab_wait(tab, 5)
            product_url_ele_list = tab.eles('x://div[@class="product-thumbnail plpv3 css-1gasjii"]//a[@class="css-avqw6d"]')

            for product_url_ele in product_url_ele_list:
                product_url = product_url_ele.attr('href')
                if 'https:' not in product_url:
                    product_url = 'https://www.coach.com' + product_url

                if product_url in product_url_list:
                    continue

                n_cnt += 1
                product_url_list.append(product_url)

            print('产品个数:', len(product_url_ele_list))
            print('获取到的产品个数', len(product_url_list))

            if n_cnt == 0:
                w_cnt += 1
            else:
                w_cnt = 0

            if w_cnt >= 5:
                print('到底了')
                is_bottom = True

        return product_url_list

    def get_detail_tab(self, url, xpath_txt='html'):
        print('正在获取', url, '的数据')

        tab = self.browser.latest_tab
        tab.set.window.max()

        while True:
            tab.get(url)

            sleep(2)

            wait_minute = 20
            try:
                t_ele = tab.ele('x://h1')
            except Exception as e:
                print(e)
                tab.wait(2)
                continue

            if t_ele and t_ele.text == 'Access Denied':
                print(f'ip被封了,等待{wait_minute}分钟再尝试请求')
                sleep(60 * wait_minute)
                continue

            t_cnt = 0
            t_flag = False
            while True:
                if t_cnt >= 5:
                    t_flag = True
                    print('重新请求')
                    break

                try:
                    tab.run_js('window.scrollTo(0, 1000);')
                    break
                except Exception as e:
                    print('捕获get_detail_tab方法的第1个run_js:', e)
                    t_cnt += 1
                    tab.wait(2)

            if t_flag:
                continue

            ele = tab.ele(f'x:{xpath_txt}')
            if ele:
                break

            print('没有请求到元素,重新请求中')
            tab.wait(2)

tab.wait(1)
        while True:
            try:
                tab.run_js('window.scrollTo(0, -1000);')
                break
            except Exception as e:
                print('捕获get_detail_tab方法的第2个run_js:', e)
                tab.wait(1)

        return tab

    def get_product_img_url_list(self, tab, data):
        tab.wait(5)

        data['Option1 Value'] = tab.ele('x://p[contains(@class, "color-name")] | //div[@class="css-o2zsjq"]').text.replace('Color:', '').strip()

        product_color_img_url_list = []

        img_temp_ele = tab.ele('x://li[contains(@class, "is-prev")]')
        if img_temp_ele:
            p = int(not int(tab.ele('x://ul[@id="splide01-list"]/li[1]').attr('data-slide-index')))
            product_color_img_url_cnt = int(img_temp_ele.attr('data-slide-index')) + p
        else:
            t_ele_list = tab.eles('x://ul[@id="splide01-list"]/li')
            if len(t_ele_list) > 0:
                product_color_img_url_cnt = len(t_ele_list)
            else:
                product_color_img_url_cnt = len(tab.eles('x://div[@class="css-1161qt5"]/div')) - 1

        print(data['Option1 Value'], '色的图片个数应为:', product_color_img_url_cnt)

        while True:
            t_flag = True
            raw_product_color_img_ele_list = tab.eles('x://div[contains(@class, "product-thumbails-slider")]//ul[@class="splide__list"]//li[contains(@class, "splide__slide")] | //div[@class="css-1161qt5"]/div[@class="css-113el1s"]/div | //div[@class="css-8h57m5"]//div[contains(@class, "splide__track--loop")]/ul[@class="splide__list"]//li[contains(@class, "splide__slide")]')
            # 格式变了的话,可能第一个条件就不成立了

            flag = True
            for raw_product_color_img_ele in raw_product_color_img_ele_list:
                if ('splide__slide' not in raw_product_color_img_ele.attr('class')) or ('is-active' in raw_product_color_img_ele.attr('class')):
                    flag = False

                if flag:
                    continue

                if 'splide__slide' not in raw_product_color_img_ele.attr('class'):
                    raw_product_color_img_url = raw_product_color_img_ele.attr('src')
                    if not raw_product_color_img_url:
                        print('raw_product_color_img_ele没找到链接, 重新找')
                        t_flag = False
                        tab.wait(2)
                        break
                else:
                    try:
                        raw_product_color_img_url = raw_product_color_img_ele.ele('x:.//div[contains(@class, "css-16g44fy")]/img | .//div[contains(@class, "css-14ktbsh")]/img | .//video[contains(@class, "pdp-carousel-d")]/source').attr('src')
                    except Exception as e:
                        print(e)
                        print('raw_product_color_img_ele没找到内部的xpath, 重新找')
                        print(raw_product_color_img_ele.html)
                        t_flag = False
                        tab.wait(2)
                        break

                if not raw_product_color_img_url:
                    continue

                raw_product_color_img_url = raw_product_color_img_url.split('?')[0]

                if raw_product_color_img_url in product_color_img_url_list:
                    continue

                product_color_img_url_list.append(raw_product_color_img_url)

            if t_flag:
                break

        print(data['Option1 Value'], '色获取到的图片个数为:', len(product_color_img_url_list))

        return product_color_img_url_list

    def init_tab(self):
        co = ChromiumOptions()
        co.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe')
        co.auto_port()
        # co.headless()
        # co.set_argument('--no-sandbox')

        self.browser = Chromium(co)

        self.tab = self.browser.latest_tab
        self.tab.set.window.max()

    def handle_img_url(self, raw_img_url):
        return raw_img_url

    def product_detail_parse(self, tab, url, data):
        product_data_list = []

        self.get_detail_tab(url, '//div[@class="chakra-accordion__item css-1mm4d2m"]/div[@class="chakra-collapse"] | //div[@class="chakra-accordion__item css-1grohqe"]//div[@id="product-details"] | //div[@class="css-5fcx2y"]') # 第2个请求

        while True:
            t_ele = tab.ele('x://button[contains(@class, "right-arrow css-1qymyi7")]')
            if not t_ele:
                break

            self.ele_click(tab, t_ele)

        while True:
            tt_flag = True
            temp_url_ele_list = tab.eles('x://button[contains(@class, "variant-image-swatch")] | //div[@class="css-1wsk9bu"]/span/div | //div[@class="css-1wsk9bu"]/a/span/div')

            product_color_url_list = []

            product_color_url_list.append(url)

            filter_id = []
            current_id = tab.ele('x://button[contains(@class, "activeColorSwatch")] | //div[contains(@class, "activeColorSwatch")]').attr('data-product-id')
            filter_id.append(current_id)

            t_cnt = 0
            for temp_url_ele in temp_url_ele_list:
                t_cnt += 1
                t_id = temp_url_ele.attr('data-product-id')
                if t_id in filter_id:
                    continue

                filter_id.append(t_id)

                temp_id = ''
                try:
                    temp_id = temp_url_ele.ele('x:.//img').attr('src').split('?')[0].split('/')[-1][:-3].upper().split('_')[0] + '-' + temp_url_ele.attr('data-product-id').split(' ')[-1]
                except Exception as e:
                    print(t_cnt, ':', e)
                    print(temp_url_ele.html)
                    print('重新定位')
                    sleep(3)
                    tt_flag = False
                    break

                temp_id = temp_id.replace('/', '%2F')

                if 'coachtopia' == url.split('?')[0].split('/')[4]:
                    temp_url = 'https://www.coach.com/products/' + url.split('?')[0].split('/')[4] + '/' + url.split('?')[0].split('/')[5] + '/' + temp_id + '.html'
                else:
                    temp_url = 'https://www.coach.com/products/' + url.split('?')[0].split('/')[4] + '/' + temp_id + '.html'

                product_color_url_list.append(temp_url)

            if tt_flag:
                break

        print('颜色个数为:', len(product_color_url_list))

        product_size_ele_list = tab.eles('x://button[contains(@class, "variation-size")] | //button[contains(@class, "product-size-button")]', timeout=5)

        product_size_list = []
        if len(product_size_ele_list) <= 0:
            product_size_list.append('One Size')
        else:
            for product_size_ele in product_size_ele_list:
                product_size_list.append(product_size_ele.text)

        for product_color_url in product_color_url_list:
            self.get_detail_tab(product_color_url, '//div[@class="chakra-accordion__item css-1mm4d2m"]/div[@class="chakra-collapse"] | //div[@class="chakra-accordion__item css-1grohqe"]//div[@id="product-details"] | //div[@class="css-5fcx2y"]')  # 第3个请求

            print('产品尺寸个数为:', len(product_size_list))
            product_color_img_url_list = self.get_product_img_url_list(tab, data) # 找图片
            product_color_img_url_len = len(product_color_img_url_list)

            print('正在获取第', self.id, '个产品', product_color_url, '的', data['Option1 Value'], '色的数据')
            print('图片个数为:', product_color_img_url_len)

            for product_size in product_size_list:
                _data = deepcopy(data)

                _data['Option2 Value'] = product_size

                _data['Handle'] = url.split('/')[-2]
                _data['Title'] = tab.ele('x://h1[contains(@class, "pdp-product-title")] | //h3[contains(@class, "product-thumbnail-name")]').text.strip()
                _data['Body (HTML)'] = tab.ele('x://div[@class="chakra-accordion__item css-1mm4d2m"]/div[@class="chakra-collapse"] | //div[@class="chakra-accordion__item css-1grohqe"]//div[@id="product-details"] | //div[@class="css-5fcx2y"]').html

                _data['Variant SKU'] = _data['Handle'] + '-' + product_color_url.split('?')[0].split('/')[-1].replace('.html', '') + '-' + _data['Option2 Value']
                _data['Variant SKU'] = _data['Variant SKU'].replace(' ', '-').lower()

                _data['Variant Price'] = tab.ele('x://p[contains(@class, "active-price")] | //span[contains(@class, "css-1y0navc")]').text.strip().replace('$', '')

                _data['Image Src'] = self.handle_img_url(product_color_img_url_list[0])
                _data['Image Position'] = 1
                _data['Variant Image'] = _data['Image Src']

                product_data_list.append(_data)
                print(_data)

                for i in range(1, product_color_img_url_len):
                    temp_data = deepcopy(self.empty_data)

                    temp_data['Handle'] = _data['Handle']
                    temp_data['Published'] = 'TRUE'
                    temp_data['Image Src'] = self.handle_img_url(product_color_img_url_list[i])
                    temp_data['Image Position'] = i + 1

                    product_data_list.append(temp_data)
                    print(temp_data)

        return product_data_list

    def parse(self, url, data):
        data['Type'] = 'New Arrivals for Women'
        data['Collection'] = data['Type']

        print('正在获取', url, '的数据')
        self.tab.get(url)  # 第1个请求

        self.tab.wait(5)

        product_url_list = self.infinite_scroll(self.tab)

        total_product_num = int(self.tab.ele('x://p[@class="chakra-text total-count plp-v3-1 css-1dp7uqa"]').text.strip().split(' ')[0])

        print('产品个数应为:', total_product_num)
        print('获取的产品个数为:', len(product_url_list))

        if total_product_num > len(product_url_list):
            print('重复了', total_product_num - len(product_url_list), '个产品')

        for product_url in product_url_list:
            self.id += 1

            if 'https:' not in product_url:
                product_url = 'https://www.coach.com' + product_url

            with open('./filter.txt', 'r', encoding='utf-8') as f:
                filter_txt = f.read()

            if product_url in filter_txt:
                print(self.id, '已完成')
                continue

            print(self.id, '开始')

            product_data_list = self.product_detail_parse(self.tab, product_url, data)

            self.save_all_data(product_data_list)

            print(self.id, '结束')

            with open('./filter.txt', 'a', encoding='utf-8') as f:
                f.write(product_url + '\n')

    def run(self, is_continue=False):
        if is_continue:
            self.file = open('./coach.csv', 'a', newline='', encoding='utf-8-sig')
            self.writer = csv.DictWriter(self.file, fieldnames=self.field_names)
        else:
            self.file = open('./coach.csv', 'w', newline='', encoding='utf-8-sig')
            self.writer = csv.DictWriter(self.file, fieldnames=self.field_names)
            self.writer.writeheader()

            with open('./filter.txt', 'w', encoding='utf-8') as f:
                f.write('')

        logging.captureWarnings(True)

        self.init_tab()

        data = deepcopy(self.init_data)

        self.parse(self.url, data)

        # self.product_detail_parse(self.tab, 'https://www.coach.com/products/coachtopia/alterego-shoulder-bag-in-checkerboard-upcrafted-leather/CBE00.html?frp=CBE00+L77', data)

        if self.file:
            self.file.close()

        if self.browser:
            self.browser.quit()

if __name__ == '__main__':
    coach = Coach()
    coach.run()

# https://www.coach.com/shop/new/womens-new-arrivals/view-all

# 上传代码的时候把True给去掉 √
# 试试全部产品数据 √