Pythonでasyncioを使った非同期処理のスクリプト作成方法を紹介します。asyncioは、Python3.4 で追加され、 Python3.5でasync/awaitが実装がされました。今回はPython3.5.3でasync/awaitを使用して、次の非同期処理を行うPythonスクリプトを作成します。
- タスクの実行と非同期処理のデバッグ方法
- asyncio.sleep関数による一定時間の待ち
- タスク/イベントループのキャンセル
なお、Pythonでの非同期処理については、「asyncio — 非同期 I/O、イベントループ、コルーチンおよびタスク」に詳細が記述されています。
タスクの実行と非同期処理のデバッグ方法
イベントループを作成して非同期処理を行う関数を二つ用意します。最初は起動時に非同期処理の関数を起動し、起動された関数から残りの非同期処理を行う関数を起動します。最初に起動する関数をタスクを管理するスケジューラ、もう一つを実際の処理を行うタスクを想定し、二つのモジュールに分割してみました。使用する関数を次に示します。なお、ロギングの処理については、「pythonによるloggingモジュールの使い方」を参照してください。今回はファイルからのコンフィグではなくスクリプトから設定(logging.basicConfig)しました。
- asyncio.get_event_loop
- イベントループを取得します
- loop.run_forever
- loop.stop() が呼ばれるまでイベントループを実行します。
- asyncio.ensure_future
- コルーチンオブジェクト の実行をスケジュールします: このときフューチャにラップします。Task オブジェクトを返します。
- loop.set_debug
- イベントループのデバッグモードを設定します。
- loop.slow_callback_duration
- デバックモードで、もしコープバックあるいはタスクのステップの実行が、秒単位のこの間隔を上回るならば、スローコールバック/タスクがログされます。
- warnings.simplefilter
- 単純なエントリを 警告フィルタ仕様 のリストに挿入します。引数の意味は filterwarnings() と同じですが、この関数により挿入されるフィルタはカテゴリと行番号が一致していればすべてのモジュールのすべてのメッセージに合致しますので、正規表現は必要ありません。
【asynctest.py】
import logging.config import asyncio import asynctest import asynctest1 import sys import warnings logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(filename)s:%(lineno)d %(message)s', stream=sys.stderr, ) LOG = logging.getLogger('') async def operation(): LOG.info('start operation') task = asyncio.ensure_future(asynctest1.operationsub()) loop = asyncio.get_event_loop() LOG.info('enabling debugging') # Enable debugging loop.set_debug(True) # Make the threshold for "slow" tasks very very small for # illustration. The default is 0.1, or 100 milliseconds. loop.slow_callback_duration = 0.001 # Report all mistakes managing asynchronous resources. warnings.simplefilter('always', ResourceWarning) LOG.info('entering event loop') asyncio.ensure_future(operation()) loop.run_forever() loop.close() exit()
【asynctest1.py】
import logging.config import asyncio import sys logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(filename)s:%(lineno)d %(message)s', stream=sys.stderr, ) LOG = logging.getLogger('') async def operationsub(): LOG.info('start operationsub')
実行すると次のログが出力されます。LOG.info関数で設定したログと、asyncioの関数から出力されるログ(base_events.py:1420)が表示されます。
$ python3 asynctest.py 2018-01-01 05:01:25,237 selector_events.py:65 Using selector: EpollSelector 2018-01-01 05:01:25,239 asynctest.py:20 enabling debugging 2018-01-01 05:01:25,240 asynctest.py:32 entering event loop 2018-01-01 05:01:25,246 asynctest.py:16 start operation 2018-01-01 05:01:25,250 base_events.py:1420 Executingresult=None created at /home/pi/async/asynctest.py:34> took 0.004 seconds 2018-01-01 05:01:25,251 asynctest1.py:13 start operationsub
Pythonスクリプト「asynctest.py」の23行目をコメントにして、asyncioのデバッグログを無効にします。
loop.set_debug(True)
実行すると次のログが出力されます。asyncioの関数から出力されるログ(base_events.py:1420)が出力されなくなります。
$ python3 asynctest.py 2018-01-01 05:04:38,471 selector_events.py:65 Using selector: EpollSelector 2018-01-01 05:04:38,473 asynctest.py:20 enabling debugging 2018-01-01 05:04:38,473 asynctest.py:32 entering event loop 2018-01-01 05:04:38,474 asynctest.py:16 start operation 2018-01-01 05:04:38,474 asynctest1.py:13 start operationsub
asyncio.sleep関数による一定時間の待ち
asyncio.sleep関数を用いて一定時間タスクを待たせます。なお、asyncio.sleep関数のパラメータは、時間で単位は秒、小数点も使用できます。
asynctest.pyのoperation関数を次のように変更します。
async def operation(): LOG.info('start operation') task = asyncio.ensure_future(asynctest1.operationsub()) LOG.info('operation1') await asyncio.sleep(0.2) LOG.info('operation2') await asyncio.sleep(3) LOG.info('operation3')
asynctest1.pyのoperationsub関数を次のように変更します。
async def operationsub(): LOG.info('start operationsub') LOG.info('operationsub10') await asyncio.sleep(1) LOG.info('operationsub11')
次のコマンドでPythonスクリプトを実行すると、operationsub関数でasyncio.sleepが実行されるとoperation関数のスリープが時間が経過して解除され、operation関数が実行されているログが取得できます。
$ python3 asynctest.py 2018-01-01 05:07:01,764 selector_events.py:65 Using selector: EpollSelector 2018-01-01 05:07:01,766 asynctest.py:25 enabling debugging 2018-01-01 05:07:01,766 asynctest.py:37 entering event loop 2018-01-01 05:07:01,767 asynctest.py:16 start operation 2018-01-01 05:07:01,767 asynctest.py:18 operation1 2018-01-01 05:07:01,768 asynctest1.py:13 start operationsub 2018-01-01 05:07:01,768 asynctest1.py:14 operationsub10 2018-01-01 05:07:01,968 asynctest.py:20 operation2 2018-01-01 05:07:02,770 asynctest1.py:16 operationsub11 2018-01-01 05:07:04,972 asynctest.py:22 operation3
タスク/イベントループのキャンセル
asyncio.get_event_loop関数で取得したイベントループは、stop関数でキャンセルします。asyncio.ensure_future(asynctest1.operationsub())関数で作成したタスクは、cancel関数でキャンセルします。実際のpythonスクリプト「operation(loop)」を次に示します。
async def operation(loop): LOG.info('start operation') task = asyncio.ensure_future(asynctest1.operationsub()) LOG.info('operation1') await asyncio.sleep(0.2) task.cancel() LOG.info('operation2') await asyncio.sleep(3) LOG.info('operation3') loop.stop()
asyncio.get_event_loop関数の戻り値で受け取ったイベントループを、タスク起動時のパラメータとして渡します。
asyncio.ensure_future(operation(loop))
次のコマンドでpythonスクリプトを実行します。operationsub関数のキャンセルにより本来表示されるログ「operationsub20」が表示されていません。これはsleepしているときにキャンセルされたことを示します。また、イベントループがキャンセルされたことにより、loop.run_foreverからpythonスクリプトに制御が移され、pythonスクリプトが終了されました。
$ python3 asynctest.py 2018-01-01 09:22:13,283 asynctest.py:28 start 2018-01-01 09:22:13,284 selector_events.py:65 Using selector: EpollSelector 2018-01-01 09:22:13,285 asynctest.py:31 enabling debugging 2018-01-01 09:22:13,286 asynctest.py:43 entering event loop 2018-01-01 09:22:13,287 asynctest.py:16 start operation 2018-01-01 09:22:13,287 asynctest.py:18 operation1 2018-01-01 09:22:13,288 asynctest1.py:13 start operationsub 2018-01-01 09:22:13,288 asynctest1.py:14 operationsub10 2018-01-01 09:22:13,489 asynctest.py:21 operation2 2018-01-01 09:22:16,493 asynctest.py:23 operation3