Rocketmq 整合Springboot

小豆丁 1年前 ⋅ 959 阅读

Rocketmq 整合Springboot中APi

maven坐标

  //版本控制
  <properties> 
    <rocketmq.version>2.2.2</rocketmq.version>
  </properties>
 
  <dependencies>
      <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-spring-boot-starter</artifactId>
          <version>${rocketmq.version}</version>
      </dependency>
   </dependencies>

application.yml 配置连接RocketMQ的地址

rocketMQ:
  name:
    server:
      address: ip:9876

配置RocketMQConfig(创建一个生产者 消费者)

@Configuration
public class RocketMQConfig {


	//获取application中连接地址
    @Value("${rocketMQ.name.server.address}")
    public String address;


    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private UserFollowingService userFollowingService;



	/**
     * 生产者
     *
     * @return
     * @throws MQClientException
     */
    @Bean("monitorProducer")
    public DefaultMQProducer monitorProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstat.DEFAULT_GROUP_NAME);
        producer.setNamesrvAddr(address);
        producer.start();
        return producer;
    }

 /**
     * 消费者(订阅消息)
     *
     * @return
     * @throws MQClientException
     */
    @Bean("momentConSumer")
    public DefaultMQPushConsumer momentConSumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstat.DEFAULT_GROUP_NAME);
        consumer.setNamesrvAddr(address);
        //交换机 监听* 全部的消息
        consumer.subscribe(UserMomentsConstat.DEFAUL_TOPIC_MOMENTS, "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

                list.stream().filter(c -> Objects.nonNull(c.getBody())).map((c) -> {
                    return JSONObject.parseObject(new String(c.getBody()), UserMoments.class);
                }).filter(Objects::nonNull).forEach(c -> {
                    try {
                        //TODO 修改关注用户粉丝列表 而不是我的粉丝列表
                        List<JSONObject> userList = userFollowingService.focusUserList(c.getUserId().toString());
                        userList.stream().filter(Objects::nonNull).forEach(t -> {
                            List<UserFans> fansList = JSONArray.parseArray(t.getString("fansList"), UserFans.class);
                            Set<Long> fansIds = fansList.stream().filter(fs -> fs.isEachOtherFans()).map(UserFans::getUserId).collect(Collectors.toSet());
                            //存入rdis中
                            fansIds.forEach(fans -> {
                                System.out.println(fans);
                                Object redisKey = redisTemplate.opsForValue().get(RedisFinal.REDIS_MOMENTS_USERID_KEY + fans);
                                List<Integer> fansCountsList = new ArrayList<>();
                                if (redisKey != null) {
                                    fansCountsList = JSONArray.parseArray(redisKey.toString(), Integer.class);
                                }
                                long counts = fansCountsList.stream().filter(count -> !count.equals(fans + "")).count();
                                if (counts <= 0) {
                                    fansCountsList.add(Integer.parseInt(fans + ""));
                                    redisTemplate.opsForValue().set(RedisFinal.REDIS_MOMENTS_USERID_KEY + fans, JSONObject.toJSONString(c.getContentId()));
                                }
                            });
                        });
                    } catch (CannotCreateException e) {
                        e.printStackTrace();
                    }
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        return consumer;
    }





}

常量

public class UserMomentsConstat {
	//队列
    public static final String DEFAULT_GROUP_NAME="nolpin";
	//交换机
    public static final String DEFAUL_TOPIC_MOMENTS="Topic-moments";
}

RocketMQ的工具包

public class RocketMQUtils {


    /**
     * 同步发送
     *
     * @param producer
     * @param msg
     */
    public static void syncSendMsg(DefaultMQProducer producer, Message msg) {
        try {
            SendResult result = producer.send(msg,10000);
            System.out.println(result);
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 异步发送
     *
     * @param producer
     * @param msg
     * @throws RemotingException
     * @throws MQClientException
     * @throws InterruptedException
     */
    private final static CountDownLatch mCountDownLatch = new CountDownLatch(1);

    public static void asyncSendMsg(DefaultMQProducer producer, Message msg) throws RemotingException, MQClientException, InterruptedException, UnsupportedEncodingException {
        int messgeCount = 1;
        producer.setRetryTimesWhenSendAsyncFailed(3);
        for (int i = 0; i < messgeCount; i++) {
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult.getMessageQueue());
                }
                @Override
                public void onException(Throwable throwable) {
                    mCountDownLatch.countDown();
                    System.out.println("发送失败有异常" + throwable);
                    throwable.printStackTrace();
                }
                //TODO 需要添加一个连接时间 不让会一直连接失败
            },1000);
        }
        mCountDownLatch.await(5, TimeUnit.SECONDS);

    }

}

使用

//发消息
 public Integer create(UserMoments userMoments) {
        Optional.ofNullable(userMoments)
                .ifPresent(c -> {
                    //发送消息
                    Message message = null;
                    try {
                    //消息体  UserMomentsConstat.DEFAUL_TOPIC_MOMENTS 交换机
                        message = new Message(UserMomentsConstat.DEFAUL_TOPIC_MOMENTS, JSONObject.toJSONString(c).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    try {
                    //发消息
                        RocketMQUtils.asyncSendMsg(producer, message);
                    } catch (RemotingException e) {
                        e.printStackTrace();
                    } catch (MQClientException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                });
        return this.userMomentsMapper.insert(userMoments);
    }