Published on

ClickHouse 社区版更新与去重问题实践与优化方案

Authors
  • avatar
    Name
    Liant
    Twitter

思考实践问题及解决方案

Clickhouse社区版本已有部分实践,基于实践做出一些解决方案.

痛点

痛点
  1. OLAP能力不够好用。在一些特定的场景下,半结构化数据的分析能力不足……原生ClickHouse能力难以支持。

  2. ClickHouse在单表性能上非常的强劲,但多表能力非常局限,且对标准SQL兼容性低。

  3. 缺乏成熟运维管理工具,运维复杂程度高,需要投入极大的人力,这是一个很大的缺陷。

  4. ClickHouse是MPP架构(存算一体架构),性能和扩展性极强,但缺陷也很明显:

  • 横向扩容成本非常高,增加一个节点要进行数据重新分布。
  • 隔离性差,单一用户的查询会非常容易打满整个集群,导致ClickHouse并发度不高。

对于现有项目实践来说: Clickhouse对于只查询统计且不更新的数据非常适用,对于需要更新的数据不好用.

最简单的解决方案

商业解决方案是有比较成熟的解决方案:

  1. Clickhouse Cloud,官方提供增强服务

官方提供的云服务版本,并且有相当多好用的功能(SharedMergeTree和轻量级更新)是社区版本不具备的.

  1. ByteHouse,基于Clickhouse的增强方案
  • 丰富的自研表引擎,UniqueMergeTree表引擎能解决更新延迟问题
  • 性能优化:优化器、字典、索引支持

其他增强功能暂不展开

这两种商业解决方案都是比较成熟的,使用起来也是比较方便的.

实际的解决方案

不考虑商业解决方案的话,这里的问题都是社区版本使用过程中遇到的问题

实际场景:

数据量在百万级别,依靠MySQL查询统计会有几秒的查询,在目前业务中尚能接受.但是随着数据量的增加达到千万级别时,查询统计时间可能增长到十几秒甚至跟长.所以预估业务数据之后,寻找其他的方案作为数据分析使用.调研结束之后Clickhouse最为目前场景比较合适的技术选型.在初期实践之后,最重要的问题就凸显出来了.

  1. 更新,社区版本的更新都是相当重的操作,频繁更新会直接导致服务不工作.
  2. 去重,社区版本提供的数据去重方案会有延迟.(数据库为了计算速度,牺牲了其他特性,所以在一些情况下会有重复数据).

以下的方案主要解决这两个问题

  • 数据更新

在任何场景中,坚决不使用更新和删除,这两项是相当重的操作.在更新提交之后,服务会在后台执行合并段操作,占用大量服务,如果更新操作有多个,就会导致服务处于不工作的状态.既不能写入数据,也不能查询数据.

解决方法:任何修改和删除操作都需要转为插入操作.依靠数据库提供的表引擎提供的合并功能自动合并.

  • 数据去重

由于Clickhouse的数据库设计偏重点不一样,更新是非常不划算的操作.基于数据更新的方案(把删除和更新转为插入操作)带来的新问题.所以,数据会出现重复问题.

解决办法:针对不同数据做不同的表设计.偏重于统计和偏重于查询时使用不同的表结构.

具体方案

经验基于少的可怜的实践经验.

方法一 INSERT+Final

对于有去重功能的表引擎使用,比如 ReplacingMergeTree,CollapsingMergeTree.插入数据之后并不会立即去掉所有的重复数据,在查询语句中加入Final语句即可在查询过程中去掉数据.

结论:数据是实时,但是查询会慢一些.也是改动成本最小的方法

INSERT INTO
    table (col1, col2, col3)
VALUES
    (val1, val2, val3)
  , (val11, val21, val31);

SELECT
    *
FROM
    table FINAL
WHERE
    col1 = value;

查询时会比较慢,因为:

1 需要对所有记录按照排序键来 sort merge。

2 除了读取查询的列还会读取主键列。

3 在读表数据的时候,有多少个partition就有多少个并发的线程分别去读取,如果分区个数只有2个,就只会启动2个线程并发去读,不会启动更多的线程。而不加final,如果系统有20 cores就会启动20个线程并发读。

方法二 INSERT+argMax(偏向于查询数据,统计很少)

1. 新增标志位

以一个ReplacingMergeTree引擎为例:

创建一张表ReplacingMergeTree,通过新增两个列: deleted来表明是更新还是删除,create_time列来表明操作时间的先后顺序,比如有对同一记录的多次更新。

结论:数据是实时的,查询速度也还可以,查询写法啰嗦,统计不好写.

建表的标志位:

CREATE TABLE test_a(
  user_id UInt64,
  score String,
  deleted UInt8 DEFAULT 0,
  create_time DateTime DEFAULT toDateTime(0)
)ENGINE= ReplacingMergeTree(create_time)
ORDER BY user_id

去重时策略为保留最新的一条数据

通过group查询最新的数据:

SELECT
    user_id
  , argMax(score, create_time)   AS score
  , argMax(deleted, create_time) AS deleted
  , MAX(create_time)             AS ctime
FROM
    test_a
GROUP BY
    user_id
HAVING
    deleted = 0

查询时,取最新数据,过滤删除掉的数据

删除操作:只需要把对应记录的deleted=0,加上当前的操作时间即可。

-- 单条删除:
INSERT INTO
    test_a(user_id, deleted, create_time)
VALUES
    (id号,1now());

-- 批量删除:
INSERT INTO
    test_a(user_id, deleted, create_time)
SELECT
    user_id
  , 1 now()
FROM
    test_a
WHERE
    user_id IN (..);

更新操作:关键在于每次插入要填充所有的列。

-- `* except(score,create_time,delete)` 需要更新的数据列
INSERT INTO
    test_b (* EXCEPT(score,create_time,DELETE), create_time, score)
SELECT
    *
EXCEPT
    (score,create_time, DELETE),
    now() AS create_time,
    'aaa' AS score
FROM
    test_b
WHERE
    user_id IN ();

侧重于查询数据,非做统计.

2. 使用物化视图

该思路主要是有一个基表存放原始数据以及后续更新转换为insert的数据。中间有一个物化视图当做触发器,每次有新数据到基表就将insert进来的数据进行聚合,存储到最上层的AggregatingMergeTree.所以最上层的的AggregatingMergeTree在没有merge之前是最原始的数据,每次批量更新转插入的数据的中间聚合状态。

结论:数据是实时,查询语句较复杂一些

建表语句

-- 基础表
CREATE TABLE IF NOT EXISTS default.local_dat_update
(
    pk1             UInt64     COMMENT '主键',
    pk2             String     COMMENT '主键',
    ver             DateTime64 COMMENT '版本,业务提供',
    col1            Int32      DEFAULT 0  COMMENT '选取!=0的最大版本',
    col2            String     DEFAULT '' COMMENT '选取!=空字符串的最大版本',
    col3            DateTime   DEFAULT 0  COMMENT '选取!=0的最大版本'
) ENGINE = ReplacingMergeTree
PARTITION BY toYYYYMM(kafka_time)
ORDER BY (kafka_topic, kafka_partition, kafka_offset);


-- 上层聚合表,查询也是查询这张表
CREATE TABLE IF NOT EXISTS default.local_agg_update
(
    pk1     UInt64,
    pk2     String,
    version AggregateFunction(max, DateTime64),
    col1    AggregateFunction(argMaxIf, Int32, DateTime64, UInt8),
    col2    AggregateFunction(argMaxIf, String, DateTime64, UInt8),
    col3    AggregateFunction(argMaxIf, DateTime, DateTime64, UInt8)
) ENGINE = AggregatingMergeTree
PARTITION BY (xxHash64(pk1, pk2) % 10)
ORDER BY (pk1, pk2);


-- 物化视图,作为触发器使用,主要执行聚合函数.
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_update
    TO local_agg_update
AS
SELECT pk1                                  AS pk1,
       pk2                                  AS pk2,
       maxState(ver)                        AS version,
       argMaxIfState(col1, ver, col1 != 0)  AS col1,
       argMaxIfState(col2, ver, col2 != '') AS col2,
       argMaxIfState(col3, ver, col3 != 0)  AS col3
FROM default.local_dat_update
GROUP BY pk1, pk2;

查询数据

-- 查询:每次查询看结果的时候需要给中间聚合状态加上Merge后缀得到最终的可视化结果,否则是一些二进制的数据。
SELECT pk1,
       pk2,
       maxMerge(version)   AS version,
       argMaxIfMerge(col1) AS col1,
       argMaxIfMerge(col2) AS col2,
       argMaxIfMerge(col3) AS col3
FROM default.all_agg_update
GROUP BY pk1, pk2;

方法三 折叠数据(偏向于统计,查询较少)

思路类似于CollapsingMergeTree,所有操作都是插入,通过sign标记正负值抵消,在实际查询中用来统计准确数据.

结论: 统计语句需要微小调整,查询需要借助argMax.

建表

CREATE TABLE UAct
(
    UserID UInt64,
    PageViews UInt8,
    Duration UInt8,
    Sign Int8
)
ENGINE = CollapsingMergeTree(Sign)
ORDER BY UserID;

更新一条数据

INSERT INTO UAct VALUES
(4324182021466249494, 5, 146, -1),
(4324182021466249494, 6, 185, 1);

统计(查询仍需要配合argMax):

-- sum(PageViews) => sum(PageViews * Sign)、
-- sum(Duration) => sum(Duration * Sign)
SELECT UserID,
       sum(PageViews * Sign) AS PageViews,
       sum(Duration * Sign) AS Duration
FROM UAct
GROUP BY UserID
HAVING sum(Sign) > 0;

结论

实践过程中,应为定位于OLAP数据库,而几乎所以OLAP数据库对可变数据并不欢迎,所以可变数据在该数据库社区版本中使用体验并不好用.

  • 实时统计不可变数据,强烈推荐.

  • 实时统计千万级别数据,且统计查询不复杂,推荐使用.

  • 数据量多,变更频繁,要求实时.建议用商业解决方案(省时省力省心).

参考资料

ClickHouse 正在退出开源世界?

干货 | ByteHouse:基于ClickHouse 的实时计算能力升级

ClickHouse如何更新数据