12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- <?php
- declare(strict_types=1);
- namespace App\Process;
- use App\Master\Framework\Library\Mqtt\Subscribe;
- use App\Utils\LogUtil;
- use Hyperf\Process\AbstractProcess;
- use Hyperf\Process\Annotation\Process;
- #[Process(name: "mqtt_process", redirectStdinStdout: false, pipeType: 2, enableCoroutine: true)]
- class MqttProcess extends AbstractProcess
- {
- // 日志模块名称
- const LOG_MODULE = 'MqttProcess';
- const LOG_FUNCTION = 'handle';
- /**
- * 进程数量
- */
- public int $nums = 1;
- /**
- * 进程名称
- */
- public string $name = 'mqtt_process';
- /**
- * 重定向自定义进程的标准输入和输出
- */
- public bool $redirectStdinStdout = false;
- /**
- * 管道类型
- */
- public int $pipeType = 2;
- /**
- * 是否启用协程
- */
- public bool $enableCoroutine = true;
- /**
- * 监听订阅
- *
- * @return void
- */
- public function handle(): void
- {
- // 日志统一写入
- LogUtil::getInstance("Mqtt/");//设置日志存入通道
- $topic = [
- '/test/subscribe' => 0
- ];
- $subscribe = new Subscribe();
- $subscribe->endlessLoop($topic, function ($cline,$message, array $topic) {
- // 日志统一写入
- LogUtil::getInstance("Mqtt/");//设置日志存入通道
- // 接收订阅消息
- LogUtil::info('订阅主题', self::LOG_MODULE, self::LOG_FUNCTION, $message);
- // 日志统一写入
- LogUtil::close();
- });
- }
- public function isEnable($server): bool
- {
- // 跟随服务启动一同启动
- return false;
- }
- }
|