Implementación del patrón publish/subscribe en PHP

TEORÍA

El patrón publish/subscribe (abreviado p/s) es un patrón de diseño de software orientado a eventos en el cual un conjunto de objetos se subscriben a un tópico (o lo que es lo mismo, un tipo de evento) y otro conjunto de objetos publican datos bajo un tópico.

Un ejemplo práctico sería el siguiente: imaginemos que tenemos un servicio implementado en Java. Ese servicio se dedica a leer los estados de las líneas telefónicas de una centralita. Cada vez que este servicio detecte un cambio en una línea, manda una notificación de que algo ha cambiado. Eso es lo que se conocería como «publicar» (publish). Imaginemos que los eventos de cambios de las líneas los queremos mandar bajo el tópico (topic) «PBX_events».

Ahora lo querríamos integrar con nuestra aplicación Web basada en PHP en el back-end; el servicio Java manda una petición POST a través del protocolo HTTP a un script PHP que tengamos preparado para ello, indicando en el body de la petición el tópico (para saber de qué son los datos) y los datos nuevos de la línea que ha cambiado para que los subscriptores sepan qué es lo que ha cambiado.

Ahora el script PHP (al que llamaremos «publisher.php«) tiene que mandar este dato a todos los clientes u otros procesos PHP que estén subscritos a este tópico ¿Cómo lo hacemos? Pues otra vez volvemos a usar los fantásticos WebSockets (puedes leer qué son y cómo se usan los WebSockets en PHP en esta entrada). Para modularizar un poco, vamos a suponer que el servicio de Java no quiere o no puede conectarse directamente al socket maestro, y es por ello que mandará una simple petición POST a través del protocolo HTTP (que es más flexible y abierto que conectarse a un socket ya que puede dar problemas de conexión por el firewall u otros factores que no podríamos controlar si fuese el servicio de un tercero, en este caso el de Java, el que intentase la conexión directa por socket).

El script «publisher.php« va a actuar como el back-end de una petición AJAX, es decir, va a recibir los datos que le dicen que tiene que publicar bajo un tópico, va a mandar los datos al socket maestro del servicio PHP que lo esté ejecutando, le devolverá OK a quien mandó los datos en POST y morirá.

Ahora llegamos a la parte del servicio PHP donde se maneja el socket maestro. Debe mandar los datos solo a aquellos usuarios (sockets) que estén suscritos al tópico que le ha llegado, y para ello buscará en el array de usuarios (sockets conectados al servicio) a ver cuáles están suscritos y les manda la información de la publicación.

Finalmente a todos los usuarios del servicio PHP les llegan los datos del publish y lo tratan como consideren adecuado: actualizando la vista, los datos en memoria, etc.

Hasta aquí todo es sencillo, pero se presentan un par de dificultades técnicas.

DIFICULTADES TÉCNICAS

1.- Los datos de publicación sencillamente serán un String formado adecuadamente para su tratamiento (por ejemplo, serializado en JSON de la forma:


{
"auth":
{
"user":"bar",
"pass":"foo"
},
"data":
{
"lineId":1,
"status":2,
...
}
}

), pero cualquier usuario «gracioso» (entendiendo como usuario a cada socket conectado al servicio de PHP de websocket) puede escribir los datos formados adecuadamente para mandar una publicación de los datos de la línea telefónica falsa, por tanto hay que:

1.1.- Crear un sistema de login para los sockets conectados (ya que las sesiones PHP no sirven aquí, el servicio PHP del websocket maestro solo tiene una sesión: la suya propia) para así distinguir quién puede publicar y quién no asociando permisos a usuarios. El sistema de login planteado será sencillo, el cliente al conectarse al socket va a mandar sus datos de acceso al sistema en claro, lo cual plantea un problema de seguridad.

1.2.- Usar sockets seguros con SSL, ya que los datos de login se mandarán en claro, debemos evitar que cualquier usuario esnifando datos de la red conozca nuestras credenciales. Ya que esto requiere un certificado SSL y cambiar algunas cosas en el uso de las funciones de sockets de PHP, lo dejaremos para otra entrada del blog.

