Subversion Repositories cheapmusic

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
103 - 1
<?php
2
 
3
/*
4
 * This file is part of the Monolog package.
5
 *
6
 * (c) Jordi Boggiano <j.boggiano@seld.be>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
 
12
namespace Monolog\Handler;
13
 
14
use Monolog\Logger;
15
use Monolog\Formatter\JsonFormatter;
16
use PhpAmqpLib\Message\AMQPMessage;
17
use PhpAmqpLib\Channel\AMQPChannel;
18
use AMQPExchange;
19
 
20
/**
 * Log handler that publishes formatted records to an AMQP exchange.
 *
 * Two transports are accepted: an AMQPExchange object (php AMQP extension)
 * or an AMQPChannel object (PhpAmqpLib). Each write checks which of the two
 * was supplied and calls the matching publish method.
 */
class AmqpHandler extends AbstractProcessingHandler
{
    /**
     * Transport object every record is published through.
     *
     * @var AMQPExchange|AMQPChannel $exchange
     */
    protected $exchange;

    /**
     * Exchange name passed along on each publish call made through an
     * AMQPChannel; it is left unset when an AMQPExchange is used.
     *
     * @var string
     */
    protected $exchangeName;

    /**
     * @param AMQPExchange|AMQPChannel $exchange     AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
     * @param string                   $exchangeName Name applied to the AMQPExchange, or remembered for channel publishes
     * @param int                      $level        Minimum level, forwarded to the parent constructor
     * @param bool                     $bubble       Whether the messages that are handled can bubble up the stack or not
     *
     * @throws \InvalidArgumentException When $exchange is neither of the two supported types
     */
    public function __construct($exchange, $exchangeName = 'log', $level = Logger::DEBUG, $bubble = true)
    {
        $isExtensionExchange = $exchange instanceof AMQPExchange;

        // Reject anything that is not one of the two supported transports up front.
        if (!$isExtensionExchange && !($exchange instanceof AMQPChannel)) {
            throw new \InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
        }

        if ($isExtensionExchange) {
            // The extension object carries its own name.
            $exchange->setName($exchangeName);
        } else {
            // A channel needs the exchange name supplied on every publish.
            $this->exchangeName = $exchangeName;
        }

        $this->exchange = $exchange;

        parent::__construct($level, $bubble);
    }

    /**
     * {@inheritDoc}
     *
     * Publishes the record's "formatted" payload under the routing key
     * computed by getRoutingKey().
     */
    protected function write(array $record)
    {
        $payload = $record["formatted"];
        $key = $this->getRoutingKey($record);

        if (!($this->exchange instanceof AMQPExchange)) {
            // PhpAmqpLib channel: wrap the payload in a message object.
            $this->exchange->basic_publish(
                $this->createAmqpMessage($payload),
                $this->exchangeName,
                $key
            );

            return;
        }

        // php AMQP extension: payload, routing key, third argument 0
        // (presumably "no flags" - confirm against the extension docs), attributes.
        $this->exchange->publish($payload, $key, 0, $this->messageAttributes());
    }

    /**
     * {@inheritDoc}
     *
     * With an AMQPExchange the parent implementation is used unchanged.
     * With an AMQPChannel every handled record is queued through
     * batch_basic_publish() and the batch is sent once after the loop.
     */
    public function handleBatch(array $records)
    {
        if ($this->exchange instanceof AMQPExchange) {
            parent::handleBatch($records);

            return;
        }

        foreach ($records as $entry) {
            if ($this->isHandling($entry)) {
                $entry = $this->processRecord($entry);

                $this->exchange->batch_basic_publish(
                    $this->createAmqpMessage($this->getFormatter()->format($entry)),
                    $this->exchangeName,
                    $this->getRoutingKey($entry)
                );
            }
        }

        $this->exchange->publish_batch();
    }

    /**
     * Gets the routing key for the AMQP exchange
     *
     * The key is the first four characters of the level name, a dot and
     * the channel name, all lower-cased.
     *
     * @param  array  $record
     * @return string
     */
    protected function getRoutingKey(array $record)
    {
        // TODO 2.0 remove substr call
        $levelPrefix = substr($record['level_name'], 0, 4);

        return strtolower($levelPrefix . '.' . $record['channel']);
    }

    /**
     * Wraps a payload in a PhpAmqpLib message carrying the shared attributes.
     *
     * @param  string      $data
     * @return AMQPMessage
     */
    private function createAmqpMessage($data)
    {
        $body = (string) $data;

        return new AMQPMessage($body, $this->messageAttributes());
    }

    /**
     * Attributes attached to every published message, on either transport.
     *
     * @return array
     */
    private function messageAttributes()
    {
        return array(
            'delivery_mode' => 2,
            'content_type' => 'application/json',
        );
    }

    /**
     * {@inheritDoc}
     *
     * Defaults to a JsonFormatter built with BATCH_MODE_JSON and false as
     * its second argument.
     */
    protected function getDefaultFormatter()
    {
        $formatter = new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false);

        return $formatter;
    }
}