Skip to content

Instantly share code, notes, and snippets.

@tdg5
Last active October 30, 2015 19:48
Show Gist options
  • Save tdg5/795e52c56359f1368562 to your computer and use it in GitHub Desktop.
Save tdg5/795e52c56359f1368562 to your computer and use it in GitHub Desktop.
Various bits of the cassandra cleanup process code
/*
https://github.com/apache/cassandra/blob/cassandra-2.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java#L523
*/
private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables, CounterId.OneShotRenewer renewer) throws IOException
{
assert !cfs.isIndex();
Keyspace keyspace = cfs.keyspace;
Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
if (ranges.isEmpty())
{
logger.info("Cleanup cannot run before a node has joined the ring");
return;
}
boolean hasIndexes = cfs.indexManager.hasIndexes();
CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, renewer);
for (SSTableReader sstable : sstables)
{
Set<SSTableReader> sstableAsSet = Collections.singleton(sstable);
if (!hasIndexes && !new Bounds<Token>(sstable.first.token, sstable.last.token).intersects(ranges))
{
cfs.replaceCompactedSSTables(sstableAsSet, Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
continue;
}
if (!needsCleanup(sstable, ranges))
{
logger.debug("Skipping {} for cleanup; all rows should be kept", sstable);
continue;
}
CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs));
long start = System.nanoTime();
long totalkeysWritten = 0;
long expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(),
SSTableReader.getApproximateKeyCount(sstableAsSet, cfs.metadata));
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
logger.info("Cleaning up " + sstable);
File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.CLEANUP));
if (compactionFileLocation == null)
throw new IOException("disk full");
ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
CleanupInfo ci = new CleanupInfo(sstable, scanner);
metrics.beginCompaction(ci);
SSTableWriter writer = createWriter(cfs,
compactionFileLocation,
expectedBloomFilterSize,
sstable);
SSTableReader newSstable = null;
try
{
while (scanner.hasNext())
{
if (ci.isStopRequested())
throw new CompactionInterruptedException(ci.getCompactionInfo());
SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
row = cleanupStrategy.cleanup(row);
if (row == null)
continue;
AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
if (writer.append(compactedRow) != null)
totalkeysWritten++;
}
if (totalkeysWritten > 0)
newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
else
writer.abort();
}
catch (Throwable e)
{
writer.abort();
throw Throwables.propagate(e);
}
finally
{
controller.close();
scanner.close();
metrics.finishCompaction(ci);
}
List<SSTableReader> results = new ArrayList<SSTableReader>(1);
if (newSstable != null)
{
results.add(newSstable);
String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.";
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
long startsize = sstable.onDiskLength();
long endsize = newSstable.onDiskLength();
double ratio = (double) endsize / (double) startsize;
logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
}
// flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
cfs.indexManager.flushIndexesBlocking();
cfs.replaceCompactedSSTables(Arrays.asList(sstable), results, OperationType.CLEANUP);
}
}
/*
https://github.com/apache/cassandra/blob/cassandra-2.0/src/java/org/apache/cassandra/db/DataTracker.java#L264
*/
public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Collection<SSTableReader> replacements, OperationType compactionType)
{
replace(sstables, replacements);
notifySSTablesChanged(sstables, replacements, compactionType);
}
/*
https://github.com/apache/cassandra/blob/cassandra-2.0/src/java/org/apache/cassandra/db/compaction/CompactionController.java#L209
*/
public void close()
{
SSTableReader.releaseReferences(overlappingSSTables);
}
/*
https://github.com/apache/cassandra/blob/cassandra-2.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java#L1175
*/
/**
* Release reference to this SSTableReader.
* If there is no one referring to this SSTable, and is marked as compacted,
* all resources are cleaned up and files are deleted eventually.
*/
public void releaseReference()
{
if (references.decrementAndGet() == 0 && isCompacted.get())
{
/**
* Make OS a favour and suggest (using fadvice call) that we
* don't want to see pages of this SSTable in memory anymore.
*
* NOTE: We can't use madvice in java because it requires address of
* the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
*/
dropPageCache();
FileUtils.closeQuietly(this);
deletingTask.schedule();
}
assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;
}
/*
https://github.com/apache/cassandra/blob/cassandra-2.0/src/java/org/apache/cassandra/db/DataTracker.java#L345
*/
private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
{
if (!cfstore.isValid())
{
removeOldSSTablesSize(replacements, false);
replacements = Collections.emptyList();
}
View currentView, newView;
do
{
currentView = view.get();
newView = currentView.replace(oldSSTables, replacements);
}
while (!view.compareAndSet(currentView, newView));
postReplace(oldSSTables, replacements, false);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment