-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: new cache struce such as use redis. (#727)
* build: add redis cache config. * build: add cache config. * feat: impl @MonoCacheable and @FluxCacheable cache logic. * build: update to v0.20.0 * optimize: cache config and aspect. * feat: impl @MonoCacheEvict and @FluxCacheEvict aspect logic. * optimize: cache evict clear all. * fix: remove cache after attachment ref episode matching. * fix: return Long value for redis type cast exception for @MonoCacheable. * feat: add cache enable config, default is false. * docs: update CHANGELOG.MD
- Loading branch information
Showing
23 changed files
with
706 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
version=0.19.4 | ||
version=0.20.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
230 changes: 230 additions & 0 deletions
230
server/src/main/java/run/ikaros/server/cache/CacheAspect.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,230 @@ | ||
package run.ikaros.server.cache; | ||
|
||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.CountDownLatch; | ||
import org.aspectj.lang.ProceedingJoinPoint; | ||
import org.aspectj.lang.annotation.Around; | ||
import org.aspectj.lang.annotation.Aspect; | ||
import org.aspectj.lang.annotation.Pointcut; | ||
import org.aspectj.lang.reflect.MethodSignature; | ||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | ||
import org.springframework.expression.EvaluationContext; | ||
import org.springframework.expression.ExpressionParser; | ||
import org.springframework.expression.spel.standard.SpelExpressionParser; | ||
import org.springframework.expression.spel.support.StandardEvaluationContext; | ||
import org.springframework.stereotype.Component; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
import run.ikaros.server.cache.annotation.FluxCacheEvict; | ||
import run.ikaros.server.cache.annotation.FluxCacheable; | ||
import run.ikaros.server.cache.annotation.MonoCacheEvict; | ||
import run.ikaros.server.cache.annotation.MonoCacheable; | ||
|
||
@Aspect | ||
@Component | ||
@ConditionalOnProperty(value = "ikaros.cache.enable", havingValue = "true") | ||
public class CacheAspect { | ||
|
||
|
||
@Pointcut("@annotation(run.ikaros.server.cache.annotation.MonoCacheable) " | ||
+ "&& execution(public reactor.core.publisher.Mono *(..))") | ||
public void monoCacheableMethods() { | ||
} | ||
|
||
@Pointcut("@annotation(run.ikaros.server.cache.annotation.FluxCacheable) " | ||
+ "&& execution(public reactor.core.publisher.Flux *(..))") | ||
public void fluxCacheableMethods() { | ||
} | ||
|
||
@Pointcut("@annotation(run.ikaros.server.cache.annotation.MonoCacheEvict)") | ||
public void monoCacheEvictMethods() { | ||
} | ||
|
||
@Pointcut("@annotation(run.ikaros.server.cache.annotation.FluxCacheEvict)") | ||
public void fluxCacheEvictMethods() { | ||
} | ||
|
||
private final ExpressionParser spelExpressionParser = new SpelExpressionParser(); | ||
private final ConcurrentHashMap<String, Class<?>> methodReturnValueTypes | ||
= new ConcurrentHashMap<>(); | ||
|
||
private final ReactiveCacheManager cm; | ||
|
||
public CacheAspect(ReactiveCacheManager cm) { | ||
this.cm = cm; | ||
} | ||
|
||
/** | ||
* 应用关闭时清空缓存. | ||
*/ | ||
// @PreDestroy | ||
public void onShutdown() throws InterruptedException { | ||
// 使用 CountDownLatch 来确保响应式流在退出前执行完成 | ||
CountDownLatch latch = new CountDownLatch(1); | ||
|
||
cm.clear().then() | ||
.doOnTerminate(latch::countDown) // 当任务完成时计数器减一 | ||
.subscribe(); | ||
|
||
// 等待响应式操作完成 | ||
latch.await(); | ||
System.out.println("Shutdown process completed."); | ||
} | ||
|
||
|
||
private String parseSpelExpression(String expression, ProceedingJoinPoint joinPoint) { | ||
final EvaluationContext context = new StandardEvaluationContext(); | ||
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); | ||
String[] paramNames = methodSignature.getParameterNames(); | ||
Object[] paramValues = joinPoint.getArgs(); | ||
for (int i = 0; i < paramNames.length; i++) { | ||
context.setVariable(paramNames[i], paramValues[i]); | ||
} | ||
return spelExpressionParser.parseExpression(expression).getValue(context, String.class); | ||
} | ||
|
||
/** | ||
* 处理可缓存注解切面 | ||
* 要求返回值为Mono类型 | ||
* . | ||
*/ | ||
@Around("monoCacheableMethods() && @annotation(monoCacheable)") | ||
public Mono<?> aroundMonoMethodsWithAnnotationCacheable( | ||
ProceedingJoinPoint joinPoint, MonoCacheable monoCacheable) throws Throwable { | ||
final String cacheKeyPostfix = parseSpelExpression(monoCacheable.key(), joinPoint); | ||
final List<String> cacheKeys = | ||
Arrays.stream(monoCacheable.value()) | ||
.map(namespace -> namespace + cacheKeyPostfix).toList(); | ||
return Flux.fromStream(cacheKeys.stream()) | ||
.concatMap(key -> cm.get(key).filter(Objects::nonNull)) | ||
.next() | ||
// 缓存中不存在 | ||
.switchIfEmpty(Mono.defer(() -> { | ||
Object proceed; | ||
try { | ||
proceed = joinPoint.proceed(joinPoint.getArgs()); | ||
} catch (Throwable e) { | ||
return Mono.error(e); | ||
} | ||
return ((Mono<?>) proceed) | ||
.flatMap(val -> | ||
Flux.fromIterable(cacheKeys) | ||
.flatMap(k -> cm.put(k, val)) | ||
.next() | ||
.flatMap(list -> Mono.just(val)) | ||
).switchIfEmpty( | ||
Flux.fromIterable(cacheKeys) | ||
.flatMap(k -> cm.put(k, "null")) | ||
.next() | ||
.flatMap(bool -> Mono.empty()) | ||
); | ||
})) | ||
.map(o -> { | ||
if (o instanceof Integer integer) { | ||
return integer.longValue(); | ||
} | ||
return o; | ||
}) | ||
.filter(o -> !"null".equals(o)); | ||
} | ||
|
||
/** | ||
* 处理可缓存注解切面 | ||
* 要求返回值为Flux类型 | ||
* . | ||
*/ | ||
@Around("fluxCacheableMethods() && @annotation(fluxCacheable)") | ||
public Flux<?> aroundMonoMethodsWithAnnotationCacheable( | ||
ProceedingJoinPoint joinPoint, FluxCacheable fluxCacheable) throws Throwable { | ||
final String cacheKeyPostfix = parseSpelExpression(fluxCacheable.key(), joinPoint); | ||
final List<String> cacheKeys = | ||
Arrays.stream(fluxCacheable.value()) | ||
.map(namespace -> namespace + cacheKeyPostfix).toList(); | ||
return Flux.fromStream(cacheKeys.stream()) | ||
.concatMap(key -> cm.get(key) | ||
.filter(Objects::nonNull)) | ||
.next() | ||
// 缓存中不存在 | ||
.switchIfEmpty(Mono.defer(() -> { | ||
Object proceed; | ||
try { | ||
proceed = joinPoint.proceed(joinPoint.getArgs()); | ||
} catch (Throwable e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
return ((Flux<?>) proceed) | ||
.collectList() | ||
.flatMap(vals -> | ||
Flux.fromIterable(cacheKeys) | ||
.flatMap(k -> cm.put(k, vals)) | ||
.collectList() | ||
.flatMap(list -> Mono.just(vals)) | ||
).switchIfEmpty( | ||
Flux.fromIterable(cacheKeys) | ||
.flatMap(k -> cm.put(k, List.of())) | ||
.next() | ||
.flatMap(bool -> Mono.empty()) | ||
); | ||
})) | ||
.map(o -> (List<?>) o) | ||
.filter(list -> !list.isEmpty()) | ||
// 缓存中存的是集合 | ||
.flatMapMany(Flux::fromIterable); | ||
} | ||
|
||
/** | ||
* 处理缓存移除注解切面 | ||
* 要求返回值为Mono类型 | ||
* . | ||
*/ | ||
@Around("monoCacheEvictMethods() && @annotation(monoCacheEvict)") | ||
public Mono<?> aroundMonoMethodsWithAnnotationCacheable( | ||
ProceedingJoinPoint joinPoint, MonoCacheEvict monoCacheEvict | ||
) throws Throwable { | ||
Object proceed = joinPoint.proceed(); | ||
if (monoCacheEvict.value().length == 0 | ||
&& "".equals(monoCacheEvict.key())) { | ||
return cm.clear() | ||
.flatMap(s -> (Mono<?>) proceed); | ||
} | ||
final String cacheKeyPostfix = parseSpelExpression(monoCacheEvict.key(), joinPoint); | ||
final List<String> cacheKeys = | ||
Arrays.stream(monoCacheEvict.value()) | ||
.map(namespace -> namespace + cacheKeyPostfix).toList(); | ||
return Flux.fromStream(cacheKeys.stream()) | ||
.flatMap(cm::remove) | ||
.next() | ||
.flatMap(bool -> (Mono<?>) proceed); | ||
} | ||
|
||
/** | ||
* 处理缓存移除注解切面 | ||
* 要求返回值为Mono类型 | ||
* . | ||
*/ | ||
@Around("fluxCacheEvictMethods() && @annotation(fluxCacheEvict)") | ||
public Flux<?> aroundMonoMethodsWithAnnotationCacheable( | ||
ProceedingJoinPoint joinPoint, FluxCacheEvict fluxCacheEvict | ||
) throws Throwable { | ||
Object proceed = joinPoint.proceed(); | ||
if (fluxCacheEvict.value().length == 0 | ||
&& "".equals(fluxCacheEvict.key())) { | ||
return cm.clear() | ||
.flatMapMany(s -> (Flux<?>) proceed); | ||
} | ||
final String cacheKeyPostfix = parseSpelExpression(fluxCacheEvict.key(), joinPoint); | ||
final List<String> cacheKeys = | ||
Arrays.stream(fluxCacheEvict.value()) | ||
.map(namespace -> namespace + cacheKeyPostfix).toList(); | ||
return Flux.fromStream(cacheKeys.stream()) | ||
.flatMap(cm::remove) | ||
.next() | ||
.flatMapMany(bool -> (Flux<?>) proceed); | ||
} | ||
|
||
|
||
} |
53 changes: 53 additions & 0 deletions
53
server/src/main/java/run/ikaros/server/cache/CacheConfiguration.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
package run.ikaros.server.cache; | ||
|
||
import org.springframework.boot.context.properties.EnableConfigurationProperties; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Conditional; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; | ||
import org.springframework.data.redis.core.ReactiveRedisTemplate; | ||
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; | ||
import org.springframework.data.redis.serializer.RedisSerializationContext; | ||
import org.springframework.data.redis.serializer.StringRedisSerializer; | ||
import run.ikaros.server.cache.condition.CacheMemoryEnableCondition; | ||
import run.ikaros.server.cache.condition.CacheRedisEnableCondition; | ||
|
||
@Configuration(proxyBeanMethods = false) | ||
@EnableConfigurationProperties(CacheProperties.class) | ||
public class CacheConfiguration { | ||
|
||
|
||
@Bean | ||
@Conditional(CacheMemoryEnableCondition.class) | ||
public ReactiveCacheManager memoryReactiveCacheManager() { | ||
return new MemoryReactiveCacheManager(); | ||
} | ||
|
||
@Bean | ||
@Conditional(CacheRedisEnableCondition.class) | ||
public ReactiveCacheManager redisReactiveCacheManager( | ||
ReactiveRedisTemplate<String, Object> reactiveRedisTemplate | ||
) { | ||
return new RedisReactiveCacheManager(reactiveRedisTemplate); | ||
} | ||
|
||
/** | ||
* Redis reactive template. | ||
*/ | ||
@Bean | ||
@Conditional(CacheRedisEnableCondition.class) | ||
public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate( | ||
ReactiveRedisConnectionFactory connectionFactory) { | ||
RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder = | ||
RedisSerializationContext.newSerializationContext(); | ||
GenericJackson2JsonRedisSerializer objectSerializer = | ||
new GenericJackson2JsonRedisSerializer(); | ||
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); | ||
builder.key(stringRedisSerializer); | ||
builder.value(objectSerializer); | ||
builder.hashKey(stringRedisSerializer); | ||
builder.hashValue(objectSerializer); | ||
return new ReactiveRedisTemplate<>(connectionFactory, builder.build()); | ||
} | ||
|
||
} |
Oops, something went wrong.