kafka裝配及Kafka-PHP擴(kuò)展的使用
Jun 13, 2016 pm 12:28 PM
kafka安裝及Kafka-PHP擴(kuò)展的使用
話說用了就要有點(diǎn)產(chǎn)出,要不然過段時間又忘了,所以在這里就記錄一下試用Kafka的安裝過程和php擴(kuò)展的試用。
實(shí)話說,如果用于隊列的話,跟PHP比較配的,還是Redis。用的順手,呵呵,只是Redis不能有多個consumer。但Kafka官方對PHP不支持,PHP擴(kuò)展是愛好者或使用者寫的。下面就開始講Kafka的安裝吧。我以CentOS6.4為例,64位。
一. 首先確認(rèn)下jdk有沒有安裝
使用命令
[[email?protected] ~]# java -<span style="color: #000000;">versionjava version </span><span style="color: #800000;">"</span><span style="color: #800000;">1.8.0_73</span><span style="color: #800000;">"</span><span style="color: #000000;">Java(TM) SE Runtime Environment (build </span><span style="color: #800080;">1.8</span>.0_73-<span style="color: #000000;">b02)Java HotSpot(TM) </span><span style="color: #800080;">64</span>-Bit Server VM (build <span style="color: #800080;">25.73</span>-b02, mixed mode)
如果有以上信息的話,就往下安裝吧,有些可能是jdk對不上,那就裝到對的上的。如果沒有安裝,就看一下下面的jdk安裝方法:
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
到這個地址下載jdk8版本,我下載的是jdk-8u73-linux-x64.tar.gz,然后解壓到/usr/local/jdk/下。
然后打開/etc/profile文件
[[email?protected] ~]# vim /etc/profile
把下面這段代碼寫到文件里
export JAVA_HOME=/usr/local/jdk/jdk1.<span style="color: #800080;">8</span><span style="color: #000000;">.0_73export CLASSPATH</span>=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/<span style="color: #000000;">dt.jarexport PATH</span>=$JAVA_HOME/bin:$PATH
最后
[[email?protected] ~]# source /etc/profile
這時jdk就生效了,可以使用 java -version驗(yàn)證下。
二. 接下來安裝Kafka
1. 下載Kafka
到http://kafka.apache.org/downloads.html下載相應(yīng)的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz。
2. 下載完解壓到你喜歡的目錄
我是解壓到 /usr/local/kafka/kafka_2.9.1-0.8.2.2
3. 運(yùn)行默認(rèn)的Kafka
啟動Zookeeper server
[[email?protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/zookeeper-server-start.<span style="color: #0000ff;">sh</span> config/zookeeper.properties &
啟動Kafka server
[[email?protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/kafka-server-start.<span style="color: #0000ff;">sh</span> config/server.properties &
運(yùn)行生產(chǎn)者producer
[[email?protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/kafka-console-producer.<span style="color: #0000ff;">sh</span> --broker-list localhost:<span style="color: #800080;">9092</span> --topic test
運(yùn)行消費(fèi)者consumer
[[email?protected] kafka_2.<span style="color: #800080;">9.1</span>-<span style="color: #800080;">0.8</span>.<span style="color: #800080;">2.2</span>]# <span style="color: #0000ff;">sh</span> bin/kafka-console-consumer.<span style="color: #0000ff;">sh</span> --zookeeper localhost:<span style="color: #800080;">2181</span> --topic test --from-beginning
這樣,在producer那邊輸入內(nèi)容,consumer馬上就能接收到。
4. 當(dāng)有跨機(jī)的producer或consumer連接時
需要配置config/server.properties的host.name,要不然跨機(jī)的連不上。
三. Kafka-PHP擴(kuò)展
使用了一圈,就https://github.com/nmred/kafka-php可以用。
我是使用composer安裝的,以下是示例:
producer.php
<span style="color: #000000;">php</span><span style="color: #0000ff;">require</span> 'vendor/autoload.php'<span style="color: #000000;">;</span><span style="color: #0000ff;">while</span> (1<span style="color: #000000;">) { </span><span style="color: #800080;">$part</span> = <span style="color: #008080;">mt_rand</span>(0, 1<span style="color: #000000;">); </span><span style="color: #800080;">$produce</span> = \Kafka\Produce::getInstance('kafka0:2181', 3000<span style="color: #000000;">); </span><span style="color: #008000;">//</span><span style="color: #008000;"> get available partitions</span> <span style="color: #800080;">$partitions</span> = <span style="color: #800080;">$produce</span>->getAvailablePartitions('topic_name'<span style="color: #000000;">); </span><span style="color: #008080;">var_dump</span>(<span style="color: #800080;">$partitions</span><span style="color: #000000;">); </span><span style="color: #008000;">//</span><span style="color: #008000;"> send message</span> <span style="color: #800080;">$produce</span>->setRequireAck(-1<span style="color: #000000;">); </span><span style="color: #800080;">$produce</span>->setMessages('topic_name', 0, <span style="color: #0000ff;">array</span>(<span style="color: #008080;">date</span>('Y-m-d H:i:s'<span style="color: #000000;">)); </span><span style="color: #008080;">sleep</span>(3<span style="color: #000000;">);}</span>
consumer.php
<span style="color: #0000ff;">require</span> 'vendor/autoload.php'<span style="color: #000000;">;</span><span style="color: #800080;">$consumer</span> = \Kafka\Consumer::getInstance('kafka0:2181'<span style="color: #000000;">);</span><span style="color: #800080;">$group</span> = 'topic_name'<span style="color: #000000;">;</span><span style="color: #800080;">$consumer</span>->setGroup(<span style="color: #800080;">$group</span><span style="color: #000000;">);</span><span style="color: #800080;">$consumer</span>->setFromOffset(<span style="color: #0000ff;">true</span><span style="color: #000000;">);</span><span style="color: #800080;">$consumer</span>->setTopic('topic_name', 0<span style="color: #000000;">);</span><span style="color: #800080;">$consumer</span>->setMaxBytes(102400<span style="color: #000000;">);</span><span style="color: #800080;">$result</span> = <span style="color: #800080;">$consumer</span>-><span style="color: #000000;">fetch();</span><span style="color: #008080;">print_r</span>(<span style="color: #800080;">$result</span><span style="color: #000000;">);</span><span style="color: #0000ff;">foreach</span> (<span style="color: #800080;">$result</span> <span style="color: #0000ff;">as</span> <span style="color: #800080;">$topicName</span> => <span style="color: #800080;">$partition</span><span style="color: #000000;">) { </span><span style="color: #0000ff;">foreach</span> (<span style="color: #800080;">$partition</span> <span style="color: #0000ff;">as</span> <span style="color: #800080;">$partId</span> => <span style="color: #800080;">$messageSet</span><span style="color: #000000;">) { </span><span style="color: #008080;">var_dump</span>(<span style="color: #800080;">$partition</span>-><span style="color: #000000;">getHighOffset()); </span><span style="color: #0000ff;">foreach</span> (<span style="color: #800080;">$messageSet</span> <span style="color: #0000ff;">as</span> <span style="color: #800080;">$message</span><span style="color: #000000;">) { </span><span style="color: #008080;">var_dump</span>((<span style="color: #0000ff;">string</span>)<span style="color: #800080;">$message</span><span style="color: #000000;">); } </span><span style="color: #008080;">var_dump</span>(<span style="color: #800080;">$partition</span>-><span style="color: #000000;">getMessageOffset()); }}</span>
?

Hot AI Tools

Undress AI Tool
Undress images for free

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

The settings.json file is located in the user-level or workspace-level path and is used to customize VSCode settings. 1. User-level path: Windows is C:\Users\\AppData\Roaming\Code\User\settings.json, macOS is /Users//Library/ApplicationSupport/Code/User/settings.json, Linux is /home//.config/Code/User/settings.json; 2. Workspace-level path: .vscode/settings in the project root directory

To correctly handle JDBC transactions, you must first turn off the automatic commit mode, then perform multiple operations, and finally commit or rollback according to the results; 1. Call conn.setAutoCommit(false) to start the transaction; 2. Execute multiple SQL operations, such as INSERT and UPDATE; 3. Call conn.commit() if all operations are successful, and call conn.rollback() if an exception occurs to ensure data consistency; at the same time, try-with-resources should be used to manage resources, properly handle exceptions and close connections to avoid connection leakage; in addition, it is recommended to use connection pools and set save points to achieve partial rollback, and keep transactions as short as possible to improve performance.

TheJVMenablesJava’s"writeonce,runanywhere"capabilitybyexecutingbytecodethroughfourmaincomponents:1.TheClassLoaderSubsystemloads,links,andinitializes.classfilesusingbootstrap,extension,andapplicationclassloaders,ensuringsecureandlazyclassloa

Use classes in the java.time package to replace the old Date and Calendar classes; 2. Get the current date and time through LocalDate, LocalDateTime and LocalTime; 3. Create a specific date and time using the of() method; 4. Use the plus/minus method to immutably increase and decrease the time; 5. Use ZonedDateTime and ZoneId to process the time zone; 6. Format and parse date strings through DateTimeFormatter; 7. Use Instant to be compatible with the old date types when necessary; date processing in modern Java should give priority to using java.timeAPI, which provides clear, immutable and linear

Pre-formanceTartuptimeMoryusage, Quarkusandmicronautleadduetocompile-Timeprocessingandgraalvsupport, Withquarkusoftenperforminglightbetterine ServerLess scenarios.2.Thyvelopecosyste,

Java's garbage collection (GC) is a mechanism that automatically manages memory, which reduces the risk of memory leakage by reclaiming unreachable objects. 1.GC judges the accessibility of the object from the root object (such as stack variables, active threads, static fields, etc.), and unreachable objects are marked as garbage. 2. Based on the mark-clearing algorithm, mark all reachable objects and clear unmarked objects. 3. Adopt a generational collection strategy: the new generation (Eden, S0, S1) frequently executes MinorGC; the elderly performs less but takes longer to perform MajorGC; Metaspace stores class metadata. 4. JVM provides a variety of GC devices: SerialGC is suitable for small applications; ParallelGC improves throughput; CMS reduces

Networkportsandfirewallsworktogethertoenablecommunicationwhileensuringsecurity.1.Networkportsarevirtualendpointsnumbered0–65535,withwell-knownportslike80(HTTP),443(HTTPS),22(SSH),and25(SMTP)identifyingspecificservices.2.PortsoperateoverTCP(reliable,c

Gradleisthebetterchoiceformostnewprojectsduetoitssuperiorflexibility,performance,andmoderntoolingsupport.1.Gradle’sGroovy/KotlinDSLismoreconciseandexpressivethanMaven’sverboseXML.2.GradleoutperformsMaveninbuildspeedwithincrementalcompilation,buildcac
