Robert Bermejo's Blog Robert Bermejo Blog

Azure Storage Queues

Azure storage queues es un servicio de azure que nos permite desacoplar aplicaciones o componentes de forma que puedan comunicarse. Los componentes pueden estar en el cloud u on-premise.

Azure storage queues tiene las siguientes características:

  • Soporta 2000 transacciones por segundo.
  • Los mensajes tienen una vida de 7 días.
  • El tamaño máximo de un mensaje es de 64kb.

Seguramente la pregunta que os estáis haciendo en este momento es: ¿Solo 64kb? El uso de queues no está diseñado para el envío de grandes volúmenes de información, sino de pequeños mensajes que envían los emisores y los receptores al recibirlos saben que acción deben realizar.

¿Cómo puedo trabar con queues?

Para poder trabajar con queues lo podemos realizar de dos formas:

1- Vía RestAPI –> Si el container al que pertenece la queue es público puedes atacar directamente a la API, en caso contrario primero se debería obtener la Shared Key Signature correspondiente (en mi anterior articulo escribí en cómo hacerlo para Blobs, sería homologo para queues http://robertbermejo.com/index.php/2016/03/01/shared-access-signatures-sas/

2- Vía .Net –> Utilizando la librería Microsoft.WindowsAzure.Storage, para ello hace falta conocer la connectionstring al storage.

Cuando utilizar queues

El uso de azure storage queues es útil cuando:

  • La tarea a realizar requiere de un tiempo alto de ejecución (alta latencia).
  • La tarea necesita de un sistema externo que no siempre está disponible.
  • La tarea necesita de un alto consumo de CPU.
  • Cuando queremos desacoplar partes de nuestro sistema.

Ciclo de vida de un mensaje

Una vez el mensaje ha sido añadido a la queue, entra en juego una de las partes más importantes de una queue, el receptor.

Cuando un receptor realiza el “pop” del mensaje este no se elimina de la queue, esto solo sucederá cuando se llame al método DeleteMessage. Entonces, ¿si no se elimina el mensaje nada más recibirlo puede ser consumido por otros receptores? La respuesta es no, el mensaje se marca como invisible por lo que los demás no pueden verlo. El tiempo por defecto que el mensaje queda marcado como invisible es de 30 segundos y es configurable por el receptor según el tiempo que pueda tardar su lógica.

Una vez el receptor ha terminado su ejecución dentro del tiempo de espera (tiempo que configurado en el mensaje como invisible) el mensaje se borra, en caso contrario el mensaje vuelve a ser visible y puede ser consumido de nuevo. Esta dinámica permite que los mensajes no se pierdan.

Anteriormente he mencionado que un mensaje no podía ser consumido por más de un receptor lo cual no es del todo cierto dado que se puede realizar una configuración especial del mensaje de forma que quede visible y pueda ser consumido por más de un receptor, pero este no es su comportamiento estándar.

A continuación, vemos como un pequeño ejemplo de cómo trabajar con queues.

Primero tendríamos un servicio para interactuar con nuestra queue.

public class QueueStorageServices
{
 private CloudStorageAccount storageAccount;
 private const string QueueName = "examplequeue";public QueueStorageServices()
 {
    this.storageAccount =    CloudStorageAccount.Parse(ConfigurationManager.ConnectionStrings["Azure"].Connec tionString); 
 }
 private CloudQueueClient QueueClient
 {
   get { return this.storageAccount.CreateCloudQueueClient(); } 
 } 

 /// Add message from queue
 public void AddMessage(QueueModel model)
 {
   var queue = this.GetContainer(QueueName);
   var messageJson = JsonConvert.SerializeObject(model);
   var message = new CloudQueueMessage(messageJson);
   //Message with 60 seconds visibulity
   queue.AddMessage(message, timeToLive: TimeSpan.FromSeconds(60.0));
 } 

 /// Read message from queue
 public QueueModel GetMessage()
 {
   var queue = this.GetContainer(QueueName);
   var queueuMessage = queue.GetMessage();
   return JsonConvert.DeserializeObject<QueueModel>(queueuMessage.AsString); 
 }

 /// Get reference to queue. If no exist create it.
 private CloudQueue GetContainer(string queue)
 {
   var queueContainer = this.QueueClient.GetQueueReference(queue);
   queueContainer.CreateIfNotExists();
   return queueContainer;
 }
}

Ahora creamos un sender que añadirá mensajes a la queue de la siguiente forma.

var model = new QueueModel() { Message = "Example queue", CreateDateMessage = DateTime.Now };
QueueStorageServices queueService = new QueueStorageServices();
queueService.AddMessage(model);
Console.WriteLine("Message send to queue");

Para finalizar tendríamos un reciver que recogería el objeto de la queue.

QueueStorageServices queueService = new QueueStorageServices();
var queueModel = queueService.GetMessage();
Console.WriteLine($"Message from queue: {queueModel.Message} - Creation message date: {queueModel.CreateDateMessage.ToString()}");

Si ejecutamos el sender:

Sender1

Vemos que se ha añadido el mensaje en la queue:

VSQueue

Finalmente lanzamos el reciver:

Reciver

Con esta solución, el proceso debería estar constantemente preguntando por si hay más mensajes para poder procesarlos, para no tener que realizar esto existe QueueTrigger que cuando hay un cambio en la queue automáticamente coge el mensaje y se procesa.

Para ello creamos un WebJob de Azure y haríamos que estuviera constantemente corriendo

class Program
{
  // Please set the following connection strings in app.config for this WebJob to run:
  // AzureWebJobsDashboard and AzureWebJobsStorage
  static void Main()
  {
     var host = new JobHost();
    // The following code ensures that the WebJob will be running continuously
host.RunAndBlock();
  }
}

Se nos crea la clase Functions con el método ProcessQueueMessage, aquí implementaríamos los métodos con los queuetriggers a las queues que tuviéramos.

public class Functions
{
  // This function will get triggered/executed when a new message is written 
  // on an Azure Queue called queue.
  private const string QueueName = "examplequeue";  

  /// QueueTrigger: When add message to queue execute this method.
  public static void ProcessQueueMessage([QueueTrigger(QueueName)] QueueModel queueModel, TextWriter log)
  {
     log.WriteLine($"Message from queue: {queueModel.Message} - Creation message  date: {queueModel.CreateDateMessage.ToString()}");
  }

  /// When message fails in queue goes to poision queue
  public static void ProcessQueuePoisonNotificationMessage([QueueTrigger(QueueName)] QueueModel queueModel, TextWriter log)
  {
     log.WriteLine($"Poison: Message from queue: {queueModel.Message} - Creation message date: {queueModel.CreateDateMessage.ToString()}");
  }
}

Ejecutamos el webjob como si fuera un proceso normal y obtenemos el siguiente resultado:

WwebJob

Para más información sobre queues con WebJobs consultar: https://azure.microsoft.com/en-us/documentation/articles/websites-dotnet-webjobs-sdk-storage-queues-how-to/

Conclusión

Azure storage queues es una buena solución cuando queremos que nuestro sistema no tenga dependencias entre diferentes módulos de forma que este desacoplado y necesitamos una forma rápida de comunicación entre ellos. Si quisiéramos una solución más compleja deberíamos utilizar Azure Service Bus.

En GitHub podéis encontrar el código: https://github.com/bermejoblasco/AzureStorageQueues

Hasta la próxima.


Si te parece interesante, ¡compártelo!