ShTcpTest.php 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. <?php
  2. namespace app\workerman;
  3. use app\common\server\ShOneanalysisTest;
  4. use think\facade\Log;
  5. use think\worker\Server;
  6. use Workerman\Lib\Timer;
  /**
   * TCP server for the wristband ("shouhuan") device protocol.
   *
   * Responsibilities visible in this class:
   *  - onMessage(): unpacks incoming data into frames, stores each frame in the
   *    receive queue and remembers which device id the connection belongs to.
   *  - A 1-second timer that pushes queued outgoing frames to the connection of
   *    the device they are addressed to.
   *  - A 10-second timer that parses stored received frames and persists them.
   *  - A 1-second heartbeat timer (closing idle connections is currently disabled).
   *
   * All protocol parsing and queue storage is delegated to ShOneanalysisTest;
   * its exact return shapes are not visible from this file.
   */
  class ShTcpTest extends Server
  {
      /** @var string Address the TCP server listens on. */
      protected $socket = 'tcp://0.0.0.0:21444';
      //protected $socket = 'tcp://0.0.0.0:9501';

      /**
       * Configures the worker before it is started.
       *
       * @return void
       */
      protected function init()
      {
          // Original note said "number of threads"; Workerman's `count` is the
          // number of worker processes.
          $this->count = 4;
      }

      /**
       * Connection-established callback. Intentionally empty.
       *
       * @param mixed $connection The newly accepted connection.
       * @return void
       */
      public function onConnect($connection)
      {
      }

      /**
       * Worker-start callback: registers the send, parse and heartbeat timers.
       *
       * @param mixed $worker The worker whose `connections` the timers iterate.
       * @return void
       */
      public function onWorkerStart($worker)
      {
          // define() emits a warning when the constant already exists, so guard it.
          if (!defined('HEARTBEAT_TIME')) {
              define('HEARTBEAT_TIME', 1000);
          }

          $server_sa = new ShOneanalysisTest();

          // Sending: every second, flush each device's outgoing queue.
          Timer::add(1, function () use ($worker, $server_sa) {
              $this->flushSendQueues($worker, $server_sa);
          });

          // Parsing: initialise the analyser, then process stored frames every 10 seconds.
          $server_sa->initanalysisShoneReceiveMsg();
          Timer::add(10, function () use ($server_sa) {
              $this->processReceiveQueue($server_sa);
          });

          // Heartbeat bookkeeping, once per second.
          Timer::add(1, function () use ($worker) {
              $this->checkHeartbeats($worker);
          });
      }

      /**
       * Message callback: stores every received frame and tags the connection
       * with the device id found in the frame.
       *
       * @param mixed $connection Connection the data arrived on.
       * @param mixed $data       Raw data; split into frames by ShOneanalysisTest::unpack().
       * @return void
       */
      public function onMessage($connection, $data)
      {
          // Refresh the timestamp the heartbeat timer compares against; without
          // this the timer only ever sees the value it seeded itself.
          $connection->lastMessageTime = time();

          //Log::write($data, 'shouhuan');
          $server_sa = new ShOneanalysisTest();
          $packets = (array) $server_sa->unpack($data);
          foreach ($packets as $packet) {
              $server_sa->showfortest($packet);
              try {
                  $result = $server_sa->savereceivequeue($packet);
                  if (empty($result)) {
                      Log::write("手环接收信息队列,数据保存失败", 'shouhuan');
                  }
                  $line = $server_sa->getline($packet);
                  // Remember the device on the connection itself so the send timer
                  // delivers each device's queue to the right socket.
                  $connection->device_id_code = $line['device_id_code'];
                  // Kept for backward compatibility: the previous implementation
                  // exposed the most recent device id at worker level.
                  $this->device_id_code = $line['device_id_code'];
                  //$this->device_id_code = "358800006072996";
              } catch (\Exception $e) {
                  Log::write($packet, 'shouhuan');
                  Log::write($e->getMessage(), 'shouhuan');
                  var_dump($e->getMessage());
              }
          }
      }

      /**
       * Connection-closed callback. Intentionally empty.
       *
       * @param mixed $connection The connection that was closed.
       * @return void
       */
      public function onClose($connection)
      {
      }

      /**
       * Sends every queued outgoing frame to the connection of its target device.
       *
       * Connections that have not yet identified themselves (no message received,
       * hence no device id) are skipped.
       *
       * @param mixed             $worker    Worker whose connections are served.
       * @param ShOneanalysisTest $server_sa Queue/protocol helper.
       * @return void
       */
      private function flushSendQueues($worker, $server_sa)
      {
          foreach ($worker->connections as $connection) {
              if (!isset($connection->device_id_code)) {
                  continue;
              }
              $device_id_code = $connection->device_id_code;

              // Called with only the device id, the helper returns paging info.
              $info = $server_sa->getsendqueuelist($device_id_code);
              $totalpage = isset($info['totalpage']) ? $info['totalpage'] : 0;
              if ($totalpage <= 0) {
                  continue;
              }

              $size = $info['size'];
              for ($page = 1; $page <= $totalpage; $page++) {
                  // Always requests page 1: presumably sendmsgsuccess() removes the
                  // sent rows so the next batch moves up — TODO confirm.
                  $list = (array) $server_sa->getsendqueuelist($device_id_code, 1, $size);
                  foreach ($list as $value) {
                      $connection->send($value['msg']);
                      $server_sa->showfortest($value['msg']);
                      $server_sa->sendmsgsuccess($value);
                  }
              }
          }
      }

      /**
       * Parses up to 30 stored received frames and persists their content.
       *
       * A frame is acknowledged via receivemsgsuccess() only when no exception
       * was thrown; a failing frame is logged and left unacknowledged.
       * NOTE(review): such a frame will presumably be picked up again on the
       * next run — confirm this retry behaviour is intended.
       *
       * @param ShOneanalysisTest $server_sa Queue/protocol helper.
       * @return void
       */
      private function processReceiveQueue($server_sa)
      {
          $page = 1;
          $size = 30;
          $list = $server_sa->getreceivequeuelist($page, $size);
          if (!$list) {
              return;
          }

          foreach ((array) $list as $value) {
              try {
                  $linedata = $server_sa->getline($value['msg']);
                  $content = $linedata['content_str'];
                  $device_id_code = $linedata['device_id_code'];
                  $fid = $server_sa->getfacilityid($device_id_code);
                  $content = $server_sa->getcommandcontent($content);
                  $result = $server_sa->saveshinfo($fid, $device_id_code, $content, $value['msg']);
                  if ($result) {
                      // First element of the parsed content is passed as the command.
                      $command = $content[0];
                      $server_sa->createbacksendmsg($fid, $device_id_code, $content, $command);
                  }
                  $server_sa->receivemsgsuccess($value);
              } catch (\Exception $e) {
                  $msg = $e->getMessage();
                  echo $msg . "\n";
                  Log::write($msg, 'shouhuan');
              }
          }
      }

      /**
       * Heartbeat bookkeeping for every connection of the worker.
       *
       * @param mixed $worker Worker whose connections are inspected.
       * @return void
       */
      private function checkHeartbeats($worker)
      {
          $time_now = time();
          foreach ($worker->connections as $connection) {
              // The connection may not have received any message yet; seed
              // lastMessageTime with the current time.
              if (empty($connection->lastMessageTime)) {
                  $connection->lastMessageTime = $time_now;
                  continue;
              }
              // Idle for longer than the heartbeat interval: the client is
              // considered offline. Closing is currently disabled.
              if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
                  //$connection->close();
                  // TODO: mark the corresponding device as offline.
              }
          }
      }
  }