ShTcp.php 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. <?php
  2. namespace app\workerman;
  3. use app\common\server\ShOneanalysis;
  4. use think\facade\Log;
  5. use think\worker\Server;
  6. use Workerman\Lib\Timer;
  7. class ShTcp extends Server
  8. {
  9. protected $socket = 'tcp://0.0.0.0:21444';
  10. //protected $socket = 'tcp://0.0.0.0:9501';
  11. protected function init()
  12. {
  13. $this->count = 4; //线程数
  14. }
  15. public function onConnect($connection)
  16. {
  17. }
  18. public function onWorkerStart($worker)
  19. {
  20. define('HEARTBEAT_TIME', 1000);
  21. //异步处理
  22. $server_sa = new ShOneanalysis();
  23. Timer::add(1, function () use ($worker, $server_sa) {
  24. if (isset($worker->device_id_code)) {
  25. $device_id_code = $worker->device_id_code;
  26. foreach ($worker->connections as $connection) {
  27. $info = $server_sa->getsendqueuelist($device_id_code);
  28. $totalpage = $info['totalpage'];
  29. if ($totalpage > 0) {
  30. $size = $info['size'];
  31. for ($page = 1; $page <= $totalpage; $page++) {
  32. $list = $server_sa->getsendqueuelist($device_id_code, 1, $size);
  33. foreach ($list as $key => $value) {
  34. $connection->send($value['msg']);
  35. $server_sa->sendmsgsuccess($value);
  36. }
  37. }
  38. }
  39. }
  40. }
  41. });
  42. Timer::add(1, function () use ($worker) {
  43. $time_now = time();
  44. foreach ($worker->connections as $connection) {
  45. // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
  46. if (empty($connection->lastMessageTime)) {
  47. $connection->lastMessageTime = $time_now;
  48. continue;
  49. }
  50. // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
  51. if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
  52. //$connection->close();
  53. //设置对应设备下线
  54. }
  55. }
  56. });
  57. }
  58. public function onMessage($connection, $data)
  59. {
  60. Log::write("ShTcp", 'shouhuan');
  61. $server_sa = new ShOneanalysis();
  62. $datas = (array) $server_sa->unpack($data);
  63. foreach ($datas as $key => $data) {
  64. try {
  65. $result = $server_sa->savereceivequeue($data);
  66. if (empty($result)) {
  67. Log::write("手环接收信息队列,数据保存失败", 'shouhuan');
  68. }
  69. $line = $server_sa->getline($data);
  70. $this->device_id_code = $line['device_id_code'];
  71. //$this->device_id_code = "358800006072996";
  72. } catch (\Exception $e) {
  73. Log::write($data, 'shouhuan');
  74. Log::write($e->getMessage(), 'shouhuan');
  75. var_dump($e->getMessage());
  76. var_dump($e->getTraceAsString());
  77. }
  78. }
  79. }
  80. public function onClose($connection)
  81. {
  82. }
  83. }