123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- <?php
- use Workerman\Worker;
- use think\App;
- use think\Db;
- // ######## 消息队列消费者 ########
- $consumer = new Worker();
- // 慢任务,消费者的进程数可以开多一些
- $consumer->count = 1;
- //进程启动
- $consumer->onWorkerStart = function($consumer)
- {
- App::initCommon();
- //创建socket套接字
- $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
- //设置阻塞模式
- socket_set_block($socket);
- //为套接字绑定ip和端口
- socket_connect($socket,'127.0.0.1',2346);
- //监听socket
- socket_listen($socket,4);
- while(true)
- {
- //接收客户端请求
- if(($msgsocket = socket_accept($socket)) !== false)
- {
- //读取请求内容
- $buf = socket_read($msgsocket, 8192);
- echo "Received msg: $buf \n";
- $str = "this is a service message";
- //向连接的客户端发送数据
- socket_write($msgsocket, $str,strlen($str));
- //操作完之后需要关闭该连接否则 feof() 函数无法正确识别打开的句柄是否读取完成
- socket_close($msgsocket);
- }else{
- echo 37;
- }
- }
- };
- $consumer->onMessage = function($consumer){
- echo date('Y-m-d H:i:s');
- sleep(1);
- };
- //进程关闭
- $consumer->onWorkerStop = function($consumer)
- {
- restore_error_handler();
- };
|