Last active
December 23, 2022 05:19
Revisions
-
en0 revised this gist
Dec 23, 2022 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -26,6 +26,7 @@ def __init__( def run(self) -> None: plt.style.use("fivethirtyeight") xvals = deque(maxlen=self._maxplot) series = {} -
en0 created this gist
Dec 23, 2022 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,32 @@ from collections import deque from string import printable from liveplot import LivePlot # Create the LivePlot object. The lambda takes the queued data and converts into # a Tuple consisting of the x-axis value (probably time) and a list of metrics. # Each metric is a tuple consisting of a name for the series and a value. plot = LivePlot(lambda counter, queue_depth: (counter, [("depth", queue_depth)])) # Start the sub-process. plot.start() # This is just to demo the live plot chars = printable.strip() max_len = 4 queue = deque([c for c in chars]) counter = 0 while queue: p = queue.popleft() counter += 1 # Send data sample to the metric function every 10k iterations if counter % 10000 == 0: plot.enqueue(counter//10000, len(queue)) print(p) for c in chars: _p = p + c if len(_p) <= max_len: queue.append(_p) This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,50 @@ from collections import deque from matplotlib import pyplot as plt from matplotlib.animation import FuncAnimation from multiprocessing import Process, Queue from typing import Callable, Tuple, Union, List MetricFuncRet = Tuple[Union[int, float], List[Tuple[str, Union[int, float]]]] MetricFunc = Callable[[...], MetricFuncRet] class LivePlot(Process): def __init__( self, metric_fn: MetricFunc, maxplot=500, maxqueue=1000000, interval=500 ) -> None: super().__init__() self._queue = Queue(maxsize=maxqueue) self._interval = interval self._maxplot = maxplot self._mf = metric_fn def run(self) -> None: xvals = deque(maxlen=self._maxplot) series = {} def animate(i): while not self._queue.empty(): args, kwargs = self._queue.get_nowait() xval, metrics = self._mf(*args, **kwargs) xvals.append(xval) for k, v in metrics: series.setdefault(k, deque(maxlen=self._maxplot)).append(v) plt.cla() for k, v in series.items(): plt.plot(xvals, v, label=k) plt.legend(loc="upper left") plt.tight_layout() ani = FuncAnimation(plt.gcf(), animate, interval=self._interval) plt.show() def enqueue(self, *args, **kwargs) -> None: self._queue.put((args, kwargs))