Azure Blob Chunked Upload Parallel TPL V12.linq
markheath created this gist Mar 10, 2023.
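
// Note: as a LINQPad script, this assumes the Azure.Storage.Blobs NuGet package is
// referenced and that the query's namespace imports (not shown in the gist) include
// Azure.Storage.Blobs, Azure.Storage.Blobs.Models, Azure.Storage.Blobs.Specialized,
// System.Security.Cryptography, System.Threading.Tasks.Dataflow and System.Diagnostics.
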
// render a byte count as a human-readable size, e.g. "1.5 GB"
string[] SizeSuffixes = { "bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB" };
string SizeSuffix(long value, int decimalPlaces = 0)
{
    if (value < 0)
    {
        throw new ArgumentException("Bytes should not be negative", nameof(value));
    }
    var mag = (int)Math.Max(0, Math.Log(value, 1024));
    var adjustedSize = Math.Round(value / Math.Pow(1024, mag), decimalPlaces);
    return $"{adjustedSize} {SizeSuffixes[mag]}";
}
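
// e.g. SizeSuffix(1536 * 1024) => "2 MB" (Math.Round defaults to banker's rounding),
// while SizeSuffix(1536 * 1024, 1) => "1.5 MB"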

// hash a stream in 4 MB chunks so large files never need to be held in memory
string HashStream(HashAlgorithm hasher, Stream stream)
{
    hasher.Initialize();
    var buffer = new byte[4 * 1024 * 1024];
    while (true)
    {
        int read = stream.Read(buffer, 0, buffer.Length);
        if (read == 0) break;
        hasher.TransformBlock(buffer, 0, read, null, 0);
    }

    hasher.TransformFinalBlock(Array.Empty<byte>(), 0, 0);
    var hash = hasher.Hash;
    return BitConverter.ToString(hash).Replace("-", "");
}

// The producer: reads the file in 8 MB blocks and sends each one into the pipeline.
// SendAsync honours the target's BoundedCapacity, so the producer pauses reading
// whenever the downstream consumers fall behind.
async Task<IReadOnlyCollection<string>> Produce(ITargetBlock<Block> target, Stream file)
{
    var blockIds = new List<string>();
    var blockSize = 8 * 1024 * 1024;

    while (true)
    {
        // need to create a new buffer each time as they are processed in parallel
        var buffer = new byte[blockSize];
        var read = await file.ReadAsync(buffer, 0, buffer.Length);
        if (read == 0) break;

        string blockId = Convert.ToBase64String(Guid.NewGuid().ToByteArray());
        blockIds.Add(blockId);
        await target.SendAsync(new Block(blockId, buffer, read));
    }
    target.Complete();
    return blockIds;
}

// The consumer: stages one block of data against the blob
async Task StageBlock(Block block, BlockBlobClient blobClient)
{
    using var ms = new MemoryStream(block.Data, 0, block.Length);
    await blobClient.StageBlockAsync(block.Id, ms);
}
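
// Note: StageBlockAsync uploads each block as "uncommitted" data; nothing becomes
// part of the blob until CommitBlockListAsync is called with the full list of ids.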

var connectionString = Util.GetPassword("mheath-storage"); // LINQPad's saved-password store

    var blobServiceClient = new BlobServiceClient(connectionString);
    var containerName = "uploads";
    var containerClient = blobServiceClient.GetBlobContainerClient(containerName);

    var uploadBlobClient = containerClient.GetBlockBlobClient("chunked-tpl.mp4");
    await uploadBlobClient.DeleteIfExistsAsync();

// pick the largest file I have in my video folder
var fileName = Directory.EnumerateFiles(@"C:\Users\mark\Videos")
    .OrderByDescending(f => new FileInfo(f).Length)
    .First();
    var maxParallelConsume = 8; // experiment with this value

    using var file = File.OpenRead(fileName);

    var expectedHash = HashStream(SHA512.Create(), file);
    expectedHash.Dump();
    file.Position = 0; // reset before upload

    var sw = Stopwatch.StartNew();
// BoundedCapacity limits how far the producer can read ahead of the consumers
var buffer = new BufferBlock<Block>(new DataflowBlockOptions() { BoundedCapacity = maxParallelConsume });

var consumerBlock = new ActionBlock<Block>(
    block => StageBlock(block, uploadBlobClient),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxParallelConsume,
        // bound the ActionBlock's own input queue too; its default is unbounded,
        // which would let it drain the buffer greedily and defeat the backpressure
        BoundedCapacity = maxParallelConsume
    });
buffer.LinkTo(consumerBlock, new DataflowLinkOptions() { PropagateCompletion = true });
var producerTask = Produce(buffer, file);

    await consumerBlock.Completion;
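
// Awaiting the consumer also guarantees the producer has finished: completion is
// propagated through the link, so the ActionBlock only completes after the producer
// called Complete() and every remaining block was staged.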

var blockIds = await producerTask; // already complete here; await just unwraps the result
var opts = new CommitBlockListOptions()
{
    // could set tags, metadata, mime type etc here
};

// the order of blockIds defines the order of the blocks in the final blob
var info = await uploadBlobClient.CommitBlockListAsync(blockIds, opts);
    $"{SizeSuffix(file.Length, 1)} uploaded in {sw.Elapsed} {SizeSuffix((long)(file.Length / sw.Elapsed.TotalSeconds), 1)} per second with {maxParallelConsume} threads".Dump();
    //info.Dump();

Console.WriteLine("Calculating hash");
using var readStream = uploadBlobClient.OpenRead();
var uploadedHash = HashStream(SHA512.Create(), readStream);
if (uploadedHash != expectedHash)
{
    Console.WriteLine($"Hashes don't match {uploadedHash}");
}
else
{
    Console.WriteLine("Hash is valid!");
}
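
// Note: this verification pass streams the committed blob back down in full, so it
// adds a complete download on top of the upload time reported above.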

// a chunk of file data plus the block id it will be staged under
record Block(string Id, byte[] Data, int Length);