This document presents an attempt to build a fully synchronous stack and make it work with a couple of simple Storage APIs. The goal is to achieve fully synchronous execution, or get as close as possible (given the nature of the underlying HTTP client), and then compare its runtime behavior with the existing solution.
Synchronous stack bodge
The synchronous stack has been bodged through all the layers, starting from the high-level client down to the HTTP client call, or terminated right above the HTTP client (Netty). The full implementation can be found in this commit.
The bodge is based on a naive attempt to abstract Reactor by introducing a layer of operators that map mostly 1:1 to the Reactor operators encountered while plowing through the layers of code. These operators add the ability to execute synchronously in addition to constructing a reactive chain, and they let the upper layer choose which option is materialized in order to fulfill the API contract. This approach was chosen to ensure that the sync stack has similar computational complexity to its asynchronous counterpart, which is important for benchmarks.
The APIs that have been tried are BlobContainerClient.create(), BlobServiceClient.getProperties() and BlobClient.getProperties(). BlobServiceClient.getProperties() and BlobClient.getProperties() have been used for benchmarking because they perform better at scale, both with Azurite and the real service. The difference between BlobServiceClient.getProperties() and BlobClient.getProperties() is that the former has a body in the response while the latter has only headers.
Ingredients
A naive abstraction over Reactor introducing a sync call stack - SingleResult and MultiResult. In addition to being able to construct a reactive chain, they can also compute T or Stream<T> synchronously, which are the closest sync equivalents of Mono<T> and Flux<T>. A rough sketch of the idea follows.
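A minimal sketch of that dual-materialization idea, assuming hypothetical shapes for SingleResult and its map operator (the real types live in the linked commit and may differ):

```java
import reactor.core.publisher.Mono;
import java.util.function.Function;

// Hypothetical sketch: each operator can materialize either synchronously or as a Mono.
interface SingleResult<T> {
    T getSync();          // materialize the chain synchronously on the caller thread
    Mono<T> toMono();     // materialize the chain as a reactive Mono

    default <R> SingleResult<R> map(Function<T, R> mapper) {
        SingleResult<T> upstream = this;
        return new SingleResult<R>() {
            @Override
            public R getSync() {
                return mapper.apply(upstream.getSync());
            }

            @Override
            public Mono<R> toMono() {
                return upstream.toMono().map(mapper);
            }
        };
    }
}
```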
This is an example stack trace when calling into OkHttp.
at okhttp3.internal.connection.RealCall.execute(RealCall.kt:148)
at com.azure.core.http.okhttp.OkHttpAsyncHttpClient.lambda$send2$3(OkHttpAsyncHttpClient.java:89)
at com.azure.core.syncasync.SyncAsyncSingleResult.getSync(SyncAsyncSingleResult.java:22)
at com.azure.core.syncasync.FlatMapSingleResult.getSync(FlatMapSingleResult.java:20)
at com.azure.core.syncasync.MappingSingleResult.getSync(MappingSingleResult.java:20)
at com.azure.core.syncasync.MappingSingleResult.getSync(MappingSingleResult.java:20)
at com.azure.core.syncasync.DeferredSingleResult.getSync(DeferredSingleResult.java:18)
at com.azure.core.syncasync.DoAsyncOnlySingleResult.getSync(DoAsyncOnlySingleResult.java:20)
at com.azure.core.syncasync.DelaySingleResult.getSync(DelaySingleResult.java:27)
at com.azure.core.syncasync.FlatMapSingleResult.getSync(FlatMapSingleResult.java:20)
at com.azure.core.syncasync.OnErrorResumeSingleResult.getSync(OnErrorResumeSingleResult.java:21)
at com.azure.core.syncasync.DeferredSingleResult.getSync(DeferredSingleResult.java:18)
at com.azure.core.syncasync.MappingSingleResult.getSync(MappingSingleResult.java:20)
at com.azure.core.syncasync.FlatMapSingleResult.getSync(FlatMapSingleResult.java:20)
at com.azure.core.http.rest.RestProxy.lambda$handleRestReturnType$17(RestProxy.java:591)
at com.azure.core.syncasync.SyncAsyncSingleResult.getSync(SyncAsyncSingleResult.java:22)
at com.azure.core.syncasync.FlatMapSingleResult.getSync(FlatMapSingleResult.java:20)
at com.azure.core.syncasync.MappingSingleResult.getSync(MappingSingleResult.java:20)
at com.azure.storage.blob.BlobServiceClient.getPropertiesWithResponse(BlobServiceClient.java:344)
at com.azure.storage.blob.BlobServiceClient.getProperties(BlobServiceClient.java:317)
at com.azure.storage.App.main(App.java:95)
Note that the stack trace is as horrible as Reactor's traces, but at least it provides insight into what's happening from the API call until it hits the HTTP layer. Creating an imperative-style SyncRestProxy might be a better way to improve here, i.e. be imperative below the generated APIs, since we have one core that lights up hundreds of APIs. For logic above generated code, these new operators (or something equivalent) might be useful to minimize code duplication.
OkHttp
The OkHttp client offers Call.execute(), which is fully synchronous and uses the caller thread to do the work. This is reflected in the benchmark results.
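For reference, a minimal sketch of that fully synchronous call path in plain OkHttp (not the Azure adapter):

```java
import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class OkHttpSyncSample {
    public static void main(String[] args) throws IOException {
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder().url("https://example.com").build();
        // Call.execute() performs the whole exchange on the caller thread, no dispatcher hand-off.
        try (Response response = client.newCall(request).execute()) {
            System.out.println(response.code());
        }
    }
}
```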
Netty
We're using Reactor Netty, but even with raw Netty we'd be facing sync-over-async at some point. For the purpose of this experiment the async stack is blocked right in NettyAsyncHttpClient. However, it might be worth considering an implementation where we talk to Netty directly without involving Reactor on sync paths.
JDK Http Client
The JDK HTTP client offers a synchronous HttpClient.send() API, which can materialize the response body as, for example, an InputStream or byte[].
However, the JDK HTTP client internally calls into the async implementation and blocks, see here.
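A minimal sketch of that synchronous JDK API (HttpClient.send() still blocks on the async machinery underneath):

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JdkHttpSyncSample {
    public static void main(String[] args) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create("https://example.com")).GET().build();
        // Synchronous API; the body is materialized as an InputStream.
        HttpResponse<InputStream> response = client.send(request, HttpResponse.BodyHandlers.ofInputStream());
        System.out.println(response.statusCode());
    }
}
```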
Testing
Perf
The perf runs are based on azure-storage-perf with the following alterations.
The blob and container names have been kept constant across all runs to minimize variance coming from the service allocating these resources on different hardware. See here and here.
The HttpClient instance is made a global singleton to reflect the recommended usage pattern. See here.
The OkHttp client requires customization for a higher degree of parallelism in sync-over-async runs. See here. It grossly underperforms on default settings. This problem does not appear with the full-sync call.
Two versions of the azure-storage-perf fat jar have been prepared: one with the bodged sync stack and one using recently released packages. See here.
The runs have been performed in the West US 2 region on a Standard DS3 v2 (4 vcpus, 14 GiB memory) VM running Ubuntu 18, which was later updated to D4ds v5 (4 vcpus, 16 GiB memory) for upload scenarios to assure sufficient network bandwidth.
15 runs were conducted for each combination of parameters. The top 10 results were taken for each combination to rule out grossly underperforming outliers (fallacies of distributed computing...). The tables below present AVG with STDDEV and MAX out of these samples.
The heap size has been fixed to 2 GB to minimize possible variance coming from resizing.
Script used to run samples (this script is evolving):
* Full-sync OkHttp started to hit service throttling for BlobServiceClient.getProperties() at a concurrency of 20. Half of the runs faced this issue, which required more sampling to get usable data.
LRS Premium BlockBlobStorage
Executed on DS3 v2.
Avg
| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async | JDK full sync | JDK sync-over-async |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 471.4±10.5 | 438.8±6.4 | 436.3±6.1 | 407.5±6.2 | 433.7±6.8 | 387.9±5.0 |
| 2 | 965.2±10.6 | 874.4±11.2 | 870.2±15.0 | 827.6±11.3 | 868.1±10.1 | 809.3±10.0 |
| 10 | 4725.2±43.1 | 4296.0±42.1 | 4190.5±72.0 | 4047.3±26.4 | 4291.2±52.6 | 3959.0±36.7 |
| 20 | 9318.0±74.6 | 7280.7±69.5 | 7996.1±69.5 | 7504.2±80.7 | 7570.3±32.7 | 6954.6±71.0 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async | JDK full sync | JDK sync-over-async |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 483.6 | 446.0 | 447.2 | 415.8 | 452.5 | 395.9 |
| 2 | 977.9 | 886.3 | 888.8 | 846.6 | 888.3 | 829.0 |
| 10 | 4780.2 | 4364.1 | 4241.4 | 4091.4 | 4383.3 | 3992.3 |
| 20 | 9422.3 | 7396.1 | 8112.0 | 8627.2 | 7656.4 | 7060.5 |
BlockBlobClient.upload(InputStream data, long length, boolean overwrite)
This isn't the best implementation, i.e. it doesn't avoid unnecessary InputStream->Stream conversions.
The JDK client has not been tested - the bodge seems to have a connection leak on this execution path that wasn't easy to track down in a short time.
The VM was changed from DS3 v2 to D4ds v5 due to the network getting saturated with larger payloads.
LRS Premium BlockBlobStorage 10 KB
Executed on DS3 v2.
Avg
| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 202.0±5.3 | 189.4±2.4 | 188.1±5.5 | 186.1±3.2 |
| 2 | 425.3±5.4 | 406.8±8.1 | 385.4±4.9 | 374.9±8.4 |
| 10 | 2145.8±25.7 | 2000.4±17.4 | 1975.6±34.4 | 1944.1±30.5 |
| 30 | 6092.3±66.6 | 5570.1±87.4 | 5758.0±84.1 | 5558.4±73.9 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 210.8 | 192.8 | 195.1 | 192.2 |
| 2 | 437.2 | 416.36 | 396.7 | 396.0 |
| 10 | 2189.7 | 2036.1 | 2032.7 | 1990.8 |
| 30 | 6199.6 | 5743.7 | 5950.5 | 5687.7 |
LRS Premium BlockBlobStorage 10 KB - second attempt
Executed on D4ds v5.
Avg
| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 220.4±3.4 | 216.2±5.4 | 205.2±2.9 | 205.7±4.0 |
| 2 | 450.2±8.4 | 435.5±7.4 | 419.2±7.8 | 418.0±6.1 |
| 10 | 2288.7±32.1 | 2187.7±22.8 | 2208.2±32.7 | 2163.6±54.6 |
| 20 | 4331.3±122.9 | 4292.3±90.6 | 4317.9±71.8 | 4261.4±33.6 |
| 30 | 6486.3±79.8 | 6319.2±75.8 | 6316.8±102.6 | 6298.0±112.3 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 227.1 | 227.6 | 209.2 | 212.0 |
| 2 | 471.6 | 452.5 | 432.2 | 428.0 |
| 10 | 2341.9 | 2237.0 | 2263.5 | 2234.7 |
| 20 | 4558.4 | 4456.1 | 4488.0 | 4306.0 |
| 30 | 6597.6 | 6516.1 | 6493.0 | 6467.8 |
LRS Premium BlockBlobStorage 20 KB
Executed on D4ds v5.
Avg
| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 208.3±4.2 | 201.3±5.8 | 206.0±5.0 | 205.5±1.9 |
| 2 | 435.7±7.6 | 423.3±7.2 | 421.2±12.2 | 411.8±4.0 |
| 10 | 2189.4±34.0 | 2118.0±24.8 | 2117.6±48.7 | 2165.2±35.7 |
| 20 | 4272.5±51.3 | 4207.4±50.7 | 4283.5±66.3 | 4235.8±78.6 |
| 30 | 6278.6±93.3 | 6073.4±125.6 | 6171.6±99.6 | 6274.1±144.7 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 214.4 | 213.6 | 212.3 | 207.5 |
| 2 | 449.1 | 431.8 | 435.8 | 418.0 |
| 10 | 2266.0 | 2158.6 | 2194.6 | 2213.0 |
| 20 | 4272.5 | 4207.4 | 4283.5 | 4235.8 |
| 30 | 6413.9 | 6323.4 | 6361.7 | 6450.1 |
LRS Premium BlockBlobStorage 30 KB
Executed on D4ds v5.
Avg
| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 202.1±4.0 | 201.8±5.0 | 203.9±7.3 | 203.1±2.8 |
| 2 | 424.4±8.3 | 415.1±13.9 | 408.8±2.7 | 404.1±7.8 |
| 10 | 2050.9±40.1 | 2082.9±36.0 | 2094.3±32.2 | 2081.2±38.8 |
| 20 | 4190.0±48.7 | 4156.3±60.3 | 4155.0±73.5 | 4186.8±36.1 |
| 30 | 5973.5±174.6 | 5861.6±144.8 | 5982.9±112.3 | 5851.5±92.8 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 207.7 | 212.1 | 217.6 | 207.7 |
| 2 | 434.7 | 432.6 | 413.7 | 417.8 |
| 10 | 2111.9 | 2142.1 | 2133.5 | 2141.3 |
| 20 | 4259.9 | 4224.1 | 4276.6 | 4265.4 |
| 30 | 6195.2 | 6091.4 | 6138.1 | 6044.5 |
LRS Premium BlockBlobStorage 50 KB
Executed on D4ds v5.
Avg
| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 187.3±3.5 | 183.7±2.1 | 185.9±5.6 | 186.3±5.7 |
| 2 | 378.9±8.5 | 373.3±5.3 | 379.3±7.5 | 369.9±14.8 |
| 10 | 1932.9±23.7 | 1887.5±26.1 | 1959.0±45.4 | 1909.1±49.3 |
| 20 | 3700.1±85.3 | 3703.4±34.9 | 3792.3±62.2 | 3785.0±59.3 |
| 30 | 5440.4±87.6 | 5294.9±47.6 | 5514.5±55.1 | 5504.6±96.6 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 192.9 | 187.0 | 197.7 | 195.2 |
| 2 | 395.4 | 384.8 | 394.8 | 403.9 |
| 10 | 1973.3 | 1929.7 | 2022.0 | 2008.1 |
| 20 | 3878.0 | 3760.9 | 3931.9 | 3871.0 |
| 30 | 5589.0 | 5396.3 | 5615.0 | 5705.8 |
LRS Premium BlockBlobStorage 100 KB
Executed on D4ds v5.
Avg
| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 158.0±4.6 | 153.8±2.9 | 160.3±4.6 | 162.2±2.0 |
| 2 | 314.7±7.9 | 312.0±7.6 | 324.8±5.8 | 316.9±9.8 |
| 10 | 1570.0±31.2 | 1577.9±19.1 | 1628.7±34.2 | 1638.9±25.9 |
| 20 | 3132.4±65.2 | 3033.6±77.1 | 3141.0±135.8 | 3140.4±61.5 |
| 30 | 4592.6±105.8 | 4479.0±88.5 | 4461.7±177.3 | 4643.7±120.0 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 167.9 | 160.0 | 167.1 | 166.2 |
| 2 | 328.7 | 321.8 | 337.9 | 333.5 |
| 10 | 1629.2 | 1615.7 | 1691.2 | 1688.2 |
| 20 | 3263.6 | 3160.7 | 3298.7 | 3226.1 |
| 30 | 4738.4 | 4649.6 | 4699.0 | 4805.1 |
LRS Premium BlockBlobStorage 500 KB
Executed on D4ds v5.
Avg
| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 52.2±2.2 | 53.9±1.1 | 59.6±1.1 | 57.8±1.6 |
| 2 | 112.3±4.6 | 105.1±7.1 | 114.4±5.9 | 110.0±7.2 |
| 10 | 591.3±7.0 | 572.3±10.5 | 606.2±8.3 | 607.3±8.9 |
| 20 | 1133.7±9.9 | 1130.5±12.2 | 1225.5±17.6 | 1223.7±16.5 |
| 30 | 1687.2±12.8 | 1643.7±16.1 | 1783.4±32.0 | 1824.2±34.5 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 56.1 | 55.5 | 61.7 | 61.3 |
| 2 | 117.5 | 114.4 | 120.6 | 119.1 |
| 10 | 602.7 | 596.5 | 627.3 | 621.1 |
| 20 | 1150.4 | 1153.9 | 1252.8 | 1241.3 |
| 30 | 1704.7 | 1670.3 | 1857.5 | 1868.2 |
LRS Premium BlockBlobStorage 1 MB
Executed on D4ds v5.
Avg
| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 40.3±0.6 | 40.2±0.9 | 42.1±0.8 | 42.1±0.5 |
| 2 | 82.5±1.4 | 80.0±2.0 | 85.4±1.2 | 83.4±3.3 |
| 10 | 404.5±3.4 | 402.1±4.4 | 430.6±11.5 | 423.9±6.6 |
| 20 | 807.1±9.0 | 772.7±14.6 | 830.9±11.2 | 851.3±10.3 |
| 30 | 1152.0±8.5 | 1095.9±10.2 | 1258.0±13.7 | 1259.8±12.0 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 41.7 | 41.7 | 43.2 | 42.9 |
| 2 | 84.6 | 84.3 | 87.0 | 88.6 |
| 10 | 409.8 | 408.1 | 444.2 | 441.0 |
| 20 | 820.6 | 798.9 | 843.3 | 876.5 |
| 30 | 1170.5 | 1107.5 | 1274.0 | 1283.3 |
LRS Premium BlockBlobStorage - Comparison of various sizes at concurrency of 30
Executed on D4ds v5.
Avg
| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 6486.3±79.8 | 6319.2±75.8 | 6316.8±102.6 | 6298.0±112.3 |
| 20 KB | 6278.6±93.3 | 6073.4±125.6 | 6171.6±99.6 | 6274.1±144.7 |
| 30 KB | 5973.5±174.6 | 5861.6±144.8 | 5982.9±112.3 | 5851.5±92.8 |
| 50 KB | 5440.4±87.6 | 5294.9±47.6 | 5514.5±55.1 | 5504.6±96.6 |
| 100 KB | 4592.6±105.8 | 4479.0±88.5 | 4461.7±177.3 | 4643.7±120.0 |
| 500 KB | 1687.2±12.8 | 1643.7±16.1 | 1783.4±32.0 | 1824.2±34.5 |
| 1 MB | 1152.0±8.5 | 1095.9±10.2 | 1258.0±13.7 | 1259.8±12.0 |

Max

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 6597.6 | 6516.1 | 6493.0 | 6467.8 |
| 20 KB | 6413.9 | 6323.4 | 6361.7 | 6450.1 |
| 30 KB | 6195.2 | 6091.4 | 6138.1 | 6044.5 |
| 50 KB | 5589.0 | 5396.3 | 5615.0 | 5705.8 |
| 100 KB | 4738.4 | 4649.6 | 4699.0 | 4805.1 |
| 500 KB | 1704.7 | 1670.3 | 1857.5 | 1868.2 |
| 1 MB | 1170.5 | 1107.5 | 1274.0 | 1283.3 |
Test Proxy - Comparison of various sizes at concurrency of 30
Executed on D4ds v5.
Proxy on D32ds v5.
Avg
| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 18371.3±199.2 | 13438.1±200.5 | 19272.7±215.3 | 17353.6±300.9 |
| 20 KB | 17058.1±219.7 | 12717.0±157.9 | 18344.5±204.9 | 16582.3±318.0 |
| 30 KB | 16009.6±109.5 | 12016.2±136.7 | 17662.3±207.8 | 15874.9±124.5 |
| 50 KB | 14065.9±86.9 | 10656.3±148.7 | 16532.8±340.5 | 15513.0±182.9 |
| 100 KB | 9647.3±52.0 | 7973.3±34.3 | 12392.6±135.6 | 11738.3±81.2 |
| 200 KB | 6346.3±41.3 | 5731.0±20.4 | 7240.5±3.2* | 7240.1±2.6* |

* - These runs seem to saturate network bandwidth, judging by the VM egress metrics graph.

Max

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 18874.6 | 13897.4 | 19569.7 | 17896.4 |
| 20 KB | 17426.1 | 12954.7 | 18655.1 | 17221.4 |
| 30 KB | 16194.4 | 12380.7 | 18077.7 | 16047.8 |
| 50 KB | 14213.4 | 10921.4 | 17053.0 | 15799.7 |
| 100 KB | 9754.8 | 8037.5 | 12676.5 | 11857.2 |
| 200 KB | 6420.5 | 5781.0 | 7246.9* | 7245.3* |

* - These runs seem to saturate network bandwidth, judging by the VM egress metrics graph.
Test Proxy - Comparison of various sizes at concurrency of 20
Executed on D4ds v5.
Proxy on D32ds v5.
Avg
| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 18048.2±70.8 | 13444.3±131.0 | 18264.1±214.0 | 16709.5±283.6 |
| 20 KB | 16883.2±114.0 | 12414.5±122.2 | 17215.9±167.3 | 15978.4±161.7 |
| 30 KB | 16024.4±197.3 | 11830.5±124.8 | 16691.7±203.8 | 15592.0±198.6 |
| 50 KB | 14045.9±126.2 | 10498.2±167.1 | 15229.8±500.6 | 14783.9±93.0 |
| 100 KB | 9446.8±43.4 | 7816.9±71.8 | 11230.7±403.6 | 11325.3±162.6 |
| 200 KB | 6248.8±26.8 | 5528.1±14.8 | 7238.2±0.7* | 7238.3±0.5* |

* - These runs seem to saturate network bandwidth, judging by the VM egress metrics graph.

Max

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 18217.0 | 13741.9 | 18668.3 | 17248.0 |
| 20 KB | 17155.4 | 12573.5 | 17665.8 | 16311.6 |
| 30 KB | 16293.8 | 12006.0 | 17064.5 | 16028.3 |
| 50 KB | 14349.8 | 10861.3 | 16093.1 | 14955.4 |
| 100 KB | 9532.9 | 7986.6 | 11779.8 | 11687.1 |
| 200 KB | 6287.4 | 5553.2 | 7239.6* | 7239.4* |

* - These runs seem to saturate network bandwidth, judging by the VM egress metrics graph.
Test Proxy - Comparison of various sizes at concurrency of 10
Executed on D4ds v5.
Proxy on D32ds v5.
Avg
| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 16514.0±82.8 | 12666.8±134.6 | 15313.6±174.3 | 14050.4±193.7 |
| 20 KB | 15563.7±170.4 | 11932.7±196.8 | 14242.2±139.2 | 13767.6±146.4 |
| 30 KB | 14856.7±229.6 | 11419.9±104.9 | 13890.6±205.7 | 13246.9±179.2 |
| 50 KB | 13405.8±105.0 | 10240.0±72.6 | 13255.0±222.3 | 12691.9±116.8 |
| 100 KB | 9032.7±73.0 | 7406.9±70.0 | 9763.1±96.7 | 9390.1±68.1 |
| 200 KB | 5937.5±31.6 | 5141.2±19.5 | 7233.4±8.1 | 7083.8±60.0 |

Max

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 16663.5 | 13019.6 | 15721.1 | 14402.0 |
| 20 KB | 15810.7 | 12299.2 | 14434.0 | 14082.5 |
| 30 KB | 15087.7 | 11565.6 | 14203.2 | 13527.0 |
| 50 KB | 13542.6 | 10380.6 | 13686.0 | 12858.7 |
| 100 KB | 9203.0 | 7528.4 | 9961.5 | 9499.3 |
| 200 KB | 5995.2 | 5188.3 | 7243.3 | 7182.5 |
Observations
Results mostly align at low levels of concurrency, i.e. 1 and 2.
The higher the concurrency, the larger the gap in favor of the sync stack with smaller payloads.
A true top-to-bottom sync stack (OkHttp) outperforms the rest at high concurrency with smaller payloads.
The gain from the sync stack diminishes as payload size grows. The workload becomes more I/O bound and thread hopping plays a lesser role there.
Pushing sync-over-async down (Netty, JDK) brings an improvement that's more visible at higher concurrency with smaller payloads.
Netty seems to be the better choice for larger payloads. I/O performance plays a larger role there, and Netty seems to handle it better.
OkHttp seems to be the better choice for small payloads. Its light weight and reduced context switching favor OkHttp for this type of workload.
OkHttp grossly underperforms in sync-over-async runs on default settings (not recorded, but it was getting stuck below 2k tps even though concurrency was increased). We might want to tweak these defaults.
OkHttp has a long (a minute or more) shutdown time after the last sync-over-async transaction, i.e. it prevents process shutdown until its default executor scales back to zero. We might want to tweak this default.
Profile
This section contains data from profiling a simple scenario that calls BlobServiceClient.getProperties() in a loop. See the code here.
JFR dumps have been obtained for each combination of HTTP client and full-sync vs sync-over-async implementation, i.e. these commands have been run. The app ran against Azurite on the same host.
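The profiled scenario is roughly equivalent to the following sketch (the connection string and iteration count are assumptions, not the exact linked code):

```java
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;

public class GetPropertiesLoop {
    public static void main(String[] args) {
        // Azurite's development-storage connection string; the real sample may configure this differently.
        BlobServiceClient client = new BlobServiceClientBuilder()
            .connectionString("UseDevelopmentStorage=true")
            .buildClient();

        for (int i = 0; i < 100_000; i++) {
            client.getProperties();
        }
    }
}
```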
Complexity decreases; there's (almost) no more Reactor in the hot path.
There are fewer threads involved and less switching in full sync. The caller thread is blocking anyway, so it might as well do the work.
Fewer threads are required for fully synchronous scenarios.
Lessons learnt from the bodge
Abstracting Reactor in a naive 1:1 attempt to build on top of its operators isn't sustainable. For some operators it might be difficult to write a sync equivalent (that's why SyncAsyncOperators were added where the logic had to be completely forked). Perhaps identifying and implementing larger chunks of logic would be better (i.e. one operator covering N Reactor operators).
Sync-async operators produce stack traces as ugly as Reactor alone. This won't solve the debuggability problem well; it only has the potential to reduce code duplication.
If pursued, sync-async operators should become an implementation detail used internally across SDKs. They don't seem to bring value to the end user.
It makes sense not to use sync-async operators below generated code. Since there's one core, building a separate imperative sync stack might be a compromise that has to be taken to make debuggability better.
Above generated code, usage of sync-async operators might make sense to reduce code duplication.
Complex scenarios like buffered upload/download and storage streams likely won't benefit from such operators, or these operators would become complex, which again brings the problem of meaningless stack traces.
Inventing a sync-async abstraction might reduce code duplication both above generated code and in tests (assuming that the public surface calls directly into that abstraction).
There are ways to smuggle sync behavior through Reactor operators, i.e. by using things like Mono.just() or Mono.fromCallable() with some context switches (similar to how we eagerly buffer responses when the payload is XML or JSON); see the sketch below.
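A generic sketch of that smuggling technique - the blocking work only runs when the chain is materialized:

```java
import reactor.core.publisher.Mono;

public class SmuggledSyncSample {
    public static void main(String[] args) {
        // Mono.fromCallable defers the blocking call until subscription; without an explicit
        // scheduler hop it runs on the subscribing thread, which is effectively synchronous.
        Mono<String> result = Mono.fromCallable(SmuggledSyncSample::doBlockingWork)
            .map(String::toUpperCase);

        System.out.println(result.block());
    }

    private static String doBlockingWork() {
        return "response-body";
    }
}
```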
What about binary data?
The bodge uses Stream<ByteBuffer> to mimic Flux<ByteBuffer>. That simplifies plowing through the layers and requires less work to achieve a sync stack. However, it still involves layers of conversion even if the underlying HttpClient can speak streams. We should explore how to avoid these conversions - perhaps BinaryData with a variety of content types would help here to defer or avoid conversion depending on consumption style.
Should we remove Reactor from internals entirely?
This could be possible if we base the internals on CompletableFuture and put reactor-over-future just below the public API. But should a reactive user work with a fully reactive stack?
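A sketch of what CompletableFuture-based internals with reactor-over-future just below the public API could look like (sendInternal and the surrounding shape are assumptions):

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;

public class FutureBasedInternals {
    // Hypothetical internal transport API based on CompletableFuture.
    private CompletableFuture<String> sendInternal(String request) {
        return CompletableFuture.supplyAsync(() -> "response for " + request);
    }

    // Reactive public surface adapts the future lazily, per subscription.
    public Mono<String> send(String request) {
        return Mono.fromFuture(() -> sendInternal(request));
    }

    // Synchronous public surface joins the future directly, with no Reactor in the path.
    public String sendSync(String request) {
        return sendInternal(request).join();
    }
}
```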
Given where we are today, going straight to the desired state is a breaking change, see here.
That is, this can cause:
a compilation error for users that implement the interface
a NoSuchMethodError if somebody attempts to use HttpPipelinePolicy implementations without recompiling them, i.e. uses the latest Azure SDK with old jars that bring custom policies (like this one).
Options
Introduce new API as default implementation and attempt to make it work
In this case we introduce the new API with a default implementation that blocks on the async version. We also add logic to HttpPipeline to "jump back" to the sync API in the chain.
public class HttpPipelineNextPolicy {
    ...
    public Mono<HttpResponse> process() {
        if (isSync && !Schedulers.isInNonBlockingThread()) {
            // Pipeline executes in synchronous style. We most likely got here via the default implementation in
            // HttpPipelinePolicy.processSynchronously, so go back to sync style here.
            // Don't do this on non-blocking threads.
            return Mono.fromCallable(this::processSync);
        } else {
            // TODO either give up and sync-over-async here or farm this out to Schedulers.boundedElastic().
        }
        ...
    }
    ...
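The corresponding default method on the policy interface could look roughly like this (the method name and the blocking fallback are assumptions consistent with the option described above):

```java
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpResponse;
import reactor.core.publisher.Mono;

public interface HttpPipelinePolicy {
    Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next);

    // Default implementation keeps existing custom policies compiling and working;
    // policies that don't override it fall back to blocking on the async path.
    default HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
        return process(context, next).block();
    }
}
```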
Make a breaking change
In this case we introduce the new API without a default implementation and make a breaking change.
Introduce HttpPipelinePolicyV2 and deprecate HttpPipelinePolicy and related APIs
In this case we deprecate the existing HttpPipelinePolicy and all associated APIs. We create HttpPipelinePolicyV2 and overloads that take it. Internally, we use the new types and adapt the old type using techniques similar to the "Introduce new API as default implementation and attempt to make it work" option.
We introduce a new top-level policy HttpSyncPipelinePolicy similar to the current policy; it'll be a functional interface with a single method process().
HttpPipelineBuilder will have another overload policies(HttpSyncPipelinePolicy... policies).
All builders will have an overload for addCustomPolicy(HttpSyncPipelinePolicy policy).
This will keep the sync and async pipeline separate.
Also, this will allow the current async stack to remain unchanged. The decision to build the sync/async pipeline will happen at the time buildClient/buildAsyncClient is called, and if the pipeline policies are not homogeneous, we throw.
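A sketch of what the separate sync policy could look like (HttpSyncPipelineNextPolicy and the exact signatures are assumptions beyond what's described above):

```java
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpResponse;

// Hypothetical next-policy type for the sync chain.
interface HttpSyncPipelineNextPolicy {
    HttpResponse process();
}

// Hypothetical sync-only policy; a functional interface with a single process() method.
@FunctionalInterface
interface HttpSyncPipelinePolicy {
    HttpResponse process(HttpPipelineCallContext context, HttpSyncPipelineNextPolicy next);
}

class CustomSyncPolicySample {
    // A custom policy would then be a simple lambda.
    static final HttpSyncPipelinePolicy ADD_HEADER = (context, next) -> {
        context.getHttpRequest().setHeader("x-ms-custom", "value");
        return next.process();
    };
}
```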
Pros:
allows customers to make a decision as to which interface they want to implement when creating a custom policy
the sync process() method is not hidden by a default implementation in the interface
no mixing of sync and async policies in a pipeline (can be a con too)
the current async pipeline will not have to change and none of the current policies will have to be updated
Cons:
we'll have 2 classes for each policy - sync and async
an additional method overload has to be added to every builder to add a custom sync policy
cannot have sync and async policies in the same pipeline
the policy names are going to use the term "sync" while the build methods use "async" to distinguish between sync and async
Pros and cons
| | Introduce new API as default implementation and attempt to make it work | Make a breaking change | Introduce new API as default implementation and throw | Introduce HttpPipelinePolicyV2 and deprecate HttpPipelinePolicy and related APIs | Introduce separate HttpSyncPipelinePolicy |
| --- | --- | --- | --- | --- | --- |
| Doesn't break customer code compilation | ✓ | ✗ | ✓ | ✓ | ✓ |
| Doesn't break at runtime | ✓ | ✗ (possible NoSuchMethodError) | ✗ (if we want to switch sync APIs to use sync stack) | ✓ | ✗ (if we want to switch sync APIs to use sync stack) |
| Allows us to flip sync client APIs to sync stack under the hood | ✓ | ✓ | ✗ | ✓ | ✗ (previously configured policies stop working) |
| Signals customer that they have to update their code | ✗ | ✓ | ✗ | ✓ | ✗ |
| Can enforce "sync stack" in HttpPipeline layer | ✗ | ✓ | ✓ | ✓ (eventually, assuming that deprecation is noticed) | |
This document presents a second attempt to build a synchronous stack and focuses on the HttpPipeline and HttpClient layers. The synchronous RestProxy is covered in Vinay's document.
This project has evolved beyond "sync stack" since its inception. Therefore, plenty of the improvements discovered here are going to be applicable to async clients as well.
BinaryData gets accessors, available in the core packages, that allow access to the BinaryDataContent.
package com.azure.core.implementation.util;
public final class BinaryDataHelper {
    private static BinaryDataAccessor accessor;

    public interface BinaryDataAccessor {
        BinaryData createBinaryData(BinaryDataContent content);

        BinaryDataContent getContent(BinaryData binaryData);
    }
}
BinaryData should also get an API to support a lazy FluxContent. Currently, BinaryData.fromFlux eagerly reads the payload for historical reasons.
HttpRequest
HttpRequest now accepts BinaryData as the content representation. It keeps the payload in BinaryData form internally while making sure existing APIs keep working.
This defers format conversions until absolutely necessary. It also allows optimizations to be applied in places where the content has to be accessed.
public class HttpRequest {
public HttpRequest(HttpMethod httpMethod, URL url)
public HttpRequest(HttpMethod httpMethod, String url)
public HttpRequest(HttpMethod httpMethod, URL url, HttpHeaders headers, Flux<ByteBuffer> body)
+ public HttpRequest(HttpMethod httpMethod, URL url, HttpHeaders headers, BinaryData data)
public Flux<ByteBuffer> getBody()
+ public BinaryData getBodyAsBinaryData()
public HttpRequest setBody(String content)
public HttpRequest setBody(byte[] content)
public HttpRequest setBody(Flux<ByteBuffer> content)
+ public HttpRequest setBody(BinaryData data)
public HttpRequest copy()
public HttpRequest setHeader(String name, String value)
public HttpHeaders getHeaders()
public HttpRequest setHeaders(HttpHeaders headers)
public HttpMethod getHttpMethod()
public HttpRequest setHttpMethod(HttpMethod httpMethod)
public URL getUrl()
public HttpRequest setUrl(URL url)
public HttpRequest setUrl(String url)
}
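For example, a request body could then be set without forcing a Flux conversion up front (a usage sketch):

```java
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.util.BinaryData;
import java.nio.file.Paths;

public class BinaryDataRequestSample {
    public static void main(String[] args) {
        // The file content stays in BinaryData form; the HttpClient implementation decides later
        // whether to stream it, wrap it in a Flux, or buffer it, depending on how it's consumed.
        HttpRequest request = new HttpRequest(HttpMethod.PUT, "https://example.com/container/blob")
            .setBody(BinaryData.fromFile(Paths.get("payload.bin")));
        System.out.println(request.getBodyAsBinaryData().getLength());
    }
}
```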
HttpResponse
HttpResponse can now return the payload in the form of BinaryData. This lets the HttpClient serve content in the form that's most appropriate for the context, e.g. a Flux for async calls, an InputStream for sync calls, or byte[] if eager buffering of the response has been requested. These optimizations are best effort, depending on the HttpClient implementation.
Additionally, HttpResponse gets a set of writeBodyTo methods that cover types like OutputStream or Channel and can write bytes to the destination in an optimized way (i.e. using much less memory).
public abstract class HttpResponse implements Closeable {
protected HttpResponse(HttpRequest request)
public abstract Flux<ByteBuffer> getBody()
public abstract Mono<byte[]> getBodyAsByteArray()
public Mono<InputStream> getBodyAsInputStream()
public abstract Mono<String> getBodyAsString()
public abstract Mono<String> getBodyAsString(Charset charset)
+ public BinaryData getBodyAsBinaryData()
public HttpResponse buffer()
@Override public void close()
public abstract HttpHeaders getHeaders()
public abstract String getHeaderValue(String name)
public final HttpRequest getRequest()
public abstract int getStatusCode()
+ public void writeBodyTo(OutputStream outputStream) throws IOException
+ public void writeBodyTo(WritableByteChannel channel) throws IOException
+ public void writeBodyTo(FileChannel channel, long position) throws IOException
+ public Mono<Void> writeBodyTo(AsynchronousFileChannel asynchronousFileChannel, long position)
}
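A usage sketch of the proposed writeBodyTo API for a synchronous download path:

```java
import com.azure.core.http.HttpResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class WriteBodyToSample {
    // Streams the response body straight into a file without buffering the whole payload in memory.
    static void downloadTo(HttpResponse response, String path) throws IOException {
        try (OutputStream out = Files.newOutputStream(Paths.get(path))) {
            response.writeBodyTo(out);
        }
    }
}
```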
HttpPipeline
The HttpPipeline gets an extra set of synchronous APIs that mirror the reactive counterparts. We probably need just one of those.
public final class HttpPipeline {
// This class does not have any public constructors, and is not able to be instantiated using 'new'.
public HttpClient getHttpClient()
public HttpPipelinePolicy getPolicy(int index)
public int getPolicyCount()
public Mono<HttpResponse> send(HttpRequest request)
public Mono<HttpResponse> send(HttpPipelineCallContext context)
public Mono<HttpResponse> send(HttpRequest request, Context data)
+ public HttpResponse sendSync(HttpRequest request, Context data)
}
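A usage sketch of the synchronous entry point:

```java
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpPipelineBuilder;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.Context;

public class SendSyncSample {
    public static void main(String[] args) {
        HttpPipeline pipeline = new HttpPipelineBuilder().build();
        HttpRequest request = new HttpRequest(HttpMethod.GET, "https://example.com");
        // No Mono on the calling side; the response comes back on the caller thread.
        HttpResponse response = pipeline.sendSync(request, Context.NONE);
        System.out.println(response.getStatusCode());
    }
}
```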
HttpPipelinePolicy
The HttpPipelinePolicy got an extra synchronous API with a default implementation that blocks on the async API.
The HttpPipelineNextPolicy got a new synchronous API.
public class HttpPipelineNextPolicy {
// This class does not have any public constructors, and is not able to be instantiated using 'new'.
@Override public HttpPipelineNextPolicy clone()
public Mono<HttpResponse> process()
+ public HttpResponse processSync()
}
In addition to that, HttpPipelineNextPolicy keeps track of whether the HttpPipeline got called synchronously or not and tries to get it back on the right track.
public class HttpPipelineNextPolicy {
    ...
    private final boolean isSynchronous;
    ...
    public Mono<HttpResponse> process() {
        if (isSynchronous && !Schedulers.isInNonBlockingThread()) {
            // Pipeline executes in synchronous style. We most likely got here via the default implementation in
            // HttpPipelinePolicy.processSync, so go back to sync style here.
            // Don't do this on non-blocking threads.
            return Mono.fromCallable(this::processSync);
        } else {
            if (isSynchronous) {
                LOGGER.warning("The pipeline switched from synchronous to asynchronous."
                    + " Check if all policies override HttpPipelinePolicy.processSynchronously");
            }
            ...
        }
    }

    public HttpResponse processSync() {
        if (!isSynchronous) {
            throw LOGGER.logExceptionAsError(new IllegalStateException(
                "Must not use HttpPipelineNextPolicy.processSynchronously in asynchronous HttpPipeline invocation."));
        }
        ...
    }
    ...
}
HttpPipelineSynchronousPolicy
A new abstract class HttpPipelineSynchronousPolicy has been introduced to cover cases where pre- and post-request actions are purely synchronous.
This is to reduce the code duplication involved in writing simple policies like this.
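A sketch of how such a simple policy could look on top of it (the hook method name is an assumption; the real base class may expose different hooks):

```java
import com.azure.core.http.HttpPipelineCallContext;
import java.util.UUID;

// Sketch: a header-stamping policy whose pre-request action is purely synchronous.
public class RequestIdPolicy extends HttpPipelineSynchronousPolicy {
    @Override
    protected void beforeSendingRequest(HttpPipelineCallContext context) {
        context.getHttpRequest().setHeader("x-ms-client-request-id", UUID.randomUUID().toString());
    }
}
```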
The implementations of HttpClient then choose what to do and the best way of handling the request content.
There's a little bit of controversy around Async in the names of the HttpClient implementations and their builders.
However, a design where we'd have separate HttpClient and SynchronousHttpClient types with respective implementations
could potentially create a configuration nightmare, i.e. what should happen if somebody calls new FooClientBuilder().buildFooClient()
but provided only an async client? Instead of this we'd rather create new types and deprecate the existing ones with Async in their name, or just leave
it as is, assuming that customers would rather use HttpClientOptions over playing with HttpClient builders.
class OkHttpAsyncHttpClient implements HttpClient {
    ...
    @Override
    public HttpResponse sendSync(HttpRequest request, Context context) {
        boolean eagerlyReadResponse = (boolean) context.getData("azure-eagerly-read-response").orElse(false);

        Request okHttpRequest = toOkHttpRequestSynchronously(request);
        Call call = httpClient.newCall(okHttpRequest);
        try {
            Response okHttpResponse = call.execute();
            return fromOkHttpResponse(okHttpResponse, request, eagerlyReadResponse);
        } catch (IOException e) {
            throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
        }
    }
    ...
    private static RequestBody toOkHttpRequestBodySynchronously(BinaryData bodyContent, HttpHeaders headers) {
        String contentType = headers.getValue("Content-Type");
        MediaType mediaType = (contentType == null) ? null : MediaType.parse(contentType);

        if (bodyContent == null) {
            return RequestBody.create(ByteString.EMPTY, mediaType);
        }

        BinaryDataContent content = BinaryDataHelper.getContent(bodyContent);
        if (content instanceof ByteArrayContent) {
            return RequestBody.create(content.toBytes(), mediaType);
        } else if (content instanceof FileContent) {
            FileContent fileContent = (FileContent) content;
            // This won't be right all the time as we may be sending only a partial view of the file.
            // TODO (alzimmer): support ranges in FileContent
            return RequestBody.create(fileContent.getFile().toFile(), mediaType);
        } else if (content instanceof StringContent) {
            return RequestBody.create(bodyContent.toString(), mediaType);
        } else if (content instanceof InputStreamContent) {
            Long contentLength = content.getLength();
            if (contentLength == null) {
                String contentLengthHeaderValue = headers.getValue("Content-Length");
                if (contentLengthHeaderValue != null) {
                    contentLength = Long.parseLong(contentLengthHeaderValue);
                } else {
                    contentLength = -1L;
                }
            }
            long effectiveContentLength = contentLength;
            return new RequestBody() {
                @Override
                public MediaType contentType() {
                    return mediaType;
                }

                @Override
                public long contentLength() {
                    return effectiveContentLength;
                }

                @Override
                public void writeTo(BufferedSink bufferedSink) throws IOException {
                    // TODO (kasobol-msft) OkHttp client can retry internally so we should add mark/reset here
                    // and fallback to buffering if that's not supported.
                    // We should also consider adding InputStreamSupplierBinaryDataContent type where customer can
                    // give a prescription how to acquire/re-acquire an InputStream.
                    Source source = Okio.source(content.toStream());
                    bufferedSink.writeAll(source);
                }
            };
        } else {
            // TODO (kasobol-msft) is there better way than just block? perhaps throw?
            // Perhaps we could consider using one of storage's stream implementation on top of flux?
            // Or maybe implement that OkHttp sink and get rid off reading to string altogether.
            return toByteString(bodyContent.toFluxByteBuffer()).map(bs -> RequestBody.create(bs, mediaType)).block();
        }
    }
    ...
}
class NettyAsyncHttpClient implements HttpClient {
    ...
    // This implementation doesn't override the default implementation as it wouldn't do anything smart about it anyway.
    ...
    private static BiFunction<HttpClientRequest, NettyOutbound, Publisher<Void>> bodySendDelegate(
        final HttpRequest restRequest) {
        ...
        BinaryDataContent binaryDataContent = BinaryDataHelper.getContent(restRequest.getContent());
        if (binaryDataContent instanceof ByteArrayContent) {
            return reactorNettyOutbound.send(Mono.just(Unpooled.wrappedBuffer(binaryDataContent.toBytes())));
        } else if (binaryDataContent instanceof StringContent) {
            return reactorNettyOutbound.send(Mono.fromSupplier(
                () -> Unpooled.wrappedBuffer(binaryDataContent.toBytes())));
        } else if (binaryDataContent instanceof FileContent) {
            FileContent fileContent = (FileContent) binaryDataContent;
            // fileContent.getLength() is always not null in FileContent.
            if (restRequest.getUrl().getProtocol().equals("https")) {
                // NettyOutbound uses such logic internally for ssl connections but with smaller buffer of 1KB.
                return reactorNettyOutbound.sendUsing(
                    () -> FileChannel.open(fileContent.getFile(), StandardOpenOption.READ),
                    (c, fc) -> {
                        if (c.channel().pipeline().get(ChunkedWriteHandler.class) == null) {
                            c.addHandlerLast("reactor.left.chunkedWriter", new ChunkedWriteHandler());
                        }
                        try {
                            return new ChunkedNioFile(
                                fc, fileContent.getPosition(), fileContent.getLength(), fileContent.getChunkSize());
                        } catch (IOException e) {
                            throw Exceptions.propagate(e);
                        }
                    },
                    (fc) -> {
                        try {
                            fc.close();
                        } catch (IOException e) {
                            LOGGER.log(LogLevel.VERBOSE, () -> "Could not close file", e);
                        }
                    });
            } else {
                // Beware of NettyOutbound.sendFile(Path), it involves an extra file length lookup.
                // This is going to use zero-copy transfer if there's no ssl.
                return reactorNettyOutbound.sendFile(
                    fileContent.getFile(), fileContent.getPosition(), fileContent.getLength());
            }
        } else if (binaryDataContent instanceof StringContent) {
            return reactorNettyOutbound.sendString(Mono.just(binaryDataContent.toString()));
        } else if (binaryDataContent instanceof InputStreamContent) {
            return reactorNettyOutbound.sendUsing(
                binaryDataContent::toStream,
                (c, stream) -> {
                    if (c.channel().pipeline().get(ChunkedWriteHandler.class) == null) {
                        c.addHandlerLast("reactor.left.chunkedWriter", new ChunkedWriteHandler());
                    }
                    return new ChunkedStream(stream);
                },
                (stream) -> {
                    try {
                        stream.close();
                    } catch (IOException e) {
                        LOGGER.log(LogLevel.VERBOSE, () -> "Could not close stream", e);
                    }
                });
        } else {
            Flux<ByteBuf> nettyByteBufFlux = restRequest.getBody().map(Unpooled::wrappedBuffer);
            return reactorNettyOutbound.send(nettyByteBufFlux);
        }
        ...
    }
    ...
}
public final class NettyAsyncHttpResponse extends NettyAsyncHttpResponseBase {
    ...
    @Override
    public void writeBodyTo(OutputStream outputStream) throws IOException {
        // TODO (kasobol-msft) handle other cases optimizations from ImplUtils.writeByteBufferToStream.
        // However it seems that buffers here don't have backing arrays. And for files we should probably have
        // writeTo(Channel) API.
        byte[] buffer = new byte[8 * 1024];
        bodyIntern().flatMap(byteBuff -> {
            while (byteBuff.isReadable()) {
                try {
                    // TODO (kasobol-msft) this could be optimized further, i.e. make sure we're utilizing
                    // whole buffer before passing to outputstream.
                    int numberOfBytes = Math.min(buffer.length, byteBuff.readableBytes());
                    byteBuff.readBytes(buffer, 0, numberOfBytes);
                    // TODO (kasobol-msft) consider farming this out to Schedulers.boundedElastic.
                    // https://github.com/reactor/reactor-netty/issues/2096#issuecomment-1068832894
                    outputStream.write(buffer, 0, numberOfBytes);
                } catch (IOException e) {
                    return Mono.error(e);
                }
            }
            return Mono.empty();
        }).blockLast();
    }
    ...
}
Branching the sync stack means that the sync and async clients are going to demand equal test coverage. To facilitate this we should be building test extensions like the one presented below.
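A sketch of what such an extension could boil down to, assuming JUnit 5 parameterized tests and a hypothetical sendViaPipeline helper (the real extension may look different):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class SyncAsyncParityTest {

    enum SendMode { SYNC, ASYNC }

    // Each HTTP-level test runs once per send path so sync and async coverage stays equal.
    @ParameterizedTest
    @EnumSource(SendMode.class)
    void statusCodeMatches(SendMode mode) {
        assertEquals(200, sendViaPipeline(mode));
    }

    private int sendViaPipeline(SendMode mode) {
        // Placeholder: a real helper would call HttpPipeline.sendSync(...) for SYNC and
        // HttpPipeline.send(...).block() for ASYNC against a test server, returning the status code.
        return 200;
    }
}
```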
Simulated DNS failure in perf runs, i.e. --endpoint "https://foo".
Netty
New
java.util.concurrent.ExecutionException: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:241)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:163)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:91)
at com.azure.core.perf.App.main(App.java:20)
Caused by: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
at reactor.core.Exceptions.propagate(Exceptions.java:392)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97)
at reactor.core.publisher.Mono.block(Mono.java:1707)
at com.azure.core.http.HttpClient.sendSync(HttpClient.java:42)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:78)
at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:139)
at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:112)
at com.azure.core.perf.PipelineSendTest.run(PipelineSendTest.java:54)
at com.azure.perf.test.core.PerfStressTest.runTest(PerfStressTest.java:31)
at com.azure.perf.test.core.ApiPerfTestBase.runAll(ApiPerfTestBase.java:117)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$16(PerfStressProgram.java:240)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.base/java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$17(PerfStressProgram.java:240)
at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1407)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
... 32 more
Caused by: java.net.UnknownHostException: No such host is known (foo)
at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:928)
at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1514)
at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:847)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1504)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1363)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1297)
at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:169)
at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:166)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at io.netty.util.internal.SocketUtils.allAddressesByName(SocketUtils.java:166)
at io.netty.resolver.DefaultNameResolver.doResolveAll(DefaultNameResolver.java:50)
at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:79)
at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:71)
at io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:73)
at io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:31)
at io.netty.resolver.AbstractAddressResolver.resolveAll(AbstractAddressResolver.java:158)
at reactor.netty.transport.TransportConnector.doResolveAndConnect(TransportConnector.java:283)
at reactor.netty.transport.TransportConnector.lambda$connect$6(TransportConnector.java:110)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
at reactor.netty.transport.TransportConnector$MonoChannelPromise._subscribe(TransportConnector.java:579)
at reactor.netty.transport.TransportConnector$MonoChannelPromise.lambda$subscribe$0(TransportConnector.java:499)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Old
java.util.concurrent.ExecutionException: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:237)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:163)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:91)
at com.azure.core.perf.App.main(App.java:20)
Caused by: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
at reactor.core.Exceptions.propagate(Exceptions.java:392)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97)
at reactor.core.publisher.Mono.block(Mono.java:1707)
at com.azure.core.perf.PipelineSendTest.run(PipelineSendTest.java:46)
at com.azure.perf.test.core.PerfStressProgram.runLoop(PerfStressProgram.java:285)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$14(PerfStressProgram.java:236)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.base/java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$15(PerfStressProgram.java:236)
at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1407)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
... 25 more
Caused by: java.net.UnknownHostException: No such host is known (foo)
at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:928)
at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1514)
at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:847)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1504)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1363)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1297)
at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:169)
at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:166)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at io.netty.util.internal.SocketUtils.allAddressesByName(SocketUtils.java:166)
at io.netty.resolver.DefaultNameResolver.doResolveAll(DefaultNameResolver.java:50)
at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:79)
at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:71)
at io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:73)
at io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:31)
at io.netty.resolver.AbstractAddressResolver.resolveAll(AbstractAddressResolver.java:158)
at reactor.netty.transport.TransportConnector.doResolveAndConnect(TransportConnector.java:283)
at reactor.netty.transport.TransportConnector.lambda$connect$6(TransportConnector.java:110)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
at reactor.netty.transport.TransportConnector$MonoChannelPromise._subscribe(TransportConnector.java:579)
at reactor.netty.transport.TransportConnector$MonoChannelPromise.lambda$subscribe$0(TransportConnector.java:499)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
OkHttp
New
java.util.concurrent.ExecutionException: java.io.UncheckedIOException: java.net.UnknownHostException: No such host is known (foo)
at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:241)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:163)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:91)
at com.azure.core.perf.App.main(App.java:20)
Caused by: java.io.UncheckedIOException: java.net.UnknownHostException: No such host is known (foo)
at com.azure.core.http.okhttp.OkHttpAsyncHttpClient.sendSync(OkHttpAsyncHttpClient.java:102)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:78)
at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:139)
at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:112)
at com.azure.core.perf.PipelineSendTest.run(PipelineSendTest.java:54)
at com.azure.perf.test.core.PerfStressTest.runTest(PerfStressTest.java:31)
at com.azure.perf.test.core.ApiPerfTestBase.runAll(ApiPerfTestBase.java:117)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$16(PerfStressProgram.java:240)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.base/java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$17(PerfStressProgram.java:240)
at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1407)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: java.net.UnknownHostException: No such host is known (foo)
at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:928)
at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1514)
at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:847)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1504)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1363)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1297)
at okhttp3.Dns$Companion$DnsSystem.lookup(Dns.kt:49)
at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.kt:164)
at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.kt:129)
at okhttp3.internal.connection.RouteSelector.next(RouteSelector.kt:71)
at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.kt:205)
at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.kt:106)
at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.kt:74)
at okhttp3.internal.connection.RealCall.initExchange$okhttp(RealCall.kt:255)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:32)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:95)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:76)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
at okhttp3.internal.connection.RealCall.execute(RealCall.kt:154)
at com.azure.core.http.okhttp.OkHttpAsyncHttpClient.sendSync(OkHttpAsyncHttpClient.java:99)
... 30 more
Old
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:259)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:163)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:91)
at com.azure.core.perf.App.main(App.java:20)
Caused by: java.util.concurrent.ExecutionException: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:237)
... 3 more
Caused by: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
at reactor.core.Exceptions.propagate(Exceptions.java:392)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97)
at reactor.core.publisher.Mono.block(Mono.java:1707)
at com.azure.core.perf.PipelineSendTest.run(PipelineSendTest.java:46)
at com.azure.perf.test.core.PerfStressProgram.runLoop(PerfStressProgram.java:285)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$14(PerfStressProgram.java:236)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.base/java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$15(PerfStressProgram.java:236)
at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1407)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
... 25 more
Caused by: java.net.UnknownHostException: No such host is known (foo)
at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:928)
at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1514)
at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:847)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1504)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1363)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1297)
at okhttp3.Dns$Companion$DnsSystem.lookup(Dns.kt:49)
at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.kt:164)
at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.kt:129)
at okhttp3.internal.connection.RouteSelector.next(RouteSelector.kt:71)
at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.kt:205)
at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.kt:106)
at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.kt:74)
at okhttp3.internal.connection.RealCall.initExchange$okhttp(RealCall.kt:255)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:32)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:95)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:76)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:517)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Simple Netty
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.net.UnknownHostException: No such host is known (foo)
at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:241)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:163)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:91)
at com.azure.core.perf.App.main(App.java:20)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.net.UnknownHostException: No such host is known (foo)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600)
... 5 more
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.net.UnknownHostException: No such host is known (foo)
at com.azure.core.http.netty.SimpleNettyHttpClient.sendSync(SimpleNettyHttpClient.java:76)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:78)
at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:139)
at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:112)
at com.azure.core.perf.PipelineSendTest.run(PipelineSendTest.java:54)
at com.azure.perf.test.core.PerfStressTest.runTest(PerfStressTest.java:31)
at com.azure.perf.test.core.ApiPerfTestBase.runAll(ApiPerfTestBase.java:117)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$16(PerfStressProgram.java:240)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.base/java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$17(PerfStressProgram.java:240)
at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1407)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: java.util.concurrent.ExecutionException: java.net.UnknownHostException: No such host is known (foo)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at com.azure.core.http.netty.SimpleNettyHttpClient.sendSync(SimpleNettyHttpClient.java:74)
... 30 more
Caused by: java.net.UnknownHostException: No such host is known (foo)
at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:928)
at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1514)
at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:847)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1504)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1363)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1297)
at java.base/java.net.InetAddress.getByName(InetAddress.java:1247)
at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:156)
at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:153)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:153)
at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:41)
at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:61)
at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:53)
at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:55)
at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:31)
at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:106)
at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:206)
at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:180)
at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:166)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990)
at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:834)
0 0 NaN
Benchmarks
This section contains the results of perf runs where PipelineSendTest from the main branch and from the feature/sync-stack branch has been run with various input types. The version from the main branch represents sync-over-async; the version from the feature/sync-stack branch represents the sync stack. Additional runs with the simple Netty prototype have been performed.
Tests were run on a D4ds v5 VM against Premium Blobs in the same region. The procedure can be found in the run.sh and stats.py scripts attached to this gist.
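For context, both variants exercise the same pipeline but materialize it differently. A minimal sketch of the two call shapes, following the frames in the traces above (the exact sendSync overload in the prototype may differ):
```java
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.Context;

public class PipelineSendShapes {
    // Sync-over-async (main): the blocking call sits on top of the reactive chain,
    // which is why the "Old" trace above goes through Mono.block.
    static HttpResponse sendBlocking(HttpPipeline pipeline, HttpRequest request) {
        return pipeline.send(request).block();
    }

    // Sync stack (feature/sync-stack): the pipeline is driven synchronously end to end,
    // matching the HttpPipeline.sendSync frames in the "Simple Netty" trace.
    static HttpResponse sendSync(HttpPipeline pipeline, HttpRequest request) {
        return pipeline.sendSync(request, Context.NONE);
    }
}
```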
How to read the data
I had to replace the storage account and re-run the benchmarks for Netty while iterating on it. So please:
Look at how new vs. old compares for a given HTTP client type
Don't try to compare Netty vs. OkHttp using this data
The Simple Netty client has not been deeply explored, as early runs revealed it is still far from ready to handle higher concurrency and larger payloads.
In order to get profiles, I created a simple app that repeats a fixed number of transactions with each of the perf fat jars. It was run on a VM against a real account so that the profiles capture traffic over HTTPS.
This has also been done with the Storage scenarios.
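The profiling app itself is not part of this gist; a minimal sketch of what such a harness might look like (the environment variable name and default transaction count are assumptions):
```java
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;

// Hypothetical harness: performs a fixed number of transactions so a profiler
// can be attached while the process talks to a real account over HTTPS.
public class ProfilingApp {
    public static void main(String[] args) {
        BlobServiceClient client = new BlobServiceClientBuilder()
            .connectionString(System.getenv("AZURE_STORAGE_CONNECTION_STRING")) // assumed env var
            .buildClient();

        int transactions = args.length > 0 ? Integer.parseInt(args[0]) : 10_000; // assumed default
        for (int i = 0; i < transactions; i++) {
            client.getProperties(); // one service round trip per iteration
        }
    }
}
```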
Notes from meeting:
Naming decision: converge on the usage of Sync or Synchronously everywhere.
Think about removing setContent and getContent in HttpRequest and using getBodyAsBinaryData and setBody(BinaryData) to prevent confusion over whether getBody or getContent should be used. The same concept applies to HttpResponse.
Potentially reduce the number of sendSynchronously overloads to only one, taking either HttpPipelineCallContext or (HttpRequest, Context). Either way, the HttpRequest-only API should be removed.
Determine the cost and impact of making a breaking change to HttpPipelinePolicy by making processSynchronously a non-default interface API (see the sketch after this list). Generally, SDK authors will be the only people implementing the interface, and we can own updating all instances in our code base. For the few customers we've worked with on creating their own policies, we may be able to notify them about this breaking change and help them work through adding a new implementation.
For policies that are able to do so, we should look into having them use HttpPipelineSynchronousPolicy, if it isn't too breaking and if the policy processes synchronously.
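A rough sketch of what HttpPipelinePolicy could look like with processSynchronously as a non-default method, assuming both methods keep the (HttpPipelineCallContext, HttpPipelineNextPolicy) shape; this follows the naming seen in the stack traces above, not a shipped azure-core API:
```java
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpResponse;
import reactor.core.publisher.Mono;

// Hypothetical shape: making processSynchronously non-default would force every
// policy author to provide an explicit synchronous path instead of silently
// inheriting a sync-over-async fallback.
public interface HttpPipelinePolicy {
    Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next);

    HttpResponse processSynchronously(HttpPipelineCallContext context, HttpPipelineNextPolicy next);
}
```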