The Messaging Pattern

Overview

Provides a simple implementation of Store and Forward Messaging, built using Oracle Coherence.

Distribution

The Messaging Pattern implementation is distributed in the jar file called: coherence-messagingpattern-11.0.0.jar.

However, because the Messaging Pattern has several other dependencies, we **strongly recommend** that developers use a tool such as Apache Maven, Ivy or Gradle to resolve those dependencies transitively, instead of attempting to do so manually.

To configure your Apache Maven-based application to use the Messaging Pattern, add the following declaration to the <dependencies> element of your application's pom.xml file.

<dependency>
    <groupId>com.oracle.coherence.incubator</groupId>
    <artifactId>coherence-messagingpattern</artifactId>
    <version>11.0.0</version>
</dependency>

Details

The Messaging Pattern advocates that:

  • Payload, typically represented as a Message object, may be sent to a Destination from a Publisher.

  • It is the responsibility of the infrastructure managing the Destination to ensure that Messages (arriving at the said Destination) are then stored (in some manner) and consequently forwarded (in the order in which they arrived at the said Destination) to one or more Subscribers.

  • The Subscribers appropriately consume (receive and acknowledge receipt of) the said Messages from the Destination in the order in which they were forwarded to the said Subscribers.

  • The infrastructure managing the Messages appropriately cleans up (removes and garbage collects) the said Messages that have been consumed by Subscribers.

  • The type of the Destination determines the method of delivery to the Subscribers of that Destination.

  • A Topic Destination or as they are more commonly known, Topics, will store and forward Messages to all of the Subscribers of the said Topic. This form of Message delivery is often called "publish-and-subscribe messaging", or "one-to-many messaging".

  • A Queue Destination, or as they are more commonly known, Queues, will store and forward Messages to at most one of the Subscribers of the said Queue. For each Message a different Subscriber may be used, but this is implementation and runtime dependent. This form of Message delivery is often called "point-to-point messaging" or "one-to-one messaging".

  • A Message may be Persistent or Non-Persistent. In the case of Persistent Messages, the infrastructure managing the Destination must safely store the said Messages to a persistent (and recoverable) storage device so that in the case of infrastructure failure, Messages may be recovered (not lost).

  • A Subscriber to a Topic is either Durable or Non-Durable. An application using Durable Subscriptions is permitted to terminate and restart without losing Messages that were destined for delivery to the said application Subscribers, including those that would have been delivered during the application shutdown or disconnection.
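The difference between the two Destination types listed above can be illustrated with a small in-memory model. This is only a sketch: DestinationSketch, Inbox, forwardToTopic and forwardToQueue are hypothetical names, not classes from the Messaging Pattern, and the round-robin choice for Queues is an assumption (the text above only says that the choice of Subscriber is implementation and runtime dependent).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of the two Destination types; not the library's classes. */
class DestinationSketch {

    /** A subscriber is modelled as a named inbox that records what it received. */
    static final class Inbox {
        final String name;
        final List<String> received = new ArrayList<>();

        Inbox(String name) {
            this.name = name;
        }
    }

    /** Topic: every subscriber receives every message, in arrival order. */
    static void forwardToTopic(List<Inbox> subscribers, List<String> messages) {
        for (String message : messages) {
            for (Inbox inbox : subscribers) {
                inbox.received.add(message);
            }
        }
    }

    /** Queue: each message goes to at most one subscriber (round-robin is an assumption). */
    static void forwardToQueue(List<Inbox> subscribers, List<String> messages) {
        if (subscribers.isEmpty()) {
            return; // nobody to forward to; a real Queue would keep the messages stored
        }
        int next = 0;
        for (String message : messages) {
            subscribers.get(next).received.add(message);
            next = (next + 1) % subscribers.size();
        }
    }
}
```

With two subscribers and three messages, the Topic variant leaves all three messages in both inboxes, while the Queue variant splits them so that no message appears twice.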

While it's rare that an architecture making extensive use of Coherence will require Store and Forward Messaging (due to the ability to use MapListeners, Continuous Queries and events for notifications), there are arguably some circumstances where the pattern is particularly useful.

Although providing an implementation of store and forward Messaging on-top-of-Coherence has always been possible (as demonstrated by this implementation), most application-level requirements for messaging are often satisfied using existing, off-the-shelf and standardized corporate messaging infrastructure, like JMS. In the cases where such infrastructure is not available or not appropriate, this implementation may provide some benefits.

While it is not the intention nor is it the purpose of this implementation to replace existing organizational messaging infrastructure, this pattern is specifically designed to provide a flexible framework for embedded, application-specific, high-performance messaging on a Data Grid.

More specifically, this implementation has been designed as a minimal framework to support the implementation of multi-directional and multi-point (no single point of failure) guaranteed event distribution, typically between multiple Coherence clusters separated by high-latency, high-bandwidth and often unreliable wide-area-networks (WANs)... aka: The Event Distribution Pattern.

Design

Internally the Messaging Pattern uses three Oracle Coherence distributed caches: the Destination cache (for storing Topic and Queue definitions), the Subscription cache and the Message cache. All of these caches are managed through the MessagingSession interface.

To use the Messaging Pattern, an application creates a MessagingSession, through which it will create Destinations, subscribe to Destinations and then publish Messages to those Destinations.

When an application publishes a Message, it is put in the Message cache. A backing map listener on the Message cache then receives an insert event and schedules a background thread to process the newly inserted Message.

For Topics, the background thread will notify each Subscription about the new Message. All Topic Subscriptions will receive the message.

For Queues the background thread will request that the Queue hands the Message to a single Subscription, the next one waiting for a message. Meanwhile, a Subscriber, which typically runs in a client JVM, will be notified of a Message to process (as their Subscription is updated).
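The publish, store, listen and forward flow described above can be sketched with plain JDK collections. This is a simplified stand-in, not the implementation: StoreAndForwardSketch and its methods are hypothetical names, a sorted map plays the role of the Message cache, a direct method call plays the role of the backing map listener, and a single-threaded executor plays the role of the background thread. Only Topic-style delivery is shown.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the publish -> store -> listener -> forward flow. */
class StoreAndForwardSketch {
    // Plays the role of the Message cache (keyed by arrival number).
    private final Map<Long, String> messageStore = new ConcurrentSkipListMap<>();
    // Plays the role of the Subscription cache (one inbox per subscription).
    private final List<List<String>> subscriptions = new CopyOnWriteArrayList<>();
    // A single background thread, so messages are forwarded in submission order.
    private final ExecutorService background = Executors.newSingleThreadExecutor();
    private final AtomicLong nextId = new AtomicLong();

    List<String> subscribe() {
        List<String> inbox = new CopyOnWriteArrayList<>();
        subscriptions.add(inbox);
        return inbox;
    }

    void publish(String payload) {
        long id = nextId.getAndIncrement();
        messageStore.put(id, payload); // 1. store the message
        onInserted(id);                // 2. the "listener" observes the insert
    }

    private void onInserted(long id) {
        background.submit(() -> {      // 3. forward off the publishing thread
            String payload = messageStore.get(id);
            for (List<String> inbox : subscriptions) {
                inbox.add(payload);    // Topic semantics: notify every subscription
            }
        });
    }

    /** Stops accepting work and waits for pending forwards; false on timeout or interrupt. */
    boolean shutdownAndWait() {
        background.shutdown();
        try {
            return background.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The sketch leaves consumed messages in the store; in the pattern itself, clean-up happens once Subscribers have acknowledged them.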

Features of the Messaging Pattern:

  • The Messaging Pattern is naturally geared for accepting as many messages as physically possible (until the network capacity is exceeded or you run out of storage).

  • Monitoring of all Destinations is provided via JMX. Simply enable Oracle Coherence JMX monitoring to see the Messaging tree, which shows all Topics, Queues and Subscriptions.

  • Message delivery is guaranteed to occur in-order on a per client session publisher basis.

  • Server-loss (i.e.: cluster member loss) does not affect the availability of the messaging infrastructure to deliver or consume messages in-order (as it's built using Oracle Coherence distributed caching).

The most significant interface (from an application-level perspective) is the MessagingSession. Implementations of this interface, namely the DefaultMessagingSession, are how you control the messaging infrastructure, including:

  • Creating Topic and Queue Destinations,

  • Subscribing and Unsubscribing to Destinations (creating Subscribers).

  • Publishing Payload to Destinations, and

  • Subscribing and reading (ie: consuming) Payload from Subscriptions.

Example: Using Topics

The following examples outline the use of Topics (one to many messaging), where each published message is delivered to all subscribers.

One To Many Messaging

Step 1: Creating a MessagingSession

Once you've installed and appropriately configured the implementation (see below), the first thing you need to do is create a MessagingSession by calling the DefaultMessagingSession.getInstance() method.

MessagingSession messagingSession = DefaultMessagingSession.getInstance();

Message ordering is guaranteed on a per session basis. All messages published using a given session will arrive at the subscriber in the order in which they were published; however, there is no defined ordering for messages between sessions. Consider this example: a client thread publishes messages A and B while, at the same time, another client thread publishes C and D to the same topic using a different session. A subscriber might receive the messages in any of the following orders: A,B,C,D or A,C,B,D or C,A,B,D, etc.
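The per-session guarantee can be stated as a simple check: a delivery order is acceptable as long as each session's messages keep their relative order. The helper below is a hypothetical illustration (SessionOrderSketch is not part of the Messaging Pattern) and assumes every message label is unique.

```java
import java.util.List;

/** Hypothetical checker for the per-session ordering rule; assumes unique message labels. */
class SessionOrderSketch {

    /** True if every session's messages appear in {@code delivered} in their published order. */
    static boolean preservesSessionOrder(List<String> delivered, List<List<String>> sessions) {
        for (List<String> published : sessions) {
            int lastIndex = -1;
            for (String message : published) {
                int index = delivered.indexOf(message);
                if (index <= lastIndex) {
                    return false; // missing (-1), or delivered before its predecessor
                }
                lastIndex = index;
            }
        }
        return true;
    }
}
```

For the two sessions in the example above, A,C,B,D and C,A,B,D both pass the check, whereas B,A,C,D does not, because B would arrive before A from the same session.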

Step 2: Creating a Topic

Once you have a MessagingSession you can create a Topic.

Identifier topicIdentifier = messagingSession.createTopic("my-topic");

A Topic is uniquely identified within the cluster by its name. If the Topic with the specified name (or Identifier) already exists, the Identifier of the existing Topic is returned.
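The "create or return the existing one" behaviour can be pictured as an atomic lookup keyed by name. The registry below is a hypothetical stand-in (TopicRegistrySketch and its string identifiers are assumptions made for illustration; the real Identifier type and its creation are internal to the Messaging Pattern).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical registry showing create-if-absent semantics keyed by topic name. */
class TopicRegistrySketch {
    private final Map<String, String> identifiersByName = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    /** Returns the identifier for {@code name}, creating it only on the first call. */
    String createTopic(String name) {
        return identifiersByName.computeIfAbsent(
            name, n -> "topic-" + created.incrementAndGet());
    }

    /** Number of topics that were actually created. */
    int createdCount() {
        return created.get();
    }
}
```

Calling createTopic("my-topic") twice on the same registry returns the same identifier both times and creates only one topic.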

Step 3a: Subscribing to a Topic (non-durable subscriptions)

Once you have created a Topic you can subscribe to it by creating a Subscriber. By default, Subscribers to Topics (and Queues) are non-durable. That is, when a Subscriber becomes disconnected from the messaging infrastructure, all of the visible messages for the said Subscriber are appropriately rolled back and the subscription for the Subscriber is removed.

Subscriber subscriber = messagingSession.subscribe(topicIdentifier);

If you don't have the Identifier of the Destination to which you would like to subscribe, you can simply use its name (as a String). For example:

Subscriber subscriber = messagingSession.subscribe("my-topic");

Step 3b: Subscribing to a Topic (durable subscriptions)

To create a durable subscription to a Topic, you need to provide an appropriate TopicSubscriptionConfiguration when subscribing.

Subscriber subscriber = messagingSession.subscribe(
    "my-topic", 
    TopicSubscriptionConfiguration.newDurableConfiguration("my-durable-subscription"));

To resubscribe to a previously created durable subscription, simply call the subscribe method again with the same durable configuration (including the same subscription name).
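To make the difference between durable and non-durable subscriptions concrete, here is a minimal in-memory model. It is a sketch under stated assumptions: DurableTopicSketch and its methods are hypothetical, and disconnection is modelled as an explicit disconnect call rather than a lost connection.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of durable versus non-durable Topic subscriptions. */
class DurableTopicSketch {

    private static final class Subscription {
        final boolean durable;
        final Deque<String> pending = new ArrayDeque<>();

        Subscription(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, Subscription> subscriptions = new HashMap<>();

    /** Subscribing again under an existing name keeps whatever is still pending. */
    void subscribe(String name, boolean durable) {
        subscriptions.computeIfAbsent(name, n -> new Subscription(durable));
    }

    /** A non-durable subscription is removed on disconnect; a durable one survives. */
    void disconnect(String name) {
        Subscription subscription = subscriptions.get(name);
        if (subscription != null && !subscription.durable) {
            subscriptions.remove(name);
        }
    }

    /** Topic semantics: every current subscription gets the message. */
    void publish(String message) {
        for (Subscription subscription : subscriptions.values()) {
            subscription.pending.add(message);
        }
    }

    /** Returns and clears everything pending for the named subscription. */
    List<String> drain(String name) {
        List<String> out = new ArrayList<>();
        Subscription subscription = subscriptions.get(name);
        if (subscription != null) {
            out.addAll(subscription.pending);
            subscription.pending.clear();
        }
        return out;
    }
}
```

If both kinds of subscriber disconnect, a message is published, and both then resubscribe, only the durable subscription still holds the message that was published while they were away.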

Step 4: Publishing Messages

To publish payload to a Destination, whether a Topic or a Queue, you again use the MessagingSession. Simply call publishMessage(...) with the Identifier (or name) of the Destination and the corresponding Payload.

messagingSession.publishMessage(topicIdentifier, "Hello World");

or

messagingSession.publishMessage("my-topic", "Hello World");

The only thing you need to ensure is that the payload being published is in some way serializable, either by implementing the standard Java java.io.Serializable interface or Oracle Coherence's ExternalizableLite or PortableObject.
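As an illustration of the java.io.Serializable option, the sketch below defines a small payload class and round-trips it through standard Java serialization, which is a quick way to check that a payload type is serializable before publishing it. OrderPlaced and PayloadRoundTrip are hypothetical names chosen for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical payload type; any class implementing Serializable would do. */
class OrderPlaced implements Serializable {
    private static final long serialVersionUID = 1L;

    final String orderId;
    final int quantity;

    OrderPlaced(String orderId, int quantity) {
        this.orderId = orderId;
        this.quantity = quantity;
    }
}

/** Serializes a payload to bytes and reads it back, as a serializability check. */
class PayloadRoundTrip {

    static Object roundTrip(Serializable payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(payload);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            // Includes NotSerializableException when a field's type is not serializable.
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A payload whose fields are not themselves serializable fails this round trip with an UncheckedIOException wrapping a NotSerializableException.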

Step 5: Consuming a message with a Subscriber

To consume a message using a Subscriber, use the Subscriber's getMessage() method. This method requests a message, blocks until one is delivered to the Subscriber and then returns it to your application.

String message = (String)subscriber.getMessage();

By default, newly created Subscribers are in "auto-commit" mode. This means that any message received from a Subscriber will be automatically acknowledged and removed from the underlying messaging infrastructure. If, however, you'd like the opportunity to roll back received messages, simply set the Subscriber's auto-commit mode to false. For example:

subscriber.setAutoCommit(false);

Once you have done this you may make use of the rollback or commit methods on the Subscriber interface to manually control message acknowledgement in the underlying messaging infrastructure.
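The effect of turning auto-commit off can be modelled in a few lines. This is a hypothetical, single-threaded sketch (ManualAckSubscriberSketch is not the library's Subscriber, and its getMessage() returns null instead of blocking), intended only to show how rollback makes unacknowledged messages visible again while commit discards them for good.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of a subscriber with manual acknowledgement. */
class ManualAckSubscriberSketch {
    private final Deque<String> visible = new ArrayDeque<>();     // delivered, not yet read
    private final Deque<String> uncommitted = new ArrayDeque<>(); // read, not yet acknowledged
    private boolean autoCommit = true;

    void deliver(String message) {
        visible.addLast(message);
    }

    void setAutoCommit(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    /** Non-blocking stand-in for getMessage(); returns null when nothing is visible. */
    String getMessage() {
        String message = visible.pollFirst();
        if (message != null && !autoCommit) {
            uncommitted.addLast(message); // remember it until commit or rollback
        }
        return message;
    }

    /** Acknowledges everything read since the last commit or rollback. */
    void commit() {
        uncommitted.clear();
    }

    /** Makes unacknowledged messages visible again, in their original order. */
    void rollback() {
        while (!uncommitted.isEmpty()) {
            visible.addFirst(uncommitted.pollLast());
        }
    }
}
```

After reading two messages and calling rollback, the same two messages are returned again in the same order; after commit, a later rollback has nothing to restore.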

Like most messaging systems, including JMS, subscribers should only be used by a single thread. If you require concurrent subscriptions to a Destination, simply create more Subscribers.

Example: Using Queues

The following examples outline the use of Queues (one to one messaging), where each message published is delivered to one and only one subscriber.

One To One Messaging

Step 1: Creating a Messaging Session

Once you've installed and appropriately configured the implementation (see below), the first thing you need to do is create a MessagingSession. The simplest way to achieve this is to use the statically defined DefaultMessagingSession.getInstance() method.

MessagingSession messagingSession = DefaultMessagingSession.getInstance();

Step 2: Creating a Queue

Much like creating Topics, to create a Queue you use a MessagingSession.

Identifier queueIdentifier = messagingSession.createQueue("my-queue");

If the Queue with the specified name (or Identifier) already exists, the Identifier of the existing Queue is returned.

Step 3: Subscribing to a Queue

Once you have created a Queue you can subscribe to it by creating a Subscriber. Subscribers to Queues are always non-durable. That is, when a Subscriber becomes disconnected from the messaging infrastructure, all of the visible messages for the said Subscriber are appropriately rolled back and the subscription for the Subscriber is removed.

Subscriber subscriber = messagingSession.subscribe(queueIdentifier);

If you don't have the Identifier of the Destination to which you would like to subscribe, you can simply use its name (as a String). For example:

Subscriber subscriber = messagingSession.subscribe("my-queue");
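Queue semantics with several concurrent consumers, one Subscriber per thread as recommended earlier, can be illustrated with a JDK BlockingQueue standing in for the Queue Destination. This is a sketch, not the Messaging Pattern API: CompetingConsumersSketch is a hypothetical name, and each worker thread plays the role of one Subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical demonstration that competing consumers each see a message at most once. */
class CompetingConsumersSketch {

    /** Drains the messages with one thread per subscriber; returns what each one received. */
    static List<List<String>> consume(List<String> messages, int subscriberCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<List<String>> received = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < subscriberCount; i++) {
            List<String> mine = new ArrayList<>(); // only touched by this subscriber's thread
            received.add(mine);
            Thread thread = new Thread(() -> {
                String message;
                // poll() hands each element to exactly one caller, or null when empty.
                while ((message = queue.poll()) != null) {
                    mine.add(message);
                }
            });
            threads.add(thread);
            thread.start();
        }

        for (Thread thread : threads) {
            try {
                thread.join(); // join also makes each thread's writes visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return received;
    }
}
```

How the messages are split between the subscribers varies from run to run, but across all subscribers every message is received exactly once.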

Frequently Asked Questions

Why don't you support the Java Message Service (ie: JMS) or feature X of JMS?

While it is theoretically possible for this implementation to be an SPI (Service Provider Implementation) for the Java Message Service (JMS), this implementation has been explicitly designed to support the development of the Event Distribution Pattern and other WAN-based architectures, which do not necessarily require JMS.

How can I monitor the infrastructure?

There are two ways. Firstly, by enabling JMX, you'll find that all Destinations (Topics and Queues) are automatically registered into the clustered JMX tree (under Messaging). Secondly, as all of the infrastructure state is represented using Oracle Coherence distributed caches, you may examine, listen to and mutate the appropriately named caches: coherence.messagingpattern.destinations, coherence.messagingpattern.messages and coherence.messagingpattern.subscriptions.
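For the JMX route, a generic way to see what is registered is to query the MBean server by name pattern. The helper below uses only the standard javax.management API and inspects the MBean server of the local JVM; MBeanListingSketch is a hypothetical name. Which pattern matches the Messaging tree is an assumption you will need to verify in your own environment: Coherence registers its MBeans under the "Coherence" domain, so "Coherence:*" is a reasonable starting point, but the exact names used for Destinations are not stated here.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper that lists MBeans in the local JVM matching a name pattern. */
class MBeanListingSketch {

    /** Returns the sorted canonical names of all MBeans matching {@code pattern}. */
    static List<String> listMBeans(String pattern) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            List<String> names = new ArrayList<>();
            // A null query expression means "no additional filtering".
            for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
                names.add(name.getCanonicalName());
            }
            Collections.sort(names);
            return names;
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Bad ObjectName pattern: " + pattern, e);
        }
    }
}
```

In a JVM that is a management-enabled cluster member, calling listMBeans("Coherence:*") should list whatever Coherence has registered locally; in a JVM without Coherence it simply returns an empty list. Inspecting a remote member would additionally require a JMX connector, which this sketch does not cover.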