メインコンテンツへスキップ
AutoGen は、AI エージェントやアプリケーションを構築するための Microsoft 製のフレームワークです。会話型 AI(AgentChat)、コアなマルチエージェント機能(Core)、外部サービスとのインテグレーション(Extensions)のためのコンポーネントを提供し、複雑なマルチエージェントシステムの作成を簡素化します。また、AutoGen はノーコードでエージェントのプロトタイプを作成できる Studio も提供しています。詳細については、AutoGen 公式ドキュメントをご覧ください。
このガイドは、AutoGen に関する基本的な知識があることを前提としています。
Weave は AutoGen と連携し、マルチエージェントアプリケーションの実行をトレースして可視化するのに役立ちます。Weave を初期化するだけで、autogen_agentchatautogen_coreautogen_ext 内のインタラクションを自動的に追跡できます。このガイドでは、AutoGen で Weave を使用するさまざまな例を順を追って説明します。

事前準備

始める前に、AutoGen と Weave がインストールされていることを確認してください。また、使用する予定の LLM プロバイダー(OpenAI、Anthropic など)の SDK も必要です。
pip install autogen_agentchat "autogen_ext[openai,anthropic]" weave 
APIキーを環境変数として設定します。
import os

os.environ["OPENAI_API_KEY"] = "<your-openai-api-key>"
os.environ["ANTHROPIC_API_KEY"] = "<your-anthropic-api-key>"

基本設定

スクリプトの冒頭で Weave を初期化し、トレースのキャプチャを開始します。
import weave
weave.init("autogen-demo")

シンプルなモデルクライアントのトレース

Weave は、AutoGen 内のモデルクライアントに対して直接行われた呼び出しをトレースできます。

クライアントの create 呼び出しをトレースする

この例では、OpenAIChatCompletionClient への単一の呼び出しをトレースする方法を示します。
import asyncio
from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
# from autogen_ext.models.anthropic import AnthropicChatCompletionClient

async def simple_client_call(model_name = "gpt-4o"):
    model_client = OpenAIChatCompletionClient(
        model=model_name,
    )
    # 代わりに Anthropic や他のモデルクライアントを使用することもできます
    # model_client = AnthropicChatCompletionClient(
        # model="claude-3-haiku-20240307"
    # )
    response = await model_client.create(
        [UserMessage(content="Hello, how are you?", source="user")]
    )
    print(response)

asyncio.run(simple_client_call())

autogen-simple-client.png

ストリーミングを伴うクライアントの create 呼び出しをトレースする

Weave は、ストリーミングレスポンスのトレースもサポートしています。

async def simple_client_call_stream(model_name = "gpt-4o"):
    openai_model_client = OpenAIChatCompletionClient(model=model_name)
    async for item in openai_model_client.create_stream(
        [UserMessage(content="Hello, how are you?", source="user")]
    ):
      print(item, flush=True, end="")

asyncio.run(simple_client_call_stream())

autogen-streaming-client.png

Weave によるキャッシュされた呼び出しの記録

AutoGen の ChatCompletionCache を使用すると、Weave はこれらのインタラクションをトレースし、レスポンスがキャッシュから取得されたものか、新しい呼び出しによるものかを表示します。

from autogen_ext.models.cache import ChatCompletionCache

async def run_cache_client(model_name = "gpt-4o"):
      openai_model_client = OpenAIChatCompletionClient(model=model_name)
      cache_client = ChatCompletionCache(openai_model_client,)

      response = await cache_client.create(
          [UserMessage(content="Hello, how are you?", source="user")]
      )
      print(response)  # OpenAI からのレスポンスが出力されるはずです
      response = await cache_client.create(
          [UserMessage(content="Hello, how are you?", source="user")]
      )
      print(response)  # キャッシュされたレスポンスが出力されるはずです

asyncio.run(run_cache_client())

autogen-cached-client.png

ツール呼び出しを伴うエージェントのトレース

