1、发布确认
1.1、发布确认的引出
一个消息的持久化需要经历的步骤:
设置要求队列持久化。
设置要求队列中的消息必须持久化。
发布确认
如果缺少了发布确认的话,那么消息在磁盘上持久化之前会发生丢失,从而不能满足消息持久化的目的。
1.2、发布确认的策略
1.2.1、开启发布确认的方法
Channel channel = RabbitmqUtil.getChannel();
//开启发布确认
channel.confirmSelect();
发布确认默认是没有开启的,如果需要开启需要调用confirmSelect,每当需要使用发布确认的时候,都需要调用该方法。
1.2.2、单个确认发布
单个确认发布是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布。
该确认方式主要通过waitForConfirms方法实现,这个方法只有在消息被确认的时候才会返回,如果在指定时间范围内这个消息没有被确认那么它将会抛出异常。
这种确认方式的最大的缺点就是:发布速度特别慢。
public static void ConfirmMessageIndividually() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID()。toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 进行单个发布确认
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("单个确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
}
1.2.3、批量确认发布
先发布一批消息然后一起确认。
缺点:当发生故障导致发布出现问题时,不知道那个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的消息而后重新发布消息。
public static void ConfirmMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID()。toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
// 批量处理消息的个数
int batchSize = 100;
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 进行批量发布确认
if(i % batchSize == 0){
channel.waitForConfirms();
System.out.println("批量处理消息成功");
}
}
long end = System.currentTimeMillis();
System.out.println("批量确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms");
}
如需转载,请注明文章出处和来源网址:http://www.divcss5.com/html/h64834.shtml