Skip to content
/ executor Public

Java Key Sequential Executor

License

Notifications You must be signed in to change notification settings

jano7/executor

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Key Sequential Executor

This small library provides an optimized solution to a problem where tasks for a particular key need to be processed sequentially as they arrive. This kind of problem can be solved by a SingleThreadExecutor; however, it is not efficient. The issue is that the tasks for unrelated keys are not being processed in parallel, instead they are put into a queue common to all keys and wait for the single thread to execute them. This library allows them to be executed concurrently. Moreover this library works well in a situation where all the possible keys and their number is not known upfront.

Usage

A typical scenario in order management or booking systems is that messages for a particular trade A must be processed sequentially in the same order as they are received (otherwise the state of the trade will be incorrect). The same is true for any other trade - for example messages for the trade B must be processed sequentially as well. However, it is desirable that a message for the trade A does not block processing of a message for the trade B (and vice versa) if they happen to arrive at the same time.

ExecutorService underlyingExecutor = Executors.newFixedThreadPool(10);
KeySequentialRunner<String> runner = new KeySequentialRunner<>(underlyingExecutor);

String tradeIdA = "327";
String tradeIdB = "831";
// more Trade IDs can arrive in a real scenario, but it is usually not known how many upfront

Runnable task = new Runnable() {
    @Override
    public void run() {
        // process a message for the trade
    }
};

runner.run(tradeIdA, task); // execute the task by the underlying executor

runner.run(tradeIdB, task); // execution is not blocked by the task for tradeIdA

runner.run(tradeIdA, task); // execution starts when the previous task for tradeIdA completes

In the example above the key is a Trade ID. Tasks for a particular Trade ID are executed sequentially, but they do not block tasks for other Trade IDs (unless the tasks are blocked by the underlying executor).

Please note the Key needs to correctly implement hashCode and equals methods as the implementation stores the tasks in a HashMap.

If you require an Executor you can use KeySequentialExecutor instead of KeySequentialRunner which accepts Runnable delegating its hashCode and equals methods to the key.

Executor executor = new KeySequentialExecutor(underlyingExecutor);

// KeyRunnable is a helper class delegating 'hashCode' and 'equals' to the key
Runnable runnable = new KeyRunnable<>(tradeIdA, task);

executor.execute(runnable);

underlyingExecutor.shutdown();

// at this point, tasks for new keys will be rejected
// however, tasks for keys being currently executed may still be accepted (and executed)

underlyingExecutor.awaitTermination(timeout, TimeUnit.SECONDS);

// if the executor terminates before a timeout, then it is guaranteed that all accepted
// tasks have been executed

The KeySequentialExecutor and KeySequentialRunner do not support back-pressure. It means that execute and run methods never block, instead the submitted tasks are put into a queue where they wait until executed by the underlying executor. In many cases this is not a problem, but in some situations it may cause an application to run out of memory as the number of waiting tasks grows. If you want to restrict the number of queued tasks, consider use of a KeySequentialBoundedExecutor which can be configured to block the task submission when the number of tasks, which haven't been executed yet, reaches the limit.

ExecutorService underlyingExecutor = Executors.newCachedThreadPool();
int maxTasks = 10;
KeySequentialBoundedExecutor boundedExecutor =
        new KeySequentialBoundedExecutor(maxTasks, BoundedStrategy.BLOCK, underlyingExecutor);

KeyRunnable<String> task = new KeyRunnable<>("my key", () -> {
    // do something
});

boundedExecutor.execute(task);
// execute more tasks ... at most 10 will be scheduled

// before shutting down you can call a 'drain' method
// which blocks until all submitted task have been executed

// returns true if drained; false if the timeout elapses
boundedExecutor.drain(timeout, TimeUnit.SECONDS);

// newly submitted tasks will be rejected after calling 'drain'

// safe to call 'shutdownNow' if drained as there should be no active tasks
underlyingExecutor.shutdownNow();

The source code of the examples can be found here.

A note on thread-safety: The library is thread-safe; i.e. methods run, execute or drain can be safely invoked from multiple threads without synchronization.

Maven Dependency

<dependency>
  <groupId>com.jano7</groupId>
  <artifactId>executor</artifactId>
  <version>2.0.2</version>
</dependency>