Weave はエージェントとそのツールの使用状況をトレースし、エージェントがどのようにツールを選択して実行するかを可視化します。
from autogen_agentchat.agents import AssistantAgent

async def get_weather(city: str) -> str:
    return f"{city} の天気は 73 度で晴れです。"

async def run_agent_with_tools(model_name = "gpt-4o"):
    model_client = OpenAIChatCompletionClient(model=model_name)

    agent = AssistantAgent(
        name="weather_agent",
        model_client=model_client,
        tools=[get_weather],
        system_message="あなたは親切なアシスタントです。",
        reflect_on_tool_use=True,
    )
    # コンソールへの出力をストリーミングする場合:
    # await Console(agent.run_stream(task="ニューヨークの天気はどうですか?"))
    res = await agent.run(task="ニューヨークの天気はどうですか?")
    print(res)
    await model_client.close()

asyncio.run(run_agent_with_tools())

autogen-agent-tools.png

GroupChat のトレース - RoundRobin

RoundRobinGroupChat などのグループチャット内のインタラクションは Weave によってトレースされ、エージェント間の会話の流れを追うことができます。

from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat

# グループチャット全体をトレースしたいため、ここに weave op を追加します。
# これは完全にオプションですが、使用することを強くお勧めします。

@weave.op
async def run_round_robin_group_chat(model_name="gpt-4o"):
    model_client = OpenAIChatCompletionClient(model=model_name)

    primary_agent = AssistantAgent(
        "primary",
        model_client=model_client,
        system_message="あなたは親切な AI アシスタントです。",
    )

    critic_agent = AssistantAgent(
        "critic",
        model_client=model_client,
        system_message="建設的なフィードバックを提供してください。フィードバックが反映されたら 'APPROVE' と答えてください。",
    )

    text_termination = TextMentionTermination("APPROVE")

    team = RoundRobinGroupChat(
        [primary_agent, critic_agent], termination_condition=text_termination
    )
    await team.reset()
    # コンソールへの出力をストリーミングする場合:
    # await Console(team.run_stream(task="秋の季節についての短い詩を書いてください。"))
    result = await team.run(task="秋の季節についての短い詩を書いてください。")
    print(result)
    await model_client.close()


asyncio.run(run_round_robin_group_chat())

round_robin_group_chat.png

メモリのトレース

AutoGen のメモリコンポーネントは Weave でトレースできます。@weave.op() を使用して、読みやすさを向上させるためにメモリ操作を単一のトレースにグループ化できます。

from autogen_core.memory import ListMemory, MemoryContent, MemoryMimeType

# メモリ追加の呼び出しとメモリ取得の呼び出しを単一のトレースの下で
# まとめてトレースしたいため、ここに weave op を追加します。
# これは完全にオプションですが、使用することを強くお勧めします。

@weave.op
async def run_memory_agent(model_name="gpt-4o"):
    user_memory = ListMemory()

    await user_memory.add(
        MemoryContent(
            content="天気はメートル法であるべきです",
            mime_type=MemoryMimeType.TEXT,
        )
    )

    await user_memory.add(
        MemoryContent(
            content="食事のレシピはビーガンでなければなりません", mime_type=MemoryMimeType.TEXT
        )
    )

    async def get_weather(city: str, units: str = "imperial") -> str:
        if units == "imperial":
            return f"{city} の天気は 73 °F で晴れです。"
        elif units == "metric":
            return f"{city} の天気は 23 °C で晴れです。"
        else:
            return f"申し訳ありません、{city} の天気はわかりません。"

    model_client = OpenAIChatCompletionClient(model=model_name)
    assistant_agent = AssistantAgent(
        name="assistant_agent",
        model_client=model_client,
        tools=[get_weather],
        memory=[user_memory],
    )

    # コンソールへの出力をストリーミングする場合:
    # stream = assistant_agent.run_stream(task="ニューヨークの天気はどうですか?")
    # await Console(stream)
    result = await assistant_agent.run(task="ニューヨークの天気はどうですか?")
    print(result)
    await model_client.close()


