GraphRAG + Ollama: Local Deployment Guide

A previous article covered deploying GraphRAG by calling an LLM API; if you need that setup, see my earlier post:
A Complete Tutorial for Microsoft's Open-Source GraphRAG
That approach has one key problem: GraphRAG consumes a huge number of tokens, so the cost can be hard to stomach (reportedly, a single run with GPT-4o can cost around $46).
To save money while still using GraphRAG, we can call open-source models served locally by Ollama. This requires modifying parts of the source code; I have already worked through the pitfalls, so this guide should get you deployed quickly. If it helps you, a like or bookmark is appreciated!

Prerequisites

Before following this tutorial, make sure basics such as Ollama and Anaconda are already installed on your machine.

1. Create a new virtual environment (assuming Anaconda is already installed); Python 3.10 is recommended

conda create -n graphrag-ollama-local python=3.10
conda activate graphrag-ollama-local

2. Install the Ollama Python package

pip install ollama

3. Pull the open-source models with Ollama

(1) Choose one LLM and one embedding model. Here I use mistral (4.1 GB) and nomic-embed-text (278 MB).

You can pick different models if you prefer; browse the model library on the Ollama website.

ollama pull mistral  # LLM
ollama pull nomic-embed-text  # embedding model

(2) Check that the downloads succeeded

ollama list

(3) Start the Ollama server

ollama serve

Once the server starts successfully, Ollama listens on its default port at http://localhost:11434; make sure no other process is occupying it.
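
Since the ollama Python package was installed in step 2, a quick sanity check (a minimal sketch) is to list the available models from Python:

# Confirm the server at http://localhost:11434 is reachable and the models exist.
import ollama

print(ollama.list())  # output should include mistral and nomic-embed-text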

4. Clone the source code

git clone https://github.com/TheAiSingularity/graphrag-local-ollama.git
cd graphrag-local-ollama/

You must fetch the source with git here; downloading it as an archive may cause errors later.

5. Install the dependencies (very important!)

pip install -e .

If this step errors out, make sure you obtained the source via git clone, then run the command again.

6. Create the GraphRAG working directory, with an input/ subdirectory for the raw documents

mkdir -p ./ragtest/input

7. Put the raw documents into ./ragtest/input (only .txt files are supported; multiple files are fine)

cp input/* ./ragtest/input  # adjust to your own paths

This is just an example; you can also drop files directly into ./ragtest/input/.

8. Initialize the project

python -m graphrag.index --init --root ./ragtest

After this, the ragtest directory holds five items: output, input, prompts, settings.yaml, and .env (hidden by default).

9. Move the settings.yaml file from the repository root into the project; this is the main predefined configuration file already set up for local Ollama models:

mv settings.yaml ./ragtest

10. Edit the configuration file

Note that because we chose mistral (4.1 GB) and nomic-embed-text (278 MB) in step 3, the llm model in settings.yaml must be set to mistral and the embedding model to nomic-embed-text.
The api_base values are http://localhost:11434/v1 for chat and http://localhost:11434/api for embeddings (Ollama's default API addresses; leave them unchanged unless you have special requirements).
The edited file looks like this:

encoding_model: cl100k_base
skip_workflows: []
llm:
  api_key: ${GRAPHRAG_API_KEY}
  type: openai_chat # or azure_openai_chat
  model: mistral
  model_supports_json: false # recommended if this is available for your model.
  # max_tokens: 4000
  # request_timeout: 180.0
  api_base: http://localhost:11434/v1
  # api_version: 2024-02-15-preview
  # organization: <organization_id>
  # deployment_name: <azure_model_deployment_name>
  # tokens_per_minute: 150_000 # set a leaky bucket throttle
  # requests_per_minute: 10_000 # set a leaky bucket throttle
  # max_retries: 10
  # max_retry_wait: 10.0
  # sleep_on_rate_limit_recommendation: true # whether to sleep when azure suggests wait-times
  # concurrent_requests: 25 # the number of parallel inflight requests that may be made

parallelization:
  stagger: 0.3
  # num_threads: 50 # the number of threads to use for parallel processing

async_mode: threaded # or asyncio

embeddings:
  ## parallelization: override the global parallelization settings for embeddings
  async_mode: threaded # or asyncio
  llm:
    api_key: ${GRAPHRAG_API_KEY}
    type: openai_embedding # or azure_openai_embedding
    model: nomic-embed-text
    api_base:  http://localhost:11434/api
    # api_version: 2024-02-15-preview
    # organization: <organization_id>
    # deployment_name: <azure_model_deployment_name>
    # tokens_per_minute: 150_000 # set a leaky bucket throttle
    # requests_per_minute: 10_000 # set a leaky bucket throttle
    # max_retries: 10
    # max_retry_wait: 10.0
    # sleep_on_rate_limit_recommendation: true # whether to sleep when azure suggests wait-times
    # concurrent_requests: 25 # the number of parallel inflight requests that may be made
    # batch_size: 16 # the number of documents to send in a single request
    # batch_max_tokens: 8191 # the maximum number of tokens to send in a single request
    # target: required # or optional
  


chunks:
  size: 200
  overlap: 100
  group_by_columns: [id] # by default, we don't allow chunks to cross documents
    
input:
  type: file # or blob
  file_type: text # or csv
  base_dir: "input"
  file_encoding: utf-8
  file_pattern: ".*\\.txt$"

cache:
  type: file # or blob
  base_dir: "cache"
  # connection_string: <azure_blob_storage_connection_string>
  # container_name: <azure_blob_storage_container_name>

storage:
  type: file # or blob
  base_dir: "output/${timestamp}/artifacts"
  # connection_string: <azure_blob_storage_connection_string>
  # container_name: <azure_blob_storage_container_name>

reporting:
  type: file # or console, blob
  base_dir: "output/${timestamp}/reports"
  # connection_string: <azure_blob_storage_connection_string>
  # container_name: <azure_blob_storage_container_name>

entity_extraction:
  ## llm: override the global llm settings for this task
  ## parallelization: override the global parallelization settings for this task
  ## async_mode: override the global async_mode settings for this task
  prompt: "prompts/entity_extraction.txt"
  entity_types: [organization,person,geo,event]
  max_gleanings: 0

summarize_descriptions:
  ## llm: override the global llm settings for this task
  ## parallelization: override the global parallelization settings for this task
  ## async_mode: override the global async_mode settings for this task
  prompt: "prompts/summarize_descriptions.txt"
  max_length: 500

claim_extraction:
  ## llm: override the global llm settings for this task
  ## parallelization: override the global parallelization settings for this task
  ## async_mode: override the global async_mode settings for this task
  # enabled: true
  prompt: "prompts/claim_extraction.txt"
  description: "Any claims or facts that could be relevant to information discovery."
  max_gleanings: 0

community_report:
  ## llm: override the global llm settings for this task
  ## parallelization: override the global parallelization settings for this task
  ## async_mode: override the global async_mode settings for this task
  prompt: "prompts/community_report.txt"
  max_length: 2000
  max_input_length: 8000

cluster_graph:
  max_cluster_size: 10

embed_graph:
  enabled: false # if true, will generate node2vec embeddings for nodes
  # num_walks: 10
  # walk_length: 40
  # window_size: 2
  # iterations: 3
  # random_seed: 597832

umap:
  enabled: false # if true, will generate UMAP embeddings for nodes

snapshots:
  graphml: yes
  raw_entities: yes
  top_level_nodes: yes

local_search:
  # text_unit_prop: 0.5
  # community_prop: 0.1
  # conversation_history_max_turns: 5
  # top_k_mapped_entities: 10
  # top_k_relationships: 10
  # max_tokens: 12000

global_search:
  # max_tokens: 12000
  # data_max_tokens: 12000
  # map_max_tokens: 1000
  # reduce_max_tokens: 2000
  # concurrency: 32
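
Before moving on to indexing, it is worth sanity-checking both api_base endpoints with a short script. This is only a sketch: it assumes the requests package is available and uses the two models pulled in step 3.

import requests

# Chat endpoint (OpenAI-compatible), used by the llm section above.
resp = requests.post(
    "http://localhost:11434/v1/chat/completions",
    json={
        "model": "mistral",
        "messages": [{"role": "user", "content": "Say hello"}],
    },
)
print(resp.json()["choices"][0]["message"]["content"])

# Native embeddings endpoint, used by the embeddings section above.
resp = requests.post(
    "http://localhost:11434/api/embeddings",
    json={"model": "nomic-embed-text", "prompt": "hello"},
)
print(len(resp.json()["embedding"]))  # embedding dimensionality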

11. Run indexing to build the graph

python -m graphrag.index --root ./ragtest
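
Indexing writes its results as parquet artifacts under ./ragtest/output/<timestamp>/artifacts. A quick way to peek at what was extracted (a sketch assuming pandas and pyarrow are installed; artifact file names may differ across GraphRAG versions):

from pathlib import Path
import pandas as pd

# Pick the artifacts folder of the most recent indexing run.
artifacts = sorted(Path("./ragtest/output").glob("*/artifacts"))[-1]
entities = pd.read_parquet(artifacts / "create_final_entities.parquet")
print(len(entities), "entities extracted")
print(entities.head())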

12. Run a global query

python -m graphrag.query --root ./ragtest --method global "What is meaning?"

13. Patch the source for local queries (optional; only needed if you want local search)

One source-code change is needed here:
(1) First, locate the file:
graphrag-local-ollama/graphrag/query/llm/oai/embedding.py
(2) Install langchain_community

pip install langchain_community

(3) Replace that file's contents with the following

# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
"""OpenAI Embedding model implementation."""

import asyncio
from collections.abc import Callable
from typing import Any

import numpy as np
import tiktoken
from tenacity import (
    AsyncRetrying,
    RetryError,
    Retrying,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential_jitter,
)

from graphrag.query.llm.base import BaseTextEmbedding
from graphrag.query.llm.oai.base import OpenAILLMImpl
from graphrag.query.llm.oai.typing import (
    OPENAI_RETRY_ERROR_TYPES,
    OpenaiApiType,
)
from graphrag.query.llm.text_utils import chunk_text
from graphrag.query.progress import StatusReporter

from langchain_community.embeddings import OllamaEmbeddings

class OpenAIEmbedding(BaseTextEmbedding, OpenAILLMImpl):
    """Wrapper for OpenAI Embedding models."""

    def __init__(
        self,
        api_key: str | None = None,
        azure_ad_token_provider: Callable | None = None,
        model: str = "text-embedding-3-small",
        deployment_name: str | None = None,
        api_base: str | None = None,
        api_version: str | None = None,
        api_type: OpenaiApiType = OpenaiApiType.OpenAI,
        organization: str | None = None,
        encoding_name: str = "cl100k_base",
        max_tokens: int = 8191,
        max_retries: int = 10,
        request_timeout: float = 180.0,
        retry_error_types: tuple[type[BaseException]] = OPENAI_RETRY_ERROR_TYPES,  # type: ignore
        reporter: StatusReporter | None = None,
    ):
        OpenAILLMImpl.__init__(
            self=self,
            api_key=api_key,
            azure_ad_token_provider=azure_ad_token_provider,
            deployment_name=deployment_name,
            api_base=api_base,
            api_version=api_version,
            api_type=api_type,  # type: ignore
            organization=organization,
            max_retries=max_retries,
            request_timeout=request_timeout,
            reporter=reporter,
        )

        self.model = model
        self.encoding_name = encoding_name
        self.max_tokens = max_tokens
        self.token_encoder = tiktoken.get_encoding(self.encoding_name)
        self.retry_error_types = retry_error_types

    def embed(self, text: str, **kwargs: Any) -> list[float]:
        """
        Embed text using OpenAI Embedding's sync function.

        For text longer than max_tokens, chunk texts into max_tokens, embed each chunk, then combine using weighted average.
        Please refer to: https://github.com/openai/openai-cookbook/blob/main/examples/Embedding_long_inputs.ipynb
        """
        token_chunks = chunk_text(
            text=text, token_encoder=self.token_encoder, max_tokens=self.max_tokens
        )
        chunk_embeddings = []
        chunk_lens = []
        for chunk in token_chunks:
            try:
                embedding, chunk_len = self._embed_with_retry(chunk, **kwargs)
                chunk_embeddings.append(embedding)
                chunk_lens.append(chunk_len)
            except Exception as e:  # noqa BLE001
                self._reporter.error(
                    message="Error embedding chunk",
                    details={self.__class__.__name__: str(e)},
                )
                continue
        chunk_embeddings = np.average(chunk_embeddings, axis=0, weights=chunk_lens)
        chunk_embeddings = chunk_embeddings / np.linalg.norm(chunk_embeddings)
        return chunk_embeddings.tolist()

    async def aembed(self, text: str, **kwargs: Any) -> list[float]:
        """
        Embed text using OpenAI Embedding's async function.

        For text longer than max_tokens, chunk texts into max_tokens, embed each chunk, then combine using weighted average.
        """
        token_chunks = chunk_text(
            text=text, token_encoder=self.token_encoder, max_tokens=self.max_tokens
        )
        chunk_embeddings = []
        chunk_lens = []
        embedding_results = await asyncio.gather(*[
            self._aembed_with_retry(chunk, **kwargs) for chunk in token_chunks
        ])
        embedding_results = [result for result in embedding_results if result[0]]
        chunk_embeddings = [result[0] for result in embedding_results]
        chunk_lens = [result[1] for result in embedding_results]
        chunk_embeddings = np.average(chunk_embeddings, axis=0, weights=chunk_lens)  # type: ignore
        chunk_embeddings = chunk_embeddings / np.linalg.norm(chunk_embeddings)
        return chunk_embeddings.tolist()

    def _embed_with_retry(
        self, text: str | tuple, **kwargs: Any
    ) -> tuple[list[float], int]:
        try:
            retryer = Retrying(
                stop=stop_after_attempt(self.max_retries),
                wait=wait_exponential_jitter(max=10),
                reraise=True,
                retry=retry_if_exception_type(self.retry_error_types),
            )
            for attempt in retryer:
                with attempt:
                    # Call the local Ollama embedding model in place of the OpenAI client.
                    embedding = (
                        OllamaEmbeddings(
                            model=self.model,
                        ).embed_query(text)
                        or []
                    )
                    return (embedding, len(text))
        except RetryError as e:
            self._reporter.error(
                message="Error at embed_with_retry()",
                details={self.__class__.__name__: str(e)},
            )
            return ([], 0)
        else:
            # TODO: why not just throw in this case?
            return ([], 0)

    async def _aembed_with_retry(
        self, text: str | tuple, **kwargs: Any
    ) -> tuple[list[float], int]:
        try:
            retryer = AsyncRetrying(
                stop=stop_after_attempt(self.max_retries),
                wait=wait_exponential_jitter(max=10),
                reraise=True,
                retry=retry_if_exception_type(self.retry_error_types),
            )
            async for attempt in retryer:
                with attempt:
                    # Async variant of the local Ollama embedding call.
                    embedding = (
                        await OllamaEmbeddings(
                            model=self.model,
                        ).aembed_query(text)
                        or []
                    )
                    return (embedding, len(text))
        except RetryError as e:
            self._reporter.error(
                message="Error at embed_with_retry()",
                details={self.__class__.__name__: str(e)},
            )
            return ([], 0)
        else:
            # TODO: why not just throw in this case?
            return ([], 0)
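
To see in isolation what the patched file does, here is a minimal sketch of the OllamaEmbeddings call it relies on (assumes a running Ollama server and the nomic-embed-text model from step 3):

from langchain_community.embeddings import OllamaEmbeddings

# Embed a query string through the local Ollama server.
emb = OllamaEmbeddings(model="nomic-embed-text")
vec = emb.embed_query("GraphRAG with local models")
print(len(vec))  # nomic-embed-text returns 768-dimensional vectors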

At this point all the pitfalls have been cleared and everything should work normally.
(4) Run a local query (local method)

python -m graphrag.query --root ./ragtest --method local "What is meaning?"

Additional notes

If your embedding model is not the nomic-embed-text used in this example, one more source change is needed. Open
graphrag-local-ollama/graphrag/llm/openai/create_openai_client.py and change the hard-coded model there to the model you are actually calling.

Troubleshooting

Based on the errors readers have reported in the comments, here is a summary.

1. A query fails with the following error:

ZeroDivisionError: Weights sum to zero, can't be normalized

or

json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

Fix:
Open graphrag-local-ollama/graphrag/query/structured_search/global_search/search.py
and change both occurrences of search_messages (there are two) to the format below; the commented-out lines are the original content. Some local models handle a separate system message poorly and return empty or non-JSON responses, which is likely what triggers the errors above; merging the system prompt and the query into a single user message avoids this:

#search_messages = [
#  {"role": "system", "content": search_prompt},
#  {"role": "user", "content": query},
#]
search_messages = [ {"role": "user", "content": search_prompt + "\n\n### USER QUESTION ### \n\n" + query} ]

2. The prompt length exceeds the model's context window

Error message:

[GIN] 2024/11/18 - 14:04:10 | 500 | 3m0s | 127.0.0.1 | POST "/v1/chat/completions"
time=2024-11-18T14:04:10.031+08:00 level=WARN source=runner.go:126 msg="truncating input prompt" limit=2048 prompt=2564 numKeep=5

Reduce the chunk size in settings.yaml (the size field under chunks, e.g. from 200 down to 100). Note that the truncation limit of 2048 in the log is Ollama's default context length (num_ctx); raising it, for example via a custom Modelfile, also helps for models that support longer contexts.

Summary

With the steps above, GraphRAG can now run against open-source models deployed locally through Ollama. Having tried both approaches, though, I find that small open-source models give noticeably weaker results because of their limited parameter counts; if you have enough GPU memory, try a larger model such as qwen2 or another stronger LLM. If you run into any other problems, feel free to raise them in the discussion section.
