|
[Python] 纯文本查看 复制代码
import asyncio
import base64
import os
import re
from lxml import etree
import requests
import aiohttp
import aiofiles
import time
# Default HTTP request headers sent with every page and image request;
# the user-agent string identifies the client as desktop Chrome.
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.54 Safari/537.36',
}
async def img_download(title, img_list):
    """Download every image of one album into ``./img/<title>/``.

    Args:
        title: Raw album title. Only characters in the range
            U+4E00-U+9FA5 are kept; the result names the folder and
            prefixes every file name.
        img_list: Image URLs of the album. ``thumb300`` in each URL is
            replaced by ``mw1024`` before the request is made.
    """
    # Strip everything except CJK characters so the title can be used as
    # a folder name.
    title = ''.join(re.findall(r'[\u4e00-\u9fa5]', title))
    # exist_ok avoids the separate exists() check and cannot fail when
    # the folder is already there.
    os.makedirs(f'./img/{title}', exist_ok=True)
    if not img_list:
        return

    # One session for the whole album so connections are reused instead
    # of opening a new session (and connection pool) per image.
    async with aiohttp.ClientSession() as session:
        for img_url in img_list:
            img_url = img_url.replace('thumb300', 'mw1024')  # request the large image
            async with session.get(img_url, headers=headers) as response:
                resp = await response.read()
            # The body is fully read, so the response can be released
            # before the file is written.
            img_name = title + img_url.split('/')[-1]
            path = './img/' + title + '/' + img_name
            async with aiofiles.open(path, 'wb') as fp:
                await fp.write(resp)
            print(img_name + "--下载完成")
async def request_list_page(list_url):
    """Fetch one list page and download the images of every article on it.

    One ``img_download`` task is started per ``<article>`` element, and
    the coroutine returns once all of them have finished.

    Args:
        list_url: URL of the list page to scrape.
    """
    # NOTE: requests.get is synchronous, so the event loop is blocked
    # while the list page itself is being fetched.
    resp = requests.get(list_url, headers=headers).text
    tree = etree.HTML(resp)

    tasks = []
    for article in tree.xpath('//article'):
        titles = article.xpath('./div/h2/a/text()')
        if not titles:
            # An article without a title link would otherwise raise
            # IndexError and abort the whole page.
            continue
        img_list = article.xpath('./div/div[2]/div//div/img/@data-src')
        tasks.append(asyncio.create_task(img_download(titles[0], img_list)))

    # asyncio.wait() raises ValueError on an empty collection, which
    # happens when the page contains no usable articles.
    if tasks:
        await asyncio.wait(tasks)
start_time = time.time()


async def _crawl_all_pages(first_page=1, last_page=60):
    """Scrape list pages ``first_page``..``last_page`` (inclusive), one after another."""
    # Decode the base URL once; .decode() yields the same text as the
    # previous str(bytes).split("'")[1] workaround for this ASCII value.
    base_url = base64.b64decode(
        b'aHR0cHM6Ly9tbXp6dHQuY29tL2JlYXV0eS9mYXNoaW9uL3BhZ2Uv'
    ).decode()
    for page in range(first_page, last_page + 1):
        await request_list_page(base_url + str(page))


if __name__ == '__main__':
    # A single asyncio.run() drives every page on one event loop.
    # Calling asyncio.run() once per page would create and close a new
    # loop on each iteration, and asyncio.get_event_loop() without a
    # running loop is deprecated.
    asyncio.run(_crawl_all_pages())
    print("下载完毕,总耗时秒:" + str(time.time() - start_time))
顺便请教论坛大佬一个问题:代码最后用
loop = asyncio.get_event_loop()
loop.run_until_complete(request_list_page(url))
不会报错,
但是换成:
asyncio.run(request_list_page(url))(不是说新版推荐用这种写法吗?)就会报错:RuntimeError: Event loop is closed
|
|