← Back to all products
$39
Async Task Queue Toolkit
Celery + Redis task queue setup with retry logic, monitoring, scheduling, and dead letter handling patterns.
PythonMarkdownJSONYAMLDockerRedis
📁 File Structure 18 files
async-task-queue-toolkit/
├── LICENSE
├── README.md
├── docker/
│ └── docker-compose.yml
├── examples/
│ ├── email_tasks.py
│ └── image_processing.py
├── guides/
│ └── task-queue-patterns.md
├── src/
│ └── queue/
│ ├── broker.py
│ ├── dead_letter.py
│ ├── middleware.py
│ ├── monitoring.py
│ ├── scheduler.py
│ ├── serializers.py
│ ├── task.py
│ └── worker.py
└── tests/
├── conftest.py
├── test_broker.py
└── test_worker.py
📖 Documentation Preview README excerpt
Async Task Queue Toolkit
A production-ready, Redis-backed asynchronous task queue for Python applications. Built on asyncio with support for retries, scheduling, middleware chains, dead-letter queues, and real-time monitoring.
Features
- Redis-backed broker with reliable enqueue/dequeue/ack semantics
- Worker processes that consume tasks, execute handlers, and manage failures
@task()decorator with configurable retries, timeout, and priority- Cron-like scheduler for recurring tasks (interval and cron expressions)
- Middleware chain for logging, timing, retry logic, and rate limiting
- Dead-letter queue for failed task inspection and replay
- Monitoring with queue depth, throughput, latency, and error metrics
- Pluggable serializers (JSON and msgpack)
Quick Start
Installation
pip install redis msgpack aioredis
Define a Task
from queue.task import task
@task(retries=3, timeout=30, priority=5)
async def send_welcome_email(user_id: int, email: str) -> dict:
"""Send a welcome email to a new user."""
# your email logic here
return {"status": "sent", "user_id": user_id}
Enqueue and Process
import asyncio
from queue.broker import RedisBroker
from queue.worker import Worker
async def main():
broker = RedisBroker(redis_url="redis://localhost:6379/0")
await broker.connect()
# Enqueue a task
task_id = await broker.enqueue(
"send_welcome_email",
kwargs={"user_id": 42, "email": "user@example.com"},
priority=5,
)
print(f"Enqueued task: {task_id}")
# Start a worker
worker = Worker(broker=broker, concurrency=10)
worker.register_handler("send_welcome_email", send_welcome_email)
await worker.start()
asyncio.run(main())
*... continues with setup instructions, usage examples, and more.*
📄 Code Sample .py preview
src/queue/broker.py
"""
Redis-backed task broker for async task queue.
Provides reliable enqueue, dequeue, acknowledge, and retry operations
using Redis lists and sorted sets for priority-based task management.
"""
from __future__ import annotations
import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
import redis.asyncio as aioredis
from .serializers import JsonSerializer, Serializer
class TaskStatus(str, Enum):
"""Possible states of a task in the queue."""
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
RETRYING = "retrying"
@dataclass
class TaskMessage:
"""Represents a serialized task message in the queue."""
task_id: str
task_name: str
args: tuple[Any, ...] = ()
kwargs: dict[str, Any] = field(default_factory=dict)
priority: int = 0
retries_left: int = 3
timeout: float = 300.0
created_at: float = field(default_factory=time.time)
attempt: int = 0
def to_dict(self) -> dict[str, Any]:
"""Serialize the task message to a dictionary."""
return {
"task_id": self.task_id,
"task_name": self.task_name,
"args": list(self.args),
# ... 177 more lines ...