「Pythonで非同期処理(asyncio)」に続き、Pythonでasyncioを使った非同期処理のスクリプト作成方法を紹介します。asyncioは、Python3.4 で追加され、 Python3.5でasync/awaitが実装されました。今回はPython3.5.3でasync/awaitを使用して、次の非同期処理を行うPythonスクリプトを作成します。
- Futureによるコールバック関数の作成
- Futureのキャンセル
- イベントオブジェクトの実装
- キューオブジェクトの実装
なお、Pythonでの非同期処理については、「asyncio — 非同期 I/O、イベントループ、コルーチンおよびタスク」に詳細が記述されています。
Futureによるコールバック関数の作成
Futureによりタスク完了時にコールバックを発生できます。
- future.add_done_callback
- フューチャが終了したときに実行するコールバックを追加します。
- future.set_result
- フューチャの終了をマークしその結果を設定します。
- future.result
- このフューチャが表す結果を返します。
次にPythonスクリプトを示します。
【asynctest.py】
import logging.config import asyncio import asynctest import asynctest1 import sys import warnings logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(filename)s:%(lineno)d %(message)s', stream=sys.stderr, ) LOG = logging.getLogger() def stop_operation(future): LOG.info(future.result()) async def operation(loop): future = asyncio.Future() future.add_done_callback(stop_operation) LOG.info('start operation') task = asyncio.ensure_future(asynctest1.operationsub(future)) LOG.info('operation1') await asyncio.sleep(0.2) LOG.info('operation2') await asyncio.sleep(3) LOG.info('operation3') loop.stop() LOG.info('start') loop = asyncio.get_event_loop() LOG.info('enabling debugging') # Enable debugging # loop.set_debug(True) # Make the threshold for "slow" tasks very very small for # illustration. The default is 0.1, or 100 milliseconds. loop.slow_callback_duration = 0.001 # Report all mistakes managing asynchronous resources. warnings.simplefilter('always', ResourceWarning) LOG.info('entering event loop') asyncio.ensure_future(operation(loop)) loop.run_forever() loop.close() exit()
【asynctest1.py】
import logging.config import asyncio import sys logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(filename)s:%(lineno)d %(message)s', stream=sys.stderr, ) LOG = logging.getLogger() async def operationsub(future): LOG.info('start operationsub') LOG.info('operationsub10') await asyncio.sleep(1) LOG.info('operationsub11') future.set_result('operation is done!')
実行すると次のログが出力されます。タスク終了時にfuture.set_result関数によりで設定された「operation is done!」が、future.add_done_callback関数により登録されたコールバック関数「stop_operation」で表示されます。
$ python3 asynctest.py 2018-01-02 08:05:12,957 asynctest.py:35 start 2018-01-02 08:05:12,958 selector_events.py:65 Using selector: EpollSelector 2018-01-02 08:05:12,960 asynctest.py:38 enabling debugging 2018-01-02 08:05:12,961 asynctest.py:50 entering event loop 2018-01-02 08:05:12,961 asynctest.py:24 start operation 2018-01-02 08:05:12,962 asynctest.py:26 operation1 2018-01-02 08:05:12,963 asynctest1.py:13 start operationsub 2018-01-02 08:05:12,963 asynctest1.py:14 operationsub10 2018-01-02 08:05:13,163 asynctest.py:29 operation2 2018-01-02 08:05:13,965 asynctest1.py:16 operationsub11 2018-01-02 08:05:13,966 asynctest.py:16 operation is done! 2018-01-02 08:05:16,167 asynctest.py:31 operation3
Futureのキャンセル
作成したFutureを途中でキャンセルします。
コールバック関数「stop_operation」を次にように変更します。Futureがキャンセルされた場合もコールバック関数は呼ばれるので、future.cancelled関数で、すでにキャンセルされているかをチェックします。
def stop_operation(future): LOG.info("stop_operation") if not future.cancelled(): LOG.info(future.result())
asynctest.pyのoperation関数を次にように変更します。future.cancel関数でFutureをキャンセルします。
async def operation(loop): future = asyncio.Future() future.add_done_callback(stop_operation) LOG.info('start operation') task = asyncio.ensure_future(asynctest1.operationsub(future)) LOG.info('operation1') await asyncio.sleep(0.2) future.cancel() LOG.info('operation2') await asyncio.sleep(3) LOG.info('operation3') loop.stop()
asynctest1.pyのoperationsub関数を次にように変更します。future.cancelled関数ですでにキャンセルされていないかを確認します。
async def operationsub(future): LOG.info('start operationsub') LOG.info('operationsub10') await asyncio.sleep(1) LOG.info('operationsub11') if not future.cancelled(): future.set_result('operation is done!')
実行すると次のログが出力されます。operationsub関数をキャンセルしたために、コールバック関数「stop_operation」で「stop_operation」が表示されていますが、すでにキャンセルされているために、「’operation is done」が表示されません。
$ python3 asynctest.py 2018-01-02 08:16:08,612 asynctest.py:34 start 2018-01-02 08:16:08,613 selector_events.py:65 Using selector: EpollSelector 2018-01-02 08:16:08,614 asynctest.py:37 enabling debugging 2018-01-02 08:16:08,615 asynctest.py:49 entering event loop 2018-01-02 08:16:08,615 asynctest.py:23 start operation 2018-01-02 08:16:08,616 asynctest.py:25 operation1 2018-01-02 08:16:08,617 asynctest1.py:13 start operationsub 2018-01-02 08:16:08,617 asynctest1.py:14 operationsub10 2018-01-02 08:16:08,817 asynctest.py:28 operation2 2018-01-02 08:16:08,818 asynctest.py:16 stop_operation 2018-01-02 08:16:09,619 asynctest1.py:16 operationsub11 2018-01-02 08:16:11,821 asynctest.py:30 operation3
asynctest1.pyのoperationsub関数では、future.cancelled関数を用いてFutureがキャンセルされたかを判断しないと、すでにキャンセルしているFutureに対して処理結果を設定しようとするので次の例外が発生します。
2018-01-02 08:09:53,100 asynctest.py:28 operation2 2018-01-02 08:09:53,101 asynctest.py:16 stop_operation 2018-01-02 08:09:53,903 asynctest1.py:16 operationsub11 2018-01-02 08:09:56,105 asynctest.py:30 operation3 2018-01-02 08:09:56,106 base_events.py:1258 Task exception was never retrieved future:exception=InvalidStateError(‘CANCELLED: ‘,)> Traceback (most recent call last): File “/usr/lib/python3.5/asyncio/tasks.py”, line 239, in _step result = coro.send(None) File “/home/pi/async/asynctest1.py”, line 17, in operationsub future.set_result(‘operation is done!’) File “/usr/lib/python3.5/asyncio/futures.py”, line 348, in set_result raise InvalidStateError(‘{}: {!r}’.format(self._state, self)) asyncio.futures.InvalidStateError: CANCELLED:
イベントオブジェクトの実装
非同期処理でのイベントオブジェクトは、asyncio.Event関数を使用します。
asynctest.pyのoperation関数を次にように変更します。event.clear関数によりイベントをクリアします。ただし、イベントを作成すると初期値はイベントのクリアになります。event.set関数でイベントを設定します。
async def operation(loop): event = asyncio.Event() event.clear() LOG.info('start operation') task = asyncio.ensure_future(asynctest1.operationsub(event)) LOG.info('operation1') await asyncio.sleep(0.2) event.set() LOG.info('operation2') await asyncio.sleep(3) LOG.info('operation3') loop.stop()
asynctest1.pyのoperationsub関数を次にように変更します。event.wait関数によりイベントの設定を待ちます。
async def operationsub(event): LOG.info('start operationsub') LOG.info('operationsub10') await event.wait() LOG.info('operationsub11')
実行すると次のログが出力されます。asynctest1のoperationsub関数を起動した後にイベントを設定しているので、タイムスタンプを見ると、asynctest1の「operationsub11」の表示がすぐにログされています。。
$ python3 asynctest.py 2018-01-03 03:17:14,957 asynctest.py:34 start 2018-01-03 03:17:14,958 selector_events.py:65 Using selector: EpollSelector 2018-01-03 03:17:14,960 asynctest.py:37 enabling debugging 2018-01-03 03:17:14,961 asynctest.py:49 entering event loop 2018-01-03 03:17:14,961 asynctest.py:23 start operation 2018-01-03 03:17:14,962 asynctest.py:25 operation1 2018-01-03 03:17:14,963 asynctest1.py:13 start operationsub 2018-01-03 03:17:14,963 asynctest1.py:14 operationsub10 2018-01-03 03:17:15,163 asynctest.py:28 operation2 2018-01-03 03:17:15,164 asynctest1.py:16 operationsub11 2018-01-03 03:17:18,168 asynctest.py:30 operation3
キューオブジェクトの実装
非同期処理でのキューオブジェクトは、asyncio.Queue関数を使用します。
asynctest.pyのoperation関数を次にように変更します。queue.put_nowait関数によりキューデータを指定されたキューに送信します。送信するキューデータは、今回はリストデータにしました。どのようなデータ形式でもキューデータとして送信できます。
async def operation(loop): queue = asyncio.Queue() LOG.info('start operation') task = asyncio.ensure_future(asynctest1.operationsub(queue)) LOG.info('operation1') await asyncio.sleep(0.2) queue.put_nowait(["start", 1,2,3]) LOG.info('operation2') await asyncio.sleep(0.2) queue.put_nowait(["next", 4,5,6]) LOG.info('operation3') await asyncio.sleep(0.2) queue.put_nowait(["stop", 7,8,9]) await asyncio.sleep(3) LOG.info('operation4') loop.stop()
asynctest1.pyのoperationsub関数を次にように変更します。queue.empty関数によりキューの受信を待ちます。キューを受信すると、受信したキューデータのリストに「stop」が含まれているかを判断し、「stop」が含まれていればキューの受信を完了します。
async def operationsub(queue): LOG.info('start operationsub') LOG.info('operationsub10') while True: if not queue.empty(): queuedata = await queue.get() LOG.info('queuedata:{}'.format(queuedata)) if queuedata[0]=="stop": break await asyncio.sleep(0.2) LOG.info('operationsub11')
実行すると次のログが出力されます。queue.put_nowait関数で送信したリストデータ[‘start’, 1, 2, 3]、[‘next’, 4, 5, 6]、[‘stop’, 7, 8, 9]の表示が次々に表示されており、最後のデータを調べて、while 文を抜けて「operationsub11」が表示されています。。
$ python3 asynctest.py 2018-01-03 04:26:01,481 asynctest.py:38 start 2018-01-03 04:26:01,482 selector_events.py:65 Using selector: EpollSelector 2018-01-03 04:26:01,484 asynctest.py:41 enabling debugging 2018-01-03 04:26:01,485 asynctest.py:53 entering event loop 2018-01-03 04:26:01,486 asynctest.py:22 start operation 2018-01-03 04:26:01,486 asynctest.py:24 operation1 2018-01-03 04:26:01,487 asynctest1.py:13 start operationsub 2018-01-03 04:26:01,488 asynctest1.py:14 operationsub10 2018-01-03 04:26:01,688 asynctest.py:27 operation2 2018-01-03 04:26:01,689 asynctest1.py:18 queuedata:[‘start’, 1, 2, 3] 2018-01-03 04:26:01,890 asynctest.py:30 operation3 2018-01-03 04:26:01,891 asynctest1.py:18 queuedata:[‘next’, 4, 5, 6] 2018-01-03 04:26:02,092 asynctest1.py:18 queuedata:[‘stop’, 7, 8, 9] 2018-01-03 04:26:02,093 asynctest1.py:23 operationsub11 2018-01-03 04:26:05,095 asynctest.py:34 operation4