MqttProcess.php 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Process;
  4. use App\Master\Framework\Library\Mqtt\Subscribe;
  5. use App\Utils\LogUtil;
  6. use Hyperf\Process\AbstractProcess;
  7. use Hyperf\Process\Annotation\Process;
  8. #[Process(name: "mqtt_process", redirectStdinStdout: false, pipeType: 2, enableCoroutine: true)]
  9. class MqttProcess extends AbstractProcess
  10. {
  11. // 日志模块名称
  12. const LOG_MODULE = 'MqttProcess';
  13. const LOG_FUNCTION = 'handle';
  14. /**
  15. * 进程数量
  16. */
  17. public int $nums = 1;
  18. /**
  19. * 进程名称
  20. */
  21. public string $name = 'mqtt_process';
  22. /**
  23. * 重定向自定义进程的标准输入和输出
  24. */
  25. public bool $redirectStdinStdout = false;
  26. /**
  27. * 管道类型
  28. */
  29. public int $pipeType = 2;
  30. /**
  31. * 是否启用协程
  32. */
  33. public bool $enableCoroutine = true;
  34. /**
  35. * 监听订阅
  36. *
  37. * @return void
  38. */
  39. public function handle(): void
  40. {
  41. // 日志统一写入
  42. LogUtil::getInstance("Mqtt/");//设置日志存入通道
  43. $topic = [
  44. '/test/subscribe' => 0
  45. ];
  46. $subscribe = new Subscribe();
  47. $subscribe->endlessLoop($topic, function ($cline,$message, array $topic) {
  48. // 日志统一写入
  49. LogUtil::getInstance("Mqtt/");//设置日志存入通道
  50. // 接收订阅消息
  51. LogUtil::info('订阅主题', self::LOG_MODULE, self::LOG_FUNCTION, $message);
  52. // 日志统一写入
  53. LogUtil::close();
  54. });
  55. }
  56. public function isEnable($server): bool
  57. {
  58. // 跟随服务启动一同启动
  59. return false;
  60. }
  61. }