You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

151 lines
5.1 KiB

"""缓存管理器模块。
提供二级缓存机制(Redis + 内存),用于缓存 API 响应和计算结果。
当 Redis 不可用时自动降级为纯内存缓存,保证系统的高可用性。
"""
import json
import time
import asyncio
from typing import Any
from redis.asyncio import Redis
from config import settings
class CacheManager:
"""二级缓存管理器类,优先使用 Redis 缓存,降级时使用内存缓存。
提供 get/set/delete/delete_pattern 四种基本操作,
支持 TTL 过期时间和模式匹配批量删除。
Attributes:
_local: 内存缓存存储,结构为 {key: (expire_timestamp, value)}。
_redis: Redis 异步客户端实例。
_redis_available: Redis 是否可用标志。
_lock: 异步锁,保证内存缓存操作的并发安全。
"""
def __init__(self):
"""初始化缓存管理器实例。"""
self._local: dict[str, tuple[float, Any]] = {} # 内存缓存:{key: (过期时间戳, 值)}
self._redis: Redis | None = None # Redis 异步客户端
self._redis_available = False # Redis 可用性标志
self._lock = asyncio.Lock() # 异步锁
async def connect(self):
"""连接到 Redis 服务器。
尝试从配置中的 REDIS_URL 建立连接,如果连接失败则标记 Redis 不可用,
后续操作将自动降级为纯内存缓存。
"""
try:
self._redis = Redis.from_url(settings.REDIS_URL, decode_responses=True)
await self._redis.ping()
self._redis_available = True
except Exception:
self._redis_available = False
async def disconnect(self):
"""断开 Redis 连接,释放资源。"""
if self._redis:
await self._redis.close()
@property
def available(self) -> bool:
"""检查缓存是否可用(Redis 或内存至少一个可用)。
Returns:
bool: Redis 可用时返回 True,否则返回 False。
"""
return self._redis_available
async def get(self, key: str) -> Any | None:
"""从缓存中获取指定键的值。
优先从 Redis 获取,如果 Redis 不可用或未找到则从内存缓存获取。
内存缓存中的过期条目会被自动清理。
Args:
key: 缓存键。
Returns:
Any | None: 缓存值,未找到或已过期返回 None。
"""
if self._redis_available and self._redis:
try:
val = await self._redis.get(key)
if val:
return json.loads(val)
except Exception:
pass
async with self._lock:
entry = self._local.get(key)
if entry:
expire_at, value = entry
if time.time() < expire_at:
return value
del self._local[key]
return None
async def set(self, key: str, value: Any, ttl: int = 300):
"""将值设置到缓存中,指定过期时间。
同时写入 Redis 和内存缓存,Redis 写入失败不影响内存缓存。
Args:
key: 缓存键。
value: 要缓存的值,支持任意可 JSON 序列化的类型。
ttl: 过期时间(秒),默认 300 秒(5 分钟)。
"""
if self._redis_available and self._redis:
try:
await self._redis.setex(key, ttl, json.dumps(value, default=str))
except Exception:
pass
async with self._lock:
self._local[key] = (time.time() + ttl, value)
# 当内存缓存条目超过上限时清理过期条目
if len(self._local) > 10000:
now = time.time()
expired = [k for k, (t, v) in self._local.items() if now >= t]
for k in expired:
del self._local[k]
async def delete(self, key: str):
"""从缓存中删除指定键。
同时从 Redis 和内存缓存中删除,任一删除失败不影响另一个。
Args:
key: 要删除的缓存键。
"""
if self._redis_available and self._redis:
try:
await self._redis.delete(key)
except Exception:
pass
async with self._lock:
self._local.pop(key, None)
async def delete_pattern(self, pattern: str):
"""按模式匹配批量删除缓存键。
Redis 中使用 keys 命令匹配,内存缓存中使用字符串包含匹配。
Args:
pattern: 匹配模式,支持通配符 *。
"""
if self._redis_available and self._redis:
try:
keys = await self._redis.keys(pattern)
if keys:
await self._redis.delete(*keys)
except Exception:
pass
async with self._lock:
to_delete = [k for k in self._local if pattern.replace("*", "") in k]
for k in to_delete:
del self._local[k]
cache_manager = CacheManager() # 全局缓存管理器单例实例