Created
November 19, 2021 15:24
-
-
Save kokospapa8/92f2f979d3cf2b990e24d62dc490f51b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from django.core.cache import cache, caches | |
| from django.conf import settings | |
| from django.db.models.query import QuerySet | |
| DEFAULT_CACHE_ALIAS = 'default' | |
def _get_cache(cache_alias):
    """Return the cache backend registered under ``cache_alias``.

    The default alias resolves to the module-level ``cache`` object rather
    than ``caches["default"]``.
    """
    if cache_alias != DEFAULT_CACHE_ALIAS:
        return caches[cache_alias]
    # Hand back ``cache`` itself for the default alias: the debug toolbar
    # does not track ``caches["default"]``, only the plain ``cache`` object.
    return cache
class CacheBase:
    """Thin wrapper around a Django cache backend that namespaces its keys.

    Every key passed to the public methods is turned into
    ``"<key_prefix>:<key>"`` (see ``_format_key``) before it reaches the
    backend. Subclasses configure behaviour through the class attributes
    below.
    """

    # Prefix put in front of every key; when left empty the instance falls
    # back to the name of its own class.
    key_prefix = ''
    # Alias of the cache backend to use.
    cache_alias = DEFAULT_CACHE_ALIAS
    # When True, every operation addresses the single key ``key_prefix`` and
    # the ``key`` argument is ignored.
    single_key = False
    # Default timeout handed to ``cache.set`` / ``cache.add``.
    expire_duration = settings.CACHE_EXPIRATION_DURATION

    def __init__(self):
        self.cache = _get_cache(self.cache_alias)
        self._set_default_key_prefix()

    def _set_default_key_prefix(self):
        """Default ``key_prefix`` to the concrete class name when it is empty.

        The fallback is stored on the instance, not on the class. Writing it
        to the class would let subclasses inherit a parent's auto-derived
        prefix (and so share its keys) depending on which class happened to
        be instantiated first.
        """
        if self.key_prefix == '':
            self.key_prefix = type(self).__name__

    def _format_key(self, key):
        """Build the backend key for ``key``.

        Returns ``key_prefix`` alone in ``single_key`` mode. Tuples and lists
        are joined with ``-``; anything else is interpolated as-is.
        """
        if self.single_key:
            return self.key_prefix
        if isinstance(key, (tuple, list)):
            key = "-".join(str(part) for part in key)
        return f'{self.key_prefix}:{key}'

    def _args_format_key_list(self, *args):
        """Return the backend keys for every key in ``args``."""
        return [self._format_key(key) for key in args]

    def cache_get(self, key):
        """Return the cached value for ``key`` (``None`` on a miss)."""
        return self.cache.get(self._format_key(key))

    def cache_get_many(self, *args):
        """Fetch several keys at once.

        Returns ``(result, missing)``: ``result`` is the backend's
        ``get_many`` mapping (keyed by the *formatted* keys) and ``missing``
        lists the original keys that were not found.

        In ``single_key`` mode the call is delegated to ``self.get(*args)``,
        which subclasses are expected to provide, and its return value is
        passed through unchanged.
        """
        if self.single_key:
            return self.get(*args)
        # Format each key once and keep it paired with the caller's key.
        pairs = [(key, self._format_key(key)) for key in args]
        result = self.cache.get_many([cache_key for _, cache_key in pairs])
        missing = [key for key, cache_key in pairs if cache_key not in result]
        return result, missing

    def cache_set(self, key, value, expire_duration=None):
        """Store ``value`` under ``key``; a ``None`` value deletes the key.

        A falsy ``expire_duration`` (``None`` or ``0``) falls back to the
        class-level ``expire_duration``.
        """
        if value is None:
            # Implicit delete when there is no value. Pass the raw key:
            # ``delete`` applies the prefix itself, so formatting here as
            # well would target "<prefix>:<prefix>:<key>".
            self.delete(key)
            return
        self.cache.set(
            self._format_key(key), value, expire_duration or self.expire_duration
        )

    def delete(self, key):
        """Remove ``key`` from the cache."""
        self.cache.delete(self._format_key(key))

    def delete_many(self, *args):
        """Remove every key in ``args`` from the cache."""
        self.cache.delete_many(self._args_format_key_list(*args))

    def add(self, key, value):
        """Store ``value`` under ``key`` only if the key is not cached yet.

        Uses the backend's own ``add`` with the class-level
        ``expire_duration`` and returns its result (truthy when the value
        was stored).
        """
        return self.cache.add(self._format_key(key), value, self.expire_duration)

    def delete_pattern(self, pattern):
        """Delete all keys matching ``pattern`` and return the backend result.

        NOTE: ``delete_pattern`` is not part of Django's core cache API; the
        configured backend must provide it (e.g. django-redis).
        """
        return self.cache.delete_pattern(pattern)

    def delete_all(self):
        """Delete every key under this prefix (``"<key_prefix>:*"``)."""
        return self.delete_pattern(self._format_key("*"))

    def incr(self, key, delta=1):
        """Add ``delta`` to the value stored at ``key``; return the result."""
        return self.cache.incr(self._format_key(key), delta)

    def decr(self, key, delta=1):
        """Subtract ``delta`` from the value at ``key``; return the result."""
        return self.cache.incr(self._format_key(key), -delta)
