RabbitMQ: Exchanges

Posted May 30, 2020, 10 min read

In RabbitMQ, a producer publishes a message to an exchange, the exchange routes it to queues according to certain rules, and the broker then either pushes the message to consumers or lets consumers pull it from the queue. The previous articles covered queues. This article looks at how an exchange routes messages to queues.
[Figure: hello-world-example-routing.webp.jpg]

Exchange

An exchange receives messages and routes them to zero or more queues. RabbitMQ supports four exchange types: Direct, Fanout, Topic, and Headers. An exchange is declared with several attributes, the most important of which are its name, its type, whether it is durable, whether it is auto-deleted, and optional arguments.
Durability determines whether the exchange still exists after RabbitMQ restarts. Auto-delete determines whether the exchange is deleted when the last queue is unbound from it.

Exchange.DeclareOk exchangeDeclare(String exchange,
        BuiltinExchangeType type,
        boolean durable,
        boolean autoDelete,
        boolean internal,
        Map<String, Object> arguments) throws IOException;
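
The effect of the durable and autoDelete flags can be modeled in a few lines. This is only a sketch in plain Java, not the RabbitMQ client API: the class ExchangeLifecycleSketch and its methods are hypothetical names, and the broker is simulated in memory.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory model of an exchange's lifecycle flags (illustration only).
class ExchangeLifecycleSketch {
    private final boolean durable;
    private final boolean autoDelete;
    private final Set<String> boundQueues = new HashSet<>();
    private boolean deleted = false;

    ExchangeLifecycleSketch(boolean durable, boolean autoDelete) {
        this.durable = durable;
        this.autoDelete = autoDelete;
    }

    void bind(String queue) {
        boundQueues.add(queue);
    }

    // An auto-delete exchange is removed once its last queue is unbound.
    void unbind(String queue) {
        boolean removed = boundQueues.remove(queue);
        if (removed && autoDelete && boundQueues.isEmpty()) {
            deleted = true;
        }
    }

    boolean isDeleted() {
        return deleted;
    }

    // Only a durable exchange that has not been deleted is still there after a restart.
    boolean existsAfterRestart() {
        return durable && !deleted;
    }

    public static void main(String[] args) {
        ExchangeLifecycleSketch exchange = new ExchangeLifecycleSketch(false, true);
        exchange.bind("archive1");
        exchange.unbind("archive1");
        System.out.println(exchange.isDeleted()); // true
    }
}
```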

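Default exchange

The default exchange's name is "", the empty string. Every queue is automatically bound to it with a routing key equal to the queue name, so when we publish without declaring an exchange, it looks as if the message is sent directly to the queue.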
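
The implicit binding behind this behavior can be sketched as follows. This is plain Java that simulates the broker in memory, not the RabbitMQ client API; DefaultExchangeSketch and its methods are hypothetical names, and the sketch assumes the documented rule that each queue is bound to the default exchange under its own name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the default exchange (illustration only).
class DefaultExchangeSketch {
    // Queue name -> messages waiting in that queue.
    private final Map<String, List<String>> queues = new HashMap<>();

    // Declaring a queue implicitly binds it to the default exchange ""
    // with a routing key equal to the queue name.
    void queueDeclare(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Publishing to the default exchange: the routing key selects the queue by name.
    // Returns false when no queue has that name (the message is unroutable).
    boolean publishToDefault(String routingKey, String message) {
        List<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(message);
        return true;
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, List.of());
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.queueDeclare("archive1");
        broker.publishToDefault("archive1", "hello");
        System.out.println(broker.messagesIn("archive1")); // [hello]
    }
}
```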

Direct

A direct exchange delivers a message to queues based on the message's routing key. The main steps are as follows:

  1. Bind the queue to the exchange with the routing key K
  2. When a new message with routing key R reaches the exchange, the exchange routes it to the queue if K = R
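
The two steps above can be sketched in plain Java. This simulates the routing decision in memory and is not the RabbitMQ client API; DirectRoutingSketch is a hypothetical name, and for simplicity each queue has a single binding key. The queue names and keys are the ones used in this article's example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of direct-exchange routing (illustration only).
class DirectRoutingSketch {
    // Queue name -> binding key K, kept in insertion order.
    private final Map<String, String> bindings = new LinkedHashMap<>();

    // Step 1: bind a queue to the exchange with binding key K.
    void bind(String queue, String bindingKey) {
        bindings.put(queue, bindingKey);
    }

    // Step 2: a message with routing key R goes to every queue whose K equals R.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            if (binding.getValue().equals(routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        DirectRoutingSketch images = new DirectRoutingSketch();
        images.bind("archive1", "images.archive");
        images.bind("archive2", "images.archive");
        images.bind("cropper", "images.crop");

        System.out.println(images.route("images.archive")); // [archive1, archive2]
        System.out.println(images.route("images.crop"));    // [cropper]
        System.out.println(images.route("images.resizer")); // []
    }
}
```

In this model, images.resizer matches no binding, so the message is routed to zero queues.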

[Figure: exchange-direct.webp.jpg]
The producer publishes messages with the routing keys "images.archive", "images.crop", and "images.resizer" to the exchange images through channel.basicPublish.

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public static void main(String[] args) throws IOException, TimeoutException {
    // Create a connection factory (it connects to localhost by default)
    ConnectionFactory factory = new ConnectionFactory();
    // Open a connection to the RabbitMQ server and create a channel;
    // try-with-resources closes both once publishing is done
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // Declare the exchange
        channel.exchangeDeclare(Constant.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String[] routingKeys = {"images.archive", "images.crop", "images.resizer"};
        for (String routingKey : routingKeys) {
            // Publish each message to the exchange twice; the body is the routing key itself
            byte[] body = routingKey.getBytes("UTF-8");
            channel.basicPublish(Constant.EXCHANGE_NAME, routingKey, null, body);
            channel.basicPublish(Constant.EXCHANGE_NAME, routingKey, null, body);
        }
        System.out.println("Sent complete");
    }
}
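
The snippets in this article reference a Constant class that is not shown. A minimal sketch of what it might look like follows. Only the direct exchange name images comes from the text; the fanout and topic exchange names are placeholders chosen for illustration.

```java
// Hypothetical holder for the exchange names used by the examples.
final class Constant {
    // Direct exchange; the article calls this exchange "images".
    static final String EXCHANGE_NAME = "images";
    // Assumed names: the article does not state them.
    static final String EXCHANGE_FANOUT_NAME = "images_fanout";
    static final String EXCHANGE_TOPIC_NAME = "images_topic";

    private Constant() {
        // Constants only; not meant to be instantiated.
    }

    public static void main(String[] args) {
        System.out.println(EXCHANGE_NAME); // images
    }
}
```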

The ArchiveRec1 consumer binds the queue archive1 to the exchange images with the routing key images.archive through channel.queueBind.

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Name of the queue
public static final String QUEUE_NAME = "archive1";

public static void main(String[] args) throws IOException, TimeoutException {
    // Create a connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // Open a connection to the RabbitMQ server
    Connection connection = factory.newConnection();
    // Create a channel
    Channel channel = connection.createChannel();
    // Declare the exchange and the queue here as well: the consumer may start
    // before the producer, so both must exist before binding and consuming
    channel.exchangeDeclare(Constant.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange with the routing key
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_NAME, "images.archive");
    System.out.println("Waiting for messages.");
    // Callback invoked asynchronously for each delivered message
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ArchiveRec1 Received '" + message + "'");
    };
    // Start consuming with automatic acknowledgement; the cancel callback does nothing
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}

The ArchiveRec2 consumer binds the queue archive2 to the exchange images with the routing key images.archive through channel.queueBind. Apart from the queue name, its code is the same as above, so it is not repeated here.
The CropRec consumer binds the queue cropper to the exchange images with the routing key images.crop through channel.queueBind.

// Name of the queue
public static final String QUEUE_NAME = "cropper";

public static void main(String[] args) throws IOException, TimeoutException {
    // Create a connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // Open a connection to the RabbitMQ server
    Connection connection = factory.newConnection();
    // Create a channel
    Channel channel = connection.createChannel();
    // Declare the exchange and the queue here as well: the consumer may start
    // before the producer, so both must exist before binding and consuming
    channel.exchangeDeclare(Constant.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange with the routing key
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_NAME, "images.crop");
    System.out.println("Waiting for messages.");
    // Callback invoked asynchronously for each delivered message
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("cropper Received '" + message + "'");
    };
    // Start consuming with automatic acknowledgement; the cancel callback does nothing
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}

Run ArchiveRec1 and ArchiveRec2 once each, and CropRec twice. After starting the consumers, run the producer DirectExchange.
The results are as follows. ArchiveRec1 and ArchiveRec2 each consume 2 messages, and the two CropRec instances consume 2 messages in total, which shows that consumers on the same queue take turns receiving messages (round-robin).
[Screenshots: console output of ArchiveRec1, ArchiveRec2, and CropRec]
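
The way two consumers share one queue can be modeled with a simple round-robin loop. This is a sketch in plain Java, not broker code: RoundRobinSketch is a hypothetical name, and it assumes both consumers are always ready, so messages are simply handed out in turn.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of round-robin delivery to the consumers of one queue.
class RoundRobinSketch {
    // Returns, for each consumer index, the messages that consumer would receive
    // when the queue hands out messages to consumers in turn.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        // The producer publishes "images.crop" twice; two CropRec instances share the cropper queue.
        List<String> cropperQueue = List.of("images.crop", "images.crop");
        System.out.println(dispatch(cropperQueue, 2)); // [[images.crop], [images.crop]]
    }
}
```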

Fanout

A fanout exchange sends each message to every queue bound to it and ignores the routing key. In other words, when multiple queues are bound to the exchange, every message it receives is broadcast to all of them. The Direct example above can achieve a similar effect by binding several queues with the same routing key, but fanout is more convenient for broadcasting.
[Figure: exchange-fanout.webp.jpg]
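
Fanout routing can be sketched in plain Java as follows. This is an in-memory model, not the RabbitMQ client API. FanoutRoutingSketch is a hypothetical name, the queue name fanout_archive2 is assumed by analogy with fanout_archive1, and the producer is assumed to publish each routing key twice, as in the direct example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of fanout-exchange routing (illustration only).
class FanoutRoutingSketch {
    // Queue name -> binding key; the key is stored but never consulted.
    private final Map<String, String> bindings = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.put(queue, bindingKey);
    }

    // Every bound queue receives the message, whatever the routing key is.
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.keySet());
    }

    public static void main(String[] args) {
        FanoutRoutingSketch exchange = new FanoutRoutingSketch();
        exchange.bind("fanout_archive1", "images.archive");
        exchange.bind("fanout_archive2", "images.archive");
        exchange.bind("fanout_cropper", "images.crop");

        // Count how many messages each queue receives.
        Map<String, Integer> received = new LinkedHashMap<>();
        String[] routingKeys = {"images.archive", "images.crop", "images.resizer"};
        for (String routingKey : routingKeys) {
            for (int copy = 0; copy < 2; copy++) { // each key is published twice
                for (String queue : exchange.route(routingKey)) {
                    received.merge(queue, 1, Integer::sum);
                }
            }
        }
        System.out.println(received); // {fanout_archive1=6, fanout_archive2=6, fanout_cropper=6}
    }
}
```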
The example is adapted from the direct one: change the names of the exchange and the queues, then change the exchange type to fanout.
The ArchiveRec1 code is as follows; ArchiveRec2 is the same.

// Name of the queue
public static final String QUEUE_NAME = "fanout_archive1";

public static void main(String[] args) throws IOException, TimeoutException {
    // Create a connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // Open a connection to the RabbitMQ server
    Connection connection = factory.newConnection();
    // Create a channel
    Channel channel = connection.createChannel();
    // Declare the exchange and the queue here as well: the consumer may start
    // before the producer, so both must exist before binding and consuming
    channel.exchangeDeclare(Constant.EXCHANGE_FANOUT_NAME, BuiltinExchangeType.FANOUT);
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange; a fanout exchange ignores the routing key
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_FANOUT_NAME, "images.archive");
    System.out.println("Waiting for messages.");
    // Callback invoked asynchronously for each delivered message
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ArchiveRec1 Received '" + message + "'");
    };
    // Start consuming with automatic acknowledgement; the cancel callback does nothing
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}

The CropRec code is as follows:

// Name of the queue
public static final String QUEUE_NAME = "fanout_cropper";

public static void main(String[] args) throws IOException, TimeoutException {
    // Create a connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // Open a connection to the RabbitMQ server
    Connection connection = factory.newConnection();
    // Create a channel
    Channel channel = connection.createChannel();
    // Declare the exchange and the queue here as well: the consumer may start
    // before the producer, so both must exist before binding and consuming
    channel.exchangeDeclare(Constant.EXCHANGE_FANOUT_NAME, BuiltinExchangeType.FANOUT);
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange; a fanout exchange ignores the routing key
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_FANOUT_NAME, "images.crop");
    System.out.println("Waiting for messages.");
    // Callback invoked asynchronously for each delivered message
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("cropper Received '" + message + "'");
    };
    // Start consuming with automatic acknowledgement; the cancel callback does nothing
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}

Run ArchiveRec1 and ArchiveRec2 once each, and CropRec twice. After starting the consumers, run the producer FanoutExchange.
The results are as follows. ArchiveRec1 and ArchiveRec2 each consume 6 messages, including those published with other routing keys. The fanout_cropper queue also receives 6 messages, which the two CropRec instances consume between them.
[Screenshots: console output of ArchiveRec1, ArchiveRec2, and CropRec]

Topic

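A topic exchange lets a queue use wildcards in its binding key to receive only the messages it wants. Despite the name, it is not the same thing as an ActiveMQ publish/subscribe topic; its behavior is closer to ActiveMQ's wildcard destinations.
The wildcards are # and *. A routing key is a list of words separated by dots; # matches zero or more words, and * matches exactly one word.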
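
The wildcard rules can be sketched as a small matcher in plain Java. This is an illustration, not the broker's implementation: TopicMatchSketch and its methods are hypothetical names, and the sketch assumes RabbitMQ's documented rule that * matches exactly one dot-separated word and # matches zero or more words. The binding keys and routing keys in main are the ones used in this article's example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical matcher for topic-exchange binding keys (illustration only).
class TopicMatchSketch {
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Compares pattern words from index p against routing-key words from index w.
    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern exhausted: match only if no words remain
        }
        if (pattern[p].equals("#")) {
            // '#' absorbs zero words (move past it) or one more word (stay on it).
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false; // pattern still needs a word, but none are left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    // Returns the routing keys that the given binding key would receive.
    static List<String> matching(String bindingKey, List<String> routingKeys) {
        List<String> result = new ArrayList<>();
        for (String routingKey : routingKeys) {
            if (matches(bindingKey, routingKey)) {
                result.add(routingKey);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> published = List.of("images.archive.a", "images.archive.b", "images.crop.a");
        for (String bindingKey : List.of("images.#", "images.archive.*", "images.*.a", "images.crop.*")) {
            System.out.println(bindingKey + " -> " + matching(bindingKey, published));
        }
    }
}
```

With these inputs, images.# matches all three keys, images.archive.* and images.*.a match two each, and images.crop.* matches one.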

AllRec receives all messages whose routing key starts with images, ArchiveRec receives all messages whose routing key starts with images.archive, ARec receives messages whose routing key starts with images and ends with a, and CropRec receives all messages whose routing key starts with images.crop.
The producer publishes one message for each of the three routing keys images.archive.a, images.archive.b, and images.crop.a.

Producer:

public static void main(String[] args) throws IOException, TimeoutException {
    // Create a connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // Open a connection to the RabbitMQ server and create a channel;
    // try-with-resources closes both once publishing is done
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // Declare the exchange
        channel.exchangeDeclare(Constant.EXCHANGE_TOPIC_NAME, BuiltinExchangeType.TOPIC);
        String[] routingKeys = {"images.archive.a", "images.archive.b", "images.crop.a"};
        for (String routingKey : routingKeys) {
            // Publish the message to the exchange; the body is the routing key itself
            channel.basicPublish(Constant.EXCHANGE_TOPIC_NAME, routingKey, null, routingKey.getBytes("UTF-8"));
        }
        System.out.println("Sent complete");
    }
}

AllRec

// Name of the queue
public static final String QUEUE_NAME = "all";

public static void main(String[] args) throws IOException, TimeoutException {
    // Create a connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // Open a connection to the RabbitMQ server
    Connection connection = factory.newConnection();
    // Create a channel
    Channel channel = connection.createChannel();
    // Declare the exchange and the queue here as well: the consumer may start
    // before the producer, so both must exist before binding and consuming
    channel.exchangeDeclare(Constant.EXCHANGE_TOPIC_NAME, BuiltinExchangeType.TOPIC);
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange with a wildcard binding key
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.#");
    System.out.println("Waiting for messages.");
    // Callback invoked asynchronously for each delivered message
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("AllRec Received '" + message + "'");
    };
    // Start consuming with automatic acknowledgement; the cancel callback does nothing
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}

ArchiveRec

// Name of the queue
public static final String QUEUE_NAME = "archive";

public static void main(String[] args) throws IOException, TimeoutException {
    // Create a connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // Open a connection to the RabbitMQ server
    Connection connection = factory.newConnection();
    // Create a channel
    Channel channel = connection.createChannel();
    // Declare the exchange and the queue here as well: the consumer may start
    // before the producer, so both must exist before binding and consuming
    channel.exchangeDeclare(Constant.EXCHANGE_TOPIC_NAME, BuiltinExchangeType.TOPIC);
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange with a wildcard binding key
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.archive.*");
    System.out.println("Waiting for messages.");
    // Callback invoked asynchronously for each delivered message
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ArchiveRec Received '" + message + "'");
    };
    // Start consuming with automatic acknowledgement; the cancel callback does nothing
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}

ARec

// Name of the queue
public static final String QUEUE_NAME = "a";

public static void main(String[] args) throws IOException, TimeoutException {
    // Create a connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // Open a connection to the RabbitMQ server
    Connection connection = factory.newConnection();
    // Create a channel
    Channel channel = connection.createChannel();
    // Declare the exchange and the queue here as well: the consumer may start
    // before the producer, so both must exist before binding and consuming
    channel.exchangeDeclare(Constant.EXCHANGE_TOPIC_NAME, BuiltinExchangeType.TOPIC);
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange with a wildcard binding key
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.*.a");
    System.out.println("Waiting for messages.");
    // Callback invoked asynchronously for each delivered message
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ARec Received '" + message + "'");
    };
    // Start consuming with automatic acknowledgement; the cancel callback does nothing
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}

CropRec

// Name of the queue
public static final String QUEUE_NAME = "crop";

public static void main(String[] args) throws IOException, TimeoutException {
    // Create a connection factory
    ConnectionFactory factory = new ConnectionFactory();
    // Open a connection to the RabbitMQ server
    Connection connection = factory.newConnection();
    // Create a channel
    Channel channel = connection.createChannel();
    // Declare the exchange and the queue here as well: the consumer may start
    // before the producer, so both must exist before binding and consuming
    channel.exchangeDeclare(Constant.EXCHANGE_TOPIC_NAME, BuiltinExchangeType.TOPIC);
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // Bind the queue to the exchange with a wildcard binding key
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.crop.*");
    System.out.println("Waiting for messages.");
    // Callback invoked asynchronously for each delivered message
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("CropRec Received '" + message + "'");
    };
    // Start consuming with automatic acknowledgement; the cancel callback does nothing
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}

The results are as follows:
AllRec received all 3 messages.
ArchiveRec received the 2 messages starting with images.archive.
ARec received the 2 messages starting with images and ending with a.
CropRec received the 1 message starting with images.crop.
[Screenshots: console output of AllRec, ArchiveRec, ARec, and CropRec]