Python 3 异步编程

Python 3 异步编程

基础概念:

https://zhuanlan.zhihu.com/p/337978321

协程和多线程都是实现并发执行的有效方法,但它们各有优势和适用场景。是否需要在有了协程的情况下使用多线程,取决于具体的应用需求和环境。

协程的优势:

  1. 轻量级:协程比线程更轻量,创建和切换的开销更小。
  2. 非阻塞I/O:协程非常适合于处理 I/O 密集型任务,如网络请求、文件读写等。
  3. 简化异步编程:协程通过 asyncawait 语法,简化了异步编程的复杂性。
  4. 更好的性能:在 I/O 密集型任务中,协程通常可以提供更好的性能和资源利用率。

多线程的优势:

  1. CPU 密集型任务:多线程可以在多核 CPU 上并行执行,适合 CPU 密集型任务。
  2. 阻塞操作:对于阻塞型操作(如等待外部资源),多线程可以使得一个线程的阻塞不会影响其他线程的执行。
  3. 现有库的支持:许多现有的库和框架已经是线程安全的,直接支持多线程。

协程与多线程结合使用:

在某些情况下,结合使用协程和多线程可以提供最佳的性能和资源利用。例如:

总结:

协程(Coroutine)和事件循环(Event Loop)是异步编程中的两个核心概念,尤其是在 Python 的 asyncio 库中。

协程(Coroutine)

  1. 定义:协程是一种程序组件,它可以在执行过程中挂起(暂停执行)并在稍后某个时刻从挂起的地方继续执行。这与传统的函数不同,后者一旦开始执行,就会运行到结束,中途无法停止。

  2. 特点

    • 异步操作:协程通常用于执行异步操作,比如 I/O 操作(文件读写、网络请求等),这些操作可能需要等待外部事件的完成,而不是占用 CPU 时间进行计算。
    • 非阻塞:在协程等待一个操作完成时,程序可以执行其他任务,而不是停滞在那里。这是通过协程的挂起和恢复实现的。
  3. 在 Python 中的使用

    • 使用 async def 定义协程函数。
    • 使用 await 关键字挂起协程的执行,等待异步操作完成。

事件循环(Event Loop)

  1. 定义:事件循环是一个编程构造,用于调度和管理程序中的各种事件和操作。在异步编程中,它是运行异步程序和处理异步事件的核心。

  2. 功能

    • 管理操作:事件循环负责调度协程和其他异步操作,例如 I/O 事件、定时事件等。
    • 处理回调:当异步操作完成时,事件循环负责触发相应的回调函数。
    • 循环机制:事件循环以循环的方式运行,等待和分发事件和操作。
  3. 在 Python 中的使用

    • asyncio 模块提供了事件循环的实现。
    • 使用 asyncio.run() 启动事件循环,并执行最高级别的入口协程。
    • 事件循环会运行直到所有的任务都完成,或者被显式停止。

协程与事件循环的关系

在 Python 的 asyncio 库中,协程和事件循环共同工作,使得编写高效的异步代码成为可能,特别是在涉及大量 I/O 操作的场景中。

await, run_async(), 和 asyncio.run() 是 Python 异步编程中的重要概念和函数,它们在处理异步操作时有不同的作用和用法。以下是它们之间的主要区别:

  1. await:

    • await 用于等待一个异步函数(协程)的结果。
    • 它只能在定义为 async 的函数内部使用。
    • 当你在一个协程中使用 await 时,程序执行将暂停,直到 await 后的异步操作完成,然后继续执行后续代码。
    • 示例:
      async def async_function():
          result = await some_async_operation()
      
  2. run_async():

    • run_async() 不是 Python 标准库的一部分。Tortoise ORM 提供了一个 run_async 函数,用于在非异步环境中运行异步代码。

    • 这个函数接受一个协程作为参数,并运行这个协程。

    • 示例(使用 Tortoise ORM):

      from tortoise import run_async
      
      async def async_function():
          # 异步操作
          pass
      
      run_async(async_function())
      
  3. asyncio.run():

    • asyncio.run() 是 Python 3.7+ 中引入的一个函数,用于运行最高级别的入口点 "main" 函数,并负责管理异步事件循环。
    • 这个函数将创建一个新的事件循环,并在这个循环里运行传入的协程。
    • 它适用于只运行一次的异步代码。在协程完成后,事件循环将关闭。
    • 在 协程内部(async await ) 不应该使用 asyncio.run()