class NewBadgeCache(CacheBase):
    """Cache whose entries are keyed by ``user_uuid``."""

    # 6 * 30 days, expressed in seconds.
    expire_duration = 6 * 30 * 24 * 60 * 60

    def get(self, user_uuid):
        """Return the cached entry for ``user_uuid`` (``None`` on a miss)."""
        cached = self.cache_get(user_uuid)
        return cached

    def set(self, user_uuid, data):
        """Store ``data`` for ``user_uuid`` via ``cache_set``."""
        self.cache_set(user_uuid, data)
class SimpleGetCache(CacheBase):
    """Read-through cache: ``get`` loads from ``get_from_db`` on a miss.

    ``get_from_db(key, *args, **kwargs)`` is called but not defined by this
    class, so concrete subclasses are expected to supply it.
    """

    def get(self, key=None, force_db=False, *args, **kwargs):
        """Return the value for ``key``, loading and caching it when needed.

        With ``force_db`` set the cache lookup is skipped. Extra positional
        and keyword arguments are forwarded to ``get_from_db``. The loaded
        value is always passed to ``cache_set``, even when it is ``None``.
        """
        cached = None if force_db else self.cache_get(key)
        if cached is not None:
            return cached
        fresh = self.get_from_db(key, *args, **kwargs)
        self.cache_set(key, fresh)
        return fresh

    def delete(self, key=None):
        """Remove ``key`` from the cache; ``key`` is optional here."""
        self.cache.delete(self._format_key(key))
class ModelCacheBase(CacheBase):
    """Read-through cache that stores the serialized form of loaded objects.

    Subclasses implement ``get_from_db`` and either set ``serializer_class``
    or override ``get_serializer_class``.
    """

    # Serializer used to turn loaded objects into cacheable data. It is
    # called as ``serializer_class(data)`` / ``serializer_class(data,
    # many=True)`` and read through ``.data``, which looks like a Django
    # REST framework serializer; confirm against the subclasses.
    serializer_class = None

    def __init__(self):
        super().__init__()

    def get_from_db(self, key, *args, **kwargs):
        """Load the object(s) for ``key``; must be overridden."""
        raise NotImplementedError("get_from_db is not implemented.")

    def get_serializer_class(self):
        """Return the serializer class to use (``serializer_class``)."""
        return self.serializer_class

    def get_serializer(self, data):
        """Instantiate the serializer for ``data``.

        ``QuerySet`` and ``list`` inputs are serialized with ``many=True``.

        Raises:
            TypeError: if no serializer class is configured.
        """
        serializer_class = self.get_serializer_class()
        if serializer_class is None:
            raise TypeError(
                f"{self.__class__.__name__} should either include a 'serializer_class' attribute, "
                "or override the 'get_serializer_class()' method")
        if isinstance(data, (QuerySet, list)):
            return serializer_class(data, many=True)
        return serializer_class(data)

    def to_cache_representation(self, obj):
        """Hook applied to the data returned by ``get``; identity here."""
        return obj

    def serialize(self, data):
        """Return the serializer's ``.data`` for ``data``."""
        return self.get_serializer(data).data

    def get(self, key, force_db=False, *args, **kwargs):
        """Return the serialized data for ``key``, loading it when needed.

        With ``force_db`` set the cache lookup is skipped. On a miss the
        object is loaded with ``get_from_db`` (extra arguments are forwarded),
        serialized, and cached. If the loader returns ``None``, ``None`` is
        returned and the cache is left untouched. The result always passes
        through ``to_cache_representation``.
        """
        data = None if force_db else self.cache_get(key)
        if data is None:
            loaded = self.get_from_db(key, *args, **kwargs)
            if loaded is None:
                return None
            data = self.serialize(loaded)
            self.cache_set(key, data)
        return self.to_cache_representation(data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment