admin 管理员组文章数量: 887021
20240407
1.生成器迭代器可迭代对象:
生成器:用特殊方式定义的迭代器就是生成器,目前两种定义方式:(1)用()的列表生成式(2)用yield写的函数
迭代器:调用next函数并不断能返回下一个值的就是迭代器
可迭代对象:能放循环遍历的,如for循环遍历,列表等
2.作业题复习:
(1)读取一个超过内存的大文件 :常见的就是分块读取,按照固定大小读取,用read就可以
(2)选择代理ip的标准:连接速度;安全性;请求间隔;价格;稳定性等。
3.同步异步阻塞非阻塞:
阻塞:程序未得到所需计算资源时被挂起的状态
非阻塞:程序在等待某个操作过程中,可以处理其他的事情,不被阻塞
同步:不同单元为完成某个任务,需协调进行
异步:不同单元未完成某个任务,可以独立完成,无需协调
4.协程基本用法:
python中使用协程最常用的库莫过于asyncio;
(event_loop,coroutine,task,future)
async/await 关键字,是从 Python 3.5 才出现的,专门用于定义协程。其中, async 定义一个协 程, await 用来挂起阻塞方法的执行。 如:import asyncio
async def execute(x):
print('Number:', x)
coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')
5.协程线程对比:协程处理速度较快,线程随着开的数量增加,不能充分利用,会速度下降
6.控制协程数量及多任务:
self.semaphore=asyncio.Semaphore(CONCURRENCY)
两个列表的变量生成式:
l = [[1,2],[3,4],[5,6]]
lists = [x for y in l for x in y]
print(lists)
7.异步mysql
import asyncio
import aiomysql
# UUID 是 通用唯一识别码(Universally Unique Identifier)的缩写,是一种软件建构的标准,亦为开放软件基金会组织在分布式计算环境领域的一部分。其目的,是让分布式系统中的所有元素,都能有唯一的辨识信息,而不需要通过中央控制端来做辨识信息的指定。如此一来,每个人都可以创建不与其它人冲突的UUID。在这样的情况下,就不需考虑数据库创建时的名称重复问题。最广泛应用的UUID,是微软公司的全局唯一标识符(GUID),而其他重要的应用,则有Linux ext2/ext3文件系统、LUKS加密分区、GNOME、KDE、Mac OS X等等。另外我们也可以在e2fsprogs包中的UUID库找到实现。
import shortuuid
import pymysql
# 异步
# aiomysql做数据库连接的时候,需要这个loop对象
async def async_basic(loop):
pool = await aiomysql.create_pool(
host="127.0.0.1",
port=3306,
user="root",
password="123456",
db="test3",
loop=loop
)
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
for x in range(10000):
content = shortuuid.uuid()
sql = "insert into mybrank(brank) values('{}')".format(content)
#执行SQL语句
await cursor.execute(sql)
await connmit()
#关闭连接池
pool.close()
await pool.wait_closed()
# #同步
# def sync_basic():
# conn = pymysql.connect(
# host="127.0.0.1",
# port=3306,
# user="root",
# password="root",
# db="dvwa",
# loop=loop
# )
# with conn.cursor() as cursor:
# for x in range(10000):
# content = shortuuid.uuid()
# sql = f"insert into guestbook(comment_id,comment,name) values(2,'asd','{content})"
# # 执行SQL语句
# cursor.execute(sql)
# connmit()
if __name__ == '__main__':
# 异步:数量大时用异步
loop = asyncio.get_event_loop()
loop.run_until_complete(async_basic(loop))
# 同步:sync_basic()
如用异步mysql抓取小说到数据库:
import asyncio
import json
import time
import aiohttp
import logging
from aiohttp import ContentTypeError
# from motor.motor_asyncio import AsyncIOMotorClient
from lxml import etree
import aiomysql
from pymysql.converters import escape_string
CONCURRENCY = 4
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
class Spider(object):
def __init__(self):
# 设置协程的数量
self.semaphore = asyncio.Semaphore(CONCURRENCY)
async def scrape_api(self, url):
async with self.semaphore:
logging.info('scraping %s', url)
try:
logging.info('scraping %s', url)
# 发送请求
async with self.session.get(url) as response:
# await asyncio.sleep(1)
# 返回请求结果
return await response.text(encoding='gbk')
except ContentTypeError as e:
logging.error('error occurred while scraping %s', url, exc_info=True)
def save_file(self, tite,contents):
op = open('1.txt','a+',encoding='utf8')
op.write(tite[0]+'\n'+''.join(contents)+'\n')
op.close()
async def async_basic(self,title,contents):
pool = await aiomysql.create_pool(
host="127.0.0.1",
port=3306,
user="root",
password="123456",
db="book",
loop=loop
)
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
# for x in range(10000):
# content = shortuuid.uuid()
# sql = "insert into mybrank(brank) values('{}')".format(content)
sql = "insert into mybook(chapter,content) values(%s,%s)"
# 执行SQL语句
await cursor.execute(sql,(title[0],''.join(contents)))
await connmit()
# 关闭连接池
pool.close()
await pool.wait_closed()
def scrape_detail(self, source):
title = etree.HTML(source).xpath('//div[@class="bdsub"]/dl/dd/h1/text()')
contents = etree.HTML(source).xpath('//dd[@id="contents"]/text()')
return title,contents
async def main(self):
# 设置header
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36'
}
# aiohttp模块准备发送请求(主要加header头信息以及代理,cookie等头信息)
self.session = aiohttp.ClientSession(headers=headers)
# source = await self.scrape_api('https://www.23us/book/24/24664/')
# 异步请求获取页面源码
source = await self.scrape_api('https://www.72us/html/3/3713/')
# 获取所有章节的的链接
chapter_href_list = etree.HTML(source).xpath('//td[@class="L"]/a/@href')
scrape_detail_tasks = [asyncio.ensure_future(self.scrape_api('https://www.72us/html/3/3713/'+href)) for href in chapter_href_list[:200]]
content_list = await asyncio.gather(*scrape_detail_tasks)
scrape_contents = [self.scrape_detail(contents) for contents in content_list]
# scrape_save = [self.save_file(title,contents) for title,contents in scrape_contents]
[await self.async_basic(title,contents) for title,contents in scrape_contents]
await self.session.close()
if __name__ == '__main__':
begin_time = time.time()
spider = Spider()
loop = asyncio.get_event_loop()
loop.run_until_complete(spider.main())
end_time = time.time()
print(end_time-begin_time)
8.浏览器操作软件介绍:
selenium+phantomjs:selenium不支持异步,没有防检测,可能会ip封掉
pyteer:防检测,支持异步,但不好用
DrissionPage:近两年刚出,不易被封
9.DrissionPage使用文档:DrissionPage官网
例子,如抓取震坤行:
from DrissionPage import ChromiumPage
import time
# 等待 id 为 div1 的元素显示,设置超时3秒
# page.wait.ele_display('#div1', timeout=3)
# 用 d 模式创建页面对象(默认模式)
page = ChromiumPage()
# 跳转到登录页面
page.get('https://www.zkh/?utm_source=baidu&utm_medium=CPT')
# 定位到账号文本框并输入账号
# page.ele('#key').input('大地瓜')
page.ele('xpath://*
[@id="app"]/div/div/div[3]/div/div[1]/div[1]/input').input('口罩')
# 定位到密码文本框并输入密码
page.ele('xpath://*[@id="app"]/div/div/div[3]/div/div[1]/button/span').click()
# 点击登录按钮
# page.wait.load_start()
# 切换到第一个标签页
# page.to_tab(page.tabs[0])
# 切换到最新打开的标签页
page.to_tab(page.latest_tab)
while True:
page.wait.load_start()
# page.wait.ele_display('xpath://*
[@id="app"]/div/div/div[4]/div[6]/div/div[6]/a/div[1]', timeout=3)
# time.sleep(5)
# height = page.run_js('return document.body.scrollHeight;')
# print(height)
page.scroll.to_location(0,10000)
time.sleep(0.5)
page.scroll.to_location(0,10000)
time.sleep(0.5)
page.scroll.to_location(0,10000)
time.sleep(0.5)
# print(page.html)
# 滚动到某个已获取到的元素
# ele = page.ele('xpath://*
[@id="app"]/div/div/div[4]/div[7]/div/div/div/div[40]/a/div[4]')
# page.scroll.to_see(ele)
divs = page.eles('xpath://div[@class="goods-name clamp2"]')
for i in divs:
title = i.attr('title')
print(title)
# 获取下一页按钮,有就点击
print(len(divs))
btn = page('下一页', timeout=2)
if btn:
btn.click()
page.wait.load_start()
# 没有则退出程序
else:
break
# page.scroll.to_bottom()
# # 滚动到指定位置
# page.run_js('window.scrollBy(0, document.body.scrollHeight)')
# # 滚动页面使自己可见
# page.scroll.down(4000)
# page.run_js(' return document.body.scrollHeight')
10.DrissionPage实现异步请求方案:
from threading import Thread
from DrissionPage import ChromiumPage
from DataRecorder import Recorder
def collect(tab, recorder, title):
"""用于采集的方法
:param tab: ChromiumTab 对象
:param recorder: Recorder 记录器对象
:param title: 类别标题
:return: None
"""
num = 1 # 当前采集页数
while True:
# 遍历所有标题元素
for i in tab.eles('.title project-namespace-path'):
# 获取某页所有库名称,记录到记录器
recorder.add_data((title, i.text, num))
# 如果有下一页,点击翻页
btn = tab('@rel=next', timeout=2)
if btn:
btn.click(by_js=True)
tab.wait.load_start()
num += 1
# 否则,采集完毕
else:
break
def main():
# 新建页面对象
page = ChromiumPage()
# 第一个标签页访问网址
page.get('https://gitee/explore/ai')
# tab = page.get_tab(1) # 获取列表中第二个标签页的对象
# 获取第一个标签页对象,这个东西我认为等于一个page,一个窗口的句柄
tab1 = page.get_tab()
print(tab1)
# 新建一个标签页并访问另一个网址
tab2 = page.new_tab('https://gitee/explore/machine-learning')
# 获取第二个标签页对象
tab2 = page.get_tab(tab2)
# 新建记录器对象
recorder = Recorder('data.csv')
# 多线程同时处理多个页面
Thread(target=collect, args=(tab1, recorder, 'ai')).start()
Thread(target=collect, args=(tab2, recorder, '机器学习')).start()
if __name__ == '__main__':
main()
11.DrissionPage抓取刺猬猫:
from DrissionPage import ChromiumPage
import time
page = ChromiumPage()
page.get('https://www.ciweimao/chapter/111342789')
page.wait.load_start()
chapter_count = 1
while True:
page.wait.load_start()
contents = page.eles('@class=chapter')
for content in contents:
print(content.text)
# 获取下一页按钮,有就点击
# print(len(divs))
btn = page('下一章', timeout=2)
if btn:
if chapter_count < 3:
btn.click()
page.wait.load_start()
chapter_count += 1
else:
break
else:
break
版权声明:本文标题:(笔记)数据采集基础03 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1726436593h960341.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论