← Back to all products

Async Task Queue Toolkit

$39

Asyncio + Redis task queue toolkit with retry logic, monitoring, scheduling, and dead-letter handling patterns.

📁 18 files · 💻 Python · 🏷 v1.0.0
Python · Markdown · JSON · YAML · Docker · Redis

📁 File Structure (18 files)

async-task-queue-toolkit/
├── LICENSE
├── README.md
├── docker/
│   └── docker-compose.yml
├── examples/
│   ├── email_tasks.py
│   └── image_processing.py
├── guides/
│   └── task-queue-patterns.md
├── src/
│   └── queue/
│       ├── broker.py
│       ├── dead_letter.py
│       ├── middleware.py
│       ├── monitoring.py
│       ├── scheduler.py
│       ├── serializers.py
│       ├── task.py
│       └── worker.py
└── tests/
    ├── conftest.py
    ├── test_broker.py
    └── test_worker.py

📖 Documentation Preview (README excerpt)

Async Task Queue Toolkit

A production-ready, Redis-backed asynchronous task queue for Python applications. Built on asyncio with support for retries, scheduling, middleware chains, dead-letter queues, and real-time monitoring.

Features

  • Redis-backed broker with reliable enqueue/dequeue/ack semantics
  • Worker processes that consume tasks, execute handlers, and manage failures
  • @task() decorator with configurable retries, timeout, and priority
  • Cron-like scheduler for recurring tasks (interval and cron expressions)
  • Middleware chain for logging, timing, retry logic, and rate limiting
  • Dead-letter queue for failed task inspection and replay
  • Monitoring with queue depth, throughput, latency, and error metrics
  • Pluggable serializers (JSON and msgpack)
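The middleware chain works by nesting wrappers around a task handler, with each middleware receiving the handler and returning a new one. A minimal in-memory sketch of that pattern (the names `timing_middleware`, `logging_middleware`, and `apply_chain` are illustrative, not the toolkit's actual API):

```python
import asyncio
import functools
import time

def timing_middleware(handler):
    # Measures how long the wrapped handler takes to run.
    @functools.wraps(handler)
    async def wrapped(*args, **kwargs):
        start = time.monotonic()
        try:
            return await handler(*args, **kwargs)
        finally:
            print(f"{handler.__name__} took {time.monotonic() - start:.4f}s")
    return wrapped

def logging_middleware(handler):
    # Logs the handler name before invoking it.
    @functools.wraps(handler)
    async def wrapped(*args, **kwargs):
        print(f"running {handler.__name__}")
        return await handler(*args, **kwargs)
    return wrapped

def apply_chain(handler, middlewares):
    # Wrap in reverse so the first middleware listed becomes the outermost layer.
    for mw in reversed(middlewares):
        handler = mw(handler)
    return handler

async def greet(name):
    return f"hello, {name}"

handler = apply_chain(greet, [logging_middleware, timing_middleware])
print(asyncio.run(handler("world")))  # hello, world
```

Applying the list in reverse keeps the intuitive ordering: the first middleware in the list sees the task first and sees its result last.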

Quick Start

Installation


pip install redis msgpack

Define a Task


from queue.task import task

@task(retries=3, timeout=30, priority=5)
async def send_welcome_email(user_id: int, email: str) -> dict:
    """Send a welcome email to a new user."""
    # your email logic here
    return {"status": "sent", "user_id": user_id}
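To make the retry and timeout behavior concrete, here is a hedged sketch of how a decorator like `@task` could be built on plain asyncio. This is an illustrative reimplementation, not the toolkit's source:

```python
import asyncio
import functools

def task(retries=3, timeout=30, priority=0):
    """Sketch of a retrying task decorator (illustrative, not the toolkit's code)."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            last_exc = None
            # retries=3 means up to 4 total attempts: 1 initial + 3 retries.
            for attempt in range(retries + 1):
                try:
                    return await asyncio.wait_for(fn(*args, **kwargs), timeout)
                except Exception as exc:
                    last_exc = exc
            raise last_exc
        wrapper.priority = priority  # metadata a broker could read when enqueuing
        return wrapper
    return decorator

@task(retries=2, timeout=1)
async def flaky(state={"calls": 0}):
    # Fails twice, then succeeds -- simulates a transient error.
    state["calls"] += 1
    if state["calls"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(asyncio.run(flaky()))  # ok (succeeds on the third attempt)
```

A production version would add exponential backoff between attempts and only retry on retryable exception types.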

Enqueue and Process


import asyncio
from queue.broker import RedisBroker
from queue.worker import Worker

async def main():
    broker = RedisBroker(redis_url="redis://localhost:6379/0")
    await broker.connect()

    # Enqueue a task
    task_id = await broker.enqueue(
        "send_welcome_email",
        kwargs={"user_id": 42, "email": "user@example.com"},
        priority=5,
    )
    print(f"Enqueued task: {task_id}")

    # Start a worker
    worker = Worker(broker=broker, concurrency=10)
    worker.register_handler("send_welcome_email", send_welcome_email)
    await worker.start()

asyncio.run(main())
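When a task exhausts its retries, it lands in the dead-letter queue for inspection and replay. An in-memory sketch of that lifecycle (the names and structures here are illustrative, not the toolkit's API):

```python
import collections

queue = collections.deque()        # main work queue
dead_letter = collections.deque()  # failed tasks awaiting inspection

def process(task, handler, max_retries=3):
    """Run handler; after max_retries failures, move the task to the DLQ."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return handler(task)
        except Exception as exc:
            last_error = str(exc)
    dead_letter.append({"task": task, "error": last_error, "attempts": max_retries})

def always_fails(task):
    raise ValueError("boom")

process({"name": "send_welcome_email", "user_id": 42}, always_fails)
print(len(dead_letter))             # 1
print(dead_letter[0]["error"])      # boom

# Replay: move the failed task back onto the main queue for another attempt.
queue.append(dead_letter.popleft()["task"])
```

The key property is that failures are never silently dropped: every exhausted task is preserved with its error message and attempt count so an operator can decide whether to replay or discard it.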

*... continues with setup instructions, usage examples, and more.*

📄 Code Sample (.py preview)

src/queue/broker.py

"""
Redis-backed task broker for async task queue.

Provides reliable enqueue, dequeue, acknowledge, and retry operations
using Redis lists and sorted sets for priority-based task management.
"""

from __future__ import annotations

import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional

import redis.asyncio as aioredis

from .serializers import JsonSerializer, Serializer


class TaskStatus(str, Enum):
    """Possible states of a task in the queue."""

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"


@dataclass
class TaskMessage:
    """Represents a serialized task message in the queue."""

    task_id: str
    task_name: str
    args: tuple[Any, ...] = ()
    kwargs: dict[str, Any] = field(default_factory=dict)
    priority: int = 0
    retries_left: int = 3
    timeout: float = 300.0
    created_at: float = field(default_factory=time.time)
    attempt: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Serialize the task message to a dictionary."""
        return {
            "task_id": self.task_id,
            "task_name": self.task_name,
            "args": list(self.args),
            # ... 177 more lines ...
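The docstring above mentions sorted sets for priority-based dequeue: Redis pops members in score order, so higher-priority tasks come out first. The same semantics can be mimicked in memory with `heapq` (a sketch of the idea, not the broker's code):

```python
import heapq
import itertools

_counter = itertools.count()  # tie-breaker preserves FIFO within a priority level
pending = []

def enqueue(task_name, priority=0):
    # heapq is a min-heap, so negate the priority to pop highest-priority first,
    # mirroring a Redis sorted set scored by -priority.
    heapq.heappush(pending, (-priority, next(_counter), task_name))

def dequeue():
    _, _, task_name = heapq.heappop(pending)
    return task_name

enqueue("cleanup_temp_files", priority=1)
enqueue("send_welcome_email", priority=5)
enqueue("rotate_logs", priority=1)

print(dequeue())  # send_welcome_email (highest priority)
print(dequeue())  # cleanup_temp_files (FIFO among equal priorities)
```

The monotonically increasing counter matters: without it, two tasks at the same priority would be compared by name, breaking insertion order.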