ethan

ClickHouse Notes

Chapter 4 Data Definition#

Basic Operations on Data Partitions#

Querying Partition Information#

ClickHouse has many built-in system tables for querying its own status information. Among them, the parts system table is specifically used to query the partition information of data tables.

select partition_id, name, table, database from system.parts where table = 'partition_v2'

Deleting a Specified Partition#

With a well-chosen partition key, partition deletion can serve as a way to update data: drop the partition that holds the outdated data, then write the corrected data again.

alter table tb_name drop partition partition_expr

Copying Partition Data#

ClickHouse supports copying partition data from Table A to Table B. This feature can be used for quick data writing, data synchronization between multiple tables, and backup scenarios.

alter table B replace partition partition_expr from A

Note: The following two prerequisites must be met:

  • Both tables must have the same partition key.
  • Their table structures must be identical.

Resetting Partition Data#

If there is erroneous data in a column of a data table that needs to be reset to its initial value, the following statement can be used:

alter table tb_name clear column column_name in partition partition_expr

Guideline: If a default value expression is declared, it takes precedence; otherwise, the default value of the corresponding data type applies.

Detaching and Attaching Partitions#

Table partitions can be detached using the DETACH statement. After a partition is detached, the physical data is not deleted but moved to the detached subdirectory of the table directory. Attaching a partition (ATTACH) is the reverse operation. This is commonly used for migrating and backing up partition data.

alter table tb_name detach partition partition_expr

Distributed DDL Execution#

ClickHouse supports cluster mode, where a cluster can have one or more nodes. DDL statements such as CREATE, ALTER, DROP, RENAME, and TRUNCATE support distributed execution: when such a statement is executed on any one node, every node in the cluster executes the same statement in the same order. To use this, add the ON CLUSTER cluster_name clause to the statement.

Data Insertion#

The INSERT statement supports three syntax paradigms.

The first: values syntax

INSERT INTO [db.]table [(c1,c2,c3...)] VALUES (v11,v12,v13),(v21,v22,v23),...

The second: specified format syntax

INSERT INTO [db.]table [(c1,c2,c3...)] FORMAT format_name data_set

The third: using select clause syntax

INSERT INTO [db.]table [(c1,c2,c3...)] SELECT ...

Both VALUES and SELECT clause forms support expressions or functions, but expressions and functions incur additional performance overhead.

Data Deletion and Modification#

ClickHouse provides the ability to DELETE and UPDATE, which are referred to as Mutation queries and can be seen as a variant of the ALTER statement.

Differences from ordinary statements: a Mutation is a "heavy" operation and is better suited to modifying or deleting data in batches. Mutations do not support transactions; once a statement is submitted, it takes effect on existing data and cannot be rolled back. Finally, a Mutation runs as an asynchronous background process, so the statement returns immediately after submission. A returned statement therefore does not mean the work has finished; progress has to be checked through the system.mutations system table.

SELECT database, table, mutation_id, block_numbers as num, is_done FROM system.mutations

Chapter 5 Data Dictionary#

A data dictionary defines data in the form of key-value and attribute mapping. Data is actively loaded into memory when ClickHouse starts or lazily loaded during the first query (determined by parameter settings) and supports dynamic updates. It is very suitable for storing frequently used dimension table data, avoiding unnecessary JOIN queries.

Data dictionaries can be built-in (default dictionaries) or extended (implemented through custom configurations).

Built-in Dictionaries#

Chapter 6 MergeTree Principle Analysis#

Creation Methods and Storage Structure of MergeTree#

Creation Method of MergeTree#

CREATE TABLE [IF NOT EXISTS] [db_name.]table_name (
	name1 [type] 
    ...
) ENGINE = MergeTree()
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]

(1) PARTITION BY [optional]: The partition key, which specifies how table data is partitioned. It can be a single column, a tuple of multiple columns, or a column expression. If it is not declared, ClickHouse creates a single partition named all. Reasonable use of data partitioning can effectively reduce the range of data files scanned during queries.

(2) ORDER BY [mandatory]: The sorting key, which specifies how data is sorted within a data part. By default, the primary key is the same as the sorting key. It can be a single column or a tuple of multiple columns.

(3) PRIMARY KEY [optional]: The primary key generates a first-level index based on the primary key fields to accelerate table queries. By default, the primary key and sorting key are the same. MergeTree allows duplicate data in the primary key.

(4) SAMPLE BY [optional]: The sampling expression used to declare the sampling criteria for the data. The same expression must also be declared in the primary key configuration.

(5) SETTINGS: index_granularity [optional]: The granularity of the index, defaulting to 8192. In other words, one index entry is generated for every 8192 rows of data.

(6) SETTINGS: index_granularity_bytes [optional], supported since version 19.12. It adds the feature of adaptive interval size (dynamically dividing interval sizes based on the volume of data written in each batch), defaulting to 10M, and setting it to 0 disables the adaptive feature.

(7) SETTINGS: enable_mixed_granularity_parts [optional]: Sets whether to enable the adaptive index interval feature, which is enabled by default.

(8) SETTINGS: merge_with_ttl_timeout [optional]: Provides data TTL functionality after version 19.6.

(9) SETTINGS: storage_policy [optional]: Multi-path storage strategy.

Data Partitioning#

Data Partitioning Rules#

The rules for MergeTree data partitioning are determined by the partition ID. The generation logic for partition IDs follows four rules:

(1) No partition key specified: The partition ID is defaulted to all, and all data will be written to the all partition.

(2) Using integer: If the partition key value belongs to an integer type and cannot be converted to the date type YYYYMMDD, it will be output directly in the character form of that integer as the partition ID value.

(3) Using date type: If the partition key value belongs to a date type or can be converted to YYYYMMDD format as an integer, it will be output in the formatted character form according to YYYYMMDD and used as the partition ID value.

(4) Using other types: If the partition key value is neither an integer nor a date type, an ID will be generated using a 128-bit hash algorithm.
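
The four rules above can be sketched in Python. This is an illustrative model, not ClickHouse code: the function name partition_id is hypothetical, and MD5 is used only as a stand-in because it also produces 128 bits; the notes do not say which hash ClickHouse uses.

```python
import hashlib
from datetime import date


def partition_id(key=None):
    """Return a partition ID for one partition key value (illustrative model)."""
    if key is None:
        # Rule 1: no partition key declared.
        return "all"
    if isinstance(key, date):
        # Rule 3: date (and datetime) values are formatted as YYYYMMDD.
        return key.strftime("%Y%m%d")
    if isinstance(key, int) and not isinstance(key, bool):
        # Rules 2 and 3: an integer is written out as its decimal string;
        # an integer that already reads as YYYYMMDD yields the same string.
        return str(key)
    # Rule 4: any other type is hashed to 128 bits (MD5 is a stand-in here).
    return hashlib.md5(repr(key).encode("utf-8")).hexdigest()
```

For example, partition_id(date(2019, 5, 11)) gives 20190511, while a string key gives a 32-character hexadecimal ID.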

Naming Rules for Partition Directories#

A complete partition directory naming formula is as follows:

PartitionID_MinBlockNum_MaxBlockNum_Level

(1) PartitionID: The partition ID.

(2) MinBlockNum_MaxBlockNum: The minimum and maximum data block numbers. BlockNum is an auto-incrementing integer.

(3) Level: The level of merging, which can be understood as the number of times a partition has been merged or the age of the partition.

Merging Process of Partition Directories#

The partition directories of MergeTree do not exist as soon as the data table is created; they are created as data is written. Once created, the partition directories are not immutable either: directories that belong to the same partition are later merged into a new directory.

The name of the merged directory is derived by these rules:

  • MinBlockNum: Take the minimum value from all directories within the same partition.
  • MaxBlockNum: Take the maximum value from all directories within the same partition.
  • Level: Take the maximum Level value from the same partition and add 1.
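
As a small sketch of the naming formula and the merge rules above, the following Python function computes the name of the merged directory. The function name merged_directory_name and the sample directory names are hypothetical.

```python
def merged_directory_name(names):
    """Compute the directory name that results from merging parts of one partition."""
    parts = []
    for name in names:
        # Split from the right so that a partition ID containing "_" stays intact.
        partition, min_block, max_block, level = name.rsplit("_", 3)
        parts.append((partition, int(min_block), int(max_block), int(level)))

    partitions = {p[0] for p in parts}
    if len(partitions) != 1:
        raise ValueError("only directories of the same partition can be merged")

    return "{}_{}_{}_{}".format(
        partitions.pop(),
        min(p[1] for p in parts),      # smallest MinBlockNum
        max(p[2] for p in parts),      # largest MaxBlockNum
        max(p[3] for p in parts) + 1,  # largest Level plus 1
    )
```

Merging 201905_1_1_0 and 201905_3_3_0 under this model yields 201905_1_3_1.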

First-Level Index#

MergeTree generates a first-level index for the data table based on index_granularity intervals and saves it in the primary.idx file, with index data sorted according to the PRIMARY KEY.

Sparse Index#

In a dense index, each row index marker corresponds to a specific data record. In a sparse index, each row index marker corresponds to a segment of data rather than a single row.

The advantage of a sparse index is that it can record a large amount of data's interval location information using only a small number of index markers, with the benefits becoming more pronounced as the data volume increases. Due to the small space occupied by sparse indexes, the index data in primary.idx remains resident in memory.

Index Granularity#

Index granularity divides the data by length (number of rows), splitting it into many small segments of index_granularity rows each. MergeTree uses a MarkRange to represent a specific interval, with start and end indicating its range. This parameter affects the first-level index, the data marks, and the data files.

The first-level index alone cannot complete the query work; it requires the help of data marking to locate the data. The data files will also generate compressed data blocks based on the interval granularity of index_granularity.

Rules for Generating Index Data#

MergeTree generates an index record for every index_granularity rows of data, with the index value obtained based on the declared primary key fields. The storage of sparse indexes is very compact, with index values tightly arranged in order based on the primary key fields.
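
A minimal sketch of this rule in Python, assuming the primary key values are already sorted and held in a list; build_sparse_index is a hypothetical name, and real index files are of course not Python lists.

```python
def build_sparse_index(sorted_keys, index_granularity=8192):
    """Keep one primary-key value per index_granularity rows (a sparse index)."""
    # Row 0, row index_granularity, row 2 * index_granularity, ... each
    # contribute one index entry; all other rows are represented implicitly.
    return sorted_keys[::index_granularity]
```

With 24576 rows and the default granularity, only 3 index entries are kept, which is why the index stays small as data grows.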

Query Process of the Index#

MarkRange in ClickHouse is used to define the marking interval. MergeTree divides a complete data segment into multiple small interval data segments based on the interval granularity of index_granularity, with a specific data segment being a MarkRange.

The entire index query process can be roughly divided into three steps:

(1) Generate query condition intervals: First, convert the query conditions into condition intervals; even a single value query condition will be converted into interval form.

(2) Recursive intersection judgment: In a recursive manner, sequentially check the intersection of the MarkRange numerical intervals with the condition intervals.

  • If there is no intersection, prune the entire MarkRange and skip it.
  • If there is an intersection and the MarkRange step size is greater than 8 (end-start), this interval will be further split into 8 sub-intervals (as specified by merge_tree_coarse_index_granularity), and this rule will be repeated to continue recursive intersection judgment.
  • If there is an intersection and the MarkRange cannot be decomposed (step size less than 8), record the MarkRange and return.

(3) Merge MarkRange intervals: Aggregate the final matching MarkRange together, merging their ranges.

MergeTree continuously splits intervals recursively, ultimately locating the MarkRange to the finest granularity to help minimize the scanning range of data during subsequent reads.
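
The three steps can be sketched in Python as follows. This is a simplified model under stated assumptions, not the ClickHouse implementation: select_mark_ranges is a hypothetical name, index[i] is taken to be the first primary-key value of mark i, the last range is treated as unbounded above, and a range of exactly 8 marks is recorded rather than split.

```python
import math


def select_mark_ranges(index, low, high, coarse=8):
    """Return merged (start, end) mark ranges that may contain keys in [low, high]."""
    total = len(index)
    matched = []

    def visit(start, end):
        # Key interval covered by marks [start, end): from the first key of
        # `start` up to the first key of `end` (unbounded for the last range).
        left = index[start]
        right = index[end] if end < total else math.inf
        if right < low or left > high:
            return  # no intersection: prune the whole range
        if end - start > coarse:
            # Intersection and still coarse: split into at most `coarse` sub-ranges.
            step = math.ceil((end - start) / coarse)
            for sub_start in range(start, end, step):
                visit(sub_start, min(sub_start + step, end))
        else:
            matched.append((start, end))  # cannot be decomposed further

    visit(0, total)

    # Step 3: merge ranges that touch each other.
    merged = []
    for start, end in matched:
        if merged and merged[-1][1] == start:
            merged[-1] = (merged[-1][0], end)
        else:
            merged.append((start, end))
    return merged
```

With 100 marks whose keys are 0, 8192, 16384, and so on, a query for the single value 20000 is first converted to the interval [20000, 20000] and ends up reading only marks 2 to 4.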

Second-Level Index#

The second-level index, also known as a data skipping index (rendered as "hop index" in these notes), is built from aggregated information about the data. Different index types aggregate different information. Like the first-level index, its purpose is to reduce the range of data scanned during queries. It is turned off by default and requires setting allow_experimental_data_skipping_indices before use.

SET allow_experimental_data_skipping_indices = 1

The second-level index needs to be defined within the CREATE statement, supporting declaration in tuple and expression forms. Its complete definition syntax is as follows:

INDEX index_name expr TYPE index_type (...) GRANULARITY granularity

It will additionally generate the corresponding index (skp_idx_[column].idx) and marking files (skp_idx_[column].mrk).

Relationship Between Granularity and Index Granularity#

For hop indexes, index_granularity defines the granularity of the data, while granularity defines the granularity of the aggregation information summary (how many index_granularity intervals of data a single hop index can skip).

Types of Hop Indexes#

MergeTree supports four types of hop indexes: minmax, set, ngrambf_v1, and tokenbf_v1.

Usage of hop indexes:

(1) minmax: Records the minimum and maximum extreme values within a set of data. Its index function is similar to the minmax index of the partition directory, allowing for quick skipping of useless data intervals.

INDEX a ID TYPE minmax GRANULARITY 5

(2) set: Records the values (unique values, no duplicates) of declared fields or expressions. The complete form is set(max_rows), indicating the maximum number of data rows the index can record within an index_granularity. If max_rows=0, it means no limit.

INDEX b (length(ID) * 8) TYPE set(100) GRANULARITY 5

(3) ngrambf_v1: Records a bloom filter for data phrases, supporting string and fixedstring data types. It is only useful for in, notIn, like, equals, and notEquals.

INDEX c (ID, Code) TYPE ngrambf_v1(3,256,2,0) GRANULARITY 5
  • n: Token length, which splits the data into token phrases based on the length of n.
  • size_of_bloom_filter_in_bytes: Size of the bloom filter.
  • number_of_hash_functions: Number of hash functions used in the bloom filter.
  • random_seed: Random seed for the hash function.

(4) tokenbf_v1: A variant of ngrambf_v1. It changes how tokens are produced: instead of fixed-length n-grams, the data is split automatically into tokens on non-alphanumeric characters.

INDEX d ID TYPE tokenbf_v1(256,2,0) GRANULARITY 5
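
To make the interaction between index_granularity and GRANULARITY concrete, here is a small Python sketch of the minmax type from item (1). It is only a model: the function names are hypothetical and the values are held in a plain list rather than in column files.

```python
def build_minmax_index(values, index_granularity, granularity):
    """One (min, max) entry per `granularity` intervals of index_granularity rows."""
    rows_per_entry = index_granularity * granularity
    entries = []
    for start in range(0, len(values), rows_per_entry):
        chunk = values[start:start + rows_per_entry]
        entries.append((min(chunk), max(chunk)))
    return entries


def entries_to_scan(entries, target):
    """Indexes of the entries whose [min, max] range can contain `target`."""
    return [i for i, (low, high) in enumerate(entries) if low <= target <= high]
```

Every entry that is not returned by entries_to_scan lets the query skip granularity intervals of data at once.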

Data Storage#

Independent Storage of Each Column#

Each column field has its own .bin data file, organized and stored under the partition directories.

The advantages of columnar independent storage design:

  1. Better data compression (similar types of data stored together are more compression-friendly).
  2. Minimizes the scanning range of data.

Data is compressed, currently supporting several algorithms including LZ4, ZSTD, Multiple, and Delta, with LZ4 being the default. Additionally, data is sorted according to the ORDER BY declaration; finally, data is organized and written into .bin files in the form of compressed data blocks.

Compressed Data Blocks#

A compressed data block consists of header information and compressed data. The header information is fixed at 9 bytes, specifically composed of 1 UInt8 (1 byte) integer and 2 UInt32 (4 bytes) integers, representing the type of compression algorithm used, the size of the compressed data, and the size of the uncompressed data.

The .bin compressed file is composed of multiple compressed data blocks, with the header information of each compressed data block generated based on the CompressionMethod_CompressedSize_UncompressedSize formula.
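
The 9-byte header layout can be illustrated with Python's struct module. This is a sketch under assumptions: little-endian byte order is assumed, the helper names are hypothetical, and the method value passed in the usage note is only an illustrative byte, not a documented constant.

```python
import struct

# 1 x UInt8 (compression method) + 2 x UInt32 (compressed and uncompressed size).
# "<" selects little-endian without padding, so the size is exactly 9 bytes.
HEADER = struct.Struct("<BII")


def pack_header(method, compressed_size, uncompressed_size):
    """Build the 9-byte header of one compressed data block."""
    return HEADER.pack(method, compressed_size, uncompressed_size)


def unpack_header(raw):
    """Read (method, compressed_size, uncompressed_size) from the first 9 bytes."""
    return HEADER.unpack(raw[:HEADER.size])
```

For instance, unpack_header(pack_header(0x82, 12000, 65536)) returns the same three values that were packed.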

The clickhouse-compressor tool can query the statistical information of compressed data in .bin files.

The size of each compressed data block is strictly controlled between 64KB and 1MB, with the upper and lower limits specified by min_compress_block_size (65536) and max_compress_block_size (1048576). The final size of a compressed data block is related to the actual size of the data within an interval (index_granularity).

MergeTree writes data in batches, following the index granularity. Let size denote the uncompressed size of one batch of data; the writing process then follows these principles:

(1) For a single batch of data size<64KB: Continue to fetch the next batch of data until the accumulated size>=64KB, then generate the next compressed data block.

(2) For a single batch of data 64KB <=size<=1MB: Directly generate the next compressed data block.

(3) For a single batch of data size>1MB: First truncate to 1MB and generate the next compressed data block. The remaining data will continue to execute according to the above rules, which may result in a single batch of data generating multiple compressed data blocks.
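
The three writing rules can be modelled with a short Python function that turns a sequence of uncompressed batch sizes into uncompressed block sizes. The function name is hypothetical, and flushing a leftover tail smaller than 64KB at the end is an assumption made so that no data is dropped in the model.

```python
MIN_BLOCK = 65536     # min_compress_block_size
MAX_BLOCK = 1048576   # max_compress_block_size


def compressed_block_sizes(batch_sizes):
    """Uncompressed size of each block produced for a sequence of batches."""
    blocks = []
    pending = 0
    for size in batch_sizes:
        pending += size
        # Rule 3: more than 1MB is cut into 1MB blocks first.
        while pending > MAX_BLOCK:
            blocks.append(MAX_BLOCK)
            pending -= MAX_BLOCK
        # Rule 2: between 64KB and 1MB a block is generated directly.
        if pending >= MIN_BLOCK:
            blocks.append(pending)
            pending = 0
        # Rule 1: below 64KB keep accumulating with the next batch.
    if pending:
        blocks.append(pending)  # assumption: flush the tail at the end
    return blocks
```

Three batches of 30000 bytes produce a single block of 90000 bytes, while one batch of 2500000 bytes produces three blocks.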

There are at least two reasons for introducing compressed data blocks in .bin files:

  1. Although data compression can effectively reduce data size, lowering storage space and accelerating data transmission efficiency, the actions of compressing and decompressing data also incur additional performance losses. Therefore, it is necessary to control the size of compressed data to seek a balance between performance loss and compression rate.

  2. When reading a specific column of data, it is first necessary to load the compressed data into memory and decompress it. By using compressed data blocks, the reading granularity can be reduced to the level of compressed data blocks without reading the entire .bin, further narrowing the data reading range.

Data Marking#

Principles for Generating Data Markings#

Data markings and index intervals are aligned, both spaced according to index_granularity granularity. Therefore, it is possible to directly find the corresponding data marking using the index interval's index number.

The marking data file corresponds one-to-one with the .bin file, used to record the offset information of data in the .bin file.

A single mark is represented by a tuple of two integer offsets: the starting offset, within the .bin file, of the compressed data block that holds this data interval; and the starting offset of the interval's data within that block after decompression.

Unlike first-level index data, marks are not kept resident in memory; an LRU cache is used instead to speed up access.

Working Method of Data Marking#

When reading data, it is necessary to find the required data through the positional information of marking data. The entire lookup process can be roughly divided into two steps: reading compressed data blocks and reading data.

How MergeTree locates compressed data blocks and reads data:

(1) Reading compressed data blocks: When querying a specific column of data, MergeTree does not need to load the entire .bin file at once; instead, it can load only the specific compressed data blocks as needed, relying on the offset information saved in the marking file.

(2) Reading data: When reading decompressed data, MergeTree does not need to scan the entire decompressed data segment at once; it can load a specific small segment based on the granularity of index_granularity.

Summary of Collaboration Between Partitions, Indexes, Markings, and Compressed Data#

Writing Process#

The first step in data writing is generating partition directories. With each batch of data written, a new partition directory is generated. At a later time, directories belonging to the same partition will be merged according to rules; then, based on index_granularity, primary.idx first-level indexes or second-level indexes will be generated, along with the .mrk marking data and .bin compressed data files for each column field.

Query Process#

The essence of data querying can be seen as a process of continuously reducing the data range. Ideally, MergeTree can first minimize the scanning range of data using partition indexes, first-level indexes, and second-level indexes. Then, by using data markings, it can further minimize the range of data that needs to be decompressed and calculated.

If a query statement does not specify any WHERE conditions, or if it specifies WHERE conditions but does not match any indexes (partition indexes, first-level indexes, and second-level indexes), then MergeTree cannot preemptively reduce the data range. During subsequent data queries, it will scan all partition directories and the maximum range of index segments within those directories. Although it cannot reduce the data range, MergeTree can still read multiple compressed data blocks simultaneously using multithreading, improving performance.

Correspondence Between Data Markings and Compressed Data Blocks#

The division of compressed data blocks depends on the size of the data within an interval (index_granularity), and the size of each compressed data block is strictly controlled between 64KB and 1MB. Each interval of data, however, produces exactly one mark. Depending on the actual byte size of the data within an interval, there are therefore three possible correspondences between data marks and compressed data blocks.

Many-to-One#

Multiple data markings correspond to a single compressed data block when the uncompressed size of data within an interval is less than 64KB.

One-to-One#

A single data marking corresponds to a single compressed data block when the uncompressed size of data within an interval is greater than or equal to 64KB and less than or equal to 1MB.

One-to-Many#

A single data marking corresponds to multiple compressed data blocks when the uncompressed size of data within an interval exceeds 1MB.
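
The three correspondences can be reproduced with a small Python model. It is a sketch under assumptions: build_marks is a hypothetical name, and because compressed sizes are unknown in the model, each mark stores the number of the compressed block instead of its byte offset in the .bin file, followed by the offset inside the decompressed block.

```python
MIN_BLOCK = 65536     # min_compress_block_size
MAX_BLOCK = 1048576   # max_compress_block_size


def build_marks(interval_sizes):
    """One (block_number, offset_in_decompressed_block) mark per data interval."""
    marks = []
    block_number = 0
    pending = 0  # uncompressed bytes already placed in the current block
    for size in interval_sizes:
        marks.append((block_number, pending))
        pending += size
        while pending > MAX_BLOCK:   # interval spills over several blocks
            block_number += 1
            pending -= MAX_BLOCK
        if pending >= MIN_BLOCK:     # current block is complete
            block_number += 1
            pending = 0
    return marks
```

Small intervals share block 0 (many-to-one), a 100000-byte interval gets a block of its own (one-to-one), and a 2500000-byte interval spans blocks 0 to 2 before the next mark starts at block 3 (one-to-many).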

Chapter 7 MergeTree Series Table Engines#

Currently, in ClickHouse, table engines can be roughly divided into six series based on characteristics: MergeTree, external storage, memory, file, interface, and others. Each series of table engines has its own characteristics and usage scenarios.

MergeTree#

As the most basic table engine in the MergeTree family, MergeTree provides features such as data partitioning, first-level indexes, and second-level indexes.

Data TTL#

TTL represents the lifespan of data. In MergeTree, TTL can be set for a specific column field or for the entire table. When the time is reached, a column-level TTL deletes the expired data in that column, and a table-level TTL deletes the expired rows of the table; if both column-level and table-level TTLs are set, the one that expires first takes precedence.

TTL must rely on a DateTime or Date type field, indicating the expiration time of TTL through the INTERVAL operation on this time field.

Column-Level TTL#

To set a column-level TTL, declare a TTL expression on the column when defining the table fields. A TTL cannot be declared on a primary key field.

create table ttl_table_v1 (
	id String,
	create_time DateTime,
	code String TTL create_time + INTERVAL 10 SECOND,
	type UInt8 TTL create_time + INTERVAL 10 SECOND
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(create_time)
ORDER BY id

Currently, ClickHouse does not provide a method to cancel column-level TTL.

Table-Level TTL#

To set a TTL for the entire table, you need to add a TTL expression in the MergeTree table parameters.

create table ttl_table_v2 (
	id String,
	create_time DateTime,
	code String TTL create_time + INTERVAL 10 SECOND,
	type UInt8 TTL create_time + INTERVAL 10 SECOND
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(create_time)
ORDER BY create_time
TTL create_time + INTERVAL 1 DAY

Currently, there is also no way to cancel table-level TTL.

Mechanism of TTL Operation#

If a TTL expression is set, when writing data, a file named ttl.txt will be generated in each partition directory based on the data partition, saving the relevant information of TTL through JSON:

  • columns is used to save column-level TTL information.
  • table is used to save table-level TTL information.
  • min and max save the minimum and maximum values of the specified date field for TTL within the current data partition, respectively, along with the timestamps calculated from the INTERVAL expression.

The general processing logic is:

  1. MergeTree records the expiration time through the ttl.txt file at the partition directory level, which will serve as the basis for subsequent judgments.
  2. Each time a batch of data is written, a ttl.txt file will be generated for that partition based on the calculation results of the INTERVAL expression.
  3. The logic for deleting TTL expired data will only be triggered when MergeTree merges partitions.
  4. When selecting partitions to delete, a greedy algorithm will be used, aiming to find the partition that will expire the earliest and is also the oldest (more merge times, larger MaxBlockNum).
  5. If all data in a certain column of a partition is deleted due to TTL expiration, then the newly generated partition directory after merging will not contain the data files (both .bin and .mrk) for that column field.
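
The bookkeeping described above can be sketched in Python. This only models the min and max values and the expiry check; it does not reproduce the actual layout of ttl.txt, and both function names are hypothetical.

```python
from datetime import datetime, timedelta


def ttl_range(time_values, interval):
    """Earliest and latest expiration time for one partition directory."""
    expirations = [value + interval for value in time_values]
    return {"min": min(expirations), "max": max(expirations)}


def fully_expired(ttl, now):
    """True when every row covered by this TTL record has expired."""
    return ttl["max"] <= now
```

For rows created at 00:00:00 and 00:00:30 with INTERVAL 10 SECOND, min is 00:00:10 and max is 00:00:40; only once the current time reaches max can the whole column (or part) be dropped during a merge.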

Note:

  1. The default merge frequency for TTL is controlled by the merge_with_ttl_timeout parameter of MergeTree, defaulting to 86400 seconds (one day). It maintains a dedicated TTL task queue, which is different from MergeTree's regular merge tasks. If this value is set too low, it may lead to performance loss.
  2. In addition to passive triggering of TTL merges, the optimize command can also be used to force a merge.
optimize table table_name
optimize table table_name FINAL   -- Trigger merging of all partitions
  3. ClickHouse currently does not provide a method to delete TTL declarations, but it does provide a way to control the start and stop of global TTL merge tasks.
SYSTEM STOP TTL MERGES
SYSTEM START TTL MERGES

Multi-Path Storage Strategy#

MergeTree implements the functionality of custom storage strategies, supporting writing partition directories to multiple disk directories with data partitions as the minimum moving unit.

Currently, there are three types of storage strategies:

  • Default strategy: The original storage strategy of MergeTree, requiring no configuration. All partitions will be automatically saved to the path specified in config.xml.
  • JBOD strategy: This strategy is suitable for servers with multiple disks mounted but without RAID. JBOD is a round-robin strategy: each time an INSERT or MERGE is executed, the newly generated partition is written to the next disk in turn. This is similar in effect to RAID 0, reducing the load on a single disk and potentially improving parallel read and write performance under certain conditions. If a single disk fails, the data that the JBOD strategy wrote to that disk is lost.
  • HOT/COLD strategy: This strategy is suitable for servers with different types of disks mounted. It divides storage disks into HOT and COLD areas. The HOT area uses high-performance storage media like SSDs, focusing on access performance; the COLD area uses high-capacity storage media like HDDs, focusing on access economy. When data accumulates to a threshold, it will automatically move to the COLD area. Additionally, multiple disks can be defined within each area, so the JBOD strategy can also be applied during the writing process within a single area.
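
The round-robin placement used by the JBOD strategy can be sketched in a few lines of Python. The function name, part names, and disk names are hypothetical; real placement may also depend on configuration that these notes do not cover.

```python
from itertools import cycle


def assign_round_robin(part_names, disks):
    """Assign each newly written part to the next disk in turn."""
    disk_cycle = cycle(disks)
    return {part: next(disk_cycle) for part in part_names}
```

With two disks, the first part goes to the first disk, the second part to the second disk, and the third part wraps around to the first disk again.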

ReplacingMergeTree#

ReplacingMergeTree has a primary key, but it does not have a unique key constraint. This means that even if multiple rows of data have the same primary key, they can still be written normally. ReplacingMergeTree is designed for data deduplication and can delete duplicate data during partition merging.

The sorting key declared in ORDER BY serves as the basis for determining whether data is duplicate. ReplacingMergeTree deletes duplicate data on a per-partition basis. Only duplicate data within the same data partition can be deleted, while duplicate data across different data partitions cannot be removed.

Processing logic:

  • Use the ORDER BY sorting key as the unique key for judging duplicate data.
  • The logic for deleting duplicate data is only triggered during partition merging.
  • Duplicate data is deleted on a per-partition basis. When partitions are merged, duplicate data within the same partition will be deleted; duplicate data across different partitions will not be deleted.
  • During data deduplication, since the data within the partition has already been sorted based on ORDER BY, it can find adjacent duplicate data.
  • There are two strategies for data deduplication:
    • If no version number (ver) is set, the last row in the group of duplicate data will be retained.
    • If a version number (ver) is set, the row with the maximum value of the ver field in the group of duplicate data will be retained.
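
The deduplication logic within one partition can be sketched in Python as follows. This is a model, not engine code: replacing_merge is a hypothetical name, rows are dictionaries given in insertion order, and when several rows share the maximum ver the later one is kept, which is an assumption.

```python
def replacing_merge(rows, key, ver=None):
    """Deduplicate the rows of one partition by the ORDER BY key columns."""
    kept = {}
    for row in rows:
        group = tuple(row[column] for column in key)
        current = kept.get(group)
        # Without ver: every later row replaces the earlier one (last row wins).
        # With ver: a row replaces the kept one only if its version is not lower.
        if current is None or ver is None or row[ver] >= current[ver]:
            kept[group] = row
    # Return the survivors in ORDER BY key order.
    return sorted(kept.values(), key=lambda r: tuple(r[column] for column in key))
```

Rows from different partitions would simply be passed to separate calls, which is why duplicates across partitions survive.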

SummingMergeTree#

SummingMergeTree can aggregate and summarize data according to predefined conditions when merging partitions, combining multiple rows of data under the same group into a single row, which reduces the number of data rows and lowers the overhead of subsequent summary queries.

Processing logic:

  • Use the ORDER BY sorting key as the condition key for aggregating data.
  • The aggregation logic is only triggered during partition merging.
  • Data is aggregated on a per-partition basis. When partitions are merged, data with the same aggregation key within the same data partition will be merged, while data across different partitions will not be aggregated.
  • If aggregation columns (non-primary key numeric fields) are specified when defining the engine, those columns will be summed; if not specified, all non-primary key numeric fields will be aggregated.
  • During data aggregation, since the data within the partition has already been sorted based on ORDER BY, it can find adjacent data with the same aggregation key.
  • In aggregating data, multiple rows of data with the same aggregation key within the same partition will be merged into one row. The aggregated fields will be summed; for non-aggregated fields, the value from the first row will be used.
  • Supports nested structures, but column field names must end with the Map suffix. In nested types, the first field is used as the aggregation key by default. Any field name ending with Key, Id, or Type, other than the first field, will be combined with the first field to form a valid key.
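
The core of the summing logic (ignoring nested structures) can be sketched in Python. The function name summing_merge and the column names in the usage note are hypothetical; the rows are assumed to belong to the same partition.

```python
def summing_merge(rows, key, sum_columns):
    """Collapse rows with the same ORDER BY key, summing the given columns."""
    merged = {}
    for row in rows:
        group = tuple(row[column] for column in key)
        if group not in merged:
            # Non-aggregated fields keep the value from the first row.
            merged[group] = dict(row)
        else:
            for column in sum_columns:
                merged[group][column] += row[column]
    return [merged[group] for group in sorted(merged)]
```

Two rows for the same city with v = 1 and v = 2 collapse into one row with v = 3, and any other field keeps the value it had in the first of the two rows.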

AggregatingMergeTree#

Processing logic:

  • Use the ORDER BY sorting key as the condition key for aggregating data.
  • Use the AggregateFunction field type to define the type of aggregation function and the fields to aggregate.
  • The aggregation calculation logic can only be triggered during partition merging.
  • Data is aggregated on a per-partition basis. When partitions are merged, data with the same aggregation key within the same data partition will be merged, while data across different partitions will not be calculated.
  • During data calculation, since the data within the partition has already been sorted based on ORDER BY, it can find adjacent data with the same aggregation key.
  • In aggregating data, multiple rows of data with the same aggregation key within the same partition will be merged into one row. For non-primary key, non-AggregateFunction type fields, the value from the first row will be used.
  • Fields of type AggregateFunction are stored in binary format. When writing data, the *State function needs to be called; when querying data, the corresponding *Merge function needs to be called. Here, * represents the aggregation function used during definition.
  • AggregatingMergeTree is typically used as a table engine for materialized views, in conjunction with ordinary MergeTree.
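
The split between *State and *Merge functions can be illustrated with an average, whose intermediate state is a (sum, count) pair. This Python sketch only mirrors the idea; the function names are hypothetical and the tuple is not the binary format that AggregateFunction columns actually store.

```python
def avg_state(values):
    """Intermediate state written at insert time (the role of a *State function)."""
    return (sum(values), len(values))


def avg_merge(states):
    """Combine stored states and produce the final value (the role of *Merge)."""
    total = 0
    count = 0
    for state_sum, state_count in states:
        total += state_sum
        count += state_count
    return total / count
```

Storing states instead of final averages is what allows rows with the same key to be combined correctly at merge time: averaging two averages would be wrong, while adding sums and counts is not.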

Chapter 9 Data Querying#

ClickHouse parses SQL statements in a case-sensitive manner, meaning that SELECT a and SELECT A refer to different things.

WITH Clause#

ClickHouse supports CTE (Common Table Expression) to enhance the expressiveness of query statements. This form can greatly improve the readability and maintainability of statements. CTE is represented by the WITH clause, currently supporting four usages.

  1. Defining Variables
    Variables can be defined, which can be directly accessed in subsequent query clauses.
WITH 10 AS start
SELECT number FROM system.numbers
WHERE number > start
LIMIT 5
  1. Calling Functions
WITH SUM(data_uncompressed_bytes) AS bytes
SELECT database, formatReadableSize(bytes) AS format FROM system.columns
GROUP BY database
ORDER BY bytes DESC
  1. Defining Subqueries
WITH(
	SELECT SUM(data_uncompressed_bytes) FROM system.columns
) AS total_bytes
SELECT database, (SUM(data_uncompressed_bytes)/total_bytes) * 100 AS database_disk_usage
FROM system.columns
GROUP BY database
ORDER BY database_disk_usage DESC

A subquery used in WITH may return only one row of data; if the result set contains more than one row, an exception is thrown.

  4. Reusing WITH in Subqueries
WITH (round(database_disk_usage)) AS database_disk_usage_v1 
SELECT database, database_disk_usage, database_disk_usage_v1 
FROM (
    WITH (SELECT SUM(data_uncompressed_bytes) FROM system.columns) AS total_bytes 
    SELECT database, (SUM(data_uncompressed_bytes)/total_bytes)*100 AS database_disk_usage 
    FROM system.columns 
    GROUP BY database 
    ORDER BY database_disk_usage DESC
)

FROM Clause#

The FROM clause indicates where to read data from, currently supporting three forms:

  1. Fetching data from a data table
SELECT watch_id FROM hits_v1
  2. Fetching data from a subquery
SELECT max_watch_id FROM (SELECT MAX(watch_id) AS max_watch_id FROM hits_v1)
  3. Fetching data from a table function
SELECT number FROM numbers(5)

The FINAL modifier can be placed after the FROM clause. Used together with table engines such as CollapsingMergeTree and VersionedCollapsingMergeTree, it forces the merge logic to run at query time. Because FINAL reduces query performance, it should be avoided whenever possible.
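
A small sketch of the effect of FINAL, assuming a hypothetical CollapsingMergeTree table named collapse_demo (names and values are illustrative only):

```sql
-- Hypothetical table: sign = 1 marks a state row, sign = -1 cancels it.
CREATE TABLE collapse_demo (
    id UInt32,
    views UInt32,
    sign Int8
) ENGINE = CollapsingMergeTree(sign)
ORDER BY id;

-- Two separate inserts create two data parts that have not been merged yet.
INSERT INTO collapse_demo VALUES (1, 10, 1);
INSERT INTO collapse_demo VALUES (1, 10, -1), (1, 20, 1);

-- Without FINAL the result may still contain rows that cancel each other out.
SELECT * FROM collapse_demo;

-- With FINAL the collapsing logic is applied while the query runs,
-- so only the latest state row is expected to remain.
SELECT * FROM collapse_demo FINAL;
```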

SAMPLE Clause#

The SAMPLE clause implements data sampling, so that a query reads and returns only a sample of the data rather than all of it, which effectively reduces load. Sampling is deterministic: as long as the data does not change, the same sampling rule always returns the same rows. This feature suits scenarios where approximate query results are acceptable.

The SAMPLE clause can only be used with data tables of the MergeTree series and requires declaring a sampling expression during table creation.

CREATE TABLE hits_v1 (
	counterid UInt64,
	EventDate DATE,
	UserID UInt64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(EventDate)
ORDER BY (counterid, intHash32(UserID))
SAMPLE BY intHash32(UserID)

When declaring a sample key, it should be noted:

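  • The expression declared in SAMPLE BY must also be included in the primary key declaration.
  • The sample key must be of an integer type (an unsigned integer, such as the UInt32 returned by intHash32, is the safe choice); if it is not, ClickHouse will not report an error during the CREATE TABLE operation but will throw an exception during querying. The exact behavior may depend on the ClickHouse version.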

Three usages of the clause:

  1. SAMPLE factor
    SAMPLE factor indicates sampling by factor coefficient, where factor represents the sampling factor, which can take values between 0 and 1. If factor is set to 0 or 1, it is equivalent to not performing data sampling.

  2. SAMPLE rows
    SAMPLE rows indicates sampling by sample size, where rows represents the approximate minimum number of rows to sample; its value must be an integer greater than 1. If the value of rows exceeds the total number of rows in the table, the effect is the same as SAMPLE 1 (i.e., no sampling).

  3. SAMPLE factor OFFSET n
    Indicates sampling by factor coefficient and offset, where factor represents the sampling factor, and n indicates how much data to offset before starting sampling. Both values are between 0 and 1.
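
The three forms can be sketched against the hits_v1 table defined above. The concrete factors and row counts are arbitrary examples; _sample_factor is the virtual column ClickHouse exposes for scaling sampled results back up.

```sql
-- 1. SAMPLE factor: read roughly 10% of the data and scale the count manually.
SELECT count() * 10 AS approx_rows
FROM hits_v1 SAMPLE 0.1;

-- 2. SAMPLE rows: sample at least about 10000 rows. The effective factor is
--    not known in advance, so the result is scaled with _sample_factor.
SELECT sum(_sample_factor) AS approx_rows
FROM hits_v1 SAMPLE 10000;

-- 3. SAMPLE factor OFFSET n: take 40% of the data, starting after the first 50%.
SELECT count() AS sampled_rows
FROM hits_v1 SAMPLE 0.4 OFFSET 0.5;
```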

ARRAY JOIN Clause#

The ARRAY JOIN clause allows for JOIN operations with arrays or nested type fields within the data table, expanding one row of an array into multiple rows. Currently, it supports two join strategies: inner and left.

  1. Inner Array Join
    By default, ARRAY JOIN uses the INNER JOIN strategy. Data is expanded into multiple rows based on the value array, excluding empty arrays.

  2. LEFT ARRAY JOIN
    LEFT ARRAY JOIN does not exclude empty arrays. Note also that when ARRAY JOIN is applied to multiple array fields, the arrays are combined row by row (position by position) rather than as a Cartesian product.
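
The two strategies and the row-wise pairing of multiple arrays can be sketched as follows, assuming a hypothetical table array_join_demo:

```sql
-- Hypothetical table with one array column; 'meat' has an empty array.
CREATE TABLE array_join_demo (
    title String,
    value Array(UInt8)
) ENGINE = Memory;

INSERT INTO array_join_demo VALUES ('food', [1, 2, 3]), ('fruit', [3, 4]), ('meat', []);

-- INNER strategy (default): one output row per array element, 'meat' is dropped.
SELECT title, v
FROM array_join_demo
ARRAY JOIN value AS v;

-- LEFT strategy: 'meat' is kept, and v is filled with the type's default value.
SELECT title, v
FROM array_join_demo
LEFT ARRAY JOIN value AS v;

-- Several arrays of equal length are paired element by element,
-- not combined as a Cartesian product.
SELECT title, v, v_double
FROM array_join_demo
ARRAY JOIN value AS v, arrayMap(x -> x * 2, value) AS v_double;
```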

JOIN Clause#

The syntax includes two parts: join precision and join type. Join precision is divided into ALL, ANY, and ASOF, while join types can be divided into outer join, inner join, and cross join.

Join Precision#

Join precision determines the strategy used for JOIN queries when connecting data. Currently, it supports ALL, ANY, and ASOF types. The default is ALL, which can be modified through the join_default_strictness configuration parameter.

ALL#

If a row of data in the left table matches multiple rows in the right table, it returns all matching data from the right table, with the basis for matching being that the values of the join keys in the left and right tables are exactly equal, equivalent to left.key=right.key.

ANY#

If a row of data in the left table matches multiple rows in the right table, it returns the first row of matching data from the right table. The basis for matching in ANY and ALL is the same.
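
The difference between ALL and ANY can be seen with a self-contained sketch in which the right side holds two rows for the same key (the inline subqueries and values are illustrative only):

```sql
-- ALL: the single left row is returned once per matching right row (two rows).
SELECT l.id, r.rate
FROM (SELECT 1 AS id) AS l
ALL LEFT JOIN (
    SELECT 1 AS id, 10 AS rate
    UNION ALL
    SELECT 1 AS id, 20 AS rate
) AS r ON l.id = r.id;

-- ANY: only one of the matching right rows is attached (one row).
SELECT l.id, r.rate
FROM (SELECT 1 AS id) AS l
ANY LEFT JOIN (
    SELECT 1 AS id, 10 AS rate
    UNION ALL
    SELECT 1 AS id, 20 AS rate
) AS r ON l.id = r.id;
```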

ASOF#

ASOF is a fuzzy join that allows for appending a fuzzy matching condition asof_column after the join key.

SELECT a.id, a.name, b.rate, a.time, b.time
FROM join_tb1 AS a
ASOF INNER JOIN join_tb2 AS b
ON a.id = b.id AND a.time >= b.time

Here, a.id = b.id is the usual join key, and a.time >= b.time is the asof_column fuzzy join condition: for each row of the left table, the right-table row with the same id and the closest b.time that does not exceed a.time is matched.

Note: asof_column must be of a type that forms an ordered sequence, such as an integer, floating-point, or date type. asof_column cannot be the only join field; in other words, the join key (JOIN_KEY) and asof_column cannot be the same field.
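
A minimal sketch of ASOF matching with sample data. The tables asof_left and asof_right and their values are hypothetical.

```sql
CREATE TABLE asof_left  (id UInt32, time DateTime) ENGINE = Memory;
CREATE TABLE asof_right (id UInt32, time DateTime, rate UInt32) ENGINE = Memory;

INSERT INTO asof_left VALUES (1, '2019-05-01 12:00:00');
INSERT INTO asof_right VALUES
    (1, '2019-05-01 11:55:00', 100),
    (1, '2019-05-01 11:59:00', 105),
    (1, '2019-05-01 12:05:00', 110);

-- For the left row at 12:00:00, the closest right row that is not later
-- is the one at 11:59:00, so rate 105 is expected to be attached.
SELECT a.id, a.time, b.time AS matched_time, b.rate
FROM asof_left AS a
ASOF INNER JOIN asof_right AS b
ON a.id = b.id AND a.time >= b.time;
```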

Join Type#

Join type determines the strategy used to combine the left and right data sets in JOIN queries, resulting in intersections, unions, Cartesian products, or other forms.

INNER#

INNER JOIN represents an inner join, which traverses data based on the left table and finds rows from the right table that connect with the left, returning only the intersection of the left and right data sets, while excluding the rest.

OUTER#

Represents an outer join, which can be further divided into left outer join (LEFT), right outer join (RIGHT), and full outer join (FULL). Depending on the join form, the logic for returning data sets varies.

  1. LEFT
    In a left outer join query, data is traversed row by row based on the left table, finding rows from the right table to supplement attributes. If no matching row is found in the right table, the corresponding field's default value is used to fill in. In other words, for left join queries, all data from the left table is always returned.

  2. RIGHT
    The effect of a right outer join query is exactly the opposite of a left join; all data from the right table is always returned, while data from the left table that cannot be connected is filled with default values. The internal execution logic during a right outer join query is roughly as follows:
    (1) Internally perform an inner join query similar to INNER JOIN, while calculating the intersection part, also recording the rows in the right table that could not be connected.
    (2) Append those unconnected rows to the end of the intersection.
    (3) Fill in the default values for the columns belonging to the left table in the appended data.

  3. FULL
    The internal execution logic of a full outer join is roughly as follows:
    (1) Internally perform a query similar to LEFT JOIN, while recording the rows in the right table that have already been connected during the left outer join process.
    (2) By recording the rows in the right table that have been connected, obtain the unconnected rows.
    (3) Append the unconnected rows from the right table to the result set, filling in the default values for the columns belonging to the left table.

CROSS#

Represents a cross join, which returns the Cartesian product of the left and right data sets.

[Figure: cross join diagram]

Multi-Table Joins#

When performing join queries on multiple data tables, ClickHouse converts them into pairwise join forms.

Notes#

Regarding JOIN query notes:

  1. About Performance
    To optimize JOIN query performance, first follow the principle of "large table on the left, small table on the right", because the right table is loaded into memory for the join. Second, JOIN queries currently do not support caching. Finally, in query scenarios where a large number of dimension attributes need to be supplemented, it is recommended to use dictionaries instead of JOIN queries.

  2. About Null Value Strategy and Abbreviated Forms
    The null values in join queries are filled with default values. The null value strategy for join queries is specified by the join_use_nulls parameter, defaulting to 0. When the parameter value is 0, null values are filled with the default values of the data type; when the parameter value is 1, null values are filled with Null.
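
The effect of join_use_nulls can be sketched with a self-contained query; the inline subqueries are illustrative only. Only id = 1 has a match on the right side.

```sql
-- join_use_nulls = 0 (default): unmatched rows get the type default, so rate = 0.
SELECT l.id, r.rate
FROM (SELECT number AS id FROM numbers(3)) AS l
LEFT JOIN (SELECT toUInt64(1) AS id, toUInt32(10) AS rate) AS r ON l.id = r.id
ORDER BY l.id
SETTINGS join_use_nulls = 0;

-- join_use_nulls = 1: unmatched rows get NULL instead.
SELECT l.id, r.rate
FROM (SELECT number AS id FROM numbers(3)) AS l
LEFT JOIN (SELECT toUInt64(1) AS id, toUInt32(10) AS rate) AS r ON l.id = r.id
ORDER BY l.id
SETTINGS join_use_nulls = 1;
```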

WHERE and PREWHERE Clauses#

The WHERE clause implements data filtering based on conditional expressions. If the filtering condition happens to be a primary key field, it can further accelerate the query using indexes, so the WHERE clause is a criterion for whether a query statement uses indexes.

PREWHERE can currently only be used with MergeTree series table engines. It can be seen as an optimization of WHERE, serving the same purpose of filtering data. The difference is that with PREWHERE, only the columns referenced in the PREWHERE condition are read first and used for filtering; after filtering, the columns declared in SELECT are read to supplement the remaining attributes. In some cases PREWHERE therefore processes less data than WHERE, resulting in higher performance.

ClickHouse can automatically move suitable WHERE conditions to PREWHERE. In the following situations, WHERE will not be automatically optimized:

  • Using constant expressions.
  • Using fields with default values of ALIAS type.
  • Including queries with arrayJoin, globalIn, globalNotIn, or indexHint.
  • The column fields in the SELECT query are the same as the WHERE predicate.
  • Using primary key fields.

When using PREWHERE for primary key queries, it will first filter the data interval through the sparse index, then read the condition columns specified by PREWHERE for further filtering. This may potentially truncate the tail of the data interval, returning a data range below the granularity of index_granularity. Even so, compared to the performance improvement brought by moving predicates in other scenarios, this effect is still limited.
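
A sketch of explicit and automatic PREWHERE against the hits_v1 table from the SAMPLE section. The date value is an arbitrary example; optimize_move_to_prewhere is the setting that controls the automatic rewrite.

```sql
-- Explicit PREWHERE: EventDate is read and filtered first; counterid and UserID
-- are then read only where matching rows remain.
SELECT counterid, UserID
FROM hits_v1
PREWHERE EventDate = '2014-03-17';

-- The same filter written as WHERE; ClickHouse may move it to PREWHERE itself.
SELECT counterid, UserID
FROM hits_v1
WHERE EventDate = '2014-03-17';

-- For comparison, the automatic move can be switched off for a single query.
SELECT counterid, UserID
FROM hits_v1
WHERE EventDate = '2014-03-17'
SETTINGS optimize_move_to_prewhere = 0;
```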

GROUP BY Clause#

Represents aggregation queries. This is where ClickHouse's outstanding performance shines. The expressions declared after GROUP BY become aggregation keys or keys, and data will be aggregated based on the aggregation keys.

In some cases, aggregation functions like any, max, and min can be used to access columns outside the aggregation keys.

WITH ROLLUP#

ROLLUP can roll up data from right to left based on the aggregation keys, generating subtotals and totals based on aggregation functions.

WITH CUBE#

CUBE generates subtotal information based on all combinations of aggregation keys, similar to a cube model.

WITH TOTALS#

WITH TOTALS adds a grand total row, computed over all data with the same aggregation functions.
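
The three modifiers can be compared on generated data. This is a self-contained sketch using the numbers table function; the keys a and b are arbitrary.

```sql
-- ROLLUP: subtotals for (a, b), then (a), then the grand total.
SELECT number % 2 AS a, number % 3 AS b, sum(number) AS s
FROM numbers(12)
GROUP BY a, b WITH ROLLUP
ORDER BY a, b;

-- CUBE: subtotals for every combination: (a, b), (a), (b), and the grand total.
SELECT number % 2 AS a, number % 3 AS b, sum(number) AS s
FROM numbers(12)
GROUP BY a, b WITH CUBE
ORDER BY a, b;

-- TOTALS: the regular groups plus one separate totals row.
SELECT number % 2 AS a, number % 3 AS b, sum(number) AS s
FROM numbers(12)
GROUP BY a, b WITH TOTALS
ORDER BY a, b;
```

In the ROLLUP and CUBE results, a key that has been rolled up appears with its type's default value (0 here), so subtotal rows can look like regular groups whose key is 0.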

HAVING Clause#

Must appear alongside GROUP BY and cannot be used alone. It allows for secondary filtering of data after aggregation calculations.

ORDER BY Clause#

The ORDER BY clause specifies the order in which the query data is returned by declaring sorting keys. In MergeTree, after specifying ORDER BY, data within each partition will be sorted according to its defined rules, which is a form of local sorting within the partition. If data spans multiple partitions during a query, their return order is unpredictable, and the order returned may differ with each query. In such cases, if data needs to be returned in a consistent order, the ORDER BY clause must be used to specify a global order.

ORDER BY can define multiple sorting keys, each followed by ASC (ascending) or DESC (descending) to determine the sorting order. The default is ASC.

  1. NULLS LAST places null values at the end, which is the default behavior.
  2. NULLS FIRST places null values at the front.
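
A self-contained sketch of both null placements (the array literal is illustrative only):

```sql
-- Default (NULLS LAST): 1, 3, NULL
SELECT x
FROM (SELECT arrayJoin([3, NULL, 1]) AS x)
ORDER BY x ASC;

-- NULLS FIRST: NULL, 1, 3
SELECT x
FROM (SELECT arrayJoin([3, NULL, 1]) AS x)
ORDER BY x ASC NULLS FIRST;
```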

LIMIT BY Clause#

LIMIT BY runs after ORDER BY and before LIMIT. It returns at most the first n rows for each distinct value of the specified expressions, and is commonly used in TOP N query scenarios.
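
A TOP N sketch on generated data, keeping the two largest values per group (grp and v are illustrative names):

```sql
-- numbers(10) yields 0..9; grp is number % 3.
-- Per group, the two largest values are kept:
-- grp 0 -> 9, 6; grp 1 -> 7, 4; grp 2 -> 8, 5.
SELECT number % 3 AS grp, number AS v
FROM numbers(10)
ORDER BY grp ASC, v DESC
LIMIT 2 BY grp;
```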

LIMIT Clause#

The LIMIT clause is used to return the specified top N rows of data, commonly used in pagination scenarios. The three syntax formats are as follows:

limit n
limit n offset m
limit m,n

When using the LIMIT clause, it should be noted that if the data spans multiple partitions, the data returned by each LIMIT query may differ if a global order is not specified using ORDER BY.
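
A pagination sketch showing that the last two syntax forms are equivalent once a global order is fixed (page size and offset are arbitrary):

```sql
-- Third page of 10 rows: skip 20 rows, return 10 (values 20..29).
SELECT number
FROM numbers(100)
ORDER BY number
LIMIT 10 OFFSET 20;

-- Same page using the "LIMIT m, n" form: m = 20 rows skipped, n = 10 returned.
SELECT number
FROM numbers(100)
ORDER BY number
LIMIT 20, 10;
```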

SELECT Clause#

Determines which column fields or expressions the query statement will ultimately return. Although SELECT is at the beginning of the SQL statement, it is executed after the aforementioned clauses. After other clauses are executed, SELECT applies the selected fields or expressions to each row of data. If the wildcard * is used, all fields of the data table will be returned.

DISTINCT Clause#

Used to remove duplicate data. It can be used simultaneously with GROUP BY, as they are complementary rather than mutually exclusive.

UNION ALL Clause#

The UNION ALL clause can combine the two sets of subqueries from the left and right, returning the results together. In a single query, multiple UNION ALL declarations can be made to combine multiple sets of queries, but UNION ALL cannot be directly used with other clauses.

Query SQL Execution Plan#

(1) By setting the ClickHouse service log to DEBUG or TRACE level, you can indirectly obtain EXPLAIN-style information by analyzing the SQL execution logs.
(2) ClickHouse can only print plan logs if the SQL query is actually executed, so if the table's data volume is large, it is best to use the LIMIT clause to reduce the number of returned results.
(3) Avoid using SELECT * for full field queries.
(4) Make full use of various indexes (partition indexes, first-level indexes, second-level indexes) to avoid full table scans.

clickhouse-client -h 127.0.0.1 -u uts --port 9100 --password ~Uts2020db --send_logs_level=trace <<< "select timestamp,sip4,sip6,sport,dip4,dip6,dport,protocol,app_proto,app_desc_cn,dmac,s_card_name,s_device_hash,tx_bytes,tx_pkts,rx_bytes,rx_pkts,isIPv4,direct,first_time,last_time,sid from uts.storage_session prewhere sid global in (select sid from uts.storage_session prewhere timestamp > 1698826475 and timestamp <= 1699337215 and msgtype = 12 order by timestamp desc limit 10 offset 2357) order by timestamp desc" > /dev/null

Chapter 10 Replicas and Shards#

A cluster is the foundation of replicas and shards, extending the ClickHouse service topology from a single point to multiple nodes. However, it does not require all nodes to form a single large cluster, as some systems in the Hadoop ecosystem do. ClickHouse's cluster configuration is very flexible; users can either form all nodes into a single cluster or divide nodes into multiple smaller clusters according to business needs. The number of nodes, shards, and replicas can differ from one small cluster to another.

[Figure: single cluster and multiple clusters]

From a functional perspective, the work of the ClickHouse cluster is more focused on the logical layer. The cluster defines the topological relationships of multiple nodes, which may work together in subsequent service processes, while the specific execution tasks are left to replicas and shards.

Distinction Between Replicas and Shards#

  1. From the data perspective: Suppose N ClickHouse nodes form a cluster, and each node holds a data table Y with the same structure. If the data in Y on node N1 is completely different from the data in Y on node N2, then N1 and N2 are shards of each other. If their data is completely identical, they are replicas of each other.
  2. From the functional perspective: The primary purpose of using replicas is to prevent data loss and increase data storage redundancy; while the primary purpose of using shards is to achieve horizontal data partitioning.

Data Replicas#

The capabilities of replicas are available only through the ReplicatedMergeTree series of replicated table engines. Put another way, a data table that uses ReplicatedMergeTree is itself a replica.

Characteristics of Replicas#

As the main implementation carrier of data replicas, ReplicatedMergeTree has some significant characteristics in its design.

  • Relies on ZooKeeper: When executing INSERT and ALTER queries, ReplicatedMergeTree requires the distributed coordination capabilities of ZooKeeper to synchronize between multiple replicas. However, querying replicas does not require using ZooKeeper.
  • Table-level replicas: Replicas are defined at the table level, so the replica configuration for each table can be personalized according to its actual needs, including the number of replicas and their distribution within the cluster.
  • Multi-master architecture: INSERT and ALTER queries can be executed on any replica, and their effects are the same. These operations will be distributed to each replica for local execution using ZooKeeper's coordination capabilities.
  • Block data chunks: When executing INSERT commands to write data, the data will be split into several Block data chunks based on the size of max_insert_block_size (default 1048576 rows), so Block data chunks are the basic unit of data writing and have atomicity and uniqueness in writing.
  • Atomicity: When writing data, either all data in a Block is written successfully or none of it is.
  • Uniqueness: When writing a Block data chunk, the data order, data size, and number of data rows in the current Block data chunk are used to calculate hash information and record it. If a Block data chunk being written has the same hash summary as a previously written Block data chunk (with the same data order, data size, and number of data rows), that Block data chunk will be ignored.

Definition Form of Replicas#

The benefits of using replicas include:

  • Increased redundancy in data storage, thus reducing the risk of data loss.
  • Replicas adopt a multi-master architecture, so each replica instance can serve as a data read and write entry, distributing the load on nodes.
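
As a rough sketch of the definition form, a replica is declared by passing a ZooKeeper path and a replica name to the engine. The path, replica name, and table below are placeholders, and the example assumes a server that already has ZooKeeper (or ClickHouse Keeper) configured.

```sql
-- Placeholder ZooKeeper path and replica name; every replica of the same table
-- uses the same path and a different replica name.
CREATE TABLE replicated_demo (
    id UInt32,
    value String
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/replicated_demo', 'replica_1')
ORDER BY id;

-- Writing the same Block twice: the second insert has the same hash summary,
-- so it is expected to be ignored by the Block deduplication described above.
INSERT INTO replicated_demo VALUES (1, 'a'), (2, 'b');
INSERT INTO replicated_demo VALUES (1, 'a'), (2, 'b');

-- Expected to report 2 rows rather than 4 if deduplication applied.
SELECT count() FROM replicated_demo;
```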

Principles of ReplicatedMergeTree#

Data Structure#

The core logic relies heavily on ZooKeeper to coordinate multiple ReplicatedMergeTree replica instances, including primary replica election, replica state awareness, operation log distribution, task queues, and Block ID deduplication. Executing INSERT writes, partition MERGE operations, and MUTATION operations all involve communication with ZooKeeper. However, no table data is transmitted during this communication, and ZooKeeper is not accessed when querying data.
