「Pythonで非同期処理(asyncio)」に続き、Pythonでasyncioを使った非同期処理のスクリプト作成方法を紹介します。asyncioは、Python3.4 で追加され、 Python3.5でasync/awaitが実装されました。今回はPython3.5.3でasync/awaitを使用して、次の非同期処理を行うPythonスクリプトを作成します。
- Futureによるコールバック関数の作成
 - Futureのキャンセル
 - イベントオブジェクトの実装
 - キューオブジェクトの実装
 
なお、Pythonでの非同期処理については、「asyncio — 非同期 I/O、イベントループ、コルーチンおよびタスク」に詳細が記述されています。
Futureによるコールバック関数の作成
Futureによりタスク完了時にコールバックを発生できます。
- future.add_done_callback
 - フューチャが終了したときに実行するコールバックを追加します。
 - future.set_result
 - フューチャの終了をマークしその結果を設定します。
 - future.result
 - このフューチャが表す結果を返します。
 
次にPythonスクリプトを示します。
【asynctest.py】
import logging.config 
import asyncio
import asynctest 
import asynctest1 
import sys
import warnings
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(filename)s:%(lineno)d %(message)s',
    stream=sys.stderr,
)
LOG = logging.getLogger()
def stop_operation(future):
    LOG.info(future.result())
async def operation(loop):
    future = asyncio.Future()
    future.add_done_callback(stop_operation)
    LOG.info('start operation')
    task = asyncio.ensure_future(asynctest1.operationsub(future))
    LOG.info('operation1')
    await asyncio.sleep(0.2)
    LOG.info('operation2')
    await asyncio.sleep(3)
    LOG.info('operation3')
    loop.stop()
    
    
LOG.info('start')
    
loop = asyncio.get_event_loop()
LOG.info('enabling debugging')
# Enable debugging
# loop.set_debug(True)
# Make the threshold for "slow" tasks very very small for
# illustration. The default is 0.1, or 100 milliseconds.
loop.slow_callback_duration = 0.001
# Report all mistakes managing asynchronous resources.
warnings.simplefilter('always', ResourceWarning)
LOG.info('entering event loop')
asyncio.ensure_future(operation(loop))
loop.run_forever()
loop.close()
exit()
【asynctest1.py】
import logging.config 
import asyncio
import sys
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(filename)s:%(lineno)d %(message)s',
    stream=sys.stderr,
)
LOG = logging.getLogger()
async def operationsub(future):
    LOG.info('start operationsub')
    LOG.info('operationsub10')
    await asyncio.sleep(1)
    LOG.info('operationsub11')
    future.set_result('operation is done!')
実行すると次のログが出力されます。タスク終了時にfuture.set_result関数によりで設定された「operation is done!」が、future.add_done_callback関数により登録されたコールバック関数「stop_operation」で表示されます。
$ python3 asynctest.py 2018-01-02 08:05:12,957 asynctest.py:35 start 2018-01-02 08:05:12,958 selector_events.py:65 Using selector: EpollSelector 2018-01-02 08:05:12,960 asynctest.py:38 enabling debugging 2018-01-02 08:05:12,961 asynctest.py:50 entering event loop 2018-01-02 08:05:12,961 asynctest.py:24 start operation 2018-01-02 08:05:12,962 asynctest.py:26 operation1 2018-01-02 08:05:12,963 asynctest1.py:13 start operationsub 2018-01-02 08:05:12,963 asynctest1.py:14 operationsub10 2018-01-02 08:05:13,163 asynctest.py:29 operation2 2018-01-02 08:05:13,965 asynctest1.py:16 operationsub11 2018-01-02 08:05:13,966 asynctest.py:16 operation is done! 2018-01-02 08:05:16,167 asynctest.py:31 operation3
Futureのキャンセル
作成したFutureを途中でキャンセルします。
コールバック関数「stop_operation」を次にように変更します。Futureがキャンセルされた場合もコールバック関数は呼ばれるので、future.cancelled関数で、すでにキャンセルされているかをチェックします。
def stop_operation(future):
    LOG.info("stop_operation")
    if not future.cancelled():
        LOG.info(future.result())