asyncio.run()函数的参数应该是一个协程对象,通常是通过调用一个异步函数(使用async def定义的函数)来创建的。

例如:

async def main():
   # 这里是你的异步代码
   pass

# 运行main协程
asyncio.run(main())

在这个例子中,main是一个异步函数,调用main()会创建一个协程对象,然后asyncio.run(main())会运行这个协程。

请注意,asyncio.run()会创建一个新的事件循环,运行协程,然后关闭事件循环。如果你的代码已经在一个事件循环中运行,你应该使用await来运行协程,而不是asyncio.run()

通常情况下,await 用于协程内部等待其他异步操作,asyncio.run() 用于程序的入口点,而 run_async()(如果是第三方库提供的)用于特定的场景,比如在同步环境中运行异步代码。

异步编程

Python 的 asyncawait 关键字是用于异步编程的一部分,主要出现在 Python 3.5 及以后的版本。这一机制允许 Python 程序以非阻塞的方式执行 I/O 密集型任务,如网络请求、数据库操作等。这对于提高程序性能和响应性特别有用。

Async

async 关键字用于定义一个“异步函数”。这种函数执行时,不会阻塞调用它的线程。相反,它允许程序在等待操作完成时继续执行其他任务。

async def my_async_function():
	# 异步操作

Await

await 关键字则用于异步函数内部,用来挂起异步操作的执行。await 后面跟的是一个表达式,通常是一个异步函数调用。当程序执行到 await 时,它会将控制权交还给事件循环,并暂停该异步函数的进一步执行,直到 await 的表达式完成。
async修饰的函数(也称为异步函数或协程函数)可以不包含await表达式。然而,如果一个async函数没有await表达式,那么它就是一个同步函数,因为它不会挂起执行。

async def another_async_function():
	await my_async_function()  # 等待 my_async_function 完成
	# my_async_function 完成后的操作

事件循环

异步编程依赖于事件循环(event loop),它是整个机制的核心。事件循环负责调度异步操作,并在操作完成时恢复相应的异步函数。在 Python 中,asyncio 模块提供了管理事件循环的工具。

协程

协程(Coroutine)是一种程序组件,它在高级语言中用于并发编程。与传统的子程序(如函数或方法)不同,协程的特点在于它可以在特定位置暂停执行,并在之后从同一点恢复执行。这使得协程非常适合处理异步操作和构建并发应用程序。

协程的关键特性包括:

非阻塞性:协程可以在等待一个操作完成时让出控制权,允许其他协程运行,从而实现非阻塞执行。

协作式多任务:与抢占式多任务(如线程)不同,协程需要明确地让出控制权(通常通过暂停点,如 yield 或 await)。这意味着协程控制其自身的执行,从而简化了同步和状态管理。

高效的异步操作:在处理 I/O 密集型任务(如网络通信或文件操作)时,协程允许程序在等待操作完成时执行其他任务,提高了程序的效率和响应性。

简化的错误处理和资源管理:协程能够使用类似于同步代码的结构来处理错误和管理资源,这简化了异步代码的编写和理解。

使用示例

import asyncio

async def hello():
	print('Hello')
	await asyncio.sleep(1)
	print('World')

async def main():
	await hello()

asyncio.run(main())

在这个示例中,hello 是一个异步函数。它首先打印 "Hello",然后 await asyncio.sleep(1) 挂起函数执行,等待 1 秒。在这段时间内,事件循环可以处理其他任务。1 秒后,事件循环再次唤醒 hello 函数,接着打印 "World"。

