Skip to content

Instantly share code, notes, and snippets.

@Eboubaker
Last active December 13, 2024 08:36
Show Gist options
  • Save Eboubaker/6a0b807788088a764b2a4c156fda0e4b to your computer and use it in GitHub Desktop.
Save Eboubaker/6a0b807788088a764b2a4c156fda0e4b to your computer and use it in GitHub Desktop.
updated
import contextlib
import threading
from typing import Generator
# Author : github.com/Eboubaker
# Fixed by: github.com/icezyclon
class ReentrantRWLock:
"""This class implements reentrant read-write lock objects.
A read-write lock can be aquired in read mode or in write mode or both.
Many different readers are allowed while no thread holds the write lock.
While a writer holds the write lock, no other threads, aside from the writer,
may hold the read or the write lock.
A thread may upgrade the lock to write mode while already holding the read lock.
Similarly, a thread already having write access may aquire the read lock
(or may already have it), to retain read access when releasing the write lock.
A reentrant lock must be released by the thread that acquired it. Once a
thread has acquired a reentrant lock (read or write), the same thread may acquire it
again without blocking any number of times;
the thread must release each lock (read/write) the same number of times it has acquired it!
The lock provides contextmanagers in the form of `for_read()` and `for_write()`,
which automatically aquire and release the corresponding lock, e.g.,
>>> with lock.for_read(): # get read access until end of context
>>> ...
>>> with lock.for_write(): # upgrade to write access until end of inner
>>> ...
"""
def __init__(self) -> None:
self._writer: int | None = None # current writer
self._writer_count: int = 0 # number of times writer holding write lock
# set of current readers mapping to number of times holding read lock
# entry is missing if not holding the read lock (no 0 values)
self._readers: dict[int, int] = dict()
# main lock + condition, is used for:
# * protecting read/write access to _writer, _writer_times and _readers
# * is actively held when having write access (so no other thread has access)
# * future writers can wait() on the lock to be notified once nobody is reading/writing anymore
self._lock = threading.Condition(threading.RLock()) # reentrant
@contextlib.contextmanager
def for_read(self) -> Generator["ReentrantRWLock", None, None]:
"""
used for 'with' block, e.g., with lock.for_read(): ...
"""
try:
self.acquire_read()
yield self
finally:
self.release_read()
@contextlib.contextmanager
def for_write(self) -> Generator["ReentrantRWLock", None, None]:
"""
used for 'with' block, e.g., with lock.for_write(): ...
"""
try:
self.acquire_write()
yield self
finally:
self.release_write()
def acquire_read(self) -> None:
"""
Acquire one read lock. Blocks only if a another thread has acquired the write lock.
"""
ident: int = threading.current_thread().ident # type: ignore
with self._lock:
self._readers[ident] = self._readers.get(ident, 0) + 1
def release_read(self) -> None:
"""
Release one currently held read lock from this thread.
"""
ident: int = threading.current_thread().ident # type: ignore
with self._lock:
if ident not in self._readers:
raise RuntimeError(
f"Read lock was released while not holding it by thread {ident}"
)
if self._readers[ident] == 1:
del self._readers[ident]
else:
self._readers[ident] -= 1
if not self._readers: # if no other readers remain
self._lock.notify() # wake the next writer if any
def acquire_write(self) -> None:
"""
Acquire one write lock. Blocks until there are no acquired read or write locks from other threads.
"""
ident: int = threading.current_thread().ident # type: ignore
self._lock.acquire() # is reentrant, so current writer can aquire again
if self._writer == ident:
self._writer_count += 1
return
# do not be reader while waiting for write or notify will not be called
times_reading = self._readers.pop(ident, 0)
while len(self._readers) > 0:
self._lock.wait()
self._writer = ident
self._writer_count += 1
if times_reading:
# restore number of read locks thread originally had
self._readers[ident] = times_reading
def release_write(self) -> None:
"""
Release one currently held write lock from this thread.
"""
if self._writer != threading.current_thread().ident:
raise RuntimeError(
f"Write lock was released while not holding it by thread {threading.current_thread().ident}"
)
self._writer_count -= 1
if self._writer_count == 0:
self._writer = None
self._lock.notify() # wake the next writer if any
self._lock.release()
@Eboubaker
Copy link
Author

@icezyclon sorry for the late reply,
you're right I personally haven't done extensive testing like you did, I made this class in a hurry for another project the issue didn't happen on my project so I didn't notice it.
thanks for your work and extensive testing. I have updated my gist with yours

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment