ClickHouse — useful SQL queries and efficient querying techniques

Sairam Krish
4 min read · Sep 8, 2023


While using ClickHouse as an OLAP database, we often spend a lot of time going through the official documentation to find the right function that suits our needs. We may have something in mind, but translating that into how it’s done in ClickHouse can be time-consuming.

Here we will try to capture some useful SQL snippets:

WITH statement and scalar values

WITH (
    SELECT arrayStringConcat(groupArray(host_name), ',')
    FROM system.clusters
    WHERE cluster = 'cluster_name'
    GROUP BY cluster
) AS host_names
SELECT host_names;
  • The above query showcases the WITH statement.
  • It also uses a scalar value (host_names in this case).
  • In the WITH clause, the first part is the subquery and the second part is the variable name; the other way around won’t work.
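For readers more at home in Python, the aggregation inside the subquery (groupArray followed by arrayStringConcat) is roughly equivalent to collecting matching values into a list and joining them into one scalar string. A small sketch with made-up sample data:

```python
# Rough Python analogue of groupArray + arrayStringConcat:
# collect the host_name values of one cluster, then join them
# with commas into a single scalar string.
rows = [
    ("cluster_name", "host-0"),
    ("cluster_name", "host-1"),
    ("other_cluster", "host-x"),
]
host_names = ",".join(h for c, h in rows if c == "cluster_name")
print(host_names)  # host-0,host-1
```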

Database size | Table size | Finding large tables

-- Total data size per database
select
    database,
    sum(rows) as rows,
    formatReadableSize(sum(bytes)) as disk_size,
    formatReadableSize(sum(primary_key_bytes_in_memory)) as primary_keys_size,
    any(engine) as engine,
    sum(bytes) as bytes_size,
    sum(primary_key_bytes_in_memory) as primary_key_bytes_in_memory_sum
from system.parts
where active
group by database;
-- View top 10 tables by size
select
    parts.*,
    columns.compressed_size,
    columns.uncompressed_size,
    columns.compression_ratio,
    columns.compression_percentage
from (
    select
        table,
        formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed_size,
        formatReadableSize(sum(data_compressed_bytes)) AS compressed_size,
        round(sum(data_compressed_bytes) / sum(data_uncompressed_bytes), 3) AS compression_ratio,
        round((100 - (sum(data_compressed_bytes) * 100) / sum(data_uncompressed_bytes)), 3) AS compression_percentage
    from system.columns
    group by table
) columns
right join (
    select
        table,
        sum(rows) as rows,
        max(modification_time) as latest_modification,
        formatReadableSize(sum(bytes)) as disk_size,
        formatReadableSize(sum(primary_key_bytes_in_memory)) as primary_keys_size,
        any(engine) as engine,
        sum(bytes) as bytes_size,
        sum(primary_key_bytes_in_memory) as primary_key_bytes_in_memory_sum
    from system.parts
    where active
    group by database, table
) parts on columns.table = parts.table
order by parts.primary_key_bytes_in_memory_sum desc
limit 10;
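To make sense of the numbers these queries return, it helps to know what formatReadableSize and the compression columns compute. Below is a rough Python illustration (not ClickHouse’s exact implementation): a 1024-based human-readable size formatter, plus the same ratio and percentage formulas as in the query, on made-up byte counts.

```python
def format_readable_size(num_bytes: float) -> str:
    # Rough analogue of ClickHouse's formatReadableSize:
    # binary (1024-based) units, two decimal places.
    for unit in ("B", "KiB", "MiB", "GiB", "TiB", "PiB"):
        if abs(num_bytes) < 1024 or unit == "PiB":
            return f"{num_bytes:.2f} {unit}"
        num_bytes /= 1024

# Compression ratio / percentage, same formulas as the query above,
# on hypothetical byte counts.
compressed, uncompressed = 25_000_000, 100_000_000
ratio = round(compressed / uncompressed, 3)
percentage = round(100 - compressed * 100 / uncompressed, 3)
print(format_readable_size(uncompressed), ratio, percentage)
```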

Export and import table to AWS S3 in parquet format

-- export as parquet
INSERT INTO FUNCTION s3(
    'https://bucket-name.s3.amazonaws.com/clickhouse_backup/backup-2023_09_02_students.parquet',
    'Parquet'
)
SELECT * FROM students;

-- import; assumes the students table already exists in the destination
INSERT INTO students
SELECT * FROM s3(
    'https://bucket-name.s3.amazonaws.com/clickhouse_backup/backup-2023_09_02_students.parquet',
    'Parquet'
);

Backup and restore schema alone

-- backup schema only, in async mode
BACKUP DATABASE default
ON CLUSTER 'my-cluster'
TO S3(
    'https://bucket-name.s3.amazonaws.com/clickhouse_backup/backup-2023_09_02'
)
SETTINGS structure_only = true ASYNC;

-- status of the async backup
-- replace id with the one returned by the BACKUP statement above
-- it may take a while depending on the number of tables
-- if things go wrong, the error appears in this table
SELECT *
FROM system.backups
WHERE id = '2ab09f62-7797-41a0-b310-60f8af9f55ee'
FORMAT Vertical;

SET connect_timeout = 3600;
RESTORE DATABASE default
ON CLUSTER 'my-cluster'
FROM S3(
    'https://bucket-name.s3.amazonaws.com/clickhouse_backup/backup-2023_09_02'
) ASYNC;

-- status of the async restore
-- replace id with the one returned by the RESTORE statement above
-- it may take a while depending on the number of tables
-- if things go wrong, the error appears in this table
SELECT *
FROM system.backups
WHERE id = '2ab09f62-7797-41a0-b310-60f8af9f55ee'
FORMAT Vertical;
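With ASYNC operations, a client typically polls system.backups until the status becomes terminal. The sketch below captures that decision logic in Python; the status values are the ones documented for system.backups in recent ClickHouse versions (an assumption — verify against your server’s version), and the actual polling query is left to your ClickHouse client of choice.

```python
# Status values documented for system.backups in recent ClickHouse
# versions (an assumption -- verify against your server's version).
TERMINAL_STATUSES = {"BACKUP_CREATED", "BACKUP_FAILED", "RESTORED", "RESTORE_FAILED"}
FAILED_STATUSES = {"BACKUP_FAILED", "RESTORE_FAILED"}

def is_done(status: str) -> bool:
    """True once the async backup/restore has finished, successfully or not."""
    return status in TERMINAL_STATUSES

def is_error(status: str) -> bool:
    """True when the operation finished with an error."""
    return status in FAILED_STATUSES

# e.g. poll SELECT status FROM system.backups WHERE id = '...' until is_done()
print(is_done("CREATING_BACKUP"), is_error("BACKUP_FAILED"))  # False True
```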

Details of table in a cluster

-- view the copies of this table persisted on each ClickHouse node
SELECT
    hostname() AS h,
    *
FROM remote(
    'chi-xxxx-xxxx-{0,1,2}-{0,1,2}',
    db_name,
    students_local
);


-- row count on each pod
SELECT
    hostname() AS h,
    count() AS c
FROM remote(
    'chi-xxxx-xxxx-{0,1,2}-{0,1,2}',
    db_name,
    students_local
);
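Once the per-pod counts come back, spotting divergence is simple: every replica of a table should report the same row count. A small Python sketch of that check, on hypothetical (hostname, count) results:

```python
# Hypothetical per-replica row counts, as returned by the
# remote() query above: hostname -> count().
counts = {
    "chi-xxxx-xxxx-0-0": 1000,
    "chi-xxxx-xxxx-0-1": 1000,
    "chi-xxxx-xxxx-0-2": 998,
}

# Replicas are in sync when every host reports the same count.
in_sync = len(set(counts.values())) == 1
# Hosts lagging behind the highest count seen.
lagging = [h for h, c in counts.items() if c != max(counts.values())]
print(in_sync, lagging)  # False ['chi-xxxx-xxxx-0-2']
```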

Generator based streaming results

There are different language drivers for ClickHouse. The technique shown here uses Python’s clickhouse-driver; however, the concept should be applicable to other language drivers as well.

# sketch using clickhouse-driver; adapt the connection details to your setup
from clickhouse_driver import Client

client = Client(host='localhost')
row_iterator = client.execute_iter(
    "SELECT * FROM large_table",
    settings={'max_block_size': 100000},
)

for row in row_iterator:
    print(row)

# disconnect only after processing all the rows:
# since this is streaming, closing the connection earlier
# would end the stream abruptly
client.disconnect()

Enabling multiple queries and multi line statements

clickhouse-client --multiline --multiquery

Using variables within statements

SET param_table_name = 'distributed_table_name';
SET param_table_name_local = 'distributed_table_name_local';
DROP TABLE IF EXISTS {table_name_local: Identifier} ON CLUSTER 'cluster_name' SYNC;
DROP TABLE IF EXISTS {table_name: Identifier} ON CLUSTER 'cluster_name' SYNC;
  • I couldn’t figure out how to use a variable within another variable, hence the duplicated content in the two variables above.
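To see what the server-side {name: Identifier} binding effectively does, here is a toy client-side stand-in in Python (an illustration only — prefer the real server-side parameters shown above, which also protect against injection): it validates that each value looks like a plain identifier before substituting it into the statement.

```python
import re

def bind_identifier(sql: str, **params: str) -> str:
    # Toy client-side stand-in for ClickHouse's server-side
    # {name: Identifier} binding. Illustration only; the real
    # server-side mechanism is the safe choice.
    for name, value in params.items():
        if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", value):
            raise ValueError(f"unsafe identifier: {value!r}")
        sql = sql.replace("{%s: Identifier}" % name, value)
    return sql

stmt = bind_identifier(
    "DROP TABLE IF EXISTS {table_name: Identifier} SYNC",
    table_name="distributed_table_name",
)
print(stmt)  # DROP TABLE IF EXISTS distributed_table_name SYNC
```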

Find tables that are not in all replicas

At times, ZooKeeper causes sync issues, and debugging replication and sharding problems is hard. The following snippets can help identify problematic tables.

-- using variables
SET param_replica_count = 3;

-- finding based on table name convention
SELECT
    name AS table_name,
    count() AS replica_count
FROM clusterAllReplicas('cluster_name', 'system.tables')
WHERE name ILIKE '%_local%'
GROUP BY table_name
HAVING replica_count < {replica_count: UInt8};

-- only for ReplicatedMergeTree tables
SELECT
    name AS table_name,
    count() AS replica_count
FROM clusterAllReplicas('cluster_name', 'system.tables')
WHERE engine = 'ReplicatedMergeTree'
GROUP BY table_name
HAVING replica_count < {replica_count: UInt8};

Find data missing in any replicas

SELECT
    table_name,
    engine,
    host0,
    host1,
    host2,
    replica_count
FROM (
    SELECT
        name AS table_name,
        engine,
        MAX(CASE WHEN hostname() = 'chi-pod-name-0-0-0' THEN total_rows END) AS host0,
        MAX(CASE WHEN hostname() = 'chi-pod-name-0-1-0' THEN total_rows END) AS host1,
        MAX(CASE WHEN hostname() = 'chi-pod-name-0-2-0' THEN total_rows END) AS host2,
        COUNT(*) AS replica_count
    FROM clusterAllReplicas('cluster_name', 'system.tables')
    WHERE engine IN ('Distributed', 'ReplicatedMergeTree', 'View')
    GROUP BY name, engine
) AS subquery
HAVING (
    (host0 IS NOT NULL AND host1 IS NOT NULL AND host2 IS NOT NULL
     AND NOT (host0 = host1 AND host1 = host2))
    OR replica_count < 3
);
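The HAVING clause flags a table when all three replicas have it but their row counts differ, or when it exists on fewer than three replicas. The same logic in Python, on hypothetical rows mirroring the subquery’s shape (None means the table is absent on that replica):

```python
# Hypothetical rows mirroring the subquery above:
# (table_name, engine, host0, host1, host2, replica_count).
rows = [
    ("students_local", "ReplicatedMergeTree", 100, 100,  100, 3),
    ("orders_local",   "ReplicatedMergeTree", 100, 100,  97,  3),
    ("events_local",   "ReplicatedMergeTree", 100, None, 100, 2),
]

def problematic(row):
    _, _, h0, h1, h2, replica_count = row
    all_present = None not in (h0, h1, h2)
    # present everywhere but counts diverge, or missing on a replica
    diverged = all_present and not (h0 == h1 == h2)
    return diverged or replica_count < 3

bad = [r[0] for r in rows if problematic(r)]
print(bad)  # ['orders_local', 'events_local']
```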

Backup status

-- show the latest backup status across all ClickHouse instances
-- this is for a single-shard, 3-replica scenario
-- change the values below accordingly
SELECT *
FROM remote(
    'chi-pod-name-0-{0,1,2}',
    system,
    backups
)
ORDER BY start_time DESC
LIMIT 1
FORMAT Vertical;