asyncio.run(run_memory_agent())

autogen-memory.png

RAG ワークフローのトレース

ChromaDBVectorMemory のようなメモリシステムを使用したドキュメントのインデックス作成や検索を含む、検索拡張生成(RAG)ワークフローはトレース可能です。RAG プロセスを @weave.op() でデコレートすることで、フロー全体の可視化に役立ちます。
この RAG の例には chromadb が必要です。pip install chromadb でインストールしてください。
# !pip install -q chromadb 
# 環境に chromadb がインストールされていることを確認してください: `pip install chromadb`

import re
from typing import List
import os
from pathlib import Path

import aiofiles
import aiohttp

from autogen_core.memory import Memory, MemoryContent, MemoryMimeType
from autogen_ext.memory.chromadb import (
    ChromaDBVectorMemory,
    PersistentChromaDBVectorMemoryConfig,
)

class SimpleDocumentIndexer:
    def __init__(self, memory: Memory, chunk_size: int = 1500) -> None:
        self.memory = memory
        self.chunk_size = chunk_size

    async def _fetch_content(self, source: str) -> str:
        if source.startswith(("http://", "https://")):
            async with aiohttp.ClientSession() as session:
                async with session.get(source) as response:
                    return await response.text()
        else:
            async with aiofiles.open(source, "r", encoding="utf-8") as f:
                return await f.read()

    def _strip_html(self, text: str) -> str:
        text = re.sub(r"<[^>]*>", " ", text)
        text = re.sub(r"\\s+", " ", text)
        return text.strip()

    def _split_text(self, text: str) -> List[str]:
        chunks: list[str] = []
        for i in range(0, len(text), self.chunk_size):
            chunk = text[i : i + self.chunk_size]
            chunks.append(chunk.strip())
        return chunks

    async def index_documents(self, sources: List[str]) -> int:
        total_chunks = 0
        for source in sources:
            try:
                content = await self._fetch_content(source)
                if "<" in content and ">" in content:
                    content = self._strip_html(content)
                chunks = self._split_text(content)
                for i, chunk in enumerate(chunks):
                    await self.memory.add(
                        MemoryContent(
                            content=chunk,
                            mime_type=MemoryMimeType.TEXT,
                            metadata={"source": source, "chunk_index": i},
                        )
                    )
                total_chunks += len(chunks)
            except Exception as e:
                print(f"Error indexing {source}: {str(e)}")
        return total_chunks

@weave.op
async def run_rag_agent(model_name="gpt-4o"):
    rag_memory = ChromaDBVectorMemory(
        config=PersistentChromaDBVectorMemoryConfig(
            collection_name="autogen_docs",
            persistence_path=os.path.join(str(Path.home()), ".chromadb_autogen_weave"),
            k=3,
            score_threshold=0.4,
        )
    )
    # await rag_memory.clear() # 必要に応じて既存のメモリをクリアするにはコメント解除してください

    async def index_autogen_docs() -> None:
        indexer = SimpleDocumentIndexer(memory=rag_memory)
        sources = [
            "https://raw.githubusercontent.com/microsoft/autogen/main/README.md",
            "https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/agents.html",
        ]
        chunks: int = await indexer.index_documents(sources)
        print(f"{len(sources)} 個の AutoGen ドキュメントから {chunks} 個のチャンクをインデックスしました")
    
    # コレクションが空の場合、または再インデックスしたい場合にのみインデックスを作成します
    # デモ目的では、毎回インデックスを作成するか、既にインデックスされているかを確認します
    # この例では、各実行ごとにインデックスを試みます。チェックの追加を検討してください
    await index_autogen_docs()

    model_client = OpenAIChatCompletionClient(model=model_name)
    rag_assistant = AssistantAgent(
        name="rag_assistant",
        model_client=model_client,
        memory=[rag_memory],
    )
    
    # コンソールへの出力をストリーミングする場合:
    # stream = rag_assistant.run_stream(task="AgentChat とは何ですか?")
    # await Console(stream)
    result = await rag_assistant.run(task="AgentChat とは何ですか?")
    print(result)

    await rag_memory.close()
    await model_client.close()

