【教程】三步配置Pipeline,解锁Open-WebUI图片识别(二)

AI摘要

Open-WebUI通过pipelines管道支持图片理解和视频识别,但本身不支持视频上传。配置pipelines后,可集成Ollama本地图片模型(如gemma3:27b)和智普免费图片模型(如glm-4.1v-thinking-flash)。代码优先使用智普模型(若配置),仅Ollama的qwen3-vl:8b支持视频处理,智普模型仅支持图片。

open-webui 支持图片理解

通过pipelines管道让open-webui 支持对话流程中多次图片和视频识别内容;但是open-webui 不支持视频上传,所以视频理解功能受限;open-webui 上传视频会报错,报错信息:File type video/mp4 is not supported for processing

只要先部署好pipelines和open-webui 配置pipelines,上传下面的python 文件,就可以实现open-webui 识别图片功能;

具体实现代码是pipelines官方文件(dynamic_ollama_vision_filter_pipeline.py)改的,支持ollama 本地图片模型(gemma3:27b)和智普免费图片模型(glm-4.1v-thinking-flash),

代码工作流程

  • 如果只配置ollama ,没添加智普URL和API KEY,则优先使用ollama;
  • 同时配置了ollama 和 智普时,优先智普模型(因为免费)

pipelines完整的代码

"""
title: Dynamic Vision Pipeline (Ollama + ZhipuAI)
author: Andrew Tait Gehrhardt (modified for GLM-4.1V-Thinking-Flash support)
date: 2024-06-18
version: 2.1
license: MIT
description: A pipeline for dynamically processing images and videos when current model is a text only model.
             Supports both Ollama and ZhipuAI models with intelligent routing.
             Note: Only Ollama qwen3-vl:8b supports video processing; ZhipuAI models support images only.
requirements: pydantic, aiohttp
"""

from typing import List, Optional
from pydantic import BaseModel
import json
import aiohttp
import base64
import os
from utils.pipelines.main import get_last_user_message

