    """Tool for profiling Cassandra query performance.
    Tests are run by profile() multiple times and the 'Read Latency' is
    extracted using node tool.
    Usage:
    #Create the schema using the cassandra-cli.
    create keyspace query
    with strategy_options=[{replication_factor:1}]
    and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';
    use query;
    create column family NoCache
    with comparator = AsciiType
    and default_validation_class = AsciiType
    and key_validation_class = AsciiType
    and keys_cached = 0
    and rows_cached = 0;
    #Load data
    $python query_profile.py insert_rows
    #Warm up the database
    $python query_profile.py warm_up
    #Test the name locality for query by column name
    $python query_profile.py name_locality
    #Test the start position for query by range
    $python query_profile.py start_position
    """

import math
import multiprocessing
import operator
import os
import os.path
import random
import re
import shlex
import subprocess
import sys
import time

import pycassa

rows = [
    ("small-row", 100),             # 100 columns, 5K of data
    ("no-col-index", 1200),         # 1200 columns, 60K of data
    ("five-thousand", 5000),        # 5000 columns, 244K of data
    ("ten-thousand", 10000),        # 10000 columns, 488K of data
    ("hundred-thousand", 100000),   # 100000 columns, 4.8M of data
    ("one-million", 1000000),       # 1000000 columns, 48M of data
    ("ten-million", 10000000)       # 10000000 columns, 480M of data
]
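
# The size comments above assume roughly 50 bytes per column on disk
# (a 10 byte zero-padded name plus a 25 byte value plus column overhead),
# the same per-column figure used for the index page size in
# make_paged_cols() below.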

CASSANDRA_PATH = "/Users/aaron/frameworks/cassandra/apache-cassandra-0.8.1/bin/"
CASS_POOL = pycassa.connect("query", ["localhost"])
NOCACHE = pycassa.columnfamily.ColumnFamily(CASS_POOL, "NoCache")

col_pattern = "{0:0>#10}"
profile_pattern = "Row {0:>20} latency in ms "\
                  "{1:>10.4} {2:>10.4} {3:>10.4} {4:>10.4}"

def make_cols(cols):
    """Make the column names."""
    return [col_pattern.format(i) for i in xrange(cols)]

def make_paged_cols(cols, shuffle=True):
    """Split the cols into pages about the same size as a column index page
    (1310 * 50B = 63.9K).

    :param shuffle: Shuffle the pages.
    :returns: List of lists.
    """

    all_cols = make_cols(cols)
    pages_start = range(0, cols, 1310)
    pages_end = pages_start[1:] + [cols]

    pages = [
        operator.getslice(all_cols, start, end)
        for start, end in zip(pages_start, pages_end)
    ]
    if shuffle:
        random.shuffle(pages)
    return pages
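
# For example, make_paged_cols(3000, shuffle=False) returns three pages
# holding 1310, 1310 and 380 column names respectively.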

def col_range(start, cols, count):
    """A range of columns from the start or end of the column list.

    :returns: List of column names.
    """
    all_cols = make_cols(cols)
    return all_cols[:count] if start else all_cols[-count:]
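
# e.g. col_range(True, 1000, 3) returns
# ["0000000000", "0000000001", "0000000002"]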

def range_paged_cols(start, cols, range, shuffle_pages=False,
        shuffle_columns=False, collapse=True):
    """Select a range from the start or end of each page of columns.

    :returns: List of columns if collapse, else list of lists of columns.
    """

    result = []
    for page in make_paged_cols(cols, shuffle=shuffle_pages):
        if shuffle_columns:
            random.shuffle(page)
        if collapse:
            result.extend(page[:range] if start else page[-range:])
        else:
            result.append(page[:range] if start else page[-range:])
    return result
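
# e.g. range_paged_cols(True, 5000, 2) flattens the first 2 column names
# from each of the 4 pages into a single list of 8 names.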

def start_col(cols, offset, page_offset=None):
    """A single column name at the offset from the start of the row.

    :returns: str column name
    """

    if page_offset is None:
        all_cols = make_cols(cols)
    else:
        all_cols = make_paged_cols(cols, shuffle=False)
        all_cols = all_cols[min(page_offset, len(all_cols)-1)]
    return all_cols[offset]
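
# e.g. start_col(5000, 0, page_offset=1) -> "0000001310", the first column
# name on the second index page.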

# ========================
# Testing functions

def run(cmd):
    """Execute the command line."""
    p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    return p.communicate()

def percentile(N, percent, key=lambda x:x):
    """
    From http://code.activestate.com/recipes/511478/ (r1)

    Find the percentile of a list of values.

    @parameter N - is a list of values. Note N MUST BE already sorted.
    @parameter percent - a float value from 0.0 to 1.0.
    @parameter key - optional key function to compute value from each element of N.
    @return - the percentile of the values
    """
    if not N:
        return None
    k = (len(N)-1) * percent
    f = math.floor(k)
    c = math.ceil(k)
    if f == c:
        return key(N[int(k)])
    # Linearly interpolate between the two bracketing values.
    d0 = key(N[int(f)]) * (c-k)
    d1 = key(N[int(c)]) * (k-f)
    return d0+d1
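
# e.g. percentile([1.0, 2.0, 3.0, 4.0], 0.5) == 2.5, interpolating between
# the two middle values.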


def profile(target, repeat=10):
    """Clears the recent latency stats, runs the target and then gets the
    latency stats again.
    """
    global rows

    nodetool_path = os.path.join(CASSANDRA_PATH, "nodetool")
    cmd = nodetool_path + " -h localhost cfstats"

    print "Latency is min, 80th percentile, 95th percentile and max."
    print target.__doc__
    print

    for key, cols in rows:
        latency = []
        low_col_warn = False
        for i in range(repeat):
            # cfstats reports *recent* latency, which resets each time it is
            # read. This first call discards stats from previous activity.
            run(cmd)
            rv = target(key, cols)
            std_out, std_err = run(cmd)
            if std_err:
                raise RuntimeError(std_err)
            if len(rv) < 100 and not low_col_warn:
                print "WARN: only %s columns returned" % len(rv)
                low_col_warn = True

            this_latency = _parse_latency(std_out)
            if this_latency is not None:
                latency.append(this_latency)

        latency.sort()
        stats = (
            min(latency),
            percentile(latency, 0.8),
            percentile(latency, 0.95),
            max(latency)
        )
        print profile_pattern.format(key, *stats)
    print
    return
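
# A minimal sketch of adding a further test, assuming the NoCache schema
# above (column_reversed is a standard pycassa get() parameter):
#
#   def reversed_slice(key, cols):
#       """100 columns sliced from the end of the row."""
#       return NOCACHE.get(key, column_count=100, column_reversed=True)
#   profile(reversed_slice)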

def _parse_latency(data):
    """Parse the nodetool std out to get the recent read latency for the CF.
    """

    found_cf = False
    for line in data.split("\n"):
        line = line.strip()
        if not found_cf:
            found_cf = line.startswith("Column Family: NoCache")
            continue
        token = re.findall(r"Read Latency: (?P<foo>\S*)", line)
        if not token:
            continue
        return float(token[0])
    return None
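
# _parse_latency() expects cfstats output shaped roughly like:
#
#   Column Family: NoCache
#   ...
#   Read Latency: 1.234 ms.
#
# and would return 1.234 for the sample above.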

def start_position():
    """Test different start column offsets."""

    print "Test start position..."
    print

    def test1(key, cols):
        """100 columns with no start column"""
        return NOCACHE.get(key, column_count=100)
    profile(test1)

    def test2(key, cols):
        """100 columns from the start of the row with a start col"""
        x = start_col(cols, 0)
        return NOCACHE.get(key, column_start=x, column_count=100)
    profile(test2)

    def test3(key, cols):
        """100 columns from the start of the second page"""
        x = start_col(cols, 0, page_offset=1)
        return NOCACHE.get(key, column_start=x, column_count=100)
    profile(test3)

    def test4(key, cols):
        """100 columns starting half way through the row"""
        x = start_col(cols, (cols / 2))
        return NOCACHE.get(key, column_start=x, column_count=100)
    profile(test4)

    def test5(key, cols):
        """100 columns starting from the last page"""
        x = start_col(cols, 0, page_offset=-1)
        return NOCACHE.get(key, column_start=x, column_count=100)
    profile(test5)


def name_locality():
    """Test difference between tightly clustered columns and spread out
    columns."""

    print "Test name locality..."
    print

    def test1(key, cols):
        """100 columns by name, start of the row."""
        x = col_range(True, cols, 100)
        return NOCACHE.get(key, columns=x)
    profile(test1)

    def test2(key, cols):
        """100 columns by name, end of the row."""
        x = col_range(False, cols, 100)
        return NOCACHE.get(key, columns=x)
    profile(test2)

    def test3(key, cols):
        """100 columns by name, middle of the row."""
        x = range_paged_cols(True, cols, 100, collapse=False)
        # Got a list of lists, inner list is columns from a page.
        # Pick the middle page.
        x = x[int(len(x)/2.0)]
        return NOCACHE.get(key, columns=x)
    profile(test3)

    def test4(key, cols):
        """100 columns by name, first 2 cols from 50 random pages"""
        x = range_paged_cols(True, cols, 2, shuffle_pages=True)
        return NOCACHE.get(key, columns=x[:100])
    profile(test4)

    def test5(key, cols):
        """100 columns by name, last 2 cols from 50 random pages"""
        x = range_paged_cols(False, cols, 2, shuffle_pages=True)
        return NOCACHE.get(key, columns=x[:100])
    profile(test5)

    def test6(key, cols):
        """100 columns by name, random 2 cols from 50 random pages"""
        x = range_paged_cols(False, cols, 2, shuffle_pages=True,
            shuffle_columns=True)
        return NOCACHE.get(key, columns=x[:100])
    profile(test6)

    def test7(key, cols):
        """100 columns by name, first col from 100 random pages"""
        x = range_paged_cols(True, cols, 1, shuffle_pages=True)
        return NOCACHE.get(key, columns=x[:100])
    profile(test7)

    def test8(key, cols):
        """100 columns by name, last col from 100 random pages"""
        x = range_paged_cols(False, cols, 1, shuffle_pages=True)
        return NOCACHE.get(key, columns=x[:100])
    profile(test8)

    def test9(key, cols):
        """100 columns by name, random col from 100 random pages"""
        x = range_paged_cols(False, cols, 1, shuffle_pages=True,
            shuffle_columns=True)
        return NOCACHE.get(key, columns=x[:100])
    profile(test9)

def warm_up():
    """Scan through all of the columns in each row to warm the server."""

    global rows

    class Walk(multiprocessing.Process):

        def __init__(self, row_key):
            super(Walk, self).__init__()
            self.row_key = row_key
            conn = pycassa.connect("query", ["localhost"])
            self.cf = pycassa.columnfamily.ColumnFamily(conn, "NoCache")

        def run(self):
            start_col = ""
            while start_col is not None:
                cols = self.cf.get(self.row_key, column_start=start_col,
                    column_count=1000)
                # Continue from the last column returned; stop once a page
                # comes back with only a single (already seen) column.
                start_col, _ = cols.popitem(last=True) if len(cols) > 1 else (
                    None, None)

    threads = [Walk(key) for key, cols in rows]
    print "Starting %s processes..." % len(rows)
    start = time.time()
    map(Walk.start, threads)

    alive = lambda: (t for t in threads if t.is_alive())
    while any(alive()):
        time.sleep(1)
        if int(time.time() - start) % 5 == 0:
            print "Alive count ", len(list(alive()))
    print "Finished in ", time.time() - start
    return

def insert_rows():
    """Insert rows into the DB for testing."""
    global rows

    class Insert(multiprocessing.Process):

        def __init__(self, row_key, cols):
            super(Insert, self).__init__()
            self.row_key = row_key
            self.cols = cols
            self.conn = pycassa.connect("query", ["localhost"])
            self.cf = pycassa.columnfamily.ColumnFamily(self.conn, "NoCache")

        def run(self):
            # The mutator automatically sends every 100 columns; the final
            # send() flushes the remainder.
            mutator = pycassa.batch.Mutator(self.conn, queue_size=100)
            data = ("foo" * 10)[:25]
            for i in xrange(self.cols):
                mutator.insert(self.cf, self.row_key,
                    {col_pattern.format(i) : data})
            mutator.send()
            return

    threads = [Insert(key, cols) for key, cols in rows]
    print "Starting %s processes..." % len(rows)
    start = time.time()
    map(Insert.start, threads)

    alive = lambda: (t for t in threads if t.is_alive())
    while any(alive()):
        time.sleep(1)
        if int(time.time() - start) % 5 == 0:
            print "Alive count ", len(list(alive()))
    print "Finished in ", time.time() - start
    return


if __name__ == "__main__":
    action = sys.argv[1] if len(sys.argv) > 1 else None
    args = [
        token
        for token in sys.argv[2:]
        if token.find("=") < 0
    ]
    kwargs = dict(
        token.split("=")
        for token in sys.argv[2:]
        if token.find("=") > 0
    )

    func = globals().get(action)
    if func:
        func(*args, **kwargs)
    else:
        print "Unknown action %s" % action
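
# e.g. "python query_profile.py name_locality" dispatches to name_locality();
# any key=value arguments are passed through as string keyword arguments.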