Implement message sending and receiving with Spring Boot + Amazon MQ (ActiveMQ)

This section describes the procedure for building Apache ActiveMQ with Amazon MQ and using it from the Spring Boot application. In addition, we will verify the operation on the application side when a file over occurs in ActiveMQ.

I will describe how to build Apache ActiveMQ with Amazon MQ and use it from Spring Boot application.

I couldn’t find a lot of information, but even if I use Amazon MQ, the usability is not much different from normal ActiveMQ.

Differences between Amazon MQ and Amazon SQS

When it comes to queue message services on AWS, Amazon SQS comes to mind first.

SQS is a fully managed service that does not require any message broker configuration and automatically scales according to the number of message requests, making it easy to build and operate.

However, I think there are cases where SQS cannot be selected.

For example, if you want to migrate an existing application that uses Apache ActiveMQ as a message broker to AWS, you may want to use ActiveMQ as it is because you do not have to modify the application.

Also, SQS has some restrictions because it is managed. Please refer to the following page for details.

https://hatenablog-parts.com/embed?url=https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html

There are cases where Amazon MQ is selected due to the above circumstances.

Amazon MQ is a message broker service compatible with Apache ActiveMQ and RabbitMQ. It is a managed service that can be used without installing middleware on a server such as EC2. Unlike SQS, you need to set up the message broker yourself, and there is also operational effort such as scaling.

The difference between MQ and SQS and how to use them are explained in the official AWS documentation as follows.

Q: What is the difference between Amazon SQS and Amazon MQ?

If you want to quickly and easily move the messaging functionality that your existing application is processing to the cloud, we recommend using Amazon MQ. Support for industry-standard APIs and protocols allows any standard-compliant message broker to switch to Amazon MQ without rewriting the messaging code in your application. If you’re building an entirely new application in the cloud, we recommend considering Amazon SQS and Amazon SNS. Amazon SQS and SNS are lightweight, fully managed message queuing and topic services that scale almost unlimitedly and provide a simple, easy-to-use API.

Q: How should I use Amazon MQ and Amazon EC2 ActiveMQ or RabbitMQ that I manage?

Your choice depends on how detailed you want to manage your message broker and the underlying infrastructure. Amazon MQ is a managed message broker service that handles operations such as setup, monitoring, maintenance, and provisioning of a highly available and durable underlying infrastructure. If you want to reduce operational overhead and associated costs, Amazon MQ is for you. If you want more control over feature and configuration customizations, or if you want to use custom plugins, you can consider installing and running Message Broker directly on Amazon EC2.

Create a project with Spring Initializar

First, prepare the Spring Boot application. Let’s create a project with libraries etc. prepared using Spring Initializar.

Since the library for handling ActiveMQ is provided by Spring official, specify it as Dependencies.

Spring Initializr

As shown above, the versions are Java 11 and Spring Boot 2.5.0. Also, other libraries are included for verification.

Creating Amazon MQ (ActiveMQ)

Next, create ActiveMQ in Amazon MQ. The settings are as follows. I want to verify failover later, so I created it with the deploy mode set to active / standby.

The version of ActiveMQ is 5.16.3.

This time, public accessibility is enabled for verification, but it is preferable to disable it during production operation and allow access only from within the VPC.

Also, consider whether to enable the output of general logs and audit logs to CloudWatch Logs as needed.

Sending and receiving messages using ActiveMQ

In the official Spring library, spring-boot-starter-activemqyou can specify the endpoint and authentication information as follows.

application.yml

spring:
  activemq:
    broker-url: failover:(ssl://xxxxxxxxxx-1.mq.eu-west-3.amazonaws.com:61617,ssl://xxxxxxxxxx-2.mq.eu-west-3.amazonaws.com:61617)
    user: user
    password: password

Please refer to the official document for other setting items.

https://hatenablog-parts.com/embed?url=https://spring.pleiades.io/spring-boot/docs/2.1.11.RELEASE/reference/html/boot-features-messaging.html

Next, implement the messaging process. This time, I referred to the guide provided by Spring officially as it is.

https://hatenablog-parts.com/embed?url=https://spring.pleiades.io/guides/gs/messaging-jms/Source code

Application.Java

package com.example.demo;

import com.example.demo.hello.Email;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

import javax.jms.ConnectionFactory;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Timer;
import java.util.TimerTask;

@SpringBootApplication
@EnableJms
public class Application {

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        var factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        var converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    public static void main(String[] args) {
        var context = SpringApplication.run(Application.class, args);
        var jmsTemplate = context.getBean(JmsTemplate.class);

        sendQueue(jmsTemplate);
    }

    private static void sendQueue(JmsTemplate jmsTemplate) {
        var sendingTask = new TimerTask() {
            public void run() {
                var zonedDateTime = ZonedDateTime.now();
                var dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
                var now = zonedDateTime.format(dateTimeFormatter);

                System.out.println("Sending an email message.");
                jmsTemplate.convertAndSend("mailbox", new Email("bilel@example.com", now));
            }
        };
        var timer = new Timer();
        timer.schedule(sendingTask, 0, 10000);
    }
}

Email.Java

package com.example.demo.hello;

public class Email {

    private String to;
    private String body;

    public Email() {
    }

    public Email(String to, String body) {
        this.to = to;
        this.body = body;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return String.format("Email{to=%s, body=%s}", getTo(), getBody());
    }

}

Receiver.Java

package com.example.demo.hello;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    @JmsListener(destination = "mailbox", containerFactory = "myFactory")
    public void receiveMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }

}

Only a part of the content has been changed, and it is implemented to send the current date and time as the content of the message. This is to check if the queue is not lost and the process is executed normally when failover occurs in ActiveMQ.

