利用RuntimeReplaceable实现Spark Native function
关于Spark Native Function在Spark中实现自定义函数,有多种方式:
1、实现Hive UDF,Spark是兼容Hive UDF的,简单易用,支持纯SQL环境,因此这可能是使用最为广泛的。
2、实现Spark-SQL UDF,需要嵌入到代码中,因此也主要用在代码中,目前还不支持纯SQL环境。
3、通过拓展SparkSessionExtensions,基本等价于Spark Built-in内置函数,可以充分利用Spark catalyst优化器和Codegen,从而带来可观的性能提升,这里称之为Spark Native Function。但是这种方式也是实现最为困难的,需要对SQL解析器、优化器等有一定的理解。同时网上关于这种方式的资料几乎没有,Spark官方文档中也是根本没有提及这种方式,足以说明这种方式较高的门槛。
应用场景:RuntimeReplaceableSpark已经内置足够多的UDF,已经可以满足绝大部分的应用场景。
剩下的不能满足的应用场景中,其中很大一部分可以通过组合这些内置的函数,来满足。因此也就带来一个问题,就是有时候应用场景非常复杂,需要组 ...
DAG实现与任务调度
前言在任务调度场景中,常常通过DAG将多个任务编排成一个复杂的Job,进而满足复杂的任务调度应用场景。特别是在大数据领域,这类调度系统是必须的,比如Azkaba、DolphinScheduler、AirFlow…。而这些系统正是通过DAG进行任务编排的,那么下面让我们试着简单的实现一个DAG调度程序。
Code1、抽象出一个任务执行接口123public interface Executor { boolean execute();}
2、简单实现一个示例Task,需实现Executor接口。12345678910111213141516171819202122232425262728293031public class Task implements Executor{ private Long id; private String name; private int state; public Task(Long id, String name, int state) { this.id = ...
优化Flink ogg-json format
前言最近发现从kafka同步到Paimon中的数据不正确。具体表现为,明明数据库中某条记录已经Update了,但是Paimon中的同一条记录没有同步更新。经过一系列的排查发现,是由于公司ogg json格式不统一,导致Flink ogg-json format解析失败,同时因为配置了ogg-json.ignore-parse-errors = true,最终导致整条ogg更新Record被丢弃,没有发送到下流的Paimon。
代码记录org.apache.flink.formats.json.ogg.OggJsonDeserializationSchema#deserialize(byte[], org.apache.flink.util.Collector<org.apache.flink.table.data.RowData>)
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253@Overridepublic void dese ...
DolphinScheduler RPC框架源码分析
前言截至2023-07-09,DolphinScheduler3.x最新版本Dev分支,DolphinScheduler中虽然基于Netty实现了一个简单的RPC框架,但是并没有使用,或者说使用的不是完整版的RPC框架。其中大量直接使用Netty Client发送网络请求,并没有使用动态代理简化或或者说屏蔽掉通信细节,虽然在org.apache.dolphinscheduler.rpc包中已经有了完整实现。
本文主要分析org.apache.dolphinscheduler.rpc包中完整的RPC实现。虽然在DolphinScheduler中没有被使用,但是代码是共通的。
源码分析Rpc通信协议Protocol定义在org.apache.dolphinscheduler.rpc.protocol.MessageHeader类中,没有什么好说的,差不多的套路。
一字节的version
一字节的eventType:HEARTBEAT、REQUEST、RESPONSE
四字节的msgLength
……
一字节的serialization类型:dolphinscheduler目前实现了一种基 ...
Paimon动态Bucket设计与实现
前言Paimon Dynamic Bucket是Paimon-0.5引入的新特性,现在Paimon可以动态的创建Bucket进行扩容,旨在进一步简化了创建Paimon表的过程,用户无需关心需要创建多少个Bucket。通过dynamic-bucket.target-row-num配置指定每个桶存储多少条记录,默认是2_000_000L。
为了实现这个特性,Paimon需要利用文件记录所有Record与其Bucket的映射关系。在paimon中,使用Record的主键的hashcode代表一个Record,而hashcode是Int类型,减少了内存占用。使用主键的hashcode代表一个Record还有一个好处就是,使用int就可以覆盖所有的Record,即使dynamic-bucket.target-row-num是Long类型,避免了空间无限膨胀的问题。这是因为即使hash冲突,并不影响正确性。
通过不断检查映射文件中key的行数,当大于dynamic-bucket.target-row-num时,创建新的bucket进行扩容。
设计与实现实现集中在paimon-core/s ...
Spark Batch Read Paimon源码分析
A bucket is the smallest storage unit for reads and writes, so the number of buckets limits the maximum processing parallelism. This number should not be too big, though, as it will result in lots of small files and low read performance. In general, the recommended data size in each bucket is about 1GB.
num-sorted-run.compaction-trigger
5
Integer
The sorted run number to trigger compaction. Includes level0 files (one file one sorted run) and high-level runs (one level one sorted run).
Pa ...
修复dolphinscheduler Hive SQL数据源连接Kyuubi偶现Read timed out错误
前言dolphinscheduler中利用Hive SQL数据源连接Kyuubi偶现Read timed out错误,错误日志如下:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354[ERROR] 2023-06-08 17:39:06.166 +0800 - Task execute failed, due to meet an exceptionorg.apache.dolphinscheduler.plugin.task.api.TaskException: Execute sql task failed at org.apache.dolphinscheduler.plugin.task.sql.SqlTask.handle(SqlTask.java:168) at org.apache.dolphinscheduler.server.worker.runner.DefaultWorkerDelayTaskExecuteRunna ...
记一次大量TCP连接CLOSE_WAIT问题排查
问题今天突然发现Spark SQL任务启动不起来,报下面的错误,'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on a random free port. You may check whether configuring an appropriate binding address. 2023-05-18 13:57:40,952 WARN util.Utils: Service,看到这段日志后,表明服务器大量端口被占用,Spark申请不到端口,尝试了100次后,抛出了下面的异常。
1234567891011121314151617181920212220/12/21 12:55:18 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on a random free port. You may check wh ...
初识Apache Druid
Why Druid
OLAP 是一种让用户可以用从不同视角方便快捷的分析数据的计算方法。主流的 OLAP 可以分为3类:多维 MOLAP ( Multi-dimensional OLAP )、关系型 ROLAP ( Relational OLAP ) 和混合 HOLAP ( Hybrid OLAP ) 三大类。
在海量数据上进行亚秒级的多维分析,并且要求高并发,可以选择的OLAP系统并不多。而其中MOLAP最为知名的就是Apache Kylin和Apache Druid。
先前我们一直采用Apache Kylin进行离线多维分析,在使用中发现了一系列问题,让我们不得不将目光放在Druid上面:
1、由于Kylin构建cube的数量和维度的关系是2的n次方,指数级增长是非常可怕的,一般超过20个维度,在Kylin中就要小心了,因此使用Kylin需要时刻担心维度爆炸的问题。
2、Kylin目前要求不超过63个Normal维度,这是因为cubeid是Long类型,而Long的最大值是2^63 -1,所以不能超过63个维度。而我们的业务场景最大会有二百多个维度,Kylin已经不满足我们的要 ...
Paimon-Flink-Sink源码分析
前言Paimon在Flink下面的层次结构,大概为:Catalog -> Database -> Table -> Record。因此看Paimon如何在Flink中实现Read、Write等操作,先从Catalog开始。
CatalogPaimon在Flink中实现Catalog的源码位于org.apache.paimon.flink.FlinkCatalog,该类实现了org.apache.flink.table.catalog.Catalog接口。该接口中定义了一系列方法,包括listTables、listViews等方法,而paimon一一实现了这些接口。
在Flink中自定义Catalog,还需要实现org.apache.flink.table.factories.CatalogFactory工厂接口,paimon中对应的为org.apache.paimon.flink.FlinkCatalogFactory类。该工厂类当然是用来创建Catalog了。
最后还需要将Catalog工厂实现类,添加到将此实现类添加到 META_INF/services/org. ...