Laravel RabbitMQ Extension

Install RabbitMQ

$ brew install rabbitmq

Start RabbitMQ

$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)

Open the web management UI

http://localhost:15672

[Screenshot: web management UI]

Install the extension package

http://www.rabbitmq.com/getstarted.html

$ composer require vladimir-yuldashev/laravel-queue-rabbitmq

In config/app.php, add the following to the providers array:

VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,

Add the rabbitmq connection to config/queue.php:

'connections' => [
    'rabbitmq' => [
        'driver' => 'rabbitmq',
        'host' => env('RABBITMQ_HOST', '127.0.0.1'),
        'port' => env('RABBITMQ_PORT', 5672),
        'vhost' => env('RABBITMQ_VHOST', '/'),
        'login' => env('RABBITMQ_LOGIN', 'guest'),
        'password' => env('RABBITMQ_PASSWORD', 'guest'),
        'queue' => env('RABBITMQ_QUEUE'),
        'exchange_declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),
        'queue_declare_bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),
        'queue_params' => [
            'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
            'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
            'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
            'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
        ],
        'exchange_params' => [
            'name' => env('RABBITMQ_EXCHANGE_NAME', null),
            'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'),
            'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
            'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
            'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
        ],
    ],
]

Add the corresponding parameters to .env:

QUEUE_DRIVER=rabbitmq
RABBITMQ_QUEUE=queue_name
RABBITMQ_HOST=127.0.0.1
RABBITMQ_PORT=5672
RABBITMQ_VHOST=/
RABBITMQ_LOGIN=guest
RABBITMQ_PASSWORD=guest
# Note: set the queue connection to rabbitmq so that it matches the connection name configured above
QUEUE_CONNECTION=rabbitmq
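
A missing key in .env silently falls back to the defaults in config/queue.php, so it can help to check the file before going further. The following is a minimal sketch, not part of the package: the function name check_rabbitmq_env is hypothetical, and the key list is simply the one used in this walkthrough. Older Laravel versions read QUEUE_DRIVER instead of QUEUE_CONNECTION, so adjust the list if that applies to your project.

```shell
# Report which of the keys used in this walkthrough are missing from a .env file.
# Returns 0 when every key is present, 1 otherwise.
# (check_rabbitmq_env is a hypothetical helper name, not a Laravel command.)
check_rabbitmq_env() {
  env_file="${1:-.env}"
  missing=0
  for key in QUEUE_CONNECTION RABBITMQ_QUEUE RABBITMQ_HOST RABBITMQ_PORT \
             RABBITMQ_VHOST RABBITMQ_LOGIN RABBITMQ_PASSWORD; do
    # A key counts as present only if a line starts with "KEY=".
    if ! grep -q "^${key}=" "$env_file"; then
      echo "missing: ${key}"
      missing=1
    fi
  done
  return "$missing"
}
```

After sourcing the function, run `check_rabbitmq_env .env` from the project root; it prints one line per missing key and returns a non-zero status if any are absent.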

Now let's try it out.

// Create an event
$ php artisan make:event DevEvent
// Implement ShouldBroadcast
<?php

namespace App\Events;

use Illuminate\Broadcasting\Channel;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Support\Facades\Log;

class DevEvent implements ShouldBroadcast
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    /**
     * Create a new event instance.
     *
     * @return void
     */
    public function __construct()
    {
        // Write a test log entry (the message means "I am the queue")
        Log::info('我是队列');
    }

    /**
     * Get the channels the event should broadcast on.
     *
     * @return \Illuminate\Broadcasting\Channel|array
     */
    public function broadcastOn()
    {
        return [];
    }
}

Define a command that fires the event (for example in routes/console.php)

Artisan::command('queue:action', function () {
    event(new \App\Events\DevEvent());
});

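First, produce a few messages

For this test, do not start the queue worker yet; otherwise the messages would be consumed immediately.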

// Run this 2 or 3 times
$ php artisan queue:action
// Check the queue in RabbitMQ; the web UI shows more detail
$ /usr/local/sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
queue_name 3


Start the worker to consume the messages

$ php artisan queue:work
[2018-10-25 03:42:56][5bd13bbe62b323.92629498] Processing: App\Events\DevEvent
[2018-10-25 03:42:56][5bd13bbe62b323.92629498] Processed: App\Events\DevEvent
[2018-10-25 03:42:56][5bd13bbf41fce3.30995796] Processing: App\Events\DevEvent
[2018-10-25 03:42:56][5bd13bbf41fce3.30995796] Processed: App\Events\DevEvent
[2018-10-25 03:42:56][5bd13bc0402947.79951558] Processing: App\Events\DevEvent
[2018-10-25 03:42:56][5bd13bc0402947.79951558] Processed: App\Events\DevEvent
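
The bare `queue:work` call above relies on the default connection from .env. If the project has several connections, it can be clearer to name the connection and queue explicitly. The wrapper below is a sketch under stated assumptions: start_rabbitmq_worker is a hypothetical name, and `--tries=3` is an illustrative retry limit, not a value from this walkthrough.

```shell
# Start a worker on the "rabbitmq" connection for one queue.
# The queue name defaults to "queue_name", the value used in .env above.
# --tries=3 is an illustrative choice, not a setting from the walkthrough.
# (start_rabbitmq_worker is a hypothetical helper name.)
start_rabbitmq_worker() {
  queue="${1:-queue_name}"
  php artisan queue:work rabbitmq --queue="$queue" --tries=3
}
```

Run `start_rabbitmq_worker queue_name` from the project root; the worker keeps running until it is stopped.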

Check the log entries written by the event

// storage/logs/xxx.log
[2018-10-25 03:42:54] local.INFO: 我是队列
[2018-10-25 03:42:55] local.INFO: 我是队列
[2018-10-25 03:42:56] local.INFO: 我是队列