注意事项

异步机制结合多线程

在Python中,如果一个线程被阻塞(例如,它正在等待I/O操作完成或者正在执行一个长时间运行的计算),这不会影响其他线程的执行。这是因为Python的线程是操作系统级别的线程,每个线程都有自己的执行上下文,并且由操作系统调度。

当一个线程被阻塞时,操作系统会将CPU的控制权转移到其他未被阻塞的线程,这样其他线程可以继续执行。这就是所谓的上下文切换。

然而,需要注意的是,由于Python的全局解释器锁(GIL)的存在,Python在任何时候只允许一个线程执行Python字节码。这意味着,尽管Python的线程是操作系统级别的线程,但是它们在执行Python代码时并不是真正的并行执行。这可能会限制Python多线程的性能,特别是在CPU密集型任务中。

如果你的代码中有一个线程被阻塞,并且你希望其他线程能够并行执行,你可能需要考虑使用进程或者异步编程。进程是真正的并行执行,每个进程有自己的Python解释器和内存空间。异步编程使用单线程和事件循环来实现并发,它适合I/O密集型任务。

可以在线程中使用async和await,但是需要注意的是,asyncio库是基于单线程的,它使用协程(coroutine)来实现并发。这意味着,尽管你可以在多线程环境中使用async和await,但是你不能在一个线程中等待另一个线程的协程。
如果你想在一个线程中运行asyncio事件循环,你需要使用asyncio.run或者loop.run_until_complete来运行你的协程。然后,在另一个线程中,你可以使用asyncio.run_coroutine_threadsafe来安全地运行协程。

import asyncio
import threading
async def my_coroutine():
	await asyncio.sleep(1)
	print("Coroutine finished")

def run_event_loop():
	asyncio.run(my_coroutine())

### 在一个新的线程中运行事件循环
thread = threading.Thread(target=run_event_loop)
thread.start()

在这个例子中,我们在一个新的线程中运行了asyncio事件循环,并在这个事件循环中运行了一个协程。

在Python中,使用asyncio库进行异步编程时,事件循环(Event Loop)是核心概念之一。事件循环负责管理和分发程序中的各种事件,以及在异步程序中调度协程的执行。当你使用asyncio.get_event_loop()loop.run_until_complete(main())时,你实际上是在设置和启动一个事件循环来运行你的异步代码。让我们一步步详细解释这个过程。

asyncio.get_event_loop()

loop.run_until_complete(main())

示例解释

考虑以下代码:

import asyncio

async def main():
    # 异步代码或协程调用
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

在这段代码中:

注意

从Python 3.7开始,可以直接使用asyncio.run(coroutine())作为启动新异步程序的首选方法,因为它更简洁,自动创建和关闭事件循环,减少了样板代码。例如:

import asyncio

async def main():
    # 异步代码或协程调用
    pass

asyncio.run(main())

asyncio.run(main())是一个高级API,用于运行最高级别的入口点“main”协程,并自动管理事件循环的生命周期。在大多数情况下,推荐使用asyncio.run(),除非你需要更细粒度的控制事件循环,或者在老版本的Python中(3.7以下),你必须手动管理事件循环。

新线程中运行 asyncio.run() 的坑

asyncio.run() 是一个阻塞操作,它会运行传入的协程,直到该协程完成。如果你的 start_ws_own_listener(ws_own_url) 协程永远不会结束(例如,它包含一个无限循环),那么 asyncio.run() 将永远不会返回,后面的代码将不会执行。

如果你想要在 start_ws_own_listener(ws_own_url) 运行的同时执行其他代码,你可以使用 asyncio.create_task() 来创建一个新的任务,这个任务会在当前的事件循环中运行。

在新线程中使用 asyncio.run() ,会报错:There is no current event loop in thread 'Thread-1 (start_ws_own_listener)'.