asynctest.pyのoperation関数を次にように変更します。future.cancel関数でFutureをキャンセルします。
async def operation(loop):
    future = asyncio.Future()
    future.add_done_callback(stop_operation)
    LOG.info('start operation')
    task = asyncio.ensure_future(asynctest1.operationsub(future))
    LOG.info('operation1')
    await asyncio.sleep(0.2)
    future.cancel()
    LOG.info('operation2')
    await asyncio.sleep(3)
    LOG.info('operation3')
    loop.stop()
asynctest1.pyのoperationsub関数を次にように変更します。future.cancelled関数ですでにキャンセルされていないかを確認します。
async def operationsub(future):
    LOG.info('start operationsub')
    LOG.info('operationsub10')
    await asyncio.sleep(1)
    LOG.info('operationsub11')
    if not future.cancelled():
        future.set_result('operation is done!')
実行すると次のログが出力されます。operationsub関数をキャンセルしたために、コールバック関数「stop_operation」で「stop_operation」が表示されていますが、すでにキャンセルされているために、「’operation is done」が表示されません。
$ python3 asynctest.py 2018-01-02 08:16:08,612 asynctest.py:34 start 2018-01-02 08:16:08,613 selector_events.py:65 Using selector: EpollSelector 2018-01-02 08:16:08,614 asynctest.py:37 enabling debugging 2018-01-02 08:16:08,615 asynctest.py:49 entering event loop 2018-01-02 08:16:08,615 asynctest.py:23 start operation 2018-01-02 08:16:08,616 asynctest.py:25 operation1 2018-01-02 08:16:08,617 asynctest1.py:13 start operationsub 2018-01-02 08:16:08,617 asynctest1.py:14 operationsub10 2018-01-02 08:16:08,817 asynctest.py:28 operation2 2018-01-02 08:16:08,818 asynctest.py:16 stop_operation 2018-01-02 08:16:09,619 asynctest1.py:16 operationsub11 2018-01-02 08:16:11,821 asynctest.py:30 operation3
asynctest1.pyのoperationsub関数では、future.cancelled関数を用いてFutureがキャンセルされたかを判断しないと、すでにキャンセルしているFutureに対して処理結果を設定しようとするので次の例外が発生します。
2018-01-02 08:09:53,100 asynctest.py:28 operation2 2018-01-02 08:09:53,101 asynctest.py:16 stop_operation 2018-01-02 08:09:53,903 asynctest1.py:16 operationsub11 2018-01-02 08:09:56,105 asynctest.py:30 operation3 2018-01-02 08:09:56,106 base_events.py:1258 Task exception was never retrieved future:exception=InvalidStateError(‘CANCELLED: ‘,)> Traceback (most recent call last): File “/usr/lib/python3.5/asyncio/tasks.py”, line 239, in _step result = coro.send(None) File “/home/pi/async/asynctest1.py”, line 17, in operationsub future.set_result(‘operation is done!’) File “/usr/lib/python3.5/asyncio/futures.py”, line 348, in set_result raise InvalidStateError(‘{}: {!r}’.format(self._state, self)) asyncio.futures.InvalidStateError: CANCELLED: 
イベントオブジェクトの実装
非同期処理でのイベントオブジェクトは、asyncio.Event関数を使用します。
asynctest.pyのoperation関数を次にように変更します。event.clear関数によりイベントをクリアします。ただし、イベントを作成すると初期値はイベントのクリアになります。event.set関数でイベントを設定します。
async def operation(loop):
    event = asyncio.Event()
    event.clear()
    LOG.info('start operation')
    task = asyncio.ensure_future(asynctest1.operationsub(event))
    LOG.info('operation1')
    await asyncio.sleep(0.2)
    event.set()
    LOG.info('operation2')
    await asyncio.sleep(3)
    LOG.info('operation3')
    loop.stop()
    
asynctest1.pyのoperationsub関数を次にように変更します。event.wait関数によりイベントの設定を待ちます。
async def operationsub(event):
    LOG.info('start operationsub')
    LOG.info('operationsub10')
    await event.wait()
    LOG.info('operationsub11')
実行すると次のログが出力されます。asynctest1のoperationsub関数を起動した後にイベントを設定しているので、タイムスタンプを見ると、asynctest1の「operationsub11」の表示がすぐにログされています。。
$ python3 asynctest.py 2018-01-03 03:17:14,957 asynctest.py:34 start 2018-01-03 03:17:14,958 selector_events.py:65 Using selector: EpollSelector 2018-01-03 03:17:14,960 asynctest.py:37 enabling debugging 2018-01-03 03:17:14,961 asynctest.py:49 entering event loop 2018-01-03 03:17:14,961 asynctest.py:23 start operation 2018-01-03 03:17:14,962 asynctest.py:25 operation1 2018-01-03 03:17:14,963 asynctest1.py:13 start operationsub 2018-01-03 03:17:14,963 asynctest1.py:14 operationsub10 2018-01-03 03:17:15,163 asynctest.py:28 operation2 2018-01-03 03:17:15,164 asynctest1.py:16 operationsub11 2018-01-03 03:17:18,168 asynctest.py:30 operation3
キューオブジェクトの実装
非同期処理でのキューオブジェクトは、asyncio.Queue関数を使用します。
asynctest.pyのoperation関数を次にように変更します。queue.put_nowait関数によりキューデータを指定されたキューに送信します。送信するキューデータは、今回はリストデータにしました。どのようなデータ形式でもキューデータとして送信できます。
 async def operation(loop):
    queue = asyncio.Queue()
    LOG.info('start operation')
    task = asyncio.ensure_future(asynctest1.operationsub(queue))
    LOG.info('operation1')
    await asyncio.sleep(0.2)
    queue.put_nowait(["start", 1,2,3])
    LOG.info('operation2')
    await asyncio.sleep(0.2)
    queue.put_nowait(["next", 4,5,6])
    LOG.info('operation3')
    await asyncio.sleep(0.2)
    queue.put_nowait(["stop", 7,8,9])
    await asyncio.sleep(3)
    LOG.info('operation4')
    loop.stop()
asynctest1.pyのoperationsub関数を次にように変更します。queue.empty関数によりキューの受信を待ちます。キューを受信すると、受信したキューデータのリストに「stop」が含まれているかを判断し、「stop」が含まれていればキューの受信を完了します。
async def operationsub(queue):
    LOG.info('start operationsub')
    LOG.info('operationsub10')
    while True:
        if not queue.empty():
            queuedata = await queue.get()
            LOG.info('queuedata:{}'.format(queuedata))
            if queuedata[0]=="stop":
                break
        await asyncio.sleep(0.2)
    LOG.info('operationsub11')
実行すると次のログが出力されます。queue.put_nowait関数で送信したリストデータ[‘start’, 1, 2, 3]、[‘next’, 4, 5, 6]、[‘stop’, 7, 8, 9]の表示が次々に表示されており、最後のデータを調べて、while 文を抜けて「operationsub11」が表示されています。。
$ python3 asynctest.py 2018-01-03 04:26:01,481 asynctest.py:38 start 2018-01-03 04:26:01,482 selector_events.py:65 Using selector: EpollSelector 2018-01-03 04:26:01,484 asynctest.py:41 enabling debugging 2018-01-03 04:26:01,485 asynctest.py:53 entering event loop 2018-01-03 04:26:01,486 asynctest.py:22 start operation 2018-01-03 04:26:01,486 asynctest.py:24 operation1 2018-01-03 04:26:01,487 asynctest1.py:13 start operationsub 2018-01-03 04:26:01,488 asynctest1.py:14 operationsub10 2018-01-03 04:26:01,688 asynctest.py:27 operation2 2018-01-03 04:26:01,689 asynctest1.py:18 queuedata:[‘start’, 1, 2, 3] 2018-01-03 04:26:01,890 asynctest.py:30 operation3 2018-01-03 04:26:01,891 asynctest1.py:18 queuedata:[‘next’, 4, 5, 6] 2018-01-03 04:26:02,092 asynctest1.py:18 queuedata:[‘stop’, 7, 8, 9] 2018-01-03 04:26:02,093 asynctest1.py:23 operationsub11 2018-01-03 04:26:05,095 asynctest.py:34 operation4