C++使用librdkafka创建消费者和生产者
·
官方示例
https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp
生产者
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <librdkafka/rdkafkacpp.h>
// Loop-control flag checked by the stdin loop in main(); nothing visible in
// this program ever sets it to false, so the loop ends only when input ends.
static bool run = true;
int main() {
std::string brokers = "192.168.1.109:9092";
std::string topic = "WOCAO";
// 创建全局配置
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
// 错误信息
std::string errstr;
// 设置集群
if (conf->set("bootstrap.servers", brokers, errstr) !=
RdKafka::Conf::CONF_OK) {
std::cerr << errstr << std::endl;
exit(1);
}
// 创建消费者实例
RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: "
<< errstr
<< std::endl;
exit(1);
}
delete conf;
for (std::string line; run && std::getline(std::cin, line);) {
if (line.empty()) {
producer->poll(0);
continue;
}
// 生产者根据主题发布信息
retry:
RdKafka::ErrorCode err = producer->produce(
// 主题
topic,
//任何分区:内置分区器将
//用于将消息分配给基于主题的在消息键上,或没有设定密钥
RdKafka::Topic::PARTITION_UA,
// 创建副本?
RdKafka::Producer::RK_MSG_COPY,
// 值
const_cast<char*>(line.c_str()), line.size(),
// 键
NULL, 0,
// 投递时间,默认当前时间
0,
// 消息头
NULL,
NULL);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "% 发布主题错误 " << topic << ": "
<< RdKafka::err2str(err) << std::endl;
if (err == RdKafka::ERR__QUEUE_FULL) {
// 队列已经满了,等待最大十秒
producer->poll(1000);
goto retry;
}
}
else {
std::cerr << "% 队列消息 ("
<< line.size()
<< " bytes) "
<< " 主题: "
<< topic
<< std::endl;
}
producer->poll(0);
}
// 等待消息投递?
std::cerr << "% Flushing final messages..." << std::endl;
// 等待十秒
producer->flush(10 * 1000);
if (producer->outq_len() > 0) {
std::cerr << "% "
<< producer->outq_len()
<< " 消息未投递"
<< std::endl;
}
delete producer;
return 0;
}
消费者
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <cstring>
#include <librdkafka/rdkafkacpp.h>
// Whether to keep consuming. Cleared by ExampleEventCb on a fatal error event
// and by msg_consume() on a consume error; main() tests it on every loop pass.
static volatile bool run = true;
// Event callback that reports librdkafka events on stderr. Error events are
// printed with their error string (prefixed with "FATAL " and clearing `run`
// when flagged fatal), statistics and log events get their own formats, and
// any other event type is printed generically.
class ExampleEventCb : public RdKafka::EventCb {
 public:
  void event_cb(RdKafka::Event& event) {
    const RdKafka::Event::Type kind = event.type();

    if (kind == RdKafka::Event::EVENT_ERROR) {
      if (event.fatal()) {
        // A fatal error also stops the consume loop.
        std::cerr << "FATAL ";
        run = false;
      }
      std::cerr << "ERROR (" << RdKafka::err2str(event.err())
                << "): " << event.str() << std::endl;
      return;
    }

    if (kind == RdKafka::Event::EVENT_STATS) {
      std::cerr << "\"STATS\": " << event.str() << std::endl;
      return;
    }

    if (kind == RdKafka::Event::EVENT_LOG) {
      fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(),
              event.str().c_str());
      return;
    }

