Implementation of Mass Data Management System Based on Openresty + CEPH

Posted May 25, 2020 · 4 min read

"Continuously updated, welcome to follow ..."

1. Requirements

As a company specializing in three-dimensional high-precision map services, we have massive (PB-level) amounts of raw data, intermediate data, and result data that must be stored, managed, and regularly archived.

  1. Following project management practice, data is classified into flight data, control-point data, intermediate data, result data, and other data. Sources include UAV data, payload data, ground-station data, and manually collected data, gathered from different channels.
  2. Provide a Baidu-Netdisk-style experience for upload and download, with resumable (breakpoint) transfers and progress tracking.
  3. Support fine-grained file-level permission control and richer file (folder) attributes.

2. Analysis

  1. The core decision is data storage: the system must hold massive data and support uploads over complex, unreliable networks. We chose CEPH, exposing RGW object storage through the S3 protocol for upload and download, which supports multipart (sharded) upload and breakpoint resume out of the box (see the multipart sketch after this list).
  2. The difficulty lies in file-level business permission control and richer file (folder) attributes. CEPH RGW has its own permission control, but it cannot be tied to business permissions. Object storage also has no real concept of folders: a key such as project-123/flight/IMG_0001.jpg is just a flat object name, so folder hierarchy, file counts, and folder sizes cannot be listed directly. We therefore built a custom index service: CEPH is responsible only for storage, while the index service implements display and query.
  3. Since all uploads and downloads pass through Openresty, a Lua script captures the file information flowing through it and forwards it via Kafka to the business services for processing.
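To make point 1 concrete, here is a minimal multipart-upload sketch against an S3-compatible RGW endpoint, using the AWS SDK for Java v1. The upload assistant itself is Electron-based, but the flow is identical in any S3 SDK; the endpoint, credentials, bucket, and key below are placeholder assumptions, not values from our system.

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class MultipartUploadSketch {
    public static void main(String[] args) {
        // Endpoint, credentials, bucket, and key are illustrative placeholders
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                        "http://rgw.example.com:7480", "default"))
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY")))
                .withPathStyleAccessEnabled(true) // RGW is usually addressed path-style
                .build();

        File file = new File("flight-data.bin");
        String bucket = "spatial-data";
        String key = "project-123/flight/flight-data.bin";
        long partSize = 8L * 1024 * 1024; // 8 MiB shards; each part is retried independently

        String uploadId = s3.initiateMultipartUpload(
                new InitiateMultipartUploadRequest(bucket, key)).getUploadId();

        List<PartETag> partETags = new ArrayList<>();
        long position = 0;
        for (int partNumber = 1; position < file.length(); partNumber++) {
            long size = Math.min(partSize, file.length() - position);
            UploadPartRequest req = new UploadPartRequest()
                    .withBucketName(bucket).withKey(key).withUploadId(uploadId)
                    .withPartNumber(partNumber)
                    .withFile(file).withFileOffset(position).withPartSize(size);
            partETags.add(s3.uploadPart(req).getPartETag());
            position += size;
        }

        // Breakpoint resume: after an interruption, s3.listParts(...) reports the
        // parts already stored, so only the missing part numbers are re-uploaded.
        s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
                bucket, key, uploadId, partETags));
    }
}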

3. Implementation

3.1 Architecture

[Figure: Spatial Data System Architecture Diagram]

  1. The upload assistant is a Baidu-Netdisk-style desktop client implemented with Electron JS. Its main functions are project display, upload, and download.
  2. The business layer includes gateway, account, project, and file-index services, built on the Java + Spring Boot + Spring Cloud stack. The key component is the file index service (Index Server), which maintains and queries the index of massive numbers of files.
  3. Business data lives in a MySQL cluster plus a Redis cluster; mass file storage uses CEPH object storage, which supports the S3 API.

3.3 Key flow chart

[Figure: Upload process]

  1. The upload assistant uploads files with ordinary Put Object requests, attaching custom metadata fields (project ID, user ID, etc.) to complete the data submission.
  2. Openresty forwards the file requests to CEPH RGW in proxy mode, and RGW performs the actual data storage in the background.
  3. After RGW has stored the data, Openresty's log_by_lua_file hook forwards the custom metadata and file attributes of the request to Kafka in the background.
  4. The file index service (Index Server) consumes these tasks from Kafka, obtaining the information of each file.
  5. The file index service (Index Server) processes the file data according to business requirements and stores it in the MySQL database (see the consumer sketch after this list).
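Steps 4 and 5 run in the Java business layer. Below is a minimal sketch of the Index Server's consumer loop using plain kafka-clients, Jackson, and JDBC. The topic name and broker address come from the Lua sample in 3.4; the consumer group, table name, columns, and connection details are assumptions for illustration.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class IndexConsumerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.0.20:9092"); // broker from the Lua sample
        props.put("group.id", "index-server");              // consumer group is an assumption
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ObjectMapper mapper = new ObjectMapper();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Connection db = DriverManager.getConnection(
                     "jdbc:mysql://mysql-host:3306/index_db", "user", "pass")) { // placeholders
            consumer.subscribe(Collections.singletonList("ceph_lua_test")); // topic from the Lua sample
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    JsonNode msg = mapper.readTree(record.value());
                    // file_index table and columns are assumptions;
                    // the JSON fields match what the Lua script emits
                    try (PreparedStatement ps = db.prepareStatement(
                            "INSERT INTO file_index (object_key, etag, size, user_id, project_id) "
                                    + "VALUES (?, ?, ?, ?, ?)")) {
                        ps.setString(1, msg.path("uri").asText());
                        ps.setString(2, msg.path("etag").asText());
                        ps.setLong(3, msg.path("contentLength").asLong());
                        ps.setString(4, msg.path("uId").asText());
                        ps.setString(5, msg.path("pId").asText());
                        ps.executeUpdate();
                    }
                }
            }
        }
    }
}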

3.4 Sample code

log_by_lua_file.lua: collect file information in Openresty and send it to Kafka

local cjson = require "cjson"
local producer = require "resty.kafka.producer"

-- Kafka broker list; adjust to the target environment
local broker_list = {
    {host = "172.16.0.20", port = 9092},
}

-- Build a JSON message describing the object that was just uploaded
local function send_job_to_kafka()
    local log_json = {}

    -- Custom request headers set by the upload assistant (user ID, project ID);
    -- ngx.req.get_headers() normalizes header names to lowercase
    local req_headers_ = ngx.req.get_headers()
    for k, v in pairs(req_headers_) do
        if k == "content-length" then
            log_json["contentLength"] = tostring(v)
        elseif k == "u-id" then
            log_json["uId"] = tostring(v)
        elseif k == "p-id" then
            log_json["pId"] = tostring(v)
        end
    end

    -- The ETag response header is quoted by RGW; strip the quotes
    local resp_headers_ = ngx.resp.get_headers()
    for k, v in pairs(resp_headers_) do
        if k == "etag" then
            log_json["etag"] = string.gsub(v, "\"", "")
            break
        end
    end

    log_json ["uri"]= ngx.var.uri
    log_json ["host"]= ngx.var.host
    log_json ["remoteAddr"]= ngx.var.remote_addr
    log_json ["status"]= ngx.var.status
    local message = cjson.encode(log_json);
    ngx.log(ngx.ERR, "message is [", message, "]")
    return message
end

local request_method = ngx.var.request_method
local status_code = ngx.var.status

-- Filter successful Put Object requests, record the corresponding metadata,
-- and forward it to Kafka
if request_method == "PUT" and status_code == "200" then
    local bp = producer:new(broker_list, {producer_type = "async"})
    local ok, err = bp:send("ceph_lua_test", nil, send_job_to_kafka())
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
    ngx.log(ngx.INFO, "kafka send success:", ok)
end
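For context, the script above is attached in the log phase of the proxy location that fronts RGW. A minimal sketch of the Openresty wiring (the file path and RGW address are assumptions; lua-resty-kafka must be on the lua_package_path):

upstream rgw_backend {
    server 172.16.0.21:7480;   # CEPH RGW address (placeholder)
}

server {
    listen 80;

    location / {
        client_max_body_size 0;          # do not cap upload size
        proxy_pass http://rgw_backend;   # proxy mode, as in step 2 of 3.3
        log_by_lua_file /etc/nginx/lua/log_by_lua_file.lua;   # the script above
    }
}

Given the fields collected by the script, a message on the ceph_lua_test topic looks like this (all values are illustrative):

{
  "contentLength": "10485760",
  "uId": "u-1001",
  "pId": "p-2002",
  "etag": "5d41402abc4b2a76b9719d911017c592",
  "uri": "/spatial-data/project-123/flight/IMG_0001.jpg",
  "host": "rgw.example.com",
  "remoteAddr": "172.16.0.100",
  "status": "200"
}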

4. Summary

  1. With this architecture, while massive files are being archived, their basic information is asynchronously imported into the business database, which makes business application development straightforward.
  2. The same architecture applies generally to multimedia processing on top of object storage, such as image processing, video processing, watermarking, NSFW content detection, and event notification.