You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

90 lines
2.7 KiB

import json
import asyncio
from typing import Any
from redis.asyncio import Redis
from config import settings
class CacheManager:
def __init__(self):
self._local: dict[str, tuple[float, Any]] = {}
self._redis: Redis | None = None
self._redis_available = False
self._lock = asyncio.Lock()
async def connect(self):
try:
self._redis = Redis.from_url(settings.REDIS_URL, decode_responses=True)
await self._redis.ping()
self._redis_available = True
except Exception:
self._redis_available = False
async def disconnect(self):
if self._redis:
await self._redis.close()
@property
def available(self) -> bool:
return self._redis_available
async def get(self, key: str) -> Any | None:
if self._redis_available and self._redis:
try:
val = await self._redis.get(key)
if val:
return json.loads(val)
except Exception:
pass
async with self._lock:
entry = self._local.get(key)
if entry:
expire_at, value = entry
if time.time() < expire_at:
return value
del self._local[key]
return None
async def set(self, key: str, value: Any, ttl: int = 300):
if self._redis_available and self._redis:
try:
await self._redis.setex(key, ttl, json.dumps(value, default=str))
except Exception:
pass
async with self._lock:
self._local[key] = (time.time() + ttl, value)
if len(self._local) > 10000:
now = time.time()
expired = [k for k, (t, v) in self._local.items() if now >= t]
for k in expired:
del self._local[k]
async def delete(self, key: str):
if self._redis_available and self._redis:
try:
await self._redis.delete(key)
except Exception:
pass
async with self._lock:
self._local.pop(key, None)
async def delete_pattern(self, pattern: str):
if self._redis_available and self._redis:
try:
keys = await self._redis.keys(pattern)
if keys:
await self._redis.delete(*keys)
except Exception:
pass
async with self._lock:
to_delete = [k for k in self._local if pattern.replace("*", "") in k]
for k in to_delete:
del self._local[k]
cache_manager = CacheManager()
import time # noqa: E402