Ingesting Data from Kafka in Real Time

Last updated: 2025-01-03 15:02:25
This document describes how to consume Kafka data in real time by using the Apache Druid Kafka indexing service. As with a Hadoop cluster, make sure that the Kafka and Druid clusters can communicate with each other before you perform the operations described in this document.
Note
The two clusters should be in the same VPC. If they are in different VPCs, the two VPCs should be able to communicate with each other (through CCN or Peering Connection, for example).
If necessary, you should configure the Druid cluster with the host information of the Kafka cluster.
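One way to confirm the connectivity requirement above is to try a TCP connection from a Druid node to a Kafka broker port. The following is a minimal sketch, not an official tool: the helper name can_connect is hypothetical, and the port 9092 is taken from the broker address used later in this document.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and host names that do not resolve.
        return False


# Hypothetical usage on a Druid node; replace the placeholder with a real broker address.
# print(can_connect("{kafka_ip}", 9092))
```

A False result for a broker host name (rather than an IP) may mean that the Druid node cannot resolve that name, which is the case the host information note above refers to.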

Using Command Line

1. Start the Kafka broker in the Kafka cluster.
./bin/kafka-server-start.sh config/server.properties
2. Create a Kafka topic named mytopic.
./bin/kafka-topics.sh --create --zookeeper {kafka_zk_ip}:2181 --replication-factor 1 --partitions 1 --topic mytopic
Output:
Created topic "mytopic".
{kafka_zk_ip}:2181 is the ZooKeeper address of the Kafka cluster.
3. Prepare a data description file kafka-mytopic.json in the Druid cluster.
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "mytopic-kafka",
    "parser": {
      "type": "string",
      "parseSpec": {
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": ["url", "user"]
        },
        "format": "json"
      }
    },
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "hour",
      "queryGranularity": "none"
    },
    "metricsSpec": [
      {
        "type": "count",
        "name": "views"
      },
      {
        "name": "latencyMs",
        "type": "doubleSum",
        "fieldName": "latencyMs"
      }
    ]
  },
  "ioConfig": {
    "topic": "mytopic",
    "consumerProperties": {
      "bootstrap.servers": "{kafka_ip}:9092",
      "group.id": "kafka-indexing-service"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": "100000"
  }
}
{kafka_ip}:9092 is the IP and port of the bootstrap.servers in your Kafka cluster.
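The spec above reads four fields from each Kafka message: the timestamp column, the two dimensions, and the input column of the doubleSum metric. The sketch below is a local sanity check only, not Druid's own validation. It lists which of those fields a sample event lacks. The helper names required_fields and missing_fields are hypothetical, and SPEC is a trimmed copy of the dataSchema shown above.

```python
# Trimmed copy of the dataSchema from kafka-mytopic.json (only the parts inspected here).
SPEC = {
    "dataSchema": {
        "dataSource": "mytopic-kafka",
        "parser": {
            "type": "string",
            "parseSpec": {
                "timestampSpec": {"column": "time", "format": "auto"},
                "dimensionsSpec": {"dimensions": ["url", "user"]},
                "format": "json",
            },
        },
        "metricsSpec": [
            {"type": "count", "name": "views"},
            {"name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs"},
        ],
    }
}


def required_fields(spec):
    """Collect the event fields the spec reads: timestamp column, dimensions, metric inputs."""
    schema = spec["dataSchema"]
    parse_spec = schema["parser"]["parseSpec"]
    fields = {parse_spec["timestampSpec"]["column"]}
    fields.update(parse_spec["dimensionsSpec"]["dimensions"])
    # A "count" metric has no fieldName; only metrics that read a column are included.
    fields.update(m["fieldName"] for m in schema["metricsSpec"] if "fieldName" in m)
    return fields


def missing_fields(spec, event):
    """Return the sorted list of spec fields that the event does not contain."""
    return sorted(required_fields(spec) - set(event))


event = {"time": "2020-03-19T09:57:58Z", "url": "/foo/bar", "user": "brozo", "latencyMs": 62}
print(missing_fields(SPEC, event))  # prints []
```

To check the real file instead of the trimmed copy, the spec could be loaded with json.load from kafka-mytopic.json and passed to the same functions.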
4. Submit the Kafka supervisor spec from a master node in the Druid cluster.
curl -XPOST -H 'Content-Type: application/json' -d @kafka-mytopic.json http://{druid_master_ip}:8090/druid/indexer/v1/supervisor
Output:
{"id":"mytopic-kafka"}
{druid_master_ip}:8090 is the address of the node where the Overlord process is deployed, which is generally a master node.
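If you prefer to submit the spec from a script rather than with curl, the same POST request can be built with the Python standard library. This is a sketch under the assumptions already stated in this step (port 8090 and the /druid/indexer/v1/supervisor path). The function names build_supervisor_request and submit_supervisor are hypothetical.

```python
import json
import urllib.request


def build_supervisor_request(overlord_host, spec, port=8090):
    """Build (but do not send) the POST request that submits a supervisor spec."""
    url = "http://{}:{}/druid/indexer/v1/supervisor".format(overlord_host, port)
    body = json.dumps(spec).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_supervisor(overlord_host, spec_path, port=8090, timeout=10):
    """Load the spec file, send it, and return the decoded JSON response."""
    with open(spec_path) as f:
        spec = json.load(f)
    req = build_supervisor_request(overlord_host, spec, port)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp)


# Hypothetical call; replace the placeholder host with the address used in the curl command.
# print(submit_supervisor("{druid_master_ip}", "kafka-mytopic.json"))
```

Building the request separately from sending it makes it possible to inspect the URL and body before anything reaches the cluster.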
5. Start a console producer in the Kafka cluster.
./bin/kafka-console-producer.sh --broker-list {kafka_ip}:9092 --topic mytopic
{kafka_ip}:9092 is the IP and port of the bootstrap.servers in your Kafka cluster.
6. Prepare a query file named query-mytopic.json in the Druid cluster.
{
  "queryType": "search",
  "dataSource": "mytopic-kafka",
  "intervals": ["2020-03-13T00:00:00.000/2020-03-20T00:00:00.000"],
  "granularity": "all",
  "searchDimensions": [
    "url",
    "user"
  ],
  "query": {
    "type": "insensitive_contains",
    "value": "roni"
  }
}
7. Enter some data in the Kafka console producer in real time.
{"time": "2020-03-19T09:57:58Z", "url": "/foo/bar", "user": "brozo", "latencyMs": 62}
{"time": "2020-03-19T16:57:59Z", "url": "/", "user": "roni", "latencyMs": 15}
{"time": "2020-03-19T17:50:00Z", "url": "/foo/bar", "user": "roni", "latencyMs": 25}
Timestamp generation command:
python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
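Instead of typing events by hand, you could generate them with current timestamps. The sketch below prints one JSON line per event in the same shape as the samples above. The helper make_event and its parameters are hypothetical names chosen for illustration.

```python
import json
from datetime import datetime, timezone


def make_event(url, user, latency_ms, now=None):
    """Return one JSON line with the fields used in this document.

    `now` is expected to be a UTC datetime; it defaults to the current UTC time.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return json.dumps({
        "time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "url": url,
        "user": user,
        "latencyMs": latency_ms,
    })


# Print three events shaped like the samples above, stamped with the current time.
for url, user, latency in [("/foo/bar", "brozo", 62), ("/", "roni", 15), ("/foo/bar", "roni", 25)]:
    print(make_event(url, user, latency))
```

The printed lines can be pasted into the console producer or piped to its standard input. Events stamped with the current time fall outside the 2020 interval in query-mytopic.json, so the "intervals" value in that file would need to be adjusted to cover them.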
8. Perform a query in the Druid cluster.
curl -XPOST -H 'Content-Type: application/json' -d @query-mytopic.json 'http://{druid_ip}:8082/druid/v2/?pretty'
{druid_ip}:8082 is the address of the Broker process in your Druid cluster, which generally resides on a master or router node.
Query result:
[
  {
    "timestamp": "2020-03-19T16:00:00.000Z",
    "result": [
      {
        "dimension": "user",
        "value": "roni",
        "count": 2
      }
    ]
  }
]
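The search query response is a list of time buckets, each with a list of matching dimension values and their counts. As an illustration of how to read it, the sketch below sums the counts per (dimension, value) pair. The function name summarize_search_result is hypothetical, and the sample input is the response shown in step 8.

```python
def summarize_search_result(result):
    """Sum the per-bucket counts of a search query response by (dimension, value)."""
    totals = {}
    for bucket in result:
        for hit in bucket["result"]:
            key = (hit["dimension"], hit["value"])
            totals[key] = totals.get(key, 0) + hit["count"]
    return totals


# The sample response from step 8.
sample = [
    {
        "timestamp": "2020-03-19T16:00:00.000Z",
        "result": [{"dimension": "user", "value": "roni", "count": 2}],
    }
]
print(summarize_search_result(sample))  # prints {('user', 'roni'): 2}
```

The count of 2 corresponds to the two sample events whose user is "roni". With "granularity" set to "all" the response here has a single bucket; the summation only matters if a finer granularity returns several.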

Using Web-Based Visualization

You can ingest data from a Kafka cluster and perform queries in the Druid Web UI Console in a visualized manner. For detailed directions, please see Loading data with the data loader.
