Druid: loading streaming data via Kafka

Posted Jun 4, 2020 · 5 min read

Getting started

This tutorial demonstrates how to use Druid's Kafka indexing service to load data from Kafka streams into Druid.

In this tutorial, we assume you have downloaded Druid as described in the quickstart document, are running it locally with the micro-quickstart single-machine configuration, and have not loaded any data yet.

Download and start Kafka

Apache Kafka is a high-throughput message bus that works well with Druid. In this tutorial, we will use Kafka 2.1.0. Run the following commands in a terminal to download and extract Kafka:

curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0

Run the following command in the terminal to start the Kafka broker. It will connect to the ZooKeeper instance that the Druid micro-quickstart is already running on localhost:2181, so you do not need to start a separate ZooKeeper:

./bin/kafka-server-start.sh config/server.properties

Run the following command to create a Kafka topic named wikipedia, which we will send data to:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
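If you want to confirm the topic was created, you can list the topics registered in ZooKeeper. This check is optional and not part of the original tutorial:

# list all topics known to ZooKeeper (optional sanity check)
./bin/kafka-topics.sh --list --zookeeper localhost:2181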

Loading data into Kafka

Next, start a Kafka producer for the wikipedia topic and send it the sample data.

In the Druid directory, run the following commands to extract the sample data:

cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json
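If you'd like to see what an event looks like before sending it, you can peek at the first record of the extracted file (optional; assumes you are still in quickstart/tutorial):

# print the first JSON event from the sample file (optional)
head -n 1 wikiticker-2015-09-12-sampled.json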

Run the following command in the Kafka directory, replacing {PATH_TO_DRUID} with the path to your Druid directory:

export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia <{PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json

The above command sends events to Kafka's wikipedia topic. Next, we will use Druid's Kafka indexing service to ingest data from that topic.
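If you want to confirm the events actually reached Kafka, you can read a few of them back with the console consumer. This is optional and should be run from the Kafka directory:

# read the first 3 events back from the wikipedia topic (optional check)
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wikipedia --from-beginning --max-messages 3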

Load data via data loader

Navigate to localhost:8888 and click Load data at the top of the console.

Select Apache Kafka and click Connect data.

Enter localhost:9092 as the bootstrap server and wikipedia as the topic.

Click Preview and make sure the data you see is correct.

Once you can see the data, click Next: Parse data to go to the next step.

The data loader will try to automatically select the correct data parser. In this example, the json parser will be selected. You can try to choose another parser and see how Druid parses the data.

Select the json parser and click Next: Parse time to go to the step for determining the timestamp column.

Druid requires a primary timestamp column (stored internally in the __time column). If your data has no timestamp column, select Constant value. In our example, the time column will be selected, because it is the only candidate in the data that can serve as the primary time column.
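The choice made in this step becomes the timestampSpec of the ingestion spec. For this dataset it corresponds to the fragment below, which also appears in the full supervisor spec later in this tutorial:

"timestampSpec":{
  "column":"time",
  "format":"auto"
}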

Click Next twice to skip the Transform and Filter steps.

You do not need to enter anything in these steps, because applying ingestion-time transforms and filters is beyond the scope of this tutorial.

In the Configure schema step, you configure which dimensions and metrics are ingested into Druid; this is what the data will look like once it is in Druid. Since our dataset is very small, turn off rollup by clicking the Rollup switch.

After you are satisfied with the schema configuration, click Next: Partition to go to the Partition step, where you can adjust how the data is split into segments in Druid.

Since this is a very small dataset, no adjustments are needed in this step.
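The schema and partition choices made in these steps are reflected in the granularitySpec of the ingestion spec. With rollup switched off and day segment granularity, it corresponds to this fragment from the supervisor spec shown later:

"granularitySpec":{
  "type":"uniform",
  "segmentGranularity":"DAY",
  "queryGranularity":"NONE",
  "rollup":false
}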

Click Next: Tune to go to the Tune step. Here, set Use earliest offset to True so that Druid consumes the data already sent to the topic from the beginning of the stream, then click Next: Publish to go to the Publish step.

In the Publish step, we specify the name of the datasource in Druid; we name this datasource wikipedia. Finally, click Next to review the spec.

This is the spec you have built. Try going back and making changes in the previous steps to see how they update the spec. Similarly, you can edit the spec directly and see the changes reflected in the earlier steps.

When you are satisfied with the spec, click Submit to create the ingestion task.

You will be taken to the task view, focused on the newly created task. The task view refreshes automatically; wait for the task to succeed.
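If you prefer the API over the console, you can also list the running tasks through the Overlord. This check is optional and not mentioned in the original tutorial:

# list tasks currently running on the Overlord (optional)
curl http://localhost:8081/druid/indexer/v1/runningTasks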

When a task completes successfully, it means it has built one or more segments, which will then be picked up by the data servers.

Navigate to the Datasources view from the header.

Wait until your datasource (wikipedia) appears. It may take a few seconds for the segments to load.
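You can also check datasource availability from the command line by asking the Coordinator which datasources have loaded segments (optional check):

# list datasources with at least one loaded segment (optional)
curl http://localhost:8081/druid/coordinator/v1/datasources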

Once you see the green (fully available) circle, you can query the datasource. At this point, you can go to the Query view to run SQL queries against the datasource.

Run the SELECT * FROM "wikipedia" query to see the results.
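Once the data is available you can also run aggregations against the ingested dimensions, for example a hypothetical query counting edits per channel (illustrative, not from the original tutorial):

-- example aggregation over the channel dimension (illustrative)
SELECT channel, COUNT(*) AS edits
FROM "wikipedia"
GROUP BY channel
ORDER BY edits DESC
LIMIT 10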

Submit a supervisor through the console

In the console, click Submit supervisor to open the submit supervisor window.

Paste the following spec and click submit:

{
  "type":"kafka",
  "spec":{
    "dataSchema":{
      "dataSource":"wikipedia",
      "timestampSpec":{
        "column":"time",
        "format":"auto"
      },
      "dimensionsSpec":{
        "dimensions":[
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          {"name":"added", "type":"long" },
          {"name":"deleted", "type":"long" },
          {"name":"delta", "type":"long"}
       ]
      },
      "metricsSpec":[],
      "granularitySpec":{
        "type":"uniform",
        "segmentGranularity":"DAY",
        "queryGranularity":"NONE",
        "rollup":false
      }
    },
    "tuningConfig":{
      "type":"kafka",
      "reportParseExceptions":false
    },
    "ioConfig":{
      "topic":"wikipedia",
      "inputFormat":{
        "type":"json"
      },
      "replicas":2,
      "taskDuration":"PT10M",
      "completionTimeout":"PT20M",
      "consumerProperties":{
        "bootstrap.servers":"localhost:9092"
      }
    }
  }
}

This will start the supervisor, which in turn spawns tasks that listen for incoming data.

Submit a supervisor directly

To start the service directly, run the following command from the root of the Druid package to submit a supervisor spec to the Druid Overlord:

curl -XPOST -H 'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor

If the supervisor is successfully created, you will get a response containing the supervisor ID. In our example, it will return {"id":"wikipedia"}.
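You can also check on the supervisor from the command line. The Overlord exposes endpoints for listing supervisors and reporting their status; these checks are optional and not part of the original tutorial:

# list all supervisor IDs (optional)
curl http://localhost:8081/druid/indexer/v1/supervisor

# report the status of the wikipedia supervisor (optional)
curl http://localhost:8081/druid/indexer/v1/supervisor/wikipedia/status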

You can view the current supervisors and tasks in the console: http://localhost:8888/unified-console.html#tasks.

Query data

Once data is sent to the Kafka stream, it is immediately available for querying.
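Besides the Query view in the console, you can also query through Druid's SQL API, for example with a simple row count. This is an illustrative example; the request goes through the router on port 8888:

# count the ingested rows via the SQL API (illustrative example)
curl -XPOST -H 'Content-Type: application/json' \
  -d '{"query":"SELECT COUNT(*) AS cnt FROM \"wikipedia\""}' \
  http://localhost:8888/druid/v2/sql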

This article was translated from the official Druid documentation.

Please follow us. Learn Druid together.

Code brother byte