TP6 + Redis + think-queue + Supervisor: running a message queue / job worker as a resident process

Published 2022-02-17

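Installation is omitted here.

Create the message queue job

  • Configure the message queue: in config/queue.php, change 'default' => 'sync' to 'default' => 'redis' so that the Redis driver is used.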

    <?php
    // +----------------------------------------------------------------------
    // | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
    // +----------------------------------------------------------------------
    // | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
    // +----------------------------------------------------------------------
    // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
    // +----------------------------------------------------------------------
    // | Author: yunwuxin <448901948@qq.com>
    // +----------------------------------------------------------------------
    return [
        'default'     => 'redis',
        'connections' => [
            'sync'     => [
                'type' => 'sync',
            ],
            'database' => [
                'type'  => 'database',
                'queue' => 'default',
                'table' => 'jobs',
            ],
            'redis'    => [
                'type'       => 'redis',
                'queue'      => 'default',
                'host'       => '127.0.0.1',
                'port'       => 6379,
                'password'   => '',
                'select'     => 0,
                'timeout'    => 0,
                'persistent' => false,
            ],
        ],
        'failed'      => [
            'type'  => 'none',
            'table' => 'failed_jobs',
        ],
    ];
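
The host, port and password above are hard-coded. As a minimal sketch, assuming you would rather take them from the environment, the 'redis' connection entry could be built by a small helper. The function name `redisQueueConnection` and the `QUEUE_REDIS_*` variable names are hypothetical and are not part of think-queue; the defaults mirror the values shown above.

```php
<?php
declare(strict_types=1);

/**
 * Build the 'redis' connection entry for config/queue.php.
 * Hypothetical helper: the QUEUE_REDIS_* keys are assumed names, not think-queue settings.
 *
 * @param array $env key/value pairs, e.g. the result of getenv()
 */
function redisQueueConnection(array $env): array
{
    return [
        'type'       => 'redis',
        'queue'      => $env['QUEUE_REDIS_QUEUE'] ?? 'default',
        'host'       => $env['QUEUE_REDIS_HOST'] ?? '127.0.0.1',
        // Environment values are strings, so cast the numeric settings.
        'port'       => (int) ($env['QUEUE_REDIS_PORT'] ?? 6379),
        'password'   => $env['QUEUE_REDIS_PASSWORD'] ?? '',
        'select'     => (int) ($env['QUEUE_REDIS_DB'] ?? 0),
        'timeout'    => 0,
        'persistent' => false,
    ];
}

// getenv() without arguments returns all environment variables as an array.
$connection = redisQueueConnection(getenv());
echo $connection['host'], ':', $connection['port'], PHP_EOL;
```

In config/queue.php the result would take the place of the literal 'redis' array; unset variables fall back to the defaults.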

Queue producer
Edit app/controller/Index.php and publish the job with Queue::later or Queue::push.

    <?php
    namespace app\controller;

    use app\BaseController;
    use think\facade\Queue;

    class Index extends BaseController
    {
        public function index()
        {
            // Must match the consumer class created in app/job (Job1).
            $jobHandler = 'app\job\Job1';
            $jobData = ['ts' => time()];
            // Enqueue the job: later() delays delivery (in seconds), push() sends immediately.
            // For other options see [thinkphp-queue].
            Queue::later(2, $jobHandler, $jobData);
            // The default ThinkPHP welcome page markup is left out here; any response works.
            return 'job queued';
        }

        public function hello($name = 'ThinkPHP6')
        {
            return 'hello,' . $name;
        }
    }

Consumer and job deletion
Create a job folder under the app directory and add a Job1.php class to it, with the following content:

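    <?php
    namespace app\job;

    use think\facade\Log;
    use think\queue\Job;

    class Job1
    {
        public function fire(Job $job, $data)
        {
            // Do the actual work here; this example only logs the attempt count.
            $count = $job->attempts();
            Log::write('run the ' . $count . ' round');
            // Delete the job once it has succeeded. Otherwise it is run again and again
            // until the maximum number of attempts is reached, after which it fails and
            // the failed method is called.
            $job->delete();
            // Alternatively, release the job back onto the queue instead of deleting it
            // (here with a 2-second delay). Do not call both for the same run.
            // $job->release(2);
        }
    }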
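
The choice between deleting and releasing a job can be kept separate from the framework calls. The following is a minimal sketch, not part of think-queue: the function `nextAction` and the `$maxAttempts` limit of 3 are hypothetical names and values introduced for illustration only.

```php
<?php
declare(strict_types=1);

/**
 * Decide what a job handler should do after one attempt.
 * Hypothetical helper for illustration; not part of think-queue.
 *
 * @return string 'delete' when the job is done or given up on, 'release' to retry later
 */
function nextAction(bool $succeeded, int $attempts, int $maxAttempts = 3): string
{
    if ($succeeded) {
        return 'delete';
    }
    // Retry only while the attempt budget is not used up.
    return $attempts < $maxAttempts ? 'release' : 'delete';
}

echo nextAction(true, 1), PHP_EOL;
echo nextAction(false, 1), PHP_EOL;
echo nextAction(false, 3), PHP_EOL;
```

Inside fire(), the result would map onto the calls already shown: $job->delete() for 'delete' and $job->release(2) for 'release', with $job->attempts() supplying the attempt count.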

Listen for jobs and run them

There are two commands, work and listen. For usage details and the available options, run either command with --help.

    php think queue:listen
    php think queue:work

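Publish a job
Open http://localhost in a browser to publish a job. The message written by the job then appears in the log.

At this point think-queue is fully working, but the listener still has to be started by typing the command by hand, and if a server problem makes the listener exit, it has to be restarted by hand as well. The think-queue documentation mentions pairing it with Supervisor to keep the process resident, so Supervisor is used here for that purpose.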

Install Supervisor

    yum install epel-release
    yum install -y supervisor
    # start automatically at boot
    systemctl enable supervisord

Configure Supervisor

    ; Sample supervisor config file.
    ;
    ; For more information on the config file, please see:
    ; http://supervisord.org/configuration.html
    ;
    ; Notes:
    ; - Shell expansion ("~" or "$HOME") is not supported. Environment
    ; variables can be expanded using this syntax: "%(ENV_HOME)s".
    ; - Quotes around values are not supported, except in the case of
    ; the environment= options as shown below.
    ; - Comments must have a leading space: "a=b ;comment" not "a=b;comment".
    ; - Command will be truncated if it looks like a config file comment, e.g.
    ; "command=bash -c 'foo ; bar'" will truncate to "command=bash -c 'foo ".
    ;
    ; Warning:
    ; Paths throughout this example file use /tmp because it is available on most
    ; systems. You will likely need to change these to locations more appropriate
    ; for your system. Some systems periodically delete older files in /tmp.
    ; Notably, if the socket file defined in the [unix_http_server] section below
    ; is deleted, supervisorctl will be unable to connect to supervisord.

    [unix_http_server]
    file=/tmp/supervisor.sock ; the path to the socket file
    ;chmod=0700 ; socket file mode (default 0700)
    ;chown=nobody:nogroup ; socket file uid:gid owner
    ;username=user ; default is no username (open server)
    ;password=123 ; default is no password (open server)

    ; Security Warning:
    ; The inet HTTP server is not enabled by default. The inet HTTP server is
    ; enabled by uncommenting the [inet_http_server] section below. The inet
    ; HTTP server is intended for use within a trusted environment only. It
    ; should only be bound to localhost or only accessible from within an
    ; isolated, trusted network. The inet HTTP server does not support any
    ; form of encryption. The inet HTTP server does not use authentication
    ; by default (see the username= and password= options to add authentication).
    ; Never expose the inet HTTP server to the public internet.
    ;[inet_http_server] ; inet (TCP) server disabled by default
    ;port=127.0.0.1:9001 ; ip_address:port specifier, *:port for all iface
    ;username=user ; default is no username (open server)
    ;password=123 ; default is no password (open server)

    [supervisord]
    logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log
    logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB
    logfile_backups=10 ; # of main logfile backups; 0 means none, default 10
    loglevel=info ; log level; default info; others: debug,warn,trace
    pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
    nodaemon=false ; start in foreground if true; default false
    silent=false ; no logs to stdout if true; default false
    minfds=1024 ; min. avail startup file descriptors; default 1024
    minprocs=200 ; min. avail process descriptors;default 200
    ;umask=022 ; process file creation umask; default 022
    ;user=supervisord ; setuid to this UNIX account at startup; recommended if root
    ;identifier=supervisor ; supervisord identifier, default is 'supervisor'
    ;directory=/tmp ; default is not to cd during start
    ;nocleanup=true ; don't clean up tempfiles at start; default false
    ;childlogdir=/tmp ; 'AUTO' child log dir, default $TEMP
    ;environment=KEY="value" ; key value pairs to add to environment
    ;strip_ansi=false ; strip ansi escape codes in logs; def. false

    ; The rpcinterface:supervisor section must remain in the config file for
    ; RPC (supervisorctl/web interface) to work. Additional interfaces may be
    ; added by defining them in separate [rpcinterface:x] sections.
    [rpcinterface:supervisor]
    supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

    ; The supervisorctl section configures how supervisorctl will connect to
    ; supervisord. configure it match the settings in either the unix_http_server
    ; or inet_http_server section.
    [supervisorctl]
    serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket
    ;serverurl=http://127.0.0.1:9001 ; use an http:// url to specify an inet socket
    ;username=chris ; should be same as in [*_http_server] if set
    ;password=123 ; should be same as in [*_http_server] if set
    ;prompt=mysupervisor ; cmd line prompt (default "supervisor")
    ;history_file=~/.sc_history ; use readline history if available

    ; The sample program section below shows all possible program subsection values.
    ; Create one or more 'real' program: sections to be able to control them under
    ; supervisor.
    ;[program:theprogramname]
    ;command=/bin/cat ; the program (relative uses PATH, can take args)
    ;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
    ;numprocs=1 ; number of processes copies to start (def 1)
    ;directory=/tmp ; directory to cwd to before exec (def no cwd)
    ;umask=022 ; umask for process (default None)
    ;priority=999 ; the relative start priority (default 999)
    ;autostart=true ; start at supervisord start (default: true)
    ;startsecs=1 ; # of secs prog must stay up to be running (def. 1)
    ;startretries=3 ; max # of serial start failures when starting (default 3)
    ;autorestart=unexpected ; when to restart if exited after running (def: unexpected)
    ;exitcodes=0 ; 'expected' exit codes used with autorestart (default 0)
    ;stopsignal=QUIT ; signal used to kill process (default TERM)
    ;stopwaitsecs=10 ; max num secs to wait b4 SIGKILL (default 10)
    ;stopasgroup=false ; send stop signal to the UNIX process group (default false)
    ;killasgroup=false ; SIGKILL the UNIX process group (def false)
    ;user=chrism ; setuid to this UNIX account to run the program
    ;redirect_stderr=true ; redirect proc stderr to stdout (default false)
    ;stdout_logfile=/a/path ; stdout log path, NONE for none; default AUTO
    ;stdout_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
    ;stdout_logfile_backups=10 ; # of stdout logfile backups (0 means none, default 10)
    ;stdout_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
    ;stdout_events_enabled=false ; emit events on stdout writes (default false)
    ;stdout_syslog=false ; send stdout to syslog with process name (default false)
    ;stderr_logfile=/a/path ; stderr log path, NONE for none; default AUTO
    ;stderr_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
    ;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10)
    ;stderr_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
    ;stderr_events_enabled=false ; emit events on stderr writes (default false)
    ;stderr_syslog=false ; send stderr to syslog with process name (default false)
    ;environment=A="1",B="2" ; process environment additions (def no adds)
    ;serverurl=AUTO ; override serverurl computation (childutils)

    ; The sample eventlistener section below shows all possible eventlistener
    ; subsection values. Create one or more 'real' eventlistener: sections to be
    ; able to handle event notifications sent by supervisord.
    ;[eventlistener:theeventlistenername]
    ;command=/bin/eventlistener ; the program (relative uses PATH, can take args)
    ;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
    ;numprocs=1 ; number of processes copies to start (def 1)
    ;events=EVENT ; event notif. types to subscribe to (req'd)
    ;buffer_size=10 ; event buffer queue size (default 10)
    ;directory=/tmp ; directory to cwd to before exec (def no cwd)
    ;umask=022 ; umask for process (default None)
    ;priority=-1 ; the relative start priority (default -1)
    ;autostart=true ; start at supervisord start (default: true)
    ;startsecs=1 ; # of secs prog must stay up to be running (def. 1)
    ;startretries=3 ; max # of serial start failures when starting (default 3)
    ;autorestart=unexpected ; autorestart if exited after running (def: unexpected)
    ;exitcodes=0 ; 'expected' exit codes used with autorestart (default 0)
    ;stopsignal=QUIT ; signal used to kill process (default TERM)
    ;stopwaitsecs=10 ; max num secs to wait b4 SIGKILL (default 10)
    ;stopasgroup=false ; send stop signal to the UNIX process group (default false)
    ;killasgroup=false ; SIGKILL the UNIX process group (def false)
    ;user=chrism ; setuid to this UNIX account to run the program
    ;redirect_stderr=false ; redirect_stderr=true is not allowed for eventlisteners
    ;stdout_logfile=/a/path ; stdout log path, NONE for none; default AUTO
    ;stdout_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
    ;stdout_logfile_backups=10 ; # of stdout logfile backups (0 means none, default 10)
    ;stdout_events_enabled=false ; emit events on stdout writes (default false)
    ;stdout_syslog=false ; send stdout to syslog with process name (default false)
    ;stderr_logfile=/a/path ; stderr log path, NONE for none; default AUTO
    ;stderr_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
    ;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10)
    ;stderr_events_enabled=false ; emit events on stderr writes (default false)
    ;stderr_syslog=false ; send stderr to syslog with process name (default false)
    ;environment=A="1",B="2" ; process environment additions
    ;serverurl=AUTO ; override serverurl computation (childutils)

    ; The sample group section below shows all possible group values. Create one
    ; or more 'real' group: sections to create "heterogeneous" process groups.
    ;[group:thegroupname]
    ;programs=progname1,progname2 ; each refers to 'x' in [program:x] definitions
    ;priority=999 ; the relative start priority (default 999)

    ; The [include] section can just contain the "files" setting. This
    ; setting can list multiple files (separated by whitespace or
    ; newlines). It can also contain wildcards. The filenames are
    ; interpreted as relative to this file. Included files *cannot*
    ; include files themselves.
    [include]
    files = /etc/supervisord.d/*.ini

Configure the think-queue worker

Create a queue.ini file in the /etc/supervisord.d directory with the following content:

    [program:queue] ; program name; supervisorctl uses this value to operate on the program
    autorestart=True ; restart the program automatically when it exits
    autostart=True ; start the program automatically when supervisord starts
    redirect_stderr=True ; redirect stderr to stdout (default false)
    user=root ; user to run the program as
    command=php /opt/lampp/htdocs/tp6/think queue:work
    stdout_logfile_maxbytes = 20MB ; stdout log file size (default 50MB)
    stdout_logfile_backups = 20 ; number of stdout log file backups
    ; stdout log file; the program cannot start if the directory does not exist, so create the directory by hand (supervisord creates the log file itself)
    stdout_logfile = /run/log/usercenter_stdout.log

Start Supervisor

    supervisord -c /etc/supervisord.conf

Update Supervisor

After adding new configuration, apply it with the following command:

    supervisorctl update

Check Supervisor

    supervisorctl status
    # Output like the following shows that the configured queue process has started
    queue RUNNING pid 96906, uptime 0:06:19
