Skip to content

Instantly share code, notes, and snippets.

@BigRoy
Last active December 20, 2024 17:18
Show Gist options
  • Save BigRoy/c3e8028a0f8c1901f36b10780b0211c8 to your computer and use it in GitHub Desktop.
Save BigRoy/c3e8028a0f8c1901f36b10780b0211c8 to your computer and use it in GitHub Desktop.
Capture instance.data and context.data reads and writes from Pyblish plugins (the instances must already exist at the specified CollectorOrder!)
"""This plug-in will do some heavy logging of all reads and changes to any
instance.data or context.data by pyblish plug-ins and will generate a JSON
file with the results.
This requires typing-extensions and observatory library and will in the worst
possible manner add those to sys.path to make it work. This is a quick hack.
Dependencies:
observatory: https://github.com/sharpencrag/observatory
typing_extensions: https://github.com/python/typing_extensions
1. Put their downloaded git repos unzipped into `DEPENDENCIES_ROOT`.
2. Specify the `JSON_PATH` where the JSON file will be written to.
3. Publish (with this plug-in active)
"""
import os
import sys
import inspect
import json
import zipfile
from collections import defaultdict
from functools import partial
from typing import Any
import pyblish.api
# Configure these hardcoded values
JSON_PATH = r"E:\result.json"
DEPENDENCIES_ROOT = r"C:\Users\User\Downloads"

# The hackiest way to inject these libraries into the current env
# because AYON already comes with `typing_extensions` BUT it's an older
# version which does not work with observatory. So we're hijacking it like
# this for the time being
_DEPENDENCY_SUBDIRS = (
    r"typing_extensions-main\src",
    r"observatory-main\src",
)
for _subdir in _DEPENDENCY_SUBDIRS:
    _dependency_path = os.path.join(DEPENDENCIES_ROOT, _subdir)
    if _dependency_path not in sys.path:
        sys.path.insert(0, _dependency_path)

import typing_extensions
import importlib

# Force a re-import so the freshly injected copy wins over any
# previously imported (older) typing_extensions
importlib.reload(typing_extensions)

from observatory.data_types import ObservableDict, EventHook, T, K, t
def is_pyblish_plugin_method(frame) -> bool:
    """Check if the frame is inside a Pyblish plug-in method.

    Supports regular methods (first argument `self`) and classmethods
    (first argument `cls`).

    WARNING: This would not work for `staticmethod` because it has no
    `self` or `cls` argument and hence it can't be detected whether
    it's a method or not from the frame.

    Args:
        frame: Frame object (as returned by e.g. `inspect.currentframe`).

    Returns:
        bool: True if the frame's code object is a method defined on a
            `pyblish.api.Plugin` subclass, False otherwise.
    """
    try:
        # find the name of the first variable in the calling
        # function - which is hopefully the "self" (or "cls")
        codeobj = frame.f_code
        try:
            self_name = codeobj.co_varnames[0]
        except IndexError:
            return False

        # try to access the caller's "self"
        try:
            self_obj = frame.f_locals[self_name]
        except KeyError:
            return False

        # Check if the calling function is really a method of a Pyblish
        # plug-in. BUGFIX: `issubclass` raises TypeError for non-class
        # arguments, so it may only be used when we actually have a class
        # (a classmethod's `cls` argument); the original unconditionally
        # called it for any non-Plugin instance.
        if inspect.isclass(self_obj):
            # classmethod - search the MRO of the class itself, not of
            # its metaclass
            if not issubclass(self_obj, pyblish.api.Plugin):
                return False
            mro = self_obj.__mro__
        else:
            # regular method
            if not isinstance(self_obj, pyblish.api.Plugin):
                return False
            mro = type(self_obj).__mro__

        func_name = codeobj.co_name
        # iterate through all classes in the MRO
        for cls in mro:
            # see if this class has a method with the name
            # we're looking for
            try:
                method = vars(cls)[func_name]
            except KeyError:
                continue

            # unwrap classmethod/staticmethod descriptors so we can reach
            # the underlying function's code object
            if isinstance(method, (classmethod, staticmethod)):
                method = method.__func__

            # unwrap the method just in case there are any decorators
            try:
                method = inspect.unwrap(method)
            except ValueError:
                pass

            # see if this is the method that called us
            if getattr(method, '__code__', None) is codeobj:
                return True

        # if we didn't find a matching method, return False
        return False
    finally:
        # make sure to clean up the frame at the end to avoid ref cycles
        del frame
