原创

流处理和批处理讲解、主流框架对比、流批一体架构

温馨提示:
本文最后更新于 2023年04月23日,已超过 341 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

什么是流处理和批处理?

  • 流处理:对数据进行实时处理的方式,数据会以流的形式不断地产生和处理。

    流处理可以快速响应数据的变化,及时地进行数据处理和分析,适用于需要实时处理数据的场景。

    例如:实时数仓、实时监控、实时推荐等等。

    • 优点:
      1. 实时性:数据在产生的时候就立即被处理,能及时反馈结果。
      2. 高效性:不间断接受新数据并进行处理,因此可以更加高效利用硬件资源。
    • 缺点:
      1. 数据突发性:因为流式数据具有不可预测性,可能会突然出现突发的高峰,会导致系统压力急剧增加。
      2. 处理复杂度高:实时处理可能需要更高的处理能力和更复杂的算法。
  • 批处理:对数据进行离线处理的方式,数据会按照一定的时间间隔或者数据量进行批量处理。

    批处理可以对大量数据进行高效处理和分析,适用于需要对历史数据进行分析和挖掘的场景。

    例如:离线数仓、批量报表、离线推荐等等。

    • 优点:
      1. 处理复杂度低:通常不需要考虑数据的顺序、时间窗口等因素。
      2. 容错性高:数据多批次集中处理,通常一条数据的失败不会影响后续数据的处理,也可以采用多种容错机制来确保任务正确完成。
    • 缺点:
      1. 响应速度慢:由于批处理是周期性执行,不能及时响应数据变化。
      2. 处理结果滞后:由于批处理是周期性执行,在某些场景下可能会出现数据结果滞后的情况。

流处理和批处理都是常用的数据处理方式,它们各有优劣。流处理通常用于需要实时响应的场景,如在线监控和警报系统等。而批处理则通常用于离线数据分析和挖掘等大规模数据处理场景。选择合适的处理方式取决于具体的业务需求和数据处理场景。

什么是流批一体架构?

以前很多系统的架构都是采用的Lambda架构,它将所有的数据分成了三个层次:批处理层、服务层和速率层,每个层次都有自己的功能和目的。

  • 批处理层:负责离线计算和历史数据的存储。
  • 服务层:负责在线查询和实时数据的处理。
  • 速率层:负责对实时数据进行快速的处理和查询。

这种架构,需要一套流处理平台和一套批处理平台,这就可能导致了一些问题:

  1. 资源浪费:一般来说,白天是流计算的高峰期,此时需要更多的计算资源,相对来说,批计算就没有严格的限制,可以选择凌晨或者白天任意时刻,但是,流计算和批计算的资源无法进行混合调度,无法对资源进行错峰使用,这就会导致资源的浪费。
  2. 成本高:流计算和批计算使用的是不同的技术,意味着需要维护两套代码,不论是学习成本还是维护成本都会更高。
  3. 数据一致性:两套平台都是不一样的,可能会导致数据不一致的问题。

因此,流批一体诞生了!

流批一体的技术理念最早是2015年提出的,初衷就是让开发能用同一套代码和API实现流计算和批计算,但是那时候实际落地的就少之又少,阿里巴巴在2020年双十一首次实际落地。

Flink流批一体架构

有哪些流处理的框架?

Kafka Stream

基于 Kafka 的一个轻量级流式计算框架,我们可以使用它从一个或多个输入流中读取数据,对数据进行转换和处理,然后将结果写入一个或多个输出流中。

工作原理:读取数据流 -> 数据转换/时间窗口处理/状态管理 -> 任务调度 -> 输出结果

简单示例:统计20秒内每个input的key输入的次数,典型的例子:统计网站20秒内用户的点击次数。

public class WindowCountApplication {

    private static final String STREAM_INPUT_TOPIC = "streams-window-input";
    private static final String STREAM_OUTPUT_TOPIC = "streams-window-output";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(APPLICATION_ID_CONFIG, WindowCountApplication.class.getSimpleName());
        props.put(BOOTSTRAP_SERVERS_CONFIG, KafkaConstant.BOOTSTRAP_SERVERS);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(STREAM_INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
                .peek((key, value) -> Console.log("[input] key={}, value={}", key, value))
                .groupByKey()
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(20)))
                .count()
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), value))
                .peek((key, value) -> Console.log("[output] key={}, value={}", key, value))
                .to(STREAM_OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams kStreams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(kStreams::close));
        kStreams.start();
    }
}

运行结果{key}={value},发送了3次A=1,2次B=1,以及1次C=1,统计结果在预期之内,即A出现3次,B出现2次,C出现1次。

Pulsar Function

和 Kafka Stream 类似,也是轻量级的流处理框架,不过它是基于 Pulsar 实现的一个流处理框架,同样的,也是从一个或多个输入流中读取数据,对数据进行转换和处理,然后将结果写入一个或多个输出流中。感兴趣的可以参考我之前写的文章:Pulsar Function简介以及使用

工作原理:订阅消息流 -> 处理消息 -> 发布处理结果

简单示例:LocalRunner模式,按照逗号“,”去切分 input topic 的消息,然后转换成数字进行求和,结果发送至 output topic。

public class IntSumFunction implements Function<String, Integer> {

    public static final String BROKER_SERVICE_URL = "pulsar://localhost:6650";
    public static final String INPUT_TOPIC = "persistent://public/default/int-sum-input";
    public static final String OUTPUT_TOPIC = "persistent://public/default/int-sum-output";
    public static final String LOG_TOPIC = "persistent://public/default/int-sum-log";

