topic = $this->newTopic($topicname);
        $this->topic->consumeStart(0, RD_KAFKA_OFFSET_END);
    }

    /**
     * Fetch one message from partition 0 of the topic held in $this->topic.
     *
     * @param int $timeout Forwarded unchanged as the timeout argument of consume()
     *                     (presumably milliseconds; confirm against the rdkafka docs).
     *
     * @return mixed Whatever consume() returns; the script below treats null as
     *               "nothing was received within the timeout".
     */
    public function poll($timeout)
    {
        return $this->topic->consume(0, $timeout);
    }
}

// --- Consumer script -------------------------------------------------------
// Builds the client configuration, starts consuming TNAME and prints every
// successfully received message as "<key>:\r\n<payload>\r\n", forever.

$cfg = new Conf();
$cfg->set('bootstrap.servers', SERVER);
$cfg->set('group.id', 'mygroup');

$receiver = new MyConsumer($cfg);
$receiver->setLogLevel(LOG_DEBUG);
$receiver->subscribe(TNAME);

while (true) {
    $message = $receiver->poll(100);

    // Nothing arrived within the poll timeout: just poll again.
    if ($message === null) {
        continue;
    }

    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        // A real record: emit it in the same format as before.
        echo $message->key . ":\r\n" . $message->payload . "\r\n";
        continue;
    }

    // End-of-partition (-191) is informational. A timeout may also surface as
    // a message carrying RD_KAFKA_RESP_ERR__TIMED_OUT instead of null,
    // depending on the consumer API in use; neither is a failure.
    if (
        $message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF
        || $message->err === RD_KAFKA_RESP_ERR__TIMED_OUT
    ) {
        continue;
    }

    // Any other code is a genuine error. Previously such messages were echoed
    // as if they were data; report them on the error log and keep polling.
    error_log(sprintf('Kafka consume error %d: %s', $message->err, $message->errstr()));
}
?>