def get_class_name(frame):
    """Return the (assumed) class name from the inspect frame.

    Assumes the frame's first local variable is the method's `self`.
    """
    first_arg_name = frame.f_code.co_varnames[0]
    bound_obj = frame.f_locals[first_arg_name]
    return bound_obj.__class__.__name__
class ObservableReadDict(ObservableDict):
    """Observable dictionary that also emits signals when a key is
    accessed (read)."""

    # Emitted with the accessed key, or with None for bulk reads where
    # no single key can be identified (items/keys/values)
    item_get: EventHook[K] = EventHook()
    items_get: EventHook[t.Dict[K, Any]] = EventHook()

    def __getitem__(self, key):
        value = super().__getitem__(key)
        self.item_get.emit(key)
        return value

    def get(self, key, *args, **kwargs):
        value = super().get(key, *args, **kwargs)
        self.item_get.emit(key)
        return value

    def items(self):
        value = super().items()
        self.item_get.emit(None)
        return value

    def keys(self):
        # BUGFIX: previously called `super().items()`, returning
        # key-value pairs instead of keys
        value = super().keys()
        self.item_get.emit(None)
        return value

    def values(self):
        # BUGFIX: previously called `super().items()`, returning
        # key-value pairs instead of values
        value = super().values()
        self.item_get.emit(None)
        return value
class Printer:
    """Helper from `watchpoints` library to print the source code of the
    inspect frame."""

    @classmethod
    def _file_string(cls, exec_info):
        """Return the source line for an (co_name, filename, lineno) tuple."""
        return cls.getsourceline(exec_info)

    @classmethod
    def _frame_string(cls, frame):
        """Return the source line that a frame is currently executing."""
        return cls._file_string((frame.f_code.co_name,
                                 frame.f_code.co_filename,
                                 frame.f_lineno))

    @classmethod
    def getsourceline(cls, exec_info):
        """Return the source line for `exec_info`, a tuple of
        (code name, filename, line number).

        When the filename does not exist on disk, walk up the path looking
        for a zip/egg archive containing the file. Returns a placeholder
        string when the source cannot be located.
        """
        try:
            filename = exec_info[1]
            if os.path.exists(filename):
                with open(exec_info[1], encoding="utf-8") as f:
                    lines = f.readlines()
                return f" {lines[exec_info[2] - 1].strip()}"
            else:
                # We may have an egg file, we try to figure out if we have a zipfile
                # in the path and unzip that
                potential_egg = filename
                f_paths = []
                while os.path.dirname(potential_egg) != potential_egg:
                    potential_egg, f_path = os.path.split(potential_egg)
                    f_paths.append(f_path)
                    if zipfile.is_zipfile(potential_egg):
                        with zipfile.ZipFile(potential_egg) as zf:
                            with zf.open("/".join(reversed(f_paths))) as f:
                                lines = f.readlines()
                                return f" {lines[exec_info[2] - 1].decode('utf-8').strip()}"
                return "unable to locate the source"
        # BUGFIX: broadened from (FileNotFoundError, PermissionError):
        # `is_zipfile` raises IsADirectoryError when probing directories on
        # the path, and line numbers beyond the file raise IndexError;
        # OSError covers all the file-access failures.
        except (OSError, IndexError):  # pragma: no cover
            return "unable to locate the source"
