話說(shuō)用了就要有點(diǎn)產(chǎn)出,要不然過(guò)段時(shí)間又忘了,所以在這里就記錄一下試用kafka的安裝過(guò)程和php擴(kuò)展的試用。
實(shí)話說(shuō),如果用于隊(duì)列的話,跟PHP比較配的,還是Redis。用的順手,呵呵,只是Redis不能有多個(gè)consumer。但Kafka官方對(duì)PHP不支持,PHP擴(kuò)展是愛(ài)好者或使用者寫(xiě)的。下面就開(kāi)始講Kafka的安裝吧。我以CentOS6.4為例,64位。
一. 首先確認(rèn)下jdk有沒(méi)有安裝
使用命令
[root@localhost ~]# java -<span>version java version </span><span>"</span><span>1.8.0_73</span><span>"</span><span> Java(TM) SE Runtime Environment (build </span><span>1.8</span>.0_73-<span>b02) Java HotSpot(TM) </span><span>64</span>-Bit Server VM (build <span>25.73</span>-b02, mixed mode)
如果有以上信息的話,就往下安裝吧,有些可能是jdk對(duì)不上,那就裝到對(duì)的上的。如果沒(méi)有安裝,就看一下下面的jdk安裝方法:
立即學(xué)習(xí)“PHP免費(fèi)學(xué)習(xí)筆記(深入)”;
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
到這個(gè)地址下載jdk8版本,我下載的是jdk-8u73-linux-x64.tar.gz,然后解壓到/usr/local/jdk/下。
然后打開(kāi)/etc/profile文件
[root@localhost ~]# vim /etc/profile
把下面這段代碼寫(xiě)到文件里
export JAVA_HOME=/usr/local/jdk/jdk1.<span>8</span><span>.0_73 export CLASSPATH</span>=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/<span>dt.jar export PATH</span>=$JAVA_HOME/bin:$PATH
最后
[root@localhost ~]# source /etc/profile
這時(shí)jdk就生效了,可以使用 java -version驗(yàn)證下。
二. 接下來(lái)安裝Kafka
1. 下載Kafka
到http://kafka.apache.org/downloads.html下載相應(yīng)的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz。
2. 下載完解壓到你喜歡的目錄
我是解壓到 /usr/local/kafka/kafka_2.9.1-0.8.2.2
3. 運(yùn)行默認(rèn)的Kafka
啟動(dòng)Zookeeper server
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/zookeeper-server-start.<span>sh</span> config/zookeeper.properties &
啟動(dòng)Kafka server
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-server-start.<span>sh</span> config/server.properties &
運(yùn)行生產(chǎn)者producer
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-producer.<span>sh</span> --broker-list localhost:<span>9092</span> --topic test
運(yùn)行消費(fèi)者consumer
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-consumer.<span>sh</span> --zookeeper localhost:<span>2181</span> --topic test --from-beginning
這樣,在producer那邊輸入內(nèi)容,consumer馬上就能接收到。
4. 當(dāng)有跨機(jī)的producer或consumer連接時(shí)
需要配置config/server.properties的host.name,要不然跨機(jī)的連不上。
三. Kafka-PHP擴(kuò)展
使用了一圈,就https://github.com/nmred/kafka-php可以用。
我是使用composer安裝的,以下是示例:
producer.php
<?<span>php </span><span>require</span> 'vendor/autoload.php'<span>; </span><span>while</span> (1<span>) { </span><span>$part</span> = <span>mt_rand</span>(0, 1<span>); </span><span>$produce</span> = \Kafka\Produce::getInstance('kafka0:2181', 3000<span>); </span><span>//</span><span> get available partitions</span> <span>$partitions</span> = <span>$produce</span>->getAvailablePartitions('topic_name'<span>); </span><span>var_dump</span>(<span>$partitions</span><span>); </span><span>//</span><span> send message</span> <span>$produce</span>->setRequireAck(-1<span>); </span><span>$produce</span>->setMessages('topic_name', 0, <span>array</span>(<span>date</span>('Y-m-d H:i:s'<span>)); </span><span>sleep</span>(3<span>); }</span>
consumer.php
<span>require</span> 'vendor/autoload.php'<span>; </span><span>$consumer</span> = \Kafka\Consumer::getInstance('kafka0:2181'<span>); </span><span>$group</span> = 'topic_name'<span>; </span><span>$consumer</span>->setGroup(<span>$group</span><span>); </span><span>$consumer</span>->setFromOffset(<span>true</span><span>); </span><span>$consumer</span>->setTopic('topic_name', 0<span>); </span><span>$consumer</span>->setMaxBytes(102400<span>); </span><span>$result</span> = <span>$consumer</span>-><span>fetch(); </span><span>print_r</span>(<span>$result</span><span>); </span><span>foreach</span> (<span>$result</span> <span>as</span> <span>$topicName</span> => <span>$partition</span><span>) { </span><span>foreach</span> (<span>$partition</span> <span>as</span> <span>$partId</span> => <span>$messageSet</span><span>) { </span><span>var_dump</span>(<span>$partition</span>-><span>getHighOffset()); </span><span>foreach</span> (<span>$messageSet</span> <span>as</span> <span>$message</span><span>) { </span><span>var_dump</span>((<span>string</span>)<span>$message</span><span>); } </span><span>var_dump</span>(<span>$partition</span>-><span>getMessageOffset()); } }</span>
?
PHP怎么學(xué)習(xí)?PHP怎么入門(mén)?PHP在哪學(xué)?PHP怎么學(xué)才快?不用擔(dān)心,這里為大家提供了PHP速學(xué)教程(入門(mén)到精通),有需要的小伙伴保存下載就能學(xué)習(xí)啦!
微信掃碼
關(guān)注PHP中文網(wǎng)服務(wù)號(hào)
QQ掃碼
加入技術(shù)交流群
Copyright 2014-2025 http://m.miracleart.cn/ All Rights Reserved | php.cn | 湘ICP備2023035733號(hào)