Running AI Workloads In-House: A Guide to OPEA Microservices

Introduction

In this guide, we'll explore how to run AI workloads in-house using OPEA (Open Platform for Enterprise AI) components. We'll focus on setting up and orchestrating two microservices: text-to-image and image-to-video generation. This setup demonstrates how enterprises can keep control of their AI infrastructure while leveraging powerful open-source models.

Why Run AI Workloads In-House?

There are several compelling reasons to run AI workloads on your own infrastructure:

  1. Data Privacy: Keep sensitive data within your organization's boundaries

  2. Cost Control: Predictable costs without per-request API charges

  3. Customization: Ability to modify and fine-tune models for specific needs

  4. Latency: Reduced network latency for faster inference

  5. Compliance: Better control over regulatory requirements

Project Overview

Our goal is to set up two OPEA microservices:

  • Text-to-Image Service: Generates images from text descriptions using Stable Diffusion

  • Image-to-Video Service: Creates videos from static images using Stable Video Diffusion

These services will run in Docker containers and communicate with each other, demonstrating a practical microservices architecture.

Prerequisites

  • Docker and Docker Compose installed

  • Basic understanding of Python and FastAPI

  • A Hugging Face account and API token

  • Sufficient disk space for model weights (~10 GB at minimum; expect more once both models are cached)

  • Git for version control

Step-by-Step Implementation

1. Project Structure Setup

First, create the project directory structure:

mkdir opea-services
cd opea-services
mkdir text2image
mkdir image2video
mkdir docker-compose
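
The resulting layout looks like this:

opea-services/
├── text2image/
├── image2video/
└── docker-compose/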

2. Setting Up the Text-to-Image Service

Create the text2image service files:

# text2image/opea_text2image_microservice.py
import logging
from fastapi import FastAPI, HTTPException
from diffusers import StableDiffusionPipeline
import torch
import os
import base64
from io import BytesIO
from PIL import Image

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

app = FastAPI()
pipe = None

@app.on_event("startup")
async def startup_event():
    global pipe
    logger.info("Starting text2image service...")
    try:
        model_id = os.getenv("MODEL", "stabilityai/stable-diffusion-2-1")
        hf_token = os.getenv("HF_TOKEN")
        logger.info(f"Initializing model {model_id}")
        pipe = StableDiffusionPipeline.from_pretrained(
            model_id,
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            token=hf_token  # use_auth_token is deprecated in recent diffusers releases
        )
        device = "cuda" if torch.cuda.is_available() else "cpu"
        pipe = pipe.to(device)
        logger.info(f"Model initialized on {device}")
    except Exception as e:
        logger.error(f"Error during startup: {str(e)}")
        raise e

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.post("/v1/text2image")
async def generate_image(request: dict):
    global pipe
    logger.info(f"Received request: {request}")
    try:
        if pipe is None:
            raise HTTPException(status_code=500, detail="Model not initialized")

        prompt = request.get("prompt", "")
        num_images = request.get("num_images_per_prompt", 1)

        logger.info(f"Generating image for prompt: {prompt}")
        images = pipe(prompt, num_images_per_prompt=num_images).images

        # Convert images to base64
        image_list = []
        for img in images:
            buffered = BytesIO()
            img.save(buffered, format="PNG")
            img_str = base64.b64encode(buffered.getvalue()).decode()
            image_list.append(img_str)

        return {
            "status": "success",
            "images": image_list,
            "message": f"Generated {len(images)} images successfully"
        }
    except HTTPException:
        # Re-raise HTTP errors as-is so their status codes survive
        raise
    except Exception as e:
        logger.error(f"Error generating image: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

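The endpoint accepts a raw dict for simplicity. In production you'd likely validate the body with a Pydantic model instead; a minimal sketch (Text2ImageRequest is a name introduced here, and the field names mirror the request keys used above):

# Hypothetical request schema for stricter input validation
from pydantic import BaseModel, Field

class Text2ImageRequest(BaseModel):
    prompt: str
    num_images_per_prompt: int = Field(default=1, ge=1, le=4)

# The endpoint signature would then become:
# async def generate_image(request: Text2ImageRequest):
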
Create the Dockerfile:

# text2image/Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy service files (start.sh included)
COPY . .
RUN chmod +x start.sh

# Expose the port
EXPOSE 9379

CMD ["./start.sh"]

Create the requirements.txt:

# text2image/requirements.txt
torch
diffusers
transformers
fastapi
uvicorn
safetensors
accelerate

Create the start script:

# text2image/start.sh
#!/bin/bash
uvicorn opea_text2image_microservice:app --host 0.0.0.0 --port 9379 --log-level debug
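
Before containerizing, you can sanity-check the service locally (assuming a virtualenv with the requirements installed and your token exported):

pip install -r requirements.txt
export HF_TOKEN=your_hugging_face_token
bash start.sh

# In another terminal:
curl http://localhost:9379/health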

3. Setting Up the Image-to-Video Service

Create the image2video service files:

# image2video/opea_image2video_microservice.py
import logging
from fastapi import FastAPI, HTTPException
from diffusers import StableVideoDiffusionPipeline
import torch
import os
import base64
from io import BytesIO
import requests
from PIL import Image

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

app = FastAPI()
pipe = None

@app.on_event("startup")
async def startup_event():
    global pipe
    logger.info("Starting image2video service...")
    try:
        hf_token = os.getenv("HF_TOKEN")
        logger.info("Initializing SVD model")
        pipe = StableVideoDiffusionPipeline.from_pretrained(
            "stabilityai/stable-video-diffusion-img2vid",
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            token=hf_token  # use_auth_token is deprecated in recent diffusers releases
        )
        device = "cuda" if torch.cuda.is_available() else "cpu"
        pipe = pipe.to(device)
        logger.info(f"Model initialized on {device}")
    except Exception as e:
        logger.error(f"Error during startup: {str(e)}")
        raise e

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.post("/v1/image2video")
async def generate_video(request: dict):
    global pipe
    logger.info(f"Received request: {request}")
    try:
        if pipe is None:
            raise HTTPException(status_code=500, detail="Model not initialized")

        image_paths = request.get("images_path", [])
        if not image_paths:
            raise HTTPException(status_code=400, detail="No image paths provided")

        # Load the first image
        image_path = image_paths[0]["image_path"]
        logger.info(f"Loading image from: {image_path}")

        # Handle both URLs and base64 images
        if image_path.startswith('http'):
            response = requests.get(image_path, timeout=30)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content))
        else:
            # Assume it's base64
            try:
                image_data = base64.b64decode(image_path)
                image = Image.open(BytesIO(image_data))
            except Exception:
                raise HTTPException(status_code=400, detail="Invalid image data")

        # SVD expects an RGB image; 1024x576 matches the pipeline's default width/height
        image = image.convert("RGB").resize((1024, 576))

        logger.info("Generating video")
        # .frames is batched: frames[0] holds the list of PIL frames for our single input
        video_frames = pipe(image, num_frames=16).frames[0]

        # Convert frames to base64
        frame_list = []
        for frame in video_frames:
            buffered = BytesIO()
            frame.save(buffered, format="PNG")
            frame_str = base64.b64encode(buffered.getvalue()).decode()
            frame_list.append(frame_str)

        return {
            "status": "success",
            "frames": frame_list,
            "message": f"Generated video with {len(frame_list)} frames"
        }
    except HTTPException:
        # Re-raise HTTP errors as-is so 400s don't become 500s
        raise
    except Exception as e:
        logger.error(f"Error generating video: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
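
The Docker Compose file below builds this service from ../image2video, so it also needs its own Dockerfile, requirements.txt, and start.sh. These mirror the text2image files, with the port changed to 9369 and requests added for fetching images by URL:

# image2video/Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install curl for the container health check
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
RUN chmod +x start.sh

EXPOSE 9369

CMD ["./start.sh"]

# image2video/requirements.txt
torch
diffusers
transformers
fastapi
uvicorn
safetensors
accelerate
requests

# image2video/start.sh
#!/bin/bash
uvicorn opea_image2video_microservice:app --host 0.0.0.0 --port 9369 --log-level debug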

4. Docker Compose Configuration

Create the docker-compose.yml:

# docker-compose/docker-compose.yml
version: '3.8'

services:
  text2image:
    build:
      context: ../text2image
      dockerfile: Dockerfile
    ports:
      - "9379:9379"
    environment:
      - MODEL=stabilityai/stable-diffusion-2-1
      - HF_TOKEN=${HF_TOKEN}
    networks:
      - opea-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9379/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    volumes:
      - text2image_cache:/root/.cache

  image2video:
    build:
      context: ../image2video
      dockerfile: Dockerfile
    ports:
      - "9369:9369"
    environment:
      - HF_TOKEN=${HF_TOKEN}
    networks:
      - opea-network
    depends_on:
      - text2image
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9369/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    volumes:
      - image2video_cache:/root/.cache

networks:
  opea-network:
    driver: bridge

volumes:
  text2image_cache:
  image2video_cache:
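
Compose substitutes ${HF_TOKEN} from the shell environment or from a .env file placed next to docker-compose.yml; the latter saves re-exporting the token in every session:

# docker-compose/.env
HF_TOKEN=your_hugging_face_token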

Running the Services

  1. Set up your Hugging Face token:
export HF_TOKEN=your_hugging_face_token
  2. Build and start the services:
cd docker-compose
docker-compose up --build -d
  3. Monitor the logs:
docker-compose logs -f

Testing the Services

  1. Test the text-to-image service:
curl http://localhost:9379/v1/text2image -X POST \
  -H "Content-Type: application/json" \
  -d '{"prompt":"An astronaut riding a green horse", "num_images_per_prompt":1}'
  2. Test the image-to-video service:
curl http://localhost:9369/v1/image2video -X POST \
  -H "Content-Type: application/json" \
  -d '{"images_path":[{"image_path":"https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/diffusers/svd/rocket.png"}]}'

Troubleshooting Guide

During our implementation, we encountered and resolved several issues:

  1. Model Loading Issues

    • Problem: Models failed to load on startup

    • Solution: Added error handling and logging around model initialization, plus volume mounts so cached weights survive container restarts

  2. Empty Responses

    • Problem: Services returned empty responses

    • Solution: Improved error handling and response formatting, and encoded images as base64 in the JSON payload

  3. Memory Issues

    • Problem: Services crashed from running out of memory

    • Solution: Selected the torch dtype per device (float16 on GPU, float32 on CPU) and applied device-specific optimizations

  4. Container Communication

    • Problem: Services could not reach each other

    • Solution: Placed both services on a shared Docker network and added health checks (see the check below)
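
To verify the cross-container wiring from item 4: on the shared opea-network, containers resolve each other by service name, so image2video can reach text2image at http://text2image:9379 instead of localhost. A quick check from the host:

# curl runs inside the image2video container
docker-compose exec image2video curl -f http://text2image:9379/health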

Best Practices and Lessons Learned

  1. Logging

    • Implement comprehensive logging

    • Use debug level during development

    • Log all important operations and errors

  2. Error Handling

    • Handle exceptions explicitly rather than letting them bubble up

    • Return meaningful error messages

    • Use appropriate HTTP status codes

  3. Configuration

    • Use environment variables for configuration

    • Implement reasonable defaults

    • Make services configurable

  4. Resource Management

    • Load models once at startup, not per request (see the sketch below)

    • Release resources cleanly on shutdown

    • Cache model weights to avoid repeated downloads
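
On the model-loading point: newer FastAPI versions prefer a lifespan context manager over the deprecated @app.on_event("startup") hook used above. A sketch of how the startup/cleanup could be restructured (load_model is a hypothetical helper wrapping the body of startup_event):

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: load the pipeline once, before serving requests
    load_model()  # hypothetical: the body of startup_event() above
    yield
    # Shutdown: drop the pipeline reference so memory can be reclaimed
    global pipe
    pipe = None

app = FastAPI(lifespan=lifespan)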

Conclusion

Running AI workloads in-house using OPEA components is a viable solution for organizations looking to maintain control over their AI infrastructure. While there are challenges to overcome, the benefits of having full control over your AI services often outweigh the implementation complexity.

This implementation demonstrates how to set up a basic pipeline of AI services, but there's room for expansion:

  • Adding more services to the pipeline

  • Implementing better error handling

  • Adding monitoring and metrics

  • Implementing authentication and authorization

  • Adding rate limiting and queue management
