Created
November 28, 2021 19:21
-
-
Save develar/de9a2eb4934e55b281604d70fd00c5e3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.column.values.bloomfilter; | |
import org.apache.parquet.Preconditions; | |
import org.apache.parquet.io.api.Binary; | |
import java.io.IOException; | |
import java.io.OutputStream; | |
import java.nio.ByteBuffer; | |
import java.nio.ByteOrder; | |
import java.nio.IntBuffer; | |
import java.util.Arrays; | |
/* | |
* This Bloom filter is implemented using block-based Bloom filter algorithm from Putze et al.'s | |
* "Cache-, Hash- and Space-Efficient Bloom filters". The basic idea is to hash the item to a tiny | |
* Bloom filter which size fit a single cache line or smaller. This implementation sets 8 bits in | |
* each tiny Bloom filter. Each tiny Bloom filter is 32 bytes to take advantage of 32-byte SIMD | |
* instruction. | |
*/ | |
public class BlockSplitBloomFilter implements BloomFilter { | |
// Bytes in a tiny Bloom filter block. | |
private static final int BYTES_PER_BLOCK = 32; | |
// Bits in a tiny Bloom filter block. | |
private static final int BITS_PER_BLOCK = 256; | |
// The lower bound of bloom filter size, set to the size of a tiny Bloom filter block. | |
public static final int LOWER_BOUND_BYTES = 32; | |
// The upper bound of bloom filter size, set to default row group size. | |
public static final int UPPER_BOUND_BYTES = 128 * 1024 * 1024; | |
// The number of bits to set in a tiny Bloom filter | |
private static final int BITS_SET_PER_BLOCK = 8; | |
// The metadata in the header of a serialized Bloom filter is four four-byte values: the number of bytes, | |
// the filter algorithm, the hash algorithm, and the compression. | |
public static final int HEADER_SIZE = 16; | |
// The default false positive probability value | |
public static final double DEFAULT_FPP = 0.01; | |
// The hash strategy used in this Bloom filter. | |
private final HashStrategy hashStrategy; | |
// The underlying byte array for Bloom filter bitset. | |
private byte[] bitset; | |
// A integer array buffer of underlying bitset to help setting bits. | |
private IntBuffer intBuffer; | |
// Hash function use to compute hash for column value. | |
private HashFunction hashFunction; | |
private int maximumBytes = UPPER_BOUND_BYTES; | |
private int minimumBytes = LOWER_BOUND_BYTES; | |
// A cache used for hashing | |
private ByteBuffer cacheBuffer = ByteBuffer.allocate(Long.BYTES); | |
private int[] mask = new int[BITS_SET_PER_BLOCK]; | |
// The block-based algorithm needs 8 odd SALT values to calculate eight indexes | |
// of bits to set, one per 32-bit word. | |
private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, | |
0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; | |
/** | |
* Constructor of block-based Bloom filter. | |
* | |
* @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within | |
* [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES], it will be rounded up/down | |
* to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power | |
* of 2. It uses XXH64 as its default hash function. | |
*/ | |
public BlockSplitBloomFilter(int numBytes) { | |
this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64); | |
} | |
/** | |
* Constructor of block-based Bloom filter. | |
* | |
* @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within | |
* [DEFAULT_MINIMUM_BYTES, maximumBytes], it will be rounded up/down | |
* to lower/upper bound if num_bytes is out of range. It will also be rounded up to a power | |
* of 2. It uses XXH64 as its default hash function. | |
* @param maximumBytes The maximum bytes of the Bloom filter. | |
*/ | |
public BlockSplitBloomFilter(int numBytes, int maximumBytes) { | |
this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64); | |
} | |
/** | |
* Constructor of block-based Bloom filter. | |
* | |
* @param numBytes The number of bytes for Bloom filter bitset | |
* @param hashStrategy The hash strategy of Bloom filter. | |
*/ | |
private BlockSplitBloomFilter(int numBytes, HashStrategy hashStrategy) { | |
this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, hashStrategy); | |
} | |
/** | |
* Constructor of block-based Bloom filter. | |
* | |
* @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within | |
* [minimumBytes, maximumBytes], it will be rounded up/down to lower/upper bound if | |
* num_bytes is out of range. It will also be rounded up to a power of 2. | |
* @param minimumBytes The minimum bytes of the Bloom filter. | |
* @param maximumBytes The maximum bytes of the Bloom filter. | |
* @param hashStrategy The adopted hash strategy of the Bloom filter. | |
*/ | |
public BlockSplitBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy) { | |
if (minimumBytes > maximumBytes) { | |
throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes"); | |
} | |
if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) { | |
this.minimumBytes = minimumBytes; | |
} | |
if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) { | |
this.maximumBytes = maximumBytes; | |
} | |
initBitset(numBytes); | |
cacheBuffer.order(ByteOrder.LITTLE_ENDIAN); | |
switch (hashStrategy) { | |
case XXH64: | |
this.hashStrategy = hashStrategy; | |
hashFunction = new XxHash(); | |
break; | |
default: | |
throw new RuntimeException("Unsupported hash strategy"); | |
} | |
} | |
/** | |
* Construct the Bloom filter with given bitset, it is used when reconstructing | |
* Bloom filter from parquet file. It use XXH64 as its default hash | |
* function. | |
* | |
* @param bitset The given bitset to construct Bloom filter. | |
*/ | |
public BlockSplitBloomFilter(byte[] bitset) { | |
this(bitset, HashStrategy.XXH64); | |
} | |
/** | |
* Construct the Bloom filter with given bitset, it is used when reconstructing | |
* Bloom filter from parquet file. | |
* | |
* @param bitset The given bitset to construct Bloom filter. | |
* @param hashStrategy The hash strategy Bloom filter apply. | |
*/ | |
private BlockSplitBloomFilter(byte[] bitset, HashStrategy hashStrategy) { | |
if (bitset == null) { | |
throw new RuntimeException("Given bitset is null"); | |
} | |
cacheBuffer.order(ByteOrder.LITTLE_ENDIAN); | |
this.bitset = bitset; | |
this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer(); | |
switch (hashStrategy) { | |
case XXH64: | |
this.hashStrategy = hashStrategy; | |
hashFunction = new XxHash(); | |
break; | |
default: | |
throw new RuntimeException("Unsupported hash strategy"); | |
} | |
} | |
/** | |
* Create a new bitset for Bloom filter. | |
* | |
* @param numBytes The number of bytes for Bloom filter bitset. The range of num_bytes should be within | |
* [minimumBytes, maximumBytes], it will be rounded up/down | |
* to lower/upper bound if num_bytes is out of range and also will rounded up to a power | |
* of 2. It uses XXH64 as its default hash function and block-based algorithm | |
* as default algorithm. | |
*/ | |
private void initBitset(int numBytes) { | |
if (numBytes < minimumBytes) { | |
numBytes = minimumBytes; | |
} | |
// Get next power of 2 if it is not power of 2. | |
if ((numBytes & (numBytes - 1)) != 0) { | |
numBytes = Integer.highestOneBit(numBytes) << 1; | |
} | |
if (numBytes > maximumBytes || numBytes < 0) { | |
numBytes = maximumBytes; | |
} | |
this.bitset = new byte[numBytes]; | |
this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer(); | |
} | |
@Override | |
public void writeTo(OutputStream out) throws IOException { | |
out.write(bitset); | |
} | |
private int[] setMask(int key) { | |
// The following three loops are written separately so that they could be | |
// optimized for vectorization. | |
for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) { | |
mask[i] = key * SALT[i]; | |
} | |
for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) { | |
mask[i] = mask[i] >>> 27; | |
} | |
for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) { | |
mask[i] = 0x1 << mask[i]; | |
} | |
return mask; | |
} | |
@Override | |
public void insertHash(long hash) { | |
long numBlocks = bitset.length / BYTES_PER_BLOCK; | |
long lowHash = hash >>> 32; | |
int blockIndex = (int)((lowHash * numBlocks) >> 32); | |
int key = (int)hash; | |
// Calculate mask for bucket. | |
int[] mask = setMask(key); | |
for (int i = 0; i < BITS_SET_PER_BLOCK; i++) { | |
int index = blockIndex * (BYTES_PER_BLOCK / 4) + i; | |
int value = intBuffer.get(index); | |
value |= mask[i]; | |
intBuffer.put(index, value); | |
} | |
} | |
@Override | |
public boolean findHash(long hash) { | |
long numBlocks = bitset.length / BYTES_PER_BLOCK; | |
long lowHash = hash >>> 32; | |
int blockIndex = (int)((lowHash * numBlocks) >> 32); | |
int key = (int)hash; | |
// Calculate mask for the tiny Bloom filter. | |
int[] mask = setMask(key); | |
for (int i = 0; i < BITS_SET_PER_BLOCK; i++) { | |
if (0 == (intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) { | |
return false; | |
} | |
} | |
return true; | |
} | |
/** | |
* Calculate optimal size according to the number of distinct values and false positive probability. | |
* | |
* @param n: The number of distinct values. | |
* @param p: The false positive probability. | |
* | |
* @return optimal number of bits of given n and p. | |
*/ | |
public static int optimalNumOfBits(long n, double p) { | |
Preconditions.checkArgument((p > 0.0 && p < 1.0), | |
"FPP should be less than 1.0 and great than 0.0"); | |
final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8)); | |
int numBits = (int)m ; | |
// Handle overflow. | |
if (numBits > UPPER_BOUND_BYTES << 3 || m < 0) { | |
numBits = UPPER_BOUND_BYTES << 3; | |
} | |
// Round numBits up to (k * BITS_PER_BLOCK) | |
numBits = (numBits + BITS_PER_BLOCK -1) & ~BITS_PER_BLOCK; | |
if (numBits < (LOWER_BOUND_BYTES << 3)) { | |
numBits = LOWER_BOUND_BYTES << 3; | |
} | |
return numBits; | |
} | |
@Override | |
public int getBitsetSize() { | |
return this.bitset.length; | |
} | |
@Override | |
public long hash(Object value) { | |
if (value instanceof Binary) { | |
return hashFunction.hashBytes(((Binary) value).getBytes()); | |
} | |
if (value instanceof Integer) { | |
cacheBuffer.putInt((Integer)value); | |
} else if (value instanceof Long) { | |
cacheBuffer.putLong((Long)value); | |
} else if (value instanceof Float) { | |
cacheBuffer.putFloat((Float)value); | |
} else if (value instanceof Double) { | |
cacheBuffer.putDouble((Double) value); | |
} else { | |
throw new RuntimeException("Parquet Bloom filter: Not supported type"); | |
} | |
return doHash(); | |
} | |
@Override | |
public boolean equals(Object object) { | |
if (object == this) { | |
return true; | |
} | |
if (object instanceof BlockSplitBloomFilter) { | |
BlockSplitBloomFilter that = (BlockSplitBloomFilter) object; | |
return Arrays.equals(this.bitset, that.bitset) | |
&& this.getAlgorithm() == that.getAlgorithm() | |
&& this.hashStrategy == that.hashStrategy; | |
} | |
return false; | |
} | |
@Override | |
public HashStrategy getHashStrategy() { | |
return HashStrategy.XXH64; | |
} | |
@Override | |
public Algorithm getAlgorithm() { | |
return Algorithm.BLOCK; | |
} | |
@Override | |
public Compression getCompression() { | |
return Compression.UNCOMPRESSED; | |
} | |
private long doHash() { | |
cacheBuffer.flip(); | |
long hashResult = hashFunction.hashByteBuffer(cacheBuffer); | |
cacheBuffer.clear(); | |
return hashResult; | |
} | |
@Override | |
public long hash(int value) { | |
cacheBuffer.putInt(value); | |
return doHash(); | |
} | |
@Override | |
public long hash(long value) { | |
cacheBuffer.putLong(value); | |
return doHash(); | |
} | |
@Override | |
public long hash(double value) { | |
cacheBuffer.putDouble(value); | |
return doHash(); | |
} | |
@Override | |
public long hash(float value) { | |
cacheBuffer.putFloat(value); | |
return doHash(); | |
} | |
@Override | |
public long hash(Binary value) { | |
return hashFunction.hashBytes(value.getBytes()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment