由於PHP不支援多線程,但是作為一個(gè)完善的系統(tǒng),有很多操作都是需要非同步完成的。為了完成這些非同步操作,我們做了一個(gè)基於Redis隊(duì)列任務(wù)系統(tǒng)。
大家知道,一個(gè)訊息佇列處理系統(tǒng)主要分成兩大部分:消費(fèi)者和製造者。
在我們的系統(tǒng)中,主系統(tǒng)作為製造者,任務(wù)系統(tǒng)作為消費(fèi)者。
具體的工作流程如下: 1、主系統(tǒng)將需要需要處理的任務(wù)名稱 任務(wù)參數(shù)push到佇列中。 2.任務(wù)系統(tǒng)即時(shí)的對(duì)任務(wù)隊(duì)列進(jìn)行pop,pop出來(lái)一個(gè)任務(wù)就fork一個(gè)子進(jìn)程,由子進(jìn)程完成具體的任務(wù)邏輯。
/** * 啟動(dòng)守護(hù)進(jìn)程 */ public function runAction() { Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-'); while (true) { $this->fork_process(); } exit; } /** * 創(chuàng)建子進(jìn)程 */ private function fork_process() { $ppid = getmypid(); $pid = pcntl_fork(); if ($pid == 0) {//子進(jìn)程 $pid = posix_getpid(); //echo "* Process {$pid} was created \n\n"; $this->mq_process(); exit; } else {//主進(jìn)程 $pid = pcntl_wait($status, WUNTRACED); //取得子進(jìn)程結(jié)束狀態(tài) if (pcntl_wifexited($status)) { //echo "\n\n* Sub process: {$pid} exited with {$status}"; //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid ); } else { Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-'); } } } /** * 業(yè)務(wù)任務(wù)隊(duì)列處理 */ private function mq_process() { $data_pop = $this->masterRedis->rPop($this->redis_list_key); $data = json_decode($data_pop, 1); if (!$data) { return FALSE; } $worker = '_task_' . $data['worker']; $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel'; $params = $data['params']; $class = new $class_name(); $class->$worker($params); return TRUE; }
+VX:PHPopen888PHP中高級(jí)教程分享,專注laravel,swoole,tp,分布式高并發(fā)處理教程分享