>

如何优雅地实现Python通用多线;进程并行模块

- 编辑:宜春市空间模块有限公司 -

如何优雅地实现Python通用多线;进程并行模块

  当单线程性能不足时,我们通常会使用多线程/多进程去加速运行。而这些代码往往多得令人绝望,需要考虑:

  不仅如此,彩色液晶模块若改为多进程或协程,代码还要继续修改。若多处使用并行,则这些代码还会重复很多遍,非常痛苦。

  于是,我们考虑将并行的所有逻辑封装到一个模块之内,向外部提供像串行执行一样的编程体验,还能彻底解决上面所述的疑难问题。所有代码不足180行。

  上面的代码会使用三个进程,并行地打印1-10的平方。当打印完10之后,进程自动回收释放。就像串行程序一样简单。

  我们通常会将任务分割为很多个子块,从而方便并行。因此可以将任务抽象为生成器。类似下面的操作,每个seed都是任务的种子。

  该函数通过传入生成器函数(generator)和任务的定义(worker函数),即可再返回一个生成器。消费时:

  我们看到,通过定义高阶函数,serial_yield就像map函数,对seed进行加工后输出。彩色液晶模块

  考虑如下场景: boss负责分发任务到任务队列,多个worker从任务队列捞数据,处理完之后,再写入结果队列。主线程从结果队列中取结果即可。

  使用Python创建worker的代码如下,func是任务的定义(是个函数)

  创建队列的代码如下,注意seeds可能是无穷流,因此需要限定队列的长度,当入队列发现队列已满时,则任务需要阻塞。

  对第一种情况,我们让boss在seed消费完之后,在队列里放入多个Empty标志,worker收到Empty之后,就会自动退出,下面是boss的实现逻辑:

  再定义multi_yield的主要代码。 代码非常好理解,创建任务和结果队列,再创建boss和worker线程(或进程/协程)并启动,之后不停地从结果队列里取数据就可以了。

  这样我们就实现了一个与serial_yield功能类似的multi_yield。可以定义多个worker,从队列中领任务,而不需重复地创建和销毁,更不需要线程池。当然,代码不完全,运行时可能出问题。但以上代码已经说明了核心的功能。完整的代码可以在文末找到。

  我们无法判断队列的长度,若队列满,那么put操作会永远卡死在那里,任务都不会结束。

  最开始想到的,是通过在multi_yield函数参数中添加一个返回bool的函数,这样当外部break时,同时将该函数的返回值置为True,内部检测到该标志位后强制退出。伪代码如下:

  但这样并不优雅,引入了更多的函数作为参数,还必须手工控制变量值,非常繁琐。在多进程模式下,stop标志位还如何解决?

  我们希望外部在循环时执行了break后,会自动通知内部的生成器。实现方法似乎就是with语句,即contextmanager.

  它实现了with的原语,参数是dispose函数,作用是退出with代码块后的回收逻辑。

  由于值类型的标志位无法在多进程环境中传递,我们再创建StopWrapper类,用于管理停止标志和回收资源:

  最后的问题是,如何解决队列满或空时,put/get的无限等待问题呢?考虑包装一下put/get:包装在while True之中,每隔两秒get/put,这样即使阻塞时,也能保证可以检查退出标志位。所有线程在主线s内自动退出。

  如何使用呢?我们只需在multi_yield的yield语句之外加上一行就可以了:

  仔细阅读上面的代码, 外部循环时退出循环,则会自动触发stop_wrapper的stop操作,回收全部资源,而不需通过外部的标志位传递!这样调用方在心智完全不需有额外的负担。

  实现生成器和上下文管理器的编程语言,都可以通过上述方式实现自动协程资源回收。笔者也实现了一个C#版本的,有兴趣欢迎交流。

  一些实现的细节很有趣,我们借助在函数中定义函数,可以不用复杂的类去承担职责,而仅仅只需函数。而类似的思想,在函数式编程中非常常见。

  该工具已经被笔者的流式语言etlpy所集成。但是依然有较多改进的空间,如没有集成分布式执行模式。

本文由使用说明发布,转载请注明来源:如何优雅地实现Python通用多线;进程并行模块