Messaging mit AMQP

Wie ich schon in meinem Beitrag zu  Microservices erwähnt habe, kann die Kommunikation zwischen zwei System nicht nur über HTTP, sondern auch über andere leichtgewichtige Protokolle geschehen. Dabei nenne ich als konkretes Beispiel AMQP. Das Advanced Message Queuing Protocol eignet sich hier vor allem dadurch, da es programmiersprachen-neutral ist und sowohl in einer Java-, Ruby- oder C#-Anwendung verwendet werden kann.

Im konkreten möchte ich eine kleine Demo-Anwendung entwickeln bei der Events anhand bestimmter Kriterien zu Konsumenten geleitet werden. Dieses Vorgehen findet sich derzeit oft in Microservices-Architekturen wieder, da mit dem Einsatz eines Message Brokers die direkte Kopplung einer Point-to-Point-Verbindung wie HTTP gelöst wird. Zudem weiß ein Service oft nicht wie viele Konsumenten er bedienen muss.

Zunächst etwas Vokabular.

Begriff Beschreibung
Produzent Das System, das die Nachricht erzeugt/produziert hat
Konsument Das System, das die Nachricht empfängt
Exchange Ein Endpunkt für Produzenten. Hier werden Nachrichten abgelegt
Queue Ein Endpunkt für Konsumenten. Hier werden Nachrichten abgerufen
Binding Eine Zuordnung von eines Exchange zu Queue
Routing-Key Ein Diskriminator um zu entscheiden zu welchen Queues die Nachricht geleitet werden soll
Direct Exchange Ein direkte Zuordnung Exchange zu einer Queue
Topic Exchange Ein Exchange bei dem die Nachrichten nur an Queues weitergeleitet werden die über einen bestimmten Routing-Key an den Topic Exchange angebunden sind
Fanout Exchange Ein Endpunkt für Produzenten bei dem Nachrichten zu allen angebundenen Queues geleitet wird

Ich habe mich dafür entschieden die Anbindung an meine RabbitMQ-Instanz über Spring AMQP aufzubauen. Dafür habe ich folgende Konfigurations-Klasse angelegt.

@Configuration
public class MessagingConfiguration {

  static final String DEMO_EXCHANGE_NAME = "demo.direct";
  static final String DEMO_QUEUE_NAME = "demo.eai";

  @Bean
  public ConnectionFactory conenctionFactory() {
    final CachingConnectionFactory ccf = new CachingConnectionFactory();

    ccf.setUsername("guest");
    ccf.setPassword("guest");

    return ccf;
  }

  @Bean
  public AmqpTemplate AmqpTemplate(ConnectionFactory cf) {
    return new RabbitTemplate(cf);
  }

  @Bean
  public AmqpAdmin amqpAdmin(ConnectionFactory cf) {
    return new RabbitAdmin(cf);
  }

  @Bean
  public Exchange topicExchange(AmqpAdmin admin) {
    final TopicExchange topicExchange = new TopicExchange(DEMO_EXCHANGE_NAME);
    admin.declareExchange(topicExchange);

    return topicExchange;
  }

  @Bean
  public Queue eaiQueue(AmqpAdmin admin) {
    final Queue eaiQueue = new Queue(DEMO_QUEUE_NAME);

    admin.declareQueue(eaiQueue);

    return eaiQueue;
  }

  @Bean
  @DependsOn({ "topicExchange", "eaiQueue" })
  public Binding eaiBinding(AmqpAdmin admin) {
    final Binding binding = new Binding(DEMO_QUEUE_NAME, DestinationType.QUEUE,
                                        DEMO_EXCHANGE_NAME, "eai",
                                        new HashMap<String, Object>());

    admin.declareBinding(binding);

    return binding;
  }
}

Neben der Konfiguration der ConnectionFactory und des AmqpTemplates wird hier sowohl der Exchange als auch die Queue und das Binding von Exchange zu Queue definiert.
AMQP erlaubt es, dass Exchanges und Queues programatisch erzeugt werden können. Spring AMQP kennt für diese Zwecke das Interface AmqpAdmin. Eine konkrete Implementierung ist RabbitAdmin.
Das AmqpTemplate kann dahingehend konfiguriert werden, dass sowohl Exchange, Routing-Key und Queue fest definiert sind. Dies eignet sich, wenn die Instanz nur einen Use-Case (nur an einen Exchange senden, nur von einer Queue empfangen) hat.

Ich habe einen (Unit-)Test angelegt, der beispielhaft das Senden und Empfangen einer Nachricht demonstriert.

@Test
public void routingKeyEAI() throws InterruptedException {

  template.send(MessagingConfiguration.DEMO_EXCHANGE_NAME, "eai",
                MessageBuilder.withBody("Message Channel".getBytes()).build());

  Thread.sleep(100);

  final Object data = template.receiveAndConvert(MessagingConfiguration.DEMO_QUEUE_NAME);
  assertThat(data, not(nullValue()));

  final String message = new String((byte[]) data);
  assertThat(message, equalTo("Message Channel"));
}

Der erste Test sendet die Nachricht Message Channel mit dem Routing-Key eai an den Topic Exchange demo.topic und empfängt die Nachricht nach kurzer Pause von der Queue demo.eai.

@Test
public void routingKeyDP() throws InterruptedException {

  template.send(MessagingConfiguration.DEMO_EXCHANGE_NAME, "dp",
                MessageBuilder.withBody("State Machine".getBytes()).build());


  Thread.sleep(100);

  final Object data = template.receiveAndConvert(MessagingConfiguration.DEMO_QUEUE_NAME);
  assertThat(data, nullValue());
}

Im zweiten Test wird die Nachricht Queue an den gleichen Topic Exchange versendet. Diesmal jedoch mit dem Routing-Key dp. Ergebnis des Ganzen: Es wird keine Nachricht empfangen.

Der komplette Quelltext zu dieser Demonstrations-Anwendung ist auf GitHub unter amqp-demo hinterlegt. Dort sind auch die Abhängigkeiten zu finden.

Teilen Sie diesen Beitrag

Das könnte dich auch interessieren …

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert

Durch die weitere Nutzung der Seite stimmen Sie der Verwendung von Cookies zu. Weitere Informationen

Die Cookie-Einstellungen auf dieser Website sind auf "Cookies zulassen" eingestellt, um das beste Surferlebnis zu ermöglichen. Wenn du diese Website ohne Änderung der Cookie-Einstellungen verwendest oder auf "Akzeptieren" klickst, erklärst du sich damit einverstanden.

Schließen