RocketMQ 整合 Spring Boot 中的 API
Maven 坐标
//版本控制
<!-- Version of the RocketMQ Spring Boot starter, referenced below as ${rocketmq.version}. -->
<properties>
    <rocketmq.version>2.2.2</rocketmq.version>
</properties>
<dependencies>
    <!-- RocketMQ Spring Boot starter. -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq.version}</version>
    </dependency>
</dependencies>
application.yml 配置连接RocketMQ的地址
# RocketMQ name-server address, injected through ${rocketMQ.name.server.address}.
# The keys must be nested for that property path to resolve.
rocketMQ:
  name:
    server:
      # "ip" is a placeholder for the name server's host or IP address.
      address: ip:9876
配置 RocketMQConfig(创建一个生产者和一个消费者)
/**
 * Spring configuration that creates and starts the RocketMQ producer and the
 * push consumer used for user-moment messages.
 */
@Configuration
public class RocketMQConfig {

    /** Name-server address, read from {@code rocketMQ.name.server.address} in the application config. */
    @Value("${rocketMQ.name.server.address}")
    public String address;

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private UserFollowingService userFollowingService;

    /**
     * Creates the producer, points it at the configured name server and starts it.
     *
     * @return the started producer
     * @throws MQClientException if the producer fails to start
     */
    @Bean("monitorProducer")
    public DefaultMQProducer monitorProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstat.DEFAULT_GROUP_NAME);
        producer.setNamesrvAddr(address);
        producer.start();
        return producer;
    }

    /**
     * Creates the push consumer, subscribes it to the moments topic and starts it.
     *
     * <p>Each received message with a non-null body is parsed into a {@code UserMoments}
     * and handed to {@link #pushMomentToFans(UserMoments)}. The listener always
     * reports {@code CONSUME_SUCCESS} when it returns normally.
     *
     * @return the started consumer
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @Bean("momentConSumer")
    public DefaultMQPushConsumer momentConSumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstat.DEFAULT_GROUP_NAME);
        consumer.setNamesrvAddr(address);
        // Subscribe to the moments topic; "*" listens to all of its messages.
        consumer.subscribe(UserMomentsConstat.DEFAUL_TOPIC_MOMENTS, "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.stream()
                        .filter(msg -> Objects.nonNull(msg.getBody()))
                        // Decode explicitly as UTF-8 instead of relying on the platform
                        // default charset, so non-ASCII content survives the round trip.
                        .map(msg -> JSONObject.parseObject(
                                new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8),
                                UserMoments.class))
                        .filter(Objects::nonNull)
                        .forEach(moment -> pushMomentToFans(moment));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        return consumer;
    }

    /**
     * Looks up the follow list for the moment's author and records the moment
     * in Redis for every mutual fan found in each entry's {@code fansList}.
     *
     * @param moment the moment parsed from a consumed message
     */
    private void pushMomentToFans(UserMoments moment) {
        try {
            // TODO: update the fan lists of the users being followed, not my own fan list.
            List<JSONObject> userList = userFollowingService.focusUserList(moment.getUserId().toString());
            userList.stream().filter(Objects::nonNull).forEach(followed -> {
                List<UserFans> fansList = JSONArray.parseArray(followed.getString("fansList"), UserFans.class);
                Set<Long> fansIds = fansList.stream()
                        .filter(fan -> fan.isEachOtherFans())
                        .map(UserFans::getUserId)
                        .collect(Collectors.toSet());
                // Store into Redis.
                fansIds.forEach(fanId -> cacheMomentForFan(fanId, moment));
            });
        } catch (CannotCreateException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes the moment's content id under the fan's Redis key when the value
     * currently cached there parses to an empty list (or is absent).
     *
     * @param fans   id of the fan whose key is updated
     * @param moment the moment whose content id is stored
     */
    private void cacheMomentForFan(Long fans, UserMoments moment) {
        System.out.println(fans);
        Object redisKey = redisTemplate.opsForValue().get(RedisFinal.REDIS_MOMENTS_USERID_KEY + fans);
        List<Integer> fansCountsList = new ArrayList<>();
        if (redisKey != null) {
            fansCountsList = JSONArray.parseArray(redisKey.toString(), Integer.class);
        }
        // NOTE(review): an Integer is compared with a String here, so equals() is
        // always false and this counts every cached element. The branch below
        // therefore only runs when nothing is cached yet. Confirm the intended
        // de-duplication rule before changing it.
        long counts = fansCountsList.stream().filter(count -> !count.equals(fans + "")).count();
        if (counts <= 0) {
            // NOTE(review): the list is extended but never written back; only the
            // content id is stored, and it is not a JSON array, which the read
            // above expects. Confirm the intended value format.
            fansCountsList.add(Integer.parseInt(fans + ""));
            redisTemplate.opsForValue().set(RedisFinal.REDIS_MOMENTS_USERID_KEY + fans, JSONObject.toJSONString(moment.getContentId()));
        }
    }
}
常量
/**
 * Shared RocketMQ names for the user-moments messaging flow.
 */
public class UserMomentsConstat {

    /** Group name passed to both the producer and the push consumer constructors. */
    public static final String DEFAULT_GROUP_NAME = "nolpin";

    /** Topic that moment messages are published to and subscribed from. */
    public static final String DEFAUL_TOPIC_MOMENTS = "Topic-moments";
}
RocketMQ 的工具类
public class RocketMQUtils {
/**
* 同步发送
*
* @param producer
* @param msg
*/
public static void syncSendMsg(DefaultMQProducer producer, Message msg) {
try {
SendResult result = producer.send(msg,10000);
System.out.println(result);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 异步发送
*
* @param producer
* @param msg
* @throws RemotingException
* @throws MQClientException
* @throws InterruptedException
*/
private final static CountDownLatch mCountDownLatch = new CountDownLatch(1);
public static void asyncSendMsg(DefaultMQProducer producer, Message msg) throws RemotingException, MQClientException, InterruptedException, UnsupportedEncodingException {
int messgeCount = 1;
producer.setRetryTimesWhenSendAsyncFailed(3);
for (int i = 0; i < messgeCount; i++) {
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult.getMessageQueue());
}
@Override
public void onException(Throwable throwable) {
mCountDownLatch.countDown();
System.out.println("发送失败有异常" + throwable);
throwable.printStackTrace();
}
//TODO 需要添加一个连接时间 不让会一直连接失败
},1000);
}
mCountDownLatch.await(5, TimeUnit.SECONDS);
}
}
使用
//发消息
/**
 * Publishes the moment to the moments topic, then inserts it through the mapper.
 *
 * <p>Send failures are printed and do not prevent the insert.
 *
 * @param userMoments the moment to publish and insert; when {@code null} nothing is published
 * @return the value returned by {@code userMomentsMapper.insert}
 */
public Integer create(UserMoments userMoments) {
    if (userMoments != null) {
        // Encoding with a Charset constant cannot fail, so the message is never
        // left null before being handed to the sender.
        byte[] body = JSONObject.toJSONString(userMoments).getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = new Message(UserMomentsConstat.DEFAUL_TOPIC_MOMENTS, body);
        try {
            // NOTE(review): the message is published before the insert below, so a
            // consumer may see a moment whose insert later fails. Confirm this
            // ordering is intended.
            RocketMQUtils.asyncSendMsg(producer, message);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (RemotingException | MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    return this.userMomentsMapper.insert(userMoments);
}
注意:本文归作者所有,未经作者允许,不得转载