ClickHouse Blog

ClickHouse at Data@Scale 2017
15 June, 21:00


No comments
events,conference,Percona Live,Seattle
ClickHouse Meetup at Santa Clara, May 4, 2017
11 May, 08:00

After Percona Live 2017, Yandex ClickHouse team stayed for one more week in San Francisco Bay Area to meet with local companies in person to talk about ClickHouse and how it can be applied to their tasks. On the last evening we even managed to organize our own meetup with active ClickHouse users in the area, not as large as we regularly host in Russia, but still had some very interesting discussions.

No comments
events,meetup,Bay Area
ClickHouse at Percona Live 2017
28 April, 12:00

For those who haven't heard, Percona Live is probably one of the largest international conferences about opensource database management systems, having 12 talk tracks in parallel. It's been around for many years and initially it was focused mainly on MySQL (and had that in it's name), but nowadays it is more generic and other products of this category get lots of attention too. Needless to say that for a relatively new player on the market like ClickHouse, it's been a great opportunity to spread the word about the technology and how exactly it allows to perform analytics on petabytes of data in real time.

Yandex team members had three chances to talk about ClickHouse from stage:

  1. A large portion of Opening Keynote has been dedicated to different time series databases. ClickHouse is not really a specialized time series database, but still outperforms many alternatives if used as such. So Dmitry Andreev, Head of Yandex.Market Infrastructure Development Group, had a short talk about how ClickHouse can be used a as storage backend for Graphite using Graphouse, an opensource adapter that implements this. This setup is used in Yandex.Market and number of other Yandex services and has proven to be very reliable and effective. Chain of short talks has been followed by live panel about time series in general with same speakers including Dmitry. Unfortunately as we figured out later, many keynote attendees perceived ClickHouse as just yet another time series database and missed the explicitly said part that it opens up way more opportunities to analyze data.
  2. Victor Tarnavsky, Head of Yandex.Metrica, and Alexey Milovidov, Head of ClickHouse Development Group, gave a full-length talk about ClickHouse overview, capabilities, features and use cases. There video has not been recorded, but you can check out the slides.
  3. Later on Dmitry Andreev went deeper on the same topic he covered on an opening keynote. He spoke in more detail about how Graphouse works, shown the benchmark results and future plans of the project. Also only slides are available.

Besides, ClickHouse has been represented on the exhibition accompanying the conference. Altinity, the private company independent from Yandex that provides consulting and support services for ClickHouse, organized the booth and invited Yandex team members to join them to talk about ClickHouse with conference attendees which appeared to be quite productive.

No comments
events,conference,Percona Live,Bay Area,Graphouse
Evolution of data structures in Yandex.Metrica
13 December 2016, 09:00

Yandex.Metrica takes in a stream of data representing events that took place on sites or on apps. Our task is to keep this data and present it in an analyzable form. The real challenge lies in trying to determine what form the processed results should be saved in so that they are easy to work with. During the development process, we had to completely change our approach to data storage organization several times. We started with MyISAM tables, then used LSM-trees and eventually came up with column-oriented database, ClickHouse.

At its founding, Metrica was designed as an offshoot of Yandex.Direct, the search ads service. MySQL tables with MyISAM engine were used in Direct to store statistics and it was natural to use same approach in Metrica. Initially Yandex.Metrica for websites had more than 40 “fixed” report types (for example, the visitor geography report), several in-page analytics tools (like click maps), Webvisor (tool to study individual user actions in great detail), as well as the separate report constructor. But with time to keep up with business goals the system had to become more flexible and provide more customization opportunities for customers. Nowadays instead of using fixed reports Metrica allows to freely add new dimensions (for example, in a keyword report you can break data down further by landing page), segment and compare (between, let's say, traffic sources for all visitors vs. visitors from Moscow), change your set of metrics, etc. These features demanded a completely different approach to data storage than what we used with MyISAM, we will further discuss this transition from technical perspective.


Most SELECT queries that fetch data for reports are made with the conditions WHERE CounterID = AND Date BETWEEN min_date AND max_date. Sometimes there is also filter by region, so it made sense to use complex primary key to turn this into primary key range is read. So table schema for Metrica looks like this: CounterID, Date, RegionID -> Visits, SumVisitTime, etc. Now we'll take a look at what happens when it comes in.

A MyISAM table is comprised of a data file and an index file. If nothing was deleted from the table and the rows did not change in length during updating, the data file will consist of serialized rows arranged in succession in the order that they were added. The index (including the primary key) is a B-tree, where the leaves contain offsets in the data file. When we read index range data, a lot of offsets in the data file are taken from the index. Then reads are issued for this set of offsets in the data file.

Let's look at the real-life situation when the index is in RAM (key cache in MySQL or system page cache), but the table data is not cached. Let's assume that we are using HDDs. The time it takes to read data depends on the volume of data that needs to be read and how many Seek operations need to be run. The number of Seek's is determined by the locality of data on the disk.

Data locality illustrated

Metrica events are received in almost the same order in which they actually took place. In this incoming stream, data from different counters is scattered completely at random. In other words, incoming data is local by time, but not local by CounterID. When writing to a MyISAM table, data from different counters is also placed quite randomly. This means that to read the data report, you will need to perform about as many random reads as there are rows that we need in the table.

A typical 7200 rpm hard disk can perform 100 to 200 random reads per second. A RAID, if used properly, can handle the same amount multiplied by number of disks in it. One five-year-old SSD can perform 30,000 random reads per second, but we cannot afford to keep our data on SSD. So in this case, if we needed to read 10,000 rows for a report, it would take more than 10 seconds, which would be totally unacceptable.

InnoDB is much better suited to reading primary key ranges since it uses a clustered primary key (i.e., the data is stored in an orderly manner on the primary key). But InnoDB was impossible to use due to its slow write speed. If this reminds you of TokuDB, then read on.

It took a lot of tricks like periodic table sorting, complicated manual partitioning schemes, and keeping data in generations to keep Yandex.Metrica working on MyISAM. This approach also had a lot of lot of operational drawbacks, for example slow replication, consistency, unreliable recovery, etc. Nevertheless, as of 2011, we stored more than 580 billion rows in MyISAM tables.

Metrage and OLAPServer

Metrage is an implementation of LSM Tree, a fairly common data structure that works well for workloads with intensive stream of writes and mostly primary key reads, like Yandex.Metrica has. LevelDB did not exist in 2010 and TokuDB was proprietary at the time.

In Metrage arbitrary data structures (fixed at compile time) can be used as “rows” in it. Every row is a key, value pair. A key is a structure with comparison operations for equality and inequality. The value is an arbitrary structure with operations to update (to add something) and merge (to aggregate or combine with another value). In short, it's a CRDT. Data is located pretty locally on the hard disk, so the primary key range reads are quick. Blocks of data are effectively compressed even with fast algorithms because of ordering (in 2010 we used QuickLZ, since 2011 - LZ4). Storing data in a systematic manner enables us to use a sparse index.

Since reading is not performed very often (even though lot of rows are read when it does) the increase in latency due to having many chunks and decompressing the data block does not matter. Reading extra rows because of the index sparsity also does not make a difference.

After transferring reports from MyISAM to Metrage, we immediately saw an increase in Metrica interface speed. Whereas earlier the 90% of page-title reports loaded in 26 seconds, with Metrage they loaded in 0.8 seconds (total time, including time to process all database queries and follow-up data transformations). The time it takes Metrage itself to process queries (for all reports) is as follows according to percent: average = 6 ms, 90tile = 31 ms, 99tile = 334 ms.

We've been using Metrage for five years and it has proved to be a reliable solution. As of 2015 we stored 3.37 trillion rows in Metrage and used 39 * 2 servers for this.

Its advantages were simplicity and effectiveness, which made it a far better choice for storing data than MyISAM. Though the system still had one huge drawback: it really only works effectively with fixed reports. Metrage aggregates data and saves aggregated data. But in order to do this, you have to list all the ways in which you want to aggregate data ahead of time. So if we do this in 40 different ways, it means that Metrica will contain 40 types of reports and no more.

To mitigate this we had to keep for a while a separate storage for custom report wizard, called OLAPServer. It is a simple and very limited implementation of a column-oriented database. It supports only one table set in compile time — a session table. Unlike Metrage, data is not updated in real-time, but rather a few times per day. The only data type supported is fixed-length numbers of 1-8 bytes, so it wasn“t suitable for reports with other kinds of data, for example URLs.


Using OLAPServer, we developed an understanding of how well column-oriented DBMS's handle ad-hoc analytics tasks with non-aggregated data. If you can retrieve any report from non-aggregated data, then it begs the question of whether data even needs to be aggregated in advance, as we did with Metrage.

On the one hand, pre-aggregating data can reduce the volume of data that is used at the moment when the report page is loading. On the other hand, though, aggregated data doesn't solve everything. Here are the reasons why:

  • you need to have a list of reports that your users need ahead of time • in other words, the user can't put together a custom report
  • when aggregating a lot of keys, the amount of data is not reduced and aggregation is useless • when there are a lot of reports, there are too many aggregation options (combinatorial explosion)
  • when aggregating high cardinality keys (for example, URLs) the amount of data does not decrease by much (by less than half)
  • due to this, the amount of data may not be reduced, but actually grow during aggregation
  • users won't view all the reports that we calculate for them (in other words, a lot of the calculations prove useless)
  • it's difficult to maintain logical consistency when storing a large number of different aggregations

As you can see, if nothing is aggregated and we work with non-aggregated data, then it's possible that the volume of computations will even be reduced. But only working with non-aggregated data imposes very high demands on the effectiveness of the system that executes the queries.

So if we aggregate the data in advance, then we should do it constantly (in real time), but asynchronously with respect to user queries. We should really just aggregate the data in real time; a large portion of the report being received should consist of prepared data.

If data is not aggregated in advance, all the work has to be done at the moment the user request it (i.e. while they wait for the report page to load). This means that many billions of rows need to be processed in response to the user's query; the quicker this can be done, the better.

For this you need a good column-oriented DBMS. The market didn‘t have any column-oriented DBMS's that would handle internet-analytics tasks on the scale of Runet (the Russian internet) well enough and would not be prohibitively expensive to license.

Recently, as an alternative to commercial column-oriented DBMS's, solutions for efficient ad-hoc analytics of data in distributed computing systems began appearing: Cloudera Impala, Spark SQL, Presto, and Apache Drill. Although such systems can work effectively with queries for internal analytical tasks, it is difficult to imagine them as the backend for the web interface of an analytical system accessible to external users.

At Yandex, we developed and later opensourced our own column-oriented DBMS — ClickHouse. Let's review the basic requirements that we had in mind before we proceeded to development.

Ability to work with large datasets. In current Yandex.Metrica for websites, ClickHouse is used to store all data for reports. As of November, 2016, the database is comprised of 18.3 trillion rows. It‘s made up of non-aggregated data that is used to retrieve reports in real-time. Every row in the largest table contains over 200 columns.

The system should scale linearly. ClickHouse allows you to increase the size of cluster by adding new servers as needed. For example, Yandex.Metrica's main cluster has increased from 60 to 426 servers in three years. In the aim of fault tolerance, our servers are spread across different data centers. ClickHouse can use all hardware resources to process a single query. This way more than 2 terabyte can be processed per second.

High efficiency. We especially pride ourselves on our database's high performance. Based on the results of internal tests, ClickHouse processes queries faster than any other system we could acquire. For example, ClickHouse works an average of 2.8-3.4 times faster than Vertica. With ClickHouse there is no one silver bullet that makes the system work so quickly.

Functionality should be sufficient for Web analytics tools. The database supports the SQL language dialect, subqueries and JOINs (local and distributed). There are numerous SQL extensions: functions for web analytics, arrays and nested data structures, higher-order functions, aggregate functions for approximate calculations using sketching, etc. By working with ClickHouse, you get the convenience of a relational DBMS.

ClickHouse was initially developed by the Yandex.Metrica team. Furthermore, we were able to make the system flexible and extensible enough that it can be successfully used for different tasks. Although the database can run on large clusters, it can be installed on one server or even on a virtual machine. There are now more than a dozen different ClickHouse applications within our company.

ClickHouse is well equipped for creating all kinds of analytical tools. Just consider: if the system can handle the challenges of Yandex.Metrica, you can be sure that ClickHouse will cope with other tasks with a lot of performance headroom to spare.

ClickHouse works well as a time series database; at Yandex it is commonly used as the backend for Graphite instead of Ceres/Whisper. This lets us work with more than a trillion metrics on a single server.

ClickHouse is used by analytics for internal tasks. Based on our experience at Yandex, ClickHouse performs at about three orders of magnitude higher than traditional methods of data processing (scripts on MapReduce). But this is not a simple quantitative difference. The fact of the matter is that by having such a high calculation speed, you can afford to employ radically different methods of problem solving.

If an analyst has to make a report and they are competent at their job, they won't just go ahead and construct one report. Rather, they will start by retrieving dozens of other reports to better understand the nature of the data and test various hypotheses. It is often useful to look at data from different angles in order to posit and check new hypotheses, even if you don't have a clear goal.

This is only possible if the data analysis speed allows you to conduct online research. The faster queries are executed, the more hypotheses you can test. Working with ClickHouse, one even gets the sense that they are able to think faster.

In traditional systems, data is like a dead weight, figuratively speaking. You can manipulate it, but it takes a lot of time and is inconvenient. If your data is in ClickHouse though, it is much more malleable: you can study it in different cross-sections and drill down to the individual rows of data.


Yandex.Metrica has become the second largest web-analytics system in the world. The volume of data that Metrica takes in grew from 200 million events a day in 2009 to more than 25 billion in 2016. In order to provide users with a wide variety of options while still keeping up with the increasing workload, we've had to constantly modify our approach to data storage.

Effective hardware utilization is very important to us. In our experience, when you have a large volume of data, it's better not to worry as much about how well the system scales and instead focus on how effectively each unit of resource is used: each processor core, disk and SSD, RAM, and network. After all, if your system is already using hundreds of servers, and you have to work ten times more efficiently, it is unlikely that you can just proceed to install thousands of servers, no matter how scalable your system is.

To maximize efficiency, it's important to customize your solution to meet the needs of specific type of workload. There is no data structure that copes well with completely different scenarios. For example, it's clear that key-value databases don't work for analytical queries. The greater the load on the system, the narrower the specialization required. One should not be afraid to use completely different data structures for different tasks.

We were able to set things up so that Yandex.Metrica's hardware was relatively inexpensive. This has allowed us to offer the service free of charge to even very large sites and mobile apps, even larger than Yanex‘s own, while competitors typically start asking for a paid subscription plan.

No comments
Yandex.Metrica,data structures,LSM tree,columnar storage
How to update data in ClickHouse
20 November 2016, 15:00

There is no UPDATE or DELETE commands in ClickHouse at the moment. And that's not because we have some religious believes. ClickHouse is performance-oriented system; and data modifications are hard to store and process optimally in terms of performance.

But sometimes we have to modify data. And sometimes data should be updated in realtime. Don't worry, we have these cases covered.

Work with partitions

Data in MergeTree engine family is partitioned by partition_key engine parameter. MergeTree split all the data by this partition key. Partition size is one month.

That's very useful in many terms. Especially when we're talking about data modification.

Yandex.Metrica hits table

Let's look at an example on Yandex.Metrica server mtlog02-01-1 which store some Yandex.Metrica data for year 2013. Table we are looking at contains user events we call “hits”. This is the engine description for hits table:

ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{layer}-{shard}/hits', -- zookeeper path
    '{replica}', -- settings in config describing replicas
    EventDate, -- partition key column
    intHash32(UserID), -- sampling key
    (CounterID, EventDate, intHash32(UserID), WatchID), -- index
    8192 -- index granularity

You can see that the partition key column is EventDate. That means that all the data will be splitted by months using this column.

With this SQL we can get partitions list and some stats about current partitions:

    count() as number_of_parts, 
    formatReadableSize(sum(bytes)) as sum_size 
    AND database = 'merge' 
    AND table = 'hits' 
GROUP BY partition 
ORDER BY partition;

│ 201306    │               1 │ 191.34 GiB │
│ 201307    │               4 │ 537.86 GiB │
│ 201308    │               6 │ 608.77 GiB │
│ 201309    │               5 │ 658.68 GiB │    
│ 201310    │               5 │ 768.74 GiB │
│ 201311    │               5 │ 654.61 GiB │

There are 6 partitions with a few parts in each of them. Each partition is around 600 Gb of data. Partition is strictly one piece of data for partition key, here we can see that it is months. Part is one piece of data inside partition. Basically it's one node of LSMT structure, so there are not so many of them, especially for old data. If there are too many of them, they merge and form bigger ones.

Partition operations

There is a nice set of operations to work with partitions.

DETACH PARTITION -- Move a partition to the 'detached' directory and forget it.
DROP PARTITION -- Delete a partition.
ATTACH PART|PARTITION -- Add a new part or partition from the 'detached' directory to the table.
FREEZE PARTITION -- Create a backup of a partition.
FETCH PARTITION -- Download a partition from another server.

We can do any data management operations on partitions level: move, copy and delete. Also, special DETACH and ATTACH operations are created to simplify data manipulation. DETACH detaches partition from table, moving all data to detached directory. Data is still there and you can copy it anywhere but detached data is not visible on request level. ATTACH is the opposite: attaches data from detached directory so it become visible.

This attach-detach commands works almost in no time so you can make your updates almost transparently to database clients.

Here is the plan how to update data using partitions:

  • Create modified partition with updated data on another table
  • Copy data for this partition to detached directory
  • DROP PARTITION in main table
  • ATTACH PARTITION in main table

Partition swap especially useful for huge data updates with low frequency. But they're not so handy when you need to update a lot of data in real time.

Update data on the fly

In Yandex.Metrica we have user sessions table. Each row is one session on a website: some pages checked, some time spent, some banners clicked. This data is updated every second: user on a website view more pages, click more buttons, and do other things. Site owner can see that actions in Yandex.Metrica interface in real time.

So how do we do that?

We update data not by updating that data, but adding more data about what have changed. This is usually called CRDT approach, and there is an article on Wikipedia about that.

It was created to solve conflict problem in transactions but this concept also allows updating data. We use our own data model with this approach. We call it Incremental Log.

Incremental Log

Let's look at an example.

Here we have one session information with user identifier UserID, number of page viewed PageViews, time spent on site in seconds Duration. There is also Sign field, we describe it later.

│ 4324182021466249494 │         5 │      146 │    1 │

And let's say we calculate some metrics over this data.

count() -- number of sessions
sum(PageViews) -- total number of pages all users checked
avg(Duration) -- average session duration, how long user usually spent on the website

Let's say now we have update on that: user checked one more page, so we should change PageViews from 5 to 6 and Duration from 146 to 185.

We insert two more rows:

│ 4324182021466249494 │         5 │      146 │   -1 │
│ 4324182021466249494 │         6 │      185 │    1 │

First one is delete row. It's exactly the same row what we already have there but with Sign set to -1. Second one is updated row with all data set to new values.

After that we have three rows of data:

│ 4324182021466249494 │         5 │      146 │    1 │
│ 4324182021466249494 │         5 │      146 │   -1 │
│ 4324182021466249494 │         6 │      185 │    1 │

The most important part is modified metrics calculation. We should update our queries like this:

 -- number of sessions
count() -> sum(Sign)
 -- total number of pages all users checked
sum(PageViews) -> sum(Sign * PageViews)
 -- average session duration, how long user usually spent on the website
avg(Duration) -> sum(Sign * Duration) / sum(Sign)

You can see that it works as expected over this data. Deleted row 'hide' old row, same values come with + and - signs inside aggregation and annihilate each other.

Moreover, it works totally fine with changing keys for grouping. If we want to group data by PageViews, all data for PageView = 5 will be 'hidden' for this rows.

There are some limitations with this approach:

  • It works only for metrics which can be presented through this Sign operations. It covers most cases, but it's not possible to calculate min or max values. There is an impact to uniq calculations also. But it's fine at least for Yandex.Metrica cases, and there are a lot of different analytical calculations;
  • You need to remember somehow old value in external system doing updates, so you can insert this 'delete' rows;
  • Some other effects; there is a great answer on Google Groups.


ClickHouse has support of Incremental Log model in Collapsing engines family.

If you use Collapsing family, 'delete' row and old 'deleted' rows will collapse during merge process. Merge is a background process of merging data into larger chunks. Here is a great article about merges and LSMT structures.

For most cases 'delete' and 'deleted' rows will be removed in terms of days. What's important here is that you will not have any significant overhead on data size. Using Sign field on selects still required.

Also there is FINAL modifier available over Collapsing family. Using FINAL guarantees that user will see already collapsing data, thus using Sign field isn't required. FINAL usually make tremendous performance degradation because ClickHouse have to group data by key and delete rows during SELECT execution. But it's useful when you want to check your queries or if you want to see raw, unaggregated data in their final form.

Future plans

We know that current feature set is not enough. There are some cases which do not fit to limitations. But we have huge plans, and here are some insights what we've preparing:

  • Partitions by custom key: current partitioning scheme is binded to months only. We will remove this limitation and it will be possible to create partitions by any key. All partition operations like FETCH PARTITION will be available.
  • UPDATE and DELETE: there are a lot of issues with updates and deletes support. Performance degradation, consistency guarantees, distributed queries and more. But we believe that if you need to update few rows of data in your dataset, it should not be painful. It will be done.

Feel free to comment or send feedback!

No comments
Yandex opensources ClickHouse
15 June 2016, 11:00

Today analytical DBMS ClickHouse initially developed internally at Yandex, became available to everyone. Source code is published on GitHub under Apache 2.0 license.

ClickHouse allows interactive analytical query execution on data updated in real time. System is able to scale to tens of trillions of rows and petabytes of stored data. Using ClickHouse opens up opportunities that were hard to imagine: you can store full stream of data and slice and dice it to produce reports without offline aggregation. ClickHouse was initially developed as a backend for Yandex.Metrica — second largest web analytics system in the world.

Discussion on Hacker News.

No comments