今天来说说,Python 中的任务切分。以爬虫为例,从一个存 url 的 txt 文件中,读取其内容,我们会获取一个 url 列表。我们把这一个 url 列表称为大任务。
列表切分在
不考虑内存占用的情况下,我们对上面的大任务进行一个切分。比如我们将大任务切分成的小任务是每秒最多只访问5个URL。
import os import time CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) def read_file(): file_path = os.path.join(CURRENT_DIR, "url_list.txt") with open(file_path, "r", encoding="utf-8") as fs: result = [i.strip() for i in fs.readlines()] return result def fetch(url): print(url) def run(): max_count = 5 url_list = read_file() for index in range(0, len(url_list), max_count): start = time.time() fetch(url_list[index:index + max_count]) end = time.time() - start if end < 1: time.sleep(1 - end) if __name__ == '__main__': run()
关键代码都在for循环里,首先我们通过声明range的第三个参数,该参数指定迭代的步长为5,这样每次index增加都是以5为基数,即0,5,10。。。
然后我们对url_list做切片,每次取其五个元素,这五个元素会随着index的增加不断的在改变,如果最后不够五个了,按照切片的特性这个时候就会有多少取多少了,不会造成索引超下标的问题。
随着url列表的增加,我们会发现内存的占用也在提高了。这个时候我们就需要对代码进行修改了,我们知道生成器是比较节省内存的空间的,修改之后代码变成,下面的这样。
生成器切分
# -*- coding: utf-8 -*- # @时间 : 2019-11-23 23:47 # @作者 : 陈祥安 # @文件名 : g.py # @公众号: Python学习开发 import os import time from itertools import islice CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) def read_file(): file_path = os.path.join(CURRENT_DIR, "url_list.txt") with open(file_path, "r", encoding="utf-8") as fs: for i in fs: yield i.strip() def fetch(url): print(url) def run(): max_count = 5 url_gen = read_file() while True: url_list = list(islice(url_gen, 0, max_count)) if not url_list: break start = time.time() fetch(url_list) end = time.time() - start if end < 1: time.sleep(1 - end) if __name__ == '__main__': run()
首先,我们修改了文件读取的方式,把原来读列表的形式,改为了生成器的形式。这样我们在调用该文件读取方法的时候大大节省了内存。
然后就是对上面for循环进行改造,因为生成器的特性,这里不适合使用for进行迭代,因为每一次的迭代都会消耗生成器的元素,通过使用itertools的islice对url_gen进行切分,islice是生成器的切片,这里我们每次切分出含有5个元素的生成器,因为生成器没有__len__方法所以,我们将其转为列表,然后判断列表是否为空,就可以知道迭代是否该结束了。
修改之后的代码,不管是性能还是节省内存上都大大的提高。读取千万级的文件不是问题。
除此之外,在使用异步爬虫的时候,也许会用到异步生成器切片。下面就和大家讨论,异步生成器切分的问题
异步生成器切分
首先先来看一个简单的异步生成器。
我们知道调用下面的代码会得到一个生成器
def foo(): for i in range(20): yield i
如果在def前面加一个async,那么在调用的时候它就是个异步生成器了。
完整示例代码如下
import asyncio async def foo(): for i in range(20): yield i async def run(): async_gen = foo() async for i in async_gen: print(i) if __name__ == '__main__': asyncio.run(run())
关于async for的切分有点复杂,这里推荐使用aiostream模块,使用之后代码改为下面这样
import asyncio from aiostream import stream async def foo(): for i in range(22): yield i async def run(): index = 0 limit = 5 while True: xs = stream.iterate(foo()) ys = xs[index:index + limit] t = await stream.list(ys) if not t: break print(t) index += limit if __name__ == '__main__': asyncio.run(run())
极牛网精选文章《Python实用技巧大任务切分》文中所述为作者独立观点,不代表极牛网立场。如若转载请注明出处:https://geeknb.com/5599.html