十年網(wǎng)站開(kāi)發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶(hù) + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營(yíng)維護(hù)+專(zhuān)業(yè)推廣+無(wú)憂(yōu)售后,網(wǎng)站問(wèn)題一站解決
這篇文章主要介紹了think-queue的示例分析,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
成都創(chuàng)新互聯(lián)是創(chuàng)新、創(chuàng)意、研發(fā)型一體的綜合型網(wǎng)站建設(shè)公司,自成立以來(lái)公司不斷探索創(chuàng)新,始終堅(jiān)持為客戶(hù)提供滿(mǎn)意周到的服務(wù),在本地打下了良好的口碑,在過(guò)去的十余年時(shí)間我們累計(jì)服務(wù)了上千家以及全國(guó)政企客戶(hù),如玻璃鋼雕塑等企業(yè)單位,完善的項(xiàng)目管理流程,嚴(yán)格把控項(xiàng)目進(jìn)度與質(zhì)量監(jiān)控加上過(guò)硬的技術(shù)實(shí)力獲得客戶(hù)的一致表?yè)P(yáng)。
分析之前請(qǐng)大家務(wù)必了解消息隊(duì)列的實(shí)現(xiàn)
tp5的消息隊(duì)列是基于database redis 和tp官方自己實(shí)現(xiàn)的 Topthink
本章是圍繞redis來(lái)做分析
| key | 類(lèi)型 | 描述 |
|---|---|---|
queues:queueName | list | 要執(zhí)行的任務(wù) |
think:queue:restart | string | 重啟隊(duì)列時(shí)間戳 |
queues:queueName:delayed | zSet | 延遲任務(wù) |
queues:queueName:reserved | zSet | 執(zhí)行失敗,等待重新執(zhí)行 |
work和listen的區(qū)別在下面會(huì)解釋
| 命令 | 描述 |
|---|---|
php think queue:work | 監(jiān)聽(tīng)隊(duì)列 |
php think queue:listen | 監(jiān)聽(tīng)隊(duì)列 |
php think queue:restart | 重啟隊(duì)列 |
php think queue:subscribe | 暫無(wú),可能是保留的 官方有什么其他想法但是還沒(méi)實(shí)現(xiàn) |
| 標(biāo)簽 | 描述 |
|---|---|
worker_daemon_start | 守護(hù)進(jìn)程開(kāi)啟 |
worker_memory_exceeded | 內(nèi)存超出 |
worker_queue_restart | 重啟守護(hù)進(jìn)程 |
worker_before_process | 任務(wù)開(kāi)始執(zhí)行之前 |
worker_before_sleep | 任務(wù)延遲執(zhí)行 |
queue_failed | 任務(wù)執(zhí)行失敗 |
| 參數(shù) | 默認(rèn)值 | 可以使用的模式 | 描述 |
|---|---|---|---|
queue | null | work,listen | 要執(zhí)行的任務(wù)名稱(chēng) |
daemon | null | work | 以守護(hù)進(jìn)程執(zhí)行任務(wù) |
delay | 0 | work,listen | 失敗后重新執(zhí)行的時(shí)間 |
force | null | work | 失敗后重新執(zhí)行的時(shí)間 |
memory | 128M | work,listen | 限制最大內(nèi)存 |
sleep | 3 | work,listen | 沒(méi)有任務(wù)的時(shí)候等待的時(shí)間 |
tries | 0 | work,listen | 任務(wù)失敗后最大嘗試次數(shù) |
1: 執(zhí)行原理不同
work: 單進(jìn)程的處理模式;
無(wú) daemon 參數(shù) work進(jìn)程在處理完下一個(gè)消息后直接結(jié)束當(dāng)前進(jìn)程。當(dāng)不存在新消息時(shí),會(huì)sleep一段時(shí)間然后退出;
有 daemon 參數(shù) work進(jìn)程會(huì)循環(huán)地處理隊(duì)列中的消息,直到內(nèi)存超出參數(shù)配置才結(jié)束進(jìn)程。當(dāng)不存在新消息時(shí),會(huì)在每次循環(huán)中sleep一段時(shí)間;
listen: 父進(jìn)程 + 子進(jìn)程 的處理模式;
會(huì)在所在的父進(jìn)程會(huì)創(chuàng)建一個(gè)單次執(zhí)行模式的work子進(jìn)程,并通過(guò)該work子進(jìn)程來(lái)處理隊(duì)列中的下一個(gè)消息,當(dāng)這個(gè)work子進(jìn)程退出之后;
所在的父進(jìn)程會(huì)監(jiān)聽(tīng)到該子進(jìn)程的退出信號(hào),并重新創(chuàng)建一個(gè)新的單次執(zhí)行的work子進(jìn)程;
2: 退出時(shí)機(jī)不同
work: 看上面
listen: 所在的父進(jìn)程正常情況會(huì)一直運(yùn)行,除非遇到下面兩種情況
01: 創(chuàng)建的某個(gè)work子進(jìn)程的執(zhí)行時(shí)間超過(guò)了 listen命令行中的--timeout 參數(shù)配置;此時(shí)work子進(jìn)程會(huì)被強(qiáng)制結(jié)束,listen所在的父進(jìn)程也會(huì)拋出一個(gè) ProcessTimeoutException 異常并退出;
開(kāi)發(fā)者可以選擇捕獲該異常,讓父進(jìn)程繼續(xù)執(zhí)行;
02: 所在的父進(jìn)程因某種原因存在內(nèi)存泄露,則當(dāng)父進(jìn)程本身占用的內(nèi)存超過(guò)了命令行中的 --memory 參數(shù)配置時(shí),父子進(jìn)程均會(huì)退出。正常情況下,listen進(jìn)程本身占用的內(nèi)存是穩(wěn)定不變的。
3: 性能不同
work: 是在腳本內(nèi)部做循環(huán),框架腳本在命令執(zhí)行的初期就已加載完畢;
listen: 是處理完一個(gè)任務(wù)之后新開(kāi)一個(gè)work進(jìn)程,此時(shí)會(huì)重新加載框架腳本;
因此 work 模式的性能會(huì)比listen模式高。
注意: 當(dāng)代碼有更新時(shí),work 模式下需要手動(dòng)去執(zhí)行 php think queue:restart 命令重啟隊(duì)列來(lái)使改動(dòng)生效;而listen 模式會(huì)自動(dòng)生效,無(wú)需其他操作。
4: 超時(shí)控制能力
work: 本質(zhì)上既不能控制進(jìn)程自身的運(yùn)行時(shí)間,也無(wú)法限制執(zhí)行中的任務(wù)的執(zhí)行時(shí)間;
listen: 可以限制其創(chuàng)建的work子進(jìn)程的超時(shí)時(shí)間;
可通過(guò) timeout 參數(shù)限制work子進(jìn)程允許運(yùn)行的最長(zhǎng)時(shí)間,超過(guò)該時(shí)間限制仍未結(jié)束的子進(jìn)程會(huì)被強(qiáng)制結(jié)束;
expire 和time的區(qū)別
expire 在配置文件中設(shè)置,指任務(wù)的過(guò)期時(shí)間 這個(gè)時(shí)間是全局的,影響到所有的work進(jìn)程
timeout 在命令行參數(shù)中設(shè)置,指work子進(jìn)程的超時(shí)時(shí)間,這個(gè)時(shí)間只對(duì)當(dāng)前執(zhí)行的listen 命令有效,timeout 針對(duì)的對(duì)象是 work 子進(jìn)程;
5: 使用場(chǎng)景不同
work 適用場(chǎng)景是:
01: 任務(wù)數(shù)量較多
02: 性能要求較高
03: 任務(wù)的執(zhí)行時(shí)間較短
04: 消費(fèi)者類(lèi)中不存在死循環(huán),sleep() ,exit() ,die() 等容易導(dǎo)致bug的邏輯
listen 適用場(chǎng)景是:
01: 任務(wù)數(shù)量較少
02: 任務(wù)的執(zhí)行時(shí)間較長(zhǎng)
03: 任務(wù)的執(zhí)行時(shí)間需要有嚴(yán)格限制
由于我們是根據(jù)redis來(lái)做分析 所以只需要分析src/queue/connector/redis.php
01: 首先調(diào)用src/Queue.php中的魔術(shù)方法__callStatic
02: 在__callStatic方法中調(diào)用了buildConnector
03: buildConnector 中首先加載配置文件 如果無(wú)將是同步執(zhí)行
04: 根據(jù)配置文件去創(chuàng)建連接并且傳入配置
在redis.php類(lèi)的構(gòu)造方法中的操作:
01: 檢測(cè)redis擴(kuò)展是否安裝
02: 合并配置
03: 檢測(cè)是redis擴(kuò)展還是 pRedis
04: 創(chuàng)建連接對(duì)象
| 參數(shù)名 | 默認(rèn)值 | 描述 | 可以使用的方法 |
|---|---|---|---|
| $job | 無(wú) | 要執(zhí)行任務(wù)的類(lèi) | push,later |
| $data | 空 | 任務(wù)數(shù)據(jù) | push,later |
| $queue | default | 任務(wù)名稱(chēng) | push,later |
| $delay | null | 延遲時(shí)間 | later |
push($job, $data, $queue) Queue::push(Test::class, ['id' => 1], 'test');
一頓騷操作后返回一個(gè)數(shù)組 并且序列化后 rPush到redis中 key為 queue:queueName
數(shù)組結(jié)構(gòu):
[ 'job' => $job, // 要執(zhí)行任務(wù)的類(lèi) 'data' => $data, // 任務(wù)數(shù)據(jù) 'id'=>'xxxxx' //任務(wù)id ]
寫(xiě)入 redis并且返回隊(duì)列id
至于中間的那頓騷操作太長(zhǎng)了就沒(méi)寫(xiě)
later($delay, $job, $data, $queue) Queue::later(100, Test::class, ['id' => 1], 'test');
跟上面的差不多
一頓騷操作后返回一個(gè)數(shù)組 并且序列化后 zAdd 到redis中 key為 queue:queueName:delayed score為當(dāng)前的時(shí)間戳+$delay
執(zhí)行過(guò)程有work模式和listen模式 兩種 區(qū)別上面已經(jīng)說(shuō)了 代碼邏輯由于太多等下回分解;
最后講一下標(biāo)簽的使用
//守護(hù)進(jìn)程開(kāi)啟 'worker_daemon_start' => [ \app\index\behavior\WorkerDaemonStart::class ], //內(nèi)存超出 'worker_memory_exceeded' => [ \app\index\behavior\WorkerMemoryExceeded::class ], //重啟守護(hù)進(jìn)程 'worker_queue_restart' => [ \app\index\behavior\WorkerQueueRestart::class ], //任務(wù)開(kāi)始執(zhí)行之前 'worker_before_process' => [ \app\index\behavior\WorkerBeforeProcess::class ], //任務(wù)延遲執(zhí)行 'worker_before_sleep' => [ \app\index\behavior\WorkerBeforeSleep::class ], //任務(wù)執(zhí)行失敗 'queue_failed' => [ \app\index\behavior\QueueFailed::class ]
public function run(Output $output)
{
$output->write('任務(wù)執(zhí)行失敗 ', true);
}控制臺(tái)執(zhí)行 php think queue:work --queue test --daemon
會(huì)在控制臺(tái)一次輸出
守護(hù)進(jìn)程開(kāi)啟 任務(wù)延遲執(zhí)行
失敗的處理 如果有任務(wù)執(zhí)行失敗或者執(zhí)行次數(shù)達(dá)到最大值
會(huì)觸發(fā) queue_failed
在app\index\behavior@run方法里面寫(xiě)失敗的邏輯 比如郵件通知 寫(xiě)入日志等
最后我們來(lái)說(shuō)一下如何在其他框架或者項(xiàng)目中給tp的項(xiàng)目推送消息隊(duì)列,例如兩個(gè)項(xiàng)目是分開(kāi)的 另一個(gè)使用的卻不是tp5的框架
redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
$this->redis->select(10);
}
public function push($job, $data, $queue)
{
$payload = $this->createPayload($job, $data);
$this->redis->rPush('queues:' . $queue, $payload);
}
public function later($delay, $job, $data, $queue)
{
$payload = $this->createPayload($job, $data);
$this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
}
private function createPayload($job, $data)
{
$payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
return $this->setMeta($payload, 'attempts', 1);
}
private function setMeta($payload, $key, $value)
{
$payload = json_decode($payload, true);
$payload[$key] = $value;
$payload = json_encode($payload);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
}
return $payload;
}
private function random(int $length = 16): string
{
$str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
$randomString = '';
for ($i = 0; $i < $length; $i++) {
$randomString .= $str[rand(0, strlen($str) - 1)];
}
return $randomString;
}
}
(new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');package main
import (
"encoding/json"
"github.com/garyburd/redigo/redis"
"math/rand"
"time"
)
type Payload struct {
Id string `json:"id"`
Job string `json:"job"`
Data interface{} `json:"data"`
Attempts int `json:"attempts"`
}
var RedisClient *redis.Pool
func init() {
RedisClient = &redis.Pool{
MaxIdle: 20,
MaxActive: 500,
IdleTimeout: time.Second * 100,
Dial: func() (conn redis.Conn, e error) {
c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
return nil, err
}
_, _ = c.Do("SELECT", 10)
return c, nil
},
}
}
func main() {
var data = make(map[string]interface{})
data["id"] = "1"
later(10, "app\\index\\jobs\\Test", data, "test")
}
func push(job string, data interface{}, queue string) {
payload := createPayload(job, data)
queueName := "queues:" + queue
_, _ = RedisClient.Get().Do("rPush", queueName, payload)
}
func later(delay int, job string, data interface{}, queue string) {
m, _ := time.ParseDuration("+1s")
currentTime := time.Now()
op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
createPayload(job, data)
payload := createPayload(job, data)
queueName := "queues:" + queue + ":delayed"
_, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}
// 創(chuàng)建指定格式的數(shù)據(jù)
func createPayload(job string, data interface{}) (payload string) {
payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}
jsonStr, _ := json.Marshal(payload1)
return string(jsonStr)
}
// 創(chuàng)建隨機(jī)字符串
func random(n int) string {
var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
b := make([]rune, n)
for i := range b {
b[i] = str[rand.Intn(len(str))]
}
return string(b)
}感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“think-queue的示例分析”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!