asyncio.run()函数是一个高级函数,用于在主线程中执行协程。它会自动创建一个新的事件循环,然后运行你提供的协程,最后关闭事件循环。但是,asyncio.run()假设它是在主线程中被调用的,如果在其他线程中调用asyncio.run(),它会尝试获取当前线程的事件循环,如果当前线程没有事件循环,就会抛出错误。

Python的asyncio库中的事件循环方法,它们用于在事件循环中运行协程。

loop.run_until_complete(coro): 这个方法会运行传入的协程,然后等待它完成。如果传入的是一个Future对象,它会运行事件循环,直到Future对象完成。如果传入的是一个协程,它会自动将协程包装成一个Task(也就是一个Future对象),然后运行事件循环,直到Task完成。

loop.create_task(coro): 这个方法会将传入的协程包装成一个Task(也就是一个Future对象),然后将这个Task添加到事件循环中。这个Task会在事件循环运行的过程中被调度和执行。这个方法不会运行事件循环,你需要自己手动运行事件循环(例如通过调用loop.run_forever())。

loop.run_forever(): 这个方法会启动事件循环,让它一直运行下去,直到loop.stop()被调用。在事件循环运行的过程中,它会自动调度和执行事件循环中的所有Task。- 协程任务将执行其逻辑,然后完成,不会自动重复执行。回调函数: 你可以通过loop.call_soon()loop.call_later()loop.call_at()等方法提交回调函数给事件循环。这些回调函数在被调用后也不会自动重复执行。

loop.stop(): 这个方法会停止事件循环。当事件循环正在运行时(例如在loop.run_forever()中),你可以调用这个方法来停止事件循环。这个方法不会立即停止事件循环,而是会在当前的迭代结束后停止事件循环。这意味着在调用loop.stop()后,事件循环可能还会运行一段时间,直到当前的迭代结束。

loop.close(): 这个方法会关闭事件循环。一旦事件循环被关闭,它就不能再被用来运行协程了。如果你尝试在一个已经被关闭的事件循环上调用loop.run_until_complete()或者loop.run_forever(),Python会抛出RuntimeError: Event loop is closed错误。在关闭事件循环之前,你应该确保所有的任务都已经完成,否则这些未完成的任务会被取消。

loop.stop()只是暂时停止事件循环,你可以在之后再次启动它;而loop.close()会永久关闭事件循环,一旦事件循环被关闭,你就不能再使用它了。

如果你需要在新的线程中运行协程,你需要手动创建一个事件循环,然后使用这个事件循环来运行你的协程。以下是一个例子:

'''Python

import asyncio

import threading

def start_ws_own_listener():

loop = asyncio.new_event_loop() # 创建一个新的事件循环

asyncio.set_event_loop(loop) # 设置这个事件循环为当前线程的事件循环

# 定义协程
async def coro1(): pass # 需要并行
async def coro2(): pass # 需要并行

async def coro3(): pass # 需要串行
async def coro4(): pass # 需要串行

并行运行 coro1() 和 coro2()

loop.run_until_complete(asyncio.gather(coro1(), coro2()))

# 串行运行 coro3() coro4()等待它们完成
loop.run_until_complete(coro3())
loop.run_until_complete(coro4())

# 串行运行 coro3() coro4()等待它们完成
loop.create_task(coro3()) # 创建任务,不会立即执行
loop.create_task(coro4()) # 创建任务,不会立即执行
loop.run_forever() # 不接受任何参数。会阻塞当前线程,但是运行的过程中,事件循环可以并发地运行多个任务。任务中需要调用 loop.stop() 才能跳出 loop.run_forever()
loop.close() # 关闭事件循环

threading.Thread(target=start_ws_own_listener).start() # 在新的线程中运行 start_ws_own_listener 函数

'''

loop.stop()通常在你想要停止事件循环的地方调用。具体放在哪个结构中,取决于你的程序的逻辑。

例如,如果你有一个无限循环的任务,你可能会在某个条件满足时调用loop.stop()来停止事件循环:

async def infinite_task():

while True:

# do something

if some_condition:

