Created
May 20, 2020 09:13
-
-
Save normanrz/dcf3e37e4dc95c330ea8e7432a2fe102 to your computer and use it in GitHub Desktop.
Merge webKnossos volume annotations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# VOLUME ANNOTATION MERGE | |
# | |
# This script merges multiple volume annotations from webKnossos. | |
# In case of overlapping annotations, the last annotation wins. | |
# | |
# The --relabel flag will relabel the segments of each annotation | |
# to be unique in the output annotation. Useful when multiple | |
# annotators created segments with id 1. | |
# | |
# 1. Download all annotations that you want to merge. | |
# You will get a .zip file for each annotation. | |
# 2. Install Python 3 (if you don't have it) | |
# 3. Install the dependencies of this script: | |
# pip install -U wkw | |
# 4. Run the script from the terminal | |
# python merge_volume.py volume0.zip volume1.zip volume2.zip | |
# OR | |
# python merge_volume.py --relabel volume0.zip volume1.zip volume2.zip | |
# 5. The script will output a out.zip file that | |
# you can upload to webKnossos. | |
# | |
# License: MIT, scalable minds | |
import wkw | |
from zipfile import ZipFile | |
import sys | |
from glob import iglob | |
from uuid import uuid4 | |
import os | |
import re | |
import numpy as np | |
from shutil import rmtree, copyfile | |
from argparse import ArgumentParser | |
# Consts
path = os.path  # short alias used for all path joins below
BUCKET_SIZE = 32  # wkw bucket edge length in voxels (buckets are 32^3)
out_folder = str(uuid4())  # unique scratch folder for the merged dataset

# Prelude: command-line interface
parser = ArgumentParser(description="Merge webKnossos volume annotations")
parser.add_argument(
    "--relabel",
    action="store_true",
    help="Relabel all segments with a new unique id in order to avoid label collisions.",
)
parser.add_argument("zip_files", nargs="+", help="Volume annotation files (.zip)")
args = parser.parse_args()

# Defensive check: argparse already rejects an empty list for nargs="+",
# but if we ever get here, exit with a non-zero status (0 would signal
# success to the shell even though nothing was merged).
if len(args.zip_files) == 0:
    print("Please supply volume annotations as downloaded from webKnossos")
    sys.exit(1)

print("Merging {} annotations: {}".format(len(args.zip_files), args.zip_files))
# Unzip all annotation zips, one scratch folder per input archive.
folder_names = [str(uuid4()) for _ in args.zip_files]
for archive, target in zip(args.zip_files, folder_names):
    os.makedirs(target, exist_ok=True)
    # The downloaded annotation zip wraps a nested data.zip holding the
    # actual wkw volume data; extract both into the same scratch folder.
    with ZipFile(archive, "r") as outer:
        outer.extractall(target)
    with ZipFile(path.join(target, "data.zip"), "r") as inner:
        inner.extractall(target)
print("Unpacked all volume annotations")
# Create output WKW dataset: uint32 labels, one file per cube, LZ4HC-compressed.
out_header = wkw.Header(
    voxel_type=np.uint32,
    file_len=1,
    block_type=wkw.Header.BLOCK_TYPE_LZ4HC,
)
out_ds = wkw.Dataset.open(path.join(out_folder, "1"), out_header)
# Get buckets of all annotations.
# Matches .../z<z>/y<y>/x<x>.wkw; accepts either path separator so that
# paths produced by os.path.join work on Windows as well as POSIX.
CUBE_REGEX = re.compile(r"z(\d+)[/\\]y(\d+)[/\\]x(\d+)\.wkw$")


def list_buckets(layer_path):
    """Return the set of (x, y, z) bucket coordinates stored under layer_path.

    Scans for wkw cube files laid out as z<z>/y<y>/x<x>.wkw; files that do
    not match that naming scheme are ignored.
    """
    output = set()
    for filename in iglob(path.join(layer_path, "*", "*", "*.wkw")):
        m = CUBE_REGEX.search(filename)
        if m is not None:
            # Regex groups are (z, y, x); emit (x, y, z).
            output.add((int(m.group(3)), int(m.group(2)), int(m.group(1))))
    return output
# One set of bucket coordinates per unpacked annotation, in input order.
bucket_lists = [list_buckets(path.join(name, "1")) for name in folder_names]
# Collect all unique labels.
# One set of non-zero segment ids per annotation, aligned with folder_names.
label_sets = []
for buckets, folder_name in zip(bucket_lists, folder_names):
    with wkw.Dataset.open(path.join(folder_name, "1")) as in_ds:
        label_set = set()
        for (x, y, z) in buckets:
            # Bucket coordinates -> voxel offset of the 32^3 cube.
            offset = (x * BUCKET_SIZE, y * BUCKET_SIZE, z * BUCKET_SIZE)
            size = (BUCKET_SIZE, BUCKET_SIZE, BUCKET_SIZE)
            in_block = in_ds.read(offset, size)[0]
            # np.unique deduplicates in C; iterating 32^3 raw voxels into a
            # Python set element-by-element is far slower for the same result.
            label_set.update(np.unique(in_block[in_block != 0]))
    label_sets.append(label_set)
# Build per-annotation label maps: old id -> id written to the output.
label_maps = []
if args.relabel:
    # Assign globally unique ids across all annotations, starting at 1,
    # so segments from different annotators cannot collide.
    next_id = 1
    for label_set in label_sets:
        mapping = {}
        for label in label_set:
            mapping[label] = next_id
            next_id += 1
        label_maps.append(mapping)
    print("Relabelling {} unique labels".format(sum(len(m) for m in label_maps)))
else:
    # Identity mapping: every label keeps its original id.
    for label_set in label_sets:
        label_maps.append({label: label for label in label_set})
    print(
        "Found {} labels, not relabelling".format(
            sum(len(m) for m in label_maps)
        )
    )
# Merge conflicting buckets (write all non-zero items, last write wins)
for buckets, folder_name, label_map in zip(bucket_lists, folder_names, label_maps):
    with wkw.Dataset.open(path.join(folder_name, "1")) as in_ds:
        for (x, y, z) in buckets:
            # Bucket coordinates -> voxel offset of the 32^3 cube.
            offset = (x * BUCKET_SIZE, y * BUCKET_SIZE, z * BUCKET_SIZE)
            size = (BUCKET_SIZE, BUCKET_SIZE, BUCKET_SIZE)
            # Read what has been merged so far, then stamp this annotation's
            # labelled voxels on top; zeros in the source leave the previously
            # merged data untouched, so later annotations win only where they
            # actually annotated.
            merged = out_ds.read(offset, size)[0]
            source = in_ds.read(offset, size)[0]
            for old_label, new_label in label_map.items():
                merged[source == old_label] = new_label
            out_ds.write(offset, merged)
print("Merged all data")
# Create zip file.
# Reuse the skeleton .nml from the last annotation for the merged upload.
nml_file = next(iglob(path.join(folder_names[-1], "*.nml")))
with ZipFile("data.zip", "w") as zip_ref:
    for root, _dirs, files in os.walk(path.join(out_folder, "1")):
        for file in files:
            full_path = path.join(root, file)
            # Store members relative to the scratch folder so the archive
            # roots at "1/z.../y.../x....wkw" — the same layout as the
            # downloaded data.zip — instead of leaking the random
            # scratch-folder name into the archive paths.
            zip_ref.write(full_path, arcname=path.relpath(full_path, out_folder))
with ZipFile("out.zip", "w") as zip_ref:
    zip_ref.write("data.zip")
    zip_ref.write(nml_file, arcname=path.basename(nml_file))
# Cleanup: remove all scratch folders and the intermediate data.zip.
for scratch in folder_names + [out_folder]:
    rmtree(scratch)
os.unlink("data.zip")

# Done
print("Created out.zip")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment