关键词搜索

源码搜索 ×
×

Python 进阶 — 协程

发布2023-02-15浏览2103次

详情内容

目录

协程

协程(co-routine,又称微线程)是一种多方协同的工作方式。当前执行者在某个时刻主动让出(yield)控制流,并记住自身当前的状态,以便在控制流返回时能从上次让出的位置恢复(resume)执行。

简而言之,协程的核心思想就在于执行者对控制流的 “主动让出” 和 “恢复”。相对于,线程此类的 “抢占式调度” 而言,协程是一种 “协作式调度” 方式。

在这里插入图片描述

协程的应用场景

抢占式调度的缺点

在 I/O 密集型场景中,抢占式调度的解决方案是 “异步 + 回调” 机制。

在这里插入图片描述

其存在的问题是,在某些场景中会使得整个程序的可读性非常差。以图片下载为例,图片 Server 提供了异步接口。当 Server 接收到 Client 发出的请求之后,就立即返回异步处理消息,包含了该请求的一个唯一标识 ID。等待图片服务完成下载之后再把执行结果放到一个 MQ(消息队列)中,此时 Client 就需要不断的轮训这个 MQ 才能拿到下载是否完成的结果。

在这里插入图片描述

可见,整体的逻辑被拆分为了好几个部分,各个子部分都会存在状态的迁移,代码逻辑复杂。

在这里插入图片描述

用户态协同调度的优势

而随着网络技术的发展和高并发要求,Co-routine 所能够提供的用户态协同调度机制的优势在网络操作、文件操作、数据库操作、消息队列操作等重 I/O 操作场景中逐渐被挖掘。

Co-routine 的提出,能够将 I/O 的处理权从被动的内核态 Kernel Thread 调度交还给主动的用户态 Co-routine 程序自身实现的调度。用户态程序在执行 I/O 时,主动的通过 yield(让出)CPU 的执行权给其他协程,多个协程之间处于平等、对称、合作的关系。

在这里插入图片描述

协程的运行原理

当程序运行时,操作系统会为每个程序分配一块同等大小的虚拟内存空间,并将程序的代码和所有静态数据加载到其中。然后,创建和初始化 Stack 存储,用于储存程序的局部变量,函数参数和返回地址;创建和初始化 Heap 内存;创建和初始化 I/O 相关的任务。当前期准备工作完成后,操作系统将 CPU 的控制权移交给新创建的进程,进程开始运行。

在这里插入图片描述

一个进程可以有一个或多个线程,同一进程中的多个线程将共享该进程中的全部系统资源,如:虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈和线程本地存储。

在这里插入图片描述

协程是一种比线程更加轻量级的存在,协程不是被操作系统内核所管理,而完全是由用户态程序所控制。协程与线程以及进程的关系如下图所示。可见,协程自身无法利用多核,需要配合进程来使用才可以在多核平台上发挥作用。

在这里插入图片描述

  • 协程之间的切换不需要涉及任何 System Call(系统调用)或任何阻塞调用。
  • 协程只在一个线程中执行,切换由用户态控制,而线程的阻塞状态是由操作系统内核来完成的,因此协程相比线程节省线程创建和切换的开销。
  • 协程中不存在同时写变量的冲突,因此,也就不需要用来守卫关键区块的同步性原语,比如:互斥锁、信号量等,并且不需要来自操作系统的支持。

协程通过 “挂起点” 来主动 yield(让出)CPU,并保存自身的状态,等候恢复。例如:首先在 funcA 函数中执行,运行一段时间后调用协程,协程开始执行,直到第一个挂起点,此后就像普通函数一样返回 funcA 函数。

funcA 函数执行一些代码后再次调用该协程,注意,协程这时就和普通函数不一样了。协程并不是从第一条指令开始执行而是从上一次的挂起点开始执行,执行一段时间后遇到第二个挂起点,这时协程再次像普通函数一样返回 funcA 函数,funcA 函数执行一段时间后整个程序结束。

在这里插入图片描述

可见,协程之所可以能够 “主动让出” 和 “被恢复”,是解析器在函数运行时堆栈中保存了其运行的 Context(上下文)。

在这里插入图片描述

Python 中的协程

Python 对协程的支持经历了多个版本:

  1. Python2.x 对协程的支持比较有限,通过 yield 关键字支持的生成器实现了一部分协程的功能但不完全。
  2. 第三方库 gevent 对协程有更好的支持。
  3. Python3.4 中提供了 asyncio 模块。
  4. Python3.5 中引入了 async/await 关键字。
  5. Python3.6 中 asyncio 模块更加完善和稳定。
  6. Python3.7 中内置了 async/await 关键字。

async/await 的示例程序:

import asyncio
from pathlib import Path
import logging
from urllib.request import urlopen, Request
import os
from time import time
import aiohttp
 

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


CODEFLEX_IMAGES_URLS = ['https://codeflex.co/wp-content/uploadshttps://files.jxasp.com/image/2021/01/pandas-dataframe-python-1024x512.png',
                        'https://codeflex.co/wp-content/uploadshttps://files.jxasp.com/image/2021/02/github-actions-deployment-to-eks-with-kustomize-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploadshttps://files.jxasp.com/image/2021/02/boto3-s3-multipart-upload-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploadshttps://files.jxasp.com/image/2018/02/kafka-cluster-architecture.jpg',
                        'https://codeflex.co/wp-content/uploadshttps://files.jxasp.com/image/2016/09/redis-cluster-topology.png']
 
 
async def download_image_async(session, dir, img_url):
    download_path = dir / os.path.basename(img_url)
    async with session.get(img_url) as response:
        with download_path.open('wb') as f:
            while True:
                # 在 async 函数中使用 await 关键字表示等待 task 执行完成,也就是等待 yeild 让出控制权。
                # 同时,asyncio 使用事件循环 event_loop 来实现整个过程。
                chunk = await response.content.read(512)
                if not chunk:
                    break
                f.write(chunk)
    logger.info('Downloaded: ' + img_url)
 
 
# 使用 async 关键字声明一个异步/协程函数。
# 调用该函数时,并不会立即运行,而是返回一个协程对象,后续在 event_loop 中执行。
async def main():
    images_dir = Path("codeflex_images")
    Path("codeflex_images").mkdir(parents=False, exist_ok=True)
 
    async with aiohttp.ClientSession() as session:
        tasks = [(download_image_async(session, images_dir, img_url)) for img_url in CODEFLEX_IMAGES_URLS]
        await asyncio.gather(*tasks, return_exceptions=True)
 
 
if __name__ == '__main__':
    start = time()
    
    # event_loop 事件循环充当管理者的角色,将控制权在几个协程函数之间切换。
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main())
    finally:
        event_loop.close()
 
    logger.info('Download time: %s seconds', time() - start)

    参考文档

    https://mp.weixin.qq.com/s/LItTjy2uN6iJvN2MqNPQ7Q
    https://mp.weixin.qq.com/s/Yj7O3mucbFHxx3p5UnlvnA

    相关技术文章

    点击QQ咨询
    开通会员
    返回顶部
    ×
    微信扫码支付
    微信扫码支付
    确定支付下载
    请使用微信描二维码支付
    ×

    提示信息

    ×

    选择支付方式

    • 微信支付
    • 支付宝付款
    确定支付下载