Created
May 24, 2018 09:09
-
-
Save kbeathanabhotla/d898644924cf321c890fdedd1dddb223 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark import SparkContext | |
from pyspark.streaming import StreamingContext | |
from pyspark.streaming.kafka import KafkaUtils | |
def read_offsets(zk, topics):
    """Load previously saved Kafka offsets for *topics* from ZooKeeper.

    Offsets are read from ``/consumers/<topic>/<partition>``, where the node's
    data is the offset as ASCII digits (the layout written by ``save_offsets``).

    Args:
        zk: A started kazoo ``KazooClient``.
        topics: Iterable of topic names.

    Returns:
        dict mapping ``TopicAndPartition`` -> int offset, suitable for the
        ``fromOffsets`` argument of ``KafkaUtils.createDirectStream``.
        Topics with no saved offsets are omitted, so the dict is empty on the
        very first run.
    """
    from pyspark.streaming.kafka import TopicAndPartition

    from_offsets = {}
    for topic in topics:
        topic_path = f'/consumers/{topic}'
        # On the first run nothing has been saved yet; get_children would
        # raise NoNodeError for the missing path, so skip the topic instead.
        if zk.exists(topic_path) is None:
            continue
        # NOTE(review): children are assumed to be numeric partition ids; a
        # topic whose name collides with another user of /consumers/<name>
        # would break int(partition) — confirm the path is not shared.
        for partition in zk.get_children(topic_path):
            data, _stat = zk.get(f'{topic_path}/{partition}')
            if not data:
                # save_offsets creates the node before writing it, so an
                # interrupted write can leave an empty node; nothing to resume.
                continue
            topic_partition = TopicAndPartition(topic, int(partition))
            from_offsets[topic_partition] = int(data.decode())
    return from_offsets
def save_offsets(rdd):
    """Record the end offset of every partition in this batch in ZooKeeper.

    Meant as a ``foreachRDD`` callback on a Kafka direct stream. For each
    ``OffsetRange`` of *rdd*, ``untilOffset`` is written as ASCII digits to
    ``/consumers/<topic>/<partition>``; the node is created first if missing.

    NOTE(review): offsets are stored as soon as the batch is handed to this
    callback, independent of any downstream processing — if processing is
    added, make sure it completes before offsets are saved.
    """
    client = get_zookeeper_instance()
    for rng in rdd.offsetRanges():
        znode = "/consumers/{}/{}".format(rng.topic, rng.partition)
        payload = str(rng.untilOffset).encode()
        client.ensure_path(znode)
        client.set(znode, payload)
ZOOKEEPER_SERVERS = "127.0.0.1:2181"

# Process-wide kazoo client, created lazily by get_zookeeper_instance().
_zk_client = None


def get_zookeeper_instance():
    """Return the shared, started kazoo client for ``ZOOKEEPER_SERVERS``.

    The client is created and started on the first call and reused on every
    later call. It is cached only after ``start()`` returns successfully, so
    if the connection attempt raises (e.g. kazoo's start timeout), the next
    call retries instead of returning a stopped client.
    """
    global _zk_client
    if _zk_client is None:
        from kazoo.client import KazooClient

        client = KazooClient(hosts=ZOOKEEPER_SERVERS)
        client.start()
        _zk_client = client
    return _zk_client
def main(brokers="127.0.0.1:9092", topics=None):
    """Run a Kafka direct stream that resumes from, and saves offsets to, ZooKeeper.

    Args:
        brokers: Value for Kafka's ``metadata.broker.list`` parameter.
        topics: List of topic names to consume; defaults to
            ``['test1', 'test2']``. pyspark's ``createDirectStream`` requires
            a ``list`` here.

    Blocks until the streaming context terminates.
    """
    # None sentinel instead of a list literal default, so the default list is
    # not a single object shared across calls.
    if topics is None:
        topics = ['test1', 'test2']

    sc = SparkContext(appName="PythonStreamingSaveOffsets")
    ssc = StreamingContext(sc, 2)  # batch interval in seconds

    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topics)
    # NOTE(review): if only some of `topics` have saved offsets, Spark's Kafka
    # helper may reject the mismatch between `topics` and the topics present
    # in `fromOffsets` — confirm against the Spark version in use.
    direct_kafka_stream = KafkaUtils.createDirectStream(
        ssc, topics, {"metadata.broker.list": brokers},
        fromOffsets=from_offsets)
    direct_kafka_stream.foreachRDD(save_offsets)

    # Defining the DStream graph does not run it: without start() no batch is
    # ever scheduled and the script would exit right after building the graph.
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment