原创

Pulsar Function简介以及使用

概览

pulsar-functions-overview.png

  • input topic: 数据来源
  • output topic:数据存入的topic
  • log topic:日志消息

Pulsar Functions 是轻量级计算流程,具有以下特点:

  • 从一个或多个 Pulsar topic 中消费消息;
  • 将用户提供的处理逻辑应用于每条消息;
  • 将运行结果发布到另一个 topic。

Pulsar Functions背后的核心目标是使您能够轻松创建各种级别的复杂的的处理逻辑,而无需部署单独的类似系统(例如 Apache Storm, Apache Heron, Apache Flink, 等等)

Pulsar支持的三种语言环境

pulsar-common模块下面org.apache.pulsar.common.functions.FunctionConfig类:

pulsar-function支持的三种语言环境.png

/**
 * Definition of possible runtime environments.
 */
public enum Runtime {
    JAVA,
    PYTHON,
    GO
}

pulsar function的三种处理语义

pulsar-function三种处理语义.png

/**
 * Definition of possible processing guarantees.
 */
public enum ProcessingGuarantees {
    // 默认
    ATLEAST_ONCE,
    ATMOST_ONCE,
    EFFECTIVELY_ONCE
}
  • At least once 是指消息至少发送一次。如果消息未能接受成功,可能会重发,直到接收成功。在整个 Pulsar Functions 处理消息的过程中,如果失败,都需要对该 message 执行 nack(重发) 操作,来保证 At least once 语义的正确性。
  • At most once 是指消息最多会被处理一次。从 input topics 中接收到之后,在真正处理消息之前去执行, at-most-once 模式下,不管 function 是否执行成功,这个 message 都会被确认(ack),而且只发送一次,无论是否发送成功,都不会重发。
  • Effectively once 是指消息会被有效执行一次。上述两种语意都没办法保证系统 crash 之后数据的一致性问题,Effectively once 可以保证只会对结果产生一次影响。Effectively once 本身更像是一个事务的处理过程,首先我们在 setup 生产者的时候需要保证生产者的幂等性;其次在处理消息的过程中,如果出现错误,我们需要让整个 function 停止操作,这点不同于 At least once

Pulsar Function三种订阅模式

为了同时兼容 queue 和 stream 的消费方式,Pulsar 在消费者之间抽象了一层订阅层,在 Pulasr 中,订阅的方式主要分为如下三种:

● exclusive ● failover ● share

但是 Pulsar Functions 中并没有支持 exclusive 的订阅方式。这是为什么呢?

在大部分 functions 的特定场景下,exclusive 的订阅类型没多大用,我们分为两种情况来讨论:

1.如果只有一个 instance,那么 failover 就相当于独占的类型。

2.如果有多个 instance,exclusive 类型的订阅会不断的 crash、 restart,而 failover 的订阅是通过 failover 的方式来进行切换,保证有一个 active 的 worker。(这个是本质原因)

如何部署Pulsar Function

1. local run

在本地运行或者集群外运行一个 Pulsar Functions,适用于开发者。

2. cluster

在集群内运行 Pulsar Functions。在该模式下部署 function 时,Apache BookKeeper 将自动处理状态存储,目前 go 版本的 function 暂时不支持状态存储。

pulsar function示例demo

生产者

public class ProducerApp {

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .build();

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(INPUT_TOPIC)
                .producerName("func-pro1")
                .create();

        MessageId messageId = producer.send("Hello,lzhpo,lewis");
        System.out.println("Send ok, messageId:" +messageId);

        client.close();
    }

}

pulsar function

在这里我简单的演示一下:按照逗号“,”去切分input topic的消息,然后转换成大写进行输出。

/**
 * 部署此Jar包到Pulsar集群中:
 * <pre>
 * {@code
 * $ bin/pulsar-admin functions create \
 *   --jar target/my-jar-with-dependencies.jar \
 *   --classname org.example.functions.WordCountFunction \
 *   --tenant public \
 *   --namespace default \
 *   --name word-count \
 *   --inputs persistent://public/default/sentences \
 *   --output persistent://public/default/count
 * }
 * </pre>
 * @author Zhaopo Liu
 */
@Slf4j
public class HelloWorldFunction implements Function<String, Void> {

    /**
     * 每次将消息发布到输入主题时,都会调用此函数
     *
     * @param input
     * @param context
     * @return
     * @throws Exception
     */
    @Override
    public Void process(String input, final Context context) throws Exception {
        log.info("收到来自 {} 的消息 {} " ,context.getInputTopics(), input);
        for (String word : input.split(",")) {
            String wordUpperCase = word.toUpperCase();
            System.out.println(wordUpperCase);
        }
        return null;
    }

    /**
     * <pre>
     * 设置localrun运行参数(包括pulsar地址、functionConfig、sourceConfig、sinkConfig等等),参考 LocalRunner {@link LocalRunner}类。
     * </pre>
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        FunctionConfig functionConfig = new FunctionConfig();
        // 设置pulsar function的名字
        functionConfig.setName("wordcount");
        // 设置pulsar function类的名字
        functionConfig.setClassName(HelloWorldFunction.class.getName());
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        // input topic,output topic,log topic
        functionConfig.setInputs(Collections.singleton(INPUT_TOPIC));
        functionConfig.setOutput(OUTPUT_TOPIC);
        functionConfig.setLogTopic(LOG_TOPIC);

        // 设置localrun运行参数
        LocalRunner localRunner = LocalRunner.builder()
                // 默认是本地
                .brokerServiceUrl(SERVER_URL)
                // 设置functionConfig
                .functionConfig(functionConfig)
                .build();
        // 非阻塞
        localRunner.start(false);
    }
}

pulsar-function示例-helloworld.png

设置pulsar-function运行参数

pulsar-function设置localrun运行参数.png

@Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig", hidden = true, converter = FunctionConfigConverter.class)
protected FunctionConfig functionConfig;
@Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig", hidden = true, converter = SourceConfigConverter.class)
protected SourceConfig sourceConfig;
@Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig", hidden = true, converter = SinkConfigConverter.class)
protected SinkConfig sinkConfig;
@Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true)
protected String stateStorageServiceUrl;
@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
protected String brokerServiceUrl;
@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
protected String clientAuthPlugin;
@Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
protected String clientAuthParams;
@Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1)
protected boolean useTls;
@Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n", hidden = true, arity = 1)
protected boolean tlsAllowInsecureConnection;
@Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true, arity = 1)
protected boolean tlsHostNameVerificationEnabled;
@Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
protected String tlsTrustCertFilePath;
@Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
protected int instanceIdOffset = 0;
@Parameter(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true, converter = RuntimeConverter.class)
protected RuntimeEnv runtimeEnv;

// 默认是在localhost,可以设置brokerServiceUrl进行更改
private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
正文到此结束
本文目录