Wie ich schon in meinem Beitrag zu Microservices erwähnt habe, kann die Kommunikation zwischen zwei System nicht nur über HTTP, sondern auch über andere leichtgewichtige Protokolle geschehen. Dabei nenne ich als konkretes Beispiel AMQP. Das Advanced Message Queuing Protocol eignet sich hier vor allem dadurch, da es programmiersprachen-neutral ist und sowohl in einer Java-, Ruby- oder C#-Anwendung verwendet werden kann.
Im konkreten möchte ich eine kleine Demo-Anwendung entwickeln bei der Events anhand bestimmter Kriterien zu Konsumenten geleitet werden. Dieses Vorgehen findet sich derzeit oft in Microservices-Architekturen wieder, da mit dem Einsatz eines Message Brokers die direkte Kopplung einer Point-to-Point-Verbindung wie HTTP gelöst wird. Zudem weiß ein Service oft nicht wie viele Konsumenten er bedienen muss.
Zunächst etwas Vokabular.
Begriff | Beschreibung |
---|---|
Produzent | Das System, das die Nachricht erzeugt/produziert hat |
Konsument | Das System, das die Nachricht empfängt |
Exchange | Ein Endpunkt für Produzenten. Hier werden Nachrichten abgelegt |
Queue | Ein Endpunkt für Konsumenten. Hier werden Nachrichten abgerufen |
Binding | Eine Zuordnung von eines Exchange zu Queue |
Routing-Key | Ein Diskriminator um zu entscheiden zu welchen Queues die Nachricht geleitet werden soll |
Direct Exchange | Ein direkte Zuordnung Exchange zu einer Queue |
Topic Exchange | Ein Exchange bei dem die Nachrichten nur an Queues weitergeleitet werden die über einen bestimmten Routing-Key an den Topic Exchange angebunden sind |
Fanout Exchange | Ein Endpunkt für Produzenten bei dem Nachrichten zu allen angebundenen Queues geleitet wird |
Ich habe mich dafür entschieden die Anbindung an meine RabbitMQ-Instanz über Spring AMQP aufzubauen. Dafür habe ich folgende Konfigurations-Klasse angelegt.
@Configuration public class MessagingConfiguration { static final String DEMO_EXCHANGE_NAME = "demo.direct"; static final String DEMO_QUEUE_NAME = "demo.eai"; @Bean public ConnectionFactory conenctionFactory() { final CachingConnectionFactory ccf = new CachingConnectionFactory(); ccf.setUsername("guest"); ccf.setPassword("guest"); return ccf; } @Bean public AmqpTemplate AmqpTemplate(ConnectionFactory cf) { return new RabbitTemplate(cf); } @Bean public AmqpAdmin amqpAdmin(ConnectionFactory cf) { return new RabbitAdmin(cf); } @Bean public Exchange topicExchange(AmqpAdmin admin) { final TopicExchange topicExchange = new TopicExchange(DEMO_EXCHANGE_NAME); admin.declareExchange(topicExchange); return topicExchange; } @Bean public Queue eaiQueue(AmqpAdmin admin) { final Queue eaiQueue = new Queue(DEMO_QUEUE_NAME); admin.declareQueue(eaiQueue); return eaiQueue; } @Bean @DependsOn({ "topicExchange", "eaiQueue" }) public Binding eaiBinding(AmqpAdmin admin) { final Binding binding = new Binding(DEMO_QUEUE_NAME, DestinationType.QUEUE, DEMO_EXCHANGE_NAME, "eai", new HashMap<String, Object>()); admin.declareBinding(binding); return binding; } }
Neben der Konfiguration der ConnectionFactory
und des AmqpTemplates
wird hier sowohl der Exchange als auch die Queue und das Binding von Exchange zu Queue definiert.
AMQP erlaubt es, dass Exchanges und Queues programatisch erzeugt werden können. Spring AMQP kennt für diese Zwecke das Interface AmqpAdmin
. Eine konkrete Implementierung ist RabbitAdmin
.
Das AmqpTemplate kann dahingehend konfiguriert werden, dass sowohl Exchange, Routing-Key und Queue fest definiert sind. Dies eignet sich, wenn die Instanz nur einen Use-Case (nur an einen Exchange senden, nur von einer Queue empfangen) hat.
Ich habe einen (Unit-)Test angelegt, der beispielhaft das Senden und Empfangen einer Nachricht demonstriert.
@Test public void routingKeyEAI() throws InterruptedException { template.send(MessagingConfiguration.DEMO_EXCHANGE_NAME, "eai", MessageBuilder.withBody("Message Channel".getBytes()).build()); Thread.sleep(100); final Object data = template.receiveAndConvert(MessagingConfiguration.DEMO_QUEUE_NAME); assertThat(data, not(nullValue())); final String message = new String((byte[]) data); assertThat(message, equalTo("Message Channel")); }
Der erste Test sendet die Nachricht Message Channel mit dem Routing-Key eai
an den Topic Exchange demo.topic
und empfängt die Nachricht nach kurzer Pause von der Queue demo.eai
.
@Test public void routingKeyDP() throws InterruptedException { template.send(MessagingConfiguration.DEMO_EXCHANGE_NAME, "dp", MessageBuilder.withBody("State Machine".getBytes()).build()); Thread.sleep(100); final Object data = template.receiveAndConvert(MessagingConfiguration.DEMO_QUEUE_NAME); assertThat(data, nullValue()); }
Im zweiten Test wird die Nachricht Queue an den gleichen Topic Exchange versendet. Diesmal jedoch mit dem Routing-Key dp
. Ergebnis des Ganzen: Es wird keine Nachricht empfangen.
Der komplette Quelltext zu dieser Demonstrations-Anwendung ist auf GitHub unter amqp-demo hinterlegt. Dort sind auch die Abhängigkeiten zu finden.