架构及组件说明
https://rocketmq.apache.org/zh/download
将 ocketmq-all-4.9.4-bin-release文件复制到合适的位置
查询自己IP
添加如下配置(IP使用自己的),并保存。
brokerIP1=192.168.31.199
namesrvAddr=192.168.31.199:9876
文件路径使用自己的
set ROCKETMQ_HOME=D:\ProgramFiles\rocketmq-all-4.9.4-bin-release
在rocketmq文件的bin目录下,进入cmd
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
http://localhost:8080/#/
按需修改为中文
查看消费者(非必须)
创建普通springboot项目,添加依赖
org.apache.rocketmq rocketmq-spring-boot-starter 2.0.1
修改配置文件
# 应用名称
spring:application:name: rocket-producer
# 应用服务 WEB 访问端口
server:port: 8002
rocketmq:name-server: localhost:9876producer:group: my-group
创建测试代码
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class SendMessage {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Scheduled(fixedRate = 5000)public void run(){//发送消息rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");}
}
启动类添加@EnableScheduling注解
项目目录
创建普通springboot项目,添加依赖
org.apache.rocketmq rocketmq-spring-boot-starter 2.0.1
修改配置文件
# 应用名称
spring:application:name: rocket-consumer
server:port: 8001rocketmq:name-server: localhost:9876
创建测试代码
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
class MyConsumer1 implements RocketMQListener {/***需要注意的是,onMessage()封装了ACK机制,消费者往外抛异常时,RocketMQ认为消费失败,重新发送该条消息,否则默认消费成功*/@Overridepublic void onMessage(String s) {System.out.println(s);}
}
项目目录
接受消息正常