Stream - Cola de tareas - Python
Aquí tienes un ejemplo más detallado de cómo utilizar Redis Streams para simular un sistema de cola de tareas, donde se añaden tareas a un Stream y luego son procesadas por diferentes consumidores:
Ejemplo: Cola de Tareas con Redis Streams
1. Configuración:
- Instalación: Asegúrate de tener Redis instalado y funcionando.
- Cliente Redis: Utiliza un cliente Redis como
redis-pypara Python oredispara Node.js.
2. Creación del Stream:
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
# Crear un Stream llamado 'task_queue'
r.xgroup_create('task_queue', 'group1', 0)
Esto crea un Stream llamado task_queue y un grupo de consumidores llamado group1.
3. Añadir Tareas:
def add_task(task_data):
r.xadd('task_queue', task_data)
# Ejemplo de añadir una tarea
add_task({'task': 'enviar_email', 'to': 'ejemplo@ejemplo.com', 'subject': 'Asunto'})
La función add_task añade una nueva tarea al Stream. La tarea es un diccionario con los datos necesarios para su procesamiento.
4. Procesar Tareas:
def process_tasks(consumer_name):
while True:
messages = r.xreadgroup('group1', consumer_name, {'task_queue': '>'}, block=0)
if messages:
for _, messages in messages:
for message in messages:
task_id = message[0].decode('utf-8')
task_data = message[1]
print(f"Consumiendo tarea {task_id}: {task_data}")
# Aquí se procesa la tarea
r.xack('task_queue', 'group1', task_id) # Aceptar la tarea
La función process_tasks lee las tareas del Stream y las procesa.
xreadgroup: Lee los mensajes del Stream para un grupo de consumidores específico.xack: Marca una tarea como procesada, eliminándola de la lista de pendientes.
5. Múltiples Consumidores: Puedes crear múltiples hilos o procesos, cada uno con un nombre de consumidor diferente, para procesar las tareas en paralelo.
Ejemplo completo con múltiples consumidores:
import threading
def start_consumer(consumer_name):
consumer_thread = threading.Thread(target=process_tasks, args=(consumer_name,))
consumer_thread.start()
start_consumer('worker1')
start_consumer('worker2')
Explicación:
- Stream: Actúa como una cola donde se añaden las tareas.
- Grupo de Consumidores: Define un conjunto de consumidores que pueden leer del Stream.
- xadd: Añade nuevas tareas al Stream.
- xreadgroup: Lee las tareas asignadas a un consumidor específico.
- xack: Confirma que una tarea ha sido procesada.
Consideraciones adicionales:
- Bloqueo: El parámetro
blockenxreadgroupcontrola si el consumidor se bloquea esperando nuevas tareas. - Persistencia: Redis ofrece opciones de persistencia para garantizar que los datos no se pierdan en caso de fallo.
- Escalabilidad: Redis Cluster permite escalar horizontalmente para manejar grandes volúmenes de datos.
- Patrones de Diseño: Puedes combinar Streams con otros patrones de diseño, como el patrón de productor-consumidor, para crear sistemas más complejos.
Este ejemplo demuestra cómo Redis Streams puede ser utilizado para implementar una cola de tareas robusta y escalable. Puedes personalizarlo para adaptarse a tus necesidades específicas, agregando características como prioridad de tareas, tiempos de espera, y manejo de errores.
Comentarios
Publicar un comentario