class CollectWatchpoints(pyblish.api.ContextPlugin):
    """Monkeypatch context and instance data into observable dictionaries
    so every read and write from a Pyblish plug-in's `process` method is
    printed and accumulated into a JSON file at `JSON_PATH`."""

    order = pyblish.api.CollectorOrder - 0.4999
    label = "Watch"

    def process(self, context):
        # We should consider only the frames INSIDE a Pyblish plug-in
        # process method and ignore anything outside of it.
        states = defaultdict(lambda: {"inputs": [], "outputs": []})

        def observe(entity):
            observable = ObservableReadDict(entity.data)
            entity._data = observable

            def emit_only(is_input, *args, **kwargs):
                frame = inspect.currentframe()
                stack = []
                while frame:
                    stack.append(frame)

                    # Walk upwards until we find a `process` method...
                    if frame.f_code.co_name != "process":
                        frame = frame.f_back
                        continue

                    # ...that actually belongs to a Pyblish plug-in
                    if not is_pyblish_plugin_method(frame):
                        frame = frame.f_back
                        continue

                    plugin_name = get_class_name(frame)

                    # Pop the Observable part from the stack
                    del stack[:4]

                    # Detect what keys were affected
                    if not args:
                        keys = [""]
                    elif isinstance(args[0], dict):
                        keys = list(args[0].keys())
                    else:
                        keys = [args[0]]

                    # Print the change traceback
                    changed = ", ".join(keys)
                    action = "reads" if is_input else "writes"
                    print(f"Traceback (most recent call last) - {entity.name} {action}: {changed}.")
                    for stack_frame in reversed(stack):
                        lineno = stack_frame.f_lineno
                        co_name = stack_frame.f_code.co_name
                        print(f' File "(unknown)", line {lineno}, in {co_name}')
                        print(Printer._frame_string(stack_frame))

                    # Report all changes
                    for key in keys:
                        if isinstance(entity, pyblish.api.Context):
                            label = f"Context {key}"
                        else:
                            label = f"Instance {entity.name} {key}"
                        label = label.strip()
                        group = "inputs" if is_input else "outputs"
                        states[plugin_name][group].append(label)

                    # TODO: Do not write to disk on every change
                    #   It's hard to detect when the publish finished
                    with open(JSON_PATH, "w") as f:
                        json.dump(dict(states), f, indent=4)

                    # Drop the frame reference to avoid ref cycles
                    del frame
                    return

            # this isn't necessarily an input if it removes data :O
            observable.cleared.connect(partial(emit_only, False))
            observable.item_popped.connect(partial(emit_only, False))
            # value changed
            observable.updated.connect(partial(emit_only, False))
            observable.item_set.connect(partial(emit_only, False))
            # values read
            observable.item_get.connect(partial(emit_only, True))

        # Monkeypatch the instances, make them observable
        observe(context)
        for instance in context:
            observe(instance)
@BigRoy
Copy link
Author

BigRoy commented Dec 20, 2024

Example script to restructure the data:

import json
from collections import defaultdict

INPUT_PATH = r"E:\test.json"
OUTPUT_BY_KEY_PATH = r"E:/by_key.json"
OUTPUT_BY_KEY_GROUPED_PATH = r"E:/by_key_grouped.json"

def uniqify(seq, idfun=None):
    """fast order preserving uniqify

    Keeps the first occurrence of each element, compared by the marker
    that `idfun` computes (identity when `idfun` is None).
    """
    if idfun is None:
        def idfun(x):
            return x
    seen = set()
    unique = []
    for element in seq:
        marker = idfun(element)
        if marker not in seen:
            seen.add(marker)
            unique.append(element)
    return unique


# Load source json
# Expected structure (as written by the CollectWatchpoints plug-in):
#   {plugin_name: {"inputs": [labels], "outputs": [labels]}}
with open(INPUT_PATH, "r") as file:
    data = json.load(file)

