Stream - Cola de tareas - Python

Aquí tienes un ejemplo más detallado de cómo utilizar Redis Streams para simular un sistema de cola de tareas, donde se añaden tareas a un Stream y luego son procesadas por diferentes consumidores:

Ejemplo: Cola de Tareas con Redis Streams

1. Configuración:

  • Instalación: Asegúrate de tener Redis instalado y funcionando.
  • Cliente Redis: Utiliza un cliente Redis como redis-py para Python o redis para Node.js.

2. Creación del Stream:

Python
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Crear un Stream llamado 'task_queue'
r.xgroup_create('task_queue', 'group1', 0)

Esto crea un Stream llamado task_queue y un grupo de consumidores llamado group1.

3. Añadir Tareas:

Python
def add_task(task_data):
    r.xadd('task_queue', task_data)

# Ejemplo de añadir una tarea
add_task({'task': 'enviar_email', 'to': 'ejemplo@ejemplo.com', 'subject': 'Asunto'})

La función add_task añade una nueva tarea al Stream. La tarea es un diccionario con los datos necesarios para su procesamiento.

4. Procesar Tareas:

Python
def process_tasks(consumer_name):
    while True:
        messages = r.xreadgroup('group1', consumer_name, {'task_queue': '>'}, block=0)
        if messages:
            for _, messages in messages:
                for message in messages:
                    task_id = message[0].decode('utf-8')
                    task_data = message[1]
                    print(f"Consumiendo tarea {task_id}: {task_data}")
                    # Aquí se procesa la tarea
                    r.xack('task_queue', 'group1', task_id)  # Aceptar la tarea

La función process_tasks lee las tareas del Stream y las procesa.

  • xreadgroup: Lee los mensajes del Stream para un grupo de consumidores específico.
  • xack: Marca una tarea como procesada, eliminándola de la lista de pendientes.

5. Múltiples Consumidores: Puedes crear múltiples hilos o procesos, cada uno con un nombre de consumidor diferente, para procesar las tareas en paralelo.

Ejemplo completo con múltiples consumidores:

Python
import threading

def start_consumer(consumer_name):
    consumer_thread = threading.Thread(target=process_tasks, args=(consumer_name,))
    consumer_thread.start()

start_consumer('worker1')
start_consumer('worker2')

Explicación:

  • Stream: Actúa como una cola donde se añaden las tareas.
  • Grupo de Consumidores: Define un conjunto de consumidores que pueden leer del Stream.
  • xadd: Añade nuevas tareas al Stream.
  • xreadgroup: Lee las tareas asignadas a un consumidor específico.
  • xack: Confirma que una tarea ha sido procesada.

Consideraciones adicionales:

  • Bloqueo: El parámetro block en xreadgroup controla si el consumidor se bloquea esperando nuevas tareas.
  • Persistencia: Redis ofrece opciones de persistencia para garantizar que los datos no se pierdan en caso de fallo.
  • Escalabilidad: Redis Cluster permite escalar horizontalmente para manejar grandes volúmenes de datos.
  • Patrones de Diseño: Puedes combinar Streams con otros patrones de diseño, como el patrón de productor-consumidor, para crear sistemas más complejos.

Este ejemplo demuestra cómo Redis Streams puede ser utilizado para implementar una cola de tareas robusta y escalable. Puedes personalizarlo para adaptarse a tus necesidades específicas, agregando características como prioridad de tareas, tiempos de espera, y manejo de errores.


Comentarios

Entradas más populares de este blog

Comandos geoespaciales