Skip to content

Instantly share code, notes, and snippets.

@pnarayanan
Last active November 12, 2016 01:12
Show Gist options
  • Save pnarayanan/206360cb96603af4b29bcee0038661d4 to your computer and use it in GitHub Desktop.
// This will be run on an admin node.
//
// This will be done as part of bootstrapping to map existing clustermap to Helix.
//
// On an ongoing basis, this can be used when we expand - to populate new partitions.
//
// The script will therefore ignore nodes and partitions that are already added to Helix, and
// only add those partitions that are new.
class StaticClusterMapToHelixMapper {
  ClusterMap staticClusterMap;
  String localDataCenterName;
  Map<String, String> dataCenterToZkAddress;
  // One HelixAdmin per datacenter, keyed by datacenter name. Populated by initializeAdmins().
  Map<String, HelixAdmin> adminForDc;

  /**
   * Map the given static clustermap to Helix - this will need to be run only once for the
   * entire cluster.
   *
   * @param staticClusterMap the static clustermap to mirror into Helix.
   * @param localDataCenterName the name of the datacenter this tool is running in.
   * @param dataCenterToZkAddress map from datacenter name to its ZooKeeper connect string.
   */
  void staticToHelixRunOnce(ClusterMap staticClusterMap, String localDataCenterName,
      Map<String, String> dataCenterToZkAddress) {
    this.staticClusterMap = staticClusterMap;
    this.localDataCenterName = localDataCenterName;
    this.dataCenterToZkAddress = dataCenterToZkAddress;
    initializeAdmins();
    addNewDataNodes();
    // Now that all nodes that are not already in Helix have been added, let us add resources to Helix
    // Ambry partitions are not "parts" of any particular logical entity like a "database".
    // Therefore, instead of viewing the entire corpus as one resource (which is an option), a better approach
    // is to view each ambry partition as a Helix resource with one partition.
    // Secondly, here we do not want Helix to place replicas based on its algorithm. Instead,
    // we want to place replicas as present in the clustermap. On an ongoing basis, the partition
    // manager tool will determine how to place replicas (it is rack aware, places things based on
    // the capacity etc. and will update the static clustermap). On an ongoing basis, clustermap
    // upgrade will be done via updates to the static clustermap.
    // For now, we assume that every partition is contained in every colo. If this requirement
    // changes for any reason, it is pretty simple to modify this algorithm.
    // Fix: the original passed two undefined locals (clusterName, partition). The cluster name
    // comes from the static clustermap, and the callee iterates all partitions itself, so no
    // single Partition argument is needed.
    addAmbryPartitionsToAllDcs(staticClusterMap.getClusterName());
  }

  /**
   * Initialize a map of dataCenter -> HelixAdmin based on the given zk Connect Strings.
   * Idempotent: the cluster entry is only added in a DC if it does not already exist.
   */
  void initializeAdmins() {
    adminForDc = new HashMap<>();
    String clusterName = staticClusterMap.getClusterName();
    for (Map.Entry<String, String> entry : dataCenterToZkAddress.entrySet()) {
      HelixAdmin admin = new ZKHelixAdmin(entry.getValue());
      adminForDc.put(entry.getKey(), admin);
      // Add a cluster entry in every DC
      if (!clusterExists(admin, clusterName)) {
        admin.addCluster(clusterName); // "PROD" / "PERF"
      }
    }
  }

  /**
   * Add nodes in the static cluster map that is not already present in Helix.
   * Ignores those that are already present. This is to make upgrades smooth.
   */
  void addNewDataNodes() {
    // Fix: clusterName was referenced below but never defined in this method.
    String clusterName = staticClusterMap.getClusterName();
    for (DataNode node : staticClusterMap.getDataNodes()) {
      String instanceName = node.getHostName() + "_" + node.getPort();
      if (instanceExists(instanceName)) {
        // this is probably being run as part of an upgrade and this is a node
        // that previously existed. Skip those.
        continue;
      }
      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
      instanceConfig.setHostName(node.getHostName());
      instanceConfig.setPort(node.getPort());
      instanceConfig.setInstanceEnabled(true); // @todo: this seems necessary, what does this do exactly?
      // Group replica names by the mount path that hosts them.
      // Fix: Map/List are interfaces and cannot be instantiated directly.
      Map<String, List<String>> mountPathToReplicas = new HashMap<>();
      for (Replica replica : node.getReplicas()) {
        mountPathToReplicas.computeIfAbsent(replica.getMountPath(), k -> new ArrayList<>())
            .add(replica.getName());
      }
      // Similarly create the following map
      // {diskId -> {"capacity" -> capacity, "mountPath" -> mountPath}}
      // Add additional system specific things: Disk information goes here.
      Map<String, Map<String, String>> diskInfo = new HashMap<>();
      instanceConfig.getRecord().setListFields(mountPathToReplicas);
      instanceConfig.getRecord().setMapFields(diskInfo);
      // ZNRecord simple fields are strings; convert explicitly in case the getters are numeric.
      instanceConfig.getRecord().setSimpleField("sslPort", String.valueOf(node.getSslPort()));
      instanceConfig.getRecord().setSimpleField("dataCenter", node.getDataCenter());
      instanceConfig.getRecord().setSimpleField("rackId", String.valueOf(node.getRackId()));
      adminForDc.get(node.getDataCenter()).addInstance(clusterName, instanceConfig);
    }
  }

  /**
   * Adds all partitions to every datacenter with replicas in nodes as specified in the
   * static clustermap (unless it was already added).
   *
   * @param clusterName the Helix cluster name under which resources are created.
   */
  void addAmbryPartitionsToAllDcs(String clusterName) {
    for (Partition partition : staticClusterMap.getPartitions()) {
      String resourceName = partition.getName(); // Ambry Partition == Helix Resource with 1 Partition
      String partitionName = partition.getName();
      boolean sealed = partition.isReadOnly();
      for (Map.Entry<String, HelixAdmin> entry : adminForDc.entrySet()) {
        String dcName = entry.getKey();
        HelixAdmin dcAdmin = entry.getValue();
        if (resourceExists(dcAdmin, resourceName)) {
          continue;
        }
        CustomModeISBuilder builder = new CustomModeISBuilder(resourceName);
        // NOTE(review): replica count is hard-coded to 3 but the loop below assigns one
        // replica per node returned by getDataNodesInDc(dcName) — confirm these always agree.
        builder.setStateModel("OnlineOfflineStateModel").setNumPartitions(1).setNumReplica(3);
        for (DataNode node : partition.getDataNodesInDc(dcName)) {
          String instanceName = node.getHostName() + "_" + node.getPort();
          builder.assignInstanceAndState(partitionName, instanceName, "ONLINE");
        }
        IdealState idealState = builder.build();
        dcAdmin.addResource(clusterName, resourceName, idealState);
        if (sealed) {
          // a resource can be enabled/disabled in Helix - this can be used to trigger
          // external view changes. We can use "disabling" to mark a partition as
          // sealed. So "disabled" would mean the partition is still
          // usable for gets/deletes but not for puts. Yes - "disabling" does not sound
          // like the right thing, but in the absence of any other way to do this,
          // we can use this method. Note that Ambry is the only one interpreting this state.
          dcAdmin.enableResource(clusterName, resourceName, false);
        }
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment