Created
January 20, 2019 04:17
-
-
Save Kiechlus/1acfa39aab662221510d6633f08f8c62 to your computer and use it in GitHub Desktop.
Stream parallel requests into zip
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Streaming: https://blog.stephencleary.com/2016/11/streaming-zip-on-aspnet-core.html
 * Queue: https://robertwray.co.uk/blog/wrapping-concurrentqueue-t-to-make-it-eventful
 *
 */
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.IO.Compression; | |
using System.Json; | |
using System.Net.Http; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.AspNetCore.Mvc; | |
using Microsoft.Net.Http.Headers; | |
using RW.EventfulConcurrentQueue; | |
using stream; | |
namespace WebApplication.Controllers | |
{ | |
/// <summary>
/// Downloads the instances of a study from Orthanc with a bounded number of
/// parallel HTTP requests and streams them to the client as one zip file.
/// Each downloaded instance is pushed onto an eventful queue; the queue's
/// enqueue event writes the instance into the zip archive.
/// </summary>
[Route("api/[controller]")]
public class FileController : Controller
{
    private static HttpClient Client { get; } = new HttpClient();

    // Each queue item is a dictionary holding exactly one (file name -> file content) pair.
    private readonly EventfulConcurrentQueue<Dictionary<string, byte[]>> imageQueue =
        new EventfulConcurrentQueue<Dictionary<string, byte[]>>();

    // Upper bound on the number of instance downloads that may be in flight at once.
    private const int CONCURRENT_REQUESTS = 20;

    /// <summary>
    /// Get http://localhost:{port}/api/file to start the download.
    /// </summary>
    /// <returns>A result that streams the zipped study as "{study}.zip".</returns>
    [HttpGet]
    public IActionResult Get()
    {
        string orthancBase = "xxxxxxxxxxxx";
        string study = "xxxxxxxxxxxxxxxxx"; // This is a big study with 800MB zipped.
        //string study = "xxxxxxxxxxxxxxxx"; // Small study with only one file.

        // Read instance metadata from file. Fetching it from Orthanc
        // ({orthancBase}/studies/{study}/instances) takes too long.
        // Path.Combine keeps the path valid on non-Windows hosts as well.
        string metadataPath = System.IO.Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "instances.json");
        JsonValue instancesMetadata = JsonValue.Parse(System.IO.File.ReadAllText(metadataPath));

        return new FileCallbackResult(
            new MediaTypeHeaderValue("application/octet-stream"),
            async (outputStream, _) => await StreamInstancesAsZipAsync(outputStream, instancesMetadata, orthancBase))
        {
            FileDownloadName = $"{study}.zip"
        };
    }

    /// <summary>
    /// Downloads every instance listed in the metadata and writes each one
    /// into a zip archive created on top of <paramref name="outputStream"/>.
    /// </summary>
    /// <param name="outputStream">The stream the zip archive is written to.</param>
    /// <param name="instancesMetadata">Enumerable JSON value whose items expose an "ID" member.</param>
    /// <param name="orthancBase">Base URL of the Orthanc server.</param>
    private async Task StreamInstancesAsZipAsync(Stream outputStream, JsonValue instancesMetadata, string orthancBase)
    {
        var totalTimer = Stopwatch.StartNew();
        var tasks = new List<Task>();

        using (var zipArchive = new ZipArchive(outputStream, ZipArchiveMode.Create))
        {
            // Event handler, fires when a new object is added to the queue.
            void queue_Enqueued(object sender, EventArgs args)
            {
                WriteNextImageToArchive(zipArchive);
            }

            // Register the event handler.
            imageQueue.ItemEnqueued += queue_Enqueued;
            try
            {
                // Semaphore, allow to make n requests in parallel.
                using (var semaphore = new SemaphoreSlim(CONCURRENT_REQUESTS))
                {
                    foreach (JsonValue i in instancesMetadata)
                    {
                        string id = i["ID"];
                        string instanceUrl = $"{orthancBase}/instances/{id}/file";

                        // Wait here until there is room for this task; MakeRequest releases the slot.
                        await semaphore.WaitAsync();
                        tasks.Add(MakeRequest(semaphore, instanceUrl, $"{id}.dcm"));
                        Debug.WriteLine($"Adding new task, length: {tasks.Count}");
                    }

                    // Wait for the rest of the tasks to complete before the
                    // semaphore and the archive are disposed.
                    await Task.WhenAll(tasks);
                }
            }
            finally
            {
                // Do not leave a handler behind that references the disposed archive.
                imageQueue.ItemEnqueued -= queue_Enqueued;
            }

            Debug.WriteLine($"Fetched all data. Total duration: {totalTimer.ElapsedMilliseconds} ms..");
        }
    }

