RabbitMQ: producer confirm mode and consumer ack mode

This article shares some practical experience on the producing and consuming sides of RabbitMQ. As the title says, it covers the producer's confirm mode and the consumer's ack mode; both are used to protect data integrity and prevent message loss.

    Producer confirm mode

    Consumer ack mode

Producer confirm mode

First, business scenario 1: before running a promotion, a system needs to send a text message to the mobile phones of the users it wants to take part. Because the number of users is fairly large, the SMS data is inserted into MQ, and an SMS service consumes the MQ messages and sends the texts.

The service that inserts into MQ must make sure a message is queued for every user, and it must finish inserting in a short time, so it uses asynchronous inserts, which are fast. We also need to know whether each insert into MQ succeeded; if one failed, we can collect the failure information and resend the message afterwards. This is where confirm mode comes in.

With Spring Boot, you can wrap the amqp factory class to turn on confirm mode and then set the callback function on the RabbitTemplate, as in the following code:

    ///region producer - confirm mode

    public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback) {
        return this.getRabbitTemplate(this.connectionFactory(), confirmCallback);
    }

    public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplate.ConfirmCallback confirmCallback) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Turn on publisher confirm mode
        // (deprecated since Spring AMQP 2.2 in favour of setPublisherConfirmType)
        connectionFactory.setPublisherConfirms(true);
        // Set the confirm callback
        template.setConfirmCallback(confirmCallback);
        return template;
    }
    ///endregion

Here the callback is passed in as a custom RabbitTemplate.ConfirmCallback function, which collects the information returned by the confirm, as follows:

    RabbitUtil rabbitUtil = new RabbitUtil(this.getFirstNode().getLink());
    RabbitTemplate template = rabbitUtil.getRabbitTemplate((a, b, c) -> {
        // a: correlation data, b: state (true = confirmed), c: cause of a failure
        System.out.println("firstNodeTpl - ConfirmCallback of Id: " + a.getId() + ", state: " + b + ", cause: " + c);
    });

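The callback above only prints the result. To actually resend failed messages, as scenario 1 requires, the callback needs somewhere to record them. The following is a minimal sketch of that bookkeeping in plain Java, without any Spring dependency. The class `ConfirmTrackerSketch` and its method names are hypothetical and not part of the original code; it assumes each message is sent with a unique correlation id, which the callback receives back as `a.getId()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers in-flight messages until the broker confirms them. */
class ConfirmTrackerSketch {
    // correlation id -> message body still waiting for a confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    // correlation id -> message body whose confirm came back negative
    private final Map<String, String> failed = new ConcurrentHashMap<>();

    /** Call just before publishing, with the id used as correlation data. */
    void beforeSend(String id, String body) {
        pending.put(id, body);
    }

    /** Call from the confirm callback; ack == true means the send succeeded. */
    void onConfirm(String id, boolean ack, String cause) {
        String body = pending.remove(id);
        if (!ack && body != null) {
            System.out.println("confirm failed for " + id + ": " + cause);
            failed.put(id, body);
        }
    }

    /** Removes and returns the failed message bodies so they can be sent again. */
    List<String> drainFailed() {
        List<String> bodies = new ArrayList<>();
        for (String id : new ArrayList<>(failed.keySet())) {
            String body = failed.remove(id);
            if (body != null) {
                bodies.add(body);
            }
        }
        return bodies;
    }

    /** Number of messages that have not been confirmed either way yet. */
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        tracker.beforeSend("1", "sms for user 1");
        tracker.beforeSend("2", "sms for user 2");
        tracker.onConfirm("1", true, null);              // broker confirmed
        tracker.onConfirm("2", false, "channel closed"); // broker did not confirm
        System.out.println("to resend: " + tracker.drainFailed());
    }
}
```

In the callback shown earlier, the lambda body would call something like `tracker.onConfirm(a.getId(), b, c)`, and a scheduled job could periodically resend whatever `drainFailed()` returns.
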
Finally, the message is sent to MQ with RabbitTemplate's convertAndSend method, and the callback's output is recorded in the log.

In that output, the state is true when the send succeeded and false when it failed. When it is false, the error information is usually returned in c, for example when the network is disconnected.

Consumer ack mode

Now scenario 2: the SMS service consumes messages from the MQ queue. If the call to the carrier's SMS interface fails (for example, the carrier account is in arrears), the text is not sent and the user never receives it. By default, however, automatic ack is enabled, so once a message has been consumed RabbitMQ no longer keeps it (Kafka is an exception here). If you want the message to remain in MQ when the business logic fails, you can use manual ack.

With Spring Boot, wrap the amqp factory class to switch from automatic ack to manual ack. With manual ack, the business code sends the ack at the very end, only after processing has completed, to tell RabbitMQ that the message can be discarded. If the ack is never submitted, the message stays in MQ and is not released; RabbitMQ returns unacked messages to the queue once the consumer's channel or connection closes:

    ///region consumer listener - manual ack

    public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
        return this.listenerContainerFactory(this.connectionFactory());
    }

    public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Switch to manual ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Number of concurrent consumers
        factory.setConcurrentConsumers(2);
        // Number of messages fetched at a time (prefetch), default 250
        factory.setPrefetchCount(300);
        return factory;
    }
    ///endregion
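
The effect of manual ack on message retention can be illustrated with a toy model in plain Java. This is only a sketch of the idea, not how RabbitMQ is implemented: the class `ManualAckQueueSketch` is hypothetical, and it simplifies redelivery by leaving an unacked message at the head of the queue, whereas the real broker holds it as unacked until the channel closes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** Toy model of a queue with manual acks; not RabbitMQ's real implementation. */
class ManualAckQueueSketch {
    private final Deque<String> ready = new ArrayDeque<>();

    void publish(String msg) {
        ready.addLast(msg);
    }

    int size() {
        return ready.size();
    }

    /**
     * Delivers the head message to the handler. The message is removed only
     * when the handler returns true (ack); otherwise it stays for redelivery.
     */
    boolean deliver(Predicate<String> handler) {
        String msg = ready.peekFirst();
        if (msg == null) {
            return false; // nothing to deliver
        }
        boolean acked;
        try {
            acked = handler.test(msg);
        } catch (RuntimeException ex) {
            acked = false; // business failure: no ack is sent
        }
        if (acked) {
            ready.pollFirst(); // ack received: the queue may discard the message
        }
        return acked;
    }

    public static void main(String[] args) {
        ManualAckQueueSketch queue = new ManualAckQueueSketch();
        queue.publish("sms for user 1");
        // First attempt fails, e.g. the carrier interface rejects the call
        queue.deliver(m -> { throw new IllegalStateException("carrier error"); });
        System.out.println("after failure, messages left: " + queue.size());
        // Second attempt succeeds and acks
        queue.deliver(m -> true);
        System.out.println("after ack, messages left: " + queue.size());
    }
}
```

With automatic ack, by contrast, the message would be gone after the first delivery regardless of whether the handler succeeded.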

With manual ack mode set on the listener container factory, the listener receives the MQ message, runs the normal business logic, and finally sends the manual ack to release the message, as follows:

    @RabbitListener(containerFactory = "firstNodeListener", queues = {"${shenniu.rabbits.firstNode.queue}"})
    private void firstNodeListener(String msg, Channel channel, Message message) {
        try {
            long deliverTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("firstNodeListener - consumed message [" + deliverTag + "] - " + msg);
            // multiple = true also acks every earlier unacked delivery on this channel
            channel.basicAck(deliverTag, true);
        } catch (Exception ex) {
            // No ack is sent on failure, so the message stays unacknowledged;
            // log the error instead of swallowing it silently
            ex.printStackTrace();
        }
    }

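The second argument of `basicAck` is the `multiple` flag: when it is true, the ack covers every unacked delivery on the channel up to and including the given tag, not just that one message. The sketch below models this bookkeeping in plain Java so the difference is easy to see. The class `DeliveryTagSketch` is hypothetical and stands in for one channel's unacked set; it is an illustration, not the broker's code.

```java
import java.util.TreeMap;

/** Toy model of one channel's unacked deliveries, keyed by delivery tag. */
class DeliveryTagSketch {
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1; // tags increase by one per delivery on a channel

    /** Records a delivery and returns its tag. */
    long deliver(String msg) {
        long tag = nextTag++;
        unacked.put(tag, msg);
        return tag;
    }

    /** Mirrors basicAck(tag, multiple) for this toy channel. */
    void ack(long tag, boolean multiple) {
        if (multiple) {
            // acknowledge everything up to and including the tag
            unacked.headMap(tag, true).clear();
        } else {
            // acknowledge only this delivery
            unacked.remove(tag);
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        DeliveryTagSketch channel = new DeliveryTagSketch();
        channel.deliver("a");
        long second = channel.deliver("b");
        channel.deliver("c");
        channel.ack(second, true); // acks "a" and "b" together
        System.out.println("still unacked: " + channel.unackedCount());
    }
}
```

If each message must be acknowledged on its own, for instance because an earlier one may still be in progress or may have failed, passing `false` as the second argument is the safer choice.
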
The ack identifies the message mainly by its unique delivery number (deliverTag). If we never send the ack, the message remains unacknowledged, as the RabbitMQ management console shows.
