- JVM がスケジューリングする仮想スレッド
VirtualThreadが Java 19 にてプレビューリリースされます - ここでは、
VirtualThreadでどのようにタスクは中断・再開されるのかを焦点にコードを読んだまとめです - OpenJDK のタグ jdk-19+25(すでに結構古い)の Java のコード、 C++ のコードを追いかけていきます
- C++ 読んだことがない(文法もわからない)、 JVM の基本的な構造(
frameなど)を理解してないのでよくわからない箇所がいくつもあります
/**
* A thread that is scheduled by the Java virtual machine rather than the operating
* system.
*/
final class VirtualThread extends Thread {
}- OS でなく、 JVM によってスケジュールされる
Thread - 利用するための API 使用例は以下の通り
Thread thread = Thread.ofVirtual()
.name("my-virtual-thread")
.allowSetThreadLocals(true)
.inheritInheritableThreadLocals(true)
.unstarted(() -> {
for(int i = 0; i < 10; i++) {
System.out.println("hello world");
}
});
thread.start();
thread.join();
リクエストをスレッドに割り当てる同期アプリケーションはプログラムが読みやすい等のメリットがある一方で、 一定時間に到着するリクエストの増大に対応するにはスレッドをより多く作る必要があります。 従来の JDK のスレッドは生成コストの高い OS スレッドのラッパーであり、 OS で利用できるスレッドの数には制限があります。 そのため、アプリケーションの同時処理能力の性能向上は、スレッドの数で頭打ちになることが多く、 その他のリソース(CPU/ネットワーク接続)が有効活用されていません。
そこで非同期フレームワーク(Netty/reactive-streams)を用いてアプリケーションを組み立てることで、 スレッド数の頭打ちを解消できます。 しかし、これらはコールバック関数(ラムダ式)の組み合わせによってアプリケーションを構築するため、 逐次的なロジックの組み合わせを前提としてる Java では複雑なコードになります。
一つの OS スレッドに、複数の(OS スレッドに紐付かない)仮想のスレッドを担わせることで スレッドが無数にあるように見せられます。 これによりプログラミングのスタイルを変更することなく、 スレッド数の上限の問題を克服します。
- JEP 425: Virtual Threads - https://openjdk.org/jeps/425
- 上記の日本語訳 - https://b.chiroito.dev/entry/virtualthreads1
java.lang.Threadを継承した軽量スレッド- 生成コストが低いため、容易にいくらでも生成できる
- プールせずに使い、目的の処理が終わったら破棄する
VirtualThreadの実行を OS のスレッドに割り当てるスケジューラーVirtualThread#start()はForkJoinPoolにVirtualThread#runContinuation()を実行するタスクを登録する- タスクを実行するプールされるスレッド(OS に紐づく)のクラスは
jdk.internal.misc.CarrierThreadjava.lang.Threadに追加されたフィールドにcarrierThreadがある- みかけのスレッドが
VirtualThreadで、実際の OS スレッドがCarrierThread
- 限定継続をあらわす(?)クラス
VirtualThreadに渡されたタスクの実行開始・中断・再開を扱う- 仮想スレッド内で仮想スレッドを起動したケースにも対応している
- スレッドの sp 、フレームのサイズ、フレームの引数のサイズを持つクラス(?)
- スレッドの
park/unparkを行う
- 各 poller にファイルディスクリプタを登録するインターフェース(抽象クラス)
- Linux :
EPollPoller - Windows :
WEPollPoller- Windows ではwepollを使ってる!? - Mac :
KQueuePoller
- Linux :
- ネットワーク起因でブロックしているスレッドを持っている
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import static java.net.http.HttpResponse.BodyHandlers.discarding;
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.GET().uri(URI.create("https://example.com")).build();
Thread thread = Thread.ofVirtual()
.name("example-virtual-thread")
.unstarted(() -> {
HttpResponse<Void> response = client.send(request, discarding());
System.out.println(response.statusCode());
});
thread.start();
thread.join();
VirtualThreadはインスタンス生成時に下記のフィールドが割り当てられる- スケジューラー - スタティックフィールドの
ForkJoinPoolを割り当てる Continuation- 正確にはVirtualThread$VThreadContinuation/コンストラクターではVirtualThread#run(Runnable)runContinuation-runContinuation()メソッドを呼び出すRunnablestateはNEW
- スケジューラー - スタティックフィールドの
VirtualThreadのstart()メソッドを呼び出すと、stateがSTARTEDに更新され、submitRunContinuation()メソッドが呼び出されるsubmitRunContinuation()メソッドでは、スケジューラー(ForkJoinPool)にrunContinuationがタスク登録されるForkJoinPoolにタスクが登録されると、そのタスクはワークキューに詰まれて、ワークキューにCarrierThreadが生成・実行開始するVirtualThread#join()を呼び出すと、VirtualThreadのterminationフィールド(CountDownLatch) を用いてタスクの終了を待つ
- 実行開始した
CarrierThreadはVirtualThread#runContinuationを呼び出す- なお、
runContinuationを呼び出したスレッドがVirtualThreadである場合はjava.lang.WrongThreadExceptionが発生する
- なお、
runContinuationでは以下の処理が行われるstateをRUNNINGに更新Continuation#runを呼び出す
Continuationのスタティック・ネイティブ・メソッドenterSpecial(Continuation, false, true)を呼び出す- このメソッドは
@IntrinsicCandidateなメソッドで、 HotSpot VM が手による温かみのあるアセンブラに置き換える - 詳しくは JEP 348 : Compiler Intrinsics for Java SE APIs https://openjdk.org/jeps/348 (日本語 https://kagamihoge.hatenablog.com/entry/2019/03/07/185647)
enterSpecialの呼び出し先は、 JVM の初期化時(?)にgen_continuation_enter関数により生成される
- このメソッドは
gen_continuation_enter(Continuation#enterSpecial) では、 以下の処理を行い、Continuation#enterを実行する- 次に呼び出すメソッドのアドレス解決(
Continuation#enter()) ContinuationEntryのメモリ確保・準備Continuation#enter()の呼び出し
- 次に呼び出すメソッドのアドレス解決(
Continuation#enterは、 タスク(VirtualThread#run(Runnable))を呼び出すVirtualThread#run(Runnable)では、currentCarrierThread(今実行しているワーカースレッド = OS スレッド)に対してsetCurrentThread(this)を呼び出して、現在のスレッドをCarrierThreadからVirtualThreadに変更する- このことをマウントと読んでいる
- タスクを起動する(
Runnable#run())。これで仮想スレッド上で、Runnableの実行が開始される。 HttpClient#send(HttpRequest, BodyHandler)は内部的にはsendAsync(HttpRequest, BodyHandler)を実行して得たFuture<Void>(正確にはCompletableFuture) からFuture#getを実行してレスポンスを取り出すCompletableFuture#getは最終的にLockSupport#park(Object)を呼び出すLockSupport#park(Object)は最終的に現在実行中の仮想スレッドに対してVirtualThread#doPark()を呼び出すVirtualThread#doPark()は以下の通りstateをPARKINGに更新するcarrierThread(= 今実行している仮想スレッドの OS スレッド)に対して、setCurrentThread(carrierThread)を呼び出し、現在のスレッドをVirtualThreadからCarrierThreadに戻す(アンマウント)- スタティックメソッド
Continuation#yield(ContinuationScope)から、最終的にContinuation#doYield()を呼び出す
Continuation#doYield()は@IntrinsicCandidateなネイティブメソッドで次の通りTemplateInterpreterGenerator::generate_Continuation_doYield_entryとStubGenerator::generate_cont_doYield()により生成される- 実体の関数はたぶん
Config::freeze、 パラメーターはJavaThread*とスタックポインタ
freeze_internalの主要な動作を担うのがFreezeオブジェクトのFreeze::freeze_slow()関数Freeze::freeze_start_frame()にて、呼び出し元のスタックポインタ、リンクアドレス、プログラムカウンタを取り出す???(実行中のコードの位置)stackChunkOop(jdk.internal.vm.StackChunk) を生成、上記 i. で取得した実行中のコードの位置、および現在のContinuationを保存する- 上記 ii. の
StackChunkを現在のContinuation.tailに保存する - この処理を失敗した場合は
doYield()の呼び出し元に戻る/成功した場合はContinuation#enterSpecial(Continuation, false, true)の呼び出し元(Continuation#run)に戻る(??????)
VirtualThread#runContinuation()に戻り、afterYield()を呼び出すVirtualThread#afterYield()でstateをPARKINGからPARKEDに更新して、runContinuationを終了- スケジューラーに登録された別のタスク(仮想スレッド)を開始する
- 仮想スレッドの動作2. にて返した
CompletableFutureに対して、complete(value)を呼び出し、Future#getの値を設定する LockSupport#unpark(VirtualThread)を呼び出す。これは最終的にVirtualThread#unpark()を呼び出すVirtualThread#unparkでは以下の通りstateをPARKEDからRUNNABLEに更新するsubmitRunContinuation経由で、スケジューラー(ForkJoinPool)にrunContinuationをタスク登録する
- ワーカースレッドの動作 1. と同じ(
stateはRUNNABLEからRUNNING) ContinuationのネイティブメソッドenterSpecial(Continuation, true, true)を呼び出す- 第 2 パラメーターが
trueになっている - これは、この
Continuation.tailにStackChunkが積まれているため
- 第 2 パラメーターが
gen_continuation_enter(Continuation#enterSpecial) では以下の処理を行うContinuationEntryのメモリ確保・準備cont_thaw関数の呼び出し(generate_cont_thaw関数で生成される)
cont_thaw関数ではContinuationEntryとjdk.internal.vm.Continuationからフレームの状態を復元、スタックを調整する(???)cont_thaw関数の戻り先がdoYield()呼び出し地点になる(???!!!???!!!?!?!?!?!)
Continuation.yield(ContinuationScope)からVirtualThread#yieldContinuation()に戻り、再びcurrentThreadがCarrierThreadからVirtualThreadに戻る(マウント)
CompletableFuture#getに戻り、 ネットワークスレッドの動作 1. で設定したオブジェクトを返すVirtualThreadのタスクが終了し、VirtualThread#run(Runnable)に戻るcurrentThreadにcarrierThreadを設定する(アンマウント)stateをRUNNINGからTERMINATEDに更新する
VirtualThread#run(Runnable)->Continuation#enter0()->Continuation#enter(Continuation, boolean)と戻り、Continuation.doneにtrueが設定されるContinuation#enterSpecial(ネイティブ)に戻り、 クリーンアップしてContinuation#run()に戻り、Continuation.tailをnullに設定するVirtualThread#runContinuation()に戻りVirtualThread#afterTerminate()へ進み、VirtualThread.terminationをカウントダウンするCarrierThreadのタスクが終了し、別のタスク(仮想スレッド)を開始する
VirtualThread.teminationに対するCountDownLatch#await()が完了し、VirtualThread#join()が完了- main プログラム終了