1.3.- Mantener el array de sockets de usuarios limpio de usuarios no autenticados, cuando un usuario realice una petición de conexión al socket maestro, se le concede la conexión directamente, así que vamos a permitir que cualquier usuario se conecte, pudiendo dejar «zombies» que se han conectado pero no se han autenticado para siempre. Así pues, para evitar eso, les daremos unos segundos para que mande sus credenciales (tras el tiempo establecido se le kickeará si aún no se ha autenticado) y habrá que realizar labores de mantenimiento para tener limpio el array «$users» de usuarios «zombies» (para ello tendremos que redefinir, en el servicio PHP de p/s, la función heredada de WebSocketServer «tick()» que se ejecuta indiscriminadamente en cada vuelta del bucle while).

1.4.- Especializar la clase WebSocketUser para el sistema de login; para saber si el usuario está logueado en el sistema, crearemos una clase AuthUser que herede de la clase WebSocketUser y almacenaremos los atributos «$authenticated» (booleano que nos indicará si está autenticado el usuario) y «$connectedTimeStamp» (una marca de tiempo que nos dirá cuándo se conectó).

1.5.- Optimizar la búsqueda de usuarios «zombies», para no buscar en todo el array de usuarios conectados, vamos a tener una lista de usuarios que aún no han sido autenticados. Por cada uno de ellos se comprobará su atributo «$connectedTimeStamp» y si supera el tiempo límite establecido se le desconectará y se eliminará del array de usuarios sin autenticar. Del mismo modo, si un usuario se desconecta antes de realizar la autenticación también hay que eliminarlo de esta lista.

2.- Si tenemos por ejemplo mil usuarios conectados pero solo unos pocos están suscritos al tópico «PBX_events» (recordemos que es el de los estados de las líneas de la centralita) vamos a recorrer un array muy largo en balde. Por tanto hay que:

2.1.- Optimizar la búsqueda de subscriptores de un tópico, necesitamos optimizar un poco la búsqueda manteniendo un array asociativo en el cual se indexen los usuarios por tópicos; esto es, un objeto de la forma:


[
"tópico1"=> [idUser1=>$user1, idUser2=>$user2],
"tópico2"=> [idUser2=>$user2, idUser4=>$user4],
...
]

2.2.- Mantenimiento del array de subscripciones, se debe eliminar/añadir usuarios al array de subscripciones cuando se desconecten, subscriban a un tópico y cuando se desubscriban, para ello tendremos que controlar la desconexión en el método heredado de WebSocketServer «closed()» y crear los métodos «publish()» y «subscribe()» en el servicio PHP.

2.3.- Especializar la clase WebSocketUser para p/s, debemos implementar los métodos para subscribirse y desubscribirse de un tópico y mantener un registro de todos los tópicos a los que está subscrito el usuario para que, a la hora de cerrar el socket, se le borre rápidamente de la lista de subscriptores global (en lugar de tener que recorrer todos los usuarios que están en cada uno de los tópicos). Para ello crearemos una clase llamada PubSubUser que herede de WebSocketUser.

3.- Si se produce un error entre la parte del servicio Java y el script «publisher.php» o entre este último y el script PHP que hace de servicio con el socket máster vamos a estar «ciegos y sordos» y el debugging será complicado sino imposible. Por tanto, hay que:

3.1.- Usar un sistema de log de errores, cuando se produzca un error se debe almacenar en algún archivo de texto, o el método que os sea más fácil/útil (yo prefiero el archivo de texto porque hay menos «capas de software intermediarias» a la hora de realizar la escritura, así reduzco las posibilidades de que algo falle también a grabar el log).

4.- Ya que se puede dar el caso de que se produzca un fallo controlado a la hora de realizar una operación de «publish» (por ejemplo, el proceso «publisher.php» ha mandado la acción de publicar pero no se ha autenticado aún en el sistema), necesitamos una especie de comunicación más estilizada dentro de los sockets para saber cómo proceder al recibir respuestas inesperadas por parte del servicio PHP de sockets. Por tanto hay que:

4.1.- Utilizar un protocolo de comunicación con estados en los sockets, para saber cómo ha resultado una petición, como se haría de forma similar con una API REST en la que el resultado esperado para un POST sería un estado HTTP 201 (created) y que sin embargo también puede devolver un estado HTTP 401 (unauthorized) si el usuario aún no inició sesión. En mi caso usaré el protocolo de estados HTTP para esta labor.

