Cool: a COhort OnLine analytical processing system
7 minute read ∼ Filed in: A paper note

Introduction
Current Problems
Cohort analysis compares different groups of users, where the grouping is based on the events or the time at which users start using a service.
Cohort steps:
- Birth selection: group users that experience the same given (birth) events into a cohort.
- Segmentation: for each cohort, assign its records into diverse segments.
- Aggregation: for each segment, compute a measure using a given aggregator.
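To make the three steps concrete, here is a minimal pure-Python sketch over a toy event log (the schema and all names are illustrative, not Cool's API):

```python
from collections import defaultdict

# Toy event log: (user, action, day) tuples; data is hypothetical.
events = [
    ("u1", "signup", 1), ("u1", "purchase", 3),
    ("u2", "signup", 1), ("u2", "purchase", 9),
    ("u3", "signup", 8), ("u3", "purchase", 9),
]

# 1. Birth selection: users that experience the birth event form cohorts,
#    keyed here by the week of their birth.
birth_day = {u: d for (u, a, d) in events if a == "signup"}
cohort_of = {u: f"week-{d // 7}" for u, d in birth_day.items()}

# 2. Segmentation: assign each post-birth record to a segment (its "age",
#    i.e., days elapsed since the birth event).
# 3. Aggregation: one aggregate per (cohort, age) cell, here COUNT.
agg = defaultdict(int)
for user, action, day in events:
    if action == "purchase" and user in birth_day:
        age = day - birth_day[user]
        agg[(cohort_of[user], age)] += 1

print(dict(agg))  # {('week-0', 2): 1, ('week-0', 8): 1, ('week-1', 1): 1}
```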
However, existing OLAP systems (MonetDB, Druid) are inefficient at processing complex cohort queries, even though they use a columnar architecture to scale up or take advantage of distributed processing for their target queries. It is non-trivial to integrate both kinds of query processing together.
Contributions
The paper proposes Cool, a cohort OLAP system. It supports both OLAP and emerging cohort queries for data analytics with superb performance.
- The system includes two operators for OLAP queries (MetaChunk selector, DataChunk selector) and three operators for cohort queries (birth selector, age selector, cohort aggregator).
- The system designs a sophisticated storage layout to optimize query processing and space consumption, and it stores precomputed results to boost OLAP execution.
- The system scales out with HDFS and ZooKeeper, and can recover from failures.
- Cool is compared with Druid and MonetDB in a single-node setting, and with SparkSQL and distributed Druid in a distributed setting.
Cool differs from Cohana in the following:
- Cool uses a more general definition of cohort queries: it employs a sequence of birth events to find the valid users.
- Cool supports conventional cube queries.
- Cool can be distributed.
Related Work
Existing systems can be divided into:
- conventional OLAP systems built on top of DBMSs, and
- dedicated query engines for specific query types.
Conventional OLAP
Columnar architecture
Most of them use columnar stores because:
- OLAP queries normally scan a large volume of a limited set of columns, and a columnar store can load only the required columns.
- In a columnar store, the scanned data can be further shrunk by compression such as dictionary encoding, run-length encoding, row re-ordering, etc.
However, data insertion and point queries are worse than in row-oriented databases. Hybrid storage is thus proposed, combining both a row-oriented store and a columnar store within one system, e.g., MemSQL, SAP HANA, Oracle Database In-Memory, and HyPer.
Distributed processing
To handle enormous amounts of data, OLAP systems also support distributed processing, e.g., Hive and Kylin. However, performance normally depends on the workers' resources.
Emerging Query Engines
Pinot offers real-time responses for iceberg queries (a prevailing type of query that selects a small number of records satisfying some given conditions).
Data structures/indexes
- Data cube: widely used for queries involving multiple dimensions of a dataset.
- Star-Tree: used to support OLAP queries.
Compression
New compression algorithms are also proposed to improve performance:
- PowerDrill uses a two-level dictionary compression scheme to enable processing of trillions of record cells in seconds.
- The condensed cube reduces the size of the cube and improves the computation time.
In-memory processing
Spark is used to handle stream queries; Shark further improves its performance by leveraging distributed memory.
Single Node Architecture
Components: loader, controller, parser, planner, compressor, and executor.
Data Model
Each table is horizontally split into different partitions called cublets.
Each cublet consists of multiple chunks: data chunks, a meta chunk, a header, and a header offset.
Storage Layout
Chunks
- DataChunk: Cool transcribes columnar data into fields inside a DataChunk.
- MetaChunk: stores the metadata for each DataChunk within the same cublet, including the number of contained fields and the range of each field.
- Header: stores the number of data chunks.
- Header offset: stores the header's position.
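A sketch of this layout as plain Python dataclasses (the field names are my shorthand for the note above, not Cool's on-disk format):

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

@dataclass
class DataChunk:
    # Columnar values per field, already dictionary-encoded as integers.
    columns: Dict[str, List[int]]

@dataclass
class MetaChunk:
    # Metadata for the DataChunks of the same cublet: here just the
    # min/max range of each field, used later for chunk pruning.
    field_ranges: Dict[str, Tuple[int, int]]

@dataclass
class Cublet:
    data_chunks: List[DataChunk]
    meta_chunk: MetaChunk
    header: int = 0         # number of data chunks
    header_offset: int = 0  # position of the header within the cublet

    def __post_init__(self):
        self.header = len(self.data_chunks)
```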
Field types:
- Range: integer, float, time.
- Word: string, event.
The system has a different storage plan for each field type.
Storage plan
- For a range field: keep only the max/min values.
- For a word field: apply a double-dictionary scheme to save space; the global IDs stored in a DataChunk are further translated through a local dictionary with shortened numbers to achieve a high compression ratio.
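A minimal sketch of the double-dictionary idea for a word field, assuming a table-wide global dictionary and a per-chunk local one (all names are hypothetical):

```python
# Global dictionary: every distinct string in the table -> global ID.
global_dict = {"click": 0, "login": 1, "purchase": 2, "signup": 3}

def encode_chunk(values):
    """Encode one DataChunk's word column against a chunk-local dictionary.

    The chunk stores (a) the sorted global IDs occurring in it and (b) one
    small local ID per record, so each record needs only enough bits for
    the local dictionary instead of the full global ID space.
    """
    global_ids = sorted({global_dict[v] for v in values})
    local_of = {g: i for i, g in enumerate(global_ids)}
    return global_ids, [local_of[global_dict[v]] for v in values]

def decode_chunk(global_ids, local_ids):
    inverse = {gid: s for s, gid in global_dict.items()}
    return [inverse[global_ids[i]] for i in local_ids]

chunk = ["signup", "purchase", "signup", "purchase"]
gids, lids = encode_chunk(chunk)
print(gids, lids)                       # [2, 3] [1, 0, 1, 0]
assert decode_chunk(gids, lids) == chunk
```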
Compression
First, each field is encoded to reduce space usage:
- numbers are translated through a local dictionary;
- each distinct string is put into a global dictionary and can be indexed by a unique number.
Besides encoding, compression is also used:
- bit packing and vectorization are introduced to deal with delta values and dictionary index numbers.
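A sketch of bit packing for a block of small dictionary IDs; a real implementation packs into byte buffers and vectorizes the loop, but the arithmetic is the same idea:

```python
def bit_width(max_value: int) -> int:
    """Bits needed per value, at least 1."""
    return max(1, max_value.bit_length())

def pack(values, width):
    """Pack each value into `width` bits of one big integer."""
    out = 0
    for i, v in enumerate(values):
        out |= v << (i * width)
    return out

def unpack(packed, width, count):
    mask = (1 << width) - 1
    return [(packed >> (i * width)) & mask for i in range(count)]

ids = [1, 0, 3, 2, 1]          # e.g. local dictionary IDs
w = bit_width(max(ids))        # 2 bits each instead of a full machine word
packed = pack(ids, w)
assert unpack(packed, w, len(ids)) == ids
```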
Query Processing
Two operators for OLAP queries:
- MetaChunk selector: (chunk schema D, MetaChunk M, predicate tree P) => traverse M with P => whether the chunk contains matching data.
- DataChunk selector: finds the matched data records in query processing. (DataChunk R, record number N, predicate tree P) => bitset B (indexes of matching records).
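A toy rendition of the two selectors, assuming the dataclass layout sketched earlier and predicates reduced to per-field [lo, hi] ranges (a simplification of the predicate tree):

```python
def metachunk_select(field_ranges, predicates):
    """Prune a whole chunk if some predicate cannot overlap a field's min/max."""
    for fld, (lo, hi) in predicates.items():
        fmin, fmax = field_ranges[fld]
        if hi < fmin or lo > fmax:
            return False            # the chunk cannot contain matches
    return True

def datachunk_select(columns, n_records, predicates):
    """Return a bitset B marking the records that satisfy all predicates."""
    bitset = [True] * n_records
    for fld, (lo, hi) in predicates.items():
        col = columns[fld]
        for i in range(n_records):
            bitset[i] = bitset[i] and lo <= col[i] <= hi
    return bitset

preds = {"age": (20, 30)}
print(metachunk_select({"age": (18, 65)}, preds))             # True
print(datachunk_select({"age": [25, 40, 22, 31]}, 4, preds))  # [True, False, True, False]
```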
Three operators for cohort queries:
- Birth selector: captures qualified users. (MetaChunk M, DataChunk, predicate tree P) => whether the events in the birth event sequence exist in the scanned chunk.
- Age selector: (DataChunk, predicate tree P, bitset B) => marks whether the user's records can be selected for aggregation at each age.
- Cohort aggregator: the records that pass both selectors above are aggregated.
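A sketch of how the three cohort operators compose, over toy per-user event streams (illustrative only; Cool runs these operators over the compressed chunks, not Python lists):

```python
from collections import defaultdict

# Toy per-user event streams: user -> [(action, day), ...] in time order.
streams = {
    "u1": [("signup", 1), ("deposit", 2), ("play", 4), ("play", 9)],
    "u2": [("signup", 2), ("play", 3)],
}

def birth_select(stream, birth_seq):
    """Return the day on which the stream completes `birth_seq` as a
    subsequence (the user's birth time), or None if the user never qualifies."""
    matched = 0
    for action, day in stream:
        if action == birth_seq[matched]:
            matched += 1
            if matched == len(birth_seq):
                return day
    return None

def age_select(stream, birth, action):
    """Yield the age (days since birth) of each qualifying post-birth record."""
    for a, day in stream:
        if day > birth and a == action:
            yield day - birth

# Cohort aggregator: COUNT the qualifying records per age over all born users.
agg = defaultdict(int)
for user, stream in streams.items():
    birth = birth_select(stream, birth_seq=("signup", "deposit"))
    if birth is not None:                      # u2 is never "born"
        for age in age_select(stream, birth, action="play"):
            agg[age] += 1

print(dict(agg))  # {2: 1, 7: 1}
```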
OLAP Processing Flow
1. Planner: generate the execution plan.
2. Fetch a cublet from the specified data source.
3. MetaChunk selector: find a cublet with candidate values; repeat from step 2 until such a cublet is found.
4. DataChunk selector: find the matching records.
5. Run the aggregators on the scanning results and group the results.
6. Repeat from step 2 until all cublets are processed.
7. Compressor: compress and store the aggregate results.
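This flow can be read as a simple driver loop over cublets; a sketch reusing the toy layout and selectors from above (the cohort flow below follows the same shape with the three cohort operators swapped in):

```python
def olap_query(cublets, preds, aggregate):
    """Toy OLAP flow: prune cublets via their MetaChunk, scan surviving
    DataChunks for matching records, and aggregate/group the hits."""
    groups = {}
    for cublet in cublets:                                    # steps 2-3
        if not metachunk_select(cublet.meta_chunk.field_ranges, preds):
            continue                                          # pruned cublet
        for chunk in cublet.data_chunks:                      # step 4
            n = len(next(iter(chunk.columns.values())))
            bits = datachunk_select(chunk.columns, n, preds)
            aggregate(groups, chunk, bits)                    # step 5
    return groups     # step 7: compress and store in the real system
```

Here `aggregate` stands for any grouping callback; the real executor also groups by dimension values before aggregating.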
Cohort Query Processing
1. Planner: generate the execution plan.
2. Fetch a cublet from the specified data source.
3. MetaChunk selector: find a cublet containing the birth event sequence; repeat from step 2 until such a cublet is found.
4. Birth selector: scan the data chunks to find qualified users.
5. Age selector: calculate the ages.
6. Aggregator: aggregate the cohorts from the users.
7. Repeat from step 2 until all cublets are processed.
8. Compressor: compress and store the aggregate results.
Distributed System Architecture
Performance Evaluation
Experiment Setup
Compared against:
- Apache Druid (OLAP-related query evaluation),
- MonetDB (a columnar analytical database, for cohort query evaluation),
- SparkSQL / distributed Druid (distributed processing).
Metrics
Query latency, compression ratio, and memory consumption during processing.
Workloads
TPC-H benchmark / medical dataset MED
Queries
- Cohort query: used to measure cohort analysis query processing.
- Iceberg query (OLAP): used to measure the performance of the selectors in Cool.
- Cube query.
- Composite query.
Latency
MonetDB incurs more disk accesses during processing than Cool, and the frequent data swapping from disk to memory slows down the system's query latency.
Processing Memory
First, Druid needs to maintain auxiliary system components to manage its data segments, which is complex and incurs high overhead, whereas Cool exploits a relatively simple storage hierarchy and eliminates such extra costs.
Second, Druid must map the data from disk to memory during query processing even for tiny datasets, while Cool, thanks to its highly compressed cublet structures, can keep the entire dataset in memory for small datasets, which further widens the performance gap between the two systems.
Multi-node Benchmark
Employ 1 to 16 nodes to evaluate the system performance.
For SparkSQL, the merging of all the results, rather than the computation over each Parquet partition, dominates the system performance.