asyncio.run(run_rag_agent())
autogen-rag.png

エージェントランタイムのトレース

Weave は、SingleThreadedAgentRuntime のような AutoGen のエージェントランタイム内の操作をトレースできます。ランタイム実行関数の周囲で @weave.op() を使用すると、関連するトレースをグループ化できます。
from dataclasses import dataclass
from typing import Callable

from autogen_core import (
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    default_subscription,
    message_handler,
    AgentId,
    SingleThreadedAgentRuntime
)

@dataclass
class Message:
    content: int

@default_subscription
class Modifier(RoutedAgent):
    def __init__(self, modify_val: Callable[[int], int]) -> None:
        super().__init__("A modifier agent.")
        self._modify_val = modify_val

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        val = self._modify_val(message.content)
        print(f"{'-'*80}\\nModifier:\\nModified {message.content} to {val}")
        await self.publish_message(Message(content=val), DefaultTopicId())

@default_subscription
class Checker(RoutedAgent):
    def __init__(self, run_until: Callable[[int], bool]) -> None:
        super().__init__("A checker agent.")
        self._run_until = run_until

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        if not self._run_until(message.content):
            print(f"{'-'*80}\\nChecker:\\n{message.content} passed the check, continue.")
            await self.publish_message(
                Message(content=message.content), DefaultTopicId()
            )
        else:
            print(f"{'-'*80}\\nChecker:\\n{message.content} failed the check, stopping.")

# エージェントランタイムの呼び出し全体を単一のトレースの下で
# まとめてトレースしたいため、ここに weave op を追加します。
# これは完全にオプションですが、使用することを強くお勧めします。

@weave.op
async def run_agent_runtime() -> None:
    runtime = SingleThreadedAgentRuntime()

    await Modifier.register(
        runtime,
        "modifier",
        lambda: Modifier(modify_val=lambda x: x - 1),
    )

    await Checker.register(
        runtime,
        "checker",
        lambda: Checker(run_until=lambda x: x <= 1),
    )

    runtime.start()
    await runtime.send_message(Message(content=3), AgentId("checker", "default"))
    await runtime.stop_when_idle()

asyncio.run(run_agent_runtime())

autogen-runtime.png

ワークフローのトレース (シーケンシャル)

エージェントのインタラクションのシーケンスを定義する複雑なエージェントワークフローをトレースできます。@weave.op() を使用して、ワークフロー全体のハイレベルなトレースを提供できます。

from autogen_core import TopicId, type_subscription
from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessage

@dataclass
class WorkflowMessage:
    content: str

concept_extractor_topic_type = "ConceptExtractorAgent"
writer_topic_type = "WriterAgent"
format_proof_topic_type = "FormatProofAgent"
user_topic_type = "User"

@type_subscription(topic_type=concept_extractor_topic_type)
class ConceptExtractorAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("A concept extractor agent.")
        self._system_message = SystemMessage(
            content=(
                "あなたはマーケティングアナリストです。製品の説明が与えられたら、以下を特定してください:\n"
                "- 主な機能\n"
                "- ターゲットオーディエンス\n"
                "- 独自のセールスポイント (USP)\n\n"
            )
        )
        self._model_client = model_client

    @message_handler
    async def handle_user_description(self, message: WorkflowMessage, ctx: MessageContext) -> None:
        prompt = f"製品説明: {message.content}"
        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            cancellation_token=ctx.cancellation_token,
        )
        response = llm_result.content
        assert isinstance(response, str)
        print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
        await self.publish_message(
            WorkflowMessage(response), topic_id=TopicId(writer_topic_type, source=self.id.key)
        )