loop.stop()

在这个例子中,当some_condition满足时,loop.stop()会被调用,事件循环会在当前的迭代结束后停止。

另一个常见的用法是在接收到某个信号(例如SIGINT或SIGTERM)时停止事件循环:

)

在这个例子中,当程序接收到SIGINT或SIGTERM信号时,stop_on_signal函数会被调用,这个函数会调用loop.stop()来停止事件循环。

请注意,loop.stop()只是停止事件循环,它不会关闭事件循环。在调用loop.stop()后,你还需要调用loop.close()来关闭事件循环。

不能在loop.run_forever()运行的同时再调用loop.run_until_complete()。这是因为loop.run_until_complete()和loop.run_forever()都是用来启动事件循环的,它们会阻塞当前线程,直到相应的条件满足(对于run_until_complete是传入的协程完成,对于run_forever是loop.stop()被调用)。

如果你在loop.run_forever()运行的任务中调用loop.run_until_complete(),你会得到一个错误,提示事件循环已经在运行。

如果你需要在loop.run_forever()运行的过程中执行一个协程,你可以使用asyncio.ensure_future()或loop.create_task()来创建一个新的任务,这个任务会在事件循环运行的过程中被调度和执行。

事件循环的嵌套执行

首先定义了两个协程coro4()和coro5()。在coro4()中,我们使用loop.create_task()来创建一个新的任务,这个任务会在事件循环运行的过程中被调度和执行。

import asyncio

async def coro5():

await asyncio.sleep(1)

print("coro5 finished")

async def coro4(loop):

loop.create_task(coro5())

print("coro4 finished")

loop = asyncio.get_event_loop()

loop.run_until_complete(coro4(loop))

在这个例子中,当我们调用loop.run_until_complete(coro4(loop))时,coro4()会被执行。在coro4()中,我们创建了一个新的任务coro5(),这个任务会在事件循环运行的过程中被调度和执行。

然而,你会注意到,即使coro5()被创建为一个新的任务,但是当coro4()完成时,事件循环也会立即停止,coro5()可能还没有机会被执行。这是因为loop.run_until_complete()会在传入的协程完成后立即停止事件循环。

如果你想要在coro4()完成后继续运行事件循环,直到所有的任务都完成,你可以使用loop.run_forever(),并在所有的任务都完成后调用loop.stop()来停止事件循环。例如:

import asyncio

async def coro5():

await asyncio.sleep(1)

print("coro5 finished")

async def coro4(loop):

loop.create_task(coro5())

print("coro4 finished")

loop = asyncio.get_event_loop()

loop.run_until_complete(coro4(loop))

loop.run_forever()

在这个例子中,即使coro4()完成,事件循环也会继续运行,直到coro5()完成。

websocket 异步

websocket库中,on_openon_messageon_erroron_close这些回调函数通常是同步函数,而不是异步函数。这是因为websocket库是基于同步的Python socket库构建的,而不是基于异步的asyncio库。

这意味着,即使你在异步函数中创建和运行WebSocketApp,这些回调函数也应该是同步的。如果你需要在这些回调函数中执行异步操作,你需要在这些函数中创建一个新的事件循环,然后在这个事件循环中运行你的异步代码。但是请注意,这可能会导致代码变得复杂,并可能导致性能问题,因为创建和管理多个事件循环通常会有一些开销。

如果你需要在WebSocket通信中使用异步编程,你可能需要考虑使用一个支持异步的WebSocket库,如websocketsaiohttp。这些库是基于asyncio构建的,可以更好地支持异步操作。

websocket有几个包:

1、功能不全的 websocket

导入的时候:

import websocket

2、websocket-client 不带服务端,不支持异步

pip3 install websocket-client

导入的时候也是:

import websocket

3、websockets 带服务端,支持异步

pip3 install websockets

导入的时候:

import websockets

logger

logging 模块的 info、error 和 warning 方法都是同步的,不是异步的,所以你不能在它们前面使用 await 关键字。