根本消息发送有三种姿势:同步、异步、单向。
- 同步:消息发送到 Broker 乐成后,返回发送乐成结果;这种可靠性同步地发送方式利用的比力广泛,比如:紧张的消息关照,短信关照。
- 异步:消息发送出去后立即返回结果,可以在发送乐成的消息回调中,查察消息是否发送乐成;异步消息通常用在对相应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的相应。
- 单向:消息发送出去,Broker 不返回结果。这种方式重要用在不特别关心发送结果的场景,比方日记发送。
一、同步发送
在 第一章 RocketMQ 搭建调试环境 中,演示了消息的同步发送。
SendResult sendResult = producer.send(msg)二、异步发送
改造 org.apache.rocketmq.example.simple.AsyncProducer
public class AsyncProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); producer.setNamesrvAddr("127.0.0.1:9876"); // https://blog.csdn.net/heihaozi/article/details/119145266 DefaultChannelId.newInstance(); producer.start(); // 异步发送失败重试,大概导致消息重复发送,须要包管消息幂等性 producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("TopicTest", "TagA", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 异步发送模式 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); }}三、单向发送
改造 org.apache.rocketmq.example.simple.OnewayProducer
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 单向发送 producer.sendOneway(msg);四、消息消耗
在 第一章 RocketMQ 搭建调试环境 中,演示了消息的吸收。
参考
RocketMQ 根本样例 |