FLUJO DE COMUNICACIÓN

Entonces, en resumen, el flujo normal de comunicación, incluyendo el sistema de login, sería el siguiente:

Cliente (Subscriptor):

  1. Conecta al socket maestro, le devuelve OK (mediante el handshake).
  2. Manda autenticación (Si no manda autenticación es kickeado tras unos segundos mandando el estado HTTP 408 Request timeout).
    1. recibe OK (HTTP 200) ó
    2. recibe KO (HTTP 404 user not found), se desconecta el socket (kicked).
  3. Manda petición de subscribe al tópico «PBX_events», le devuelve OK (HTTP 201).
  4. Eventualmente recibe publicación del tópico «PBX_events» y lo trata. El cliente no devuelve nada.

Servidor (publicador):

  1. El proceso manda los datos de autenticación y publicación al script «publisher.php«.
  2. El script «publisher.php» se conecta al socket maestro y le devuelve OK (mediante el handshake).
  3. El script manda autenticación (igual que con el subscriptor, si no manda autenticación es kickeado al tiempo.
    1. recibe OK (HTTP 200) ó
    2. recibe KO (HTTP 404), lo cual provocará una excepción, ya que esto no debería ocurrir, y escribirá un log de lo ocurrido con un stacktrace.
  4. Publica la información recibida, recibe OK (HTTP 201 created).

PRÁCTICA

Vamos a resolver los problemas técnicos planteados paso a paso para luego ir hacia la solución global:

SECCIÓN PRIVADA (DAEMON)

1.1.- Crear un sistema de login
1.4.- Especializar la clase WebSocketUser para el sistema de login.

Hay un método para mandar datos de autenticación directamente en la URL de conexión del WebSocket del cliente de la siguiente forma:


var socket = new WebSocket('ws://user:pass@127.0.0.1';

Pero esto es similar a mandar credenciales en la cabecera HTTP GET, los datos no se encriptan aún usando SSL y por tanto algún usuario esnifando información podría recuperar las credenciales (incluso pueden quedarse registros en proxies de dicha cabecera con los datos de acceso). De hecho esto se traduciría en que en la cabecera del hand-shake del WebSocket se incluiría la siguiente información:


Authorization: Basic dXNlcm5hbWU6cGFzc3dvcmQ=

Esto es un tipo autorización básica (Basic auth). Descodificando estos datos en B64 se obtendrían las credenciales, es por ello que este sistema de autenticación no nos sirve.

Para implementar nuestro sistema de login, nos planteamos crear una clase llamada AuthUser que herede de WebSocketUser para controlar desde cuándo se conectó el usuario y si está autenticado.

class AuthUser extends WebSocketUser
{
protected $authenticated = false;
protected $connectedTimeStamp = 0;

function __construct($id, $socket)
{
//Ejecutar primero el constructor de la clase WebSocketUser
parent::__construct($id, $socket);
//En cuanto existe el objeto significa que el usuario se ha conectado, ergo hay que inicializar su timeStamp de existencia
$this->connectedTimeStamp = time();
}

//... getters y setters
}

1.3.- Mantener el array de sockets de usuarios limpio de usuarios no autenticados
1.5.- Optimizar la búsqueda de usuarios «zombies».

Vamos a crear una lista de AuthUsers que realice las operaciones de añadir y eliminar usuarios y devolver los usuarios zombies. Para ello vamos a crear la clase «AuthUsersList«. Nótese que siendo puristas, en versiones más modernas de PHP, deberíamos indicar que los parámetros «$user» que aparecen en la definición de las funciones son del tipo AuthUser y tendríamos que indicar también que estamos usando esa clase con la sentencia «use» (pero voy a usar una sintaxis lo más compatible posible con las versiones de php posteriores a la 5.3):

class AuthUsersList {
//Este valor debe modificarse según se desee
const MAX_SECONDS_UNAUTHORIZED = 60;
//Array de AuthUser's
protected $unauthorizedUsers = array();

public function remove(/*AuthUser*/ $user){
...
}

//Marca al usuario como autenticado y lo saca de la lista
public function authenticate(/*AuthUser*/ $user){
...
}

public function add(/*AuthUser*/ $user){
...
}

public function getZombies(){
$now = time();
//Array resultado
$zombies = array();
//Recorre la lista de usuarios no autorizados
foreach ($this->unauthorizedUsers as $currentUnauthorized) {
//Por cada uno comprueba que su estancia en el servicio  PHP no supere MAX_SECONDS_UNAUTHORIZED
if( ($now - $currentUnauthorized->getConnectedTimeStamp())
>= self::MAX_SECONDS_UNAUTHORIZED )
{
//Si se pasa de tiempo se añade al array resultado
$zombies[] = $currentUnauthorized;
}
}
//Devolver los zombies encontrados para hacer con ellos lo que aplique
return $zombies;
}
}

2.3.- Especializar la clase WebSocketUser para p/s

De forma análoga al AuthUser vamos a crear una clase que también herede de WebSocketUser llamada «PubSubUser» que tenga los atributos y operaciones adecuadas para pub/sub. Como no existe la herencia múltiple y AuthUser es una especialización de WebSocketUser, no podemos pensar que AuthUser sea una interfaz; además también pensamos que siempre que exista pub/sub vamos a tener que diferenciar usuarios para saber quién puede publicar y quién no y por tanto necesitaremos el sistema de login implementado, en parte, por la clase AuthUser. Así que decido heredar de AuthUser:


class PubSubUser extends AuthUser
{
protected $canPublish = false;
protected $subscriptions = array();

...

//Añade el tópico al array de subscripciones
public function subscribe($topic){
if(!in_array($topic, $this->subscriptions)){
$this->subscriptions[] = $topic;
}
}

//Elimina el tópico del array de subscripciones
public function unsubscribe($topic){
$index = array_search($topic, $this->subscriptions);
if($index!==false) unset($this->subscriptions[$index]);
}

//Devuelve el array de subscripciones (será muy útil a la hora de borrar
//al agente de la lista de subscriptores pues acelerará la búsqueda)
public function getSubscriptions(){
return $this->subscriptions;
}

//Devuelve si el usuario está autorizado a publicar
public function canPublish(){
return $this->canPublish;
}

//El usuario se vuelve publicador, puede publicar
public function makePublisher(){
$this->canPublish = true;
}
}

2.1.- Optimizar la búsqueda de subscriptores de un tópico
2.2.- Mantenimiento del array de subscripciones.

De forma análoga a los usuarios autenticados (AuthUser) vamos a tener una lista de subscriptores (lista de PubSubUsers). Para ello creamos la clase «SubscribersList» que almacenará usuarios indexados por tópico e idUsuario (como se ha expuesto antes en la teoría). Esta clase manipula los PubSubUsers con los métodos necesarios para adecuar los datos de esos objetos a la situación en la que se encuentran dentro de la lista (por ejemplo, al añadir un subscriptor a la lista, también va a ejecutar la función «PubSubUser.subscribe(topic)«). Ya que la clase no tiene mayor complejidad solo os muestro la interfaz de funciones:


class SubscribersList
{
//Array de PubSubUsers
protected $subscribers = array();

public function topicExists($topic){
...
}

//Devuelve los subscritos a un topico
public function getTopicSubscribers($topic){
...
}

public function addSubscriber($topic, /*PubSubUser*/ $user){
...
}

public function removeSubscriber($topic, /*PubSubUser*/ $user){
...
}
}

3.1.- Usar un sistema de log de errores

Para ello me voy a servir de las clases ErrorHandler y Logger que incluyo en el archivo comprimido que pondré para descargar en el ejemplo. No tienen mayor relevancia, solo decir que ErrorHandler evita que el script PHP muera sin decirme qué ha ido mal (en su lugar lanza un mensaje de error y escribe un log del error) y el Logger escribe un log de la excepción recogida en un archivo de texto que se le indica en su archivo de configuración (config/LoggerConfigurator.php); tras escribir el log devuelve el código autogenerado con el que se ha guardado el archivo de texto.

El ErrorHandler solo se usa en el script «publish.php» y el Logger se usa en dicho script y en el servicio PHP.

Como he dicho, no es necesario comprender cómo funcionan estas clases pero si lo deseas puedes seguir leyendo esta subsección (sino puedes saltarte a la siguiente sección).

Para crear un log de los errores que se producen en el servicio PHP y que no se detenga la ejecución del mismo, envuelvo todo el interior del método heredado «WebSocketServer.process(user, message)» (Es la función que se ejecuta cuando se recibe un mensaje de un usuario) con un bloque try-catch. El bloque catch realiza un log de la excepción que se haya podido producir y desconecta al usuario que ha provocado el error (forzamos que tenga que reiniciar su conexión para ver si así se arregla el error y salir del paso hasta que analicemos el log).

protected function process ($user, $message) {
try{
...
}catch(Exception $ex){
//Se crea el log del error y este devuelve el código identificativo
$logCode = $this->logException($ex);
$this->stdout('FATAL ERROR '.$logCode);

//Le devuelve al usuario un estado 500
$this->respond($user, HTTP::INTERNAL_ERROR, array('error_code'=>$logCode));
$this->disconnect($user->socket);
}

4.1.- Utilizar un protocolo de comunicación con estados en los sockets

Uso el enumerado HTTP (en el archivo HTTPStatus.php) para asociar los estados HTTP a palabras. En el servicio PHP se crea el método «respond(usuario, estadoHTTP, mensaje)«, que formatea la respuesta adecuadamente, serializándola en JSON. Así pues se usaría de la siguiente forma:

require_once(__DIR__.'/../config/HTTPStatus.php');
...
private function respond($user, $httpCode, $extraMsg = ''){
$response = array('code'=>$httpCode, 'msg'=>$extraMsg);
$this->send($user, json_encode($response));
return true;
}
...
$this->respond($user, HTTP::UNAUTHORIZED, 'You cannot do anything if you are not authenticated');
...

Ponerlo todo en conjunto (crear PublishSubscribeServer)

Ahora que tenemos las piezas por separado, vamos a juntarlas todas en el servicio PHP. Lo primero es crear una clase que herede de WebSocketServer llamada «PublishSubscribeServer«.

Debido a que cada desarrollador usará su propio sistema de Log y su propio método para chequear las credenciales de los usuarios que intentan autenticarse en el servicio PHP, esta clase ha de ser abstracta y dejar los métodos «logException(exception)» y «checkUserCredentials(userName, password)» como abstractos. Más adelante se procederá a implementar la herencia, pero primero os explico qué hace esta clase:

Antes de nada, decir que usa las clases antes explicadas «AuthUsersList» y «SubscribersList» y que se debe redefinir el atributo «userClass» de la clase padre (WebSocketServer) para decirle que el tipo de usuario usado será «PubSubUser«:

abstract class PublishSubscribeServer extends WebSocketServer {
//Indicamos que el tipo de usuario es PubSubUser
protected $userClass = 'PubSubUser';
protected $authUsersList;
protected $subscribers;

function __construct($ip, $puerto, $bufferLength = 2048){
parent::__construct($ip, $puerto, $bufferLength);
$this->authUsersList = new AuthUsersList();
$this->subscribers = new SubscribersList();
}
...
}

En el método «process(usuario, mensaje)» (que es el método que se ejecuta al recibir datos de un usuario) se espera recibir un mensaje serializado en JSON (por ejemplo:

{
'action':'subscribe',
'topic':'PBX_events'
}

). Se des-serializa (json_decode) y se comprueba el valor del atributo «action«. Según lo que traiga este valor se ejecuta un método u otro. Los posibles valores son:

  • authentication
  • publish
  • subscribe
  • unsubscribe

En caso de recibir otra acción no esperada se devuelve un estado HTTP 400 (Bad request). Si un usuario no autenticado intenta hacer cualquier cosa que no sea autenticarse se devuelve un estado HTTP 401 (Unauthorized).

protected function process ($user, $message) {
try{
...
if($jsonMessage['action'] != 'authentication'
&& !$user->isAuthenticated())
{
//Devolver unauthorized
$this->respond($user, HTTP::UNAUTHORIZED ...);
return;
}
switch ($jsonMessage['action']) {
case 'authentication':
$this->authenticate($user, $jsonMessage);
break;

case 'publish': ...
case 'subscribe': ...
default:
$this->respond($user, HTTP::BAD_REQUEST, 'Action unknown');
break;
}
}catch(Exception $ex){
//Se loguea la excepción y se devuelve HTTP 500
...
}
}

Lo primero que tiene que hacer el usuario es autenticarse, así que para ello se ejecutará el método «authenticate(user, jsonMessage)«. Aquí comprueba las credenciales del usuario (el método abstracto «checkUserCredentials» que cada desarrollador debe implementar por su cuenta) el cual debe devolver un array con los datos del usuario encontrado (o false si no encontró al usuario con las credenciales proporcionadas). Si el usuario puede publicar, en el array devuelto debe existir un atributo llamado «can_publish» cuyo valor sea 1:

private function authenticate($user, $jsonMessage){
...
//checkUserCredentials debe devolver un array con al menos
//el valor 'can_publish' (o false si no se encontró al usuario)
$recoveredUser = $this->checkUserCredentials($jsonMessage['user'], $jsonMessage['password']);
if($recoveredUser !== false)
{
$this->authUsersList->authenticate($user);
//Si el usuario puede publicar, 'can_publish' debe valer 1
if($recoveredUser['can_publish'] == 1) $user->makePublisher();
$this->respond($user, HTTP::OK, 'Authenticated');
return true;
}
else{
$this->respond($user, HTTP::NOT_FOUND, 'Authentication failed, user not found');
$this->disconnect($user->socket);
return false;
}
}

Después, lo normal, sería que el usuario se subscriba a algún tópico, para ello se ejecuta el método «subscribe(user, jsonMessage)«. La operación es sencilla, se usa la lista de subscriptores (SubscribersList) para añadir el usuario al tópico y, a su vez, añadir el tópico a la lista de subscripciones del usuario, usando el método «SubscribersList.addSubscriber()«:

private function subscribe($user, $jsonMessage){
$topic = $jsonMessage['topic'];
$this->subscribers->addSubscriber($topic, $user);
}

Obviamente el usuario también se puede desubscribir, para ello se ejecutará el método «unsubscribe(user, jsonMessage)«.

Y finalmente, el usuario también puede publicar (método «publish(user, jsonMessage)«). Cuando va a publicar primero se comprueba que el usuario tenga este privilegio (solo pueden hacerlo los publishers) y se mandan los datos de publicación solamente a aquellos usuarios subscritos al tópico indicado y que además estén autenticados en el sistema (comprobación que se hace automáticamente en el método privado «pushData(user, messaje)«):

private function publish($user, $jsonMessage){
//Si no puede publicar FIN
if($user->canPublish() === false){
$this->respond($user, HTTP::UNAUTHORIZED, 'You cannot publish');
return false;
}
...
//Los datos se mandan en este formato
$dataToPush = array(
'action'=>'publish',
'topic'=>$jsonMessage['topic'],
'data'=>$jsonMessage['data']
);
$subscribersToTopic = $this->subscribers->getTopicSubscribers($jsonMessage['topic']);
foreach ($subscribersToTopic as $currentSubscriber) {
//Solo le mando los datos a los que estén autenticados en el sistema
//(En teoría no haría falta comprobarlo ya que solo puede estar subscrito a un
//tópico si se ha autenticado antes, pero en caso de este valor pudiese cambiar
//dinámicamente en el futuro dejo la comprobación)
$this->pushData($currentSubscriber, json_encode($dataToPush));
}
//Le digo al publisher que todo fue OK
$this->respond($user, HTTP::CREATED, 'Data published');
return true;
}

//Manda los datos solo si el usuario está autenticado
private function pushData($user, $message){
if(!$user->isAuthenticated()) return false;
$this->send($user, $message);
return true;
}

Como se puede observar, los datos enviados tendrán el siguiente formato:


{
'action': 'publish',
'topic': 'PBX_events',
'data': {'lineId': 1, 'status': 5, ...}
}

Para realizar las labores de mantenimiento sobre los usuarios conectados pero no autenticados (zombies), se redefine la función «tick()», la cual pide los usuarios zombies y los va desconectando:

protected function tick(){
$zombies = $this->authUsersList->getZombies();
foreach ($zombies as $currentZombie) {
$this->stdout($currentZombie->socket.' zombie catched, kicking by timeout');
$this->respond($currentZombie, HTTP::REQUEST_TIMEOUT, 'Kicked by timeout of authentication');
$this->disconnect($currentZombie->socket);
}
}

Crear nuestra clase concreta de PublishSubscribeServer (MyPubSubServer)

Finalmente nos queda heredar de la clase PublisSubscribeServer (en mi caso, la clase hija se va a llamar MyPubSubServer), y en ella debemos definir los métodos abstractos «logException(exception)» y «checkUserCredentials(userName, password)«.

En mi caso «logException()» va a escribir un stackTrace de la excepción recogida en un fichero de texto dentro de la carpeta «<root>/errors_log«, usando la clase Logger. Recuerda que este método debe devolver un identificador de log.

class MyPubSubServer extends PublishSubscribeServer{
...
protected function logException($ex){
$logCode = Logger::log_exception($ex);
return $logCode;
}
...
}

Y la función «checkUserCredentials()«, normalmente, atacará a una base de datos, recuperará un usuario y devolverá un array en el que exista el atributo «can_publish». Si el usuario puede publicar este valor debe valer 1 en otro caso debe valer 0. Si no encontró al usuario deberá devolver «false».

Para simplificar el ejemplo, sencillamente compruebo que si el usuario es «user_1» no puede publicar pero si el usuario es «system» sí puede publicar:


protected function checkUserCredentials($userName, $password){
//Lo normal sería atacar a una BD
$usuario = array();
if($userName == 'user_1' && $password == 'blabla') $usuario['can_publish'] = 0;
else if($userName == 'system' && $password == 'wakawaka') $usuario['can_publish'] = 1;
else return false;
return $usuario;
}

¡Bien! Ya podemos empezar a hacer correr el servicio de publish subscribe. Para hacer la prueba voy a usar XAMPP.

Los archivos del servicio PHP deben estar en una carpeta privada a la que no puedan acceder los clientes desde internet. En mi caso dicha carpeta va a ser «C:\xampp\damons\pub_sub»; ahí almaceno el archivo «PublishSubscribeDaemon.php» que es el que inicializa y hace correr la clase «MyPubSubServer«. Desde la consola de comandos (CMD) ejecutamos el siguiente ídem:

«C:\xampp\php\php.exe» -q C:\xampp\daemons\pub_sub\PublishSubscribeDaemon.php

¡Y ya tenemos el servicio corriendo!

SECCIÓN PÚBLICA (htdocs)

Ahora nos falta la parte cliente…

client.html

En mi caso voy a almacenar los archivos públicos (accesibles desde internet) en la carpeta C:\xampp\htdocs\pub_sub. Para ejecutar la prueba solo hay que abrir una pestaña en algún navegador apuntando a la URL «localhost/pub_sub/client.html» y poniendo las credenciales «user_1» y «blabla» en los prompts que deberían aparecer tras realizar la conexión con el socket maestro. Tras conseguir realizar la autenticación se verá una respuesta de que la autenticación fue OK.

Ahora se manda la petición de subscripción al tópico «PBX_events» y, si todo fue bien, debería devolverse un mensaje con un código de estado 201.

WebSocket garantiza FIFO: Obsérvese que en el div donde se escriben los mensajes, aparecerá «authenticating» y «subscribing» antes de recibir las respuestas del servicio PHP. No hay que controlar que un mensaje vaya después de recibir respuesta del anterior usando funciones de callback (como pasa con AJAX) ya que todos los mensajes de un socket se mandan a través del protocolo TCP, lo cual garantiza FIFO; es por ello que, aunque veamos primero «authenticating» y luego «subscribing» seguido de sus respuestas, nunca recibiremos una respuesta 401 (Unauthorized) para la subscripción ya que se garantiza que primero se va a mandar y procesar la petición de autenticación.

La página client.html incluye un botón en el que pone «Simular publicación» que, como bien dice, simulará el proceso que haría el servicio Java (encargado de mandar los nuevos estados de las líneas de la centralita) cuando tenga que mandar (publicar) dichos datos. Para realizar la simulación se manda un AJAX a «publisher.php» con los datos de autenticación (que están escritos en javascript para hacer la prueba pero que nunca deben escribirse ahí ya que cualquiera podría verlos) y los datos ficticios de una línea de la centralita.

function doPublish(){
/*
¡¡No debes guardar datos de autenticación aquí (lo vería cualquiera)!! Esta es
una prueba en la cual simulamos que este método es lo que haría el servicio
Java que lee las líneas de la centralita; pero es el servicio Java el que
tiene que tener guardadas las credenciales para que nadie las vea.
*/
var inputs = {
'auth': {'user': 'system', 'password': 'wakawaka'},
'data': {'lineId': 1, 'status': 4, 'otherData': 'blabla'}
};
//Se manda un AJAX en JSON con método HTTP POST al script "publisher.php"
var ajaxJson = new AjaxJson(
'POST', 'ajax/publisher.php', inputs, false, null,
201, true, doPublish_callback);
ajaxJson.send();
}

//La respuesta del AJAX se recibe aquí
function doPublish_callback(respuesta, extraParams){
console.log(respuesta);
console.log(extraParams);
}

publisher.php

Finalmente queda explicar el script «publisher.php«…

Para no alargarme más, diré que este script tiene que usar un cliente websocket PHP para conectarse al socket maestro del servicio PHP p/s. Este cliente lo he recogido de la siguiente biblioteca de GitHub:

https://gist.github.com/ikwattro/908f64f8a3121dab6b0c

Tiene un par de fallos que están solventados en la versión que trae el ejemplo (a parte de que renombro la clase ws a WebSocketClient). En lugar de este cliente, uso PubSubWSCli, que es una especialización de WebSocketClient (hereda de esta) y sencillamente realiza la tarea de conexión y autenticación automáticamente tras su inicialización. Al escribir algo en el socket siempre espera un resultado con un estado HTTP concreto y en caso de recibir otra cosa elevará una excepción que será recogida en «publisher.php» y tratada por la clase ErrorHandler.

Para tratar los datos recibidos en «publisher.php» en JSON uso una clase llamada PHPAjax. Tampoco explicaré qué hace ya que entiendo que cada desarrollador usará su propia metodología para tratar los datos. En este caso solo es necesario saber que los datos recibidos se almacenan en la variable «$receivedJSON«. Así pues en publisher.php tendríamos lo siguiente:

...
ErrorHandler::setAJAXErrorHandlers();
$ws;
try{
...
//Los datos recibidos
$receivedJSON = $jsonAjax->getReceivedData();

//Se conecta y autentica con los datos recibidos al socket maestro
$ws = new PubSubWSCli($receivedJSON['auth']['user'], $receivedJSON['auth']['password']);
$action = array(
'action'=>'publish',
'topic'=>'PBX_events',
'data'=>$receivedJSON['data']
);
//Manda la acción de publicar
//Este método es bloqueante así que se ejecutará el script de forma secuencial
$result = $ws->send(json_encode($action), HTTP::CREATED);
//Cierra el socket cliente
$ws->close();
//Devuelve OK al cliente que invocó este script
$jsonAjax->returnJSON(HTTP::CREATED, array());
}catch(Exception $ex){
//Si se captura alguna excepción se procesará aquí
ErrorHandler::processExceptionAJAX($ex);
}

¡C’est fini!

Para hacer la prueba, abre dos pestañas apuntando con la URL «localhost/pub_sub/client.html»; en una nos autenticamos y en la otra no. Pulsamos en el botón «Simular publicación» en alguno de los client.html y veremos cómo llegan los datos de la publicación solamente a la pestaña de client.html que se había autenticado.

DESCARGA EL EJEMPLO

Puedes obtener el ejemplo preparado para correr en XAMPP aquí.


Créditos de fuentes externas:

Iconos:

  • PC by art shop from the Noun Project
  • Server by Chanut is Industries from the Noun Project
  • Cloud by AlePio from the Noun Project
  • Gears by Gregor Cresnar from the Noun Project
  • Phone by mbok sumirna from the Noun Project
  • Time by arjuazka from the Noun Project

Bibliotecas:

Deja un comentario