国产av日韩一区二区三区精品,成人性爱视频在线观看,国产,欧美,日韩,一区,www.成色av久久成人,2222eeee成人天堂

Home headlines PHP implements lightweight delay queue (multi-threading) based on Redis

PHP implements lightweight delay queue (multi-threading) based on Redis

Jan 17, 2020 pm 05:19 PM
php redis Multithreading delay queue lightweight

PHP implements lightweight delay queue (multi-threading) based on Redis

Delay queue, as the name suggests, is a message queue with delay function. So, under what circumstances do I need such a queue?

1. Background

Let’s take a look at the business scenario:

1. Send a recall notice 3 days before the membership expires

2. After the order payment is successful, check whether the downstream links are normal after 5 minutes. For example, after the user purchases a membership, whether the various membership statuses are set successfully

3. How to regularly check whether the order in refund status has been Refund successfully?

4. If the notification fails, the notification will be repeated in 1, 3, 5, and 7 minutes until the other party replies?

Usually the simplest and most direct way to solve the above problems is to scan the meter regularly.

The problems with table scanning are:

1. The table scanning is connected to the database for a long time. When the number is large, the connection is prone to abnormal interruption, which requires more exception handling and the program High robustness requirements

2. When the amount of data is large, the delay is high and the processing cannot be completed within the regulations, which affects the business. Although multiple processes can be started for processing, this will bring additional maintenance costs. , cannot be fundamentally solved.

3. Each business must maintain its own table scanning logic. When the business increases, it is found that the logic of the table scanning part will be developed repeatedly, but it is very similar

The delay queue can solve the above needs very well

2. Research

Researched some open source solutions on the market, as follows:

1. Youzan Technology: only principles, no open source code

2.Github personal: https://github.com/ouqiang/delay-queue

(1) Based on redis implementation, only one redis can be configured. If redis hangs, the entire service will be unavailable, and the availability will be almost

( 2) The consumer side implements the pull model, and the access cost is high. Each project must implement the access code

(3) There are not many people using it in star, and there are risks in putting it in the production environment. , coupled with the lack of understanding of the Go language, it is difficult to maintain if there is a problem

3.SchedulerX-Alibaba open source: Very powerful, but complex in operation and maintenance, relying on many components, and not lightweight enough

4 .RabbitMQ-delayed task: It does not have a delay function itself. It needs to be implemented by itself with the help of a feature. Moreover, the company has not deployed this queue. It is a bit expensive to deploy one separately to make a delay queue, and it also requires special operation and maintenance to maintain it. , currently the team does not support

Basically, for the above reasons, I plan to write one myself. I usually use PHP. The project basically uses the zset structure of redis as storage, which is implemented in PHP language. For the implementation principle, please refer to the Youzan team: https:// tech.youzan.com/queuing_delay/

The entire delay queue is mainly composed of 4 parts

JobPool is used to store the meta information of all jobs.

DelayBucket is a set of ordered queues with time as the dimension, used to store all jobs that need to be delayed (only Job IDs are stored here).

Timer is responsible for scanning each Bucket in real time and placing Jobs with delay times greater than or equal to the current time into the corresponding Ready Queue.

ReadyQueue stores Jobs in the Ready state (only JobId is stored here) for consumption by consumer programs.

PHP implements lightweight delay queue (multi-threading) based on Redis

Message structure

Each Job must contain the following attributes:

topic: Job type. It can be understood as a specific business name.

id: The unique identifier of the Job. Used to retrieve and delete specified Job information.

delayTime: jod delayed execution time, 13-digit timestamp

ttr (time-to-run): Job execution timeout.

body: Job content, for consumers to do specific business processing, stored in json format.

For the same type of topic delaytime, ttr is generally fixed, and the job properties can be streamlined

1.topic: Job type. It can be understood as a specific business name

2.id: the unique identifier of the Job. Used to retrieve and delete specified Job information.

3.body: The content of the Job, for consumers to do specific business processing, stored in json format.

delaytime, ttr are configured in the topicadmin background

3. Target

Lightweight: It can run directly with less PHP expansion. There is no need to introduce network frameworks, such as swoole, workman and the like.

Stability: Using the master-work architecture, the master does not do business processing, but is only responsible for managing the child process. When the child process exits abnormally, it will be automatically started.