@type_subscription(topic_type=writer_topic_type)
class WriterAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("A writer agent.")
        self._system_message = SystemMessage(
            content=(
                "あなたはマーケティングコピーライターです。機能、対象読者、USP を説明するテキストのブロックが与えられたら、"
                "これらのポイントを強調する魅力的なマーケティングコピー(ニュースレターのセクションのようなもの)を作成してください。"
                "出力は短く(150語程度)し、コピーだけを単一のテキストブロックとして出力してください。"
            )
        )
        self._model_client = model_client
    
    @message_handler
    async def handle_intermediate_text(self, message: WorkflowMessage, ctx: MessageContext) -> None:
        prompt = f"以下は製品に関する情報です:\\n\\n{message.content}"
        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            cancellation_token=ctx.cancellation_token,
        )
        response = llm_result.content
        assert isinstance(response, str)
        print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
        await self.publish_message(
            WorkflowMessage(response), topic_id=TopicId(format_proof_topic_type, source=self.id.key)
        )

@type_subscription(topic_type=format_proof_topic_type)
class FormatProofAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("A format & proof agent.")
        self._system_message = SystemMessage(
            content=(
                "あなたは編集者です。下書きコピーが与えられたら、文法を修正し、明快さを向上させ、一貫したトーンを確保し、"
                "フォーマットを整えて洗練されたものにしてください。最終的な改善されたコピーを単一のテキストブロックとして出力してください。"
            )
        )
        self._model_client = model_client

    @message_handler
    async def handle_intermediate_text(self, message: WorkflowMessage, ctx: MessageContext) -> None:
        prompt = f"下書きコピー:\\n{message.content}."
        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            cancellation_token=ctx.cancellation_token,
        )
        response = llm_result.content
        assert isinstance(response, str)
        print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
        await self.publish_message(
            WorkflowMessage(response), topic_id=TopicId(user_topic_type, source=self.id.key)
        )

@type_subscription(topic_type=user_topic_type)
class UserAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("最終的なコピーをユーザーに出力するユーザーエージェント。")

    @message_handler
    async def handle_final_copy(self, message: WorkflowMessage, ctx: MessageContext) -> None:
        print(f"\\n{'-'*80}\\n{self.id.type} が最終コピーを受信しました:\\n{message.content}")

# エージェントワークフロー全体を単一のトレースの下で
# まとめてトレースしたいため、ここに weave op を追加します。
# これは完全にオプションですが、使用することを強くお勧めします。

@weave.op(call_display_name="Sequential Agent Workflow")
async def run_agent_workflow(model_name="gpt-4o"):
    model_client = OpenAIChatCompletionClient(model=model_name)
    runtime = SingleThreadedAgentRuntime()

    await ConceptExtractorAgent.register(runtime, type=concept_extractor_topic_type, factory=lambda: ConceptExtractorAgent(model_client=model_client))
    await WriterAgent.register(runtime, type=writer_topic_type, factory=lambda: WriterAgent(model_client=model_client))
    await FormatProofAgent.register(runtime, type=format_proof_topic_type, factory=lambda: FormatProofAgent(model_client=model_client))
    await UserAgent.register(runtime, type=user_topic_type, factory=lambda: UserAgent())

    runtime.start()
    await runtime.publish_message(
        WorkflowMessage(
            content="飲料を24時間冷たく保つ、環境に優しいステンレス製ウォーターボトル"
        ),
        topic_id=TopicId(concept_extractor_topic_type, source="default"),
    )
    await runtime.stop_when_idle()
    await model_client.close()

asyncio.run(run_agent_workflow())

autogen-sequential-workflow.png

コードエクゼキューターのトレース

Docker が必要です この例では Docker を使用したコード実行が含まれており、すべての環境(Colab など)で直接動作しない場合があります。これを試す場合は、Docker がローカルで実行されていることを確認してください。
Weave は、AutoGen エージェントによるコードの生成と実行をトレースします。