    /// <summary>
    /// Takes the next image off the queue and writes it as a new entry into the zip archive.
    /// </summary>
    /// <param name="zipArchive">The archive to write to; also used as the lock object.</param>
    /// <exception cref="InvalidOperationException">
    /// Nothing could be dequeued, or the dequeued item does not hold exactly one file.
    /// </exception>
    private void WriteNextImageToArchive(ZipArchive zipArchive)
    {
        Debug.WriteLine("New image in the queue. Trying to lock the zip stream and to stream the image into it.");
        var timer = Stopwatch.StartNew();

        // Locking is needed, it is not possible to stream two files at the same time into the zip stream.
        lock (zipArchive)
        {
            // Fetch the image from the queue.
            var success = imageQueue.TryDequeue(out Dictionary<string, byte[]> nameAndInstance);
            if (!success || nameAndInstance.Count != 1)
            {
                throw new InvalidOperationException("Error, cannot dequeue or wrong content.");
            }

            // The dictionary holds exactly one pair (checked above).
            foreach (KeyValuePair<string, byte[]> nameAndContent in nameAndInstance)
            {
                // The key already carries the ".dcm" extension, so it is used
                // as the entry name unchanged (appending it again produced "<id>.dcm.dcm").
                var zipEntry = zipArchive.CreateEntry(nameAndContent.Key);
                using (var zipStream = zipEntry.Open())
                {
                    zipStream.Write(nameAndContent.Value, 0, nameAndContent.Value.Length);
                }
            }

            // Elapsed time includes the wait for the lock.
            Debug.WriteLine($"Finished flushing image to the zip stream in {timer.ElapsedMilliseconds} ms.");
        }
    }

    /// <summary>
    /// Fetch an image from orthanc and write it to the queue.
    /// The semaphore slot is released when the request finishes, whether it succeeded or not.
    /// </summary>
    /// <param name="semaphore">The semaphore whose slot was acquired by the caller for this request.</param>
    /// <param name="instanceUrl">The instance URL.</param>
    /// <param name="fileName">Name of the file, including its extension.</param>
    /// <returns>A task that completes once the image has been enqueued.</returns>
    /// <exception cref="HttpRequestException">The server answered with a non-success status code.</exception>
    private async Task MakeRequest(SemaphoreSlim semaphore, string instanceUrl, string fileName)
    {
        try
        {
            // Started before the request so the logged duration covers the whole fetch.
            var timer = Stopwatch.StartNew();
            using (var responseMessage = await Client.GetAsync(instanceUrl).ConfigureAwait(false))
            {
                // Fail instead of archiving an error response body as if it were image data.
                responseMessage.EnsureSuccessStatusCode();

                byte[] content = await responseMessage.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
                var filenameAndContent = new Dictionary<string, byte[]>
                {
                    { fileName, content },
                };

                imageQueue.Enqueue(filenameAndContent);
                Debug.WriteLine($"Enqueuing result. New queue length: {imageQueue.Count()}");
                Debug.WriteLine($"Finished fetching image after {timer.ElapsedMilliseconds} ms...");
            }
        }
        finally
        {
            semaphore.Release();
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment