- Published on
ClickHouse 社区版更新与去重问题实践与优化方案
- Authors
- Name
- Liant
思考实践问题及解决方案
Clickhouse社区版本已有部分实践,基于实践做出一些解决方案.
痛点

OLAP能力不够好用。在一些特定的场景下,半结构化数据的分析能力不足……原生ClickHouse能力难以支持。
ClickHouse在单表性能上非常的强劲,但多表能力非常局限,且对标准SQL兼容性低。
缺乏成熟运维管理工具,运维复杂程度高,需要投入极大的人力,这是一个很大的缺陷。
ClickHouse是MPP架构(存算一体架构),性能和扩展性极强,但缺陷也很明显:
- 横向扩容成本非常高,增加一个节点要进行数据重新分布。
- 隔离性差,单一用户的查询会非常容易打满整个集群,导致ClickHouse并发度不高。
对于现有项目实践来说: Clickhouse对于只查询统计且
不更新
的数据非常适用,对于需要更新
的数据不好用.
最简单的解决方案
商业解决方案是有比较成熟的解决方案:
- Clickhouse Cloud,官方提供增强服务
官方提供的云服务版本,并且有相当多好用的功能(SharedMergeTree和轻量级更新)是社区版本不具备的.
- ByteHouse,基于Clickhouse的增强方案
- 丰富的自研表引擎,
UniqueMergeTree表引擎
能解决更新延迟问题 - 性能优化:优化器、字典、索引支持
其他增强功能暂不展开
这两种商业解决方案都是比较成熟的,使用起来也是比较方便的.
实际的解决方案
不考虑商业解决方案的话,这里的问题都是社区版本使用过程中遇到的问题
实际场景:
数据量在百万级别,依靠MySQL查询统计会有几秒的查询,在目前业务中尚能接受.但是随着数据量的增加达到千万级别时,查询统计时间可能增长到十几秒甚至跟长.所以预估业务数据之后,寻找其他的方案作为数据分析使用.调研结束之后Clickhouse最为目前场景比较合适的技术选型.在初期实践之后,最重要的问题就凸显出来了.
- 更新,社区版本的更新都是相当重的操作,频繁更新会直接导致服务不工作.
- 去重,社区版本提供的数据去重方案会有延迟.(数据库为了计算速度,牺牲了其他特性,所以在一些情况下会有重复数据).
以下的方案主要解决这两个问题
- 数据更新
在任何场景中,坚决不使用更新和删除,这两项是相当重的操作.在更新提交之后,服务会在后台执行合并段操作,占用大量服务,如果更新操作有多个,就会导致服务处于不工作的状态.既不能写入数据,也不能查询数据.
解决方法:任何修改和删除操作都需要转为插入操作.依靠数据库提供的表引擎提供的合并功能自动合并.
- 数据去重
由于Clickhouse的数据库设计偏重点不一样,更新是非常不划算的操作.基于数据更新的方案(把删除和更新转为插入操作)带来的新问题.所以,数据会出现重复问题.
解决办法:针对不同数据做不同的表设计.偏重于统计和偏重于查询时使用不同的表结构.
具体方案
经验基于少的可怜的实践经验.
方法一 INSERT+Final
对于有去重功能的表引擎使用,比如 ReplacingMergeTree,CollapsingMergeTree.插入数据之后并不会立即去掉所有的重复数据,在查询语句中加入Final语句即可在查询过程中去掉数据.
结论:数据是实时,但是查询会慢一些.也是改动成本最小的方法
INSERT INTO
table (col1, col2, col3)
VALUES
(val1, val2, val3)
, (val11, val21, val31);
SELECT
*
FROM
table FINAL
WHERE
col1 = value;
查询时会比较慢,因为:
1 需要对所有记录按照排序键来 sort merge。
2 除了读取查询的列还会读取主键列。
3 在读表数据的时候,有多少个partition就有多少个并发的线程分别去读取,如果分区个数只有2个,就只会启动2个线程并发去读,不会启动更多的线程。而不加final,如果系统有20 cores就会启动20个线程并发读。
方法二 INSERT+argMax(偏向于查询数据,统计很少)
1. 新增标志位
以一个ReplacingMergeTree引擎为例:
创建一张表ReplacingMergeTree,通过新增两个列: deleted来表明是更新还是删除,create_time列来表明操作时间的先后顺序,比如有对同一记录的多次更新。
结论:数据是实时的,查询速度也还可以,查询写法啰嗦,统计不好写.
建表的标志位:
CREATE TABLE test_a(
user_id UInt64,
score String,
deleted UInt8 DEFAULT 0,
create_time DateTime DEFAULT toDateTime(0)
)ENGINE= ReplacingMergeTree(create_time)
ORDER BY user_id
去重时策略为保留最新的一条数据
通过group查询最新的数据:
SELECT
user_id
, argMax(score, create_time) AS score
, argMax(deleted, create_time) AS deleted
, MAX(create_time) AS ctime
FROM
test_a
GROUP BY
user_id
HAVING
deleted = 0
查询时,取最新数据,过滤删除掉的数据
删除操作:只需要把对应记录的deleted=0,加上当前的操作时间即可。
-- 单条删除:
INSERT INTO
test_a(user_id, deleted, create_time)
VALUES
(id号,1, now());
-- 批量删除:
INSERT INTO
test_a(user_id, deleted, create_time)
SELECT
user_id
, 1 now()
FROM
test_a
WHERE
user_id IN (…..);
更新操作:关键在于每次插入要填充所有的列。
-- `* except(score,create_time,delete)` 需要更新的数据列
INSERT INTO
test_b (* EXCEPT(score,create_time,DELETE), create_time, score)
SELECT
*
EXCEPT
(score,create_time, DELETE),
now() AS create_time,
'aaa' AS score
FROM
test_b
WHERE
user_id IN (…);
侧重于查询数据,非做统计.
2. 使用物化视图
该思路主要是有一个基表存放原始数据以及后续更新转换为insert的数据。中间有一个物化视图当做触发器,每次有新数据到基表就将insert进来的数据进行聚合,存储到最上层的AggregatingMergeTree.所以最上层的的AggregatingMergeTree在没有merge之前是最原始的数据,每次批量更新转插入的数据的中间聚合状态。
结论:数据是实时,查询语句较复杂一些
建表语句
-- 基础表
CREATE TABLE IF NOT EXISTS default.local_dat_update
(
pk1 UInt64 COMMENT '主键',
pk2 String COMMENT '主键',
ver DateTime64 COMMENT '版本,业务提供',
col1 Int32 DEFAULT 0 COMMENT '选取!=0的最大版本',
col2 String DEFAULT '' COMMENT '选取!=空字符串的最大版本',
col3 DateTime DEFAULT 0 COMMENT '选取!=0的最大版本'
) ENGINE = ReplacingMergeTree
PARTITION BY toYYYYMM(kafka_time)
ORDER BY (kafka_topic, kafka_partition, kafka_offset);
-- 上层聚合表,查询也是查询这张表
CREATE TABLE IF NOT EXISTS default.local_agg_update
(
pk1 UInt64,
pk2 String,
version AggregateFunction(max, DateTime64),
col1 AggregateFunction(argMaxIf, Int32, DateTime64, UInt8),
col2 AggregateFunction(argMaxIf, String, DateTime64, UInt8),
col3 AggregateFunction(argMaxIf, DateTime, DateTime64, UInt8)
) ENGINE = AggregatingMergeTree
PARTITION BY (xxHash64(pk1, pk2) % 10)
ORDER BY (pk1, pk2);
-- 物化视图,作为触发器使用,主要执行聚合函数.
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_update
TO local_agg_update
AS
SELECT pk1 AS pk1,
pk2 AS pk2,
maxState(ver) AS version,
argMaxIfState(col1, ver, col1 != 0) AS col1,
argMaxIfState(col2, ver, col2 != '') AS col2,
argMaxIfState(col3, ver, col3 != 0) AS col3
FROM default.local_dat_update
GROUP BY pk1, pk2;
查询数据
-- 查询:每次查询看结果的时候需要给中间聚合状态加上Merge后缀得到最终的可视化结果,否则是一些二进制的数据。
SELECT pk1,
pk2,
maxMerge(version) AS version,
argMaxIfMerge(col1) AS col1,
argMaxIfMerge(col2) AS col2,
argMaxIfMerge(col3) AS col3
FROM default.all_agg_update
GROUP BY pk1, pk2;
方法三 折叠数据(偏向于统计,查询较少)
思路类似于CollapsingMergeTree,所有操作都是插入,通过sign标记正负值抵消,在实际查询中用来统计准确数据.
结论: 统计语句需要微小调整,查询需要借助argMax.
建表
CREATE TABLE UAct
(
UserID UInt64,
PageViews UInt8,
Duration UInt8,
Sign Int8
)
ENGINE = CollapsingMergeTree(Sign)
ORDER BY UserID;
更新一条数据
INSERT INTO UAct VALUES
(4324182021466249494, 5, 146, -1),
(4324182021466249494, 6, 185, 1);
统计(查询仍需要配合argMax):
-- sum(PageViews) => sum(PageViews * Sign)、
-- sum(Duration) => sum(Duration * Sign)
SELECT UserID,
sum(PageViews * Sign) AS PageViews,
sum(Duration * Sign) AS Duration
FROM UAct
GROUP BY UserID
HAVING sum(Sign) > 0;
结论
实践过程中,应为定位于OLAP数据库,而几乎所以OLAP数据库对可变数据并不欢迎,所以可变数据在该数据库社区版本中使用体验并不好用.
实时统计不可变数据,强烈推荐.
实时统计千万级别数据,且统计查询不复杂,推荐使用.
数据量多,变更频繁,要求实时.建议用商业解决方案(省时省力省心).