What is the next stop of big data? Service/Analysis Integration (HSAP)

Posted Jun 15, 202013 min read

Introduction:What is the next stop of big data? Service/Analysis Integration(HSAP)

Author:Jiang Xiaowei(amount Aberdeen) Alibaba researcher
Because of different emphasis, traditional databases can be divided into transactional OLTP systems and analytical OLAP systems. With the development of the Internet, the amount of data has increased exponentially, and the stand-alone database has been unable to meet business needs. Especially in the field of analysis, a query may need to process a large part or even a full amount of data, and the pressure caused by massive data becomes particularly urgent. This has contributed to the big data revolution that began with Hadoop technology over the past decade and has solved the need for massive data analysis. At the same time, a number of distributed database products have emerged in the database field to cope with the increase in data volume in OLTP scenarios.
![01.jpg]( https://i0.wp.com/segmentfault.comhttps://intranetproxy.alipay.com/skylark/lark/0/2020/jpeg/225557/1591599865241-480cd1b6-de46-431e-8b3b -76f9842a8e41.jpeg#align=left&display=inline&height=384&margin=%5Bobject%20Object%5D&name=01.jpg&originHeight=384&originWidth=1070&size=47853&status=done&style=none&width=1070 "01.jpg")
In order to analyze the data in the OLTP system, the standard practice is to synchronize the data inside(for example, every day) to an OLAP system regularly. This architecture guarantees that analytical queries will not affect online transactions through two systems. However, regular synchronization results in analysis results that are not based on the latest data, and this delay makes us lose the opportunity to make more timely business decisions. In order to solve this problem, the HTAP architecture has appeared in recent years. This architecture allows us to directly analyze the data in the OLTP database, thereby ensuring the timeliness of the analysis. Analysis is no longer the unique capability of traditional OLAP systems or big data systems. A natural question is:Since HTAP has the analysis capabilities, will it replace big data systems? What is the next stop of big data?


To answer this question, we take the recommendation system as an example to analyze the typical scenarios of big data systems.
When you see the shopping app show you exactly what you want to buy, and the short video app plays your favorite music, the recommendation system is playing its magical role. The core goal of an advanced recommendation system is to make personalized recommendations based on the user's real-time behavior. Each interaction between the user and the system will immediately optimize the next experience. To support such a system, the back-end big data technology stack has evolved into a very complex and diverse system.
The following figure shows a big data technology stack that supports a real-time recommendation system.
![02.jpg]( https://i0.wp.com/segmentfault.comhttps://intranetproxy.alipay.com/skylark/lark/0/2020/jpeg/225557/1591599910657-56de3b94-85b6-4045-aa25 -4ba1ed750e16.jpeg#align=left&display=inline&height=679&margin=%5Bobject%20Object%5D&name=02.jpg&originHeight=679&originWidth=1333&size=128411&status=done&style=none&width=1333 "02.jpg")
In order to provide quality real-time personalized recommendations, the recommendation system relies heavily on real-time features and continuous updating of models.

Real-time features can be divided into two categories:

  1. The system will collect a large number of user behavior events(such as browsing, clicking, etc.) and transaction records(such as payment records synchronized from the OLTP database, etc.). The amount of these data is huge(possibly tens of millions or even hundreds of millions of items per second), and most of them are not from the trading system. For future use, these data will be imported into the system(a in the figure), and they will be associated with various dimension table data to derive a series of important features(1 in the figure), these features will be updated to Recommend a system to optimize the user experience. The real-time dimension table association here needs low-latency and high-throughput enumeration support to keep up with the newly generated data.
  2. The system also uses sliding windows and other methods to calculate the characteristics of various dimensions and time granularities(such as the number of clicks of a product in the past 5 minutes, the number of views in the past 7 days, and the sales in the past 30 days, etc.). Depending on the granularity of the sliding window, these aggregations may be accomplished through streaming or batch processing.

These data are also used to generate real-time and offline machine learning samples, and the trained model will be continuously updated to the recommendation system after verification.

The above explanation is the core part of an advanced recommendation system, but this is only the tip of the iceberg of the entire system. In addition, a complete system of real-time model monitoring, verification, analysis, and tuning is required. This includes:using real-time large screens to view the results of A/B testing(3), and using interactive analysis(4) to do BI Analysis, refine and tune the model. In addition, the operation will also use various complex queries to gain insight into the progress of the business, and conduct targeted marketing through methods such as circling people and products.
This example shows a very complex but typical big data scenario, from real-time data import(a), to pre-aggregation(b), from data service(1), continuous aggregation(3), to interactive query(4) , Until batch processing(2). Such complex scenarios have very diverse requirements for big data systems. In the practice of building these systems, we have seen two new trends.

  • Real-time:Business needs to quickly gain business insights from the data just collected. The written data needs to be visible at the second or even sub-second level. The lengthy offline ETL process is becoming intolerable. At the same time, the data collected is much larger than the data synced from the OLTP system, and log data such as user browsing clicks is even orders of magnitude larger than it. Our system needs to be able to provide low-latency query capabilities while writing large amounts of real-time data.
  • Service/Analysis integration:Traditional OLAP systems often play a relatively static role in business. We get business insights by analyzing massive amounts of data(such as pre-computed views, models, etc.), and these acquired knowledge provides online data services through another system. The service and analysis here is a fragmented process. Unlike this, the ideal business decision-making process is often a continuously optimized online process. The process of service will generate a lot of new data, we need to carry out complex analysis on these new data. The insights generated by the analysis are fed back in real time to the service to create greater business value. Services and analysis are forming a closed loop.

Existing solutions address the need for real-time service/analysis integration through a combination of products. For example, through Apache Flink for real-time pre-aggregation of data, the aggregated data will be stored in a product that provides multi-dimensional analysis like Apache Druid, and provide data services through products such as Apache HBase. This model of chimney development will inevitably produce data islands, which will cause unnecessary data duplication. The complex data synchronization between various products also makes data consistency and security a challenge. This complexity makes it difficult for application development to quickly respond to new demands, which affects the speed of business iteration and also brings a large additional cost to development and operation and maintenance.
![03.jpg]( https://i0.wp.com/segmentfault.comhttps://intranetproxy.alipay.com/skylark/lark/0/2020/jpeg/225557/1591599953290-d319aabc-58d8-4115-8797 -b697ee69c7d3.jpeg#align=left&display=inline&height=395&margin=%5Bobject%20Object%5D&name=03.jpg&originHeight=395&originWidth=1204&size=35790&status=done&style=none&width=1204 "03.jpg")![image.gif]( https://i0.wp.com/segmentfault.comhttps://intranetproxy.alipay.com/skylark/lark/lar/0/2020/gif/225557/1591271031227-9e41f158-5d6f-4000-a18a-0cbe9ffe9c8c.gif#align=left&display= inline&height=1&margin=%5Bobject%20Object%5D&name=image.gif&originHeight=1&originWidth=1&size=70&status=done&style=none&width=1 "image.gif")

We believe that the integration of real-time service/analysis should be achieved through a unified HybridServing/AnalyticalProcessing(HSAP) system.
Through such a system, application development no longer needs to deal with multiple different products, and no longer needs to learn and accept the problems and limitations of each product, which can greatly simplify the business architecture and improve development and operation and maintenance efficiency. Such a unified system can avoid unnecessary data duplication and save costs. At the same time, this architecture can also bring second-level or even sub-second real-time to the system, making business decisions more real-time, so that data can play a greater business value.

Although the distributed HTAP system has the capability of real-time analysis, it cannot solve the problem of big data.

First of all, the data synchronized by the trading system is only a small part of the data that the real-time recommendation system needs to process. The vast majority of other data comes from non-trading systems such as logs(users often have dozens or even hundreds of browsing behaviors before each purchase), Most of the analysis is performed on these non-transactional data. However, the HTAP system does not have this part of the data, so it is impossible to analyze these non-transactional data.
Is it possible to write these non-transactional data into the HTAP system for analysis? Let's analyze the difference in data writing mode between HTAP system and HSAP system. The cornerstone and advantage of the HTAP system is to support fine-grained distributed transactions. Transactional data is often written to the HTAP system in many distributed small transactions. However, the data from the log and other systems does not have the semantic meaning of fine-grained distributed transactions. If you want to import these non-transactional data into the HTAP system, it will inevitably bring unnecessary overhead.
In contrast, the HSAP system does not require such high-frequency distributed small transactions. Data writing HSAP system generally has two modes:1) massive single data is written in real time; 2) relatively low-frequency distributed batch data writing. This allows the HSAP system to make a series of optimizations in design to improve cost performance and avoid unnecessary overhead caused by importing non-transactional data into the HTAP system.
Even if we don't care about these costs, if we can write all the data into the HTAP system at no cost, can we solve the problem? The answer is still no.
Supporting the OLTP scenario is a prerequisite for the HTAP system. In order to achieve this, the HTAP system often uses the data format of row storage, and the efficiency of analytical queries in row storage is much larger than that of column storage(order of magnitude)) Disadvantages. Having the ability to analyze and being able to analyze efficiently is not the same thing.In order to provide efficient analysis capabilities, the HTAP system must copy large amounts of non-transactional data to the inventory, but this will inevitably bring a lot of costs. It is better to copy a small amount of transactional data to the HSAP system. To better avoid affecting online trading systems.
Therefore, we believe that HTAP and HSAP will complement each other and lead the direction of the database and big data fields, respectively. **

HSAP Challenge

As a brand-new architecture, HSAP is facing very different challenges from the existing big data and traditional OLAP systems.

High-concurrency mixed workloads:HSAP systems need to handle concurrent queries far beyond traditional OLAP systems. In practice, the concurrency of data services goes far beyond OLAP queries. For example, we have seen in practice that data services need to process up to tens of millions of queries per second, which is 5 orders of magnitude higher than the concurrency of OLAP queries. At the same time, compared with OLAP queries, data service queries have more stringent requirements on latency. In addition, the greater challenge is that the system needs to process very complex analytical queries while providing data service queries. These mixed query loads have very different trade-offs between latency and throughput. How to efficiently use system resources to handle these very different queries, and to ensure the SLO of each query is a huge challenge.

High-throughput real-time data import: While processing highly concurrent query loads, the HSAP system also needs to support real-time writing of massive amounts of data. The amount of data written in real time far exceeds the requirements of traditional OLAP systems. For example, the above real-time recommendation scenario will continue to write tens of millions or even hundreds of millions of events per second. Another difference from the traditional OLAP system is that the HSAP system has high requirements for the real-time data. The written data needs to be visible at the second or even sub-second level, so as to ensure the timeliness of our service and analysis results.

Elasticity and scalability:Data write and query load may have sudden peaks, which puts high requirements on the system's elasticity and scalability. In practice, we have noticed that the peak data write can reach 2.5 times the average, and the peak query time can reach 3 times the average. Moreover, the peak values of data writing and query do not necessarily appear at the same time, which also requires the system to have the ability to quickly adjust according to different peak values.

