Skip to content

Instantly share code, notes, and snippets.

@gurbuzali
Created May 24, 2018 08:19
Show Gist options
  • Save gurbuzali/3ec3498e0dc71052d6a055dc2e26e919 to your computer and use it in GitHub Desktop.
Save gurbuzali/3ec3498e0dc71052d6a055dc2e26e919 to your computer and use it in GitHub Desktop.
package jet.avro;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.impl.util.Util;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.IntStream;
import static com.hazelcast.jet.Traversers.traverseIterator;
import static com.hazelcast.jet.impl.util.ExceptionUtil.sneakyThrow;
import static java.util.stream.Collectors.toList;
public final class ReadAvroP<R> extends AbstractProcessor {
private final int parallelism;
private final int id;
private final DistributedBiFunction<String, GenericRecord, R> mapOutputFn;
private final Path directory;
private final String glob;
private DirectoryStream<Path> directoryStream;
private Traverser<R> outputTraverser;
private DataFileReader<GenericRecord> currentFileReader;
private ReadAvroP(String directory, String glob, int parallelism, int id,
DistributedBiFunction<String, GenericRecord, R> mapOutputFn) {
this.directory = Paths.get(directory);
this.glob = glob;
this.parallelism = parallelism;
this.id = id;
this.mapOutputFn = mapOutputFn;
}
public static <R> ProcessorMetaSupplier metaSupplier(
String directory,
String glob,
DistributedBiFunction<String, GenericRecord, R> mapOutputFn
) {
return ProcessorMetaSupplier.of((ProcessorSupplier)
count -> IntStream.range(0, count)
.mapToObj(i -> new ReadAvroP(directory, glob, count, i, mapOutputFn))
.collect(toList()),
2);
}
@Override
protected void init(Context ignored) throws Exception {
directoryStream = Files.newDirectoryStream(directory, glob);
outputTraverser = Traversers.traverseIterator(directoryStream.iterator())
.filter(this::shouldProcessEvent)
.flatMap(this::processFile);
}
@Override
public boolean complete() {
return emitFromTraverser(outputTraverser);
}
private boolean shouldProcessEvent(Path file) {
if (Files.isDirectory(file)) {
return false;
}
int hashCode = file.hashCode();
return ((hashCode & Integer.MAX_VALUE) % parallelism) == id;
}
private Traverser<R> processFile(Path file) {
if (getLogger().isFinestEnabled()) {
getLogger().finest("Processing file " + file);
}
try {
assert currentFileReader == null : "currentFileReader != null";
currentFileReader = createFileReader(file);
String fileName = file.getFileName().toString();
return traverseIterator(currentFileReader)
.map(record -> mapOutputFn.apply(fileName, record))
.onFirstNull(() -> {
Util.uncheckRun(() -> currentFileReader.close());
currentFileReader = null;
});
} catch (IOException e) {
throw sneakyThrow(e);
}
}
private DataFileReader<GenericRecord> createFileReader(Path file) throws IOException {
return new DataFileReader<>(file.toFile(), new GenericDatumReader<>());
}
@Override
public void close(Throwable error) throws IOException {
IOException ex = null;
if (directoryStream != null) {
try {
directoryStream.close();
} catch (IOException e) {
ex = e;
}
}
if (currentFileReader != null) {
currentFileReader.close();
}
if (ex != null) {
throw ex;
}
}
@Override
public boolean isCooperative() {
return false;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment