Resilient Tool-Using Experts: the Generalist and Sentinel Templates
MCPRuntime, make_resilient_tools_node, and McpToolkit take MCP connection and recovery plumbing out of your agents. Two templates show the pattern, with and without tools.
We’ve made building tool-using experts (agents) much simpler and more reliable:

- `MCPRuntime` owns connect/refresh/close, so agents don’t (a sketch of its surface follows below).
- `make_resilient_tools_node(...)` auto-recovers from timeouts and 401s and still emits ToolMessages, so your graph stays valid (no “dangling tool_calls” 400s).
- `McpToolkit` wraps raw MCP tools and injects runtime context (e.g., library/project filters) automatically.

Use the Generalist template for chat-only agents, and the Sentinel template when you need MCP tools (OpenSearch, KPIs, etc.).
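For orientation, here is a minimal sketch of the `MCPRuntime` surface the templates below rely on. The real class lives in `app.common.mcp_runtime`; the signatures are inferred from how the templates call it, and everything else is an assumption.

```python
# Sketch only: signatures inferred from template usage; bodies are assumptions.
from typing import Any, Callable, Optional


class MCPRuntime:
    def __init__(self, agent_settings: Any,
                 context_provider: Optional[Callable[[], dict]] = None):
        self.agent_settings = agent_settings
        self.context_provider = context_provider  # e.g. library/project filters
        self._toolkit = None  # McpToolkit, built by init()/refresh()

    async def init(self) -> None:
        """Connect the MCP client once and build the toolkit."""
        ...

    async def refresh(self) -> None:
        """Atomically rebuild client + toolkit (called after timeouts/401s)."""
        ...

    async def close(self) -> None:
        """Tear down the MCP client; agents never manage this themselves."""
        ...

    def get_tools(self) -> list:
        """Return the current LangChain tools wrapped by McpToolkit."""
        ...
```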
The Generalist expert just chats: no tools. It shows the smallest, “good citizen” AgentFlow pattern.
```python
# app/agents/generalist/generalist.py
import logging
from datetime import datetime

from langchain_core.messages import SystemMessage
from langgraph.constants import END, START
from langgraph.graph import MessagesState, StateGraph

from app.core.agents.flow import AgentFlow
from app.core.model.model_factory import get_model
from app.common.structures import AgentSettings

logger = logging.getLogger(__name__)


class GeneralistExpert(AgentFlow):
    name: str
    role: str
    nickname: str
    description: str
    icon: str = "assistant"
    categories: list[str] = []
    tag: str = "general"

    def __init__(self, agent_settings: AgentSettings):
        self.agent_settings = agent_settings
        self.name = agent_settings.name
        self.nickname = agent_settings.nickname or agent_settings.name
        self.role = agent_settings.role
        self.description = agent_settings.description
        self.categories = agent_settings.categories or ["general"]
        self.tag = agent_settings.tag or "general"
        self.current_date = datetime.now().strftime("%Y-%m-%d")
        self.base_prompt = (
            "You are a thoughtful, concise assistant. "
            f"Current date: {self.current_date}."
        )
        self.model = None
        self._graph = None

    async def async_init(self):
        # 1) choose model
        self.model = get_model(self.agent_settings.model)

        # 2) build a tiny graph: START -> reasoner -> END
        builder = StateGraph(MessagesState)
        builder.add_node("reasoner", self.reasoner)
        builder.add_edge(START, "reasoner")
        builder.add_edge("reasoner", END)  # explicit finish; no dead-end node
        self._graph = builder.compile()

        # 3) register with AgentFlow base
        super().__init__(
            name=self.name,
            role=self.role,
            nickname=self.nickname,
            description=self.description,
            icon=self.icon,
            graph=self._graph,
            base_prompt=self.base_prompt,
            categories=self.categories,
            tag=self.tag,
            toolkit=None,  # no tools for this expert
        )

    async def reasoner(self, state: MessagesState):
        # Prepend the base prompt as a system message and let the model answer.
        resp = await self.model.ainvoke(
            [SystemMessage(content=self.base_prompt)] + state["messages"]
        )
        return {"messages": [resp]}
```
The Sentinel expert is a production-ready pattern for experts that call MCP tools (OpenSearch, KPI, etc.). The key parts are the shared MCPRuntime, the resilient tools node, and the reasoner/tools loop:
```python
# app/agents/sentinel/sentinel.py
import json
import logging
from datetime import datetime
from typing import Any, Dict

from langchain_core.messages import SystemMessage, ToolMessage
from langgraph.constants import START
from langgraph.graph import MessagesState, StateGraph
from langgraph.prebuilt import tools_condition

from app.core.agents.flow import AgentFlow
from app.core.model.model_factory import get_model
from app.common.structures import AgentSettings
from app.common.mcp_runtime import MCPRuntime
from app.common.resilient_tool_node import make_resilient_tools_node

logger = logging.getLogger(__name__)


class SentinelExpert(AgentFlow):
    name: str
    role: str
    nickname: str
    description: str
    icon: str = "ops_agent"
    categories: list[str] = []
    tag: str = "ops"

    def __init__(self, agent_settings: AgentSettings):
        self.agent_settings = agent_settings
        self.name = agent_settings.name
        self.nickname = agent_settings.nickname or agent_settings.name
        self.role = agent_settings.role
        self.description = agent_settings.description
        self.categories = agent_settings.categories or ["ops", "monitoring"]
        self.tag = agent_settings.tag or "ops"
        self.current_date = datetime.now().strftime("%Y-%m-%d")
        self.base_prompt = (
            "You are Sentinel, an operations/monitoring expert for Fred.\n"
            "- Use os.* tools for OpenSearch health, shards, indices, mappings, diagnostics.\n"
            "- Use kpi.* tools for usage, latency, error rates.\n"
            "Return concise, actionable summaries with next steps. "
            f"Current date: {self.current_date}."
        )
        self.model = None
        self._graph = None
        # NEW: shared runtime for MCP
        self.mcp = MCPRuntime(
            agent_settings=self.agent_settings,
            context_provider=lambda: self.get_runtime_context(),  # optional
        )

    async def async_init(self):
        # 1) model
        self.model = get_model(self.agent_settings.model)

        # 2) connect MCP once
        await self.mcp.init()

        # 3) bind tools on the model
        self.model = self.model.bind_tools(self.mcp.get_tools())

        # 4) graph: START -> reasoner -> (if tools) tools -> reasoner
        builder = StateGraph(MessagesState)
        builder.add_node("reasoner", self.reasoner)
        # Resilient ToolNode: retries timeouts/401s and yields fallback ToolMessages.
        tools_node = make_resilient_tools_node(
            get_tools=lambda: self.mcp.get_tools(),
            refresh_cb=self.mcp.refresh,  # atomically refresh client + toolkit
        )
        builder.add_node("tools", tools_node)
        builder.add_edge(START, "reasoner")
        builder.add_conditional_edges("reasoner", tools_condition)
        builder.add_edge("tools", "reasoner")
        self._graph = builder.compile()

        # 5) register with AgentFlow
        super().__init__(
            name=self.name,
            role=self.role,
            nickname=self.nickname,
            description=self.description,
            icon=self.icon,
            graph=self._graph,
            base_prompt=self.base_prompt,
            categories=self.categories,
            tag=self.tag,
            toolkit=None,  # the toolkit is owned by MCPRuntime
        )

    async def reasoner(self, state: MessagesState):
        """
        One LLM step; may call tools. After tools run, collect ToolMessages so
        the UI can display the raw tool payloads if useful.
        """
        resp = await self.model.ainvoke(
            [SystemMessage(content=self.base_prompt)] + state["messages"]
        )
        # Collect tool outputs by tool name (last one wins).
        tool_payloads: Dict[str, Any] = {}
        for msg in state["messages"]:
            if isinstance(msg, ToolMessage) and getattr(msg, "name", ""):
                raw = msg.content
                try:
                    tool_payloads[msg.name] = json.loads(raw) if isinstance(raw, str) else raw
                except Exception:
                    tool_payloads[msg.name] = raw
        meta = resp.response_metadata.get("tools", {})
        meta.update(tool_payloads)
        resp.response_metadata["tools"] = meta
        return {"messages": [resp]}
```
You don’t need to call refresh yourself inside a node; make_resilient_tools_node triggers refresh_cb when needed.
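To make that recovery behavior concrete, here is a sketch of what such a node can look like. The real implementation lives in `app.common.resilient_tool_node`; the single-retry policy and error handling below are assumptions, not its actual code.

```python
# Sketch of the recovery behavior described above; retry policy is assumed.
from langchain_core.messages import AIMessage, ToolMessage
from langgraph.prebuilt import ToolNode


def make_resilient_tools_node(get_tools, refresh_cb):
    async def tools_node(state: dict):
        try:
            # Happy path: run the pending tool calls with the current tools.
            return await ToolNode(get_tools()).ainvoke(state)
        except Exception:  # e.g. timeout or expired credentials (401)
            await refresh_cb()  # atomically rebuild client + toolkit
            try:
                return await ToolNode(get_tools()).ainvoke(state)  # retry once
            except Exception as exc:
                # Answer every pending tool_call anyway so the transcript
                # stays valid (no "dangling tool_calls" 400s downstream).
                last: AIMessage = state["messages"][-1]
                return {
                    "messages": [
                        ToolMessage(
                            content=f"Tool call failed after refresh: {exc}",
                            tool_call_id=call["id"],
                            name=call["name"],
                        )
                        for call in last.tool_calls
                    ]
                }

    return tools_node
```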
That’s it—your expert is robust to transient MCP issues and easy to maintain.
Build fast. Stay resilient. Happy shipping 🚀