这篇文章旨在主要记录如何快速上手rocketmq
参考博文:
消息中间件的使用场景有哪些
Kafka、ActiveMQ、RabbitMQ、RocketMQ 区别以及高可用原理
RocketMq安装(windows环境)与Rocketmq-dashboard的web管理页面部署
RocketMQ消息存储之刷盘机制(原理篇)
RabbitMQ的ack机制
如果对于消息队列的功能和性能要求不是很高,那么RabbitMQ就够了,开箱即用。
如果系统使用消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,RocketMQ 的低延迟和金融级的稳定性就可以满足。官网
要处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据,或是你的应用场景大量使用 了大数据、流计算相关的开源产品,那 Kafka 就是最合适的了。
一、部署架构
角色介绍
- Producer:消息的发送者;举例:发信者
- Consumer:消息接收者;举例:收信者
- Broker:暂存和传输消息;举例:邮局
- NameServer:管理Broker;举例:各个邮局的管理机构
- Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
- Message Queue:相当于是Topic的分区;用于并行发送和接收消息
角色交互解释
- NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。
- 每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
执行流程
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息
一、生产者示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| package com.lz.coder.controller;
import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Random;
@Slf4j @RestController @RequestMapping("/rocketmq") public class TestRocket { @PostMapping("/sendMq") public void sendMq(){ DefaultMQProducer defaultMQProducer = getRocketMqProducer();
try { defaultMQProducer.start(); } catch (MQClientException e) { e.printStackTrace(); } JSONObject jsonObject = generateMsgContent(); Message message = new Message("lucky-topic", "lucky-tag", jsonObject.toString().getBytes()); SendResult result = null; try { result = defaultMQProducer.send(message); } catch (Exception e) { e.printStackTrace(); } System.out.println("SendResult-->" + result); defaultMQProducer.shutdown(); }
private DefaultMQProducer getRocketMqProducer(){ String mqAddress = "127.0.0.1:9876"; String groupId = "FLEP_FILE"; String msgTimeout = "10000"; String retryWhenSendFailed = "3"; DefaultMQProducer defaultMQProducer = new DefaultMQProducer(groupId); defaultMQProducer.setNamesrvAddr(mqAddress); defaultMQProducer.setSendMsgTimeout(Integer.parseInt(msgTimeout)); defaultMQProducer.setRetryTimesWhenSendFailed(Integer.parseInt(retryWhenSendFailed)); return defaultMQProducer;
}
private JSONObject generateMsgContent(){ JSONObject jsonObject=new JSONObject(); Random random=new Random(); int fileId = random.nextInt(10000); jsonObject.put("fileId",String.valueOf(fileId)); LocalDateTime localDateTime=LocalDateTime.now(); String fileCreateDate = localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); jsonObject.put("fileCreateDate",fileCreateDate ); return jsonObject; } }
|
二、消费者示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| package com.lz.coder.controller; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.List;
@Slf4j @RestController @RequestMapping("/rocketmq2") public class ReceiveRocketMsg {
@PostMapping("/receiveMqMsg") public void receiveMqMsg(){ DefaultMQPushConsumer defaultMQPushConsumer = getRocketMqConsumer();
defaultMQPushConsumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) -> {
try { System.out.println("收到消息--》" + list); for (MessageExt messageExt : list) { String message=new String(messageExt.getBody(),RemotingHelper.DEFAULT_CHARSET); JSONObject object=JSONObject.parseObject(message); String fileId = (String) object.get("fileId"); String fileCreateDate = (String) object.get("fileCreateDate"); log.info(fileId+":"+fileCreateDate); }
} catch (Throwable throwable) { throwable.printStackTrace(); }
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
try { defaultMQPushConsumer.start(); } catch (MQClientException e) { e.printStackTrace(); } System.out.println("消费者启动成功。。。");
}
private DefaultMQPushConsumer getRocketMqConsumer(){
String mqAddress = "127.0.0.1:9876"; String consumerGroup = "FLEP-CONSUMER-TEST";
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup); defaultMQPushConsumer.setNamesrvAddr(mqAddress); try { defaultMQPushConsumer.subscribe("lucky-topic", "*"); } catch (MQClientException e) { e.printStackTrace(); } return defaultMQPushConsumer; } }
|