Reactive (2): Reactive Streams and the Milk Factory

Table of contents

  • Reactive revisited

      Why back-end Web development rarely feels Reactive

  • Reactive Stream support in Java 9

    Example

    Summary

    Further reading

Reactive revisited

An earlier article, which went from Reactive programming to "Hollywood", covered some Reactive concepts and wandered a little along the way. It stayed at the conceptual level and did not touch on practice, so readers may have come away with only a superficial understanding.

Reactive programming emphasizes two things: asynchrony and stream processing. Neither came out of thin air; both were distilled from a large body of engineering practice. For example:

  • When we talk about asynchrony, it is easy to think of Java asynchronous I/O, usually compared with BIO and NIO. What is often overlooked is that Swing, the Java UI framework that appeared long ago, already used asynchronous ideas fluently. If you doubt it, look at how it applies the Observer pattern internally.

  • When we talk about streams, the currently popular Flink framework comes to mind. In fact, almost all big data analytics and batch applications are built on stream-style processing, including ETL and even the simplest MapReduce jobs.

Why back-end Web development rarely feels Reactive

Beyond front-end applications, Reactive concepts are widely used in big data. Among back-end Web developers, however, they are probably far less familiar. In my own experience, after many years of writing code, I have rarely seen a place where Reactive plays a major role, apart from code layering.

The reason is that back-end Web development mostly relies on HTTP, a fairly simple request -> response interaction. After sending a request, the client waits for the result to come back. The result is actively fetched by the client rather than delivered by asynchronous notification, so the style is not Reactive. This matches what users are used to and in most cases nothing needs to change, so we get little direct feel for Reactive.

A scenario that fits Reactive better is the rich client (rich application). Suppose the front end needs a lot of complex interaction; we can then move some of the logic into front-end code. Web interaction is then no longer a full-page refresh but "real-time" two-way messaging between client and server. Such applications are common, for example chat applications built on WebSocket, and games.

Judging from the trend alone, the outlook for Reactive is quite clear, and not only because most popular programming languages now carry its mark (for example, by offering Rx-style frameworks). Big data processing and real-time stream computing are set to become mainstream, which is driven by the wider environment, and a "stream-oriented" programming model like Reactive is undoubtedly a good fit for them.

Reactive Stream support in Java 9

The Java platform did not provide full Reactive support until JDK 9. Earlier JDK versions did include some related APIs, such as:

    Future and CompletableFuture, for asynchronous computation. Compared with Future, CompletableFuture adds asynchronous notification of results, chaining of tasks, and other features.

    The Stream interfaces, which let an ordinary collection be processed as a "stream", for example by iterating over it or applying mapping transformations.
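As a rough illustration of those two pre-Java 9 APIs, here is a minimal sketch. The class name PreJava9Sketch and the helper methods orderMilk and lengths are made up for this example and do not come from the article's code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class PreJava9Sketch {

    // CompletableFuture: run a task asynchronously, then chain a follow-up
    // step that runs once the result is ready, without a blocking get() in between.
    static CompletableFuture<String> orderMilk(String name) {
        return CompletableFuture
                .supplyAsync(() -> name)                   // asynchronous computation
                .thenApply(milk -> "delivered: " + milk);  // chained follow-up task
    }

    // Stream: treat an ordinary collection as a flow of elements and map each one.
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // join() blocks only here, at the edge of the program, to print the result.
        System.out.println(orderMilk("yogurt").join());
        System.out.println(lengths(Arrays.asList("goat", "milk", "yogurt")));
    }
}
```

Neither API lets the consumer tell the producer how much data it can handle, which is the gap the Reactive Stream interfaces fill.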

These related APIs do not amount to complete Reactive support. The Reactive Stream API supported by Java 9 comes from the Reactive Streams Specification, an effort that dates from 2013.

https://www.reactive-streams.org/

Based on this specification, Java defines the following main interfaces, all grouped under java.util.concurrent.Flow:

  • Publisher
                The publisher of the data. The Publisher interface defines a subscribe method for adding subscribers.

  • Subscriber
                The subscriber of the data. The Subscriber interface defines four methods for responding to different events.

First, after a successful subscribe call, the Subscriber's onSubscribe(Subscription s) method is triggered (the Subscription represents the current subscription relationship).
    After that, the subscriber can keep calling the Subscription's request(long n) method to request data from the publisher, where n is the maximum number of items.

The Publisher emits three kinds of message, each mapped to a Subscriber callback:

    Data message: mapped to onNext, it carries an item produced by the publisher.
    Error message: mapped to onError, it indicates that the publisher hit an error.
    Completion message: mapped to onComplete, it indicates that the publisher has released all of its data.

Of these three, the error and completion messages are terminal: once either is delivered, the stream has ended and no further messages will be produced.

  • Subscription
                A Subscription represents a subscription relationship. Through this object the subscriber can request data (the request method) or unsubscribe (the cancel method).

  • Processor
                A Processor is a special object that is both a publisher and a subscriber.
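The Processor role is the one the milk example later in this article does not exercise, so here is a minimal sketch of it. It assumes a processor built by extending SubmissionPublisher; the names ProcessorSketch, MapProcessor, Sink and upperCaseAll are invented for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ProcessorSketch {

    // Subscribes to an upstream publisher, transforms each item,
    // and republishes the result to its own subscribers.
    static class MapProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<T, R> mapper;
        private Flow.Subscription upstream;

        MapProcessor(Function<T, R> mapper) { this.mapper = mapper; }

        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(1); }
        @Override public void onNext(T item) { submit(mapper.apply(item)); upstream.request(1); }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    // Collects everything it receives and signals when the stream ends.
    static class Sink<T> implements Flow.Subscriber<T> {
        final List<T> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) { items.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    static List<String> upperCaseAll(List<String> input) {
        Sink<String> sink = new Sink<>();
        MapProcessor<String, String> upper = new MapProcessor<String, String>(String::toUpperCase);
        upper.subscribe(sink);                    // processor acts as a publisher
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(upper);              // processor acts as a subscriber
            input.forEach(source::submit);
        }                                         // closing the source completes the chain
        try {
            sink.done.await(5, TimeUnit.SECONDS); // wait for onComplete to reach the sink
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink.items;
    }

    public static void main(String[] args) {
        System.out.println(upperCaseAll(Arrays.asList("goat", "soy")));
    }
}
```

Closing the source triggers onComplete on the processor, which closes its own downstream side in turn, so the terminal signal travels through the whole chain.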

Backpressure support

Backpressure is an important capability of reactive streams, and the interfaces defined above already provide the basic support for it.
    A Publisher delivers data to a Subscriber only after receiving a request from it. This lets the Subscriber decide how much data to request according to its own capacity, so that it is never overwhelmed.
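A small sketch can make the demand mechanism concrete. In the code below, which is only an illustration (BackpressureSketch, LimitedSubscriber and deliver are hypothetical names), the publisher submits more items than the subscriber asked for, and only the requested number is ever delivered. It assumes the number of published items fits in the default buffer, so submit does not block.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BackpressureSketch {

    // A subscriber that asks for a fixed number of items and never asks again.
    static class LimitedSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch gotAll;
        private final int demand;

        LimitedSubscriber(int demand) {
            this.demand = demand;
            this.gotAll = new CountDownLatch(demand);
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
        @Override public void onNext(Integer item) { received.add(item); gotAll.countDown(); }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { }
    }

    // Publishes the numbers 1..published and returns what the subscriber actually got.
    static List<Integer> deliver(int published, int demand) {
        LimitedSubscriber subscriber = new LimitedSubscriber(demand);
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        for (int i = 1; i <= published; i++) {
            publisher.submit(i);                  // buffered until the subscriber requests it
        }
        try {
            subscriber.gotAll.await(5, TimeUnit.SECONDS);
            Thread.sleep(200);                    // leave time for any unrequested delivery to show up
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        publisher.close();
        return subscriber.received;
    }

    public static void main(String[] args) {
        // Five items are published, but only the two requested ones arrive.
        System.out.println(deliver(5, 2));
    }
}
```

The remaining items stay in the subscription's buffer; they would be delivered only if the subscriber called request again.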

Example

Now let's use a simple code example to show how the Reactive Stream API is used.

Take a milk factory as an example. To increase revenue, the factory launches a direct-sales business: customers order a number of days of dairy directly from the factory, and each day a milkman from the factory delivers it to the door.
    To simulate this scenario, we implement the code as follows:

    The milk factory, a Publisher implementation:

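import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class MilkFactory extends SubmissionPublisher<String> {

    private final ScheduledFuture<?> periodicTask;
    private final ScheduledExecutorService scheduler;

    // Milk varieties on offer (names kept as in the original program)
    private static final List<String> milks = Arrays.asList("益力多", "酸牛奶", "原味奶", "低脂蛋奶", "羊奶", "甜牛奶");

    public MilkFactory() {
        super();
        // Initialize the timer
        scheduler = new ScheduledThreadPoolExecutor(1);

        // Each "day" (one second here), produce milk and push it to the subscribers
        periodicTask = scheduler.scheduleAtFixedRate(
                () -> submit(produceMilk()), 0, 1, TimeUnit.SECONDS);
    }

    // Produce a random variety of milk
    private String produceMilk() {
        return milks.get((int) (Math.random() * milks.size()));
    }

    // Stop the timer and close the stream
    @Override
    public void close() {
        periodicTask.cancel(false);
        scheduler.shutdown();
        super.close();
    }
}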

MilkFactory extends SubmissionPublisher (a Publisher implementation that provides buffering) and starts an internal timer to simulate producing and releasing milk every day; in the code, one second stands in for one day.
    Data is pushed to subscribers through the submit() method.

    The customer, a Subscriber implementation:
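One property of submit() worth knowing is that it blocks the calling thread while a subscriber's buffer is full. SubmissionPublisher also has an offer() method that returns immediately and can drop the item instead. The sketch below is not part of the milk factory code: OfferSketch, IdleSubscriber and countDrops are hypothetical names, and the tiny buffer capacity is chosen only to make drops easy to observe.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

public class OfferSketch {

    // A customer who subscribes but never requests anything (zero demand).
    static class IdleSubscriber implements Flow.Subscriber<String> {
        @Override public void onSubscribe(Flow.Subscription s) { /* deliberately no request() */ }
        @Override public void onNext(String item) { }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    // Offers `count` items through a small buffer and returns how many were dropped.
    static int countDrops(int count, int bufferCapacity) {
        int dropped = 0;
        try (SubmissionPublisher<String> publisher =
                     new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferCapacity)) {
            publisher.subscribe(new IdleSubscriber());
            for (int i = 0; i < count; i++) {
                // The onDrop handler returns false: do not retry, just drop the item.
                int result = publisher.offer("milk-" + i, (subscriber, item) -> false);
                if (result < 0) {
                    dropped += -result;           // a negative result is the number of drops
                }
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // The exact count depends on the enforced buffer capacity, so none is claimed here.
        System.out.println("dropped: " + countDrops(100, 4));
    }
}
```

Whether dropping is acceptable depends on the business: a factory that must never lose an order would keep submit() and accept that the timer thread can stall.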

import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class MilkCustomer implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;
    private final AtomicInteger available = new AtomicInteger(0);
    private final int dayCount;

    public MilkCustomer(int dayCount) {
         this.dayCount = dayCount;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // Set the total number of days ordered
        available.set(dayCount);

        // Request the first day's milk
        subscription.request(1);
    }

    @Override
    public void onNext(String milk) {
        // Prints "Today's milk has arrived: <milk>"
        System.out.println("今天的牛奶到了: " + milk);

        // If days remain in the order, request the next delivery
        if (available.decrementAndGet() > 0) {
            subscription.request(1);
        } else {
            // Prints "Your milk plan has been delivered in full, you are welcome to order again"
            System.out.println("牛奶套餐已经派完,欢迎继续订购");
            this.subscription.cancel();
        }
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("closed.");
    }
}

MilkCustomer takes dayCount as a constructor argument, meaning the number of days ordered. On subscription it requests the first day's milk; each time milk arrives it requests the next day's, until the whole order has been consumed.

    Test program

Run the following code:

MilkFactory factory = new MilkFactory();

// Subscribe for one week
MilkCustomer customer = new MilkCustomer(7);

factory.subscribe(customer);

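Sample output, abridged (the milk is picked at random, so the names vary from run to run; a seven-day order prints seven delivery lines before the closing message):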

今天的牛奶到了: 酸牛奶
今天的牛奶到了: 羊奶
今天的牛奶到了: 原味奶
牛奶套餐已经派完,欢迎继续订购

Summary

In the example above, we used the Reactive Stream API that Java provides to implement a "milkman delivers to your door" business flow.
    The process itself is fairly simple; the key is understanding how the stream is processed and how the subscription relationship works. Reactive implementations are not yet fully unified, though: Spring WebFlux (supported in Spring Boot 2), for example, is built on Project Reactor's own API, which implements the org.reactivestreams interfaces rather than the JDK's java.util.concurrent.Flow. I will cover that in a later article when I get the chance.

Further reading

On the differences between Future and CompletableFuture
https://juejin.im/post/5adbf8226fb9a07aac240a67