    @Override
    public Integer process(String input, Context context) {
        Console.log("input: {}", input);
        return Arrays.stream(input.split(","))
                .map(Integer::parseInt)
                .mapToInt(Integer::intValue)
                .sum();
    }

    public static void main(String[] args) throws Exception {
        FunctionConfig functionConfig = new FunctionConfig();
        functionConfig.setName(IntSumFunction.class.getSimpleName());
        functionConfig.setClassName(IntSumFunction.class.getName());
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        functionConfig.setInputs(Collections.singleton(INPUT_TOPIC));
        functionConfig.setOutput(OUTPUT_TOPIC);
        functionConfig.setLogTopic(LOG_TOPIC);

        LocalRunner localRunner = LocalRunner.builder()
                .brokerServiceUrl(BROKER_SERVICE_URL)
                .functionConfig(functionConfig)
                .build();
        localRunner.start(true);
    }
}

运行结果:1+2+3+4+5+6=21

  • 一种流处理框架,具有低延迟、高吞吐量和高可靠性的特性。
  • 支持流处理和批处理,并支持基于事件时间和处理时间的窗口操作、状态管理、容错机制等。
  • 提供了丰富的算子库和 API,支持复杂的数据流处理操作。

工作原理:接收数据流 -> 数据转换 -> 数据处理 -> 状态管理 -> 容错处理 -> 输出结果

简单来说就是将数据流分成多个分区,在多个任务中并行处理,同时维护状态信息,实现高吞吐量、低延迟的流处理。

简单示例:从9966端口读取数据,将输入的句子用空格分割成多个单词,每隔5秒做一次单词统计。

public class WindowSocketWordCount {

    private static final String REGEX = " ";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketTextStreamSource = env.socketTextStream("localhost", 9966);

        SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = socketTextStreamSource
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (sentence, collector) -> {
                    for (String word : sentence.split(REGEX)) {
                        collector.collect(new Tuple2<>(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        streamOperator.print();
        env.execute();
    }
}

运行结果

Storm

  • 一个开源的流处理引擎,旨在实现快速、可靠的数据流处理。
  • 是业界最早出现的一个流处理框架(2011年),但是现在已经有许多其它优秀的流处理框架了,所以它在现在并不是唯一选择。

工作原理:将数据流分成多个小的流(也称为tuple),并将这些小流通过一系列的操作(也称为bolt)进行处理。

简单示例:在本地模式,使用Storm内置的RandomSentenceSpout充当数据源进行测试,用空格拆分生成的句子为多个单词,统计每个单词出现次数。

public class WindowedWordCountApplication {

    public static void main(String[] args) throws Exception {
        StreamBuilder builder = new StreamBuilder();
        builder.newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
                .window(TumblingWindows.of(Duration.seconds(2)))
                .flatMap(sentence -> Arrays.asList(sentence.split(" ")))
                .peek(sentence -> Console.log("Random sentence: {}", sentence))
                .mapToPair(word -> Pair.of(word, 1))
                .countByKey()
                .peek(pair -> Console.log("Count word: ", pair.toString()));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("windowedWordCount", new Config(), builder.build());
        Utils.sleep(20000);
        cluster.shutdown();
    }
}

内置的RandomSentenceSpout随机生成数据关键源代码:

@Override
public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{
        sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
        sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")
    };
    final String sentence = sentences[rand.nextInt(sentences.length)];
    LOG.debug("Emitting tuple: {}", sentence);
    collector.emit(new Values(sentence));
}

运行结果:随机找一个单词“nature”,统计的次数为10次。

Spark Streaming

基于 Spark API 的扩展,支持对实时数据流进行可扩展、高吞吐量、容错的流处理。

工作原理:接收实时输入数据流并将数据分成批次,然后由 Spark 引擎处理以批次生成最终结果流。

简单示例:从 kafka 的 spark-streaming topic 读取数据,按照空格“ ”拆分,统计每一个单词出现的次数并打印。

public class JavaDirectKafkaWordCount {

    private static final String KAFKA_BROKERS = "localhost:9092";
    private static final String KAFKA_GROUP_ID = "spark-consumer-group";
    private static final String KAFKA_TOPICS = "spark-streaming";
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        Configurator.setRootLevel(Level.WARN);
        SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("spark-streaming-word-count");
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(KAFKA_TOPICS.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_GROUP_ID);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        JavaDStream<String> linesStream = messages.map(ConsumerRecord::value);
        JavaPairDStream<String, Integer> wordCountStream = linesStream
                .flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum);

        wordCountStream.print();

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

运行结果

如何选择流处理框架?

  • 简单数据流处理

    如果只是轻量级使用的话,可以结合技术栈使用消息中间件自带的流处理框架就更节省成本。

    • 使用的 Kafka 就用 Kafka Stream。

    • 使用的 Pulsar 就用 Pulsar Function。

  • 复杂数据流场景

    Flink Spark Streaming Storm
    容错性 基于CheckPoint机制 WAL及RDD机制 Records ACK
    延迟性 亚秒级 秒级 亚秒级
    吞吐量 非常高 中等
    一致性 Excatly-Once Excatly-Once Excatly-Once
    状态支持 ×
    流批一体 ×
    窗口支持
    机器学习 ×
    SQL查询 ×
    图计算 ×
    社区活跃度 中等

综上,可以结合数据规模、技术栈、处理延迟功能特性、未来的考虑、社区活跃度、成本和可用性等等进行选择。

参考文章:

本文目录