[RocketMQ]转换器
创始人
2025-05-28 20:15:58

背景:

在一次业务联调时,客户说无法解析rocketmq中的消息(我们是用的消息协议是google的protobuf),通过rocketmq的控制台发现,消息变成了json串如下:

而客户想要的是如下样子的:

观察两者可以发现,样子是截然不同的,一个变成了json串,一个是乱码的样子。

然后查看我们的代码,发现是在发送消息时,用错了Message对象。下面详细说明问题点:

在springboot中通过rocketmqTemplate发送消息,一般使用如下方式:

rocketMQTemplate.sendOneWay(topic,message);

我们在创建message的时候,一般有两种方式如下,注意观察Message的包:

方式一:对应于第一个json的图片

//方式一Demo1.Person person = Demo1.Person.newBuilder().setId(1).setEmail("b.163.com").setName("小里3").build();
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message();
msg.setBody(person.toByteArray());
rocketMQTemplate.sendOneWay(topic,msg);

方式二:对应第二个乱码的样子的图片

//方式二
Demo1.Person person = Demo1.Person.newBuilder().setId(1).setEmail("b.163.com").setName("小里3").build();
byte[] newByteArray =person.toByteArray();
org.springframework.messaging.Message msg = MessageBuilder.withPayload(newByteArray).build();
rocketMQTemplate.sendOneWay("test5",msg);

正如我们看到的,spring和rocketmq都有一个Message对象,这两个都可以放到rocketMqTemplate中使用,但是这二者有个明显的区别在于,rocektmq的Message对象在发送时,会将我们发送的对象进行json的序列化(也就是图一看到的,将序列化成json的字符串放到rocketmq中),而spring的Message对象是将字节数组放到rocketmq中,也就是看到的乱码的样子啦。

为什么放入的都是对象,但是最终放到rocketmq中的结果不同呢?主要是因为在发送消息时,会根据对象的不同,使用了了不同的消息转换器对消息内容进行了序列化。下面通过源码来看一探究竟:

看下图,以RocketmqTemplate的sendOneWay方法为例,你会发现这个方法有两个(红框),也就是方法重载了,当sendOneWay传入的参数是spring的Message时会调用【1】,当传入的参数是其他类型时便会调用【2】,然后在【2】中组建一个spring的Mesage在调用【1】,这一步会影响到后面要讲的消息转换器的选择,因为palyload的内容被二次封装了,记住这里,后面会用到这里

然后在这个createRocketMqMessage()方法,就会将我们的消息通过转换器进行转换,比如把消息变成字节数组还是序列换成json串,然后把转换完成的结果放到roceketmq的Message对象发送出去。如下图:

在这一步的转换过程中,就涉及到了序列化的问题,也就是最开头我们说的是将对象序列化为json还是字节数组的问题了,至于最后到底变成什么样子,取决于它内部转换器MessageConverter(默认一共有四个转换器)。如下图:

上图这个doConvert就是用来做转换的了,里面会选择处合适的(转换器的support方法,默认都是通过类型进行判断)转换器,将我们传入的原始消息进行转化。例如转为json格式,或者维持不变。

在doConvert方法中,会调用一个CompositeMessageConverter的转换器,这是一个代理了,这里面会遍历所有convert,然后找到符合的转换器,如下图:

我们说过,到底用哪个转换器,就是在toMessage这个方法里来确定的,并且转换也是在这个方法中完成的。这个方法也是个重载的方法,如下图,这里很重要,他跟最后用哪个转换器有一定关系。

注意看这个canConvertTo方法,他用来判断消息是否可以被当前的这个消息转换器给进行转换,一般来说都是supports方法来判断,supports里面通过class来判断是否支持。如下图

但是这些消息转换器中的MappingJackson2MessageConverter是直接重写了canConvertTo方法,所以当我们发送的消息对象是Spring的Message时,它里面的payload属性就是字节数组,所以当按照顺先执行ByteArrayMessageConverter时,他的supports方法会判断这个payload是不是数组对象,满足条件就用这个转换器转换了。

但是当我们的消息用的是rocketmq的Message对象时,就会想前面【1】处所写的那样,会先进行一次包装,将rocketmq的Message对象作为payload封装到spring的Message对象中,此值这些转换器的supports方法将都不满足,因为类型都不匹配。只有到MappingJackson2MessageConverter时,因为它重写了canConvertTo方法,所以就不使用supports了。它canConvertTo中返回了true,所以就会将消息转化为了json对象,然后将这个json放到mq中了。

总结:

此篇文章的关键点在于消息转换器,如下:

使用哪个转换器决定了放到mq中的数据的样子,是字节是字符串还是json串等等。

另外,当通过MappingJackson2MessageConverter转化后,可能会发现字节数组在转换前不同了,那是因为是将转换器转换后的数据的字节数组放到了里面,肯定与原始的字节数组中的内容不同了。

扩展:

考虑一下,我们是不是可以自定义自己的转换器

相关内容

热门资讯

滚动更新丨A股三大指数集体高开... 09:25 A股开盘丨三大指数集体高开沪指高开0.67%,深证成指高开1.74%,创业板指高开2.2...
全国政协委员李书福:推广甲醇电... 李书福建议,交通部优先推广甲醇电动营运货车,同时在高速服务区、省道干线等物流通道布局甲醇加注设施,打...
阿里千问内外交困,马云更频繁的... 当前在AI竞争中落后的阿里巴巴频繁发力,其中一个很有意思的现象是,其创始人马云比以前更频繁的现身了。...
相机价格暴涨,老相机价格翻十倍... 最近一段时间,老相机突然在市场上走红,原先不受欢迎的老相机突然价格暴涨,有些老相机价格甚至翻十倍,如...
三次石油危机复盘 本文系基于公开资料撰写,仅作为信息交流之用,不构成任何投资建议从1973年10月到1990年10月,...