简介
批量写入又称为bulk write,对于单表插入多条数据的场景,可以减少插入请求数量,提高吞吐量和效率。clickhouse官方Golang驱动clickhouse-go[1]支持该关键特性,但是文档的介绍不是很详细,只有一句:
Bulk?write?support?:?begin->prepare->(in?loop?exec)->commit
并没有详细介绍用法和原理,笔者在开发业务时使用的库是sqlx[2],sql也支持clickhouse-go驱动。参考了官方样例代码[3]:
...
tx,?err?:=?connect.Begin()
checkErr(err)
stmt,?err?:=?tx.Prepare("INSERT?INTO?example?(country_code,?os_id,?browser_id,?categories,?action_day,?action_time)?VALUES?(?,??,??,??,??,??)")
checkErr(err)
for?i?:=?0;?i?100;?i++?{
?if?_,?err?:=?stmt.Exec(
??"RU",
??10+i,
??100+i,
??[]int16{1,?2,?3},
??time.Now(),
??time.Now(),
?);?err?!=?nil?{
??log.Fatal(err)
?}
}
...
我写的bulk write类似上面的代码,但是提交给同事review时,他提出了疑问:stmt.Exec是每次执行都发送写请求到数据库吗?这个问题其实我不敢肯定,官方文档也说得不明确。考虑到严谨性,让自己的PR更有说服力,自己去翻看了相关源代码。
这里需要指出,如果利用编辑器里的代码跳转功能会跳到database/sql库中的Exec函数实现,实际上我们要看的代码是clickhouse-go中的实现,至于编辑器跳转到database/sql中的原因,书写此文时笔者也没弄清楚,先挖个坑吧。
核心实现
stmt.Exec的核心代码如下[4]:
func?(stmt?*stmt)?execContext(ctx?context.Context,?args?[]driver.Value)?(driver.Result,?error)?{
?if?stmt.isInsert?{
??stmt.counter++
??if?err?:=?stmt.ch.block.AppendRow(args);?err?!=?nil?{
???return?nil,?err
??}
??if?(stmt.counter?%?stmt.ch.blockSize)?==?0?{
???stmt.ch.logf("[exec]?flush?block")
???if?err?:=?stmt.ch.writeBlock(stmt.ch.block);?err?!=?nil?{
????return?nil,?err
???}
???if?err?:=?stmt.ch.encoder.Flush();?err?!=?nil?{
????return?nil,?err
???}
??}
??return?emptyResult,?nil
?}
?if?err?:=?stmt.ch.sendQuery(stmt.bind(convertOldArgs(args)));?err?!=?nil?{
??return?nil,?err
?}
?if?err?:=?stmt.ch.process();?err?!=?nil?{
??return?nil,?err
?}
?return?emptyResult,?nil
}
上面的代码不多,非常清晰,当执行Exec时,stmt.ch.block.AppendRow(args)会先把sql参数附加到本地缓存block中,然后(stmt.counter % stmt.ch.blockSize)判断本地缓存大小是否到达阈值,到达则执行Flush(),将数据写入远端。综上,clickhouse-go中的核心实现逻辑是:
- 底层维护一个缓存block,同时设置block_size控制缓存大小
- 执行stmt.Exec时,不会直接写入远程ClickHouse中,而是将插入参数Append到block中
- 每次Append后,判断block的size和block_size的关系,如果正好整除,则刷新block(即写入clickhouse)
因此block_size这个参数很重要,它表示本地缓存的上限,如果很大的话,程序会占用一些内存。笔者起初设置为100000,在调试日志中看不到stmt.ch.logf("[exec] flush block")打印的log,设置小后就看到下面的输出:
...
[clickhouse][connect=1][begin]?tx=false,?data=false
[clickhouse][connect=1][prepare]
[clickhouse][connect=1][read?meta]?<-?data:?packet=1,?columns=6,?rows=0
[clickhouse][connect=1][exec]?flush?block
[clickhouse][connect=1][exec]?flush?block
....
总结
很多数据库驱动都支持bulk write特性,clickhouse-go这个驱动也不例外,但是它的文档写得不是很详细,只是在文档中指明要放在begin/commit中做。再加上clickhouse不支持事务,begin/commit这种写法会让人困惑。
本文通过分析clickhouse-go的源代码,了解bulk write的执行过程,帮助大家梳理其具体实现。
参考资料
[1]
clickhouse-go: https://github.com/ClickHouse/clickhouse-go
[2]
sqlx: https://github.com/jmoiron/sqlx
[3]
官方样例代码: https://github.com/ClickHouse/clickhouse-go/blob/master/examples/sqlx.go#L35-L51
[4]
核心代码如下: https://github.com/clickhouse/clickhouse-go/blob/master/stmt.go#L44-L68
[5]
INSERT INTO Statement: https://clickhouse.tech/docs/en/sql-reference/statements/insert-into/
[6]
go-clickhouse-batchinsert: https://github.com/MaruHyl/go-clickhouse-batchinsert/blob/master/batch.go#L349-L354