When Spring Boot is started with this implementation, the following log will be output.

Application log output 
2021-11-01 15:04:07.776  INFO 53464 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.activemq.util.IntrospectionSupport (file:/Users/bilel/.gradle/caches/modules-2/files-2.1/org.apache.activemq/activemq-client/5.16.2/xxxxxxxxxx/activemq-client-5.16.2.jar) to method sun.security.ssl.SSLSocketImpl.setHost(java.lang.String)
WARNING: Please consider reporting this to the maintainers of org.apache.activemq.util.IntrospectionSupport
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-11-01 15:04:08.452  INFO 53464 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to ssl://xxxxxxxxxx-1.mq.eu-west-3.amazonaws.com:61617
2021-11-01 15:04:08.522  INFO 53464 --- [  restartedMain] com.example.demo.Application             : Started Application in 2.962 seconds (JVM running for 3.784)
2021-11-01 15:04:08.525  INFO 53464 --- [  restartedMain] o.s.b.a.ApplicationAvailabilityBean      : Application availability state LivenessState changed to CORRECT
2021-11-01 15:04:08.527  INFO 53464 --- [  restartedMain] o.s.b.a.ApplicationAvailabilityBean      : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC
2021-11-01 15:04:09.056  INFO 53464 --- [on(1)-127.0.0.1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-01 15:04:09.057  INFO 53464 --- [on(1)-127.0.0.1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-11-01 15:04:09.057  INFO 53464 --- [on(1)-127.0.0.1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms
Sending an email message.
Received <Email{to=bilel@example.com, body=2021/11/01 15:04:18.532}>
Sending an email message.
Received <Email{to=bilel@example.com, body=2021/11/01 15:04:28.537}>
Sending an email message.
Received <Email{to=bilel@example.com, body=2021/11/01 15:04:38.542}>
Sending an email message.

You can see that the message is being sent and received properly.

ActiveMQ failover

This time I created a broker with an active / standby configuration.

With this configuration, two brokers will be created, but only one will be active (accessible). When the active broker goes down for some reason, the standby broker becomes active.

Also, the URI is prepared for each broker, and it seems that only the URI of the active broker is valid.

Like the application.yml file mentioned above, ActiveMQ allows failover:uri1,...,uriNyou to write a URI in a format such as when specifying an endpoint . By writing in this format, when the ActiveMQ active broker goes down, the application can wait for failover so that queue loss does not occur.

https://hatenablog-parts.com/embed?url=https://activemq.apache.org/failover-transport-reference.html

Now, let’s check the behavior when actually failing over. In Amazon MQ, failover can be achieved by performing a « reboot » from the management console. The official AWS tutorial is provided, so please refer to it.

https://hatenablog-parts.com/embed?url=https://amazon-mq-intro.workshop.aws/failover.html

When failover occurs, the following log will be output on the application side.

Log output at failover
Sending an email message.
Received <Email{to=bilel@example.com, body=2021/11/01 15:07:28.615}>
Sending an email message.
Received <Email{to=bilel@example.com, body=2021/11/01 15:07:38.620}>
2021-11-01 15:07:48.365  WARN 53464 --- [.73.97.75:61617] o.a.a.t.failover.FailoverTransport       : Transport (ssl://xxxxxxxxxx-1.mq.eu-west-3.amazonaws.com:61617) failed, attempting to automatically reconnect

java.io.EOFException: null
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397) ~[na:na]
	at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) ~[activemq-client-5.16.2.jar:5.16.2]
	at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.16.2.jar:5.16.2]
	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.16.2.jar:5.16.2]
	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.16.2.jar:5.16.2]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2021-11-01 15:07:48.404  WARN 53464 --- [ActiveMQ Task-3] o.a.a.t.failover.FailoverTransport       : Failed to connect to [ssl://xxxxxxxxxx-2.mq.eu-west-3.amazonaws.com:61617, ssl://xxxxxxxxxx-1.mq.eu-west-3.amazonaws.com:61617] after: 1 attempt(s) with Connection refused (Connection refused), continuing to retry.
Sending an email message.
2021-11-01 15:07:59.229  INFO 53464 --- [ActiveMQ Task-3] o.a.a.t.failover.FailoverTransport       : Successfully reconnected to ssl://xxxxxxxxxx-2.mq.eu-west-3.amazonaws.com:61617
Received <Email{to=bilel@example.com, body=2021/11/01 15:07:48.624}>
Sending an email message.
Received <Email{to=bilel@example.com, body=22021/11/01 15:07:59.318}>

You can see that the broker has gone down and is automatically reconnecting. Also, when checking the contents of the received queue, it seems that sending and receiving processing can be performed without missing the queue.

In SQS, scaling is managed on the AWS side, so you do not need to be aware of it, but ActiveMQ also has a failover mechanism properly. Since it is well supported by the official Spring library, it seems that a certain degree of availability can be guaranteed.

Other reference documents

For Java code that does not use Spring Boot, the contents of the AWS official documentation will be helpful.

https://hatenablog-parts.com/embed?url=https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-connecting-application.html


Une réflexion sur “Implement message sending and receiving with Spring Boot + Amazon MQ (ActiveMQ)

Votre commentaire

Entrez vos coordonnées ci-dessous ou cliquez sur une icône pour vous connecter:

Logo WordPress.com

Vous commentez à l’aide de votre compte WordPress.com. Déconnexion /  Changer )

Image Twitter

Vous commentez à l’aide de votre compte Twitter. Déconnexion /  Changer )

Photo Facebook

Vous commentez à l’aide de votre compte Facebook. Déconnexion /  Changer )

Connexion à %s