class Pipeline:
    class Valves(BaseModel):
        pipelines: List[str] = []
        priority: int = 0

        # Ollama configuration
        ollama_vision_model: str = "gemma3:27b"
        ollama_base_url: str = "http://localhost:11434"

        # ZhipuAI configuration
        zhipuai_enabled: bool = False
        zhipuai_api_key: str = ""
        zhipuai_base_url: str = "https://open.bigmodel.cn"

        # ZhipuAI models
        zhipuai_image_model: str = "glm-4.1v-thinking-flash"  # Primary image model (images only)
        zhipuai_video_model: str = "glm-4.5v"  # Video + image model (supports both)

        # Ollama models
        ollama_image_model: str = "gemma3:27b"  # For images only
        ollama_video_model: str = "qwen3-vl:8b"  # For images + videos

        # Priority configuration: "zhipuai_first" or "ollama_first"
        priority_order: str = "zhipuai_first"

        # Legacy support
        model_to_override: str = "deepseek"

    def __init__(self):
        self.type = "filter"
        self.name = "Dynamic Vision Filter (Ollama + ZhipuAI)"
        self.valves = self.Valves(
            **{
                "pipelines": ["*"],  # Connect to all pipelines

                # Ollama configuration from environment
                "ollama_base_url": os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
                "ollama_image_model": os.getenv("OLLAMA_IMAGE_MODEL", "gemma3:27b"),
                "ollama_video_model": os.getenv("OLLAMA_VIDEO_MODEL", "qwen3-vl:8b"),

                # ZhipuAI configuration from environment
                "zhipuai_enabled": os.getenv("ZHIPUAI_ENABLED", "false").lower() == "true",
                "zhipuai_api_key": os.getenv("ZHIPUAI_API_KEY", ""),
                "zhipuai_base_url": os.getenv("ZHIPUAI_BASE_URL", "https://open.bigmodel.cn"),
                "zhipuai_image_model": os.getenv("ZHIPUAI_IMAGE_MODEL", "glm-4.1v-thinking-flash"),
                "zhipuai_video_model": os.getenv("ZHIPUAI_VIDEO_MODEL", "glm-4.5v"),

                # Priority configuration
                "priority_order": os.getenv("VISION_PRIORITY_ORDER", "zhipuai_first"),

                # Legacy support
                "model_to_override": os.getenv("MODEL_TO_OVERRIDE", "deepseek"),
            }
        )

    async def on_startup(self):
        print(f"on_startup:{__name__}")
        print("Dynamic Vision Filter starting...")
        print(f"ZhipuAI Enabled: {self.valves.zhipuai_enabled}")
        if self.valves.zhipuai_enabled:
            print(f"  - 图片模型: {self.valves.zhipuai_image_model}")
            print(f"  - 视频模型: {self.valves.zhipuai_video_model}")
            print(f"  - API Key configured: {'Yes' if self.valves.zhipuai_api_key else 'No'}")
            print(f"  - Base URL: {self.valves.zhipuai_base_url}")
        print(f"Ollama Configuration:")
        print(f"  - 图片模型: {self.valves.ollama_image_model}")
        print(f"  - 视频模型: {self.valves.ollama_video_model}")
        print(f"  - Base URL: {self.valves.ollama_base_url}")
        print(f"Priority Order: {self.valves.priority_order}")
        print(f"Override Models: {self.valves.model_to_override}")
        pass

    async def on_shutdown(self):
        print(f"on_shutdown:{__name__}")
        pass

    async def process_media_with_zhipuai(self, images: List[str], videos: List[str], content: str, api_key: str, base_url: str, model: str) -> str:
        """使用智谱AI GLM-4.1V-Thinking-Flash处理图片和视频"""
        url = f"{base_url}/api/paas/v4/chat/completions"

        # 构建消息内容
        message_content = []

        # 添加文本内容
        if content:
            message_content.append({
                "type": "text",
                "text": content
            })
        else:
            # 根据是否有视频调整提示词
            if videos:
                message_content.append({
                    "type": "text",
                    "text": "请描述这段视频的内容,包括其中发生的事件、人物、物体等关键信息"
                })
            else:
                message_content.append({
                    "type": "text",
                    "text": "请描述这张图片的内容"
                })

        # 添加图片内容
        for img_data in images:
            # 检测是否已经是完整的数据URL格式
            if img_data.startswith('data:image/'):
                image_url = img_data
            else:
                # 假设是纯base64数据,需要添加前缀
                image_url = f"data:image/jpeg;base64,{img_data}"

            message_content.append({
                "type": "image_url",
                "image_url": {
                    "url": image_url
                }
            })

        # 添加视频内容
        for video_data in videos:
            # 智谱AI GLM-4.5v 需要公开可访问的视频URL,不支持base64编码
            if video_data.startswith(('http://', 'https://')):
                # 如果是URL,直接使用
                video_url = video_data
                print(f"使用公开视频URL: {video_url}")
            elif video_data.startswith('data:video/'):
                # 如果是base64数据,发出警告但仍尝试使用
                print("⚠️ 警告:智谱AI GLM-4.5v 建议使用公开URL而非base64编码的视频数据")
                video_url = video_data
            else:
                # 假设是纯base64数据
                print("⚠️ 警告:智谱AI GLM-4.5v 建议使用公开URL而非base64编码的视频数据")
                video_url = f"data:video/mp4;base64,{video_data}"

            message_content.append({
                "type": "video_url",
                "video_url": {
                    "url": video_url
                }
            })

        payload = {
            "model": model,
            "messages": [
                {
                    "role": "user",
                    "content": message_content
                }
            ],
            "stream": False,
            "temperature": 0.3,
            "max_tokens": 1000
        }

        # 添加thinking参数(用于GLM-4.5v视频处理)
        if "4.5" in model and videos:
            payload["thinking"] = {
                "type": "enabled"
            }

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        print(f"Calling ZhipuAI API: {url}")
        print(f"Model: {model}")
        print(f"Content: {content}")
        print(f"Images count: {len(images)}")
        print(f"Videos count: {len(videos)}")

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=payload, headers=headers) as response:
                    print(f"ZhipuAI response status: {response.status}")

                    if response.status == 200:
                        response_data = await response.json()
                        result = response_data.get("choices", [{}])[0].get("message", {}).get("content", "")
                        print(f"ZhipuAI response: {result[:100]}..." if len(result) > 100 else f"ZhipuAI response: {result}")
                        return result
                    else:
                        error_text = await response.text()
                        print(f"Failed to process images with ZhipuAI, status code: {response.status}")
                        print(f"Error response: {error_text}")
                        return f"Error: Failed to process image with ZhipuAI (status {response.status})"
        except Exception as e:
            print(f"Exception when calling ZhipuAI: {str(e)}")
            return f"Error: {str(e)}"

    async def process_images_with_zhipuai(self, images: List[str], content: str, api_key: str, base_url: str, model: str) -> str:
        """向后兼容的图片处理方法"""
        return await self.process_media_with_zhipuai(images, [], content, api_key, base_url, model)

    async def process_media_with_ollama(self, images: List[str], videos: List[str], content: str, vision_model: str, ollama_base_url: str) -> str:
        """使用Ollama处理图片(注:Ollama目前不支持视频处理)"""
        # 如果有视频,返回提示信息
        if videos:
            print("Warning: Ollama does not support video processing. Videos will be ignored.")

        url = f"{ollama_base_url}/api/chat"
        payload = {
            "model": vision_model,
            "messages": [
                {
                    "role": "user",
                    "content": content,
                    "images": images
                }
            ],
            "stream": False  # 不使用流式响应
        }

        print(f"Calling Ollama API: {url}")
        print(f"Model: {vision_model}")
        print(f"Content: {content}")
        print(f"Images count: {len(images)}")
        if videos:
            print(f"Videos count: {len(videos)} (ignored - Ollama doesn't support video)")

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=payload) as response:
                    print(f"Ollama response status: {response.status}")
                    if response.status == 200:
                        response_text = await response.text()
                        print(f"Response preview: {response_text[:200]}...")

                        # 尝试直接解析JSON(非流式响应)
                        try:
                            response_data = json.loads(response_text)
                            result = response_data.get("message", {}).get("content", "")
                            print(f"Ollama response (JSON): {result[:100]}..." if len(result) > 100 else f"Ollama response (JSON): {result}")
                            return result
                        except json.JSONDecodeError:
                            # 如果直接解析失败,处理SSE格式
                            print("Detected SSE format, parsing...")
                            content_parts = []
                            lines = response_text.strip().split('\n')
                            for line in lines:
                                line = line.strip()
                                if line.startswith('data: '):
                                    json_str = line[6:]  # 移除 'data: '
                                    if json_str and json_str != '[DONE]':
                                        try:
                                            data = json.loads(json_str)
                                            content = data.get("message", {}).get("content", "")
                                            content_parts.append(content)
                                        except json.JSONDecodeError as e:
                                            print(f"JSON解析错误: {e}, 原始数据: {json_str}")
                                            continue
                            result = "".join(content_parts)
                            print(f"Ollama response (SSE): {result[:100]}..." if len(result) > 100 else f"Ollama response (SSE): {result}")
                            return result
                    else:
                        error_text = await response.text()
                        print(f"Failed to process images with Ollama, status code: {response.status}")
                        print(f"Error response: {error_text}")
                        return f"Error: Failed to process image with Ollama (status {response.status})"
        except Exception as e:
            print(f"Exception when calling Ollama: {str(e)}")
            return f"Error: {str(e)}"

    async def process_images_with_ollama(self, images: List[str], content: str, vision_model: str, ollama_base_url: str) -> str:
        """向后兼容的Ollama图片处理方法"""
        return await self.process_media_with_ollama(images, [], content, vision_model, ollama_base_url)

    async def process_images_with_priority_routing(self, images: List[str], content: str) -> str:
        """智能路由:优先使用智谱AI,如果配置错误或失败则回退到Ollama"""
        zhipuai_available = (
            self.valves.zhipuai_enabled and
            self.valves.zhipuai_api_key and
            self.valves.zhipuai_base_url
        )
        ollama_available = (
            self.valves.ollama_base_url and
            self.valves.ollama_vision_model
        )

        print(f"ZhipuAI available: {zhipuai_available}")
        print(f"Ollama available: {ollama_available}")
        print(f"Priority order: {self.valves.priority_order}")

        if self.valves.priority_order == "zhipuai_first":
            # 优先使用智谱AI
            if zhipuai_available:
                print("Trying ZhipuAI first...")
                result = await self.process_images_with_zhipuai(
                    images, content,
                    self.valves.zhipuai_api_key,
                    self.valves.zhipuai_base_url,
                    self.valves.glm_model
                )
                if not result.startswith("Error:"):
                    return result
                print("ZhipuAI failed, falling back to Ollama...")
            elif ollama_available:
                print("ZhipuAI not configured, using Ollama...")
                return await self.process_images_with_ollama(
                    images, content,
                    self.valves.ollama_vision_model,
                    self.valves.ollama_base_url
                )
            else:
                return "Error: No vision model configured. Please configure either ZhipuAI or Ollama."

        elif self.valves.priority_order == "ollama_first":
            # 优先使用Ollama
            if ollama_available:
                print("Trying Ollama first...")
                result = await self.process_images_with_ollama(
                    images, content,
                    self.valves.ollama_vision_model,
                    self.valves.ollama_base_url
                )
                if not result.startswith("Error:"):
                    return result
                print("Ollama failed, falling back to ZhipuAI...")
            elif zhipuai_available:
                print("Ollama not configured, using ZhipuAI...")
                return await self.process_images_with_zhipuai(
                    images, content,
                    self.valves.zhipuai_api_key,
                    self.valves.zhipuai_base_url,
                    self.valves.glm_model
                )
            else:
                return "Error: No vision model configured. Please configure either Ollama or ZhipuAI."

        else:
            return f"Error: Invalid priority_order '{self.valves.priority_order}'. Use 'zhipuai_first' or 'ollama_first'."

    async def process_media_with_priority_routing(self, images: List[str], videos: List[str], content: str) -> str:
        """使用优先级路由处理媒体"""
        has_videos = len(videos) > 0

        # 按优先级尝试不同的模型组合
        routing_strategies = []

        if has_videos:
            # 有视频时的优先级策略
            print("检测到视频内容,优先尝试支持视频的模型")

            if self.valves.zhipuai_enabled:
                routing_strategies.append({
                    "name": "智谱AI视频模型 (GLM-4.5v)",
                    "provider": "zhipuai",
                    "model": self.valves.zhipuai_video_model,  # glm-4.5v
                    "supports_video": True
                })

            routing_strategies.append({
                "name": "Ollama视频模型 (qwen3-vl:8b)",
                "provider": "ollama",
                "model": self.valves.ollama_video_model,  # qwen3-vl:8b
                "supports_video": True
            })
        else:
            # 只有图片时的优先级策略
            print("检测到图片内容,按优先级尝试图片处理模型")

            if self.valves.zhipuai_enabled:
                routing_strategies.append({
                    "name": "智谱AI主图片模型 (GLM-4.1V-Thinking-Flash)",
                    "provider": "zhipuai",
                    "model": self.valves.zhipuai_image_model,  # glm-4.1v-thinking-flash
                    "supports_video": False
                })

            routing_strategies.append({
                "name": "Ollama图片模型 (gemma3:27b)",
                "provider": "ollama",
                "model": self.valves.ollama_image_model,  # gemma3:27b
                "supports_video": False
            })

        # 尝试每个策略
        last_error = None
        for strategy in routing_strategies:
            try:
                print(f"尝试使用 {strategy['name']}: {strategy['model']}")

                if strategy["provider"] == "zhipuai":
                    result = await self.process_media_with_zhipuai(
                        images, videos, content,  # 智谱AI支持视频(GLM-4.5v)
                        self.valves.zhipuai_api_key,
                        self.valves.zhipuai_base_url,
                        strategy["model"]
                    )
                    print(f"✅ {strategy['name']} 处理成功")
                    return result
                else:  # ollama
                    result = await self.process_media_with_ollama(
                        images, videos, content,  # Ollama也支持视频(qwen3-vl:8b)
                        strategy["model"],
                        self.valves.ollama_base_url
                    )
                    print(f"✅ {strategy['name']} 处理成功")
                    return result

            except Exception as e:
                last_error = e
                print(f"❌ {strategy['name']} 处理失败: {e}")
                continue

        # 所有策略都失败
        if has_videos:
            error_msg = f"所有视频处理策略都失败了。最后错误: {last_error}"
        else:
            error_msg = f"所有图片处理策略都失败了。最后错误: {last_error}"

        print(error_msg)
        raise Exception(error_msg)

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        print(f"pipe:{__name__}")

        images = []
        videos = []

        # Ensure the body is a dictionary
        if isinstance(body, str):
            body = json.loads(body)

        model = body.get("model", "")
        print(f"Current model: {model}")

        # Get the content of the most recent message
        user_message = get_last_user_message(body["messages"])

        # Check if current model is in the override list (supports comma-separated values and fuzzy matching)
        override_models = [m.strip() for m in self.valves.model_to_override.split(",") if m.strip()]
        print(f"Override models: {override_models}")

        # 模糊匹配函数
        def is_model_match(current_model: str, override_model: str) -> bool:
            # 精确匹配
            if current_model == override_model:
                return True

            # 包含匹配
            if override_model in current_model or current_model in override_model:
                return True

            # 忽略大小写匹配
            if override_model.lower() in current_model.lower() or current_model.lower() in override_model.lower():
                return True

            # 去除特殊字符后匹配
            import re
            current_clean = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fff]', '', current_model).lower()
            override_clean = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fff]', '', override_model).lower()
            if override_clean in current_clean or current_clean in override_clean:
                return True

            return False

        # 检查是否有匹配的模型
        model_matched = any(is_model_match(model, override_model) for override_model in override_models)
        print(f"Model in override list (fuzzy match): {model_matched}")
        if model_matched:
            matched_models = [override_model for override_model in override_models if is_model_match(model, override_model)]
            print(f"Matched with: {matched_models}")

        if model_matched:
            messages = body.get("messages", [])

            for message in messages:
                # 检查传统格式的 images 字段
                if "images" in message:
                    print(f"Found images field in message: {len(message['images'])} images")
                    images.extend(message["images"])

                # 检查 content 数组中的图片和视频
                if "content" in message and isinstance(message["content"], list):
                    print(f"Checking content array for images and videos...")
                    content_images = []
                    content_videos = []
                    content_text = ""

                    for item in message["content"]:
                        if item.get("type") == "text":
                            content_text += item.get("text", "")
                        elif item.get("type") == "image_url":
                            image_url = item.get("image_url", {}).get("url", "")
                            if image_url.startswith("data:image/"):
                                # 提取 base64 图片数据
                                base64_data = image_url.split(",", 1)[1]  # 移除 data:image/png;base64, 前缀
                                content_images.append(base64_data)
                                print(f"Found base64 image in content, length: {len(base64_data)}")
                        elif item.get("type") == "video_url":
                            video_url = item.get("video_url", {}).get("url", "")
                            if video_url.startswith("data:video/"):
                                # 提取 base64 视频数据
                                base64_data = video_url.split(",", 1)[1]  # 移除 data:video/mp4;base64, 前缀
                                content_videos.append(base64_data)
                                print(f"Found base64 video in content, length: {len(base64_data)}")

                    if content_images or content_videos:
                        print(f"Processing {len(content_images)} images and {len(content_videos)} videos from content array...")
                        raw_vision_response = await self.process_media_with_priority_routing(content_images, content_videos, content_text or "Describe this media")
                        # 替换整个 content 为视觉模型的响应
                        message["content"] = f"REPEAT THIS BACK: {raw_vision_response}"
                        print(f"Replaced content with vision model response")
        else:
            messages = body.get("messages", [])
            print(f"Total messages: {len(messages)}")
            for i, msg in enumerate(messages):
                print(f"Message {i}: role={msg.get('role')}, has_images={'images' in msg}, has_videos={'videos' in msg}")
                # 也检查其他可能包含媒体文件的字段
                for key in ['image', 'file', 'files', 'attachment', 'attachments', 'video', 'videos']:
                    if key in msg:
                        print(f"  - Also has {key}: True")

        return body

Saiyintai

软件测试工程师

相关推荐

【教程】三步配置Pipeline,解锁Open-WebUI图片识别(一)

PIPELINE扩展了OPEN-WEBUI的功能,使其不仅能进行文字对话,还能集成图片识别、音频处理等动态工作流。它支持函数调用、定制RAG、消息监控、速率限制、实时翻译和有害信息过滤等功能。安装需在Ubuntu环境中克隆代码库、配置虚拟环境并启动服务器,随后在OPEN-WEBUI管理面板中设置连接即可使用。

Open WebUI v0.8.12:终端安全修复 + API 稳定性提升

Open WebUI v0.8.12 版本发布,重点进行了安全修复、依赖补全与多项Bug修复。主要更新包括:增强多语言翻译;通过后端代理提升终端连接安全性,防止密钥泄露;修复终端工具异常、API文件列表错误和依赖缺失问题;优化管理员模型可见性、工具调用嵌入的显示效果及许可证数据加载。

暂无评论