Skip to content

Instantly share code, notes, and snippets.

@omnifroodle
Created September 7, 2018 21:33
Simple Cassandra testing with the Locust framework
from cassandra.cluster import Cluster
from cassandra.policies import RoundRobinPolicy
from locust import Locust, events, task, TaskSet
import time
class CassandraClient(object):
    """Minimal Cassandra client that reports each query's timing to Locust.

    Every call to :meth:`execute` fires either ``events.request_success`` or
    ``events.request_failure`` so the query shows up in Locust's statistics.
    """

    def __init__(self, host):
        """Connect to the Cassandra cluster using *host* as the contact point.

        Args:
            host: Address of a Cassandra node used as the initial contact point.
        """
        # Keep a reference to the Cluster (not just the session) so that its
        # connections can be released later through shutdown().
        self.cluster = Cluster([host], load_balancing_policy=RoundRobinPolicy())
        self.session = self.cluster.connect()

    @staticmethod
    def _elapsed_ms(started_at):
        """Return whole milliseconds elapsed since *started_at* (a perf_counter value)."""
        return int((time.perf_counter() - started_at) * 1000)

    def execute(self, query):
        """Run *query* and report its outcome and duration to Locust.

        Exceptions raised by the driver are not propagated; they are handed to
        Locust through ``events.request_failure`` so the run keeps going and
        the failure is counted.

        Args:
            query: CQL statement to execute. It is also used as the request
                name in the fired event.
        """
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by system clock adjustments the way time.time() can.
        started_at = time.perf_counter()
        try:
            self.session.execute(query)
        except Exception as e:
            # Deliberately broad: any failure must be recorded as a failed
            # request rather than abort the simulated user.
            events.request_failure.fire(
                request_type="c* query",
                name=query,
                response_time=self._elapsed_ms(started_at),
                exception=e,
            )
        else:
            events.request_success.fire(
                request_type="c* query",
                name=query,
                response_time=self._elapsed_ms(started_at),
                response_length=0,
            )

    def shutdown(self):
        """Close the cluster connection held by this client."""
        self.cluster.shutdown()
class CassandraLocust(Locust):
    """Locust base class whose ``client`` attribute is a ``CassandraClient``.

    Subclasses are expected to provide ``host``; it is passed to
    ``CassandraClient`` when the instance is created.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the Locust base, then attach a Cassandra client for ``self.host``."""
        super(CassandraLocust, self).__init__(*args, **kwargs)
        target_host = self.host
        self.client = CassandraClient(target_host)
class SimpleUser(CassandraLocust):
    """Simulated user that repeatedly queries ``system.local`` on a local node."""

    # Contact point handed to the Cassandra client.
    host = "127.0.0.1"

    # Lower/upper bounds on the wait between tasks
    # (presumably milliseconds, per Locust's legacy wait settings -- TODO confirm).
    min_wait = 10
    max_wait = 500

    class task_set(TaskSet):
        """Tasks executed by this user."""

        @task
        def test_task(self):
            """Issue a simple read against the ``system.local`` table."""
            self.client.execute("select * from system.local")

        # Example of a task expected to fail; left disabled:
        # @task
        # def fail_task(self):
        #     self.client.execute("select * from bob")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment