欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

ThinkPHP RdKafka 公共类封装

时间:2023-07-12

推荐Weary

在 extend/RdKafkaClient.php 封装 RdKafkaClient:

<?php

// Set PHP's default socket timeout to -1 (no timeout) for this process.
ini_set('default_socket_timeout', -1);

use RdKafka\Conf;
use RdKafka\Consumer;
use RdKafka\Producer;
use RdKafka\TopicConf;

/**
 * Thin wrapper around the php-rdkafka extension for ThinkPHP projects.
 *
 * Reads connection settings from the ThinkPHP config key `game.kafka`
 * (the `brokers` entry is required) and hands out producers, low-level
 * consumers and consumer topics that share one RdKafka\Conf instance.
 */
class RdKafkaClient
{
    /**
     * Singleton registry; lazily becomes an array keyed by a hash of the
     * (currently empty) constructor arguments.
     *
     * @var array<string, RdKafkaClient>|null
     */
    public static $instance = null;

    /** @var array Settings read from config('game.kafka'). */
    private $base_config;

    /** @var Conf Client-level configuration shared by producers and consumers. */
    private $kafka_conf;

    /** @var TopicConf Topic-level configuration used when consuming. */
    private $kafka_topic_conf;

    /** @var Producer|null Most recently created producer. */
    private $producer;

    /** @var Consumer|null Most recently created consumer. */
    private $consumer;

    /**
     * Load the ThinkPHP Kafka settings and prepare the client/topic configuration.
     *
     * @throws Exception When the configuration objects cannot be built; the
     *                   original error is logged and attached as `previous`.
     */
    public function __construct()
    {
        try {
            // Read the Kafka section of the ThinkPHP configuration.
            $this->base_config = config('game.kafka') ?? [];

            // Client-level configuration.
            $this->kafka_conf = new Conf();

            // Topic-level configuration; values are passed as strings.
            $topicConf = new TopicConf();
            $topicConf->set('auto.commit.interval.ms', '100');
            $topicConf->set('offset.store.method', 'broker');
            $topicConf->set('auto.offset.reset', 'earliest');
            $this->kafka_topic_conf = $topicConf;
        } catch (Exception $e) {
            error_log($e->getMessage());
            // Keep the original exception reachable for callers that inspect the chain.
            throw new Exception($e->getMessage(), (int) $e->getCode(), $e);
        }
    }

    /**
     * Return the shared client instance, creating it on first use.
     *
     * @return RdKafkaClient
     */
    public static function getInstance()
    {
        $key = md5(json_encode([]));
        if (!isset(self::$instance[$key])) {
            self::$instance[$key] = new self();
        }

        return self::$instance[$key];
    }

    /**
     * Return the shared client configuration with the broker list applied.
     *
     * @return Conf
     *
     * @throws RuntimeException When no broker list is configured.
     */
    public function getConf()
    {
        $this->kafka_conf->set('metadata.broker.list', $this->brokerList());

        return $this->kafka_conf;
    }

    /**
     * Create a new producer.
     *
     * The configuration is obtained through getConf() so that
     * `metadata.broker.list` is set before the producer is built.
     *
     * @return Producer
     *
     * @throws RuntimeException When no broker list is configured.
     */
    public function getProducer()
    {
        return $this->producer = new Producer($this->getConf());
    }

    /**
     * Create a new low-level consumer for the given consumer group.
     *
     * @param string $group_id Consumer group id, stored as `group.id` on the shared configuration.
     *
     * @return Consumer
     *
     * @throws RuntimeException When no broker list is configured.
     */
    public function getConsumer($group_id)
    {
        $brokers = $this->brokerList();

        $this->kafka_conf->set('group.id', $group_id);
        $this->consumer = new Consumer($this->kafka_conf);
        $this->consumer->addBrokers($brokers);

        return $this->consumer;
    }

    /**
     * Open a topic on the current consumer and start consuming partition 0
     * from the stored offset.
     *
     * @param string $topic Topic name.
     *
     * @return \RdKafka\ConsumerTopic
     *
     * @throws LogicException When getConsumer() has not been called yet.
     */
    public function setConsumeTopic($topic)
    {
        if ($this->consumer === null) {
            throw new LogicException('Call getConsumer() before setConsumeTopic().');
        }

        $consumerTopic = $this->consumer->newTopic($topic, $this->kafka_topic_conf);
        $consumerTopic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

        return $consumerTopic;
    }

    /**
     * Catch calls to undefined methods: append the method name to a log file
     * (only if the log directory already exists) and return false.
     *
     * @param string $name      Name of the method that was called.
     * @param array  $arguments Arguments passed to it (unused).
     *
     * @return false
     */
    public function __call($name, $arguments)
    {
        $logFile = dirname(__DIR__) . "/data/logs/rdkafka_method_not_exist.log";
        if (file_exists(dirname($logFile))) {
            file_put_contents($logFile, date("Y-m-d H:i:s") . $name . "\n", FILE_APPEND);
        }

        return false;
    }

    /**
     * Return the configured broker list, failing loudly when it is missing.
     *
     * @return string
     *
     * @throws RuntimeException When `game.kafka.brokers` is absent or not a non-empty string.
     */
    private function brokerList()
    {
        $brokers = $this->base_config['brokers'] ?? '';
        if (!is_string($brokers) || $brokers === '') {
            throw new RuntimeException("Kafka broker list is not configured (expected config key 'game.kafka.brokers').");
        }

        return $brokers;
    }
}
?>

生产者调用方法:

/**
 * Demo producer: send ten messages to "mytopic" and flush them to the broker.
 *
 * Global classes are referenced with a leading backslash so the method also
 * resolves them when it is placed inside a namespaced controller.
 *
 * @throws \RuntimeException When the messages cannot be flushed after ten attempts.
 */
public function rd_producer()
{
    $kafka = \RdKafkaClient::getInstance();
    $producer = $kafka->getProducer();
    $topic = $producer->newTopic("mytopic");

    for ($i = 0; $i < 10; $i++) {
        // RD_KAFKA_PARTITION_UA lets librdkafka pick the partition.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
        // Serve delivery callbacks/events without blocking.
        $producer->poll(0);
    }

    // Retry the flush up to ten times, waiting at most 10 s per attempt.
    $flushed = false;
    for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
        if (RD_KAFKA_RESP_ERR_NO_ERROR === $producer->flush(10000)) {
            $flushed = true;
            break;
        }
    }

    if (!$flushed) {
        throw new \RuntimeException('Was unable to flush, messages might be lost!');
    }
}

消费者调用方法:

/**
 * Demo consumer: join group "aaa" and print every message read from
 * partition 0 of "mytopic". Runs until an unexpected error is reported.
 *
 * @throws \Exception When a consumed message carries an error other than
 *                    partition EOF or timeout.
 */
public function rd_consumer()
{
    $kafka = \RdKafkaClient::getInstance();
    $kafka->getConsumer("aaa");
    $topic = $kafka->setConsumeTopic("mytopic");

    while (true) {
        // Arguments: partition, timeout in milliseconds.
        $message = $topic->consume(0, 120*10000);

        // consume() may return null on timeout instead of a Message object.
        if ($message === null) {
            echo "Timed out\n";
            continue;
        }

        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    }
}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作日内妥善处理。