-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathQueue.php
98 lines (85 loc) · 2.22 KB
/
Queue.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
<?php
namespace yiisolutions\queue;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use yii\base\Component;
use yii\helpers\Json;
class Queue extends Component
{
public $host;
public $port;
public $user;
public $password;
public $vhost;
/**
* @var AMQPStreamConnection
*/
private $_connection;
/**
* @var AMQPChannel
*/
private $_channel;
/**
* Отправить сообщение в очередь
*
* @param string $queue
* @param array $data
*/
public function send($queue, array $data)
{
$channel = $this->getChannel();
$channel->queue_declare($queue, false, true, false, false);
$msg = new AMQPMessage(Json::encode($data), ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', $queue);
}
/**
* Слушать очередь
*
* @param string $queue
*/
public function listen($queue, $callback)
{
$channel = $this->getChannel();
$channel->queue_declare($queue, false, true, false, false);
$channel->basic_consume($queue, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
/**
* Подтверждение канала
*
* @param AMQPMessage $msg
*/
public function acknowledgmentMessage(AMQPMessage $msg)
{
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
/**
* @return AMQPChannel
*/
protected function getChannel()
{
if (!$this->_channel) {
$this->_channel = $this->getConnection()->channel();
}
return $this->_channel;
}
/**
* @return AMQPStreamConnection
*/
protected function getConnection()
{
if (!$this->_connection) {
$this->_connection = new AMQPStreamConnection(
$this->host,
$this->port,
$this->user,
$this->password,
$this->vhost
);
}
return $this->_connection;
}
}