Asyncコードが遅くなる原因と解決策
(secondb.ai)Asyncコードが遅くなる原因と解決策(技術的要約)
この動画では、Pythonのasyncioコードが同期コードより遅くなる一般的な原因と、それを解決するための技術的な方法論を扱います。
1. Asyncioの中核概念
- イベントループ(Event Loop): すべての非同期アプリケーションの中核です。
asyncio.run()で開始され、単一スレッドでタスク実行を管理・スケジューリングします。 - コルーチン(Coroutines):
async defで宣言された非同期関数です。awaitキーワードに出会うと実行を一時停止し、イベントループに制御を返せます。 - タスク(Tasks): コルーチンをラップし、イベントループ上で同時に実行されるようスケジューリングします。
asyncio.create_task()で生成されます。 - Future(Futures): 非同期処理の最終結果を表す低レベルオブジェクトです。
2. 同期コードを非同期へ変換する例
既存の同期式time.sleep()を非同期のawait asyncio.sleep()に置き換え、関数をasync defで宣言し、asyncio.run()でメインコルーチンを実行します。
性能低下を招く一般的なミスと解決策
ミス1: 逐次実行(Sequential Execution)
独立したタスクを並列実行せずに逐次的にawaitすると、総実行時間はすべてのタスクの実行時間の合計になります。
-
誤った例(逐次実行):
# 各 await は前の処理が終わるまで待機する await get_user_notifications() await get_recent_activity() await get_unread_messages() -
解決策(並列実行):
asyncio.gatherまたはasyncio.TaskGroupを使って独立したタスクを同時実行します。総実行時間は最も時間のかかるタスクの時間まで短縮されます。# 3つの処理が同時に開始される await asyncio.gather( get_user_notifications(), get_recent_activity(), get_unread_messages() )
並列実行ツールの比較
asyncio.gather:- 複数のコルーチンを同時に実行します。
- 欠点: エラー処理が不十分です。1つのタスクで例外が発生すると、他の実行中タスクがキャンセルされます。
asyncio.create_task:- タスクごとの制御やエラー処理が可能です。
- バックグラウンド実行に便利ですが、複数タスクを個別に
awaitする手間があります。
asyncio.TaskGroup(Python 3.11+):- 「構造化された並行性」のための最新の代替手段です。
async with構文でタスクグループを管理し、コンテキストを抜ける際にすべてのタスクの完了または例外処理が保証されます。
async with asyncio.TaskGroup() as tg: tg.create_task(some_coro_1()) tg.create_task(some_coro_2()) # 'async with' ブロックが終わるとすべてのタスクが await される
ミス2: 同期ライブラリの使用
asyncioコード内でrequestsやpathlibのような同期(blocking)ライブラリを使うと、イベントループ全体がブロックされます。asyncio.gatherの中で使っても、実際には逐次的に動作します。
- 解決策:
aiohttp(requestsの代替)、aiofiles(files/pathlibの代替)など、非同期(non-blocking)をサポートする専用ライブラリを使う必要があります。
ミス3: CPUバウンド処理によるイベントループのブロック
asyncioは単一スレッドで動作するため、重い計算のようなCPUバウンド処理はイベントループを停止させ、他のI/O処理を遅延させます。
- 解決策:
loop.run_in_executor()を使って、CPUバウンド処理を別のスレッドプール(既定値)またはプロセスプールにオフロードします。loop = asyncio.get_running_loop() # CPU集約的な関数を別スレッドで実行 await loop.run_in_executor( None, # 既定のスレッドプールを使用 cpu_bound_function, arg1 )
ミス4: 重要でない処理によるブロック
ユーザー応答に関係のないロギングのような非中核処理をawaitすると、不要に応答時間が延びます。
- 解決策:
asyncio.create_task()を使ってその処理をバックグラウンドタスクとして分離し、awaitしません。user_profile = await get_user_profile() # ロギングは await せずバックグラウンドで実行 asyncio.create_task(send_logs_to_external_service()) return user_profile
ミス5: タスクの作りすぎ
非常に小さな処理を大量にタスク化すると、コンテキストスイッチのオーバーヘッドが発生し、性能が低下することがあります。
- 解決策1: 小さな処理をまとめて(batching)、より大きい少数のタスクにします。
- 解決策2:
asyncio.Semaphoreを使って、同時実行される最大タスク数を制限します。# 同時に最大10個の処理のみ許可 semaphore = asyncio.Semaphore(10) async with semaphore: await fetch_data()
その他のミス
- "Never Awaited" コルーチン: コルーチンを呼び出しても
awaitしないため、処理が実行されず静かに失敗します。flake8-asyncのようなリンターで検出できます。 - 不適切なリソース管理: ファイルやDBコネクションなどを
try...finallyなしで使うと、リソースリークが発生する可能性があります。async withを使った非同期コンテキストマネージャで解決します。
デバッグと並行モデルの選び方
Asyncioデバッグモード
デフォルトでは無効なデバッグモードを有効化(asyncio.run(debug=True))すると、次のような問題の検出に役立ちます。
awaitされていないコルーチン(RuntimeWarning)。- 誤ったスレッドから呼び出された非同期API。
- 実行時間が100msを超えるコールバック。
- 遅いI/Oセレクタ(selector)処理。
その他のデバッグツール
- Scalene: CPUおよびメモリプロファイラ。
- aio-monitor:
asyncioアプリケーション向けの監視およびCLI。 - pdb: Python標準デバッガ。
- py-stack: 実行中のPythonプロセスのスタックトレースを出力し、ブロッキング箇所を検出。
並行モデル選択ガイド
- Asyncio(単一スレッド): 待ち時間の長い大量のI/Oバウンド処理(例: ネットワークリクエスト、ファイルI/O)に最適です。
- Threads(マルチスレッド): 共有データへのアクセスが必要なI/Oバウンド処理に使われます。GIL(Global Interpreter Lock)のため真の並列処理ではありませんが、I/O待機中に他のスレッドを実行できます。
- Processes(マルチプロセス): CPUバウンド処理(例: 画像処理、重い計算)に使われます。複数のCPUコアを活用して真の並列性を実現できますが、メモリおよび通信オーバーヘッドが大きくなります。
12件のコメント
Python は素晴らしい言語であることは確かですが、非同期インターフェースは設計を誤った機能のように思えます。
4番で
eager_start=Trueが抜けていますね。create_taskは weakref を作るので、永遠に実行されないタスクになってしまうコードです……> https://rosettalens.com/s/ko/python-to-node
この人も Python の async のせいで Node.js に乗り換えたそうだけど
結論: Pythonの非同期インターフェースは、いまだに直感的ではない。
実際、Pythonの非同期処理を最適化するほどのプロジェクトなら、ほかの言語で書いたほうが、性能も安定性もはるかに優れています。
コンパイル言語に移行するのでなければ、性能に大きな差は出るのでしょうか? マルチスレッドであればGILの存在によって大きな差が生じるでしょうが、どうせイベントループで動く非同期構造であれば、言語によってどのような違いが生じるのか気になります。
JITコンパイルの有無が思った以上に大きいです。V8は最適化がよくできています。
元の動画は確認していませんが、ミス4に対する解決策のコードは誤っています。
create_task()が返すタスクインスタンスは、少なくとも1つの変数に代入されている必要があり、その変数はタスクが終了するまで生存していなければなりません。そうでないと、コルーチンの実行中にタスクインスタンスがガベージコレクションされる危険があります。上記のようにタスクを生成する関数がすぐ終了してしまう場合は、タスクインスタンスを返す、グローバル変数に代入する、インスタンス変数に代入する、といった方法を使う必要があります。
P.S)
戻り値が特に不要で、コルーチンが短時間で終わると確信している場合でも、タスクインスタンスについてはいつかは
awaitするように書いておくのが望ましいです。それが嫌なら、タスクとして動かす各コルーチンごとに例外処理をかなり厳密に入れて、ログメッセージを漏れなく出せる構造を用意するべきです。そうしないと、タスクがどれほど重大な問題を起こしても Exception が処理されないまま silently fail するケース が発生します。私が生計を立てるために開発・運用しているプロジェクトでは、数十個のモジュールがそれぞれ
while self.ok(): cmd = await self.cmd_queue.get(); await self.process(cmd);のようなタスクを1つずつ生成して回し続けるパターンを設計したことがあるんです。例外処理パターンを確立するまでは、問題が1つ起きるたびに私のメンタルも一緒に吹き飛ぶという珍しい体験をしましたね(笑)Async/Await パターンの元祖(?)とも言える C# を使う会社に勤めている立場から見ても、1つ目のミスのように
awaitを単純に順番どおりずらっと並べる形の誤ったコードを見かけることがかなり多いですね。そういうコードを見ると、何というか共通して
asyncメソッド呼び出しの前ではawaitキーワードを使わなければならない、ということだけは分かっていて、それ以上の非同期実行順序についてはあまり考えていないためにこういうコードが出てくるのでは、という印象があります。複数の
awaitが出てくるとき、あるものはすぐ下で結果を使うからその手前でTask<T>オブジェクトのawait結果値を受け取り、あるものはかなり後になってから使うのでTask<T>だけ受け取って後でawaitする、といった形で非同期の流れを考慮してコードを書くのは、それだけ頭を使う作業ですからね。少なくとも私は非同期として宣言されたメソッドではこのように処理の流れを考えてコードを書いていますが、既存の保守中に退職者のコードを見るときなどには、『自分はただ単純に同期コードを書きたいのに、途中で使わなければならないメソッドが非同期型しかないから、とりあえずこう書いている』という感じを受けることもありますね。
1番が常に独立しているなら、あのようにするのがよいとは思いますが、
コードを修正して独立していない状態になると、その関数を使っている箇所をすべて確認して修正しなければならない不便さもある気がしますね。
ものすごく時間がかかる処理でないなら、コード管理の面では
awaitを直列で行うほうがよいかもしれません「マルチスレッドはオーバーヘッドの負担が大きいため、次善策としてシングルスレッドを分割して並列処理を実現する」という考え方でアプローチすべきだと思います。だからこそ、基本的にはマルチスレッド以上に、状況によってはさらに気を配る必要があるのが妥当だと思います。
それもそうですね。
本当にきちんとした非同期コードというのは、本質的にかなり気を配らなければならないコードなのだと思います。