HSAP system design

In order to meet these challenges, we believe that a typical HSAP system can adopt an architecture similar to the above.
![image.gif]( https://i0.wp.com/segmentfault.comhttps://intranetproxy.alipay.com/skylark/lark/0/2020/gif/225557/1591271031224-d4d32249-fa45-43b5-afa2 -ee05d1dccb5a.gif#align=left&display=inline&height=1&margin=%5Bobject%20Object%5D&name=image.gif&originHeight=1&originWidth=1&size=70&status=done&style=none&width=1 "image.gif")![04.jpg]( https://i0.wp.com/segmentfault.comhttps://intranetproxy.alipay.com/skylark/lark/lar/0/2020/jpeg/225557/1591600006526-9a0f2802-faff-4f29-b976-09e7ecfd2549.jpeg#align=left&display= inline&height=549&margin=%5Bobject%20Object%5D&name=04.jpg&originHeight=549&originWidth=1145&size=103029&status=done&style=none&width=1145 "04.jpg")
Storage and computing separation:All data is stored in a distributed file system. We expand the system by data sharding. Storage Manager manages these data shards. Resource Manager manages the computing resources of the system. , To ensure that the system can handle the needs of high-throughput data writing and querying. This architecture can quickly respond to changes in workload. When the query load becomes larger and more computing resources are needed, the computing resources can be expanded. When the amount of data increases rapidly, the storage resources can be quickly expanded. The separation of storage and computing guarantees that these operations can be completed quickly without waiting for moving/copying data. This architecture greatly simplifies operation and maintenance and provides a guarantee for the stability of the system.

Unified real-time storage: In order to support various query modes, a unified real-time storage layer is essential. Query can be roughly divided into two categories, one is simple point query(mostly in data service category), the other is complex query that scans a large amount of data(mostly in analysis category), of course this is a In the process of continuous change, many queries are somewhere in between. These two query modes also put forward different requirements for data storage. Row storage can more efficiently support point queries, while column storage has obvious advantages in queries that support a large number of scans. We can make a trade-off between row storage and column storage in a way similar to PAX. The price we pay is that the best performance may not be obtained in the scene of checking and scanning data. We hope to be optimal in both scenarios, so both row storage and column storage are supported in the system, and users can choose the storage method for each table according to the scenario. For tables that have two kinds of requirements at the same time, we use the index abstraction to allow users to choose two types of storage at the same time. The system maintains consistency between the two through the index maintenance mechanism. In practice, we found that the efficiency and flexibility brought by this design can better support the business.

Workload isolation:The SLO of the system under mixed workloads is guaranteed by scheduling. Under ideal circumstances, a large query should be able to use all resources. When there are multiple queries running at the same time, these queries need to share resources fairly. Since service-based queries are usually simpler and require fewer resources, this fair scheduling mechanism can ensure that the delay of service-based queries can be guaranteed even in the case of complex analytical queries. As a distributed system, scheduling can be divided into distributed and in-process parts. The Coordinator will decompose a query into multiple tasks. These tasks are distributed to different processes. The Coordinator needs to adopt certain strategies to ensure fairness. Equally important, we also need to allow different tasks to share resources fairly within a process. Because the operating system does not understand the relationship between tasks, we implemented a user-mode Scheduler in each process to more flexibly support workload isolation.

Openness of the system:Many businesses have already used other storage platforms or computing engines, and new systems must be considered for integration with existing systems. For queries requiring high timeliness, the integration of computing and storage can bring obvious advantages. But for offline computing that is not time-sensitive, the storage layer can provide a unified interface to open data. This openness allows other engines to pull the data out for processing to give the business greater flexibility. Another aspect of openness is the ability to process data stored in other systems. This can be achieved through federated queries.

Application of HSAP

Here we share Alibaba's search recommendation refined operation business. The following figure shows an example of such a system architecture before adopting HSAP.
![05.jpg]( https://i0.wp.com/segmentfault.comhttps://intranetproxy.alipay.com/skylark/lark/0/2020/jpeg/225557/1591600101200-5517d3b5-af92-4ef0-a837 -f8757e490755.jpeg#align=left&display=inline&height=749&margin=%5Bobject%20Object%5D&name=05.jpg&originHeight=749&originWidth=1427&size=120689&status=done&style=none&width=1427 "05.jpg")![image.gif]( https://i0.wp.com/segmentfault.comhttps://intranetproxy.alipay.com/skylark/lark/lar/0/2020/gif/225557/1591271031227-ef7c0cbd-fb59-451d-963a-45163b3dc38f.gif#align=left&display= inline&height=1&margin=%5Bobject%20Object%5D&name=image.gif&originHeight=1&originWidth=1&size=70&status=done&style=none&width=1 "image.gif")
We can meet the needs of the business through a complex coordination of a series of storage and computing engines(HBase, Druid, Hive, Drill, Redis, etc.), and data synchronization tasks between multiple storages are required to maintain approximate synchronization. This business architecture is extremely complex, and the development of the entire business takes a lot of time.
![06.jpg]( https://i0.wp.com/segmentfault.comhttps://intranetproxy.alipay.com/skylark/lark/0/2020/jpeg/225557/1591600112698-a2ed55ef-81c3-4b6e-99af -e42c0326ddac.jpeg#align=left&display=inline&height=687&margin=%5Bobject%20Object%5D&name=06.jpg&originHeight=687&originWidth=1317&size=78914&status=done&style=none&width=1317 "06.jpg")
We used HSAP system to upgrade this business during Double Eleven in 2019, and the new architecture has been greatly simplified. Users, commodities, business attribute data and massive user behavior data are unified into the HSAP system after real-time and offline data cleaning. The HSAP system undertakes query and analysis services such as real-time large-screen, real-time reports, effect tracking, and real-time data applications. Real-time large-screen, real-time sales forecast, real-time inventory monitoring, real-time BI reports monitor business progress in real time, gain insight into operational growth, and track algorithm effects to help efficient decision-making. Data products such as real-time tags, real-time portraits, competitive analysis, circling people and commodities, and equity investment help refine operations and decision-making. The real-time data interface service supports algorithms control, inventory monitoring and early warning services. A set of HSAP system realizes the data sharing and reuse of all channels and all links, and solves the data analysis and query needs of different business perspectives from operation, product, algorithm, development, analyst to decision layer.

to sum up

Through unified real-time storage, the HSAP architecture can provide one-stop data query and application services such as simple query, OLAP analysis, and online data services without copying to meet the access and access requirements of data application parties. This new architecture greatly reduces the complexity of the business, allowing us to quickly respond to new business needs. The second-level or even sub-second real-time performance provided by it makes decisions more timely and efficient, thereby allowing data to create greater business value.