Pythonで非同期処理(asyncio)」に続き、Pythonでasyncioを使った非同期処理のスクリプト作成方法を紹介します。asyncioは、Python3.4 で追加され、 Python3.5でasync/awaitが実装されました。今回はPython3.5.3でasync/awaitを使用して、次の非同期処理を行うPythonスクリプトを作成します。

  1. Futureによるコールバック関数の作成
  2. Futureのキャンセル
  3. イベントオブジェクトの実装
  4. キューオブジェクトの実装

なお、Pythonでの非同期処理については、「asyncio — 非同期 I/O、イベントループ、コルーチンおよびタスク」に詳細が記述されています。

Futureによるコールバック関数の作成

Futureによりタスク完了時にコールバックを発生できます。

future.add_done_callback
フューチャが終了したときに実行するコールバックを追加します。
future.set_result
フューチャの終了をマークしその結果を設定します。
future.result
このフューチャが表す結果を返します。


次にPythonスクリプトを示します。

【asynctest.py】

import logging.config 
import asyncio
import asynctest 
import asynctest1 
import sys
import warnings

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(filename)s:%(lineno)d %(message)s',
    stream=sys.stderr,
)
LOG = logging.getLogger()

def stop_operation(future):
    LOG.info(future.result())
async def operation(loop):
    future = asyncio.Future()
    future.add_done_callback(stop_operation)
    LOG.info('start operation')
    task = asyncio.ensure_future(asynctest1.operationsub(future))
    LOG.info('operation1')
    await asyncio.sleep(0.2)
    LOG.info('operation2')
    await asyncio.sleep(3)
    LOG.info('operation3')
    loop.stop()
    
    
LOG.info('start')
    
loop = asyncio.get_event_loop()
LOG.info('enabling debugging')

# Enable debugging
# loop.set_debug(True)

# Make the threshold for "slow" tasks very very small for
# illustration. The default is 0.1, or 100 milliseconds.
loop.slow_callback_duration = 0.001

# Report all mistakes managing asynchronous resources.
warnings.simplefilter('always', ResourceWarning)

LOG.info('entering event loop')

asyncio.ensure_future(operation(loop))
loop.run_forever()
loop.close()
exit()

【asynctest1.py】

import logging.config 
import asyncio
import sys

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(filename)s:%(lineno)d %(message)s',
    stream=sys.stderr,
)
LOG = logging.getLogger()

async def operationsub(future):
    LOG.info('start operationsub')
    LOG.info('operationsub10')
    await asyncio.sleep(1)
    LOG.info('operationsub11')
    future.set_result('operation is done!')

実行すると次のログが出力されます。タスク終了時にfuture.set_result関数によりで設定された「operation is done!」が、future.add_done_callback関数により登録されたコールバック関数「stop_operation」で表示されます。

$ python3 asynctest.py
2018-01-02 08:05:12,957 asynctest.py:35 start
2018-01-02 08:05:12,958 selector_events.py:65 Using selector: EpollSelector
2018-01-02 08:05:12,960 asynctest.py:38 enabling debugging
2018-01-02 08:05:12,961 asynctest.py:50 entering event loop
2018-01-02 08:05:12,961 asynctest.py:24 start operation
2018-01-02 08:05:12,962 asynctest.py:26 operation1
2018-01-02 08:05:12,963 asynctest1.py:13 start operationsub
2018-01-02 08:05:12,963 asynctest1.py:14 operationsub10
2018-01-02 08:05:13,163 asynctest.py:29 operation2
2018-01-02 08:05:13,965 asynctest1.py:16 operationsub11
2018-01-02 08:05:13,966 asynctest.py:16 operation is done!
2018-01-02 08:05:16,167 asynctest.py:31 operation3

Futureのキャンセル

作成したFutureを途中でキャンセルします。

コールバック関数「stop_operation」を次にように変更します。Futureがキャンセルされた場合もコールバック関数は呼ばれるので、future.cancelled関数で、すでにキャンセルされているかをチェックします。

def stop_operation(future):
    LOG.info("stop_operation")
    if not future.cancelled():
        LOG.info(future.result())

