admin 管理员组

文章数量: 887021

20240407

1.生成器迭代器可迭代对象:

生成器:用特殊方式定义的迭代器就是生成器,目前两种定义方式:(1)用()的列表生成式(2)用yield写的函数
迭代器:调用next函数并不断能返回下一个值的就是迭代器
可迭代对象:能放循环遍历的,如for循环遍历,列表等

2.作业题复习:

(1)读取一个超过内存的大文件 :常见的就是分块读取,按照固定大小读取,用read就可以

(2)选择代理ip的标准:连接速度;安全性;请求间隔;价格;稳定性等。

3.同步异步阻塞非阻塞:

阻塞:程序未得到所需计算资源时被挂起的状态
非阻塞:程序在等待某个操作过程中,可以处理其他的事情,不被阻塞
同步:不同单元为完成某个任务,需协调进行
异步:不同单元未完成某个任务,可以独立完成,无需协调

4.协程基本用法: 

python中使用协程最常用的库莫过于asyncio;

(event_loop,coroutine,task,future)

async/await 关键字,是从 Python 3.5 才出现的,专门用于定义协程。其中, async 定义一个协 程, await 用来挂起阻塞方法的执行。 如:
import asyncio
async def execute(x):
print('Number:', x)
coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')

5.协程线程对比:协程处理速度较快,线程随着开的数量增加,不能充分利用,会速度下降

6.控制协程数量及多任务:

self.semaphore=asyncio.Semaphore(CONCURRENCY)

两个列表的变量生成式:

l = [[1,2],[3,4],[5,6]]

lists = [x for y in l for x in y]

print(lists)

7.异步mysql

import asyncio
import aiomysql
# UUID 是 通用唯一识别码(Universally Unique Identifier)的缩写,是一种软件建构的标准,亦为开放软件基金会组织在分布式计算环境领域的一部分。其目的,是让分布式系统中的所有元素,都能有唯一的辨识信息,而不需要通过中央控制端来做辨识信息的指定。如此一来,每个人都可以创建不与其它人冲突的UUID。在这样的情况下,就不需考虑数据库创建时的名称重复问题。最广泛应用的UUID,是微软公司的全局唯一标识符(GUID),而其他重要的应用,则有Linux ext2/ext3文件系统、LUKS加密分区、GNOME、KDE、Mac OS X等等。另外我们也可以在e2fsprogs包中的UUID库找到实现。
import shortuuid
import pymysql

#   异步
#   aiomysql做数据库连接的时候,需要这个loop对象
async def async_basic(loop):
    pool = await aiomysql.create_pool(
        host="127.0.0.1",
        port=3306,
        user="root",
        password="123456",
        db="test3",
        loop=loop
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            for x in range(10000):
                content = shortuuid.uuid()
                sql = "insert into mybrank(brank) values('{}')".format(content)
            #执行SQL语句
                await cursor.execute(sql)
            await connmit()
    #关闭连接池
    pool.close()
    await pool.wait_closed()


# #同步
# def sync_basic():
#     conn = pymysql.connect(
#         host="127.0.0.1",
#         port=3306,
#         user="root",
#         password="root",
#         db="dvwa",
#         loop=loop
#     )
#     with conn.cursor() as cursor:
#         for x in range(10000):
#             content = shortuuid.uuid()
#             sql = f"insert into guestbook(comment_id,comment,name) values(2,'asd','{content})"
#             # 执行SQL语句
#             cursor.execute(sql)
#         connmit()



if __name__ == '__main__':
    #   异步:数量大时用异步
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_basic(loop))
    #  同步:sync_basic()

 如用异步mysql抓取小说到数据库:

import asyncio
import json
import time
import aiohttp
import logging
from aiohttp import ContentTypeError
# from motor.motor_asyncio import AsyncIOMotorClient
from lxml import etree
import aiomysql
from pymysql.converters import escape_string
CONCURRENCY = 4
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
class Spider(object):
    def __init__(self):
        # 设置协程的数量
        self.semaphore = asyncio.Semaphore(CONCURRENCY)

    async def scrape_api(self, url):
        async with self.semaphore:
            logging.info('scraping %s', url)
            try:
                logging.info('scraping %s', url)
                # 发送请求
                async with self.session.get(url) as response:
                    # await asyncio.sleep(1)
                    # 返回请求结果
                    return await response.text(encoding='gbk')
            except ContentTypeError as e:
                logging.error('error occurred while scraping %s', url, exc_info=True)

    def save_file(self, tite,contents):
            op = open('1.txt','a+',encoding='utf8')
            op.write(tite[0]+'\n'+''.join(contents)+'\n')
            op.close()

    async def async_basic(self,title,contents):
        pool = await aiomysql.create_pool(
            host="127.0.0.1",
            port=3306,
            user="root",
            password="123456",
            db="book",
            loop=loop
        )
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                # for x in range(10000):
                #     content = shortuuid.uuid()
                    # sql = "insert into mybrank(brank) values('{}')".format(content)
                sql = "insert into mybook(chapter,content) values(%s,%s)"
                    # 执行SQL语句
                await cursor.execute(sql,(title[0],''.join(contents)))
                await connmit()
        # 关闭连接池
        pool.close()
        await pool.wait_closed()

    def scrape_detail(self, source):
        title =  etree.HTML(source).xpath('//div[@class="bdsub"]/dl/dd/h1/text()')
        contents =  etree.HTML(source).xpath('//dd[@id="contents"]/text()')
        return title,contents

    async def main(self):
        # 设置header
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36'
        }
        # aiohttp模块准备发送请求(主要加header头信息以及代理,cookie等头信息)
        self.session = aiohttp.ClientSession(headers=headers)
        # source = await self.scrape_api('https://www.23us/book/24/24664/')
        # 异步请求获取页面源码
        source = await self.scrape_api('https://www.72us/html/3/3713/')
        # 获取所有章节的的链接
        chapter_href_list = etree.HTML(source).xpath('//td[@class="L"]/a/@href')
        
        scrape_detail_tasks = [asyncio.ensure_future(self.scrape_api('https://www.72us/html/3/3713/'+href)) for href in chapter_href_list[:200]]
        content_list = await asyncio.gather(*scrape_detail_tasks)

        scrape_contents = [self.scrape_detail(contents) for contents in content_list]
        # scrape_save = [self.save_file(title,contents) for title,contents in scrape_contents]
        [await self.async_basic(title,contents) for title,contents in scrape_contents]
        await self.session.close()

