Understanding Apache Carbondata from the basics — Part I
While using SparkSQL, I was browsing a few file formats like Parquet, ORC, etc. I stumbled upon CarbonData and tried to understand it. To my surprise, there are not many resources that explain the basics in one place. This blog post will try to explain CarbonData from the basics, using the official documentation.
- Basic introduction
- The need for Unified File Format
- Carbondata structure with a detailed explanation of the basic terms used in Data Engineering
- What’s next?
So, what is Apache Carbondata?
Apache CarbonData is an indexed columnar data format for fast analytics on big data platform, e.g. Apache Hadoop, Apache Spark, etc.
In simple words, CarbonData enables you to serve many kinds of workloads from a single copy of the data. The challenge in query processing is that we have different types of queries:
- OLAP queries and detailed queries.
- Big scan queries and small scan queries: some queries require comparatively large scans, while others scan only a small area of the data.
- Point queries: reading only a very small portion of a big table.
Hence the need for a unified file format: one storage layout that can serve all these query types (and their different data-access patterns).
As Apache claims, you can have fast queries over petabytes of data with this columnar storage.
Let us begin with discussing the architecture of CarbonData file format. Simply put, it has three components: file header, file footer, and blocklets.
What is a blocklet?
- dataset inside the carbondata file
- each blocklet contains a column chunk for each column
- one column chunk could contain one or more column pages depending upon the version of the file format used
Let me give you a bit of visualization about column pages. Say your data spans 4 pages, each containing 25 rows and 4 columns. A CarbonData blocklet would contain one column chunk per column and, within each chunk, that column's pages, i.e. a chunk holds one column's data from page 1 to page 4 (not all of the data).
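To make the row-to-column reorganization concrete, here is a toy Python sketch (not CarbonData's actual code; the data and names are made up) that turns row-oriented pages into per-column chunks made of column pages:

```python
# 4 pages of row data; just 3 rows per page and 2 columns ("id", "name")
# to keep the example small.
pages = [
    [(1, "a"), (2, "b"), (3, "c")],
    [(4, "d"), (5, "e"), (6, "f")],
    [(7, "g"), (8, "h"), (9, "i")],
    [(10, "j"), (11, "k"), (12, "l")],
]

columns = ["id", "name"]

# One column chunk per column; each chunk holds one column page per row page.
blocklet = {
    col: [[row[i] for row in page] for page in pages]
    for i, col in enumerate(columns)
}

# The "id" chunk now contains the id values of pages 1..4, and nothing else.
print(blocklet["id"])       # [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
print(blocklet["name"][0])  # ['a', 'b', 'c']
```

A query touching only `id` can now read that one chunk instead of every row.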
The file footer is quite important here. CarbonData leverages it for fast query processing and optimized scans, simply because the footer is read into memory.
As in the documentation, let’s first discuss the File Directory Structure.
The CarbonData files are stored in the location specified by the carbon.storelocation configuration (configured in carbon.properties; if not configured, the default is ../carbon.store).
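For example, an entry in carbon.properties might look like this (the HDFS path is purely illustrative):

```properties
# carbon.properties (illustrative value)
carbon.storelocation=hdfs://namenode:9000/user/hive/carbon.store
```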
The documentation explains the directory structure as follows:
- modifiedTime.mdt records the timestamp of the metadata via the file's modification-time attribute. When drop table and create table are used, the modification time of the file is updated. This is common to all databases and hence is kept at the same level as the database directories.
- default is the database name and contains the user tables. default is used when a user doesn't specify any database name; otherwise the user-configured database name is the directory name. user_table is the table name.
- The Metadata directory stores schema files, the table status, and dictionary files (including .dict, .dictmeta and .sortindex). These are the three types of metadata files.
- Data and index files are stored under a directory named Fact. The Fact directory has a Part0 partition directory, where 0 is the partition number.
- There is a Segment_0 directory under the Part0 directory, where 0 is the segment number.
- There are two types of files, carbondata and carbonindex, in the Segment_0 directory.
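Putting these points together, the on-disk layout looks roughly like this (a sketch assembled from the descriptions above; exact file names vary by version):

```
<carbon.storelocation>
├── modifiedTime.mdt
└── default                       <- database name
    └── user_table                <- table name
        ├── Metadata              <- schema, tablestatus, dictionary files
        │   ├── schema
        │   ├── tablestatus
        │   └── (.dict, .dictmeta, .sortindex files)
        └── Fact
            └── Part0             <- partition 0
                └── Segment_0     <- segment 0
                    ├── *.carbondata
                    └── *.carbonindex
```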
Point 4 relates to the concept called partitioning of the data. The best explanation to begin with is the following, from this source:
Imagine we have an encyclopedia written in one single book, and with the passing of time, historical and social events are updated in this book. At some point the book would be so huge that we would need a truck to move it, so we need to divide this book into several volumes.

Data partitioning is a technique for physically dividing the data during the loading of the Master Data. Using this method we are going to split a table into smaller pieces according to rules set by the user. It is normally used when we have very large tables which require a huge amount of time to read the entire data set, therefore it will allow us to improve the maintenance, performance or management.

Regarding how the user does the partitioning of wide data tables, there are basically two ways: either horizontally (by row) or vertically (by column). Horizontal partitioning consists of distributing the rows of the table in different partitions, while vertical partitioning consists of distributing the columns of the table. We can’t forget we are working with huge amounts of data and we are going to store the information in a cluster, using a distributed filesystem. One of the most popular is Hadoop HDFS.
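Horizontal partitioning is easy to sketch in a few lines of Python (an illustration only; the rows and the partition rule are made up):

```python
# Rows are routed to partitions according to a user-defined rule
# (here, one partition per decade).
rows = [
    {"event": "moon landing", "year": 1969},
    {"event": "www proposed", "year": 1989},
    {"event": "hadoop released", "year": 2006},
    {"event": "spark released", "year": 2014},
]

def partition_key(row):
    # Rule set by the user: group rows by decade.
    return row["year"] // 10 * 10

partitions = {}
for row in rows:
    partitions.setdefault(partition_key(row), []).append(row)

# Each partition can now be stored (and scanned) independently.
for decade in sorted(partitions):
    print(decade, [r["event"] for r in partitions[decade]])
```

A query filtered on `year` only has to read the matching partitions.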
What is the difference between partitioning and segmentation?
Both are used to localize your data for faster access and processing. Segmentation refers to organizing and distributing your data across all the nodes of your cluster. Partitioning means organizing your data within a node of the cluster.
File Content details
When the table is created, the user_table directory is generated, and a schema file is generated in the Metadata directory for recording the table structure.
When loading data in batches, each batch generates a new segment directory. The scheduler tries to assign one data-loading task to each node; each task generates multiple carbondata files and one carbonindex file.
As you can see, each batch load of data creates a new segment directory. What does the segment directory do? Under each partition (here Part0), it organizes the data across all the nodes. This enables CarbonData to perform fast data purges and achieve high query performance. The aim is to distribute each batch of data across all the nodes so that all of them can participate in parallel while a query is executed.
During global dictionary generation with the two-pass scheme, the corresponding dict, dictmeta and sortindex files are generated for each dictionary-encoded column before the data is loaded, by scanning the full dataset. To reduce this scanning, partial dictionary files can be supplied via the pre-defined dictionary method, or dictionary files for all dictionary-encoded columns can be supplied via the all-dictionary method to avoid scanning the data entirely. If the single-pass scheme is adopted, the global dictionary codes are generated in real time during data loading, and after the load completes the dictionary is solidified into a dictionary file.
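The two-pass idea is easy to illustrate with a toy dictionary encoder (not CarbonData's implementation; the column values are made up): pass 1 scans the column to build the dictionary, pass 2 rewrites values as compact codes.

```python
column = ["IN", "US", "IN", "CN", "US", "IN"]

# Pass 1: build the global dictionary by scanning the full column.
dictionary = {value: code for code, value in enumerate(sorted(set(column)))}

# Pass 2: encode the column using the dictionary codes.
encoded = [dictionary[v] for v in column]

print(dictionary)  # {'CN': 0, 'IN': 1, 'US': 2}
print(encoded)     # [1, 2, 1, 0, 2, 1]
```

The single-pass scheme instead grows the dictionary on the fly while encoding, then writes it out once the load finishes.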
The following sections use the Java objects generated from the thrift file describing the CarbonData file format to explain the contents of each file one by one (you can also directly read the format defined in the thrift file).
Now that we know the file format, let us discuss how Carbondata stores the schema of user’s data.
Before we begin with the explanation of the above diagram, we should understand what is meant by a bucket table.
Bucketing is a method to evenly distribute the data across many files. … Data for each bucket is stored in a separate HDFS file under the table directory on HDFS.
Bucketing is an alternate to partitioning. It decomposes data sets into more manageable parts.
If you are getting confused or want more clarity on what is bucketing or what’s the difference between bucketing and partitioning, read this answer on StackOverflow.
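As a quick illustration (a Python toy, not how any real engine hashes), bucketing assigns each row to one of a fixed number of buckets by hashing the bucket column:

```python
NUM_BUCKETS = 4

def bucket_of(value, num_buckets=NUM_BUCKETS):
    # Real systems use a stable hash function; Python's built-in hash of
    # small ints is fine for this illustration.
    return hash(value) % num_buckets

user_ids = range(100)
buckets = {}
for uid in user_ids:
    buckets.setdefault(bucket_of(uid), []).append(uid)

# 100 rows spread evenly across 4 buckets
# (each bucket would be a separate file on HDFS).
print({b: len(rows) for b, rows in sorted(buckets.items())})
```

Unlike partitioning, the number of buckets is fixed up front, so the data is spread evenly regardless of how skewed the column values are.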
Now, the explanation of the schema diagram:
- TableSchema class: the TableSchema class does not store the table name; it is inferred from the directory name (user_table). tableProperties is used to record table-related properties, such as table_blocksize.
- ColumnSchema class Encoders are used to record the encoding used in column storage. columnProperties is used to record column related properties.
- BucketingInfo class: when creating a bucket table, you can specify the number of buckets in the table and the column used to split data into buckets.
- DataType class Describes the data types supported by CarbonData.
- Encoding class Several encodings that may be used in CarbonData files.
Until now, everything was about how CarbonData stores metadata about your data. Now, we will study how CarbonData stores the data itself.
CarbonData file format
I have already described above what the file format is. I hope the diagram helps you visualize it better.
I didn’t tell you about different versions in Blocklets. This has nothing to do with version controlling of the data.
So, there are three versions: V1, V2 and V3
V1
A blocklet consists of all column data pages, RLE pages, and rowID pages. The pages in the blocklet are grouped by page type, so the three kinds of pages of each column end up distributed across the blocklet, and the offset and length information of every page needs to be recorded in the footer part.
- RLE is run length encoding
- Footer stores the offset and length info which is read in memory.
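Run-length encoding itself is simple to sketch (a minimal Python illustration, not CarbonData's encoder): consecutive repeated values are stored once, together with a repeat count.

```python
def rle_encode(values):
    """Collapse runs of equal consecutive values into (value, count) pairs."""
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1][1] += 1
        else:
            runs.append([v, 1])
    return [(v, n) for v, n in runs]

def rle_decode(runs):
    """Expand (value, count) pairs back into the original sequence."""
    return [v for v, n in runs for _ in range(n)]

data = ["A", "A", "A", "B", "B", "C", "A", "A"]
encoded = rle_encode(data)
print(encoded)  # [('A', 3), ('B', 2), ('C', 1), ('A', 2)]
assert rle_decode(encoded) == data
```

Sorted, low-cardinality columns have long runs, which is why RLE pairs so well with columnar storage.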
V2
The blocklet consists of a ColumnChunk for each column. The ColumnChunk for a column consists of ColumnPages, each of which includes the data chunk header, data page, RLE page, and rowID page. Since a ColumnChunk aggregates the three types of pages of a column together, the column's data can be read using fewer readers. And since the header part records the length information of all the pages, the footer part only needs to record the offset and length of each ColumnChunk, which also reduces the amount of footer data.
V2 is advanced as compared to V1 in the following ways:
- all columns are stored in the form of column chunks: in V1, pages are grouped by page type, so if column C1 spans 3 pages, its pages are scattered across the blocklet; in V2, all of a column's pages sit together inside its ColumnChunk.
- In addition to the data page, RLE page and rowID page provided in V1, V2 has data chunk header as well.
- in V1, the footer records the offset and length information of all the pages, whereas in V2 the footer records only the offset and length of each ColumnChunk. Since storing one entry per ColumnChunk takes less space than storing one per page, the footer in V2 is smaller than in V1. Thereby, V2 will use less memory than V1, since the footer is read into and kept in memory.
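A back-of-the-envelope comparison makes the footer saving concrete (the counts below are illustrative numbers I picked, not CarbonData defaults):

```python
columns = 50
pages_per_column = 4
page_types = 3  # data page, RLE page, rowID page

# V1: the footer records (offset, length) for every page of every column.
v1_footer_entries = columns * pages_per_column * page_types

# V2: the footer records one (offset, length) per ColumnChunk.
v2_footer_entries = columns

print(v1_footer_entries, v2_footer_entries)  # 600 50
```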
V3
The blocklet is again composed of ColumnChunks for all columns. What changed is that a ColumnChunk consists of one or more ColumnPages, and each ColumnPage adds a new BlockletMinMaxIndex.
Compared with V2: a V2 blocklet holds 120,000 rows by default, while a V3 blocklet defaults to 64MB, so for a data file of the same size the index metadata in the footer part can be further reduced. Meanwhile, the V3 format adds page-level data filtering: each page holds only 32,000 rows by default, which is much less than the 120,000 rows of a V2 blocklet. This makes data filtering more precise, so more data can be filtered out before decompressing it.
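Page-level min/max filtering can be sketched like this (a toy Python illustration; the page statistics are invented): smaller pages mean tighter min/max ranges, so more pages can be skipped before decompression.

```python
# Each page carries min/max statistics for a column.
pages = [
    {"min": 0,   "max": 99},
    {"min": 100, "max": 199},
    {"min": 200, "max": 299},
]

def pages_to_read(pages, value):
    # Only decompress pages whose [min, max] range can contain the value.
    return [i for i, p in enumerate(pages) if p["min"] <= value <= p["max"]]

print(pages_to_read(pages, 150))  # [1] -> two of three pages are skipped
```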
I hope you now understand the main differences between V1, V2 and V3 file formats.
Remaining topics to be covered in the next blog post are:
- Footer format
- carbonindex file format
- Dictionary file format
- tablestatus file format
- Understanding Apache Spark SQL integration
- Understanding Presto integration
- Code tutorial.
In the code tutorial, I will use some sample open datasets with a size of 200GB or more (at least 100 million rows and 50 columns). The code tutorial will compare the datasets in different file formats: ORC, CarbonData, and Parquet.