asynctest.pyのoperation関数を次にように変更します。future.cancel関数でFutureをキャンセルします。

async def operation(loop):
    future = asyncio.Future()
    future.add_done_callback(stop_operation)
    LOG.info('start operation')
    task = asyncio.ensure_future(asynctest1.operationsub(future))
    LOG.info('operation1')
    await asyncio.sleep(0.2)
    future.cancel()
    LOG.info('operation2')
    await asyncio.sleep(3)
    LOG.info('operation3')
    loop.stop()

asynctest1.pyのoperationsub関数を次にように変更します。future.cancelled関数ですでにキャンセルされていないかを確認します。

async def operationsub(future):
    LOG.info('start operationsub')
    LOG.info('operationsub10')
    await asyncio.sleep(1)
    LOG.info('operationsub11')
    if not future.cancelled():
        future.set_result('operation is done!')

実行すると次のログが出力されます。operationsub関数をキャンセルしたために、コールバック関数「stop_operation」で「stop_operation」が表示されていますが、すでにキャンセルされているために、「’operation is done」が表示されません。

$ python3 asynctest.py
2018-01-02 08:16:08,612 asynctest.py:34 start
2018-01-02 08:16:08,613 selector_events.py:65 Using selector: EpollSelector
2018-01-02 08:16:08,614 asynctest.py:37 enabling debugging
2018-01-02 08:16:08,615 asynctest.py:49 entering event loop
2018-01-02 08:16:08,615 asynctest.py:23 start operation
2018-01-02 08:16:08,616 asynctest.py:25 operation1
2018-01-02 08:16:08,617 asynctest1.py:13 start operationsub
2018-01-02 08:16:08,617 asynctest1.py:14 operationsub10
2018-01-02 08:16:08,817 asynctest.py:28 operation2
2018-01-02 08:16:08,818 asynctest.py:16 stop_operation
2018-01-02 08:16:09,619 asynctest1.py:16 operationsub11
2018-01-02 08:16:11,821 asynctest.py:30 operation3

asynctest1.pyのoperationsub関数では、future.cancelled関数を用いてFutureがキャンセルされたかを判断しないと、すでにキャンセルしているFutureに対して処理結果を設定しようとするので次の例外が発生します。

2018-01-02 08:09:53,100 asynctest.py:28 operation2
2018-01-02 08:09:53,101 asynctest.py:16 stop_operation
2018-01-02 08:09:53,903 asynctest1.py:16 operationsub11
2018-01-02 08:09:56,105 asynctest.py:30 operation3
2018-01-02 08:09:56,106 base_events.py:1258 Task exception was never retrieved
future:  exception=InvalidStateError(‘CANCELLED: ‘,)>
Traceback (most recent call last):
  File “/usr/lib/python3.5/asyncio/tasks.py”, line 239, in _step
    result = coro.send(None)
  File “/home/pi/async/asynctest1.py”, line 17, in operationsub
    future.set_result(‘operation is done!’)
  File “/usr/lib/python3.5/asyncio/futures.py”, line 348, in set_result
    raise InvalidStateError(‘{}: {!r}’.format(self._state, self))
asyncio.futures.InvalidStateError: CANCELLED: 

イベントオブジェクトの実装

非同期処理でのイベントオブジェクトは、asyncio.Event関数を使用します。

asynctest.pyのoperation関数を次にように変更します。event.clear関数によりイベントをクリアします。ただし、イベントを作成すると初期値はイベントのクリアになります。event.set関数でイベントを設定します。

async def operation(loop):
    event = asyncio.Event()
    event.clear()
    LOG.info('start operation')
    task = asyncio.ensure_future(asynctest1.operationsub(event))
    LOG.info('operation1')
    await asyncio.sleep(0.2)
    event.set()
    LOG.info('operation2')
    await asyncio.sleep(3)
    LOG.info('operation3')
    loop.stop()
    

asynctest1.pyのoperationsub関数を次にように変更します。event.wait関数によりイベントの設定を待ちます。

async def operationsub(event):
    LOG.info('start operationsub')
    LOG.info('operationsub10')
    await event.wait()
    LOG.info('operationsub11')

