Apache Pulsar——Function 轻量级盘算框架

手机软件开发 2024-10-3 10:09:12 57 0 来自 中国
一、Function背景先容

当我们举行流式处置惩罚的时间,很多环境下,我们的需求大概只是下面这些简单的使用:简单的ETL 使用\聚合盘算使用等相干服务。
但为了实现这些功能,我们不得不去摆设一整套 SPE 服务。摆设乐成后才发现须要的仅是SPE(流处置惩罚引擎)服务中的一小部分功能,摆设 SPE 的本钱大概比用户开辟这个功能自己更困难。由于SPE 自己API 的复杂性,我们须要相识这些算子的使用场景,明白差异算子之间有哪些区别,什么环境下,应该使用什么算子来处置惩罚相应的逻辑。
基于以上缘故因由,我们计划并实现了 Pulsar Functions,在 Pulsar Functions 中,用户只需关心盘算逻辑自己,而不须要去相识大概摆设 SPE 的相干服务,固然你也可以将pulsar-function 与现有的SPE 服务一起使用。也就是说,在 Pulsar Functions 中,无需摆设SPE 的整套服务,就可以到达与 SPE 服务同样的上风。
二、什么是Functions

Pulsar Functions 是一个轻量级的盘算框架,像 AWS 的 lambda、Google Cloud 的Functions 一样,Pulsar Functions 可以给用户提供一个摆设简单、运维简单、API 简单的 FASS(Function as a service)平台。Pulsar Functions 的计划灵感来自于 Heron 如许的流处置惩罚引擎,Pulsar Functions 将会拓展Pulsar和整个消息范畴的将来。使用 Pulsar Functions,用户可以轻松地摆设和管理 function,通过function 从Pulsartopic 读取数据大概生产新数据到 Pulsar topic。
引入 Pulsar Functions 后,Pulsar 成为同一的消息投递/盘算/存储平台。只需摆设一套Pulsar 集群,便可以实现一个盘算引擎,页面简单,使用便捷。
1.png Input topic 是数据的泉源,在 Pulsar Functions 中,全部的数据均来自 input topic。当数据进入inputtopic 中,Pulsar Functions 充当消耗者的脚色,去 input topic 中消耗消息;当从input topic 中拿到须要处置惩罚的消息时,Pulsar Functions 充当生产者的脚色往 output topic 大概 log topic 中生产消息。
Output topic 和 log topic 都可以看作是 Pulsar Functions 的输出。从是否会有output 这个点来看,我们可以将 Pulsar Functions 分为两类,当有输出的时间 Pulsar Functions 会将相应的output 输出到outputtopic中。log topic 紧张存储用户的日记信息,当 Pulsar Functions 出现题目时,方便用户定位错误并调试。
综上所述:我们不丢脸出 Pulsar Functions 充当了一个消息处置惩罚和转运的脚色。
在使用Pulsar Functions,可以使用差异的语言来编写,比如Python、Java、Go等。编写方式紧张两种:

  • 当地模式:集群外部,举行当地运行
  • 集群模式:集群内部运行(支持独立模式和集成模式)
三、Pulsar Function的使用

3.1 Pulsar Function的启用

修改Pulsar集群全部服务器的conf/broker.conf,如下内容
functionsWorkerEnabled=true修改Pulsar集群全部服务器的conf/functions_worker.yml,如下内容
pulsarFunctionsCluster: pulsar-cluster然后重启broker服务
留意:三台节点都须要实验,依次都制止,然后依次启动
3.2 使用Pulsar Function