import tempfile
from autogen_core import DefaultTopicId
from autogen_core.code_executor import CodeBlock, CodeExecutor
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    LLMMessage,
    SystemMessage,
    UserMessage,
)
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor


@dataclass
class CodeGenMessage:
    content: str

@default_subscription
class Assistant(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("An assistant agent.")
        self._model_client = model_client
        self._chat_history: List[LLMMessage] = [
           SystemMessage(
                content="""PythonスクリプトをMarkdownブロックで書いてください。それは実行されます。
図は常に現在のディレクトリのファイルに保存してください。plt.show() は使用しないでください。このタスクを完了するために必要なすべてのコードは、単一のレスポンスに含まれている必要があります。""",
            )
        ]

    @message_handler
    async def handle_message(self, message: CodeGenMessage, ctx: MessageContext) -> None:
        self._chat_history.append(UserMessage(content=message.content, source="user"))
        result = await self._model_client.create(self._chat_history)
        print(f"\\n{'-'*80}\\nAssistant:\\n{result.content}")
        self._chat_history.append(AssistantMessage(content=result.content, source="assistant"))
        await self.publish_message(CodeGenMessage(content=result.content), DefaultTopicId())

def extract_markdown_code_blocks(markdown_text: str) -> List[CodeBlock]:
    pattern = re.compile(r"```(?:\\s*([\\w\\+\\-]+))?\\n([\\s\\S]*?)```")
    matches = pattern.findall(markdown_text)
    code_blocks: List[CodeBlock] = []
    for match in matches:
        language = match[0].strip() if match[0] else ""
        code_content = match[1]
        code_blocks.append(CodeBlock(code=code_content, language=language))
    return code_blocks

@default_subscription
class Executor(RoutedAgent):
    def __init__(self, code_executor: CodeExecutor) -> None:
        super().__init__("An executor agent.")
        self._code_executor = code_executor

    @message_handler
    async def handle_message(self, message: CodeGenMessage, ctx: MessageContext) -> None:
        code_blocks = extract_markdown_code_blocks(message.content)
        if code_blocks:
            result = await self._code_executor.execute_code_blocks(
                code_blocks, cancellation_token=ctx.cancellation_token
            )
            print(f"\\n{'-'*80}\\nExecutor:\\n{result.output}")
            await self.publish_message(CodeGenMessage(content=result.output), DefaultTopicId())

# コード生成ワークフロー全体を単一のトレースの下で
# まとめてトレースしたいため、ここに weave op を追加します。
# これは完全にオプションですが、使用することを強くお勧めします。

@weave.op(call_display_name="CodeGen Agent Workflow")
async def run_codegen(model_name="gpt-4o"): # モデルを更新
    work_dir = tempfile.mkdtemp()
    runtime = SingleThreadedAgentRuntime()

    # この例では Docker が実行されていることを確認してください
    try:
        async with DockerCommandLineCodeExecutor(work_dir=work_dir) as executor:
            model_client = OpenAIChatCompletionClient(model=model_name)
            await Assistant.register(runtime, "assistant", lambda: Assistant(model_client=model_client))
            await Executor.register(runtime, "executor", lambda: Executor(executor))

            runtime.start()
            await runtime.publish_message(
                CodeGenMessage(content="2024-01-01からのNVDA対TSLAの株価収益率(YTD)のプロットを作成してください。"),
                DefaultTopicId(),
            )
            await runtime.stop_when_idle()
            await model_client.close()
    except Exception as e:
        print(f"Docker コードエクゼキューターの例を実行できませんでした: {e}")
        print("Docker がインストールされ、実行されていることを確認してください。")
    finally:
        import shutil
        shutil.rmtree(work_dir)


asyncio.run(run_codegen())
autogen-codegen.png

詳細情報

このガイドは、Weave と AutoGen の統合を開始するための出発点となります。Weave UI を探索して、エージェントのインタラクション、モデルの呼び出し、ツールの使用状況の詳細なトレースを確認してください。