-
-
Save walkermatt/2871026 to your computer and use it in GitHub Desktop.
| from threading import Timer | |
| def debounce(wait): | |
| """ Decorator that will postpone a functions | |
| execution until after wait seconds | |
| have elapsed since the last time it was invoked. """ | |
| def decorator(fn): | |
| def debounced(*args, **kwargs): | |
| def call_it(): | |
| fn(*args, **kwargs) | |
| try: | |
| debounced.t.cancel() | |
| except(AttributeError): | |
| pass | |
| debounced.t = Timer(wait, call_it) | |
| debounced.t.start() | |
| return debounced | |
| return decorator |
| import unittest | |
| import time | |
| from debounce import debounce | |
| class TestDebounce(unittest.TestCase): | |
| @debounce(10) | |
| def increment(self): | |
| """ Simple function that | |
| increments a counter when | |
| called, used to test the | |
| debounce function decorator """ | |
| self.count += 1 | |
| def setUp(self): | |
| self.count = 0 | |
| def test_debounce(self): | |
| """ Test that the increment | |
| function is being debounced. | |
| The counter should only be incremented | |
| once 10 seconds after the last call | |
| to the function """ | |
| self.assertTrue(self.count == 0) | |
| self.increment() | |
| self.increment() | |
| time.sleep(9) | |
| self.assertTrue(self.count == 0) | |
| self.increment() | |
| self.increment() | |
| self.increment() | |
| self.increment() | |
| self.assertTrue(self.count == 0) | |
| time.sleep(10) | |
| self.assertTrue(self.count == 1) | |
| if __name__ == '__main__': | |
| unittest.main() |
Didn't know about threading.Timer, I like this solution!
An optimization would be to protect the Timer creation using a threading.Lock. This would prevent problems with debounce being called between Timer's initialization and calling .start().
Another would be to use functools.wraps to preserve docstrings.
Hi everyone ! A few months ago I had the same need to have a working debounce annotation, after stumbling upon this discussion I created this open source project: https://github.com/salesforce/decorator-operations
The idea of the project is to regroup useful annotations such as debounce, throttle, filter... There are only 4 annotations available for now, but you're more than welcome to suggest new features or suggestions on how to improve the existing ones :D
π thanks for sharing
@KarlPatach that sounds good π
// feature 1
In addition to debounce and throttle
also has a situation, which can has both feature.
if calls too frequently, the executor may delayed indefinitely.
some states cannot be updated in time.
then wish debounce has a max delay time allow execute one time.
// I don't known how to call it, has a debounce time, and a max delay time
// feature 2
if asyncio version has supported, that will be nice.
// finally
I wonder is them thread safe
@KarlPatach That's awesome π
My implementation (fully-typed and thread-safe):
import threading
from typing import Any, Callable, Optional, TypeVar, cast
class Debouncer:
def __init__(self, f: Callable[..., Any], interval: float):
self.f = f
self.interval = interval
self._timer: Optional[threading.Timer] = None
self._lock = threading.Lock()
def __call__(self, *args, **kwargs) -> None:
with self._lock:
if self._timer is not None:
self._timer.cancel()
self._timer = threading.Timer(self.interval, self.f, args, kwargs)
self._timer.start()
VoidFunction = TypeVar("VoidFunction", bound=Callable[..., None])
def debounce(interval: float):
"""
Wait `interval` seconds before calling `f`, and cancel if called again.
The decorated function will return None immediately,
ignoring the delayed return value of `f`.
"""
def decorator(f: VoidFunction) -> VoidFunction:
if interval <= 0:
return f
return cast(VoidFunction, Debouncer(f, interval))
return decoratorAlso took a stab at it and wrote a debounce decorator https://github.com/Jerakin/debounce
@kylebebak here is a great visualization for understanding throttle vs debounce:
http://demo.nimius.net/debounce_throttle/