init version
This commit is contained in:
@@ -0,0 +1,2 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="JAVA_MODULE" version="4" />
|
||||
27
joju-framework/joju-spring-boot-starter-mq/pom.xml
Normal file
27
joju-framework/joju-spring-boot-starter-mq/pom.xml
Normal file
@@ -0,0 +1,27 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>com.jojubanking.boot</groupId>
|
||||
<artifactId>joju-framework</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>joju-spring-boot-starter-mq</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>${project.artifactId}</name>
|
||||
<description>消息队列,基于 Redis Pub/Sub 实现广播消费,基于 Stream 实现集群消费</description>
|
||||
<url>https://www.jojubanking.com</url>
|
||||
|
||||
<dependencies>
|
||||
<!-- DB 相关 -->
|
||||
<dependency>
|
||||
<groupId>com.jojubanking.boot</groupId>
|
||||
<artifactId>joju-spring-boot-starter-redis</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,143 @@
|
||||
package com.jojubanking.boot.framework.mq.config;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.system.SystemUtil;
|
||||
import com.jojubanking.boot.framework.common.enums.DocumentEnum;
|
||||
import com.jojubanking.boot.framework.mq.core.RedisMQTemplate;
|
||||
import com.jojubanking.boot.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import com.jojubanking.boot.framework.mq.core.pubsub.AbstractChannelMessageListener;
|
||||
import com.jojubanking.boot.framework.mq.core.stream.AbstractStreamMessageListener;
|
||||
import com.jojubanking.boot.framework.redis.config.JojuRedisAutoConfiguration;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisServerCommands;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
||||
import org.springframework.data.redis.core.RedisCallback;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.listener.ChannelTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX;
|
||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* 消息队列配置类
|
||||
*
|
||||
* @author TW
|
||||
*/
|
||||
@Configuration
|
||||
@AutoConfigureAfter(JojuRedisAutoConfiguration.class)
|
||||
@Slf4j
|
||||
public class JojuMQAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
|
||||
List<RedisMessageInterceptor> interceptors) {
|
||||
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
|
||||
// 添加拦截器
|
||||
interceptors.forEach(redisMQTemplate::addInterceptor);
|
||||
return redisMQTemplate;
|
||||
}
|
||||
|
||||
// ========== 消费者相关 ==========
|
||||
|
||||
/**
|
||||
* 创建 Redis Pub/Sub 广播消费的容器
|
||||
*/
|
||||
@Bean
|
||||
public RedisMessageListenerContainer redisMessageListenerContainer(
|
||||
RedisMQTemplate redisMQTemplate, List<AbstractChannelMessageListener<?>> listeners) {
|
||||
// 创建 RedisMessageListenerContainer 对象
|
||||
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
||||
// 设置 RedisConnection 工厂。
|
||||
container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
|
||||
// 添加监听器
|
||||
listeners.forEach(listener -> {
|
||||
listener.setRedisMQTemplate(redisMQTemplate);
|
||||
container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
|
||||
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
|
||||
listener.getChannel(), listener.getClass().getName());
|
||||
});
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Stream 集群消费的容器
|
||||
*
|
||||
* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
|
||||
*/
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
|
||||
RedisMQTemplate redisMQTemplate, List<AbstractStreamMessageListener<?>> listeners) {
|
||||
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
|
||||
checkRedisVersion(redisTemplate);
|
||||
// 第一步,创建 StreamMessageListenerContainer 容器
|
||||
// 创建 options 配置
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
|
||||
.batchSize(10) // 一次性最多拉取多少条消息
|
||||
.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
|
||||
.build();
|
||||
// 创建 container 对象
|
||||
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
|
||||
// StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions);
|
||||
DefaultStreamMessageListenerContainerX.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
|
||||
|
||||
// 第二步,注册监听器,消费对应的 Stream 主题
|
||||
String consumerName = buildConsumerName();
|
||||
listeners.parallelStream().forEach(listener -> {
|
||||
// 创建 listener 对应的消费者分组
|
||||
try {
|
||||
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
|
||||
} catch (Exception ignore) {}
|
||||
// 设置 listener 对应的 redisTemplate
|
||||
listener.setRedisMQTemplate(redisMQTemplate);
|
||||
// 创建 Consumer 对象
|
||||
Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
|
||||
// 设置 Consumer 消费进度,以最小消费进度为准
|
||||
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
|
||||
// 设置 Consumer 监听
|
||||
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
|
||||
.builder(streamOffset).consumer(consumer)
|
||||
.autoAcknowledge(false) // 不自动 ack
|
||||
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
|
||||
container.register(builder.build(), listener);
|
||||
});
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建消费者名字,使用本地 IP + 进程编号的方式。
|
||||
* 参考自 RocketMQ clientId 的实现
|
||||
*
|
||||
* @return 消费者名字
|
||||
*/
|
||||
private static String buildConsumerName() {
|
||||
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验 Redis 版本号,是否满足最低的版本号要求!
|
||||
*/
|
||||
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
|
||||
// 获得 Redis 版本
|
||||
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
|
||||
String version = MapUtil.getStr(info, "redis_version");
|
||||
// 校验最低版本必须大于等于 5.0.0
|
||||
int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
|
||||
if (majorVersion < 5) {
|
||||
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +
|
||||
"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
package com.jojubanking.boot.framework.mq.core;
|
||||
|
||||
import com.jojubanking.boot.framework.common.util.json.JsonUtils;
|
||||
import com.jojubanking.boot.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import com.jojubanking.boot.framework.mq.core.message.AbstractRedisMessage;
|
||||
import com.jojubanking.boot.framework.mq.core.pubsub.AbstractChannelMessage;
|
||||
import com.jojubanking.boot.framework.mq.core.stream.AbstractStreamMessage;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.experimental.Accessors;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis MQ 操作模板类
|
||||
*
|
||||
* @author TW
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class RedisMQTemplate {
|
||||
|
||||
@Getter
|
||||
private final RedisTemplate<String, ?> redisTemplate;
|
||||
/**
|
||||
* 拦截器数组
|
||||
*/
|
||||
@Getter
|
||||
private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis pub/sub 实现
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public <T extends AbstractChannelMessage> void send(T message) {
|
||||
try {
|
||||
sendMessageBefore(message);
|
||||
// 发送消息
|
||||
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
|
||||
} finally {
|
||||
sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis Stream 实现
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 消息记录的编号对象
|
||||
*/
|
||||
public <T extends AbstractStreamMessage> RecordId send(T message) {
|
||||
try {
|
||||
sendMessageBefore(message);
|
||||
// 发送消息
|
||||
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
|
||||
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
|
||||
.withStreamKey(message.getStreamKey())); // 设置 stream key
|
||||
} finally {
|
||||
sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加拦截器
|
||||
*
|
||||
* @param interceptor 拦截器
|
||||
*/
|
||||
public void addInterceptor(RedisMessageInterceptor interceptor) {
|
||||
interceptors.add(interceptor);
|
||||
}
|
||||
|
||||
private void sendMessageBefore(AbstractRedisMessage message) {
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
|
||||
}
|
||||
|
||||
private void sendMessageAfter(AbstractRedisMessage message) {
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.jojubanking.boot.framework.mq.core.interceptor;
|
||||
|
||||
import com.jojubanking.boot.framework.mq.core.message.AbstractRedisMessage;
|
||||
|
||||
/**
|
||||
* {@link AbstractRedisMessage} 消息拦截器
|
||||
* 通过拦截器,作为插件机制,实现拓展。
|
||||
* 例如说,多租户场景下的 MQ 消息处理
|
||||
*
|
||||
* @author TW
|
||||
*/
|
||||
public interface RedisMessageInterceptor {
|
||||
|
||||
default void sendMessageBefore(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void sendMessageAfter(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.jojubanking.boot.framework.mq.core.message;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Redis 消息抽象基类
|
||||
*
|
||||
* @author TW
|
||||
*/
|
||||
@Data
|
||||
public abstract class AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 头
|
||||
*/
|
||||
private Map<String, String> headers = new HashMap<>();
|
||||
|
||||
public String getHeader(String key) {
|
||||
return headers.get(key);
|
||||
}
|
||||
|
||||
public void addHeader(String key, String value) {
|
||||
headers.put(key, value);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.jojubanking.boot.framework.mq.core.pubsub;
|
||||
|
||||
import com.jojubanking.boot.framework.mq.core.message.AbstractRedisMessage;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Channel Message 抽象类
|
||||
*
|
||||
* @author TW
|
||||
*/
|
||||
public abstract class AbstractChannelMessage extends AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Channel
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
|
||||
public abstract String getChannel();
|
||||
|
||||
}
|
||||
@@ -0,0 +1,103 @@
|
||||
package com.jojubanking.boot.framework.mq.core.pubsub;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import com.jojubanking.boot.framework.common.util.json.JsonUtils;
|
||||
import com.jojubanking.boot.framework.mq.core.RedisMQTemplate;
|
||||
import com.jojubanking.boot.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import com.jojubanking.boot.framework.mq.core.message.AbstractRedisMessage;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis Pub/Sub 监听器抽象类,用于实现广播消费
|
||||
*
|
||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
||||
*
|
||||
* @author TW
|
||||
*/
|
||||
public abstract class AbstractChannelMessageListener<T extends AbstractChannelMessage> implements MessageListener {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private final Class<T> messageType;
|
||||
/**
|
||||
* Redis Channel
|
||||
*/
|
||||
private final String channel;
|
||||
/**
|
||||
* RedisMQTemplate
|
||||
*/
|
||||
@Setter
|
||||
private RedisMQTemplate redisMQTemplate;
|
||||
|
||||
@SneakyThrows
|
||||
protected AbstractChannelMessageListener() {
|
||||
this.messageType = getMessageClass();
|
||||
this.channel = messageType.newInstance().getChannel();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获得 Sub 订阅的 Redis Channel 通道
|
||||
*
|
||||
* @return channel
|
||||
*/
|
||||
public final String getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void onMessage(Message message, byte[] bytes) {
|
||||
T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
|
||||
try {
|
||||
consumeMessageBefore(messageObj);
|
||||
// 消费消息
|
||||
this.onMessage(messageObj);
|
||||
} finally {
|
||||
consumeMessageAfter(messageObj);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public abstract void onMessage(T message);
|
||||
|
||||
/**
|
||||
* 通过解析类上的泛型,获得消息类型
|
||||
*
|
||||
* @return 消息类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
private void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
|
||||
}
|
||||
|
||||
private void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).consumeMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.jojubanking.boot.framework.mq.core.stream;
|
||||
|
||||
import com.jojubanking.boot.framework.mq.core.message.AbstractRedisMessage;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Stream Message 抽象类
|
||||
*
|
||||
* @author TW
|
||||
*/
|
||||
public abstract class AbstractStreamMessage extends AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Stream Key
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化
|
||||
public abstract String getStreamKey();
|
||||
|
||||
}
|
||||
@@ -0,0 +1,113 @@
|
||||
package com.jojubanking.boot.framework.mq.core.stream;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import com.jojubanking.boot.framework.common.util.json.JsonUtils;
|
||||
import com.jojubanking.boot.framework.mq.core.RedisMQTemplate;
|
||||
import com.jojubanking.boot.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import com.jojubanking.boot.framework.mq.core.message.AbstractRedisMessage;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.stream.StreamListener;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis Stream 监听器抽象类,用于实现集群消费
|
||||
*
|
||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
||||
*
|
||||
* @author TW
|
||||
*/
|
||||
public abstract class AbstractStreamMessageListener<T extends AbstractStreamMessage>
|
||||
implements StreamListener<String, ObjectRecord<String, String>> {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private final Class<T> messageType;
|
||||
/**
|
||||
* Redis Channel
|
||||
*/
|
||||
@Getter
|
||||
private final String streamKey;
|
||||
|
||||
/**
|
||||
* Redis 消费者分组,默认使用 spring.application.name 名字
|
||||
*/
|
||||
@Value("${spring.application.name}")
|
||||
@Getter
|
||||
private String group;
|
||||
/**
|
||||
* RedisMQTemplate
|
||||
*/
|
||||
@Setter
|
||||
private RedisMQTemplate redisMQTemplate;
|
||||
|
||||
@SneakyThrows
|
||||
protected AbstractStreamMessageListener() {
|
||||
this.messageType = getMessageClass();
|
||||
this.streamKey = messageType.newInstance().getStreamKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(ObjectRecord<String, String> message) {
|
||||
// 消费消息
|
||||
T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
|
||||
try {
|
||||
consumeMessageBefore(messageObj);
|
||||
// 消费消息
|
||||
this.onMessage(messageObj);
|
||||
// ack 消息消费完成
|
||||
redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
|
||||
// TODO TW:需要额外考虑以下几个点:
|
||||
// 1. 处理异常的情况
|
||||
// 2. 发送日志;以及事务的结合
|
||||
// 3. 消费日志;以及通用的幂等性
|
||||
// 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
|
||||
} finally {
|
||||
consumeMessageAfter(messageObj);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public abstract void onMessage(T message);
|
||||
|
||||
/**
|
||||
* 通过解析类上的泛型,获得消息类型
|
||||
*
|
||||
* @return 消息类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
private void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
|
||||
}
|
||||
|
||||
private void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).consumeMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
/**
|
||||
* 消息队列,基于 Redis 提供:
|
||||
* 1. 基于 Pub/Sub 实现广播消费
|
||||
* 2. 基于 Stream 实现集群消费
|
||||
*/
|
||||
package com.jojubanking.boot.framework.mq;
|
||||
@@ -0,0 +1,62 @@
|
||||
package org.springframework.data.redis.stream;
|
||||
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.stream.ByteRecord;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.Record;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* 拓展 DefaultStreamMessageListenerContainer 实现,解决 Spring Data Redis + Redisson 结合使用时,Redisson 在 Stream 获得不到数据时,返回 null 而不是空 List,导致 NPE 异常。
|
||||
* 对应 issue:https://github.com/spring-projects/spring-data-redis/issues/2147 和 https://github.com/redisson/redisson/issues/4006
|
||||
* 目前看下来 Spring Data Redis 不肯加 null 判断,Redisson 暂时也没改返回 null 到空 List 的打算,所以暂时只能自己改,哽咽!
|
||||
*
|
||||
* @author TW
|
||||
*/
|
||||
public class DefaultStreamMessageListenerContainerX<K, V extends Record<K, ?>> extends DefaultStreamMessageListenerContainer<K, V> {
|
||||
|
||||
/**
|
||||
* 参考 {@link StreamMessageListenerContainer#create(RedisConnectionFactory, StreamMessageListenerContainerOptions)} 的实现
|
||||
*/
|
||||
public static <K, V extends Record<K, ?>> StreamMessageListenerContainer<K, V> create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> options) {
|
||||
Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null!");
|
||||
Assert.notNull(options, "StreamMessageListenerContainerOptions must not be null!");
|
||||
return new DefaultStreamMessageListenerContainerX<>(connectionFactory, options);
|
||||
}
|
||||
|
||||
public DefaultStreamMessageListenerContainerX(RedisConnectionFactory connectionFactory, StreamMessageListenerContainerOptions<K, V> containerOptions) {
|
||||
super(connectionFactory, containerOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
* 参考 {@link DefaultStreamMessageListenerContainer#register(StreamReadRequest, StreamListener)} 的实现
|
||||
*/
|
||||
@Override
|
||||
public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
|
||||
return this.doRegisterX(getReadTaskX(streamRequest, listener));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private StreamPollTask<K, V> getReadTaskX(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
|
||||
StreamPollTask<K, V> task = ReflectUtil.invoke(this, "getReadTask", streamRequest, listener);
|
||||
// 修改 readFunction 方法
|
||||
Function<ReadOffset, List<ByteRecord>> readFunction = (Function<ReadOffset, List<ByteRecord>>) ReflectUtil.getFieldValue(task, "readFunction");
|
||||
ReflectUtil.setFieldValue(task, "readFunction", (Function<ReadOffset, List<ByteRecord>>) readOffset -> {
|
||||
List<ByteRecord> records = readFunction.apply(readOffset);
|
||||
//【重点】保证 records 不是空,避免 NPE 的问题!!!
|
||||
return records != null ? records : Collections.emptyList();
|
||||
});
|
||||
return task;
|
||||
}
|
||||
|
||||
private Subscription doRegisterX(Task task) {
|
||||
return ReflectUtil.invoke(this, "doRegister", task);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
com.jojubanking.boot.framework.mq.config.JojuMQAutoConfiguration
|
||||
@@ -0,0 +1,2 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
com.jojubanking.boot.framework.mq.config.JojuMQAutoConfiguration
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,5 @@
|
||||
#Generated by Maven
|
||||
#Wed Apr 02 13:07:38 CST 2025
|
||||
version=2.0.0-beta
|
||||
groupId=com.jojubanking.boot
|
||||
artifactId=joju-spring-boot-starter-mq
|
||||
@@ -0,0 +1,9 @@
|
||||
com\jojubanking\boot\framework\mq\core\pubsub\AbstractChannelMessageListener.class
|
||||
com\jojubanking\boot\framework\mq\core\message\AbstractRedisMessage.class
|
||||
com\jojubanking\boot\framework\mq\config\JojuMQAutoConfiguration.class
|
||||
com\jojubanking\boot\framework\mq\core\RedisMQTemplate.class
|
||||
com\jojubanking\boot\framework\mq\core\stream\AbstractStreamMessage.class
|
||||
com\jojubanking\boot\framework\mq\core\pubsub\AbstractChannelMessage.class
|
||||
com\jojubanking\boot\framework\mq\core\stream\AbstractStreamMessageListener.class
|
||||
com\jojubanking\boot\framework\mq\core\interceptor\RedisMessageInterceptor.class
|
||||
org\springframework\data\redis\stream\DefaultStreamMessageListenerContainerX.class
|
||||
@@ -0,0 +1,10 @@
|
||||
D:\^新版项目\05.九聚项目\32.库尔勒妇幼二期\下园体检\examination\joju-framework\joju-spring-boot-starter-mq\src\main\java\com\jojubanking\boot\framework\mq\core\pubsub\AbstractChannelMessage.java
|
||||
D:\^新版项目\05.九聚项目\32.库尔勒妇幼二期\下园体检\examination\joju-framework\joju-spring-boot-starter-mq\src\main\java\com\jojubanking\boot\framework\mq\core\stream\AbstractStreamMessage.java
|
||||
D:\^新版项目\05.九聚项目\32.库尔勒妇幼二期\下园体检\examination\joju-framework\joju-spring-boot-starter-mq\src\main\java\org\springframework\data\redis\stream\DefaultStreamMessageListenerContainerX.java
|
||||
D:\^新版项目\05.九聚项目\32.库尔勒妇幼二期\下园体检\examination\joju-framework\joju-spring-boot-starter-mq\src\main\java\com\jojubanking\boot\framework\mq\core\message\AbstractRedisMessage.java
|
||||
D:\^新版项目\05.九聚项目\32.库尔勒妇幼二期\下园体检\examination\joju-framework\joju-spring-boot-starter-mq\src\main\java\com\jojubanking\boot\framework\mq\core\RedisMQTemplate.java
|
||||
D:\^新版项目\05.九聚项目\32.库尔勒妇幼二期\下园体检\examination\joju-framework\joju-spring-boot-starter-mq\src\main\java\com\jojubanking\boot\framework\mq\core\stream\AbstractStreamMessageListener.java
|
||||
D:\^新版项目\05.九聚项目\32.库尔勒妇幼二期\下园体检\examination\joju-framework\joju-spring-boot-starter-mq\src\main\java\com\jojubanking\boot\framework\mq\package-info.java
|
||||
D:\^新版项目\05.九聚项目\32.库尔勒妇幼二期\下园体检\examination\joju-framework\joju-spring-boot-starter-mq\src\main\java\com\jojubanking\boot\framework\mq\config\JojuMQAutoConfiguration.java
|
||||
D:\^新版项目\05.九聚项目\32.库尔勒妇幼二期\下园体检\examination\joju-framework\joju-spring-boot-starter-mq\src\main\java\com\jojubanking\boot\framework\mq\core\interceptor\RedisMessageInterceptor.java
|
||||
D:\^新版项目\05.九聚项目\32.库尔勒妇幼二期\下园体检\examination\joju-framework\joju-spring-boot-starter-mq\src\main\java\com\jojubanking\boot\framework\mq\core\pubsub\AbstractChannelMessageListener.java
|
||||
Reference in New Issue
Block a user