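Installation is omitted here.
Creating the message queue job
- Configure the message queue: in config/queue.php, change 'default' => 'sync' to 'default' => 'redis' so that the Redis driver is used.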
- <?php
- // +----------------------------------------------------------------------
- // | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
- // +----------------------------------------------------------------------
- // | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
- // +----------------------------------------------------------------------
- // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
- // +----------------------------------------------------------------------
- // | Author: yunwuxin <448901948@qq.com>
- // +----------------------------------------------------------------------
-
- return [
- 'default' => 'redis',
- 'connections' => [
- 'sync' => [
- 'type' => 'sync',
- ],
- 'database' => [
- 'type' => 'database',
- 'queue' => 'default',
- 'table' => 'jobs',
- ],
- 'redis' => [
- 'type' => 'redis',
- 'queue' => 'default',
- 'host' => '127.0.0.1',
- 'port' => 6379,
- 'password' => '',
- 'select' => 0,
- 'timeout' => 0,
- 'persistent' => false,
- ],
- ],
- 'failed' => [
- 'type' => 'none',
- 'table' => 'failed_jobs',
- ],
- ];
Queue producer
Modify app/controller/Index.php to publish a job with Queue::later or Queue::push.
- <?php
-
- namespace app\controller;
-
- use app\BaseController;
- use think\facade\Queue;
-
- class Index extends BaseController
- {
- public function index()
- {
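- // Must match the consumer class created in app/job (Job1)
- $jobHandler = 'app\job\Job1';
- $jobData = ['ts' => time()];
- // Enqueue the job: later() delays delivery (in seconds), push() delivers immediately; see [thinkphp-queue] for other options
- Queue::later(2, $jobHandler, $jobData);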
- return '<style type="text/css">*{ padding: 0; margin: 0; } div{ padding: 4px 48px;} a{color:#2E5CD5;cursor: pointer;text-decoration: none} a:hover{text-decoration:underline; } body{ background: #fff; font-family: "Century Gothic","Microsoft yahei"; color: #333;font-size:18px;} h1{ font-size: 100px; font-weight: normal; margin-bottom: 12px; } p{ line-height: 1.6em; font-size: 42px }</style><div style="padding: 24px 48px;"> <h1>:) </h1><p> ThinkPHP V' . \think\facade\App::version() . '<br/><span style="font-size:30px;">14载初心不改 - 你值得信赖的PHP框架</span></p><span style="font-size:25px;">[ V6.0 版本由 <a href="https://www.yisu.com/" target="yisu">亿速云</a> 独家赞助发布 ]</span></div><script type="text/javascript" src="https://tajs.qq.com/stats?sId=64890268" charset="UTF-8"></script><script type="text/javascript" src="https://e.topthink.com/Public/static/client.js"></script><think id="ee9b1aa918103c4fc"></think>';
- }
-
- public function hello($name = 'ThinkPHP6')
- {
- return 'hello,' . $name;
- }
- }
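The difference between the two calls can be sketched as a small helper. This is only an illustration: `publishJob` is a hypothetical function, not part of think-queue, and it takes the queue object as a parameter (any object exposing `push` and `later` with the argument order used in the controller) so that the snippet runs without the framework.

```php
<?php

/**
 * Publish a job immediately or after a delay.
 *
 * $queue is assumed to expose push($job, $data) and later($delay, $job, $data),
 * mirroring the calls made through the Queue facade in the controller.
 */
function publishJob($queue, string $handler, array $data, int $delaySeconds = 0)
{
    if ($delaySeconds > 0) {
        // Delayed delivery: the job becomes available after $delaySeconds seconds.
        return $queue->later($delaySeconds, $handler, $data);
    }

    // Immediate delivery.
    return $queue->push($handler, $data);
}
```

Called as `publishJob($queue, 'app\job\Job1', ['ts' => time()], 2)`, it takes the same path as the `Queue::later(2, ...)` call in the controller; with the delay left at 0 it falls back to `push`.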
Consumer and job deletion
Create a job folder under the app directory and add a Job1.php class to it, with the following content:
- <?php
-
- namespace app\job;
-
- use think\facade\Log;
- use think\queue\Job;
-
- class Job1
- {
- public function fire(Job $job, $data)
- {
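- // Do the actual work here; this example only logs the attempt count
- $count = $job->attempts();
- Log::write('run the ' . $count . ' round');
- // Delete the job once it has succeeded; otherwise it keeps being retried until the maximum number of attempts is reached, after which it fails and the failed method is called
- $job->delete();
- // Alternatively, instead of deleting the job, release it back onto the queue, here with a 2-second delay:
- // $job->release(2);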
- }
- }
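Deleting and releasing are alternatives, so a handler normally picks one of them per run. The sketch below shows one way to make that choice. It is not taken from think-queue: the class name `RetryingJob`, the `MAX_ATTEMPTS` limit and the `handle` method are hypothetical, and both the `Job` type hint and the `app\job` namespace are left out so that the class can be exercised without the framework. Only `attempts()`, `delete()` and `release()` are called on the job, as in Job1.

```php
<?php

/**
 * Sketch of a consumer that deletes the job on success and retries on failure.
 * $job is expected to behave like think\queue\Job (attempts/delete/release).
 */
class RetryingJob
{
    // Hypothetical retry limit chosen for this example.
    const MAX_ATTEMPTS = 3;

    public function fire($job, $data)
    {
        if ($this->handle($data)) {
            // Success: remove the job so that it is not run again.
            $job->delete();
            return;
        }

        if ($job->attempts() >= self::MAX_ATTEMPTS) {
            // Too many attempts: give up and remove the job.
            $job->delete();
            return;
        }

        // Failure with attempts left: put the job back with a 2-second delay.
        $job->release(2);
    }

    // Hypothetical business logic: succeeds only when a 'ts' field is present.
    protected function handle($data)
    {
        return is_array($data) && isset($data['ts']);
    }
}
```

With the payload published by the controller (`['ts' => time()]`) the job is deleted on the first run; a payload without `ts` is released twice and dropped on the third attempt.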
Listening for and running jobs
There are two commands, work and listen; run either of them with --help to see its usage and available options.
- php think queue:listen
- php think queue:work
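Publishing a job
Open http://localhost in a browser to publish a job; the message written by the job then shows up in the log.
At this point think-queue is fully working, but the listener has to be started by hand, and if a server problem makes it exit, it has to be restarted by hand as well. The think-queue documentation mentions pairing it with Supervisor to keep the process resident, so Supervisor is used here for that purpose.
Installing Supervisor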
- yum install epel-release
- yum install -y supervisor
- # Start supervisord automatically on boot
- systemctl enable supervisord
Configuring Supervisor
- ; Sample supervisor config file.
- ;
- ; For more information on the config file, please see:
- ; http://supervisord.org/configuration.html
- ;
- ; Notes:
- ; - Shell expansion ("~" or "$HOME") is not supported. Environment
- ; variables can be expanded using this syntax: "%(ENV_HOME)s".
- ; - Quotes around values are not supported, except in the case of
- ; the environment= options as shown below.
- ; - Comments must have a leading space: "a=b ;comment" not "a=b;comment".
- ; - Command will be truncated if it looks like a config file comment, e.g.
- ; "command=bash -c 'foo ; bar'" will truncate to "command=bash -c 'foo ".
- ;
- ; Warning:
- ; Paths throughout this example file use /tmp because it is available on most
- ; systems. You will likely need to change these to locations more appropriate
- ; for your system. Some systems periodically delete older files in /tmp.
- ; Notably, if the socket file defined in the [unix_http_server] section below
- ; is deleted, supervisorctl will be unable to connect to supervisord.
-
- [unix_http_server]
- file=/tmp/supervisor.sock ; the path to the socket file
- ;chmod=0700 ; socket file mode (default 0700)
- ;chown=nobody:nogroup ; socket file uid:gid owner
- ;username=user ; default is no username (open server)
- ;password=123 ; default is no password (open server)
-
- ; Security Warning:
- ; The inet HTTP server is not enabled by default. The inet HTTP server is
- ; enabled by uncommenting the [inet_http_server] section below. The inet
- ; HTTP server is intended for use within a trusted environment only. It
- ; should only be bound to localhost or only accessible from within an
- ; isolated, trusted network. The inet HTTP server does not support any
- ; form of encryption. The inet HTTP server does not use authentication
- ; by default (see the username= and password= options to add authentication).
- ; Never expose the inet HTTP server to the public internet.
-
- ;[inet_http_server] ; inet (TCP) server disabled by default
- ;port=127.0.0.1:9001 ; ip_address:port specifier, *:port for all iface
- ;username=user ; default is no username (open server)
- ;password=123 ; default is no password (open server)
-
- [supervisord]
- logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log
- logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB
- logfile_backups=10 ; # of main logfile backups; 0 means none, default 10
- loglevel=info ; log level; default info; others: debug,warn,trace
- pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
- nodaemon=false ; start in foreground if true; default false
- silent=false ; no logs to stdout if true; default false
- minfds=1024 ; min. avail startup file descriptors; default 1024
- minprocs=200 ; min. avail process descriptors;default 200
- ;umask=022 ; process file creation umask; default 022
- ;user=supervisord ; setuid to this UNIX account at startup; recommended if root
- ;identifier=supervisor ; supervisord identifier, default is 'supervisor'
- ;directory=/tmp ; default is not to cd during start
- ;nocleanup=true ; don't clean up tempfiles at start; default false
- ;childlogdir=/tmp ; 'AUTO' child log dir, default $TEMP
- ;environment=KEY="value" ; key value pairs to add to environment
- ;strip_ansi=false ; strip ansi escape codes in logs; def. false
- ; The rpcinterface:supervisor section must remain in the config file for
- ; RPC (supervisorctl/web interface) to work. Additional interfaces may be
- ; added by defining them in separate [rpcinterface:x] sections.
- [rpcinterface:supervisor]
- supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
- ; The supervisorctl section configures how supervisorctl will connect to
- ; supervisord. configure it match the settings in either the unix_http_server
- ; or inet_http_server section.
- [supervisorctl]
- serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket
- ;serverurl=http://127.0.0.1:9001 ; use an http:// url to specify an inet socket
- ;username=chris ; should be same as in [*_http_server] if set
- ;password=123 ; should be same as in [*_http_server] if set
- ;prompt=mysupervisor ; cmd line prompt (default "supervisor")
- ;history_file=~/.sc_history ; use readline history if available
- ; The sample program section below shows all possible program subsection values.
- ; Create one or more 'real' program: sections to be able to control them under
- ; supervisor.
- ;[program:theprogramname]
- ;command=/bin/cat ; the program (relative uses PATH, can take args)
- ;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
- ;numprocs=1 ; number of processes copies to start (def 1)
- ;directory=/tmp ; directory to cwd to before exec (def no cwd)
- ;umask=022 ; umask for process (default None)
- ;priority=999 ; the relative start priority (default 999)
- ;autostart=true ; start at supervisord start (default: true)
- ;startsecs=1 ; # of secs prog must stay up to be running (def. 1)
- ;startretries=3 ; max # of serial start failures when starting (default 3)
- ;autorestart=unexpected ; when to restart if exited after running (def: unexpected)
- ;exitcodes=0 ; 'expected' exit codes used with autorestart (default 0)
- ;stopsignal=QUIT ; signal used to kill process (default TERM)
- ;stopwaitsecs=10 ; max num secs to wait b4 SIGKILL (default 10)
- ;stopasgroup=false ; send stop signal to the UNIX process group (default false)
- ;killasgroup=false ; SIGKILL the UNIX process group (def false)
- ;user=chrism ; setuid to this UNIX account to run the program
- ;redirect_stderr=true ; redirect proc stderr to stdout (default false)
- ;stdout_logfile=/a/path ; stdout log path, NONE for none; default AUTO
- ;stdout_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
- ;stdout_logfile_backups=10 ; # of stdout logfile backups (0 means none, default 10)
- ;stdout_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
- ;stdout_events_enabled=false ; emit events on stdout writes (default false)
- ;stdout_syslog=false ; send stdout to syslog with process name (default false)
- ;stderr_logfile=/a/path ; stderr log path, NONE for none; default AUTO
- ;stderr_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
- ;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10)
- ;stderr_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
- ;stderr_events_enabled=false ; emit events on stderr writes (default false)
- ;stderr_syslog=false ; send stderr to syslog with process name (default false)
- ;environment=A="1",B="2" ; process environment additions (def no adds)
- ;serverurl=AUTO ; override serverurl computation (childutils)
- ; The sample eventlistener section below shows all possible eventlistener
- ; subsection values. Create one or more 'real' eventlistener: sections to be
- ; able to handle event notifications sent by supervisord.
- ;[eventlistener:theeventlistenername]
- ;command=/bin/eventlistener ; the program (relative uses PATH, can take args)
- ;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
- ;numprocs=1 ; number of processes copies to start (def 1)
- ;events=EVENT ; event notif. types to subscribe to (req'd)
- ;buffer_size=10 ; event buffer queue size (default 10)
- ;directory=/tmp ; directory to cwd to before exec (def no cwd)
- ;umask=022 ; umask for process (default None)
- ;priority=-1 ; the relative start priority (default -1)
- ;autostart=true ; start at supervisord start (default: true)
- ;startsecs=1 ; # of secs prog must stay up to be running (def. 1)
- ;startretries=3 ; max # of serial start failures when starting (default 3)
- ;autorestart=unexpected ; autorestart if exited after running (def: unexpected)
- ;exitcodes=0 ; 'expected' exit codes used with autorestart (default 0)
- ;stopsignal=QUIT ; signal used to kill process (default TERM)
- ;stopwaitsecs=10 ; max num secs to wait b4 SIGKILL (default 10)
- ;stopasgroup=false ; send stop signal to the UNIX process group (default false)
- ;killasgroup=false ; SIGKILL the UNIX process group (def false)
- ;user=chrism ; setuid to this UNIX account to run the program
- ;redirect_stderr=false ; redirect_stderr=true is not allowed for eventlisteners
- ;stdout_logfile=/a/path ; stdout log path, NONE for none; default AUTO
- ;stdout_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
- ;stdout_logfile_backups=10 ; # of stdout logfile backups (0 means none, default 10)
- ;stdout_events_enabled=false ; emit events on stdout writes (default false)
- ;stdout_syslog=false ; send stdout to syslog with process name (default false)
- ;stderr_logfile=/a/path ; stderr log path, NONE for none; default AUTO
- ;stderr_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
- ;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10)
- ;stderr_events_enabled=false ; emit events on stderr writes (default false)
- ;stderr_syslog=false ; send stderr to syslog with process name (default false)
- ;environment=A="1",B="2" ; process environment additions
- ;serverurl=AUTO ; override serverurl computation (childutils)
-
- ; The sample group section below shows all possible group values. Create one
- ; or more 'real' group: sections to create "heterogeneous" process groups.
-
- ;[group:thegroupname]
- ;programs=progname1,progname2 ; each refers to 'x' in [program:x] definitions
- ;priority=999 ; the relative start priority (default 999)
-
- ; The [include] section can just contain the "files" setting. This
- ; setting can list multiple files (separated by whitespace or
- ; newlines). It can also contain wildcards. The filenames are
- ; interpreted as relative to this file. Included files *cannot*
- ; include files themselves.
-
- [include]
- files = /etc/supervisord.d/*.ini
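Configuring the think-queue program
Create a queue.ini file in the /etc/supervisord.d directory with the following content:
- [program:queue] ; program name, used in supervisorctl to operate on the program
- autorestart=True ; restart the program automatically when it exits
- autostart=True ; start the program automatically when supervisord starts
- redirect_stderr=True ; redirect stderr to stdout (default false)
- user=root ; user the program runs as
- command=php /opt/lampp/htdocs/tp6/think queue:work
- stdout_logfile_maxbytes = 20MB ; stdout log file size (default 50MB)
- stdout_logfile_backups = 20 ; number of stdout log file backups
- ; stdout log file; the program cannot start if the directory does not exist, so create the directory by hand (supervisord creates the log file itself)
- stdout_logfile = /run/log/usercenter_stdout.log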
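Starting Supervisor
- supervisord -c /etc/supervisord.conf
Updating Supervisor
After adding a new configuration, apply it with the following command:
- supervisorctl update
Checking Supervisor status
- supervisorctl status
- # The output looks like this; the configured queue process is running
- queue RUNNING pid 96906, uptime 0:06:19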