# Invert the mapping from plugin -> labels to label -> plugins, splitting
# into which plug-ins read the key ("inputs") and which wrote it ("outputs")
result = defaultdict(lambda: {"read": [], "write": []})
for plugin, changes in data.items():

    for key in changes["inputs"]:
        result[key]["read"].append(plugin)

    for key in changes["outputs"]:
        result[key]["write"].append(plugin)

# De-duplicate the plug-in lists (order preserving) and drop empty groups
for key, changes in result.items():
    changes["read"] = uniqify(changes["read"])
    changes["write"] = uniqify(changes["write"])

    if not changes["read"]:
        changes.pop("read")
    if not changes["write"]:
        changes.pop("write")


# Output by key json
with open(OUTPUT_BY_KEY_PATH, "w") as f:
    json.dump(dict(result), f, indent=4)


# Group by context and instance
# Labels are formatted "Context {data_key}" or
# "Instance {instance_name} {data_key}", so instances group under their
# name and context keys group under the literal "Context" label
grouped = {}
for key, value in result.items():
    if key.startswith("Context "):
        label, data_key = key.split(" ", 1)
    else:
        _, label, data_key = key.split(" ", 2)

    grouped.setdefault(label, {})[data_key] = value


# Output by key grouped json
with open(OUTPUT_BY_KEY_GROUPED_PATH, "w") as f:
    json.dump(dict(grouped), f, indent=4)

@BigRoy
Copy link
Author

BigRoy commented Dec 20, 2024

Instance and context data to pydantic models

Example plug-in to just quickly write out .json of the instance and context data at a particular plug-in order.

import os
import json
import collections

import pyblish.api

# Configure this for your machine. It's the output root directory for the JSON files for context and per instance.
ROOT = "E:/tmp/pyblish"


def encoder(value):
    """`json.dump` default hook for non-serializable values.

    Coerces types we can safely 'convert' to a similar JSON datatype;
    anything else falls back to its string representation.
    """
    if isinstance(value, collections.defaultdict):
        coerced = dict(value)
    elif isinstance(value, (tuple, set, frozenset)):
        coerced = list(value)
    else:
        coerced = str(value)
    return coerced


def write(data, path):
    """Serialize `data` to `path` as JSON after stripping noisy entries.

    NOTE(review): mutates `data` in place - callers are expected to pass
    in a copy of the entity data.
    """
    # Blank out some big entries that we kind of know the type of and that
    # just clutter the readability of the output
    for noisy_key in (
        "projectEntity",
        "folderEntity",
        "taskEntity",
        "anatomyData",
        "publish_attributes",
        "creator_attributes",
    ):
        if noisy_key in data:
            data[noisy_key] = dict()

    # Avoid heavy outputDef on representations
    for representation in data.get("representations", []):
        if "outputDef" in representation:
            representation["outputDef"] = dict()

    data.pop("transientData", None)
    data.pop("farmJobExtraInfo", None)

    # Remove private keys
    for key in list(data):
        if key.startswith("_"):
            data.pop(key)

    with open(path, "w") as f:
        json.dump(data, f, indent=4, default=encoder)


class SaveJSON(pyblish.api.ContextPlugin):
    """Dump the current publish state as JSON files under `ROOT`:
    one file for the context data and one per instance."""

    order = pyblish.api.ExtractorOrder + 0.4999
    label = "Save current state as JSON"

    def process(self, context):
        os.makedirs(ROOT, exist_ok=True)

        # Context data - drop the instances list, each instance gets its
        # own file below
        context_data = context.data.copy()
        context_data.pop("instances", None)
        write(context_data, os.path.join(ROOT, "context.json"))

        # Per-instance data - drop the back-reference to the context
        for instance in context:
            instance_data = instance.data.copy()
            instance_data.pop("context", None)
            write(instance_data, os.path.join(ROOT, instance.name + ".json"))

Generate pydantic model from the JSON

You can e.g. for fun throw these JSON files into https://jsontopydantic.com/ to generate some fake pydantic classes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment