- Published on
SeaTunnel 安装配置与实时数据处理实战指南
- Authors
- Name
- Liant
Seatunnel 安装及使用
官方介绍: 下一代高性能、分布式、海量数据集成框架
介绍
本文基于最新的 2.3.0 版本作为实践基础.
官方文档 由于项目还是早期,所以资料很少.
结构图
以Flink
和Spark
作为设计思想,从source
中获取数据,经过transform
,最终写入到sink
中.
坑点
- 使用官方的推荐版本,
Oracle JDK
和FLink 1.3
- 将
Flink
作为执行引擎较为稳定.以SeaTunnel本身的引擎作为运行引擎问题较多 2.3版本
不稳定
安装
1. Java环境
Java 版本号至少是大于8
的版本,官方使用Oracle JDK
.不要使用OpenJDK
.
具体安装见于网络,资料较多.
2. 安装Flink
下载 flink-1.13.6 程序
启动 flink
# 保持运行
./bin/start-cluster.sh
访问localhost:8081
,查看Flink的WebUI
3. 安装Seatunnel
下载程序 Seatunnel 2.3.0
解压到指定目录
4. 安装连接器插件
安装Seatunnel
的插件
bash bin/install_plugin.sh
在 config/plugin_config
文件中可以调整需要的连接器.
--connectors-v2--
connector-cdc-mysql
connector-dingtalk
connector-elasticsearch
connector-http-wechat
connector-jdbc
connector-kafka
connector-mongodb
connector-pulsar
connector-rabbitmq
connector-redis
connector-socket
--end--
由于需要使用到maven,最好调整一下maven的repo的仓库地址.可以加速下载连接器
# 查看maven版本以及其他信息
./mvnw -version
# 进入 moven_home 目录,修改配置文件 conf/settings.xml 中的仓库地址
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
5. 配置
编辑文件 config/seatunnel-env.sh
# 需要指向flink的安装目录
FLINK_HOME=/usr/opt/flink
使用
编辑任务
编写一个任务配置,该任务模式是STREAMING
,这意味着不会停止.
env {
execution.planner=blink
execution.parallelism = 1
job.mode = "STREAMING"
}
source {
Socket{ # 从Socket中读取数据.监听本机,端口9999
host = "localhost"
port = 9999
result_table_name = "fake" # 数据集命名为fake
field_name = "info" # 字段命名为info
}
}
transform {
Split{ # 切割字段
separator = "#"
fields = ["name","age"]
}
sql { # 使用flink的SQL处理字段
sql = "select info, split(info) as info_row from fake"
}
}
sink {
Console { # 输出到控制台,如果是在Flink下运行任务,数据会作为标准输出,记录在Flink服务器日志中
}
}
将任务提交到Flink中
# 先打开本机的9999端口
nc -lk 9999
# 提交任务到SeaTunnel中
./bin/start-seatunnel-flink-connector-v2.sh -c config/flink1111.config
在 nc 所在终端继续输入
name01#10
name02#20
那么在Flink的webUI中即可看到任务标准输出
fields : value
types : STRING
fields : value
types : STRING
subtaskIndex=0: row=1 : name01#10
subtaskIndex=0: row=1 : name02#20
结果类似于: 

transform
目前支持好像有问题, 定义的 split 函数转换无效