Because PHP offers no built-in multi-threading in a typical deployment, a complete system has to run many operations asynchronously. To handle these asynchronous operations, we built a task system on top of a Redis queue.
A message queue processing system has two main parts: producers and consumers.
In our system, the main system acts as the producer and the task system acts as the consumer.
The specific workflow is as follows:
1. The main system pushes the name of the task to be processed, together with its parameters, into the queue.
2. The task system polls the task queue continuously: its daemon forks a sub-process, and the sub-process pops one task and runs the corresponding task logic.
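The producer side in step 1 might look roughly like the sketch below. The field names `worker`, `class` and `params` are the ones the consumer code reads; the helper names `build_task_payload` and `enqueue_task`, the list key, and the example task are hypothetical, and `$redis` is assumed to be a connected phpredis `Redis` instance.

```php
<?php
// Hypothetical helper: encode a task in the shape the consumer reads
// ('worker', optional 'class', 'params').
function build_task_payload(string $worker, array $params, ?string $class = null): string
{
    $task = ['worker' => $worker, 'params' => $params];
    if ($class !== null) {
        $task['class'] = $class;
    }
    // JSON_THROW_ON_ERROR (PHP 7.3+) turns encoding failures into exceptions.
    return json_encode($task, JSON_THROW_ON_ERROR);
}

// Hypothetical producer: $redis is assumed to be a connected phpredis client.
function enqueue_task($redis, string $listKey, string $worker, array $params, ?string $class = null)
{
    // lPush adds at the head of the list; the consumer's rPop takes from the
    // tail, so tasks are processed in first-in, first-out order.
    return $redis->lPush($listKey, build_task_payload($worker, $params, $class));
}
```

Assuming a local Redis server, usage would be along the lines of `$redis = new Redis(); $redis->connect('127.0.0.1', 6379); enqueue_task($redis, 'task_queue', 'send_email', ['user_id' => 42]);`, where the key and task name are placeholders.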
    /**
     * Start the daemon process.
     */
    public function runAction()
    {
        Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
        // Fork one child per iteration; each child handles at most one task.
        while (true) {
            $this->fork_process();
        }
    }

    /**
     * Create a child process.
     */
    private function fork_process()
    {
        $ppid = getmypid();
        $pid = pcntl_fork();
        if ($pid === -1) { // fork failed: no child was created, so there is nothing to wait for
            Tools::log_message('ERROR', 'daemon/run fork fail' . '|pid:' . $ppid, 'daemon-');
            return;
        }
        if ($pid === 0) { // child process
            $pid = posix_getpid();
            //echo "* Process {$pid} was created \n\n";
            $this->mq_process();
            exit;
        } else { // parent process
            $pid = pcntl_wait($status, WUNTRACED); // get the child's exit status
            if (pcntl_wifexited($status)) {
                //echo "\n\n* Sub process: {$pid} exited with {$status}";
                //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
            } else {
                Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
            }
        }
    }

    /**
     * Process the business task queue.
     */
    private function mq_process()
    {
        // rPop returns false when the list is empty.
        $data_pop = $this->masterRedis->rPop($this->redis_list_key);
        if ($data_pop === false) {
            return FALSE;
        }
        $data = json_decode($data_pop, true);
        if (!$data) {
            return FALSE;
        }
        // The task name maps to a method called _task_<worker> on the task class.
        $worker = '_task_' . $data['worker'];
        $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
        $params = $data['params'];
        $class = new $class_name();
        $class->$worker($params);
        return TRUE;
    }
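To make the dispatch rule in `mq_process()` concrete, here is a small standalone sketch that runs without Redis. `DemoTaskModel` and `dispatch_task` are hypothetical names standing in for the real task class and method. The existence checks are an addition, not part of the original code; they are one way to keep a malformed payload from triggering a fatal error in the child.

```php
<?php
// Hypothetical task class standing in for TaskproModel.
class DemoTaskModel
{
    public function _task_send_email(array $params): string
    {
        return 'email queued for user ' . $params['user_id'];
    }
}

// Same naming rule as mq_process(): method = '_task_' . worker,
// class = payload 'class' or a default. Returns false on a bad payload.
function dispatch_task(string $json, string $defaultClass)
{
    $data = json_decode($json, true);
    if (!is_array($data) || !isset($data['worker'])) {
        return false;
    }
    $method = '_task_' . $data['worker'];
    $className = $data['class'] ?? $defaultClass;
    // Added safety check: refuse unknown classes or methods.
    if (!class_exists($className) || !method_exists($className, $method)) {
        return false;
    }
    $task = new $className();
    return $task->$method($data['params'] ?? []);
}

echo dispatch_task('{"worker":"send_email","params":{"user_id":42}}', 'DemoTaskModel'), "\n";
```

Because the class and method names come straight from the queue payload, a production system would probably also restrict them to a known list; that policy is left out of this sketch.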