Created
February 7, 2017 14:24
-
-
Save mfenniak/04f9c0bea8a1a2e0a747d678117df9f7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fun printGraphviz(builder: TopologyBuilder) { | |
println("digraph {") | |
val nodeGroups = builder.nodeGroups() | |
val processorTopologys = nodeGroups.map { kv -> builder.build(kv.key) } | |
val sourceTopics = processorTopologys.flatMap { it.sourceTopics() } | |
val sinkTopics = processorTopologys.flatMap { it.sinkTopics() } | |
val allTopics = sourceTopics.plus(sinkTopics).distinct().sorted() | |
println(" node [shape=\"rect\"]") | |
for (topic in allTopics) { | |
println(" \"topic-$topic\" [label=\"Topic $topic\"]") | |
} | |
for (processorTopology in processorTopologys.withIndex()) { | |
printProcessor(processorTopology.index, processorTopology.value) | |
} | |
println("}") | |
} | |
val sinkNodeTopicField: Field by lazy { | |
val klass = SinkNode::class.java | |
val topicField = klass.getDeclaredField("topic") | |
topicField.isAccessible = true | |
topicField | |
} | |
fun printProcessor(nodeGroup: Int, processorTopology: ProcessorTopology) { | |
println(" subgraph t$nodeGroup {") | |
println(" # processor nodes") | |
println(" node [shape=\"ellipse\"]") | |
for (processor in processorTopology.processors()) { | |
println(" \"processor-${processor.name()}\" [label=\"Processor ${processor.name()}\"]") | |
} | |
println(" # source nodes") | |
for (sourceTopic in processorTopology.sourceTopics()) { | |
val sourceNode = processorTopology.source(sourceTopic) | |
println(" \"topic-$sourceTopic\" -> \"processor-${sourceNode.name()}\"") | |
} | |
println(" # processors") | |
for (processor in processorTopology.processors()) { | |
if (processor is SinkNode<*, *>) { | |
// note: can't use processorTopology's sinkTopics() & sink(...) methods, because a single processor | |
// topology can have multiple sinks for one topic, but the ProcessorTopology API doesn't allow you | |
// to retrieve all the sinks. It just returns a single random SinkNode. | |
val sinkTopic = sinkNodeTopicField.get(processor) as String | |
println(" \"processor-${processor.name()}\" -> \"topic-$sinkTopic\"") | |
} | |
/* | |
SourceNode's topics field are not correct; they are not applicationId prefixed when they should be. | |
if (processor is SourceNode<*, *>) { | |
val sourceTopics = sourceNodeTopicsField.get(processor) as Array<String> | |
for (sourceTopic in sourceTopics) { | |
println(" \"topic-$sourceTopic\" -> \"processor-${processor.name()}\"") | |
} | |
} | |
*/ | |
for (child in processor.children()) { | |
if (child is ProcessorNode<*, *>) { | |
println(" \"processor-${processor.name()}\" -> \"processor-${child.name()}\"") | |
} | |
} | |
} | |
println(" }") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment