
Disruptor: core concepts and hands-on experience

This article is a translation, based on the latest 3.4.2 version of the documentation, of:
https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

Disruptor Overview

The easiest way to understand what the Disruptor is, is to compare it to something familiar, such as a queue like BlockingQueue. Like a queue, the Disruptor is used to move data between threads, but it also provides several features that queues do not, for example:

    The same event can be multicast to multiple consumers; consumers can process events in parallel, or be arranged into an ordered chain of processing steps (a dependency graph);

    Memory for the events is pre-allocated;

    An optionally lock-free design, built for extremely high performance goals.

Disruptor core architecture components

    Ring Buffer: the Ring Buffer was regarded as the core component of the Disruptor before version 3.0, but in later versions it is only responsible for storing and updating the events that move through the Disruptor. In some advanced use cases it can be replaced entirely by a user-supplied implementation.

    Sequence: the Disruptor uses a set of Sequences to track the progress of its components (RingBuffer/Consumers). Each consumer, and the Disruptor itself, maintains a Sequence. Although an AtomicLong could also be used to track progress, Sequence serves another purpose: it prevents false sharing of CPU cache lines between different Sequences.

    Sequencer: the Sequencer is the real core of the Disruptor. Its two implementations, SingleProducerSequencer and MultiProducerSequencer, define the concurrent algorithms for fast and correct data transfer between producers and consumers.

    Sequence Barrier: holds references to the Sequence published by the Sequencer and to the Sequences of any consumers it depends on. It also contains the logic that determines whether there are events available for a consumer to process.

    Wait Strategy: the Wait Strategy determines how a consumer waits for events to be put into the Disruptor by a producer.

    Event: the unit of data passed from producer to consumer. The Disruptor does not define a specific event type; events are defined and supplied by the user.

    EventProcessor: holds a specific consumer's Sequence and runs the main event loop that pulls events from the Disruptor. BatchEventProcessor is the concrete implementation: it provides the event loop and calls back into the user-supplied EventHandler.

    EventHandler: an interface implemented by the user to handle events; this is the actual consumer.

    Producer: the user code that calls the Disruptor to publish events. The Disruptor does not define a specific producer interface or type.

Event Broadcast (Multicast Events)

This is the biggest behavioural difference between the Disruptor and a queue. When multiple consumers listen on the same Disruptor, every event is published to all of them, whereas a queue delivers each element to only one consumer. This behaviour is needed when several independent operations must be applied to the same data. In the LMAX system three operations are performed on the same event in parallel: journalling (persisting the event to a log file), replication (sending the event to other machines so that a remote copy of the data exists), and business logic processing. If instead you need different events to be processed in parallel by a pool of workers, a WorkerPool can be used.
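
As a rough illustration (not taken from the original wiki; it borrows the LongEvent class and Disruptor setup from the Getting Started section below), registering several handlers in a single handleEventsWith call means each of them sees every published event:

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class MulticastSketch
{
    public static void main(String[] args)
    {
        Disruptor<LongEvent> disruptor =
            new Disruptor<>(LongEvent::new, 1024, DaemonThreadFactory.INSTANCE);

        // All three handlers are invoked for every event (multicast);
        // a queue, by contrast, would hand each element to exactly one consumer.
        disruptor.handleEventsWith(
            (event, sequence, endOfBatch) -> System.out.println("journal   " + sequence),
            (event, sequence, endOfBatch) -> System.out.println("replicate " + sequence),
            (event, sequence, endOfBatch) -> System.out.println("business  " + sequence));

        disruptor.start();
    }
}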

Consumer Dependency Graph

To support real-world parallel processing pipelines, the Disruptor allows dependencies to be declared between multiple consumers. Returning to the LMAX example above, we can let the journalling and replication consumers run first and only execute the business logic after both of them have completed; this capability is called gating. Gating happens in two places. First, we need to ensure that producers do not overrun consumers; this is achieved by adding the relevant consumers to the Disruptor with RingBuffer.addGatingSequences(). Second, the scenario just described is implemented by constructing a SequenceBarrier that contains the Sequences of the consumers that must complete first.

Continuing the example above, there are three consumers listening for events from the RingBuffer, forming a dependency graph: ApplicationConsumer depends on JournalConsumer and ReplicationConsumer, which means JournalConsumer and ReplicationConsumer are free to run in parallel. The dependency can be seen as a connection from the SequenceBarrier of ApplicationConsumer to the Sequences of JournalConsumer and ReplicationConsumer. Another point worth noting is the relationship between the Sequencer and the downstream consumers. Its role is to ensure that publishing does not wrap the RingBuffer, i.e. overwrite slots that have not yet been consumed; to guarantee this, no downstream consumer may have a Sequence lower than the RingBuffer's sequence minus the size of the RingBuffer. Because ApplicationConsumer's Sequence is guaranteed to be less than or equal to those of JournalConsumer and ReplicationConsumer, the Sequencer only needs to check ApplicationConsumer's Sequence. More generally, the Sequencer only needs to be aware of the Sequences of the leaf nodes of the consumer dependency tree.
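
A minimal sketch of that dependency graph using the DSL (the handler variables are illustrative stand-ins for the journalling, replication and business-logic consumers, again reusing LongEvent from the Getting Started section below):

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class DependencyGraphSketch
{
    public static void main(String[] args)
    {
        Disruptor<LongEvent> disruptor =
            new Disruptor<>(LongEvent::new, 1024, DaemonThreadFactory.INSTANCE);

        EventHandler<LongEvent> journal     = (event, sequence, endOfBatch) -> { /* write to the log */ };
        EventHandler<LongEvent> replication = (event, sequence, endOfBatch) -> { /* send to the replica */ };
        EventHandler<LongEvent> application = (event, sequence, endOfBatch) -> { /* business logic */ };

        // journal and replication run in parallel; application is gated on both,
        // so it only sees an event after both of them have processed it.
        disruptor.handleEventsWith(journal, replication)
                 .then(application);

        disruptor.start();
    }
}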

Event Preallocation

One of the goals of the Disruptor is to be usable in low-latency environments. In low-latency systems it is necessary to reduce or eliminate memory allocation. In Java-based systems this means reducing the number of pauses caused by GC (in low-latency C/C++ systems, heavy memory allocation can also cause problems because of contention on the memory allocator).

To meet this goal, the user can pre-allocate the memory used for events inside the Disruptor. The user supplies an EventFactory, which the Disruptor calls once for every entry of the Ring Buffer. When new data is published to the Disruptor, the API hands the user one of these pre-constructed objects so that its fields can be updated and its methods called. The Disruptor guarantees that these operations are thread-safe.

Optionally lock-free

All of the Disruptor's memory visibility and correctness guarantees are implemented with lock-free algorithms, using memory barriers and CAS operations. Only one scenario uses a lock: the BlockingWaitStrategy, and only because it needs a Condition so that the consuming thread can be parked while waiting for a new event to arrive. Many low-latency systems use busy-waiting (spinning) to avoid the jitter a Condition can cause, but sustained busy-waiting degrades performance, especially when CPU resources are severely constrained, for example on a web server running in a virtualized environment.

Using the Disruptor

Let's get a feel for the Disruptor with a simple example: the producer passes a long value to the consumer, and the consumer prints the value when it receives it.

Defining an Event

public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }
}

In order to use the Disruptor's event pre-allocation, we need to define an EventFactory:

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent>
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

For the consumer to handle these events, we define an event handler that is responsible for printing them:

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>
{
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("Event: " + event);
    }
}

Publishing events with Translators

Since version 3.0, the Disruptor provides a richer, Lambda-style API that helps developers simplify this process, so from 3.0 onwards the preferred way to publish events is through the Event Publisher / Event Translator API.

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
import java.nio.ByteBuffer;

public class LongEventProducerWithTranslator
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>()
        {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
            {
                event.set(bb.getLong(0));
            }
        };

    public void onData(ByteBuffer bb)
    {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}

Another advantage of this approach is that the translator code can be pulled out into a separate class and easily unit-tested in isolation.
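
For instance, the translation logic can be exercised in a plain test without any RingBuffer at all. The following sketch re-declares the translator locally for illustration; in a real test you would reference the translator after extracting it into its own class:

import com.lmax.disruptor.EventTranslatorOneArg;
import java.nio.ByteBuffer;

public class TranslatorTestSketch
{
    public static void main(String[] args)
    {
        // Same translation logic as above, declared locally for this sketch.
        EventTranslatorOneArg<LongEvent, ByteBuffer> translator =
            (event, sequence, bb) -> event.set(bb.getLong(0));

        ByteBuffer bb = ByteBuffer.allocate(8);
        bb.putLong(0, 42L);

        // Drive the translator directly; no Disruptor or RingBuffer is involved.
        LongEvent event = new LongEvent();
        translator.translateTo(event, 0L, bb);

        // A real test would assert on the event's value here.
        System.out.println("Translated event: " + event);
    }
}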

Publishing events with the legacy API

import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducer
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

Note that the publish call must be wrapped in a try/finally block. If a claimed sequence is not published, it can block subsequent publish operations by this or other producers. Failing to publish a sequence, especially with multiple producers, will cause consumers to stall, and the only way to recover is to restart them.

Putting it all together


import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.onData(bb);
            Thread.sleep(1000);
        }
    }
}

We can also write this example using Java 8 lambdas:

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

With lambdas we can see that many of the classes are no longer needed, such as the handler and the translator.
The code above can be simplified a bit further:

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
    bb.putLong(0, l);
    ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
    Thread.sleep(1000);
}

However, this creates a capturing lambda: an object has to be instantiated to hold the ByteBuffer bb variable as it is passed into the publishEvent() call, which creates additional, unnecessary garbage. If low GC pressure is a requirement, the overload that passes the argument through to the lambda should be preferred.

Two tuning options for better performance

If you want to get better performance out of the Disruptor, there are two options to tune: the wait strategy and the producer type.

Single Producer vs Multiple Producer

The best way to improve performance in a concurrent environment is to adhere to the Single Writer Principle. If in your scenario only a single thread ever writes data to the Disruptor, you can declare a single producer to improve performance:

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        //.....
        // Construct the Disruptor with a SingleProducerSequencer
        Disruptor<LongEvent> disruptor = new Disruptor<>(
            factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
        //.....
    }
}

Performance test:
Multiple Producer

Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec

Single Producer

Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec

Wait Strategies

BlockingWaitStrategy
The default wait strategy of the Disruptor is BlockingWaitStrategy. Internally it uses a lock and a condition variable to control thread wake-up. BlockingWaitStrategy is the slowest strategy, but it is the most conservative with CPU usage and gives the most consistent behaviour across different deployment environments.

SleepingWaitStrategy
SleepingWaitStrategy has performance characteristics similar to BlockingWaitStrategy, with comparable CPU consumption, but the lowest impact on producer threads. It waits in a loop that calls LockSupport.parkNanos(1); on a typical Linux system this pauses the thread for about 60µs. The benefit is that the producing thread does not need to do anything other than increment the appropriate counter, and does not have to pay the cost of signalling a condition variable. The trade-off is a higher mean latency for moving an event between the producer and consumer threads. It works best where low latency is not required but a low impact on the producing thread is desired; a common use case is asynchronous logging.

YieldingWaitStrategy
YieldingWaitStrategy is one of the strategies suitable for low-latency systems. It busy-spins, waiting for the sequence to increment to the appropriate value, and calls Thread.yield() inside the loop body to allow other queued threads to run. This strategy is recommended when very high performance is required and the number of event handler threads is lower than the number of logical CPU cores, for example when hyper-threading is enabled.

BusySpinWaitStrategy
BusySpinWaitStrategy gives the best performance and is suitable for low-latency systems, but it places the highest constraints on the deployment environment. It should only be used when very high performance is required and the number of event handler threads is lower than the number of physical CPU cores, for example with hyper-threading disabled.
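
The wait strategy is chosen when constructing the Disruptor, together with the producer type. As a sketch (reusing the factory from the earlier examples and picking YieldingWaitStrategy; adjust to your own latency and CPU budget):

import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        //.....
        // Construct the Disruptor with an explicit wait strategy; here a
        // single producer with YieldingWaitStrategy for a low-latency setup.
        Disruptor<LongEvent> disruptor = new Disruptor<>(
            new LongEventFactory(), 1024, DaemonThreadFactory.INSTANCE,
            ProducerType.SINGLE, new YieldingWaitStrategy());
        //.....
    }
}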

Clearing objects from the Ring Buffer

When passing data through the Disruptor, objects can live longer than expected. To avoid this, you may need to clear the event once it has been handled. If there is a single event handler, it is enough to clear the object inside that handler. If there is a chain of event handlers, you may need to place a dedicated handler at the end of the chain that clears the objects.

class ObjectEvent<T>
{
    T val;

    void clear()
    {
        val = null;
    }
}

public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
    public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
    {
        // Failing to call clear here will result in the
        // object associated with the event living until
        // it is overwritten once the ring buffer has wrapped
        // around to the beginning.
        event.clear();
    }
}

public static void main(String[] args)
{
    Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
        () -> new ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);

    disruptor
        .handleEventsWith(new ProcessingEventHandler())
        .then(new ClearingEventHandler<String>());
}