运行官网提供的example包,先在集群模式下创建Function,创建完成的Function是运行的
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions create \>   --jar examples/api-examples.jar \>   --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \>   --inputs persistent://public/default/exclamation-input \>   --output persistent://public/default/exclamation-output \>   --tenant public \>   --namespace default \>   --name exclamation"Created successfully"[root@bigdata001 apache-pulsar-2.9.1]#然后触发Function运行,得到结果。原理是向exclamation-input这个topic发送消息,然后消耗exclamation-output这个topic的消息
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions trigger --name exclamation --trigger-value "hello world"hello world![root@bigdata001 apache-pulsar-2.9.1]#检察Function状态
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions status --name exclamation {  "numInstances" : 1,  "numRunning" : 1,  "instances" : [ {    "instanceId" : 0,    "status" : {      "running" : true,      "error" : "",      "numRestarts" : 0,      "numReceived" : 0,      "numSuccessfullyProcessed" : 0,      "numUserExceptions" : 0,      "latestUserExceptions" : [ ],      "numSystemExceptions" : 0,      "latestSystemExceptions" : [ ],      "averageLatency" : 0.0,      "lastInvocationTime" : 0,      "workerId" : "c-pulsar-cluster-fw-bigdata003-8086"    }  } ]}[root@bigdata001 apache-pulsar-2.9.1]#stop Function
[root@bigdata001 apache-pulsar-2.9.1]# [root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions stop --name exclamationStopped successfully[root@bigdata001 apache-pulsar-2.9.1]# start Function
[root@bigdata001 apache-pulsar-2.9.1]# [root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions start --name exclamationStarted successfully[root@bigdata001 apache-pulsar-2.9.1]# delete Function
[root@bigdata001 apache-pulsar-2.9.1]# [root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions delete --name exclamation"Deleted successfully"[root@bigdata001 apache-pulsar-2.9.1]#别的Function使用说明
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions [command]属性说明
bin/pulsar-admin functions属性说明:    functions:    可选值:        localrun: 创建当地function举行运行        create: 在集群模式下创建        delete: 删除在集群中运行的function        get: 获取function的相干信息        restart: 重启        stop : 制止运行        start: 启动        status: 查抄状态        stats: 检察状态        list: 检察特定租户和名称空间下的全部的function--classname: 设置function实验类--jar 设置function对应的jar包--inputs : 输入的topic--output : 输出的topic--tenant : 设置function运行在谁人租户中--namespace: 设置function运行在谁人名称空间中--name : 界说function的名称四、自己编写一个Function

需求:读取input topic,此中日期格式为yyyy/MM/dd HH/mm/ss,转换为格式yyyy-MM-dd HH:mm:ss,然后发送到output topic
4.1 添加依赖

<dependency>    <groupId>org.apache.pulsar</groupId>    <artifactId>pulsar-client</artifactId>    <version>2.10.0</version></dependency><dependency>    <groupId>org.apache.pulsar</groupId>    <artifactId>pulsar-functions-api</artifactId>    <version>2.10.0</version></dependency>       4.2 编写步调

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;/** * @Author: huangyibo * @Date: 2022/5/28 18:15 * @Description: 读取input topic,此中日期格式为yyyy/MM/dd HH/mm/ss, * 转换为格式yyyy-MM-dd HH:mm:ss,然后发送到output topic */public class FormatDateFunction implements Function<String, String> {    private DateTimeFormatter formatter1 = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");    private DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");    /**     * 每来一条消息,都会调用process举行处置惩罚     * @param input     输入的消息数据     * @param context   体现上下文对象,用于实验一些相干的统计盘算使用,以及获取相干的对象和元数据信息     * @return     * @throws Exception     */    @Override    public String process(String input, Context context) throws Exception {        LocalDateTime localDateTime = LocalDateTime.parse(input, formatter1);        return localDateTime.format(formatter2);    }}4.3 然后将步调举行打包,上传到Pulsar集群中的一台服务器

4.4 创建Function

[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions create \> --jar /opt/pulsar_dev-1.0-SNAPSHOT.jar \> --classname DateTransfromFunction \> --inputs persistent://public/default/dateTransfrom-input \> --output persistent://public/default/dateTransfrom-output \> --tenant public \> --namespace default \> --name dateTransfrom"Created successfully"[root@bigdata001 apache-pulsar-2.9.1]#4.5 触发Function

[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions trigger --name dateTransfrom --trigger-value "2022/04/10 16/32/18"2022-04-10 16:32:18[root@bigdata001 apache-pulsar-2.9.1]#参考:
https://blog.csdn.net/yy8623977/article/details/124072174
您需要登录后才可以回帖 登录 | 立即注册

Powered by CangBaoKu v1.0 小黑屋藏宝库It社区( 冀ICP备14008649号 )

GMT+8, 2024-10-18 16:54, Processed in 0.154935 second(s), 35 queries.© 2003-2025 cbk Team.

快速回复 返回顶部 返回列表