ShTcp.php 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. <?php
  2. namespace app\workerman;
  3. use app\common\server\ShouhuanCommand;
  4. use app\common\server\SouhuanAnalysis;
  5. use think\facade\Log;
  6. use think\worker\Server;
  7. use Workerman\Lib\Timer;
  /**
   * Workerman TCP server for wristband ("shouhuan") devices.
   *
   * Responsibilities visible in this class:
   *  - onMessage: unpack incoming frames, mark the device online, store the
   *    frame in the receive queue and send back an immediate reply if
   *    ShouhuanCommand produces one.
   *  - onWorkerStart: install three timers (push queued outgoing messages,
   *    process the receive queue, refresh offline state).
   *
   * NOTE(review): `$this->count` / `$this->device_id_code` are not declared
   * here; presumably the think\worker\Server base class forwards them to the
   * underlying Workerman worker — confirm against the installed version.
   */
  class ShTcp extends Server
  {
      protected $socket = 'tcp://0.0.0.0:21444';
      //protected $socket = 'tcp://0.0.0.0:9501';

      /**
       * Configure the worker before it is started.
       */
      protected function init()
      {
          // Number of workers to start (the original comment called these "threads").
          $this->count = 10;
      }

      /**
       * Called when a client connects. Nothing to do: the device is only
       * identified once its first message has been parsed in onMessage().
       *
       * @param mixed $connection Workerman connection object.
       */
      public function onConnect($connection)
      {
      }

      /**
       * Install the periodic jobs for this worker.
       *
       * @param mixed $worker Workerman worker instance; its `connections`
       *                      list is iterated by the send timer.
       */
      public function onWorkerStart($worker)
      {
          // Guarded so a second invocation in the same process cannot raise
          // a "constant already defined" notice.
          if (!defined('HEARTBEAT_TIME')) {
              define('HEARTBEAT_TIME', 1000);
          }

          $server_sa = new SouhuanAnalysis();
          $server_sc = new ShouhuanCommand();

          // --- Send: every second, push queued outgoing messages. ---
          // Each connection is drained using the device id that onMessage()
          // attached to *that* connection, so a device only ever receives
          // messages queued for itself.
          Timer::add(1, function () use ($worker, $server_sa) {
              foreach ($worker->connections as $connection) {
                  if (empty($connection->device_id_code)) {
                      // No message parsed on this connection yet: device unknown.
                      continue;
                  }
                  $device_id_code = $connection->device_id_code;

                  // First call (page = null) is used only for paging metadata.
                  $info = $server_sa->getsendqueuelist($device_id_code, null, 500);
                  $totalpage = isset($info['totalpage']) ? $info['totalpage'] : 0;
                  if ($totalpage <= 0) {
                      continue;
                  }

                  $size = $info['size'];
                  for ($page = 1; $page <= $totalpage; $page++) {
                      // Always page 1 — presumably sendmsgsuccess() removes the
                      // entry from the pending queue so the next batch moves up;
                      // TODO confirm against SouhuanAnalysis.
                      $list = (array) $server_sa->getsendqueuelist($device_id_code, 1, $size);
                      foreach ($list as $value) {
                          echo 'send:' . $value['id'] . "-" . $value['msg'] . "\n";
                          $connection->send($value['msg']);
                          $server_sa->showfortest($value['msg']);
                          $server_sa->sendmsgsuccess($value);
                      }
                  }
              }
          });

          // --- Parse: every 10 seconds, process stored incoming messages. ---
          $server_sa->initanalysisShoneReceiveMsg();
          Timer::add(10, function () use ($server_sa, $server_sc) {
              $list = $server_sa->getreceivequeuelist(1, 500);
              if (!$list) {
                  return;
              }

              foreach ((array) $list as $value) {
                  try {
                      $msg = $value['msg'];
                      $linedata = $server_sa->getline($msg);
                      if (!is_array($linedata)) {
                          // Unparseable frame: log it and mark it handled so it
                          // is not picked up again.
                          Log::write("不可解析_onWorkerStart:" . $msg, 'shouhuan');
                          $server_sa->receivemsgsuccess($value);
                          continue;
                      }

                      $device_id_code = $linedata['device_id_code'];
                      $fid = $server_sa->getfacilityid($device_id_code);
                      $config = $linedata['config'];
                      if ($config) {
                          $result = $server_sa->saveshinfo($fid, $device_id_code, $linedata, $value);
                          if ($result && 'tsend' == $config['kind']) {
                              // Return value was never used by the original code;
                              // the call is kept for whatever it does internally.
                              $server_sc->createsendmsg($fid, $device_id_code, $linedata);
                          }
                      }
                      $server_sa->receivemsgsuccess($value);
                  } catch (\Exception $e) {
                      // NOTE(review): on failure the entry is not marked as
                      // handled, so it will presumably be retried on the next
                      // tick — confirm this is intended.
                      $msg = $e->getMessage();
                      echo $msg . "\n";
                      Log::write($msg, 'shouhuan');
                      Log::write($e->getTraceAsString(), 'shouhuan');
                  }
              }
          });

          // --- Offline check: every 10 seconds. ---
          Timer::add(10, function () use ($server_sa) {
              $server_sa->setofflineinfo();
              // Disabled heartbeat check, kept for reference:
              // $time_now = time();
              // foreach ($worker->connections as $connection) {
              //     // A connection that has not received any message yet gets
              //     // lastMessageTime initialised to the current time.
              //     if (empty($connection->lastMessageTime)) {
              //         $connection->lastMessageTime = $time_now;
              //         continue;
              //     }
              //     // If the last message is older than the heartbeat interval,
              //     // treat the client as offline and close the connection.
              //     if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
              //         //$connection->close();
              //         // mark the corresponding device offline
              //     }
              // }
          });
      }

      /**
       * Handle raw data received from a device.
       *
       * The payload is logged, split into frames via SouhuanAnalysis::unpack(),
       * and each frame is parsed, queued and answered independently; a failure
       * in one frame is logged and does not stop the remaining frames.
       *
       * @param mixed $connection Workerman connection the data arrived on.
       * @param mixed $data       Raw payload as delivered by Workerman.
       */
      public function onMessage($connection, $data)
      {
          Log::write($data, 'shouhuan');
          $server_sa = new SouhuanAnalysis();
          $server_sc = new ShouhuanCommand();

          $frames = (array) $server_sa->unpack($data);
          foreach ($frames as $frame) {
              // Debug output.
              echo "[" . date('Y-m-d H:i:s') . "] onmessage receive msg:" . $frame . "\n";
              //$server_sa->showfortest($frame);
              try {
                  $lineinfo = $server_sa->getline($frame);
                  if (!is_array($lineinfo)) {
                      // Same guard as the receive-queue timer: without it an
                      // unparseable frame would continue with a null device id.
                      Log::write("不可解析_onMessage:" . $frame, 'shouhuan');
                      continue;
                  }
                  $device_id_code = $lineinfo['device_id_code'];

                  // Mark the device online.
                  $server_sa->setonlineinfo($device_id_code);

                  if ($lineinfo['config']) {
                      $result = $server_sa->savereceivequeue($frame, $device_id_code);
                      if (empty($result)) {
                          Log::write("手环接收信息队列,数据保存失败", 'shouhuan');
                      }
                  }

                  // Immediate reply, if the command layer produces one.
                  $fid = $server_sa->getfacilityid($device_id_code);
                  $sendmsg = $server_sc->createsendmsg2($fid, $device_id_code, $lineinfo);
                  if ($sendmsg) {
                      $connection->send($sendmsg);
                  } else {
                      $sendmsg = 'NULL';
                  }
                  echo "[" . date('Y-m-d H:i:s') . "] onmessage send:" . $sendmsg . "\n";

                  // Bind the device to THIS connection so the send timer can
                  // route queued messages to the right socket.
                  $connection->device_id_code = $device_id_code;
                  // Kept for backward compatibility with any code that still
                  // reads the worker-wide "last seen device" attribute.
                  $this->device_id_code = $device_id_code;
                  // Test value:
                  //$this->device_id_code = "358800006072996";
              } catch (\Exception $e) {
                  Log::write($frame, 'shouhuan');
                  Log::write($e->getMessage(), 'shouhuan');
                  Log::write($e->getTraceAsString(), 'shouhuan');
                  var_dump($e->getMessage());
                  //var_dump($e->getTraceAsString());
              }
          }
      }

      /**
       * Log the time at which a connection was closed.
       *
       * @param mixed $connection Workerman connection that was closed.
       */
      public function onClose($connection)
      {
          $msg = 'colse time' . date('Y-m-d H:i:s');
          Log::write($msg, 'shouhuan');
      }
  }