Introduction
In this guide, we'll explore how to run AI workloads in-house using OPEA (Open Platform for Enterprise AI) components. We'll focus on setting up and orchestrating two microservices: text-to-image and image-to-video generation. This setup demonstrates how enterprises can maintain control over their AI infrastructure while leveraging powerful open-source models.
Why Run AI Workloads In-House?
There are several compelling reasons to run AI workloads on your own infrastructure:
Data Privacy: Keep sensitive data within your organization's boundaries
Cost Control: Predictable costs without per-request API charges
Customization: Ability to modify and fine-tune models for specific needs
Latency: Reduced network latency for faster inference
Compliance: Better control over regulatory requirements
Project Overview
Our goal is to set up two OPEA microservices:
Text-to-Image Service: Generates images from text descriptions using Stable Diffusion
Image-to-Video Service: Creates videos from static images using Stable Video Diffusion
These services will run in Docker containers and communicate with each other, demonstrating a practical microservices architecture.
Prerequisites
Docker and Docker Compose installed
Basic understanding of Python and FastAPI
A Hugging Face account and API token
Sufficient disk space for model weights (~10GB)
Git for version control
Step-by-Step Implementation
1. Project Structure Setup
First, create the project directory structure:
mkdir opea-services
cd opea-services
mkdir text2image
mkdir image2video
mkdir docker-compose
2. Setting Up the Text-to-Image Service
Create the text2image service files:
# text2image/opea_text2image_microservice.py
import logging
from fastapi import FastAPI, HTTPException
from diffusers import StableDiffusionPipeline
import torch
import os
import base64
from io import BytesIO
from PIL import Image
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
app = FastAPI()
pipe = None
@app.on_event("startup")
async def startup_event():
global pipe
logger.info("Starting text2image service...")
try:
model_id = os.getenv("MODEL", "stabilityai/stable-diffusion-2-1")
hf_token = os.getenv("HF_TOKEN")
logger.info(f"Initializing model {model_id}")
pipe = StableDiffusionPipeline.from_pretrained(
model_id,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            token=hf_token  # "use_auth_token" is deprecated in recent diffusers releases
)
device = "cuda" if torch.cuda.is_available() else "cpu"
pipe = pipe.to(device)
logger.info(f"Model initialized on {device}")
except Exception as e:
logger.error(f"Error during startup: {str(e)}")
raise e
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.post("/v1/text2image")
async def generate_image(request: dict):
global pipe
logger.info(f"Received request: {request}")
try:
if pipe is None:
raise HTTPException(status_code=500, detail="Model not initialized")
        prompt = request.get("prompt", "")
        if not prompt:
            raise HTTPException(status_code=400, detail="No prompt provided")
        num_images = request.get("num_images_per_prompt", 1)
        logger.info(f"Generating image for prompt: {prompt}")
images = pipe(prompt, num_images_per_prompt=num_images).images
# Convert images to base64
image_list = []
for img in images:
buffered = BytesIO()
img.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode()
image_list.append(img_str)
return {
"status": "success",
"images": image_list,
"message": f"Generated {len(images)} images successfully"
}
    except HTTPException:
        # Propagate intentional HTTP errors (e.g., 400s) instead of converting them to 500s
        raise
    except Exception as e:
        logger.error(f"Error generating image: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
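Before containerizing the service, it helps to see what a caller looks like. The snippet below is a hypothetical client, not part of the service itself; it assumes the service is already running on localhost:9379, and the output filename is an arbitrary choice. It posts a prompt to the /v1/text2image endpoint above and decodes the base64 response back into a PNG file.
# text2image_client.py (example client; host, port, and filename are assumptions)
import base64
import requests

payload = {"prompt": "An astronaut riding a green horse", "num_images_per_prompt": 1}
response = requests.post("http://localhost:9379/v1/text2image", json=payload, timeout=600)
response.raise_for_status()

# The endpoint returns base64-encoded PNGs; decode the first one and write it to disk
images = response.json()["images"]
with open("astronaut.png", "wb") as f:
    f.write(base64.b64decode(images[0]))
print(f"Saved {len(images)} image(s); first written to astronaut.png")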
Create the Dockerfile:
# text2image/Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy service files
COPY . .
# Expose the port
EXPOSE 9379
# Make the startup script executable (start.sh was already copied by "COPY . .")
RUN chmod +x start.sh
CMD ["./start.sh"]
Create the requirements.txt:
# text2image/requirements.txt
torch
diffusers
transformers
fastapi
uvicorn
safetensors
accelerate
Create the start script:
# text2image/start.sh
#!/bin/bash
uvicorn opea_text2image_microservice:app --host 0.0.0.0 --port 9379 --log-level debug
3. Setting Up the Image-to-Video Service
Create the image2video service files. This service needs its own requirements.txt, Dockerfile, and start.sh as well; they mirror the text2image versions, with the uvicorn command pointing at opea_image2video_microservice:app on port 9369:
# image2video/opea_image2video_microservice.py
import logging
from fastapi import FastAPI, HTTPException
from diffusers import StableVideoDiffusionPipeline
import torch
import os
import base64
from io import BytesIO
import requests
from PIL import Image
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
app = FastAPI()
pipe = None
@app.on_event("startup")
async def startup_event():
global pipe
logger.info("Starting image2video service...")
try:
hf_token = os.getenv("HF_TOKEN")
logger.info("Initializing SVD model")
pipe = StableVideoDiffusionPipeline.from_pretrained(
"stabilityai/stable-video-diffusion-img2vid",
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            token=hf_token  # "use_auth_token" is deprecated in recent diffusers releases
)
device = "cuda" if torch.cuda.is_available() else "cpu"
pipe = pipe.to(device)
logger.info(f"Model initialized on {device}")
except Exception as e:
logger.error(f"Error during startup: {str(e)}")
raise e
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.post("/v1/image2video")
async def generate_video(request: dict):
global pipe
logger.info(f"Received request: {request}")
try:
if pipe is None:
raise HTTPException(status_code=500, detail="Model not initialized")
image_paths = request.get("images_path", [])
if not image_paths:
raise HTTPException(status_code=400, detail="No image paths provided")
# Load the first image
image_path = image_paths[0]["image_path"]
logger.info(f"Loading image from: {image_path}")
# Handle both URLs and base64 images
if image_path.startswith('http'):
response = requests.get(image_path)
image = Image.open(BytesIO(response.content))
else:
# Assume it's base64
try:
image_data = base64.b64decode(image_path)
image = Image.open(BytesIO(image_data))
            except Exception:
                raise HTTPException(status_code=400, detail="Invalid image data")
logger.info("Generating video")
        # Resize to the resolution SVD was trained at, as in the diffusers example
        image = image.convert("RGB").resize((1024, 576))
        # .frames holds one list of frames per input image; take the first (and only) video.
        # decode_chunk_size decodes a few frames at a time to lower peak memory.
        video_frames = pipe(image, num_frames=16, decode_chunk_size=4).frames[0]
# Convert frames to base64
frame_list = []
for frame in video_frames:
buffered = BytesIO()
frame.save(buffered, format="PNG")
frame_str = base64.b64encode(buffered.getvalue()).decode()
frame_list.append(frame_str)
return {
"status": "success",
"frames": frame_list,
"message": f"Generated video with {len(frame_list)} frames"
}
    except HTTPException:
        # Propagate intentional HTTP errors (e.g., 400s) instead of converting them to 500s
        raise
    except Exception as e:
        logger.error(f"Error generating video: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
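Because the text2image service returns base64 images and this endpoint accepts base64 input, the two services can be chained directly once both are running. The script below is a hypothetical client sketch, not part of either service; the hosts, ports, prompt, and timeouts are assumptions.
# chain_services.py (example pipeline client; all endpoints and values are assumptions)
import requests

# Step 1: generate an image from text
t2i = requests.post(
    "http://localhost:9379/v1/text2image",
    json={"prompt": "A rocket launching at sunset", "num_images_per_prompt": 1},
    timeout=600,
)
t2i.raise_for_status()
image_b64 = t2i.json()["images"][0]

# Step 2: feed the base64 image straight into the image2video endpoint
i2v = requests.post(
    "http://localhost:9369/v1/image2video",
    json={"images_path": [{"image_path": image_b64}]},
    timeout=1800,
)
i2v.raise_for_status()
print(i2v.json()["message"])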
4. Docker Compose Configuration
Create the docker-compose.yml:
# docker-compose/docker-compose.yml
version: '3.8'
services:
text2image:
build:
context: ../text2image
dockerfile: Dockerfile
ports:
- "9379:9379"
environment:
- MODEL=stabilityai/stable-diffusion-2-1
- HF_TOKEN=${HF_TOKEN}
networks:
- opea-network
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9379/health"]
interval: 30s
timeout: 10s
retries: 3
volumes:
- text2image_cache:/root/.cache
image2video:
build:
context: ../image2video
dockerfile: Dockerfile
ports:
- "9369:9369"
environment:
- HF_TOKEN=${HF_TOKEN}
networks:
- opea-network
depends_on:
- text2image
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9369/health"]
interval: 30s
timeout: 10s
retries: 3
volumes:
- image2video_cache:/root/.cache
networks:
opea-network:
driver: bridge
volumes:
text2image_cache:
image2video_cache:
Running the Services
- Set up your Hugging Face token:
export HF_TOKEN=your_hugging_face_token
- Build and start the services:
cd docker-compose
docker-compose up --build -d
- Monitor the logs:
docker-compose logs -f
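On first startup, each service downloads several gigabytes of model weights, so the health checks take a while to pass. A small polling script such as the sketch below (assuming the default ports from the compose file) can confirm both services are ready before you start testing:
# wait_for_services.py (example helper; ports are assumptions from docker-compose.yml)
import time
import requests

services = {
    "text2image": "http://localhost:9379/health",
    "image2video": "http://localhost:9369/health",
}

for name, url in services.items():
    while True:
        try:
            if requests.get(url, timeout=5).status_code == 200:
                print(f"{name} is healthy")
                break
        except requests.RequestException:
            pass  # service not reachable yet; keep polling
        time.sleep(10)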
Testing the Services
- Test the text-to-image service:
curl http://localhost:9379/v1/text2image -X POST \
-H "Content-Type: application/json" \
-d '{"prompt":"An astronaut riding a green horse", "num_images_per_prompt":1}'
- Test the image-to-video service:
curl http://localhost:9369/v1/image2video -X POST \
-H "Content-Type: application/json" \
-d '{"images_path":[{"image_path":"https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/diffusers/svd/rocket.png"}]}'
Troubleshooting Guide
During our implementation, we encountered and resolved several issues:
Model Loading Issues
Problem: Models failed to load properly on startup
Solution: Added error handling and startup logging, and mounted cache volumes so downloaded weights persist across container restarts
Empty Responses
Problem: Services returned empty responses
Solution: Improved error handling and response formatting, and base64-encoded the generated images before returning them
Memory Issues
Problem: Services crashed after running out of memory
Solution: Selected the torch dtype per device (float16 on GPU, float32 on CPU); further optimizations are sketched after this list
Container Communication
Problem: Services could not reach each other
Solution: Placed both services on a shared Docker bridge network and added health checks
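For the memory issues in particular, diffusers ships optional optimizations that can be enabled right after from_pretrained() in the startup code. The lines below are a sketch rather than part of the original services, shown for the text2image pipeline: attention slicing lowers peak memory at a small speed cost, while model CPU offload requires accelerate (already installed) and manages device placement itself, so it replaces the explicit pipe.to("cuda") call.
# Optional memory optimizations for startup_event() (a sketch, not part of the original service)
pipe.enable_attention_slicing()  # compute attention in slices to lower peak memory

if torch.cuda.is_available():
    # Keeps submodules in system RAM until they are needed; note that this call
    # handles device placement itself, so drop the explicit pipe.to("cuda")
    pipe.enable_model_cpu_offload()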
Best Practices and Lessons Learned
Logging
Implement comprehensive logging
Use debug level during development
Log all important operations and errors
Error Handling
Always handle exceptions properly
Return meaningful error messages
Implement proper status codes
Configuration
Use environment variables for configuration
Implement reasonable defaults
Make services configurable
Resource Management
Load models once at startup
Release resources on shutdown (see the sketch below)
Use appropriate caching strategies for model weights
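As a concrete example of the cleanup point above, the services load the pipeline once at startup but never release it. A minimal shutdown hook, sketched below for the text2image module (the same pattern applies to image2video), drops the pipeline reference and returns cached GPU memory to the driver; it is an assumed addition, not part of the original code.
# Cleanup sketch for opea_text2image_microservice.py (assumed addition)
@app.on_event("shutdown")
async def shutdown_event():
    global pipe
    if pipe is not None:
        pipe = None  # drop the pipeline reference so its weights can be garbage-collected
    if torch.cuda.is_available():
        torch.cuda.empty_cache()  # release cached GPU memory back to the driver
    logger.info("text2image service shut down")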
Conclusion
Running AI workloads in-house using OPEA components is a viable solution for organizations looking to maintain control over their AI infrastructure. While there are challenges to overcome, the benefits of having full control over your AI services often outweigh the implementation complexity.
This implementation demonstrates how to set up a basic pipeline of AI services, but there's room for expansion:
Adding more services to the pipeline
Implementing better error handling
Adding monitoring and metrics
Implementing authentication and authorization
Adding rate limiting and queue management