Pythonで非同期処理(asyncio)

Pythonでasyncioを使った非同期処理のスクリプト作成方法を紹介します。asyncioは、Python3.4 で追加され、 Python3.5でasync/awaitが実装がされました。今回はPython3.5.3でasync/awaitを使用して、次の非同期処理を行うPythonスクリプトを作成します。

  1. タスクの実行と非同期処理のデバッグ方法
  2. asyncio.sleep関数による一定時間の待ち
  3. タスク/イベントループのキャンセル

なお、Pythonでの非同期処理については、「asyncio — 非同期 I/O、イベントループ、コルーチンおよびタスク」に詳細が記述されています。

タスクの実行と非同期処理のデバッグ方法

イベントループを作成して非同期処理を行う関数を二つ用意します。最初は起動時に非同期処理の関数を起動し、起動された関数から残りの非同期処理を行う関数を起動します。最初に起動する関数をタスクを管理するスケジューラ、もう一つを実際の処理を行うタスクを想定し、二つのモジュールに分割してみました。使用する関数を次に示します。なお、ロギングの処理については、「pythonによるloggingモジュールの使い方」を参照してください。今回はファイルからのコンフィグではなくスクリプトから設定(logging.basicConfig)しました。

asyncio.get_event_loop
イベントループを取得します
loop.run_forever
loop.stop() が呼ばれるまでイベントループを実行します。
asyncio.ensure_future
コルーチンオブジェクト の実行をスケジュールします: このときフューチャにラップします。Task オブジェクトを返します。
loop.set_debug
イベントループのデバッグモードを設定します。
loop.slow_callback_duration
デバックモードで、もしコープバックあるいはタスクのステップの実行が、秒単位のこの間隔を上回るならば、スローコールバック/タスクがログされます。
warnings.simplefilter
単純なエントリを 警告フィルタ仕様 のリストに挿入します。引数の意味は filterwarnings() と同じですが、この関数により挿入されるフィルタはカテゴリと行番号が一致していればすべてのモジュールのすべてのメッセージに合致しますので、正規表現は必要ありません。

【asynctest.py】

import logging.config 
import asyncio
import asynctest 
import asynctest1 
import sys
import warnings

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(filename)s:%(lineno)d %(message)s',
    stream=sys.stderr,
)
LOG = logging.getLogger('')

async def operation():
    LOG.info('start operation')
    task = asyncio.ensure_future(asynctest1.operationsub())
    
loop = asyncio.get_event_loop()
LOG.info('enabling debugging')

# Enable debugging
loop.set_debug(True)

# Make the threshold for "slow" tasks very very small for
# illustration. The default is 0.1, or 100 milliseconds.
loop.slow_callback_duration = 0.001

# Report all mistakes managing asynchronous resources.
warnings.simplefilter('always', ResourceWarning)

LOG.info('entering event loop')

asyncio.ensure_future(operation())
loop.run_forever()
loop.close()
exit()

【asynctest1.py】

import logging.config 
import asyncio
import sys

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(filename)s:%(lineno)d %(message)s',
    stream=sys.stderr,
)
LOG = logging.getLogger('')

async def operationsub():
    LOG.info('start operationsub')

実行すると次のログが出力されます。LOG.info関数で設定したログと、asyncioの関数から出力されるログ(base_events.py:1420)が表示されます。

$ python3 asynctest.py
2018-01-01 05:01:25,237 selector_events.py:65 Using selector: EpollSelector
2018-01-01 05:01:25,239 asynctest.py:20 enabling debugging
2018-01-01 05:01:25,240 asynctest.py:32 entering event loop
2018-01-01 05:01:25,246 asynctest.py:16 start operation
2018-01-01 05:01:25,250 base_events.py:1420 Executing result=None created at /home/pi/async/asynctest.py:34> took 0.004 seconds
2018-01-01 05:01:25,251 asynctest1.py:13 start operationsub

Pythonスクリプト「asynctest.py」の23行目をコメントにして、asyncioのデバッグログを無効にします。

loop.set_debug(True)

実行すると次のログが出力されます。asyncioの関数から出力されるログ(base_events.py:1420)が出力されなくなります。

$ python3 asynctest.py
2018-01-01 05:04:38,471 selector_events.py:65 Using selector: EpollSelector
2018-01-01 05:04:38,473 asynctest.py:20 enabling debugging
2018-01-01 05:04:38,473 asynctest.py:32 entering event loop
2018-01-01 05:04:38,474 asynctest.py:16 start operation
2018-01-01 05:04:38,474 asynctest1.py:13 start operationsub

asyncio.sleep関数による一定時間の待ち

asyncio.sleep関数を用いて一定時間タスクを待たせます。なお、asyncio.sleep関数のパラメータは、時間で単位は秒、小数点も使用できます。
asynctest.pyのoperation関数を次のように変更します。

async def operation():
    LOG.info('start operation')
    task = asyncio.ensure_future(asynctest1.operationsub())
    LOG.info('operation1')
    await asyncio.sleep(0.2)
    LOG.info('operation2')
    await asyncio.sleep(3)
    LOG.info('operation3')

asynctest1.pyのoperationsub関数を次のように変更します。

async def operationsub():
    LOG.info('start operationsub')
    LOG.info('operationsub10')
    await asyncio.sleep(1)
    LOG.info('operationsub11')

次のコマンドでPythonスクリプトを実行すると、operationsub関数でasyncio.sleepが実行されるとoperation関数のスリープが時間が経過して解除され、operation関数が実行されているログが取得できます。

$ python3 asynctest.py
2018-01-01 05:07:01,764 selector_events.py:65 Using selector: EpollSelector
2018-01-01 05:07:01,766 asynctest.py:25 enabling debugging
2018-01-01 05:07:01,766 asynctest.py:37 entering event loop
2018-01-01 05:07:01,767 asynctest.py:16 start operation
2018-01-01 05:07:01,767 asynctest.py:18 operation1
2018-01-01 05:07:01,768 asynctest1.py:13 start operationsub
2018-01-01 05:07:01,768 asynctest1.py:14 operationsub10
2018-01-01 05:07:01,968 asynctest.py:20 operation2
2018-01-01 05:07:02,770 asynctest1.py:16 operationsub11
2018-01-01 05:07:04,972 asynctest.py:22 operation3

タスク/イベントループのキャンセル

asyncio.get_event_loop関数で取得したイベントループは、stop関数でキャンセルします。asyncio.ensure_future(asynctest1.operationsub())関数で作成したタスクは、cancel関数でキャンセルします。実際のpythonスクリプト「operation(loop)」を次に示します。

async def operation(loop):
    LOG.info('start operation')
    task = asyncio.ensure_future(asynctest1.operationsub())
    LOG.info('operation1')
    await asyncio.sleep(0.2)
    task.cancel()
    LOG.info('operation2')
    await asyncio.sleep(3)
    LOG.info('operation3')
    loop.stop()

asyncio.get_event_loop関数の戻り値で受け取ったイベントループを、タスク起動時のパラメータとして渡します。

asyncio.ensure_future(operation(loop))

次のコマンドでpythonスクリプトを実行します。operationsub関数のキャンセルにより本来表示されるログ「operationsub20」が表示されていません。これはsleepしているときにキャンセルされたことを示します。また、イベントループがキャンセルされたことにより、loop.run_foreverからpythonスクリプトに制御が移され、pythonスクリプトが終了されました。

$ python3 asynctest.py
2018-01-01 09:22:13,283 asynctest.py:28 start
2018-01-01 09:22:13,284 selector_events.py:65 Using selector: EpollSelector
2018-01-01 09:22:13,285 asynctest.py:31 enabling debugging
2018-01-01 09:22:13,286 asynctest.py:43 entering event loop
2018-01-01 09:22:13,287 asynctest.py:16 start operation
2018-01-01 09:22:13,287 asynctest.py:18 operation1
2018-01-01 09:22:13,288 asynctest1.py:13 start operationsub
2018-01-01 09:22:13,288 asynctest1.py:14 operationsub10
2018-01-01 09:22:13,489 asynctest.py:21 operation2
2018-01-01 09:22:16,493 asynctest.py:23 operation3