@VibhuJawa
Last active April 3, 2026 04:13
NeMo Curator Interleaved IO Benchmark (PR #1657): WDS→Parquet, WDS→WDS, PQ→Parquet, PQ→WDS

NeMo Curator — Interleaved IO Benchmark

Benchmarks four interleaved IO pipeline paths end-to-end on 80 local NVMe shards. All paths apply an aspect-ratio filter (1.0–2.0) to images.

| Path | Wall-clock time | Samples/sec | Input samples | Input size | Output | Output size |
|---|---|---|---|---|---|---|
| WDS → Parquet | 76.8 s | 88.8 | 6,818 | 9.9 GB | 57,713 rows | 3.84 GB |
| WDS → WDS | 75.4 s | 90.4 | 6,818 | 9.9 GB | 6,818 samples | 4.06 GB |
| PQ → Parquet | 15.7 s | 435.0 | 6,818 | 6.0 GB | 57,713 rows | 3.83 GB |
| PQ → WDS | 18.5 s | 368.4 | 6,818 | 6.0 GB | 6,818 samples | 4.06 GB |

WDS paths are bottlenecked by the reader, which must scan tar archives sequentially and fetch images over the network. The WDS reader accounts for ~396 s of cumulative stage work across 80 shards, versus ~14 s for the Parquet reader. Filter and writer costs track the operation rather than the input format (filter 34–41 s, Parquet writer 12–13 s, WDS writer 45–48 s), so nearly all of the gap comes from the reader. With 16 workers in parallel, the WDS paths take ~76 s wall-clock versus ~16–19 s for the Parquet paths, roughly 4–5× slower.
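A WebDataset shard is a plain tar stream. The files that belong to one sample share a key, which under the usual WebDataset convention is the file name up to the first dot of its basename. A reader therefore has to walk the members in order and regroup them before it can emit a sample. The sketch below illustrates that access pattern with Python's standard `tarfile` module. It is a hypothetical illustration, not NeMo Curator's WDS reader, and the `split_key` and `group_wds_samples` helper names are made up.

```python
import io
import tarfile


def split_key(member_name: str) -> tuple[str, str]:
    """Split 'dir/abc.0.jpg' into ('dir/abc', '0.jpg'): the key ends at the
    first dot of the basename (usual WebDataset naming convention)."""
    directory, _, basename = member_name.rpartition("/")
    stem, _, ext = basename.partition(".")
    key = f"{directory}/{stem}" if directory else stem
    return key, ext


def group_wds_samples(tar_bytes: bytes) -> dict[str, dict[str, bytes]]:
    """Hypothetical helper: read a tar sequentially and group members by key."""
    samples: dict[str, dict[str, bytes]] = {}
    # "r|*" opens the archive as a non-seekable stream, so members can only be
    # visited in the order they were written, as a streaming reader would.
    with tarfile.open(fileobj=io.BytesIO(tar_bytes), mode="r|*") as tar:
        for member in tar:
            if not member.isfile():
                continue
            key, ext = split_key(member.name)
            data = tar.extractfile(member).read()
            # dicts keep insertion order, so samples come out in tar order
            samples.setdefault(key, {})[ext] = data
    return samples
```

Because nothing can be emitted for a sample until all of its members have been read, and there is no index to jump to a given sample, the cost of this stage grows with the full size of each tar.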

Tested on: Python 3.10.12 · Ray 2.54.0 · PyArrow 19.0.1 · Pandas 2.3.3 · 16-core CPU · NVMe · materialize_on_read=True (the --materialize-on-read flag is passed only on the WDS-reader paths)


1. Get the code

git clone https://github.com/VibhuJawa/NeMo-Curator.git nemo-curator
cd nemo-curator
git checkout feat/interleaved-parquet-reader-wds-writer

2. Create the environment

pip install uv
uv sync --extra interleaved_cpu
source .venv/bin/activate   # uv sync creates the project environment in .venv
pre-commit install --install-hooks
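To compare a fresh environment against the versions in the "Tested on" line, you can run a small check based on `importlib.metadata` inside the activated environment. This is a hypothetical convenience snippet and is not part of the repository. The `TESTED_VERSIONS` dict simply copies the versions reported in this gist, and a mismatch is not necessarily a problem.

```python
import sys
from importlib import metadata
from typing import Callable, Optional

# Versions reported in the "Tested on" line of this gist.
TESTED_VERSIONS = {"ray": "2.54.0", "pyarrow": "19.0.1", "pandas": "2.3.3"}
TESTED_PYTHON = "3.10.12"


def installed_version(dist: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(dist)
    except metadata.PackageNotFoundError:
        return None


def compare_versions(
    lookup: Callable[[str], Optional[str]] = installed_version,
) -> dict[str, tuple[str, Optional[str]]]:
    """Map each package to (tested, installed) wherever the two differ."""
    diffs = {}
    for dist, tested in TESTED_VERSIONS.items():
        found = lookup(dist)
        if found != tested:
            diffs[dist] = (tested, found)
    return diffs


if __name__ == "__main__":
    python_now = ".".join(map(str, sys.version_info[:3]))
    if python_now != TESTED_PYTHON:
        print(f"python: tested {TESTED_PYTHON}, running {python_now}")
    for dist, (tested, found) in compare_versions().items():
        print(f"{dist}: tested {tested}, installed {found or 'not installed'}")
```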

3. Smoke test — verify install (single shard, ~10–30 s each)

Run all four paths on a single shard immediately after install to confirm the environment is working correctly before committing to the full 80-shard benchmark.

Sample data (available at /datasets/vjawa/):

| Format | Path |
|---|---|
| WDS (single tar, ~64 MB) | /datasets/vjawa/MINT-1T-PDF-CC-2024-18-10gb/CC-MAIN-2024-18-shard-0/CC-MAIN-20240412101354-20240412131354-00000.tar |
| Interleaved Parquet (single shard, 32 MB) | /datasets/vjawa/mint1t_interleaved_parquet_1shard/1796218b4791.parquet |

WDS_SHARD=/datasets/vjawa/MINT-1T-PDF-CC-2024-18-10gb/CC-MAIN-2024-18-shard-0/CC-MAIN-20240412101354-20240412131354-00000.tar
PQ_SHARD=/datasets/vjawa/mint1t_interleaved_parquet_1shard/1796218b4791.parquet

# WDS → Parquet
python benchmarking/scripts/multimodal_mint1t_benchmark.py \
  --benchmark-results-path .tmp_runs/wds_to_parquet \
  --input-path $WDS_SHARD \
  --output-path .tmp_runs/wds_to_parquet/output \
  --reader-type wds --writer-format parquet \
  --materialize-on-read --on-materialize-error error --mode overwrite

# WDS → WDS
python benchmarking/scripts/multimodal_mint1t_benchmark.py \
  --benchmark-results-path .tmp_runs/wds_to_wds \
  --input-path $WDS_SHARD \
  --output-path .tmp_runs/wds_to_wds/output \
  --reader-type wds --writer-format wds \
  --materialize-on-read --on-materialize-error error --mode overwrite

# PQ → Parquet
python benchmarking/scripts/multimodal_mint1t_benchmark.py \
  --benchmark-results-path .tmp_runs/pq_to_parquet \
  --input-path $PQ_SHARD \
  --output-path .tmp_runs/pq_to_parquet/output \
  --reader-type parquet --writer-format parquet \
  --on-materialize-error error --mode overwrite

# PQ → WDS
python benchmarking/scripts/multimodal_mint1t_benchmark.py \
  --benchmark-results-path .tmp_runs/pq_to_wds \
  --input-path $PQ_SHARD \
  --output-path .tmp_runs/pq_to_wds/output \
  --reader-type parquet --writer-format wds \
  --on-materialize-error error --mode overwrite
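After the four smoke runs finish, you can check all of their `metrics.json` files in one pass. The sketch below assumes that `metrics.json` is written directly under each `--benchmark-results-path` directory. Section 7 only says that it goes to the results path, so the exact location is an assumption. The sketch reads the `is_success`, `time_taken_s`, and `output_num_rows` keys listed in section 7, and the `summarize_runs` helper is hypothetical.

```python
import json
from pathlib import Path

# Result directories used by the four smoke-test commands above.
RUN_DIRS = ["wds_to_parquet", "wds_to_wds", "pq_to_parquet", "pq_to_wds"]


def summarize_runs(root: Path = Path(".tmp_runs")) -> dict[str, dict]:
    """Hypothetical helper: collect a few metrics from each run's metrics.json."""
    summary = {}
    for name in RUN_DIRS:
        metrics_file = root / name / "metrics.json"  # assumed location
        if not metrics_file.exists():
            summary[name] = {"is_success": False, "error": "metrics.json not found"}
            continue
        metrics = json.loads(metrics_file.read_text())
        summary[name] = {
            "is_success": bool(metrics.get("is_success")),
            "time_taken_s": metrics.get("time_taken_s"),
            "output_num_rows": metrics.get("output_num_rows"),
        }
    return summary


if __name__ == "__main__":
    for name, info in summarize_runs().items():
        status = "OK" if info["is_success"] else "FAILED"
        print(f"{name:15s} {status:7s} {info}")
```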

4. Datasets (full 80-shard benchmark)

| Dataset | Type | Files | Samples | Size |
|---|---|---|---|---|
| mint1t_pdf_cc2024_wds_80shards | WebDataset tars | 80 | 6,818 | ~10 GB |
| mint1t_pdf_cc2024_parquet_80shards | Parquet | 80 | 6,818 | ~6 GB |

Both datasets contain identical samples. Parquet row breakdown (all 80 shards):

  • metadata rows: 6,818 (one per sample)
  • text rows: 30,079
  • image rows: 32,886
  • total rows: 69,783
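The row breakdown above can be recomputed from the Parquet shards. The sketch below assumes that each row carries a `modality` column with values such as `metadata`, `text`, and `image`. That column name is an assumption and is not confirmed in this gist. It also assumes one `metadata` row per sample, as the list states. `load_modalities` calls `pyarrow.parquet.read_table` with a `columns=` projection so that only that one column is read, and `collections.Counter` does the counting.

```python
import sys
from collections import Counter
from pathlib import Path
from typing import Iterable


def load_modalities(shard_dir: str) -> list[str]:
    """Read only the (assumed) 'modality' column from every Parquet shard."""
    import pyarrow.parquet as pq  # imported lazily; available in the uv environment

    values: list[str] = []
    for path in sorted(Path(shard_dir).glob("*.parquet")):
        table = pq.read_table(str(path), columns=["modality"])
        values.extend(table.column("modality").to_pylist())
    return values


def modality_breakdown(modalities: Iterable[str]) -> dict[str, int]:
    """Count rows per modality and add a 'total' entry."""
    counts = Counter(modalities)
    result = dict(counts)
    result["total"] = sum(counts.values())
    return result


if __name__ == "__main__" and len(sys.argv) == 2:
    breakdown = modality_breakdown(load_modalities(sys.argv[1]))
    print(breakdown)
    # With one metadata row per sample, this equals the sample count.
    print("samples:", breakdown.get("metadata", 0))
```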

5. Run the full benchmark

YAML-driven runner (all 4 paths at once)

# Start a fresh Ray cluster (adjust ports as needed)
ray start --head --port=6402 --dashboard-port=8270 --dashboard-host=0.0.0.0 --num-cpus=16 --num-gpus=0

# Connect to it and run all 4 entries
RAY_ADDRESS=<host:port> python benchmarking/run.py \
  --config benchmarking/interleaved_io_local_benchmark.yaml

# Or let run.py start its own cluster
python benchmarking/run.py \
  --config benchmarking/interleaved_io_local_benchmark.yaml

The YAML config used for the 80-shard run above:

# benchmarking/interleaved_io_local_benchmark.yaml
results_path: /raid/vjawa/benchmark_results_apr2
datasets_path: /raid/vjawa/interleaved_test
model_weights_path: /tmp/no_models

default_timeout_s: 7200

datasets:
  - name: mint1t_wds_local
    formats:
      - type: wds
        path: "{datasets_path}/mint1t_pdf_cc2024_wds_80shards"
  - name: mint1t_parquet_local
    formats:
      - type: parquet
        path: "{datasets_path}/mint1t_pdf_cc2024_parquet_80shards"

entries:
  - name: wds_to_parquet
    enabled: true
    script: multimodal_mint1t_benchmark.py
    args: >-
      --benchmark-results-path={session_entry_dir}
      --input-path={dataset:mint1t_wds_local,wds}
      --output-path={session_entry_dir}/output
      --reader-type=wds
      --writer-format=parquet
      --materialize-on-read
      --on-materialize-error=error
      --mode=overwrite
    ray:
      num_cpus: 16
      num_gpus: 0
      enable_object_spilling: false

  - name: wds_to_wds
    enabled: true
    script: multimodal_mint1t_benchmark.py
    args: >-
      --benchmark-results-path={session_entry_dir}
      --input-path={dataset:mint1t_wds_local,wds}
      --output-path={session_entry_dir}/output
      --reader-type=wds
      --writer-format=wds
      --materialize-on-read
      --on-materialize-error=error
      --mode=overwrite
    ray:
      num_cpus: 16
      num_gpus: 0
      enable_object_spilling: false

  - name: pq_to_parquet
    enabled: true
    script: multimodal_mint1t_benchmark.py
    args: >-
      --benchmark-results-path={session_entry_dir}
      --input-path={dataset:mint1t_parquet_local,parquet}
      --output-path={session_entry_dir}/output
      --reader-type=parquet
      --writer-format=parquet
      --on-materialize-error=error
      --mode=overwrite
    ray:
      num_cpus: 16
      num_gpus: 0
      enable_object_spilling: false

  - name: pq_to_wds
    enabled: true
    script: multimodal_mint1t_benchmark.py
    args: >-
      --benchmark-results-path={session_entry_dir}
      --input-path={dataset:mint1t_parquet_local,parquet}
      --output-path={session_entry_dir}/output
      --reader-type=parquet
      --writer-format=wds
      --on-materialize-error=error
      --mode=overwrite
    ray:
      num_cpus: 16
      num_gpus: 0
      enable_object_spilling: false
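The `args` strings in this YAML use placeholders such as `{datasets_path}`, `{session_entry_dir}`, and `{dataset:mint1t_wds_local,wds}`. The `>-` folded style joins the indented lines with single spaces and drops the trailing newline. The sketch below shows one plausible way to expand such placeholders into a concrete command line. It is a hypothetical resolver written only to explain the config, not the logic in `benchmarking/run.py`. The `session_entry_dir` value in the usage example is also made up.

```python
import re
import shlex

PLACEHOLDER = re.compile(r"\{([^{}]+)\}")


def resolve(
    text: str,
    variables: dict[str, str],
    datasets: dict[tuple[str, str], str],
) -> str:
    """Expand {name} from `variables` and {dataset:NAME,FORMAT} from `datasets`."""

    def substitute(match: re.Match) -> str:
        token = match.group(1)
        if token.startswith("dataset:"):
            name, fmt = token[len("dataset:"):].split(",")
            # Dataset paths may themselves contain placeholders, e.g. {datasets_path}.
            return resolve(datasets[(name.strip(), fmt.strip())], variables, datasets)
        return variables[token]

    return PLACEHOLDER.sub(substitute, text)


if __name__ == "__main__":
    variables = {
        "datasets_path": "/raid/vjawa/interleaved_test",
        "session_entry_dir": "/tmp/session/wds_to_parquet",  # made-up example
    }
    datasets = {
        ("mint1t_wds_local", "wds"): "{datasets_path}/mint1t_pdf_cc2024_wds_80shards",
    }
    args = "--input-path={dataset:mint1t_wds_local,wds} --output-path={session_entry_dir}/output"
    print(shlex.split(resolve(args, variables, datasets)))
```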

6. Image filtering results

The InterleavedAspectRatioFilterStage(min_aspect_ratio=1.0, max_aspect_ratio=2.0) removes images outside the aspect-ratio range. Results on the 80-shard dataset:

| Modality | Input rows | Output rows | Filtered | Pass rate |
|---|---|---|---|---|
| image | 32,886 | 20,816 | 12,070 (36.7%) | 63.3% |
| text | 30,079 | 30,079 | 0 | 100% |
| metadata | 6,818 | 6,818 | 0 | 100% |
| total | 69,783 | 57,713 | 12,070 (17.3%) | 82.7% |

The filter only affects image rows. Metadata and text rows pass through unchanged. Output samples remain 6,818 — samples are not dropped, only their image rows are filtered.
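You can sketch the filter's effect as a row-level predicate that only inspects image rows. This is a minimal sketch, not `InterleavedAspectRatioFilterStage` itself. It makes four assumptions: aspect ratio means width divided by height, both bounds are inclusive, rows are dicts with hypothetical `modality`, `width`, and `height` keys, and images of unknown size are dropped.

```python
from typing import Iterable, Iterator


def keep_row(row: dict, min_ratio: float = 1.0, max_ratio: float = 2.0) -> bool:
    """Non-image rows always pass; image rows pass if width/height is in range."""
    if row.get("modality") != "image":
        return True
    width, height = row.get("width"), row.get("height")
    if not width or not height:
        return False  # assumption: images with unknown size are dropped
    return min_ratio <= width / height <= max_ratio


def filter_rows(rows: Iterable[dict], **bounds: float) -> Iterator[dict]:
    """Yield rows that survive the aspect-ratio filter, preserving their order."""
    return (row for row in rows if keep_row(row, **bounds))
```

The predicate never drops metadata or text rows, so every sample keeps its metadata row. This matches the output sample count staying at 6,818 while only image rows disappear.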


7. Metrics reference

Every benchmark run writes metrics.json and params.json to the results path.

Input metrics

| Key | Description |
|---|---|
| input_num_files | Number of input files |
| input_total_bytes | Total input size in bytes |
| input_total_mb | Total input size in MB |
| input_num_rows | Total input rows (both PQ and WDS) |
| input_num_samples | Distinct sample count (both PQ and WDS) |
| input_num_images | Input image row/entry count |
| input_num_texts | Input text row/entry count |
| input_modality_counts | Per-modality row counts |

Output metrics

| Key | Description |
|---|---|
| output_num_files | Number of output files |
| output_total_bytes | Total output size in bytes |
| output_total_mb | Total output size in MB |
| output_num_rows | Total output rows |
| output_num_samples | Distinct output sample count |
| output_num_images | Output image row count |
| output_num_texts | Output text row count |
| output_num_metadata | Output metadata row count (Parquet only) |
| output_modality_counts | Per-modality output row counts |
| output_materialize_error_count | Rows with materialization errors (Parquet only; 0 in every run reported in this gist) |

Performance metrics

| Key | Description |
|---|---|
| time_taken_s | Total pipeline wall-clock time |
| throughput_samples_per_sec | Output samples per second |
| throughput_rows_per_sec | Output rows per second |
| is_success | Whether the pipeline completed without errors |
| ordering_valid | Whether interleaved position ordering is valid (True/False; None if not applicable) |
| wds_valid | Whether WDS output passes structural validation (True/False; None if not applicable) |
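The two throughput metrics are output samples (or rows) per second of wall-clock time, so you can recompute them from other keys in `metrics.json`. The sketch below assumes that relationship holds exactly. The tables in this gist report `time_taken_s` rounded to 0.1 s, so recomputing from those rounded values reproduces the published throughputs only approximately. For that reason the check uses a relative tolerance.

```python
import math


def derived_throughput(metrics: dict) -> dict[str, float]:
    """Recompute throughput from output counts and wall-clock time."""
    seconds = metrics["time_taken_s"]
    if seconds <= 0:
        raise ValueError("time_taken_s must be positive")
    return {
        "samples_per_sec": metrics["output_num_samples"] / seconds,
        "rows_per_sec": metrics["output_num_rows"] / seconds,
    }


def matches_reported(metrics: dict, rel_tol: float = 0.01) -> bool:
    """Check the reported throughputs against the recomputed values."""
    derived = derived_throughput(metrics)
    pairs = [
        (derived["samples_per_sec"], metrics["throughput_samples_per_sec"]),
        (derived["rows_per_sec"], metrics["throughput_rows_per_sec"]),
    ]
    return all(math.isclose(a, b, rel_tol=rel_tol) for a, b in pairs)
```

For example, the wds → parquet run reports 6,818 samples and 57,713 rows in 76.8 s, which recomputes to roughly 88.8 samples/s and 751.5 rows/s. These figures are within rounding of the reported 88.8 and 751.7.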

8. Full 80-shard benchmark results

Run on 2026-04-02, branch feat/interleaved-parquet-reader-wds-writer, 16 CPUs, NVMe, materialize_on_read=True (WDS-reader paths).

Stage timing note: the "Sum (80 tasks)" column adds up per-task time across all 80 tasks, which run on up to 16 workers in parallel, so stage sums can exceed wall-clock time. Wall-clock time is the actual end-to-end elapsed time.


wds → parquet

| Metric | Value |
|---|---|
| Wall-clock time | 76.8 s |
| Throughput | 88.8 samples/s · 751.7 rows/s |
| Input | 80 WDS tars · 6,818 samples · 9,908 MB |
| Input rows | 69,783 (metadata: 6,818 · text: 30,079 · image: 32,886) |
| Output | 80 Parquet files · 57,713 rows · 3,838 MB |
| Output rows | metadata: 6,818 · text: 30,079 · image: 20,816 |
| Materialize errors | 0 |
| Ordering | valid |

| Stage | Sum (80 tasks) | Mean / shard |
|---|---|---|
| WDS reader (incl. image fetch) | 396.7 s | 4.96 s |
| Aspect ratio filter | 35.1 s | 0.44 s |
| Parquet writer | 12.0 s | 0.15 s |

The WDS reader accounts for 89% of total stage work (396.7 s of 443.8 s). With up to 16 workers in parallel, the ~444 s of stage work completes in ~77 s wall-clock.


wds → wds

| Metric | Value |
|---|---|
| Wall-clock time | 75.4 s |
| Throughput | 90.4 samples/s · 765.0 rows/s |
| Input | 80 WDS tars · 6,818 samples · 9,908 MB |
| Input rows | 69,783 (metadata: 6,818 · text: 30,079 · image: 32,886) |
| Output | 80 WDS tars · 6,818 samples · 4,062 MB |
| Output rows | metadata: 6,818 · text: 30,079 · image: 20,816 |
| Materialize errors | 0 |
| WDS output | valid |

| Stage | Sum (80 tasks) | Mean / shard |
|---|---|---|
| WDS reader (incl. image fetch) | 395.3 s | 4.94 s |
| Aspect ratio filter | 34.2 s | 0.43 s |
| WDS writer | 45.4 s | 0.57 s |

parquet → parquet

| Metric | Value |
|---|---|
| Wall-clock time | 15.7 s |
| Throughput | 435.0 samples/s · 3,682.5 rows/s |
| Input | 80 Parquet files · 6,818 samples · 5,987 MB |
| Input rows | 69,783 (metadata: 6,818 · text: 30,079 · image: 32,886) |
| Output | 80 Parquet files · 57,713 rows · 3,834 MB |
| Output rows | metadata: 6,818 · text: 30,079 · image: 20,816 |
| Materialize errors | 0 |
| Ordering | valid |

| Stage | Sum (80 tasks) | Mean / shard |
|---|---|---|
| Parquet reader | 13.4 s | 0.17 s |
| Aspect ratio filter | 40.5 s | 0.51 s |
| Parquet writer | 13.1 s | 0.16 s |

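Unlike the WDS paths, no single stage dominates by an order of magnitude. The aspect-ratio filter is the largest share (40.5 s of 67.0 s, ~60%), and the reader and writer take ~13 s each.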


parquet → wds

| Metric | Value |
|---|---|
| Wall-clock time | 18.5 s |
| Throughput | 368.4 samples/s · 3,118.8 rows/s |
| Input | 80 Parquet files · 6,818 samples · 5,987 MB |
| Input rows | 69,783 (metadata: 6,818 · text: 30,079 · image: 32,886) |
| Output | 80 WDS tars · 6,818 samples · 4,062 MB |
| Output rows | metadata: 6,818 · text: 30,079 · image: 20,816 |
| Materialize errors | 0 |
| WDS output | valid |

| Stage | Sum (80 tasks) | Mean / shard |
|---|---|---|
| Parquet reader | 14.1 s | 0.18 s |
| Aspect ratio filter | 40.6 s | 0.51 s |
| WDS writer | 48.1 s | 0.60 s |
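Each stage sum adds up per-task time across 80 tasks. Dividing a run's total stage time by its wall-clock time therefore gives a rough "effective concurrency" figure, and dividing each stage by the total gives that stage's share of the work. The sketch below does this arithmetic on the numbers reported in this section. "Effective concurrency" is a derived quantity introduced here for illustration and is not a metric the benchmark emits.

```python
# Wall-clock time and stage sums (seconds, across 80 tasks) reported in this section.
RUNS = {
    "wds -> parquet": (76.8, {"reader": 396.7, "filter": 35.1, "writer": 12.0}),
    "wds -> wds": (75.4, {"reader": 395.3, "filter": 34.2, "writer": 45.4}),
    "parquet -> parquet": (15.7, {"reader": 13.4, "filter": 40.5, "writer": 13.1}),
    "parquet -> wds": (18.5, {"reader": 14.1, "filter": 40.6, "writer": 48.1}),
}


def stage_summary(wall_clock_s: float, stages: dict[str, float]) -> dict:
    """Total stage work, its ratio to wall-clock time, and per-stage shares."""
    total = sum(stages.values())
    return {
        "total_stage_s": total,
        "effective_concurrency": total / wall_clock_s,
        "shares": {name: t / total for name, t in stages.items()},
    }


if __name__ == "__main__":
    for name, (wall, stages) in RUNS.items():
        s = stage_summary(wall, stages)
        shares = ", ".join(f"{k} {v:.0%}" for k, v in s["shares"].items())
        print(f"{name:20s} work {s['total_stage_s']:6.1f} s  "
              f"concurrency {s['effective_concurrency']:4.1f}  ({shares})")
```

On these numbers, summed stage time is roughly 4.3 to 6.3 times the wall-clock time across the four paths, below the 16 available workers. The gist does not break down where the remaining wall-clock time goes.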

9. Interactive notebook

See notebooks/interleaved_io_benchmark.ipynb for an interactive version that:

  • Runs all 4 paths on 16 shards
  • Inspects WDS tar structure and Parquet modality distribution before running
  • Shows throughput bar charts across all 4 paths
  • Validates output ordering and binary_content null rates per path
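You can sketch the notebook's last two checks in plain Python over row dicts. The sketch uses hypothetical `sample_id`, `modality`, and `position` keys and skips `metadata` rows. It treats ordering as valid when positions strictly increase within each sample in row order. That rule is an assumption, and the benchmark's own `ordering_valid` check may differ. The null-rate helper computes the fraction of image rows whose `binary_content` is missing.

```python
from typing import Iterable


def ordering_valid(rows: Iterable[dict]) -> bool:
    """True if positions strictly increase within each sample (assumed rule)."""
    last_seen: dict[str, int] = {}
    for row in rows:
        if row.get("modality") == "metadata":
            continue  # assumption: metadata rows carry no content position
        sid, pos = row["sample_id"], row["position"]
        if sid in last_seen and pos <= last_seen[sid]:
            return False
        last_seen[sid] = pos
    return True


def binary_content_null_rate(rows: Iterable[dict]) -> float:
    """Fraction of image rows whose binary_content is None (0.0 if no images)."""
    images = [r for r in rows if r.get("modality") == "image"]
    if not images:
        return 0.0
    missing = sum(1 for r in images if r.get("binary_content") is None)
    return missing / len(images)
```

Strictly increasing positions, rather than an exact 0, 1, 2 sequence, keep the check valid after the aspect-ratio filter removes image rows from the middle of a sample.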