[RocketMQ]转换器
创始人
2025-05-28 20:15:58

背景:

在一次业务联调时,客户说无法解析rocketmq中的消息(我们是用的消息协议是google的protobuf),通过rocketmq的控制台发现,消息变成了json串如下:

而客户想要的是如下样子的:

观察两者可以发现,样子是截然不同的,一个变成了json串,一个是乱码的样子。

然后查看我们的代码,发现是在发送消息时,用错了Message对象。下面详细说明问题点:

在springboot中通过rocketmqTemplate发送消息,一般使用如下方式:

rocketMQTemplate.sendOneWay(topic,message);

我们在创建message的时候,一般有两种方式如下,注意观察Message的包:

方式一:对应于第一个json的图片

//方式一Demo1.Person person = Demo1.Person.newBuilder().setId(1).setEmail("b.163.com").setName("小里3").build();
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message();
msg.setBody(person.toByteArray());
rocketMQTemplate.sendOneWay(topic,msg);

方式二:对应第二个乱码的样子的图片

//方式二
Demo1.Person person = Demo1.Person.newBuilder().setId(1).setEmail("b.163.com").setName("小里3").build();
byte[] newByteArray =person.toByteArray();
org.springframework.messaging.Message msg = MessageBuilder.withPayload(newByteArray).build();
rocketMQTemplate.sendOneWay("test5",msg);

正如我们看到的,spring和rocketmq都有一个Message对象,这两个都可以放到rocketMqTemplate中使用,但是这二者有个明显的区别在于,rocektmq的Message对象在发送时,会将我们发送的对象进行json的序列化(也就是图一看到的,将序列化成json的字符串放到rocketmq中),而spring的Message对象是将字节数组放到rocketmq中,也就是看到的乱码的样子啦。

为什么放入的都是对象,但是最终放到rocketmq中的结果不同呢?主要是因为在发送消息时,会根据对象的不同,使用了了不同的消息转换器对消息内容进行了序列化。下面通过源码来看一探究竟:

看下图,以RocketmqTemplate的sendOneWay方法为例,你会发现这个方法有两个(红框),也就是方法重载了,当sendOneWay传入的参数是spring的Message时会调用【1】,当传入的参数是其他类型时便会调用【2】,然后在【2】中组建一个spring的Mesage在调用【1】,这一步会影响到后面要讲的消息转换器的选择,因为palyload的内容被二次封装了,记住这里,后面会用到这里

然后在这个createRocketMqMessage()方法,就会将我们的消息通过转换器进行转换,比如把消息变成字节数组还是序列换成json串,然后把转换完成的结果放到roceketmq的Message对象发送出去。如下图:

在这一步的转换过程中,就涉及到了序列化的问题,也就是最开头我们说的是将对象序列化为json还是字节数组的问题了,至于最后到底变成什么样子,取决于它内部转换器MessageConverter(默认一共有四个转换器)。如下图:

上图这个doConvert就是用来做转换的了,里面会选择处合适的(转换器的support方法,默认都是通过类型进行判断)转换器,将我们传入的原始消息进行转化。例如转为json格式,或者维持不变。

在doConvert方法中,会调用一个CompositeMessageConverter的转换器,这是一个代理了,这里面会遍历所有convert,然后找到符合的转换器,如下图:

我们说过,到底用哪个转换器,就是在toMessage这个方法里来确定的,并且转换也是在这个方法中完成的。这个方法也是个重载的方法,如下图,这里很重要,他跟最后用哪个转换器有一定关系。

注意看这个canConvertTo方法,他用来判断消息是否可以被当前的这个消息转换器给进行转换,一般来说都是supports方法来判断,supports里面通过class来判断是否支持。如下图

但是这些消息转换器中的MappingJackson2MessageConverter是直接重写了canConvertTo方法,所以当我们发送的消息对象是Spring的Message时,它里面的payload属性就是字节数组,所以当按照顺先执行ByteArrayMessageConverter时,他的supports方法会判断这个payload是不是数组对象,满足条件就用这个转换器转换了。

但是当我们的消息用的是rocketmq的Message对象时,就会想前面【1】处所写的那样,会先进行一次包装,将rocketmq的Message对象作为payload封装到spring的Message对象中,此值这些转换器的supports方法将都不满足,因为类型都不匹配。只有到MappingJackson2MessageConverter时,因为它重写了canConvertTo方法,所以就不使用supports了。它canConvertTo中返回了true,所以就会将消息转化为了json对象,然后将这个json放到mq中了。

总结:

此篇文章的关键点在于消息转换器,如下:

使用哪个转换器决定了放到mq中的数据的样子,是字节是字符串还是json串等等。

另外,当通过MappingJackson2MessageConverter转化后,可能会发现字节数组在转换前不同了,那是因为是将转换器转换后的数据的字节数组放到了里面,肯定与原始的字节数组中的内容不同了。

扩展:

考虑一下,我们是不是可以自定义自己的转换器

相关内容

热门资讯

重大通报“天天武汉麻将确实有挂... 亲.天天武汉麻将这款游戏是可以开挂的,确实是有挂的,通过添加客服【3671900】很多玩家在这款游戏...
盘点一款“樱花众娱究竟有没有挂... 您好:樱花众娱这款游戏可以开挂,确实是有挂的,需要软件加微信【4830828】很多玩家在这款游戏中打...
分享!“斗棋联盟能开挂吗 ”(... 您好:斗棋联盟这款游戏可以开挂,确实是有挂的,需要软件加微信【5537821】,很多玩家在永和备厅这...
重磅.揭秘“ 老西儿作弊器”确... 您好:老西儿这款游戏可以开挂,确实是有挂的,需要软件加微信【6676724】很多玩家在这款游戏中打牌...
我来教您「闲逸斗地主」开挂辅助... 您好:闲逸斗地主这款游戏可以开挂,确实是有挂的,需要了解加客服微信【9183893】很多玩家在这款游...