    // Every other event type: print its numeric type, error and text.
    std::cerr << "EVENT " << kind << " ("
              << RdKafka::err2str(event.err()) << "): " << event.str()
              << std::endl;
  }
};
// Handles one result returned by the consumer.
//
// message: the consume result; its err() selects the action taken.
// opaque:  unused; present so the function matches the consume-callback shape.
//
// - ERR__TIMED_OUT and ERR__PARTITION_EOF are ignored.
// - ERR_NO_ERROR prints the offset, the key (if any), every header and the
//   payload to standard output.
// - Any other error is reported on stderr and clears `run`, which ends the
//   consume loop in main().
void msg_consume(RdKafka::Message* message, void* opaque) {
  static_cast<void>(opaque);

  switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
      break;

    case RdKafka::ERR_NO_ERROR: {
      std::cout << "读取到的消息偏移量" << message->offset() << std::endl;
      if (message->key()) {
        std::cout << "Key: " << *message->key() << std::endl;
      }

      const RdKafka::Headers* headers = message->headers();
      if (headers) {
        const std::vector<RdKafka::Headers::Header> hdrs = headers->get_all();
        for (size_t i = 0; i < hdrs.size(); i++) {
          // Bind by reference: no need to copy each header just to print it.
          const RdKafka::Headers::Header& hdr = hdrs[i];
          if (hdr.value() != NULL)
            printf(" Header: %s = \"%.*s\"\n", hdr.key().c_str(),
                   static_cast<int>(hdr.value_size()),
                   static_cast<const char*>(hdr.value()));
          else
            printf(" Header: %s = NULL\n", hdr.key().c_str());
        }
      }

      // A message may carry no payload; handing a null pointer to %s is
      // undefined behaviour, so print just the line terminator in that case.
      const char* payload = static_cast<const char*>(message->payload());
      if (payload != NULL)
        printf("%.*s\n", static_cast<int>(message->len()), payload);
      else
        printf("\n");
      break;
    }

    case RdKafka::ERR__PARTITION_EOF:
      // Reached the end of the partition; keep running and wait for more.
      break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
    default:
      // All remaining errors are handled the same way: report and stop.
      std::cerr << "消费错误: " << message->errstr() << std::endl;
      run = false;
      break;
  }
}
// Consume callback adapter: hands every delivered message, together with the
// caller-supplied opaque pointer, to the free function msg_consume().
class ExampleConsumeCb : public RdKafka::ConsumeCb {
 public:
  void consume_cb(RdKafka::Message& msg, void* opaque) {
    RdKafka::Message* const delivered = &msg;
    msg_consume(delivered, opaque);
  }
};
// Consumer entry point: consumes partition 0 of a fixed topic from the
// beginning and passes every result to msg_consume() until `run` is cleared.
// Returns 0 after a clean shutdown; exits with status 1 when configuration,
// consumer creation, topic creation or consumer start fails.
int main(int argc, char** argv) {
  // Broker address list.
  std::string brokers = "192.168.1.109:9092";
  // Topic name.
  std::string topic_str = "WOCAO";
  // Partition to read.
  int32_t partition = 0;
  // Offset at which consumption starts.
  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
  // Filled in by librdkafka calls with a description of the failure.
  std::string errstr;

  // Global and topic-level configuration objects.
  RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

  // A rejected setting is reported and aborts start-up instead of being
  // silently ignored, mirroring how the producer treats its broker setting.
  if (conf->set("metadata.broker.list", brokers, errstr) !=
      RdKafka::Conf::CONF_OK) {
    std::cerr << errstr << std::endl;
    exit(1);
  }

  // Register the event callback; it must stay alive as long as the consumer.
  ExampleEventCb ex_event_cb;
  if (conf->set("event_cb", &ex_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
    std::cerr << errstr << std::endl;
    exit(1);
  }

  // Ask for an ERR__PARTITION_EOF result when the end of the partition is
  // reached.
  if (conf->set("enable.partition.eof", "true", errstr) !=
      RdKafka::Conf::CONF_OK) {
    std::cerr << errstr << std::endl;
    exit(1);
  }

  // Create the consumer.
  RdKafka::Consumer* consumer = RdKafka::Consumer::create(conf, errstr);
  if (!consumer) {
    std::cerr << "创建消费者失败: " << errstr << std::endl;
    exit(1);
  }
  std::cout << "% 消费者名字:" << consumer->name() << std::endl;

  // Create the topic handle.
  RdKafka::Topic* topic =
      RdKafka::Topic::create(consumer, topic_str, tconf, errstr);
  if (!topic) {
    std::cerr << "创建主题错误: " << errstr << std::endl;
    exit(1);
  }

  // Start consuming the topic/partition at the chosen offset.
  RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
  if (resp != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp)
              << std::endl;
    exit(1);
  }

  // Consume loop: wait up to 1000 ms for a result, handle it, release it, then
  // do a non-blocking poll.
  while (run) {
    RdKafka::Message* msg = consumer->consume(topic, partition, 1000);
    msg_consume(msg, NULL);
    delete msg;
    consumer->poll(0);
  }

  // Stop consuming and poll once more before tearing everything down.
  consumer->stop(topic, partition);
  consumer->poll(1000);

  delete topic;
  delete consumer;
  delete conf;
  delete tconf;

  // Wait up to 5000 ms for librdkafka to finish destroying its objects.
  RdKafka::wait_destroyed(5000);
  return 0;
}
效果图
docker创建服务示例
# Compose file starting ZooKeeper, a single Kafka broker and the kafka-map UI.
version: '3.7'
services:
  zookeeper:
    image: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      # NOTE(review): the C++ examples connect to 192.168.1.109:9092 while this
      # advertises 192.168.16.106 - confirm which address clients must use.
      KAFKA_ADVERTISED_HOST_NAME: 192.168.16.106
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    # volumes:
    #   - D:/kafka:/kafka
    #   - D:/kafka/docker.sock:/var/run/docker.sock
  kafka-map:
    image: dushixiang/kafka-map:latest
    ports:
      - "9000:8080"
    environment:
      ZK_HOSTS: zookeeper:2181
      DEFAULT_USERNAME: admin
      DEFAULT_PASSWORD: admin
更多推荐
已为社区贡献3条内容
所有评论(0)