Availability:

1. Supports multi-instance deployment, each instance is stateless, and the failure of one instance will not affect the service

2. Supports the configuration of multiple redis, if one redis fails Only affects some messages

3. The business side has easy access, and only needs to fill in the relevant message types and callback interfaces in the background

Extensibility: When there is a bottleneck in the consumption process, you can configure it to increase consumption The number of processes. When there is a bottleneck in writing, the number of instances can be increased. The writing performance can be linearly improved.

Real-time performance: A certain time error is allowed.

Support message deletion: Business users can delete specified messages at any time.

Message transmission reliability: After the message enters the delay queue, it is guaranteed to be consumed at least once.

Write performance: qps>1000

4. Architecture design and description

Overall architecture

PHP implements lightweight delay queue (multi-threading) based on Redis

采用master-work架構(gòu)模式,主要包括6個(gè)模塊:

1.dq-mster: 主進(jìn)程,負(fù)責(zé)管理子進(jìn)程的創(chuàng)建,銷毀,回收以及信號(hào)通知

2.dq-server: 負(fù)責(zé)消息寫入,讀取,刪除功能以及維護(hù)redis連接池

3.dq-timer-N: 負(fù)責(zé)從redis的zset結(jié)構(gòu)中掃描到期的消息,并負(fù)責(zé)寫入ready 隊(duì)列,個(gè)數(shù)可配置,一般2個(gè)就行了,因?yàn)橄⒃趜set結(jié)構(gòu)是按時(shí)間有序的

4.dq-consume-N: 負(fù)責(zé)從ready隊(duì)列中讀取消息并通知給對(duì)應(yīng)回調(diào)接口,個(gè)數(shù)可配置

5.dq-redis-checker: 負(fù)責(zé)檢查redis的服務(wù)狀態(tài),如果redis宕機(jī),發(fā)送告警郵件

6.dq-http-server: 提供web后臺(tái)界面,用于注冊(cè)topic

五、模塊流程圖

消息寫入:

PHP implements lightweight delay queue (multi-threading) based on Redis

timer查找到期消息:

PHP implements lightweight delay queue (multi-threading) based on Redis

consumer消費(fèi)流程:

PHP implements lightweight delay queue (multi-threading) based on Redis

六、部署

環(huán)境依賴:PHP 5.4+ 安裝sockets,redis,pcntl,pdo_mysql 拓展

ps: 熟悉docker的同學(xué)可以直接用鏡像: shareclz/php7.2.14 里面包含了所需拓展

step1:安裝數(shù)據(jù)庫用于存儲(chǔ)一些topic以及告警信息

執(zhí)行:

mysql> source dq.sql

step2:在DqConfg.文件中配置數(shù)據(jù)庫信息: DqConf::$db

step3: 啟動(dòng)http服務(wù)

在DqConf.php文件中修改php了路徑

命令:

php DqHttpServer.php --port 8088

訪問:http://127.0.0.1:8088,出現(xiàn)配置界面

PHP implements lightweight delay queue (multi-threading) based on Redis

redis信息格式:host:port:auth 比如 127.0.0.1:6379:12345

stop4:配置告信息(比如redis宕機(jī))

PHP implements lightweight delay queue (multi-threading) based on Redis

stop5:注冊(cè)topic

PHP implements lightweight delay queue (multi-threading) based on Redis

重試標(biāo)記說明:

1.接口返回為空默認(rèn)重試
2.滿足指定返回表達(dá)會(huì)重試,res表示返回的json數(shù)組,比如:
回調(diào)接口返回json串:{"code":200,"data":{"status":2,"msg":"返回失敗"}},重試條件可以這樣寫
    {res.code}!=200 
    {res.code}!=200 && {res.data.status}!=2 
    {res.code}==200 && {res.data.status}==2 || {res.data.msg}=='返回失敗'

PHP implements lightweight delay queue (multi-threading) based on Redis

step6:啟動(dòng)服務(wù)進(jìn)程:

php DqInit.php --port 6789 &

執(zhí)行 ps -ef | grep dq 看到如下信息說明啟動(dòng)成功

PHP implements lightweight delay queue (multi-threading) based on Redis

step7: 寫入數(shù)據(jù),參考demo.php

step8:查看日志

