The Processing Pattern

Overview

The Processing Pattern provides a simple mechanism to perform distributed grid computing operations with Coherence. It is structured as an extensible cluster-based processing engine, capable of accepting, dispatching and executing a variety of workloads, the types of which may be customized by developers.

Unlike traditional hub-and-spoke, master-worker and broker-agent architectures, the Processing Pattern avoids the classic "single-point-of-failure", "single-point-of-dispatch" and "single-point-of-contention" problems by managing and distributing its core state across a Coherence Data Grid.

Distribution

The Processing Pattern implementation is distributed in the jar file called: coherence-processingpattern-11.0.0.jar.

However, as the Processing Pattern has many dependencies, we **strongly recommend** that developers use a tool such as Apache Maven, Apache Ivy or Gradle to resolve those dependencies transitively, rather than attempting to do so manually.

To configure your Apache Maven-based application to use the Processing Pattern, add the following declaration to the <dependencies> element of your application's pom.xml file.

<dependency>
    <groupId>com.oracle.coherence.incubator</groupId>
    <artifactId>coherence-processingpattern</artifactId>
    <version>11.0.0</version>
</dependency>

Introduction

The Processing Pattern has four key concepts: Processing Sessions, Submissions, Submission Outcomes and Dispatchers.

  • ProcessingSessions, also commonly called Sessions, provide the ability to submit work for an implementation to process asynchronously. Additionally ProcessingSessions provide mechanisms to disconnect and later reconnect to previously submitted work, thus naturally permitting long running work to be processed and monitored.

  • Submissions encapsulate the information representing the work to process. Typically these are simply serialized objects. There is no contract governing how work is represented and no interface that it must implement.

  • Upon submitting work, a ProcessingSession returns a SubmissionOutcome, which may be used to track and manage the processing of that work.

  • Once submitted, one or more configured Dispatchers are consulted to route Submissions for appropriate processing. How processing actually occurs is dependent on the type of Dispatcher and possibly the Submission.

Three dispatcher implementations are provided out-of-the-box:

  1. A LoggingDispatcher that simply logs Submissions that have been submitted.

  2. A LocalDispatcher that can execute Submissions that contain Java Runnables and Callables.

  3. A TaskDispatcher that can dispatch and execute Tasks, a more sophisticated form of Callable.

While LocalDispatchers are very useful for processing standard Java asynchronous processes, in most cases TaskDispatchers are the preferred type of dispatcher to use when adopting the Processing Pattern.

The TaskDispatcher implementation leverages the following concepts:

  1. A Task is a Submission that implements the Task interface. It is used to provide information concerning a task, including properties for routing. A specialized form of Task, called a ResumableTask, is provided to permit check-pointing (saving) of intermediate state, thus allowing such tasks to be suspended and resumed as needed.

  2. A TaskProcessor is an entity that actually executes Tasks.

    Two types of TaskProcessor are provided. The first, called a Grid processor, is instantiated on all storage-enabled cluster nodes. The second, called a Single processor, is instantiated on a single storage-enabled cluster node within a cluster.

  3. A TaskDispatchPolicy governs how Tasks are allocated to TaskProcessors.

    Three TaskDispatchPolicies are provided: Round Robin, Random and Attribute Matching.
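
As a rough illustration of how a round-robin policy might allocate Tasks to TaskProcessors, here is a minimal sketch that uses only the JDK. The class RoundRobinPolicySketch and its select method are hypothetical and are not part of the Processing Pattern API; processors are represented simply by their names.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a round-robin policy; NOT the library's TaskDispatchPolicy. */
final class RoundRobinPolicySketch {
    private final List<String> processors;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinPolicySketch(List<String> processors) {
        if (processors.isEmpty()) {
            throw new IllegalArgumentException("at least one processor is required");
        }
        this.processors = List.copyOf(processors);
    }

    /** Returns the processor that should receive the next task, cycling through the list. */
    String select() {
        // floorMod keeps the index non-negative even if the counter overflows
        int index = Math.floorMod(next.getAndIncrement(), processors.size());
        return processors.get(index);
    }

    public static void main(String[] args) {
        RoundRobinPolicySketch policy =
            new RoundRobinPolicySketch(List.of("node-1", "node-2", "node-3"));
        for (int task = 0; task < 5; task++) {
            System.out.println("task " + task + " -> " + policy.select());
        }
    }
}
```

A random policy would differ only in how the index is chosen; an attribute-matching policy would additionally need to inspect the attributes supplied with each Submission.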

The following section outlines the process by which ProcessingSessions may be used to submit and manage Submissions.

Submitting Work

Submitting work is the main activity of a ProcessingSession. While there are several overloaded variants of the ProcessingSession.submit method, the result of submitting work is always a SubmissionOutcome instance, which is typically used to determine the current SubmissionState together with the current processing result(s) of a submission.

As the Processing Pattern requires that all Submissions be uniquely identifiable (so that they may be tracked and managed, and so that their results may be retrieved at some point in the future), submitting work requires a decision about an appropriate Identifier to use. In general there are two approaches to identifying work:

  1. Explicitly specify an Identifier for each Submission, or

  2. Request that the ProcessingSession automatically generate one for you (based on a UUID).

Regardless of the approach used, the Identifier for a Submission can always be retrieved by calling the SubmissionOutcome.getIdentifier() method.

Coherence Common defines a number of Identifier implementations, including those for String, Integer, Long and UUID types, aptly named StringBasedIdentifier, IntegerBasedIdentifier, LongBasedIdentifier and UUIDBasedIdentifier.

Auto Generated Submission Identifiers:

//submit the work (without an identifier specified it will create one)
SubmissionOutcome outcome = processingSession.submit(work, configuration);

//get the identifier created for the submission
Identifier id = outcome.getIdentifier();

Explicitly specified Submission Identifiers:

//create an identifier for the submission
Identifier id = new StringBasedIdentifier("my-work");

//submit the work
SubmissionOutcome outcome = processingSession.submit(id, work, configuration);

//get the identifier specified for the submission
Identifier idSubmission = outcome.getIdentifier();

//it will be the same as the one we specified when we submitted the work
assert id.equals(idSubmission);

Submission Garbage Collection

Once a Submission has been submitted, it is scheduled for acceptance (and later processing) using an appropriate Dispatcher. Once it has been accepted by a Dispatcher, a SubmissionOutcome is created and returned to the submitter, who may use it to request the status of any processing and/or retrieve results from processing.

After a Submission has been processed (either successfully or through to failure), its internal representation together with its associated SubmissionOutcome must be removed from the system to prevent excessive use of memory. The approach used to control when and how this clean up occurs is defined by the configured SubmissionRetentionPolicy.

When the SubmissionRetentionPolicy is set to RemoveOnFinalState, all internal state related to a Submission is automatically removed when processing has finished (either due to completion or failure). That is, clients are not required to clean up submissions once they have been executed. This is the default behavior.

When the SubmissionRetentionPolicy is set to ExplicitRemove, all internal state related to a Submission is held by the system until a corresponding call to ProcessingSession.discardSubmission(Identifier) is made. That is, clients must make every effort to clean up their submissions explicitly.
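
The difference between the two policies can be sketched with a small in-memory model. This is only an illustration under stated assumptions: RetentionSketch, its Retention enum and its methods are hypothetical names invented here, and the model says nothing about how the Processing Pattern stores submission state internally.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of the two retention policies; NOT the library's code. */
final class RetentionSketch {
    enum Retention { REMOVE_ON_FINAL_STATE, EXPLICIT_REMOVE }

    // submission id -> the retention policy chosen when it was submitted
    private final Map<String, Retention> retained = new ConcurrentHashMap<>();

    /** Records a submission together with the policy chosen for it. */
    void submit(String id, Retention policy) {
        retained.put(id, policy);
    }

    /** Called when processing reaches a final state (completed or failed). */
    void onFinalState(String id) {
        // Conditional remove: only the automatic policy discards state here.
        retained.remove(id, Retention.REMOVE_ON_FINAL_STATE);
    }

    /** Models the explicit clean-up call a client makes under explicit removal. */
    void discard(String id) {
        retained.remove(id);
    }

    boolean isRetained(String id) {
        return retained.containsKey(id);
    }

    public static void main(String[] args) {
        RetentionSketch store = new RetentionSketch();
        store.submit("auto", Retention.REMOVE_ON_FINAL_STATE);
        store.submit("manual", Retention.EXPLICIT_REMOVE);

        store.onFinalState("auto");
        store.onFinalState("manual");
        System.out.println("auto retained:   " + store.isRetained("auto"));
        System.out.println("manual retained: " + store.isRetained("manual"));

        store.discard("manual");
        System.out.println("manual retained after discard: " + store.isRetained("manual"));
    }
}
```

The model makes the trade-off visible: explicit removal keeps results available for later retrieval, at the cost of leaking state whenever a client forgets the discard call.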

Releasing and Acquiring Submission Outcomes

When a client ProcessingSession requires another client ProcessingSession to take over ownership of a Submission, the first client must "release" the SubmissionOutcome using the ProcessingSession.releaseSubmission(SubmissionOutcome) method, after which the second client may "acquire" ownership by calling the ProcessingSession.acquireSubmission(...) method.

Releasing a Submission:

Identifier id = outcome.getIdentifier();
processingSession.releaseSubmission(outcome);

Acquiring a Submission:

//acquiring a submission
SubmissionOutcome outcome = processingSession.acquireSubmission(id, ...);

Identifiers used for representing Submissions are always serializable. Hence it's both legal and appropriate to serialize, store and transmit said Identifiers between applications and servers.
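
To show what passing an Identifier between applications might look like, the following sketch round-trips a value through standard Java serialization. It rests on several assumptions: SketchIdentifier is a hypothetical stand-in written for this example (it is not one of the Coherence Common Identifier classes), and plain java.io serialization is used only for simplicity; a real deployment may well use a different serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Objects;

final class IdentifierRoundTrip {
    /** Hypothetical string-backed identifier; a stand-in, NOT a library class. */
    static final class SketchIdentifier implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String value;

        SketchIdentifier(String value) {
            this.value = Objects.requireNonNull(value);
        }

        // Value-based equality, so a deserialized copy still identifies the same work.
        @Override public boolean equals(Object o) {
            return o instanceof SketchIdentifier && ((SketchIdentifier) o).value.equals(value);
        }
        @Override public int hashCode() { return value.hashCode(); }
        @Override public String toString() { return "SketchIdentifier{" + value + "}"; }
    }

    /** Serializes an object to bytes, e.g. to store it or send it to another process. */
    static byte[] toBytes(Serializable object) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Rebuilds an object from bytes produced by toBytes. */
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SketchIdentifier released = new SketchIdentifier("my-work");
        Object received = fromBytes(toBytes(released));
        System.out.println("same identity after transfer: " + released.equals(received));
    }
}
```

The point of the sketch is the value-based equals and hashCode: the receiving application holds a different object instance, yet it can still be used to look up the same Submission.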

Cancelling a Submission

To cancel a submission you simply call the ProcessingSession.cancelSubmission(...) method using the Identifier of the submission to cancel.

//get the submission identifier to cancel
Identifier id = outcome.getIdentifier();

//cancel the submission
processingSession.cancelSubmission(id);

Calling cancelSubmission will mark the Submission as being CANCELLED (if it is not already in a final state). When this occurs, the SubmissionOutcome will be notified and any blocking calls (like SubmissionOutcome.get()) will return.

If the Submission has started processing, e.g. if a Runnable has started running, the executing thread will be interrupted. If a Runnable has called wait, sleep or join on the current thread, an InterruptedException will be thrown.

A Runnable / Callable / Task can check whether the Submission has been cancelled by calling the static method Thread.interrupted() (which also clears the thread's interrupt flag), after which it may abort processing and/or perform rollback or compensatory actions.
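
The interruption behaviour described above is standard Java, so it can be sketched without the Processing Pattern at all. In the following example a plain ExecutorService and Future.cancel(true) stand in for the ProcessingSession and cancelSubmission; the class and method names are hypothetical. The work polls Thread.currentThread().isInterrupted(), which, unlike Thread.interrupted(), leaves the interrupt flag set.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancellationSketch {
    /** Starts interruptible work, cancels it, and reports whether it cleaned up. */
    static boolean runAndCancel() {
        AtomicBoolean compensated = new AtomicBoolean();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);

        Runnable work = () -> {
            started.countDown();
            try {
                // Poll the interrupt flag between units of work.
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(10); // throws InterruptedException if interrupted while blocked
                }
            } catch (InterruptedException e) {
                // Interrupted while sleeping: fall through to the clean-up below.
            } finally {
                compensated.set(true); // stand-in for rollback / compensatory actions
                finished.countDown();
            }
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(work);
            started.await();     // make sure the work is actually running
            future.cancel(true); // true = interrupt the executing thread
            return finished.await(5, TimeUnit.SECONDS) && compensated.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("work cleaned up after cancel: " + runAndCancel());
    }
}
```

Handling both paths matters: the interrupt may arrive while the thread is blocked (raising InterruptedException) or while it is computing (only setting the flag).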

Here is an example of submitting and cancelling a Submission:

ProcessingSession session = new DefaultProcessingSession(StringBasedIdentifier.newInstance("SessionIdentifier"));
SubmissionOutcome outcome = session.submit(new DoNothingRunnable(15000),
                                           new DefaultSubmissionConfiguration(null),
                                           StringBasedIdentifier.newInstance("submissionId"),
                                           SubmissionRetentionPolicy.ExplicitRemove,
                                           null);

System.out.println("Wait a second before CANCELLING submission");
Thread.sleep(1000);

session.cancelSubmission(outcome.getIdentifier());
outcome.get(); //Waits for a final state for the Submission.

Dispatchers and Submissions

A central concept of the Processing Pattern is the concept of Dispatchers. In fact, for the Processing Pattern to operate at all, one or more Dispatchers must be configured.

When a Submission is submitted to the Processing Pattern, it is first offered to each of the configured Dispatchers, in the order in which they were defined or registered.

Each Dispatcher interrogates the Submissions it is offered to determine whether it is capable of handling them. When a Dispatcher chooses to accept a Submission, a SubmissionOutcome is returned to the calling client.

After this, the accepting Dispatcher will attempt to find a way to process the Submission. This may involve executing the Submission directly or, in some cases (as with the TaskDispatcher), locating a suitable TaskProcessor to process the Submission.

Should a Dispatcher choose to reject (not accept) a Submission, the said Submission will be provided to the remaining Dispatchers in the configured list of Dispatchers.
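
The offer-in-order behaviour described above resembles a chain of responsibility, which can be sketched as follows. Everything in this example is hypothetical: SketchDispatcher, the dispatcher names and the acceptance rules are invented for illustration and do not reflect the actual Dispatcher interface or the acceptance logic of the dispatchers shipped with the Processing Pattern.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class DispatcherChainSketch {
    /** Hypothetical dispatcher: a name plus a rule deciding whether it accepts a submission. */
    static final class SketchDispatcher {
        final String name;
        final Predicate<Object> accepts;

        SketchDispatcher(String name, Predicate<Object> accepts) {
            this.name = name;
            this.accepts = accepts;
        }
    }

    /** Offers the submission to each dispatcher in order; the first to accept wins. */
    static Optional<String> dispatch(List<SketchDispatcher> dispatchers, Object submission) {
        for (SketchDispatcher dispatcher : dispatchers) {
            if (dispatcher.accepts.test(submission)) {
                return Optional.of(dispatcher.name);
            }
            // Rejected: fall through and offer it to the next dispatcher in the list.
        }
        return Optional.empty(); // no configured dispatcher accepted the submission
    }

    /** An illustrative chain; the order of the list is the order of offering. */
    static List<SketchDispatcher> exampleChain() {
        return List.of(
            new SketchDispatcher("runnable-dispatcher", s -> s instanceof Runnable),
            new SketchDispatcher("string-dispatcher", s -> s instanceof String));
    }

    public static void main(String[] args) {
        List<SketchDispatcher> chain = exampleChain();
        Runnable job = () -> { };
        System.out.println(dispatch(chain, job));
        System.out.println(dispatch(chain, "some text"));
        System.out.println(dispatch(chain, 42));
    }
}
```

Because the first acceptor wins, the order in which dispatchers are configured matters: a dispatcher that accepts everything would shadow any dispatcher listed after it.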

The following diagram illustrates the typical operational flow of an application using the Processing Pattern.

[Figure: Processing Pattern Operational Flow]

Task Dispatching

As mentioned previously, TaskDispatchers are special in that they don't execute Tasks themselves. Instead, they use the configured TaskDispatchPolicy to determine a suitable TaskProcessor, which will actually execute the submitted Tasks.

Typically a TaskDispatchPolicy simply compares the attributes supplied as part of a SubmissionConfiguration with those that are defined by each TaskProcessor. This activity is typically called Task Mediation.
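
One plausible reading of this attribute comparison is "pick a processor whose attributes include everything the submission asks for". The sketch below implements that reading with plain maps. It is an assumption-laden illustration: AttributeMatchingSketch, the processor names and the attribute keys are all hypothetical, and the actual matching rules of the Processing Pattern may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class AttributeMatchingSketch {
    /**
     * Returns the first processor whose attributes contain every requested key/value pair.
     * Processors are modelled as name -> attribute map (hypothetical representation).
     */
    static Optional<String> select(Map<String, Map<String, String>> processors,
                                   Map<String, String> requested) {
        for (Map.Entry<String, Map<String, String>> processor : processors.entrySet()) {
            // containsAll on entry sets compares both keys and values
            if (processor.getValue().entrySet().containsAll(requested.entrySet())) {
                return Optional.of(processor.getKey());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps a predictable iteration order for the example
        Map<String, Map<String, String>> processors = new LinkedHashMap<>();
        processors.put("grid-1", Map.of("type", "grid"));
        processors.put("single-1", Map.of("type", "single", "os", "linux"));

        System.out.println(select(processors, Map.of("os", "linux")));
        System.out.println(select(processors, Map.of("type", "grid")));
        System.out.println(select(processors, Map.of("os", "windows")));
    }
}
```

Note that an empty set of requested attributes matches every processor in this model, so the first processor would be chosen.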

The following diagram illustrates the typical operational flow of an application using Tasks with the Processing Pattern.

[Figure: Task Dispatching Flow]

Example

The following demonstrates a simple example of the Processing Pattern.

package com.oracle.coherence.examples.incubator.processing.task;

import java.text.DateFormat;

import com.oracle.coherence.common.identifiers.StringBasedIdentifier;
import com.oracle.coherence.patterns.processing.ProcessingSession;
import com.oracle.coherence.patterns.processing.SubmissionOutcome;
// DefaultProcessingSession is assumed to live alongside DefaultSubmissionConfiguration
import com.oracle.coherence.patterns.processing.internal.DefaultProcessingSession;
import com.oracle.coherence.patterns.processing.internal.DefaultSubmissionConfiguration;
import com.tangosol.net.CacheFactory;

public class PiCalculationSample
{

    public void executeTask() throws Exception
    {
        System.out.println("Testing PI calculation");
        ProcessingSession session = new DefaultProcessingSession(StringBasedIdentifier
            .newInstance("PICalculationSample " + DateFormat.getDateTimeInstance().format(System.currentTimeMillis())));

        SubmissionOutcome pioutcome = session.submit(new CalculatePITask(100000), new DefaultSubmissionConfiguration());

        System.out.println("PI (100000 iterations) is: " + pioutcome.get());

        // release Coherence resources once the result has been retrieved
        CacheFactory.shutdown();
    }

    public static void main(String[] args)
    {
        PiCalculationSample sample = new PiCalculationSample();
        try
        {
            sample.executeTask();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}