if __name__ == '__main__':
    begin_time = time.time()
    spider = Spider()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(spider.main())
    end_time = time.time()
    print(end_time-begin_time)

8.浏览器操作软件介绍:

selenium+phantomjs:selenium不支持异步,没有防检测,可能会ip封掉

pyteer:防检测,支持异步,但不好用

DrissionPage:近两年刚出,不易被封

9.DrissionPage使用文档:DrissionPage官网

例子,如抓取震坤行:

from DrissionPage import ChromiumPage
import time
# 等待 id 为 div1 的元素显示,设置超时3秒
# page.wait.ele_display('#div1', timeout=3)
# 用 d 模式创建页面对象(默认模式)
page = ChromiumPage()
# 跳转到登录页面
page.get('https://www.zkh/?utm_source=baidu&utm_medium=CPT')
# 定位到账号文本框并输入账号
# page.ele('#key').input('大地瓜')
page.ele('xpath://*
[@id="app"]/div/div/div[3]/div/div[1]/div[1]/input').input('口罩')
# 定位到密码文本框并输入密码
page.ele('xpath://*[@id="app"]/div/div/div[3]/div/div[1]/button/span').click()
# 点击登录按钮
# page.wait.load_start()
# 切换到第一个标签页
# page.to_tab(page.tabs[0])
# 切换到最新打开的标签页
page.to_tab(page.latest_tab)
while True:
page.wait.load_start()
# page.wait.ele_display('xpath://*
[@id="app"]/div/div/div[4]/div[6]/div/div[6]/a/div[1]', timeout=3)
# time.sleep(5)
# height = page.run_js('return document.body.scrollHeight;')
# print(height)
page.scroll.to_location(0,10000)
time.sleep(0.5)
page.scroll.to_location(0,10000)
time.sleep(0.5)
page.scroll.to_location(0,10000)
time.sleep(0.5)
# print(page.html)
# 滚动到某个已获取到的元素
# ele = page.ele('xpath://*
[@id="app"]/div/div/div[4]/div[7]/div/div/div/div[40]/a/div[4]')
# page.scroll.to_see(ele)
divs = page.eles('xpath://div[@class="goods-name clamp2"]')
for i in divs:
title = i.attr('title')
print(title)
# 获取下一页按钮,有就点击
print(len(divs))
btn = page('下一页', timeout=2)
if btn:
btn.click()
page.wait.load_start()
# 没有则退出程序
else:
break
# page.scroll.to_bottom()
# # 滚动到指定位置
# page.run_js('window.scrollBy(0, document.body.scrollHeight)')
# # 滚动页面使自己可见
# page.scroll.down(4000)
# page.run_js(' return document.body.scrollHeight')

10.DrissionPage实现异步请求方案:

from threading import Thread
from DrissionPage import ChromiumPage
from DataRecorder import Recorder
def collect(tab, recorder, title):
"""用于采集的方法
:param tab: ChromiumTab 对象
:param recorder: Recorder 记录器对象
:param title: 类别标题
:return: None
"""
num = 1 # 当前采集页数
while True:
# 遍历所有标题元素
for i in tab.eles('.title project-namespace-path'):
# 获取某页所有库名称,记录到记录器
recorder.add_data((title, i.text, num))
# 如果有下一页,点击翻页
btn = tab('@rel=next', timeout=2)
if btn:
btn.click(by_js=True)
tab.wait.load_start()
num += 1
# 否则,采集完毕
else:
break
def main():
# 新建页面对象
page = ChromiumPage()
# 第一个标签页访问网址
page.get('https://gitee/explore/ai')
# tab = page.get_tab(1) # 获取列表中第二个标签页的对象
# 获取第一个标签页对象,这个东西我认为等于一个page,一个窗口的句柄
tab1 = page.get_tab()
print(tab1)
# 新建一个标签页并访问另一个网址
tab2 = page.new_tab('https://gitee/explore/machine-learning')
# 获取第二个标签页对象
tab2 = page.get_tab(tab2)
# 新建记录器对象
recorder = Recorder('data.csv')
# 多线程同时处理多个页面
Thread(target=collect, args=(tab1, recorder, 'ai')).start()
Thread(target=collect, args=(tab2, recorder, '机器学习')).start()
if __name__ == '__main__':
main()

11.DrissionPage抓取刺猬猫:

from DrissionPage import ChromiumPage
import time

page = ChromiumPage()

page.get('https://www.ciweimao/chapter/111342789')

page.wait.load_start()

chapter_count = 1
while True:
    page.wait.load_start()


    contents = page.eles('@class=chapter')


    for content in contents:
        print(content.text)
    # 获取下一页按钮,有就点击
    # print(len(divs))
    btn = page('下一章', timeout=2)
    if btn:
        if chapter_count < 3:
            btn.click()
            page.wait.load_start()
            chapter_count += 1
        else:
            break
    else:
        break

本文标签: 数据采集 基础 笔记