-
-
Save tiagocoutinho/8375ff2f6cf1263b946f5dbf52bc93f5 to your computer and use it in GitHub Desktop.
Script for changing Tango database in Sardana persistent information.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Changes the following persistent information: | |
Pool: | |
- MeasurementGroup's Configuration attribute | |
- MeasurementGroup's elements property | |
- IcePAP Motor's EncoderSource attribute | |
- TangoAttribute of TangoAttribute controllers | |
- TaurusAttribute of TaurusAttirubteCounterTimer controller | |
MacroServer: | |
- PreScanSnapshot | |
""" | |
import sys | |
try: | |
import argparse | |
except ImportError: | |
from taurus.external import argparse | |
try: | |
from itertools import zip_longest | |
except ImportError: | |
from itertools import izip_longest as zip_longest | |
import PyTango as tango | |
import taurus | |
from sardana.taurus.core.tango.sardana import registerExtensions | |
def grouper(iterable, n, fillvalue=None): | |
"""Collect data into fixed-length chunks or blocks""" | |
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" | |
args = [iter(iterable)] * n | |
return zip_longest(*args, fillvalue=fillvalue) | |
def change_pool(pool, old_tango_db, new_tango_db, verbose): | |
def change_tango_attr(dev, attr_name, old_tango_db, new_tango_db): | |
old_attr_value = dev.read_attribute(attr_name).value | |
new_attr_value = old_attr_value.replace(old_tango_db, new_tango_db) | |
if verbose: | |
print("changing {0} {1}".format(dev.name(), attr_name)) | |
dev.write_attribute(attr_name, new_attr_value) | |
def change_tango_dev(dev, attrs, old_tango_db, new_tango_db): | |
dev = tango.DeviceProxy(dev) | |
for attr in attrs: | |
if attr in dev.get_attribute_list(): | |
change_tango_attr(dev, attr, old_tango_db, new_tango_db) | |
def change_tango_prop_list(dev, prop_name, old_tango_db, new_tango_db): | |
dev = tango.DeviceProxy(dev) | |
property_ = dev.get_property(prop_name)[prop_name] | |
new_property = [] | |
for item in property_: | |
new_item = item.replace(old_tango_db, new_tango_db) | |
new_property.append(new_item) | |
if verbose: | |
print("changing {0} {1}".format(dev.name(), prop_name)) | |
dev.put_property({prop_name: new_property}) | |
hwinfo = pool.getHWObj().info() | |
server = hwinfo.server_id | |
db = tango.Database() | |
dev_cls = db.get_device_class_list(server) | |
for dev, cls in grouper(dev_cls, 2): | |
if cls == "MeasurementGroup": | |
config_attr = tango.AttributeProxy(dev + "/Configuration") | |
try: | |
config = config_attr.get_property("__value")["__value"][0] | |
except: | |
continue | |
new_config = config.replace(old_tango_db, new_tango_db) | |
if verbose: | |
print("changing {0} Configuration".format(dev)) | |
config_attr.put_property({"__value": new_config}) | |
mnt_grp = tango.DeviceProxy(dev) | |
change_tango_prop_list(dev, "elements", old_tango_db, new_tango_db) | |
elif cls == "Motor": | |
attrs = ["EncoderSource", "TangoAttribute"] | |
change_tango_dev(dev, attrs, old_tango_db, new_tango_db) | |
db.delete_device_attribute_property(dev, dict(Limit_Switches=["abs_change"])) | |
elif cls == "CTExpChannel": | |
attrs = ["TangoAttribute", "TaurusAttribute"] | |
change_tango_dev(dev, attrs, old_tango_db, new_tango_db) | |
elif cls == "IORegister": | |
attrs = ["TangoAttribute"] | |
change_tango_dev(dev, attrs, old_tango_db, new_tango_db) | |
elif cls == "ZeroDExpChannel": | |
attrs = ["TangoAttribute"] | |
change_tango_dev(dev, attrs, old_tango_db, new_tango_db) | |
def change_macroserver(ms, old_tango_db, new_tango_db, verbose): | |
old_env = ms.getEnvironment() | |
try: | |
old_snapshot = old_env["PreScanSnapshot"] | |
except KeyError: | |
if verbose: | |
print("No pre-scan snapshot defined") | |
return | |
new_snapshot = [] | |
for old_model, alias in old_snapshot: | |
new_model = old_model.replace(old_tango_db, new_tango_db) | |
new_snapshot.append((new_model, alias)) | |
if verbose: | |
print("changing {0} pre-scan snapshot".format(ms.name())) | |
ms.putEnvironment("PreScanSnapshot", new_snapshot) | |
def main(): | |
parser = argparse.ArgumentParser( | |
description="Change Tango DB in Sardana persistent information" | |
) | |
parser.add_argument("model", type=str, | |
help="either Pool or MacroServer device name") | |
parser.add_argument("old_tango_db", action="store", type=str, | |
help="old Tango database e.g. tbl0901:10000") | |
parser.add_argument("new_tango_db", action="store", type=str, | |
help="new Tango database e.g. tbl09.cells.es:10000") | |
parser.add_argument("--verbose", action="store_true") | |
args = parser.parse_args() | |
registerExtensions() | |
device = taurus.Device(args.model) | |
hwinfo = device.getHWObj().info() | |
dev_class = hwinfo.dev_class | |
if dev_class == "Pool": | |
change_pool(device, args.old_tango_db, args.new_tango_db, | |
args.verbose) | |
elif dev_class == "MacroServer": | |
change_macroserver(device, args.old_tango_db, args.new_tango_db, | |
args.verbose) | |
else: | |
print("Invalid model, expected Pool or MacroServer. Exiting...") | |
sys.exit(-1) | |
print("IMPORTANT: now restart the {0} device server".format(dev_class)) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment