博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
php-resque消息队列
阅读量:3738 次
发布时间:2019-05-22

本文共 10315 字,大约阅读时间需要 34 分钟。

参考:

tp3.2集成php-resque消息队列

推送,发送短信等第三方库,找到以下几种方案:

     1)PHP+redis自己做消息队列

     2)PHP-Resque

     3)MemcacheQ

    4)RabbitMQ

集成方法

将源码放到ThinkPHP的Vendor目录中

将源码更新到 ThinkPHP/Library/Vendor/php-resque/ 目录中

注意要定义应用目录,之前发的内容没定义应用目录导致部分小伙伴引发了找不到Queue类的异常

在项目根目录中创建resque入口脚本

#!/usr/bin/env php

 

创建Queue控制器

Home模块的Controller中创建Queue控制器

keys[] = $key; $this->args[$key] = $val; } $this->init(); } /** * 执行队列 * 环境变量参数值: * --queue|QUEUE: 需要执行的队列的名字 * --interval|INTERVAL:在队列中循环的间隔时间,即完成一个任务后的等待时间,默认是5秒 * --app|APP_INCLUDE:需要自动载入PHP文件路径,Worker需要知道你的Job的位置并载入Job * --count|COUNT:需要创建的Worker的数量。所有的Worker都具有相同的属性。默认是创建1个Worker * --debug|VVERBOSE:设置“1”启用更啰嗦模式,会输出详细的调试信息 * --pid|PIDFILE:手动指定PID文件的位置,适用于单Worker运行方式 */ private function init() { $is_sington = false; //是否单例运行,单例运行会在tmp目录下建立一个唯一的PID // 根据参数设置QUEUE环境变量 $QUEUE = in_array('--queue', $this->keys) ? $this->args['--queue'] : '*'; if (empty($QUEUE)) { die("Set QUEUE env var containing the list of queues to work.\n"); } $this->queues = explode(',', $QUEUE); // 根据参数设置INTERVAL环境变量 $interval = in_array('--interval', $this->keys) ? $this->args['--interval'] : 5; putenv("INTERVAL={$interval}"); // 根据参数设置COUNT环境变量 $count = in_array('--count', $this->keys) ? $this->args['--count'] : 1; putenv("COUNT={$count}"); // 根据参数设置APP_INCLUDE环境变量 $app = in_array('--app', $this->keys) ? $this->args['--app'] : ''; putenv("APP_INCLUDE={$app}"); // 根据参数设置PIDFILE环境变量 $pid = in_array('--pid', $this->keys) ? $this->args['--pid'] : ''; putenv("PIDFILE={$pid}"); // 根据参数设置VVERBOSE环境变量 $debug = in_array('--debug', $this->keys) ? $this->args['--debug'] : ''; putenv("VVERBOSE={$debug}"); } public function index() { $act = getenv('Q_ACTION'); switch ($act) { case 'stop': $this->stop(); break; case 'status': $this->status(); break; default: $this->start(); } } /** * 开始队列 */ public function start() { // 载入任务类 $path = COMMON_PATH . "Job"; $flag = \FilesystemIterator::KEY_AS_FILENAME; $glob = new \FilesystemIterator($path, $flag); foreach ($glob as $file) { if('php' === pathinfo($file, PATHINFO_EXTENSION)) require realpath($file); } $logLevel = 0; $LOGGING = getenv('LOGGING'); $VERBOSE = getenv('VERBOSE'); $VVERBOSE = getenv('VVERBOSE'); if (!empty($LOGGING) || !empty($VERBOSE)) { $logLevel = Resque\Worker::LOG_NORMAL; } else { if (!empty($VVERBOSE)) { $logLevel = Resque\Worker::LOG_VERBOSE; } } $APP_INCLUDE = getenv('APP_INCLUDE'); if ($APP_INCLUDE) { if (!file_exists($APP_INCLUDE)) { die('APP_INCLUDE (' . $APP_INCLUDE . ") does not exist.\n"); } require_once $APP_INCLUDE; } $interval = 5; $INTERVAL = getenv('INTERVAL'); if (!empty($INTERVAL)) { $interval = $INTERVAL; } $count = 1; $COUNT = getenv('COUNT'); if (!empty($COUNT) && $COUNT > 1) { $count = $COUNT; } if ($count > 1) { for ($i = 0; $i < $count; ++$i) { $pid = pcntl_fork(); if ($pid == -1) { die("Could not fork worker " . $i . "\n"); } // Child, start the worker else { if (!$pid) { $worker = new Resque\Worker($this->queues); $worker->logLevel = $logLevel; fwrite(STDOUT, '*** Starting worker ' . $worker . "\n"); $worker->work($interval); break; } } } } // Start a single worker else { $worker = new Resque\Worker($this->queues); $worker->logLevel = $logLevel; $PIDFILE = getenv('PIDFILE'); if ($PIDFILE) { file_put_contents($PIDFILE, getmypid()) or die('Could not write PID information to ' . $PIDFILE); } fwrite(STDOUT, '*** Starting worker ' . $worker . "\n"); $worker->work($interval); } } /** * 停止队列 */ public function stop() { $worker = new Resque\Worker($this->queues); $worker->shutdown(); } /** * 查看某个任务状态 */ public function status() { $id = in_array('--id', $this->keys) ? $this->args['--id'] : ''; $status = new \Resque\Job\Status($id); if (!$status->isTracking()) { die("Resque is not tracking the status of this job.\n"); } echo "Tracking status of " . $id . ". Press [break] to stop.\n\n"; while (true) { fwrite(STDOUT, "Status of " . $id . " is: " . $status->get() . "\n"); sleep(1); } }}

 

新增队列配置

在公共config.php中新增队列配置,如下

/* 消息队列配置 */'QUEUE' => array(    'type' => 'redis',    'host' => '127.0.0.1',    'port' =>  '6379',    'persistent' => false, //是否启用    'prefix' => 'queue',    'password' =>  '', // 密码),

 

新增队列初始化行为

app_init行为中新增队列初始化的行为,run内容为

public function run(){	// 处理队列配置    $config = C('QUEUE');    if ($config) {        vendor('php-resque.autoload');        // 初始化队列服务        $select = isset($config['select']) ? $config['select'] : 0;        $password = isset($config['password']) ? $config['password'] : null;        $persistent = isset($config['persistent']) ? $config['persistent'] : false;        $timeout = isset($config['timeout']) ? $config['timeout'] : 30;        $server = $config['host'] . ":" . $config['port'];        \Resque::setBackend($server, $select, $password, $persistent, $timeout);        // 初始化缓存前缀        if(isset($config['prefix']) && !empty($config['prefix'])){            \Resque\Redis::prefix($config['prefix']);        }    }}

 这里我是这么弄的如图

 

到此,整个队列服务基本已配置完成。

接下来就要创建队列执行的任务了

Jobs

创建 Jobs(就是业务在这里了)

目前任务类固定在Common模块的Job中,命名格式为WjpJob.class.php

args;// fwrite(STDOUT, json_encode($args) . PHP_EOL);// echo $this->args['name']; dump($this->args); }}

要获取队列中传入的参数值请使用$this->args

任务perform方法中抛出的任何异常都会导致任务失败,所以在写任务业务时要小心,并且处理异常情况。

任务也有setUptearDown方法,如果定义了一个setUp方法,那么它将在perform方法之前调用,如果定义了一个tearDown方法,那么它将会在perform方法之后调用。

 

添加任务到队列中

在程序控制器的任意方法中引入队列类库时,使用Resque::enqueue方法执行入栈,Resque::enqueue方法有四个参数,第一个是当前的队列名称,第二个参数为任务类,第三个是传入的参数,第四个表示是否返回工作状态的令牌

public function test12(){        vendor('php-resque.autoload');  // 引入队列类库        $job = '\\Common\\Job\\WjpJob'; // 定义任务类        $names = [            '李灵黛','冷文卿','阴露萍','柳兰歌','秦水支','李念儿','文彩依','柳婵诗','顾莫言','任水寒','金磨针','丁玲珑','凌霜华','水笙','景茵梦','容柒雁','林墨瞳','华诗','千湄','剑舞','兰陵',' 洛离'        ];        foreach($names as $name){            // 定义参数            $args = array(                'name'=>$name,                'time' => time(),                'array' => array(                    'test' => 'test',                ),            );            // 入栈            $jobId = \Resque::enqueue('default', $job, $args, true);            echo "Queued job ".$jobId."\n\n";        }    }

 测试:

终端:cd 到根目录

bogon:yjhxh Mac$ php resque.php

在浏览器上请求

 

接着终端输出结果:就是方法已经调用,

*** Starting worker bogon:60636:*
array(3) {  ["name"] => string(9) "李灵黛"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "冷文卿"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "阴露萍"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "柳兰歌"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "秦水支"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "李念儿"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "文彩依"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "柳婵诗"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "顾莫言"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "任水寒"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "金磨针"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "丁玲珑"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "凌霜华"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(6) "水笙"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "景茵梦"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "容柒雁"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(9) "林墨瞳"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(6) "华诗"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(6) "千湄"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(6) "剑舞"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(6) "兰陵"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}
array(3) {  ["name"] => string(7) " 洛离"  ["time"] => int(1561105170)  ["array"] => array(1) {    ["test"] => string(4) "test"  }}

 

 

转载地址:http://katin.baihongyu.com/

你可能感兴趣的文章
kafka生产者常用参数含义
查看>>
kafka topic消息分配partition规则
查看>>
mysql编写函数
查看>>
面试笔试题之hql
查看>>
sql函数之cast()
查看>>
hql中substr函数截取字符串匹配
查看>>
mysql之指定ip、用户、数据库权限
查看>>
zookeeper的读和写数据流程(有图欧)
查看>>
bin/schematool -dbType mysql -initSchema HiveMetaException: Failed to get schema version.
查看>>
flink知识总结
查看>>
mysql之部门工资前三的所有员工
查看>>
flink之检查点(checkpoint)和保存点(savepoint)的区别
查看>>
flink面试题
查看>>
Java_最长公共前缀_LeetCode
查看>>
Linux系统编程---进程I/O
查看>>
spring学习知识补充
查看>>
杂文之生成随机字符串
查看>>
springBoot基础(一)
查看>>
springBoot基础(二)
查看>>
在springBoot中使用Mapper类问题
查看>>