Last active
January 21, 2021 18:04
-
-
Save iainlane/ba0979f4cc3e4de9452d49e3f21c4764 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import sys | |
from gi.repository import Gio, GLib, GObject | |
SYSTEM_BUS = Gio.bus_get_sync(Gio.BusType.SYSTEM) | |
class SystemdUnitWatcher: | |
def __init__(self, unit, cb): | |
SYSTEM_BUS.call_sync( | |
"org.freedesktop.systemd1", | |
"/org/freedesktop/systemd1", | |
"org.freedesktop.systemd1.Manager", | |
"Subscribe", | |
None, # parameters | |
None, # reply type | |
Gio.DBusCallFlags.NONE, | |
-1, # timeout | |
None, # cancellable | |
) | |
SYSTEM_BUS.call( | |
"org.freedesktop.systemd1", | |
"/org/freedesktop/systemd1", | |
"org.freedesktop.systemd1.Manager", | |
"LoadUnit", | |
GLib.Variant("(s)", (unit,)), # parameters | |
GLib.VariantType("(o)"), # reply type | |
Gio.DBusCallFlags.NONE, | |
-1, # timeout | |
None, # cancellable | |
self._on_get_unit, | |
cb, # user_data | |
) | |
self.proxy = None | |
self.properties_changed_signal = None | |
def stop(self): | |
if self.properties_changed_signal: | |
GObject.signal_handler_disconnect( | |
self.proxy, self.properties_changed_signal | |
) | |
self.properties_changed_signal = None | |
self.proxy = None | |
def _on_properties_changed( | |
self, proxy, changed_properties, invalidated_properties, cb | |
): | |
try: | |
if changed_properties["ActiveState"] == "active": | |
self.stop() | |
cb() | |
except KeyError: # this property didn't change | |
pass | |
def _on_got_unit_proxy(self, conn, res, cb): | |
self.proxy = conn.new_finish(res) | |
self.properties_changed_signal = self.proxy.connect( | |
"g-properties-changed", self._on_properties_changed, cb | |
) | |
active_state = self.proxy.get_cached_property( | |
"ActiveState" | |
).get_string() | |
if active_state == "active": | |
self.stop() | |
cb() | |
def _on_get_unit(self, conn, res, cb): | |
try: | |
object_path_v = conn.call_finish(res) | |
object_path = object_path_v.get_child_value(0).get_string() | |
Gio.DBusProxy.new( | |
conn, | |
Gio.DBusProxyFlags.GET_INVALIDATED_PROPERTIES, | |
None, # GDBusInterfaceInfo | |
"org.freedesktop.systemd1", | |
object_path, | |
"org.freedesktop.systemd1.Unit", | |
None, # cancellable | |
self._on_got_unit_proxy, | |
cb, # user_data | |
) | |
except GLib.Error as e: | |
if ( | |
e.matches(Gio.io_error_quark(), Gio.IOErrorEnum.DBUS_ERROR) | |
and Gio.DBusError.get_remote_error(e) | |
== "org.freedesktop.systemd1.NoSuchUnit" | |
): | |
pass # the unit doesn't exist, tweak this to error if desired | |
else: | |
raise | |
def _is_active(): | |
print("It has started!") | |
if __name__ == "__main__": | |
unit = sys.argv[1] | |
# In GTK using code (such as ubiquity) there's already a main loop running | |
# for us, so ignore the bits about `main_loop`. | |
main_loop = GLib.MainLoop() | |
SystemdUnitWatcher(unit, _is_active) | |
main_loop.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment