xiaohuilam/laravel

17. 通知/广播 (Broadcast) 机制

Opened this issue · 0 comments

鉴权

config/app.php 中我们能看到 Illuminate\Broadcasting\BroadcastServiceProvider 这个服务提供者被注册:

Illuminate\Broadcasting\BroadcastServiceProvider::class,

在前面对服务提供者的讲解中,我们得出了一个服务提供者肯定会包含 bootregister 方法之一,这 BroadcastServiceProvider 就是一个包含而且只包含了 boot 方法的服务提供者:
/**
* Bootstrap any application services.
*
* @return void
*/
public function boot()
{
Broadcast::routes();
require base_path('routes/channels.php');
}

第17行调用到了 Illuminate\Support\Facades\Broadcast::routes() 假面方法。

顾名思义,就是注册路由的。而这个方法,最终是穿透到了 Illuminate\Broadcasting\BroadcastManager::routes()

/**
* Register the routes for handling broadcast authentication and sockets.
*
* @param array|null $attributes
* @return void
*/
public function routes(array $attributes = null)
{
if ($this->app->routesAreCached()) {
return;
}
$attributes = $attributes ?: ['middleware' => ['web']];
$this->app['router']->group($attributes, function ($router) {
$router->match(
['get', 'post'], '/broadcasting/auth',
'\\'.BroadcastController::class.'@authenticate'
);
});
}

第19行是为了在 config/app.php 卸载 BroadcastServiceProvider 时,直观的让 routes/channels.php 失效的做法,所以就不放在别处

require base_path('routes/channels.php');

第61~63行是如果缓存过路由,则跳出。

第62~第72行相当于与执行了

Route::group(['middleware' => ['web']], function () {
    Route::match(['get', 'post'], '/broadcasting/auth', '\\'.BroadcastController::class.'@authenticate');
});

方法调用到的 controller action 其实是

/**
* Authenticate the request for channel access.
*
* @param \Illuminate\Http\Request $request
* @return \Illuminate\Http\Response
*/
public function authenticate(Request $request)
{
return Broadcast::auth($request);
}

这个假面方法 Broadcast::auth() 比较特殊,是根据 config('broadcasting.default') 分发到 Illuminate/Broadcasting/Broadcasters 具体的类中:

  • LogBroadcaster.php
  • NullBroadcaster.php
  • PusherBroadcaster.php
  • RedisBroadcaster.php

我们以 redis 的配置为例 (laravel-echo-server) 。

/**
* Authenticate the incoming request for a given channel.
*
* @param \Illuminate\Http\Request $request
* @return mixed
*
* @throws \Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException
*/
public function auth($request)
{
if (Str::startsWith($request->channel_name, ['private-', 'presence-']) &&
! $request->user()) {
throw new AccessDeniedHttpException;
}
$channelName = Str::startsWith($request->channel_name, 'private-')
? Str::replaceFirst('private-', '', $request->channel_name)
: Str::replaceFirst('presence-', '', $request->channel_name);
return parent::verifyUserCanAccessChannel(
$request, $channelName
);
}

第一步,如果 $request->channel_name 不以 private- 或者 presence- 开头,或者没有提供用户授权导致请求取不到用户,抛出 Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException 异常

第二步,将 $request->channel_name 中的 private- 或者 presence- 还原移除。

第三步,调用 Broadcaster:: verifyUserCanAccessChannel()

/**
* Authenticate the incoming request for a given channel.
*
* @param \Illuminate\Http\Request $request
* @param string $channel
* @return mixed
*
* @throws \Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException
*/
protected function verifyUserCanAccessChannel($request, $channel)
{
foreach ($this->channels as $pattern => $callback) {
if (! Str::is(preg_replace('/\{(.*?)\}/', '*', $pattern), $channel)) {
continue;
}
$parameters = $this->extractAuthParameters($pattern, $channel, $callback);
$handler = $this->normalizeChannelHandlerToCallable($callback);
if ($result = $handler($request->user(), ...$parameters)) {
return $this->validAuthenticationResponse($request, $result);
}
}
throw new AccessDeniedHttpException;
}

routes/channels.php 中定义的 channel_name 匹配上的通道的鉴权回调执行,拿到返回值。如果 if ($result = $handler($request->user(), ...$parameters)) 返回成功的 RedisBroadcaster::validAuthenticationResponse(),否则抛出 Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException 异常

而成功的响应 RedisBroadcaster::validAuthenticationResponse() 逻辑为返回 user_iduser_info 给 websocket 客户端:

/**
* Return the valid authentication response.
*
* @param \Illuminate\Http\Request $request
* @param mixed $result
* @return mixed
*/
public function validAuthenticationResponse($request, $result)
{
if (is_bool($result)) {
return json_encode($result);
}
return json_encode(['channel_data' => [
'user_id' => $request->user()->getAuthIdentifier(),
'user_info' => $result,
]]);
}

至此,鉴权部分完成。


推送

broadcast() 方法定义于

function broadcast($event = null)
{
return app(BroadcastFactory::class)->event($event);
}

此方法最终走到了

/**
* Begin broadcasting an event.
*
* @param mixed|null $event
* @return \Illuminate\Broadcasting\PendingBroadcast|void
*/
public function event($event = null)
{
return new PendingBroadcast($this->app->make('events'), $event);
}

PendingBroadcast 类比较特殊,有个 __destruct 解构方法:

/**
* Handle the object's destruction.
*
* @return void
*/
public function __destruct()
{
$this->events->dispatch($this->event);
}

走到

/**
* Fire an event and call the listeners.
*
* @param string|object $event
* @param mixed $payload
* @param bool $halt
* @return array|null
*/
public function dispatch($event, $payload = [], $halt = false)
{
// When the given "event" is actually an object we will assume it is an event
// object and use the class as the event name and this event itself as the
// payload to the handler, which makes object based events quite simple.
list($event, $payload) = $this->parseEventAndPayload(
$event, $payload
);
if ($this->shouldBroadcast($payload)) {
$this->broadcastEvent($payload[0]);
}
$responses = [];
foreach ($this->getListeners($event) as $listener) {
$response = $listener($event, $payload);
// If a response is returned from the listener and event halting is enabled
// we will just return this response, and not call the rest of the event
// listeners. Otherwise we will add the response on the response list.
if ($halt && ! is_null($response)) {
return $response;
}
// If a boolean false is returned from a listener, we will stop propagating
// the event to any further listeners down in the chain, else we keep on
// looping through the listeners and firing every one in our sequence.
if ($response === false) {
break;
}
$responses[] = $response;
}
return $halt ? null : $responses;
}

调用了

/**
* Broadcast the given event class.
*
* @param \Illuminate\Contracts\Broadcasting\ShouldBroadcast $event
* @return void
*/
protected function broadcastEvent($event)
{
$this->container->make(BroadcastFactory::class)->queue($event);
}

还是回到了 BroadcastManager

/**
* Queue the given event for broadcast.
*
* @param mixed $event
* @return void
*/
public function queue($event)
{
$connection = $event instanceof ShouldBroadcastNow ? 'sync' : null;
if (is_null($connection) && isset($event->connection)) {
$connection = $event->connection;
}
$queue = null;
if (method_exists($event, 'broadcastQueue')) {
$queue = $event->broadcastQueue();
} elseif (isset($event->broadcastQueue)) {
$queue = $event->broadcastQueue;
} elseif (isset($event->queue)) {
$queue = $event->queue;
}
$this->app->make('queue')->connection($connection)->pushOn(
$queue, new BroadcastEvent(clone $event)
);
}

根据前文的 config('broadcasting.default') 配置,

第127~129行,是将广播事件 BroadcastEvent 入队。

根据我们对队列的了解,BroadcastEvent 应该有 handle() 方法

/**
* Handle the queued job.
*
* @param \Illuminate\Contracts\Broadcasting\Broadcaster $broadcaster
* @return void
*/
public function handle(Broadcaster $broadcaster)
{
$name = method_exists($this->event, 'broadcastAs')
? $this->event->broadcastAs() : get_class($this->event);
$broadcaster->broadcast(
Arr::wrap($this->event->broadcastOn()), $name,
$this->getPayloadFromEvent($this->event)
);
}

第46~49行,是执行广播,调用了 RedisBroadcaster::broadcast()

/**
* Broadcast the given event.
*
* @param array $channels
* @param string $event
* @param array $payload
* @return void
*/
public function broadcast(array $channels, $event, array $payload = [])
{
$connection = $this->redis->connection($this->connection);
$payload = json_encode([
'event' => $event,
'data' => $payload,
'socket' => Arr::pull($payload, 'socket'),
]);
foreach ($this->formatChannels($channels) as $channel) {
$connection->publish($channel, $payload);
}
}

消息进入 redis。

// TODO: laravel-echo-server 部分。