默認(rèn)日志目錄在項(xiàng)目目錄的logs目錄下,在DqConf.php修改$logPath

1.請(qǐng)求日志:request_ymd.txt

2.通知日志:notify_ymd.txt

3.錯(cuò)誤日志:err_ymd.txt

step9:如果配置文件有改動(dòng)

1.系統(tǒng)會(huì)自動(dòng)檢測(cè)配置文件新,如果有改動(dòng),會(huì)自動(dòng)退出(沒有找到較好的熱更新的方案),需要重啟,可以在crontab里面建個(gè)任務(wù),1分鐘執(zhí)行一次,程序有check_self的判斷

2.優(yōu)雅退出命令: master檢測(cè)偵聽了USR2信號(hào),收到信號(hào)后會(huì)通知所有子進(jìn)程,子進(jìn)程完成當(dāng)前任務(wù)后會(huì)自動(dòng)退出

ps -ef | grep dq-master| grep -v grep | head -n 1 | awk '{print $2}' | xargs kill -USR2

七、性能測(cè)試

需要安裝pthreads拓展:

測(cè)試原理:使用多線程模擬并發(fā),在1s內(nèi)能成功返回請(qǐng)求成功的個(gè)數(shù)

八、值得一提的性能優(yōu)化點(diǎn):

1.redis multi命令:將多個(gè)對(duì)redis的操作打包成一個(gè)減少網(wǎng)絡(luò)開銷

2.計(jì)數(shù)的操作異步處理,在異步邏輯里面用函數(shù)的static變量來保存,當(dāng)寫入redis成功后釋放static變量,可以在redis出現(xiàn)異常時(shí)計(jì)數(shù)仍能保持一致,除非進(jìn)程退出

3.內(nèi)存泄露檢測(cè)有必要: 所有的內(nèi)存分配在底層都是調(diào)用了brk或者mmap,只要程序只有大量brk或者mmap的系統(tǒng)調(diào)用,內(nèi)存泄露可能性非常高 ,檢測(cè)命令: strace -c -p pid | grep -P 'mmap| brk'

4.檢測(cè)程序的系統(tǒng)調(diào)用情況:strace -c -p pid ,發(fā)現(xiàn)某個(gè)系統(tǒng)函數(shù)調(diào)用是其他的數(shù)倍,可能大概率程序存在問題

九、異常處理

1. If the notification interface is called within the timeout period and no reply is received, the notification is considered failed. The system will put the data into the queue again and notify again. The system defaults to a maximum notification of 10 times (can be modified in the Dqconf.php file $ notify_exp_nums) The notification interval is 2n 1. For example, if the notification fails for 1 minute for the first time, until a reply is received after 3 minutes for the second time, the system will automatically discard it after exceeding the maximum number of notifications, and send an email notification at the same time

2 .Online redis is persisted every 1s. There may be cases where 1s data is lost. In this case, you can compare the request_ymd.txt and notify_ymd.txt logs to manually restore it.

3.redis downtime notification:

PHP implements lightweight delay queue (multi-threading) based on Redis

ps: Network jitter is inevitable. If the notification interface involves core services, it must be idempotent! !

10. Online situation

Two instances were deployed online, one in each computer room, 4 redis with a total of 16G memory for storage, and the service has been running stably for several months. , all indicators are in line with expectations.

Main access business:

##·Order 10-minute recall notice

·Compensation when calling the interface times out or fails

·Recall notification 3 days before membership expiration

11. Shortcomings and prospects

1. Since the image used by the team lacks libevent extension, dq-server is based on the select model, and there is a performance bottleneck in high-concurrency scenarios. It can be changed to be based on the libevent event model in the future to improve Concurrency performance.

2. Timer and consumer are currently implemented using multiple processes. This granularity feels a bit rough. You can consider using multi-threading mode and support dynamically creating the number of threads to improve consumer performance and ensure timely consumption to the greatest extent. .

3.dq-server and redis are called synchronously, which is also the bottleneck of performance. We plan to process it asynchronously based on swoole_redis.

PHP Chinese website has a large number of free

PHP video tutorials, everyone is welcome to learn!

This article is reproduced from: https://www.jianshu.com/p/58f10ac42162

Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undress AI Tool

Undress AI Tool

Undress images for free

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Clothoff.io

Clothoff.io

AI clothes remover

Video Face Swap

Video Face Swap

Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

How to get the current session ID in PHP? How to get the current session ID in PHP? Jul 13, 2025 am 03:02 AM

The method to get the current session ID in PHP is to use the session_id() function, but you must call session_start() to successfully obtain it. 1. Call session_start() to start the session; 2. Use session_id() to read the session ID and output a string similar to abc123def456ghi789; 3. If the return is empty, check whether session_start() is missing, whether the user accesses for the first time, or whether the session is destroyed; 4. The session ID can be used for logging, security verification and cross-request communication, but security needs to be paid attention to. Make sure that the session is correctly enabled and the ID can be obtained successfully.

PHP get substring from a string PHP get substring from a string Jul 13, 2025 am 02:59 AM

To extract substrings from PHP strings, you can use the substr() function, which is syntax substr(string$string,int$start,?int$length=null), and if the length is not specified, it will be intercepted to the end; when processing multi-byte characters such as Chinese, you should use the mb_substr() function to avoid garbled code; if you need to intercept the string according to a specific separator, you can use exploit() or combine strpos() and substr() to implement it, such as extracting file name extensions or domain names.

How do you perform unit testing for php code? How do you perform unit testing for php code? Jul 13, 2025 am 02:54 AM

UnittestinginPHPinvolvesverifyingindividualcodeunitslikefunctionsormethodstocatchbugsearlyandensurereliablerefactoring.1)SetupPHPUnitviaComposer,createatestdirectory,andconfigureautoloadandphpunit.xml.2)Writetestcasesfollowingthearrange-act-assertpat

How to split a string into an array in PHP How to split a string into an array in PHP Jul 13, 2025 am 02:59 AM

In PHP, the most common method is to split the string into an array using the exploit() function. This function divides the string into multiple parts through the specified delimiter and returns an array. The syntax is exploit(separator, string, limit), where separator is the separator, string is the original string, and limit is an optional parameter to control the maximum number of segments. For example $str="apple,banana,orange";$arr=explode(",",$str); The result is ["apple","bana

JavaScript Data Types: Primitive vs Reference JavaScript Data Types: Primitive vs Reference Jul 13, 2025 am 02:43 AM

JavaScript data types are divided into primitive types and reference types. Primitive types include string, number, boolean, null, undefined, and symbol. The values are immutable and copies are copied when assigning values, so they do not affect each other; reference types such as objects, arrays and functions store memory addresses, and variables pointing to the same object will affect each other. Typeof and instanceof can be used to determine types, but pay attention to the historical issues of typeofnull. Understanding these two types of differences can help write more stable and reliable code.

Using std::chrono in C Using std::chrono in C Jul 15, 2025 am 01:30 AM

std::chrono is used in C to process time, including obtaining the current time, measuring execution time, operation time point and duration, and formatting analysis time. 1. Use std::chrono::system_clock::now() to obtain the current time, which can be converted into a readable string, but the system clock may not be monotonous; 2. Use std::chrono::steady_clock to measure the execution time to ensure monotony, and convert it into milliseconds, seconds and other units through duration_cast; 3. Time point (time_point) and duration (duration) can be interoperable, but attention should be paid to unit compatibility and clock epoch (epoch)

How to pass a session variable to another page in PHP? How to pass a session variable to another page in PHP? Jul 13, 2025 am 02:39 AM

In PHP, to pass a session variable to another page, the key is to start the session correctly and use the same $_SESSION key name. 1. Before using session variables for each page, it must be called session_start() and placed in the front of the script; 2. Set session variables such as $_SESSION['username']='JohnDoe' on the first page; 3. After calling session_start() on another page, access the variables through the same key name; 4. Make sure that session_start() is called on each page, avoid outputting content in advance, and check that the session storage path on the server is writable; 5. Use ses

How does PHP handle Environment Variables? How does PHP handle Environment Variables? Jul 14, 2025 am 03:01 AM

ToaccessenvironmentvariablesinPHP,usegetenv()orthe$_ENVsuperglobal.1.getenv('VAR_NAME')retrievesaspecificvariable.2.$_ENV['VAR_NAME']accessesvariablesifvariables_orderinphp.iniincludes"E".SetvariablesviaCLIwithVAR=valuephpscript.php,inApach