Using OpenTelemetry with Celery Task Queues

Celery is a powerful background task processor for Python applications. It is commonly used to offload time-consuming tasks like sending emails, processing images, or making API calls, allowing your web application to respond faster and stay responsive.

However, out-of-sight background tasks can quickly become front-of-mind concerns. These tasks can fail silently, introduce latency, or cause downstream issues that are hard to detect without visibility. That’s where observability comes in.

OpenTelemetry (OTel) is the open standard for collecting telemetry data such as traces, metrics, and logs. With wide support across the Python ecosystem, it’s the go-to solution for instrumenting your services and gaining full visibility into your application’s behavior, including Celery task queues.

In this post, we’ll walk through how to instrument a Celery app with OTel, add custom spans for deeper insights, and export your telemetry data to SolarWinds® Observability SaaS to visualize traces and troubleshoot issues more easily.

Set up a Sample Celery Task Project

To demonstrate the OpenTelemetry integration, we’ve built a small Python app that uses Celery as the task queue and Redis as the message broker. We’ll also include Flask to simulate a web request that triggers background tasks. Our app will let users submit a task via an HTTP request. Celery will pick it up and simulate a delay before returning a result. You can find the code in this GitHub repository and try it out yourself.

Project Structure and Code Highlights

The initial folder structure for our project looks like this:

python-celery-demo/
├── app.py
├── celery_app.py
├── demo.py
├── tasks.py
└── requirements.txt

In tasks.py, we defined a simple task that simulates work with a delay:

import time
import random

from dotenv import load_dotenv
from celery_app import celery_app

# Load environment variables
load_dotenv()

@celery_app.task(name='process_task')
def process_task(task_id: str, min_delay: int = 3, max_delay: int = 10) -> dict:
    """
    Simulate a long-running task with a random delay.
   
    Args:
        task_id: Unique identifier for the task
        min_delay: Minimum delay in seconds (default: 3)
        max_delay: Maximum delay in seconds (default: 10)
       
    Returns:
        dict: Task result with status and completion time
    """
    # Generate random delay between min_delay and max_delay
    delay_seconds = random.randint(min_delay, max_delay)
   
    # Simulate processing time
    time.sleep(delay_seconds)
   
    return {
        'task_id': task_id,
        'status': 'completed',
        'processed_at': time.strftime('%Y-%m-%d %H:%M:%S'),
        'message': f'Task {task_id} processed successfully',
        'processing_time': delay_seconds
    }

The Celery background app, defined in celery_app.py, connects to Redis and processes any tasks found in the queue there.

from celery import Celery
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Create Celery instance
celery_app = Celery(
    'tasks',
    broker=os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
)

# Optional configuration
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

Finally, the Flask application in app.py sets up a server to accept API requests for submitting a task and checking task statuses.

from flask import Flask, jsonify, request
import uuid
from tasks import process_task

app = Flask(__name__)

@app.route('/submit-task', methods=['POST'])
def submit_task():
    """
    Endpoint to submit a new task for processing.
    Accepts a JSON payload with optional min_delay and max_delay parameters.
    """
    try:
        data = request.get_json() or {}
        min_delay = int(data.get('min_delay', 3))
        max_delay = int(data.get('max_delay', 10))
       
        # Validate delay parameters
        if min_delay > max_delay:
            min_delay, max_delay = max_delay, min_delay
       
        # Generate a unique task ID
        task_id = str(uuid.uuid4())
       
        # Submit the task to Celery
        task = process_task.delay(task_id, min_delay, max_delay)
       
        return jsonify({
            'task_id': task_id,
            'celery_task_id': task.id,
            'status': 'submitted',
            'message': 'Task submitted successfully',
            'delay_range': f'{min_delay}-{max_delay} seconds'
        }), 202
       
    except Exception as e:
        return jsonify({
            'error': str(e),
            'status': 'error'
        }), 400

@app.route('/task-status/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """
    Endpoint to check the status of a submitted task.
    """
    try:
        task = process_task.AsyncResult(task_id)
        if task.ready():
            return jsonify({
                'task_id': task_id,
                'status': 'completed',
                'result': task.get()
            })
        return jsonify({
            'task_id': task_id,
            'status': 'processing'
        })
    except Exception as e:
        return jsonify({
            'error': str(e),
            'status': 'error'
        }), 400

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

Demo Run of the Application

In one terminal window, go ahead and start up your Flask application.

$ python app.py
* Serving Flask app 'app'
* Running on http://127.0.0.1:5000

In another terminal window, start up Celery.

$ celery -A tasks worker --loglevel=info

-------------- celery@demo-coder-machine v5.3.6 (emerald-rush)

[12:46:12,850: INFO/MainProcess] Connected to redis://localhost:6379/0
[12:46:12,851: INFO/MainProcess] mingle: searching for neighbors
[12:46:13,857: INFO/MainProcess] mingle: all alone
[12:46:13,871: INFO/MainProcess] celery@demo-coder-machine ready.

This walkthrough also includes a demo script that simulates adding tasks and checking their statuses.
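The demo script isn’t reproduced in full in this post, but a minimal sketch of what it does might look like the following. The endpoint paths come from app.py; the helper names, polling intervals, and delay values are illustrative assumptions, not the repository’s exact code.

```python
# demo.py — hypothetical sketch (helper names and values are illustrative)
import time

import requests

BASE_URL = "http://localhost:5000"


def submit_tasks(n: int = 10) -> list:
    """Submit n tasks via the Flask /submit-task endpoint."""
    task_ids = []
    for i in range(n):
        resp = requests.post(
            f"{BASE_URL}/submit-task",
            json={"min_delay": 3, "max_delay": 10},
        )
        resp.raise_for_status()
        task_ids.append(resp.json()["task_id"])
        print(f"Task {i + 1}/{n} submitted…")
    return task_ids


def poll_until_done(task_ids: list, attempts: int = 20, interval: float = 1.0) -> dict:
    """Poll /task-status/<id> until every task completes or attempts run out."""
    results = {}
    for _ in range(attempts):
        for task_id in task_ids:
            if task_id in results:
                continue
            status = requests.get(f"{BASE_URL}/task-status/{task_id}").json()
            if status.get("status") == "completed":
                results[task_id] = status["result"]
        print(f"Progress: {len(results)}/{len(task_ids)} tasks completed")
        if len(results) == len(task_ids):
            break
        time.sleep(interval)
    return results


def average_processing_time(results: dict) -> float:
    """Mean of the processing_time values reported by completed tasks."""
    times = [r["processing_time"] for r in results.values()]
    return sum(times) / len(times) if times else 0.0


if __name__ == "__main__":
    completed = poll_until_done(submit_tasks())
    print(f"Average processing time: {average_processing_time(completed):.1f} seconds")
```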

$ python demo.py

===========================================================================
Starting Flask-Celery Demo with 10 tasks
===========================================================================
===========================================================================
Submitting 10 tasks...
===========================================================================
Task 1/10 submitted…
Task 2/10 submitted…
Task 3/10 submitted…
Task 4/10 submitted…
Task 5/10 submitted…
Task 6/10 submitted…
Task 7/10 submitted…
Task 8/10 submitted…
Task 9/10 submitted…
Task 10/10 submitted…
===========================================================================
Polling task statuses...
===========================================================================

Attempt 1/20 (Elapsed: 0.0s)
Progress: 0/10 tasks completed

Attempt 2/20 (Elapsed: 1.1s)
Progress: 0/10 tasks completed

Attempt 3/20 (Elapsed: 2.2s)
Progress: 0/10 tasks completed

Attempt 4/20 (Elapsed: 3.3s)
Progress: 0/10 tasks completed

Attempt 5/20 (Elapsed: 4.3s)
Progress: 0/10 tasks completed

Task 39c3d7fb-e160-4f7f-a2d7-440ac86be039 completed in 5 seconds!
Result: {
  "message": "Task 39c3d7fb-e160-4f7f-a2d7-440ac86be039 processed successfully",
  "processed_at": "2025-06-06 12:49:01",
  "processing_time": 5,
  "status": "completed",
  "task_id": "39c3d7fb-e160-4f7f-a2d7-440ac86be039"
}

Task 0f482a2c-4b47-4493-b65f-cbdbc9f876bf completed in 5 seconds!
Result: {
  "message": "Task 0f482a2c-4b47-4493-b65f-cbdbc9f876bf processed successfully",
  "processed_at": "2025-06-06 12:49:01",
  "processing_time": 5,
  "status": "completed",
  "task_id": "0f482a2c-4b47-4493-b65f-cbdbc9f876bf"
}


Attempt 15/20 (Elapsed: 14.8s)
Progress: 10/10 tasks completed

Processing Time Summary:
Task 39c3d7fb-e160-4f7f-a2d7-440ac86be039: 5 seconds
Task 0f482a2c-4b47-4493-b65f-cbdbc9f876bf: 5 seconds
Task 31005379-1e93-4bbc-a589-fd62c58e20ce: 6 seconds
Task afa02ae1-8d3c-457e-b56b-66fcdea4e779: 6 seconds
Task 95181dc3-ee32-45c5-a05a-64937e8c0583: 7 seconds
Task 3bdc5223-8146-4343-b7d7-fe4b393b7e02: 9 seconds
Task 7b5ff995-0f80-4f30-a698-15d1dc9604eb: 9 seconds
Task 4d6d61cc-2192-4dac-a401-4fa54d63efcd: 10 seconds
Task c9a4bb82-8a7c-4907-8865-2d368f3b4540: 5 seconds
Task 9d8e1b1f-59a0-4c2d-b8dd-56b348da3607: 9 seconds

Average processing time: 7.1 seconds

All tasks completed!

===========================================================================
Demo completed successfully!
===========================================================================

Add OpenTelemetry Dependencies to the Project

Instrumenting the Flask application and the Celery task queue provides a rich source of data that can be used in debugging and optimization. Let's walk through how to use OpenTelemetry and the OpenTelemetry Protocol (OTLP) to capture telemetry—traces, metrics, and logs—for export to SolarWinds Observability SaaS.

Start by setting up the necessary dependencies. The core OpenTelemetry packages are opentelemetry-api and opentelemetry-sdk, which provide the fundamental functionality for creating and managing telemetry data. For our specific application stack, we'll also need opentelemetry-instrumentation-flask to trace incoming web requests and opentelemetry-instrumentation-celery to monitor our background tasks. To send the telemetry data to SolarWinds Observability SaaS, we'll use opentelemetry-exporter-otlp (OpenTelemetry Protocol). We'll also include packages for instrumenting requests and logging, and for following standard naming conventions for our telemetry data.

Add these dependencies to requirements.txt with specific versions to help ensure compatibility and stability.

flask==3.0.2
celery==5.3.6
redis==5.0.1
python-dotenv==1.0.1
requests==2.31.0

# OpenTelemetry dependencies with specific versions to avoid conflicts
importlib-metadata>=6.0,<7.0
protobuf>=3.19,<5.0
opentelemetry-api==1.23.0
opentelemetry-sdk==1.23.0
opentelemetry-exporter-otlp==1.23.0
opentelemetry-instrumentation-flask==0.44b0
opentelemetry-instrumentation-celery==0.44b0
opentelemetry-instrumentation-requests==0.44b0
opentelemetry-instrumentation-logging==0.44b0
opentelemetry-semantic-conventions==0.44b0

Once the dependencies are defined, create a Python virtual environment to isolate your project and install the dependencies with pip. This setup gives you all the necessary tools to start collecting and exporting telemetry data from the application to SolarWinds Observability SaaS.
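For example, on macOS or Linux (the .venv directory name is just a convention):

```shell
# Create and activate a virtual environment, then install the pinned dependencies
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```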

Configure the Environment

To manage your OpenTelemetry configuration securely and flexibly, use environment variables with python-dotenv. This approach allows you to keep sensitive information like API keys out of the codebase while making it easy to configure different environments.

OTLP endpoint and ingestion token

To send telemetry data to SolarWinds Observability SaaS, you will need to find your OpenTelemetry endpoint and create an API token for data ingestion. Follow these steps.

  1. Log in to SolarWinds Observability SaaS. The URL, after you log in, may look similar to this:

https://my.na-01.cloud.solarwinds.com/

The xx-yy part of the URL (na-01 in this example) will depend on the data center used for your SolarWinds Observability SaaS account and organization. Take note of this.

  2. Navigate to Settings > API Tokens.
  3. On the API Tokens page, click Create API Token.
  4. Specify a name for your new API token. Select Ingestion as the token type. Click Create API Token.
  5. Copy the resulting token value.

Store Information in a .env File

Create a .env file in the project root with the following essential variables:

# OpenTelemetry Configuration
OTEL_SERVICE_NAME=python-celery-demo
OTEL_ENVIRONMENT=development
OTEL_EXPORTER_OTLP_ENDPOINT=https://otel.collector.XX-YY.cloud.solarwinds.com:443
OTEL_EXPORTER_OTLP_API_KEY=replace-with-solarwinds-observability-token

# Celery Configuration (optional, defaults shown)
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0

The application already uses python-dotenv (as seen in app.py and tasks.py), which automatically loads these variables when the application starts. The .env file should be added to .gitignore to prevent accidental commits of sensitive information. For production deployments, these environment variables would be set through the deployment platform's configuration management system.

Configure OpenTelemetry

To centralize the OpenTelemetry configuration, create an otel_config.py file that sets up all three pillars of observability: traces, metrics, and logs. The resulting file looks like this:

import os
import json
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry._logs import set_logger_provider
from flask import request
import logging
import grpc

def setup_otel():
  """Configure OpenTelemetry with OTLP exporter."""
  try:
    # Get configuration from environment variables
    endpoint = os.getenv('OTEL_EXPORTER_OTLP_ENDPOINT')
    api_key = os.getenv('OTEL_EXPORTER_OTLP_API_KEY')
    service_name = os.getenv('OTEL_SERVICE_NAME', 'python-celery-demo')
       
    if not endpoint or not api_key:
      raise ValueError("OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_API_KEY must be set")
       
    # Create gRPC metadata with API key
    metadata = [
      ('authorization', f'Bearer {api_key.strip()}')
    ]

    # Create a resource with service information
    resource = Resource.create({
      ResourceAttributes.SERVICE_NAME: service_name,
      ResourceAttributes.SERVICE_VERSION: "1.0.0",
      ResourceAttributes.DEPLOYMENT_ENVIRONMENT: os.getenv('OTEL_ENVIRONMENT', 'development')
    })

    # Create channel credentials
    channel_credentials = grpc.ssl_channel_credentials()

    # Configure trace provider
    trace_provider = TracerProvider(resource=resource)
    otlp_exporter = OTLPSpanExporter(
      endpoint=endpoint,
      credentials=channel_credentials,
      headers=metadata,
      timeout=30,
      compression=grpc.Compression.Gzip,
    )
    span_processor = BatchSpanProcessor(otlp_exporter)
    trace_provider.add_span_processor(span_processor)
    trace.set_tracer_provider(trace_provider)

    # Configure metrics provider
    metric_reader = PeriodicExportingMetricReader(
      OTLPMetricExporter(
        endpoint=endpoint,
        credentials=channel_credentials,
        headers=metadata,
        timeout=30,
        compression=grpc.Compression.Gzip,
      )
    )
    meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])

    # Configure logging provider
    log_exporter = OTLPLogExporter(
      endpoint=endpoint,
      credentials=channel_credentials,
      headers=metadata,
      timeout=30,
      compression=grpc.Compression.Gzip,
    )
    logger_provider = LoggerProvider(resource=resource)
    logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
    set_logger_provider(logger_provider)

    # Get the root logger
    root_logger = logging.getLogger()

    # Remove any existing handlers to avoid duplicates
    for handler in root_logger.handlers[:]:
      root_logger.removeHandler(handler)

    # Add OTLP handler to root logger
    otlp_handler = LoggingHandler(logger_provider=logger_provider)
    otlp_handler.setLevel(logging.INFO)
    root_logger.addHandler(otlp_handler)

    # Add a console handler for local debugging
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_formatter = logging.Formatter('%(asctime)s %(levelname)s [%(name)s] - %(message)s')
    console_handler.setFormatter(console_formatter)
    root_logger.addHandler(console_handler)

    # Set the root logger level
    root_logger.setLevel(logging.INFO)

    # Log startup message
    root_logger.info("OpenTelemetry initialized successfully")

    return trace_provider, meter_provider
       
  except Exception as e:
    logging.error(f"Failed to initialize OpenTelemetry: {str(e)}", exc_info=True)
    raise

The configuration in the above code snippet involves several key steps:

  1. Resource configuration: Define a resource (Resource.create) that identifies the service with attributes like service name, version, and environment. This metadata helps identify and filter telemetry data in SolarWinds Observability SaaS.
  2. Provider setup: Configure the TracerProvider with an OTLP exporter that sends spans to SolarWinds Observability SaaS. The configuration includes endpoint information and authentication credentials. Similarly, configure the MeterProvider, LoggerProvider, and LoggingHandler.

The configuration is initialized early in the application lifecycle, helping ensure that all telemetry data is properly collected and exported. In addition, the centralized approach makes it easy to maintain and modify your observability setup while ensuring consistent telemetry collection across both your Flask web application and Celery workers.

Instrument the Application

With your OpenTelemetry configuration in place, you can instrument the application as needed.

Example Trace Implementation

For example, implement a trace in tasks.py by modifying process_task to look like this:

@celery_app.task(name='process_task')
def process_task(task_id, min_delay=3, max_delay=10):

    …

    # Get current span context for correlation
    current_span = trace.get_current_span()
    span_context = current_span.get_span_context() if current_span else None
   
    …   
    with tracer.start_as_current_span("process_task") as span:
        # Set span attributes
        span.set_attribute("task_id", task_id)
        span.set_attribute("min_delay", min_delay)
        span.set_attribute("max_delay", max_delay)
       
        # Generate random delay
        delay = random.uniform(min_delay, max_delay)
        span.set_attribute("actual_delay", delay)
       
        # Simulate processing
        time.sleep(delay)
       
        # Prepare result without including the message
        result = {
            "task_id": task_id,
            "status": "completed",
            "processing_time": delay,
            "min_delay": min_delay,
            "max_delay": max_delay
        }
       
        return result

Example Metrics and Logs Implementation

To instrument the application to collect various metrics, modify otel_config.py to implement various auto-instrumentation tools. In the calls below, Flask and Celery are automatically instrumented, as are API requests. In addition, the configuration sets up a logging middleware for use within the Flask app.

def instrument_app(app):
  """Instrument the Flask application with OpenTelemetry."""
  FlaskInstrumentor().instrument_app(app)
  RequestsInstrumentor().instrument()
  CeleryInstrumentor().instrument()
   
  # Add a simple request logging middleware
  @app.before_request
  def before_request():
    current_span = trace.get_current_span()
    if current_span:
      # Log request details with trace context
      logging.info(
        "FLASK: Request received",
        extra={
          "otel.service.name": os.getenv('OTEL_SERVICE_NAME'),
          "otel.environment": os.getenv('OTEL_ENVIRONMENT'),
          "otel.span_id": format(current_span.get_span_context().span_id, "016x"),
          "otel.trace_id": format(current_span.get_span_context().trace_id, "032x"),
          "http.method": request.method,
          "http.url": request.url,
          "http.route": request.endpoint
        }
      )

Within app.py, make sure to call setup_otel and instrument_app.

from flask import Flask, jsonify, request
import uuid
from tasks import process_task
import logging
from dotenv import load_dotenv
import os
from otel_config import setup_otel, instrument_app

# Load environment variables from .env file
load_dotenv()

# Log environment variables (without sensitive data)
logging.info(f"OTEL_SERVICE_NAME: {os.getenv('OTEL_SERVICE_NAME')}")
logging.info(f"OTEL_ENVIRONMENT: {os.getenv('OTEL_ENVIRONMENT')}")
logging.info(f"OTEL_EXPORTER_OTLP_ENDPOINT: {os.getenv('OTEL_EXPORTER_OTLP_ENDPOINT')}")
logging.info(f"OTEL_EXPORTER_OTLP_API_KEY present: {'Yes' if os.getenv('OTEL_EXPORTER_OTLP_API_KEY') else 'No'}")

# Initialize OpenTelemetry
try:
    trace_provider, meter_provider = setup_otel()
    logging.info("OpenTelemetry initialized successfully")
except Exception as e:
    logging.error(f"Failed to initialize OpenTelemetry: {str(e)}")
    raise

app = Flask(__name__)

# Instrument the Flask application
instrument_app(app)
...

Verify Telemetry Data Export

With OpenTelemetry configured to export telemetry data to SolarWinds Observability SaaS, and the application properly instrumented, start up the application and the Celery task queue process. Then, run the demo script. 

After that has run, log in to SolarWinds Observability SaaS to verify the proper ingestion of telemetry data. Navigate to APM. There, you'll see your application listed.

Click the listed application to see the captured traces, metrics, and logs, available in the sub-navigation tabs at the top of the page.

Click on the Traces tab. API request traces are shown across the various runs of the demo script.

Clicking the details icon for any individual trace shows a breakdown of the time spent in each custom span.

Return to the APM details page. Then, navigate to Metrics. Metrics captured alongside the auto-instrumented traces are displayed.

Individual metric visualizations are available, but you can also create custom dashboards that overlay metrics or place them side by side for analysis and correlation.

The Logs tab shows every log event emitted by the application and sent to SolarWinds Observability SaaS.

This is helpful for application debugging as well as monitoring resources and transactions.

Conclusion

By instrumenting your Python (Flask/Celery) application with OpenTelemetry, you can transform your background tasks from black boxes into fully observable components of your system. The combination of automatic instrumentation for Flask, Celery, and HTTP requests, along with custom spans and structured logging, helps you create a complete picture of application behavior. Now, you can track requests from the initial web call through the entire task lifecycle, including queue time, processing duration, and any external service calls.

In SolarWinds Observability, this telemetry data comes together to provide powerful insights. You can visualize the complete request flow across your distributed system, set up alerts for task failures or performance degradation, and use the correlated logs to diagnose issues quickly. Seeing real-time task queue lengths, processing times, and error rates helps you proactively manage your application's health and performance.

Whether debugging a specific task failure or optimizing your system's overall performance, the observability data you've set up provides the context to make informed decisions and maintain a reliable application.

To learn more about OpenTelemetry with SolarWinds Observability SaaS, try it for free for 30 days.
