Created
July 10, 2021 06:42
-
-
Save Proteusiq/cf8d0fd9a0b3dfb16d9d4ed77d69ecbe to your computer and use it in GitHub Desktop.
Share objects
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import zlib | |
from typing import Any, Optional, Dict | |
import diskcache as dc | |
class DataCache: | |
def __init__( | |
self, | |
cache_path: str, | |
expire: Optional[int] = 24 * 360, | |
tag: Optional[str] = "data", | |
compress_level: Optional[int] = 1, | |
) -> None: | |
self.cache_path = cache_path | |
self.expire = expire # seconds until data expires | |
self.tag = tag | |
self.compress_level = compress_level | |
def set(self, key: str, contents: Any) -> bool: | |
with dc.Cache(self.cache_path) as cache: | |
contents_bytes = json.dumps( | |
contents, default=lambda x: list(x) if isinstance(x, set) else x | |
).encode("utf-8") | |
data = zlib.compress(contents_bytes, self.compress_level) | |
status = cache.set( | |
key, | |
value=data, | |
expire=self.expire, | |
read=True, | |
tag=self.tag, | |
) | |
return status | |
def get(self, key: str) -> Dict: | |
with dc.Cache(self.cache_path) as cache: | |
result = cache.get(key, read=True, expire_time=True, tag=True) | |
contents_bytes, timestamp, tag = result | |
if contents_bytes: | |
data = zlib.decompress(contents_bytes).decode("utf-8") | |
contents = json.loads(data) | |
return { | |
"expired": False, | |
"contents": contents, | |
"time": timestamp, | |
"tag": tag, | |
} | |
else: | |
return {"expired": True, "contents": None, "time": timestamp, "tag": tag} | |
def delete(self, key: str) -> bool: | |
status = False | |
with dc.Cache(self.cache_path) as cache: | |
if key in cache: | |
del cache[key] | |
status = True | |
return status | |
def __contains__(self, key: str) -> bool: | |
with dc.Cache(self.cache_path) as cache: | |
return key in cache |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment