Big Data in Practice (Part 1): A Look at Spark File Organization

Posted Jun 28, 2020 · 6 min read

Summary:

In the big data and database field, the storage format of data directly affects the read and write performance of a system. Spark supports multiple data file storage formats for different users and developers. The content of this article mainly comes from a talk at Spark AI Summit 2019 [1]. The talk is split into two parts: the first part introduces Spark's file and data organization at the conceptual level, and the second part walks through the read and write paths in Spark with examples. This article covers the first part. We begin with the characteristics of several popular file sources (File Sources) in Spark, which involves a comparison of row-oriented and column-oriented storage. We then introduce two data layouts, partitioning and bucketing, which are two important query optimization techniques in Spark.

File formats

Before introducing the file formats, it is worth mentioning the two fundamental ways of organizing data on storage: row-oriented and column-oriented. They suit the different database scenarios of OLTP and OLAP respectively. Spark supports both kinds of file formats: Parquet and ORC are column-oriented, while Avro, JSON, CSV, Text and Binary are row-oriented.
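As a quick, hedged illustration (the paths are made up, and an existing SparkSession spark plus a DataFrame df holding the music table are assumed; the Avro format also needs the external spark-avro package on the classpath), the same DataFrame can be written out in either kind of format:

// Column-oriented formats
df.write.parquet("/tmp/music_parquet")
df.write.orc("/tmp/music_orc")

// Row-oriented formats (Avro requires the spark-avro package)
df.write.json("/tmp/music_json")
df.write.csv("/tmp/music_csv")
df.write.format("avro").save("/tmp/music_avro")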

The following uses a simple example to illustrate the applicable scenarios of the two storage formats:

For the music table in the figure above, column storage and row storage produce the two organizations shown. In the column layout on the left, the values of the same column are kept together: the whole first column is written out, then the next column, and so on until all data is stored. In the row layout, data is laid out row by row, so a row holds one value from each column; the different colors in the figure mark how the data is arranged.

If column storage is used to process the query below, notice that it only involves two columns (album and artist). Because the values of a column are stored contiguously, we can quickly locate the required columns and read only those, effectively avoiding useless IO on the other columns (year and genre). With row storage, by contrast, this kind of "column pruning" is impossible: the values of one column are scattered across the file, and every IO inevitably reads data from other columns as well, so almost the whole table has to be read to answer the query.
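Expressed with the DataFrame API, the query described above might look like the following sketch (musicDf is a hypothetical DataFrame with the columns from the figure):

// Only album and artist are needed; a columnar source can read just these
// two columns and never touch year or genre.
val albumsAndArtists = musicDf.select("album", "artist")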

This example shows that column storage suits analytical queries over a few columns, because it avoids reading unneeded columns; keeping a column's values together also compresses very well. But what if you need to run INSERT INTO against the column store? Almost all of the data has to be moved, which is very inefficient, whereas row storage only needs to append one row at the end of the file. To reconcile these two "extreme" layouts, academia has proposed mixing rows and columns to build HTAP (hybrid transactional/analytical processing) databases; interested readers can refer to [2].

So, in short: column storage suits read-intensive workloads, especially analytical queries that only need some of the columns; row storage suits write-intensive workloads, or queries that need all columns.

File structure introduction

  • Parquet

A Parquet file starts and ends with the Parquet magic number, which is used to check whether the file really is a Parquet file. The footer sits at the end of the file and stores metadata, including the schema and the metadata of each row group. The data itself is divided into row groups, each holding a batch of rows, and within a row group the values of each column are stored together as a column chunk.

The Parquet format works well with query-optimization rules such as predicate pushdown, which pushes filter conditions down to the scan and spares upper operators unnecessary work. For example, with min/max values recorded in the metadata, the query condition can be compared against that metadata during the query; if the condition cannot possibly fall within the min/max range, the data block the metadata points to can be skipped entirely, which cuts useless data IO.
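As a small, hedged sketch (the path and column names are illustrative, and a SparkSession spark is assumed), the pushed-down predicates can be inspected in the physical plan:

import org.apache.spark.sql.functions.col

// Filters on a Parquet source appear as PushedFilters in the physical plan,
// so row groups whose min/max statistics cannot match are skipped.
val songs = spark.read.parquet("/tmp/music_parquet")
songs.filter(col("year") > 2018).explain()
// The plan then contains something like:
//   PushedFilters: [IsNotNull(year), GreaterThan(year,2018)]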

  • ORC

The full name of ORC is Optimized Row Columnar, and its organization is as shown below.

  • Postscript holds the compression parameters and the size of the compressed footer;
  • File Footer holds the location information of each stripe, the schema (columns) and the table-level statistics;
  • The data is divided into stripes, which correspond to row groups in Parquet;
  • Stripe Footer records a directory of the data streams and their locations within the stripe;
  • Row Data is the actual stored data;
  • Index Data holds per-column statistics (min, max, count, etc.) and the row positions used to locate data inside the stripe.

The solid lines in the figure above show how these parts relate to one another.

Row-based file formats

Row-based formats are simpler than columnar ones and are probably the ones you touch more often in day-to-day development, so here is a brief overview of their pros and cons.

  • Avro: Avro is fast and compressible, and it supports schema evolution, such as adding/deleting/renaming a field or changing a default value.
  • JSON: in Spark it is usually treated as a struct. Pay attention to the number of keys when using it (it is easy to trigger OOM errors), and its schema support is weak. Its advantages are that it is lightweight, easy to deploy and easy to debug.
  • CSV: usually used for data collection, such as log information; write performance is better than read performance. Its drawbacks are that the format is not well standardized (separators, escape characters, quotation marks) and nested data types are poorly supported. Like JSON, it is a semi-structured text format.
  • Raw text file: line-based text files that Spark can read directly and split into lines with spark.read.text(); the line size should be kept at a reasonable value, and only a limited schema is supported.
  • Binary: binary files, a new feature of Spark 3.0. Spark reads each binary file and converts it into a single record that stores the raw binary content together with the file's metadata. The record has a fixed schema: the file path (StringType), the modification time (TimestampType), the file length (LongType) and the content (BinaryType).

For example, if we need to read all JPG files in a directory recursively, we can do it through the following API:

// Recursively read every *.jpg file under the directory as binary records
// using the binaryFile source.
val jpgFiles = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .option("recursiveFileLookup", "true")
  .load("/path/to/dir")

Data layout

Partitioning

Partitioning means that when the data volume is large, the data is coarsely split along some dimension, for example by the year field in the figure above, and within each year further by the genre field. The benefit is obvious: when processing a query such as "year = 2019 and genre = 'folk'", the data that does not need to be scanned is filtered out and the query goes straight to the matching slice, which improves query efficiency.

Both Spark SQL and the DataFrame API provide ways to create partitioned tables, as the sketch below shows.
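A minimal sketch with the DataFrame API (the path is made up, and a SparkSession spark plus a DataFrame df of the music table are assumed):

import org.apache.spark.sql.functions.col

// Each distinct (year, genre) combination becomes its own directory,
// e.g. year=2019/genre=folk/.
df.write
  .partitionBy("year", "genre")
  .parquet("/tmp/music_partitioned")

// A filter on the partition columns only touches the matching directories
// instead of scanning the whole table (partition pruning).
val folk2019 = spark.read.parquet("/tmp/music_partitioned")
  .filter(col("year") === 2019 && col("genre") === "folk")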

At the same time, more partitions does not mean better performance. As the number of partitions grows, so does the number of partition files, which puts heavy pressure on the metastore when fetching partition information and on the file system when listing files, and this in turn reduces query performance. The advice is therefore to partition on a suitable field that does not have too many distinct values, so that the number of partitions stays reasonable. What if a field has many distinct values? You can try hashing it into a suitable number of buckets, or partition on a small part of the field, such as the first letter of a name.
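For the last idea, a hedged sketch (the name column and path are hypothetical) of deriving a low-cardinality partition column from a high-cardinality field:

import org.apache.spark.sql.functions.{col, substring, upper}

// Partition by the first letter of the name instead of the full name,
// so the number of partitions stays small.
val withPrefix = df.withColumn("name_prefix", upper(substring(col("name"), 1, 1)))
withPrefix.write
  .partitionBy("name_prefix")
  .parquet("/tmp/music_by_name_prefix")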

Bucketing

In a Spark join, if both tables are large, the data has to be shuffled, and the shuffle takes up a large share of the query time. When a join key appears frequently in such costly joins, bucketing can be used to optimize these queries. With bucketing, the data is pre-shuffled and pre-sorted by the join key, so each sort merge join only needs to process its own local data, which saves the shuffle cost. Note that the performance of a bucketed table is closely tied to the number of buckets: too many buckets cause a small-file problem, while too few buckets limit parallelism and hurt performance.

Sort merge join before bucketing:

After bucketing:

Both Spark SQL and the DataFrame API provide ways to create bucketed tables, as the sketch below shows. Because the data is sorted, min/max values can also be recorded to avoid reading useless data.
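A hedged sketch of bucketed tables with the DataFrame API (songsDf, artistsDf, the table names and the bucket count are all made up for illustration):

// Pre-shuffle and pre-sort both sides on the join key; bucketBy requires
// saving as a table rather than to a plain path.
songsDf.write
  .bucketBy(16, "artist")
  .sortBy("artist")
  .saveAsTable("songs_bucketed")

artistsDf.write
  .bucketBy(16, "artist")
  .sortBy("artist")
  .saveAsTable("artists_bucketed")

// Joining the two bucketed tables on the bucket column can run as a sort
// merge join without an extra shuffle or sort.
val joined = spark.table("songs_bucketed")
  .join(spark.table("artists_bucketed"), "artist")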

References

[1] Databricks. 2020. Apache Spark's Built-In File Sources In Depth - Databricks. [online] Available at: https://databricks.com/session_eu19/apache-sparks-built-in-file-sources-in-depth

[2] Bridging the Archipelago between Row-Stores and Column-Stores for Hybrid Workloads (SIGMOD '16)
