
The simplest stream processing engine: an introduction to Kafka Streams

Before version 0.10.0.0, Kafka positioned itself as a distributed, partitioned, replicated commit log service. Kafka itself did not provide any data processing service; stream computation relied mainly on frameworks such as Storm, Spark Streaming, and Flink.

Storm, Spark Streaming, and Flink, the troika of stream processing, each have their own strengths.

Storm offers low latency and holds a certain position in the market; many companies still use it.

Spark Streaming benefits from the Spark ecosystem and its active community, and also holds a solid share.

Flink is designed closer to true stream processing and offers convenient APIs, so it has a promising future.

But all of them depend on Kafka for message transport, so in version 0.10.0.0 Kafka launched its own stream processing framework, Kafka Streams. Kafka's positioning changed accordingly: "Apache Kafka® is a distributed streaming platform."

Real-Time Stream Computing

In recent years, real-time stream computing has developed rapidly, mainly because of the value of real-time data and its impact on the architecture of data processing systems. Real-time stream computing is characterized by unbounded data, consistent and repeatable results, near-real-time latency, and so on. A streaming engine is a type of data processing engine that is designed with infinite data sets in mind.

1. Unbounded data: an ever-growing, essentially infinite data set, often referred to as "streaming data." Infinite streaming data sets are called unbounded data, in contrast to finite data sets, which are bounded data.

2. Unbounded data processing: a continuous mode of data processing, applied to the unbounded data described above. Batch processing (offline computation) can be run repeatedly to process such data, but it runs into performance bottlenecks.

3. Low-latency, near-real-time results: in contrast to offline computation, which does not consider latency at all.

Stream processing can supersede batch systems once it solves two problems:

1. Correctness: with correctness guarantees, streaming becomes equivalent to batch computation.

Streaming systems still need to be able to compute over time windows as data keeps arriving. Spark Streaming addressed the correctness problem with the idea of micro-batching; consistency between real-time storage and offline systems is something future real-time computing systems should satisfy.

2. Tools for reasoning about time: these are what allow streaming to go beyond batch computation.

Good tools for reasoning about time are critical for processing unbounded, out-of-order data.

Time divides into event time (when the event actually occurred) and processing time (when the system observes it); for example, a click made at 10:00 may only reach the pipeline at 10:05.

There are many more concepts related to real-time stream computing, which will not be elaborated here.

Kafka Streams Introduction

Kafka Streams is considered the easiest way to develop real-time applications. It is a Kafka client library: stream processing is achieved with plain Java or Scala code.

Advantages:

  • Flexible, highly scalable, fault-tolerant

  • Deployed to the container, VM, bare metal, cloud

  • Equally viable for small, medium, and large use cases

  • Fully integrated with Kafka security

  • Write standard Java and Scala applications

  • Develop on Mac, Linux, Windows

  • Exactly-once processing semantics (see the sketch after this list)
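
That last point deserves a quick illustration: exactly-once processing is enabled with a single configuration entry. A minimal sketch, assuming Kafka 2.3 clients with brokers at version 0.11 or later; the application id and broker address are placeholders:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfig {

    public static Properties build() {
        Properties props = new Properties();
        // Placeholder application id and broker address
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        // Switch from the default at_least_once guarantee to exactly_once
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}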

Examples:

The New York Times uses Apache Kafka and Kafka Streams to store and distribute, in real time, published content to the various applications and systems that make it available to readers.

Pinterest uses Apache Kafka and Kafka Streams at large scale to power the real-time predictive budgeting system of its advertising infrastructure. With Kafka Streams, spend predictions are more accurate than ever.

As Europe's leading online fashion retailer, Zalando uses Kafka as an ESB (Enterprise Service Bus) to help its transition from a monolithic to a microservices architecture. Processing event streams with Kafka enables its teams to achieve near-real-time business intelligence.

Rabobank is one of the three largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka. It is used by a growing number of financial processes and services, one of which is Rabo Alerts. This service, built with Kafka Streams, sends financial alerts to customers in real time.

LINE uses Apache Kafka as a central data hub for its services to communicate with one another. Hundreds of billions of messages are produced daily and are used to execute various business logic, threat detection, search indexing, and data analysis. LINE uses Kafka Streams to reliably transform and filter topics so that consumers can efficiently consume sub-topics, while keeping the code base sophisticated yet simple and easy to maintain.

Topology

In Kafka Streams, computational logic is defined by one or more topologies, where a topology is a graph composed of streams (edges) and stream processors (nodes).

There are two special processors in a topology:

    Source processor: a special type of stream processor that has no upstream processors. It produces one or more input streams for the topology by consuming records from Kafka topics and forwarding them to its downstream processors.

    Sink processor: a special type of stream processor that has no downstream processors. It sends any records received from its upstream processors to a specified Kafka topic.

A normal processor node can also send data to remote systems, so processing results can be streamed back into Kafka or written to an external system. A minimal sketch of such a topology follows.
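
Here is that sketch, using the low-level Processor API and assuming Kafka 2.3; the topic and node names are hypothetical. A source processor feeds a normal processor node, which forwards upper-cased values to a sink processor:

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;

public class TopologySketch {

    public static Topology build() {
        Topology topology = new Topology();
        // Source processor: consumes records from a Kafka topic (hypothetical name)
        topology.addSource("Source", "input-topic");
        // Normal processor node: transforms each record and forwards it downstream
        topology.addProcessor("Upper", () -> new AbstractProcessor<String, String>() {
            @Override
            public void process(String key, String value) {
                context().forward(key, value.toUpperCase());
            }
        }, "Source");
        // Sink processor: writes records received from upstream back to a Kafka topic
        topology.addSink("Sink", "output-topic", "Upper");
        return topology;
    }
}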

Kafka Streams provides the most common data transformation operations out of the box, such as map, filter, join, and aggregations, and they are easy to use; a small sketch follows.
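
For instance, a hedged sketch of filter and mapValues in the DSL, with hypothetical topic names and String serdes assumed to be configured as the defaults (join and aggregation operators follow the same fluent style):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class DslSketch {

    public static void define(StreamsBuilder builder) {
        // Hypothetical topic names; String keys and values via the default serdes
        KStream<String, String> lines = builder.stream("lines-topic");
        lines
            .filter((key, value) -> value != null && !value.isEmpty()) // drop empty records
            .mapValues(value -> value.toUpperCase())                   // transform each value
            .to("uppercased-lines-topic");                             // write back to Kafka
    }
}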

Of course, there are also time windows, aggregations, and handling of out-of-order data. These will be covered in detail in a later post; a small taste is sketched below.
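
As a hedged sketch only (the topic name is hypothetical), counting records per key over tumbling five-minute event-time windows looks like this:

import java.time.Duration;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowSketch {

    public static void define(StreamsBuilder builder) {
        KStream<String, String> events = builder.stream("events-topic"); // hypothetical topic
        // Count records per key over tumbling 5-minute windows of event time
        KTable<Windowed<String>, Long> windowedCounts = events
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count();
    }
}

For now, let's develop a simple getting-started example.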

Getting Started

First, here are the Java and Scala versions of the WordCount example. Building them requires the kafka-streams artifact on the classpath (org.apache.kafka:kafka-streams:2.3.0; the Scala version additionally uses kafka-streams-scala_2.12).

Java 8+:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
 
import java.util.Arrays;
import java.util.Properties;
 
public class WordCountApplication {
 
    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
 
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
 
}
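
Note that this listing starts the streams instance and then falls off the end of main without closing it. A common pattern (a sketch, not part of the listing above) is to add import java.util.concurrent.CountDownLatch and replace the final streams.start() with:

        // Block the main thread until Ctrl-C, then close the instance cleanly
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            latch.countDown();
        }));
        streams.start();
        latch.await();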

Scala:

import java.util.Properties
import java.util.concurrent.TimeUnit
 
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
object WordCountApplication extends App {
  import Serdes._
 
  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }
 
  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")
 
  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()
 
  sys.ShutdownHookThread {
     streams.close(10, TimeUnit.SECONDS)
  }
}

If Kafka is already running, you can skip the first two steps.

1. Download

Download the 2.3.0 release and un-tar it. Note that there are multiple downloads built for different Scala versions; we use the recommended one (2.12):

> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0

2. Start the servers

Kafka uses ZooKeeper, so if you don't already have a ZooKeeper server, you need to start one first.

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

Start Kafka server:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3. Create the topics

We create an input topic named streams-plaintext-input and an output topic named streams-wordcount-output:

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".


> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

View the topics:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
 
Topic:streams-plaintext-input   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: streams-plaintext-input  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic:streams-wordcount-output  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact
    Topic: streams-wordcount-output Partition: 0    Leader: 0   Replicas: 0 Isr: 0

4. Start the WordCount application

Start the WordCount demo application with the following command:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

The demo application reads from the input topic streams-plaintext-input, runs the WordCount algorithm on each message it reads, and continuously writes its current results to the output topic streams-wordcount-output. Therefore, apart from log entries there will be no STDOUT output, because the results are written back to Kafka.

Now we can start the console producer in a separate terminal and write some input data to this topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

And check the output of the WordCount demo application by reading its output topic with the console consumer in a separate terminal:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

5. Process some data

In the producer terminal, enter some input data:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka

The consumer outputs:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1

Continue typing:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2

We can see that the WordCount results are output in real time as the data is entered. The output is a changelog of updates: each line carries the latest count for a word, so words that appear again (kafka, streams) show up a second time with their new totals.

6. Stop the application

You can now use Ctrl-C to stop, in order, the console consumer, the console producer, the WordCount application, the Kafka broker, and the ZooKeeper server.

Related posts:

  • What is Kafka?
  • Kafka monitoring tools summary
  • Kafka Quick Start
  • Kafka's core: the Consumer
  • Kafka's core: the Producer
  • An alternative to Flume: an introduction to Kafka Connect

For more posts on real-time computing, Flink, Kafka, and related technologies, follow the Real-Time Streaming Computing blog.