実行すると次のログが出力されます。asynctest1のoperationsub関数を起動した後にイベントを設定しているので、タイムスタンプを見ると、asynctest1の「operationsub11」の表示がすぐにログされています。。

$ python3 asynctest.py
2018-01-03 03:17:14,957 asynctest.py:34 start
2018-01-03 03:17:14,958 selector_events.py:65 Using selector: EpollSelector
2018-01-03 03:17:14,960 asynctest.py:37 enabling debugging
2018-01-03 03:17:14,961 asynctest.py:49 entering event loop
2018-01-03 03:17:14,961 asynctest.py:23 start operation
2018-01-03 03:17:14,962 asynctest.py:25 operation1
2018-01-03 03:17:14,963 asynctest1.py:13 start operationsub
2018-01-03 03:17:14,963 asynctest1.py:14 operationsub10
2018-01-03 03:17:15,163 asynctest.py:28 operation2
2018-01-03 03:17:15,164 asynctest1.py:16 operationsub11
2018-01-03 03:17:18,168 asynctest.py:30 operation3

キューオブジェクトの実装

非同期処理でのキューオブジェクトは、asyncio.Queue関数を使用します。

asynctest.pyのoperation関数を次にように変更します。queue.put_nowait関数によりキューデータを指定されたキューに送信します。送信するキューデータは、今回はリストデータにしました。どのようなデータ形式でもキューデータとして送信できます。

 async def operation(loop):
    queue = asyncio.Queue()
    LOG.info('start operation')
    task = asyncio.ensure_future(asynctest1.operationsub(queue))
    LOG.info('operation1')
    await asyncio.sleep(0.2)
    queue.put_nowait(["start", 1,2,3])
    LOG.info('operation2')
    await asyncio.sleep(0.2)
    queue.put_nowait(["next", 4,5,6])
    LOG.info('operation3')
    await asyncio.sleep(0.2)
    queue.put_nowait(["stop", 7,8,9])
    await asyncio.sleep(3)
    LOG.info('operation4')
    loop.stop()

asynctest1.pyのoperationsub関数を次にように変更します。queue.empty関数によりキューの受信を待ちます。キューを受信すると、受信したキューデータのリストに「stop」が含まれているかを判断し、「stop」が含まれていればキューの受信を完了します。

async def operationsub(queue):
    LOG.info('start operationsub')
    LOG.info('operationsub10')
    while True:
        if not queue.empty():
            queuedata = await queue.get()
            LOG.info('queuedata:{}'.format(queuedata))
            if queuedata[0]=="stop":
                break
        await asyncio.sleep(0.2)

    LOG.info('operationsub11')

実行すると次のログが出力されます。queue.put_nowait関数で送信したリストデータ[‘start’, 1, 2, 3]、[‘next’, 4, 5, 6]、[‘stop’, 7, 8, 9]の表示が次々に表示されており、最後のデータを調べて、while 文を抜けて「operationsub11」が表示されています。。

$ python3 asynctest.py
2018-01-03 04:26:01,481 asynctest.py:38 start
2018-01-03 04:26:01,482 selector_events.py:65 Using selector: EpollSelector
2018-01-03 04:26:01,484 asynctest.py:41 enabling debugging
2018-01-03 04:26:01,485 asynctest.py:53 entering event loop
2018-01-03 04:26:01,486 asynctest.py:22 start operation
2018-01-03 04:26:01,486 asynctest.py:24 operation1
2018-01-03 04:26:01,487 asynctest1.py:13 start operationsub
2018-01-03 04:26:01,488 asynctest1.py:14 operationsub10
2018-01-03 04:26:01,688 asynctest.py:27 operation2
2018-01-03 04:26:01,689 asynctest1.py:18 queuedata:[‘start’, 1, 2, 3]
2018-01-03 04:26:01,890 asynctest.py:30 operation3
2018-01-03 04:26:01,891 asynctest1.py:18 queuedata:[‘next’, 4, 5, 6]
2018-01-03 04:26:02,092 asynctest1.py:18 queuedata:[‘stop’, 7, 8, 9]
2018-01-03 04:26:02,093 asynctest1.py:23 operationsub11
2018-01-03 04:26:05,095 asynctest.py:34 operation4