Mastering the Microsoft Agent Framework: A Comprehensive Learning Guide
Welcome to the exciting world of AI agents! This document is designed to be your comprehensive guide to the Microsoft Agent Framework, a powerful, open-source SDK and runtime that simplifies the creation, deployment, and management of intelligent AI agents and complex multi-agent systems. Whether you’re a seasoned developer looking to dive into agentic AI or a complete beginner, this guide will walk you through everything you need to know, from the foundational concepts to building sophisticated, production-ready applications.
1. Introduction to Microsoft Agent Framework
What is the Microsoft Agent Framework?
The Microsoft Agent Framework is an open-source SDK and runtime for both .NET and Python, designed to be the unified foundation for building sophisticated AI agents. It effectively merges the strengths of two previously distinct Microsoft AI projects:
- Semantic Kernel: Known for its stable SDK, robust enterprise connectors, structured workflows, and advanced observability features.
- AutoGen: A research-driven framework that pioneered innovative multi-agent orchestration patterns and collaborative AI techniques.
The Microsoft Agent Framework (MAF) aims to address the challenges developers face in moving AI agent prototypes to production. It provides a consistent developer experience, offering tools to:
- Build individual agents with minimal boilerplate.
- Orchestrate complex multi-agent workflows with ease.
- Host and Deploy agents using familiar development patterns.
- Monitor and Manage agent behavior in production environments.
In essence, MAF allows you to create autonomous software components that can reason about goals, call tools and APIs, collaborate with other agents, and dynamically adapt to achieve objectives.
Why Learn the Microsoft Agent Framework?
Learning MAF offers significant benefits for developers and organizations venturing into agentic AI:
- Unified Platform: You no longer have to choose between experimental innovation (AutoGen) and enterprise stability (Semantic Kernel). MAF offers the best of both in a single framework.
- Open Standards & Interoperability: Built with open standards like Model Context Protocol (MCP), Agent-to-Agent (A2A), and OpenAPI, ensuring your agents are portable and can communicate across different frameworks and cloud environments. This is crucial for integrating with existing systems and avoiding vendor lock-in.
- Research-to-Production Pipeline: Get access to cutting-edge orchestration patterns directly from Microsoft Research, hardened for enterprise use with features like durability, governance, and performance.
- Enterprise Readiness: Out-of-the-box support for critical enterprise requirements:
- Observability: Built-in telemetry, logging, and OpenTelemetry contributions for standardized tracing.
- Security & Compliance: Microsoft Entra ID integration for agent identities, robust security features, compliance hooks (task adherence checks, prompt shields, PII detection), and approval flows for sensitive actions.
- Durability: Agents can pause, resume, and recover from interruptions, supporting long-running workflows.
- Memory Management: Pluggable memory modules (Redis, Pinecone, Postgres, etc.) for persistent context and personalized experiences.
- Developer Experience: Offers a consistent, simplified experience across .NET and Python, with rich samples, VS Code integration, and the flexibility to swap AI models/providers without code changes.
- Wide Use Cases: Enables the creation of powerful solutions for:
- Automated customer service (intent triage, specialist agents)
- Intelligent content generation (writing, editing, SEO)
- Complex enterprise automation (sales proposals, compliance, IT support)
- Research and data analysis
- Personalized assistants (travel planning, budget management)
The AI agent market is projected for significant growth, and MAF positions you to build robust solutions that drive real business impact.
Setting Up Your Development Environment
To follow along with this guide, you’ll need to set up your environment. We will focus on Python, as it’s highly popular for AI/ML development.
Prerequisites:
- Python 3.9+: Ensure you have Python installed. You can download it from python.org.
  - Verify installation: `python --version` or `python3 --version`
- pip: Python's package installer. It usually comes with Python.
  - Verify installation: `pip --version` or `pip3 --version`
- Virtual Environment (Recommended): Always use a virtual environment to manage project dependencies.
- Code Editor: Visual Studio Code is highly recommended due to excellent Python and AI tooling support.
- Install the Python extension for VS Code.
- Consider the VS Code AI Toolkit for streamlined agent development (currently more C#-focused but expanding).
- Azure Account (Optional but Recommended for Full Features): Many enterprise features like Azure OpenAI, Azure Cognitive Search, and Microsoft Entra ID require an Azure subscription.
- GitHub Personal Access Token (PAT): If you plan to use GitHub-hosted models (for early exploration or specific scenarios), you'll need a PAT with the `models` scope. Create one in your GitHub settings.
Step-by-Step Environment Setup:
1. Create a project directory:

```bash
mkdir my-maf-project
cd my-maf-project
```

2. Create and activate a virtual environment:

```bash
python -m venv .venv
# On Windows:
# .venv\Scripts\activate
# On macOS/Linux:
source .venv/bin/activate
```

(You'll see a `(.venv)` prefix in your terminal after activation.)

3. Install the Microsoft Agent Framework SDKs. The Python SDK for Microsoft Agent Framework is typically distributed in modular packages. Based on current information, you'll install core components. The `microsoft-agents-hosting-aiohttp` package provides the necessary server components for an agent.

```bash
pip install microsoft-agents-hosting-aiohttp python-dotenv openai azure-identity azure-core redis langchain
```

- `microsoft-agents-hosting-aiohttp`: Core SDK for hosting agents using aiohttp.
- `python-dotenv`: For managing environment variables.
- `openai`: For interacting with OpenAI/Azure OpenAI models.
- `azure-identity`: For Azure authentication.
- `azure-core`: Core Azure library.
- `redis`: For Redis connections (memory/history).
- `langchain`: Included for our migration project, since MAF and LangChain can interoperate.

4. Set environment variables (e.g., for Azure OpenAI). Create a `.env` file in your project root and add your credentials.

```bash
# .env
# Azure OpenAI Service details
AZURE_OPENAI_API_KEY="YOUR_AZURE_OPENAI_API_KEY"
AZURE_OPENAI_ENDPOINT="https://YOUR_AOAI_RESOURCE_NAME.openai.azure.com/"
AZURE_OPENAI_DEPLOYMENT_NAME="YOUR_GPT_MODEL_DEPLOYMENT_NAME" # e.g., gpt-4o-mini-deployment

# Redis connection (optional, for memory)
REDIS_HOST="localhost"
REDIS_PORT="6379"
REDIS_PASSWORD="YOUR_REDIS_PASSWORD" # If you have one

# Azure Cognitive Search (optional, for tools/RAG)
AZURE_SEARCH_ENDPOINT="https://YOUR_AZURE_SEARCH_SERVICE.search.windows.net"
AZURE_SEARCH_API_KEY="YOUR_AZURE_SEARCH_API_KEY"
AZURE_SEARCH_INDEX_NAME="YOUR_SEARCH_INDEX"

# Microsoft Entra ID (for auth) - if using service principals
# AZURE_TENANT_ID="YOUR_TENANT_ID"
# AZURE_CLIENT_ID="YOUR_CLIENT_ID"
# AZURE_CLIENT_SECRET="YOUR_CLIENT_SECRET"
```

Important: Replace placeholder values with your actual credentials. Never commit your `.env` file to version control.
2. Core Concepts and Fundamentals
The Microsoft Agent Framework (MAF) builds upon several key concepts from Semantic Kernel and AutoGen. Understanding these building blocks is crucial for effective agent development.
2.1 Agents
At its core, an Agent in MAF is an autonomous entity capable of accomplishing objectives. Agents are empowered by:
- Reasoning and Decision-Making: Powered by Large Language Models (LLMs), planning algorithms, or custom logic.
- Tool Usage: The ability to call external functions, APIs, or services (referred to as “Tools” or “Skills”).
- Context Awareness: Maintaining state and memory through chat history, threads, vector stores, and enterprise data.
In MAF, the AIAgent is a central abstraction. A common implementation is ChatClientAgent, which wraps an IChatClient (your LLM connector) and provides instructions.
Detailed Explanation:
An agent acts on prompts, leveraging its LLM to decide on actions, use tools, and generate responses. The ChatClientAgent is a direct interface to an LLM, allowing you to imbue it with a persona and specific instructions.
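To make the abstraction concrete, here is a minimal, hypothetical sketch of the idea; the class name and method are illustrative only, not the exact MAF Python API, which wraps considerably more functionality:

```python
# Illustrative sketch only -- not the actual MAF API. An "agent" here is just
# an LLM chat client paired with standing instructions (its persona).
class SketchChatClientAgent:
    def __init__(self, chat_client, instructions: str):
        self.chat_client = chat_client    # any connector exposing async complete(messages)
        self.instructions = instructions  # persona / system prompt applied every turn

    async def run(self, user_message: str) -> str:
        # The instructions are prepended as a system message on each call,
        # so the persona persists across turns.
        return await self.chat_client.complete([
            {"role": "system", "content": self.instructions},
            {"role": "user", "content": user_message},
        ])
```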
Code Example: Your First MAF Agent (Echo Agent with Azure OpenAI)
Let’s create a simple “Echo Agent” that uses Azure OpenAI to process a message and respond.
# echo_agent.py
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Microsoft Agent Framework Imports (Conceptual/Based on .NET structure)
# Note: The Python SDK is still evolving. This uses currently available modules
# or conceptually aligns with the .NET MAF structure and general Python SDK patterns.
from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process, jwt_authorization_middleware
from aiohttp.web import Request, Response, Application, run_app
# OpenAI / Azure OpenAI imports
from openai import AsyncAzureOpenAI # Async client, since our handlers await chat completions
from azure.identity import DefaultAzureCredential
# --- Configuration for Azure OpenAI ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
raise ValueError("Missing Azure OpenAI environment variables. Please check your .env file.")
# Authenticate with Azure OpenAI
# For production, consider DefaultAzureCredential for managed identities
try:
aoai_client = AsyncAzureOpenAI(
api_key=AZURE_OPENAI_API_KEY,
api_version="2024-02-15-preview", # Check latest API version
azure_endpoint=AZURE_OPENAI_ENDPOINT,
)
print("Successfully connected to Azure OpenAI.")
except Exception as e:
print(f"Error connecting to Azure OpenAI: {e}")
# Fallback or exit if essential AI service is unavailable
exit(1)
# A simple wrapper for an LLM client that conforms to MAF agent expectations
class AzureOpenAIChatClient:
def __init__(self, client, deployment_name):
self.client = client
self.deployment_name = deployment_name
async def complete(self, messages: list[dict], **kwargs) -> str:
try:
response = await self.client.chat.completions.create(
model=self.deployment_name,
messages=messages,
**kwargs
)
return response.choices[0].message.content
except Exception as e:
print(f"Error calling Azure OpenAI: {e}")
return "An error occurred while processing your request."
# Initialize our custom chat client
my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)
# --- Define our Agent using MAF Python SDK (Echo Agent) ---
# The AgentApplication is the core entity that handles incoming activities
# and routes them to appropriate handlers (message, conversation_update, etc.)
# In MAF Python SDK, agent logic is often implemented as handlers for TurnContext activities.
# The TurnState can store conversational state.
class EchoAgentState(TurnState):
# You can add state variables here
pass
AGENT_APP = AgentApplication[EchoAgentState](
storage=MemoryStorage(), # Simple in-memory storage for demonstration
adapter=CloudAdapter()
)
# Handler for new conversations or members added
async def _welcome_and_help(context: TurnContext, _: EchoAgentState):
await context.send_activity(
"Hello! I am an Echo Agent powered by Azure OpenAI. "
"Type /help for help or send me any message to hear it echoed back."
)
AGENT_APP.conversation_update("membersAdded")(_welcome_and_help)
AGENT_APP.message("/help")(_welcome_and_help)
# Main message handler for the Echo Agent
@AGENT_APP.activity("message")
async def on_message(context: TurnContext, _):
user_message = context.activity.text
# Use our Azure OpenAI client to generate a more "intelligent" echo
# In a real agent, this would involve more complex prompt engineering
# or tool calls.
llm_response = await my_chat_client.complete(
messages=[
{"role": "system", "content": "You are a helpful assistant. Echo the user's message, but add a friendly remark."},
{"role": "user", "content": user_message}
]
)
await context.send_activity(f"You said: '{user_message}'\nAI says: {llm_response}")
# --- aiohttp server setup for local testing ---
# This part hosts the agent as an HTTP endpoint, typically for local testing
# or integration with a chatbot frontend.
def start_server(agent_application: AgentApplication):
"""
Starts an aiohttp web server to host the AgentApplication.
"""
async def entry_point(req: Request) -> Response:
agent: AgentApplication = req.app["agent_app"]
adapter: CloudAdapter = req.app["adapter"]
return await start_agent_process(req, agent, adapter)
APP = Application() # middlewares=[jwt_authorization_middleware] for auth
APP.router.add_post("/api/messages", entry_point)
APP.router.add_get("/api/messages", lambda _: Response(status=200)) # Health check
APP["agent_app"] = agent_application
APP["adapter"] = agent_application.adapter
try:
port = int(os.environ.get("PORT", 3978))
print(f"======== Running on http://localhost:{port} ========")
print("(Press CTRL+C to quit)")
run_app(APP, host="localhost", port=port)
except Exception as error:
print(f"Error starting server: {error}")
raise error
if __name__ == "__main__":
start_server(AGENT_APP)
To Run This Example:
1. Make sure your `.env` file is configured with Azure OpenAI details.
2. Run from your terminal: `python echo_agent.py`. The agent will start on `http://localhost:3978`.
3. To interact, you'll typically need a client. Microsoft provides the `teamsapptester` CLI tool for local interaction, or you can use `curl` for basic POST requests to `/api/messages`.

Using `teamsapptester` (Node.js required):

```bash
npm install -g @microsoft/teams-app-test-tool
teamsapptester
```

This will open a browser window connected to your agent.

Using `curl` (a basic POST; it won't show the AI response directly, since this is typically an asynchronous chat-bot flow):

```bash
curl -X POST http://localhost:3978/api/messages \
  -H "Content-Type: application/json" \
  -d '{
    "type": "message",
    "text": "Hello, Agent!",
    "from": {"id": "user1", "name": "User"},
    "conversation": {"id": "conv1"},
    "recipient": {"id": "agent1", "name": "EchoAgent"}
  }'
```

(Note: `curl` is for demonstrating that the endpoint is active. For a full conversational experience, use `teamsapptester` or a custom client.)
Exercises/Mini-Challenges:
- Modify the `on_message` handler to make the AI's "friendly remark" vary based on the length of the user's message.
- Add a new `/status` message handler that responds with a fixed message like "Agent is online and ready!".
2.2 Tools (Skills)
Detailed Explanation: Agents become powerful when they can interact with the outside world. Tools (often called Skills in Semantic Kernel) are functions or APIs that an agent can call to perform specific tasks. This could be searching the web, querying a database, sending an email, or performing a complex calculation. MAF heavily emphasizes “Typed Tool Calls” for safety and ease of use, leveraging OpenAPI for discoverability.
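To see what "typed" tool calling buys you before the full example, here is a small sketch that derives an OpenAI-style function schema from a plain Python function's type hints and docstring. The helper is hypothetical (MAF generates schemas like this for you); it ignores defaults and complex types for brevity:

```python
from typing import get_type_hints

# Hypothetical helper: turn a typed Python function into an OpenAI-style
# tool schema. Only simple scalar parameter types are handled here.
def schema_from_function(fn) -> dict:
    hints = get_type_hints(fn)
    hints.pop("return", None)
    type_map = {str: "string", int: "integer", float: "number", bool: "boolean"}
    properties = {name: {"type": type_map.get(tp, "string")} for name, tp in hints.items()}
    return {
        "type": "function",
        "function": {
            "name": fn.__name__,
            "description": (fn.__doc__ or "").strip(),
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": list(properties),  # simplification: all params required
            },
        },
    }

def web_search(query: str) -> list:
    """Performs a web search to find relevant information."""
    ...

print(schema_from_function(web_search))  # ready to pass as a tool definition
```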
Code Example: Adding a Web Search Tool
Let’s integrate a simple web search tool. We’ll simulate this with a dummy function for now, but show how it would integrate if it were a real web call.
First, let’s create a tools.py file:
# tools.py
import asyncio
from typing import Dict, Any, List
class SearchTool:
@staticmethod
async def web_search(query: str) -> List[str]:
"""
Performs a simulated web search and returns relevant snippets.
In a real application, this would integrate with a search API (e.g., Bing, Google).
"""
print(f"Executing web search for: {query}")
# Simulate an API call
await asyncio.sleep(1)
if "weather" in query.lower():
return ["Current weather in London: 15°C, cloudy. Expected rain tomorrow."]
elif "capital of france" in query.lower():
return ["The capital of France is Paris."]
elif "python agent framework" in query.lower():
return [
"Microsoft Agent Framework (MAF) is an open-source SDK for building AI agents.",
"LangChain and AutoGen are other popular Python agent frameworks.",
"MAF combines Semantic Kernel and AutoGen capabilities."
]
else:
return [f"Search results for '{query}': No specific results found, but here's some general info."]
@staticmethod
async def get_current_time() -> str:
"""
Retrieves the current date and time.
"""
import datetime
print("Executing get_current_time tool.")
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
Now, let’s update echo_agent.py to make use of this tool. The Python SDK uses a Planner concept to enable tool calling, similar to AutoGen’s function calling.
# echo_agent_with_tools.py (Modified from echo_agent.py)
import os
import asyncio
import json # Needed to safely parse tool-call arguments
from typing import Any, Dict, List # Used by the state class annotations
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI # Async client, since our handlers await chat completions
from azure.identity import DefaultAzureCredential
# Import our custom tools
from tools import SearchTool
# --- Configuration (same as before) ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
raise ValueError("Missing Azure OpenAI environment variables. Please check your .env file.")
try:
aoai_client = AsyncAzureOpenAI(
api_key=AZURE_OPENAI_API_KEY,
api_version="2024-02-15-preview", # Check latest API version
azure_endpoint=AZURE_OPENAI_ENDPOINT,
)
print("Successfully connected to Azure OpenAI.")
except Exception as e:
print(f"Error connecting to Azure OpenAI: {e}")
exit(1)
class AzureOpenAIChatClient:
def __init__(self, client, deployment_name):
self.client = client
self.deployment_name = deployment_name
async def complete(self, messages: list[dict], **kwargs) -> str:
try:
response = await self.client.chat.completions.create(
model=self.deployment_name,
messages=messages,
**kwargs
)
return response.choices[0].message.content
except Exception as e:
print(f"Error calling Azure OpenAI: {e}")
return "An error occurred while processing your request."
my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)
# --- Introduce Planner for Tool Calling ---
# In the MAF Python SDK, explicit tool registration and an LLM-driven planner
# are used to enable tool calling. This is where AutoGen's influence is clear.
# For simplicity, we'll create a basic planner that just decides if a search is needed.
# In a full MAF implementation, this would be handled more abstractly by the framework.
# We'll need a mechanism to describe our tools to the LLM so it knows when to use them.
# This often involves creating "function schemas" for the LLM.
def get_tool_schemas():
"""Returns a list of tool descriptions in OpenAI function call format."""
return [
{
"type": "function",
"function": {
"name": "web_search",
"description": "Performs a web search to find relevant information.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query."
}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "get_current_time",
"description": "Retrieves the current date and time.",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
}
}
]
# --- Define our Agent with Tool-Calling Capability ---
class ToolUsingAgentState(TurnState):
# We can store conversational history
history: List[Dict[str, Any]] = []
AGENT_APP = AgentApplication[ToolUsingAgentState](
storage=MemoryStorage(),
adapter=CloudAdapter()
)
async def _welcome_and_help(context: TurnContext, state: ToolUsingAgentState):
await context.send_activity(
"Hello! I am an Agent with web search capabilities. "
"Ask me questions like 'What is the weather in London?' or 'What is the current time?'"
)
state.history.append({"role": "assistant", "content": "Hello! I am an Agent with web search capabilities. Ask me questions like 'What is the weather in London?' or 'What is the current time?'"})
AGENT_APP.conversation_update("membersAdded")(_welcome_and_help)
AGENT_APP.message("/help")(_welcome_and_help)
@AGENT_APP.activity("message")
async def on_message_with_tools(context: TurnContext, state: ToolUsingAgentState):
user_message = context.activity.text
state.history.append({"role": "user", "content": user_message})
messages_for_llm = [
{"role": "system", "content": "You are a helpful assistant. Use tools if necessary to answer questions."},
*state.history
]
try:
response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=messages_for_llm,
tools=get_tool_schemas(),
tool_choice="auto" # Let the LLM decide if it wants to call a tool
)
assistant_message = response.choices[0].message
state.history.append(assistant_message)
if assistant_message.tool_calls:
for tool_call in assistant_message.tool_calls:
function_name = tool_call.function.name
arguments = tool_call.function.arguments
print(f"Agent wants to call tool: {function_name} with args: {arguments}")
# Execute the tool
tool_output = None
if function_name == "web_search":
query = json.loads(arguments).get("query") # Arguments arrive as a JSON string; never use eval here
tool_output = await SearchTool.web_search(query)
elif function_name == "get_current_time":
tool_output = await SearchTool.get_current_time()
else:
tool_output = f"Unknown tool: {function_name}"
state.history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": str(tool_output)
})
# Call LLM again with tool output
final_response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=state.history
)
final_assistant_message = final_response.choices[0].message.content
state.history.append({"role": "assistant", "content": final_assistant_message})
await context.send_activity(final_assistant_message)
else:
# If no tool call, just respond with the LLM's message
await context.send_activity(assistant_message.content)
except Exception as e:
print(f"Error in tool-using agent: {e}")
error_msg = "An error occurred while processing your request with tools."
await context.send_activity(error_msg)
state.history.append({"role": "assistant", "content": error_msg})
# --- aiohttp server setup (same as before) ---
def start_server(agent_application: AgentApplication):
async def entry_point(req: Request) -> Response:
agent: AgentApplication = req.app["agent_app"]
adapter: CloudAdapter = req.app["adapter"]
return await start_agent_process(req, agent, adapter)
APP = Application()
APP.router.add_post("/api/messages", entry_point)
APP.router.add_get("/api/messages", lambda _: Response(status=200))
APP["agent_app"] = agent_application
APP["adapter"] = agent_application.adapter
try:
port = int(os.environ.get("PORT", 3978))
print(f"======== Running on http://localhost:{port} ========")
print("(Press CTRL+C to quit)")
run_app(APP, host="localhost", port=port)
except Exception as error:
print(f"Error starting server: {error}")
raise error
if __name__ == "__main__":
start_server(AGENT_APP)
To Run This Example:
- Create `tools.py` as described.
- Save the agent code as `tool_agent.py`.
- Ensure your `.env` is set up for Azure OpenAI.
- Run: `python tool_agent.py`
- Use `teamsapptester` or a custom client. Try prompts like:
  - "What is the weather in London?"
  - "What is the current time?"
  - "Tell me about Python agent frameworks."
Exercises/Mini-Challenges:
- Add a new tool `MathTool` with methods for `add(a, b)` and `subtract(a, b)`. Update `get_tool_schemas` and `on_message_with_tools` to allow the agent to use these math functions.
- Modify the `web_search` tool to return a more diverse set of results based on a predefined list or more sophisticated keyword matching.
2.3 Memory and Context Management
Detailed Explanation:
Agents need memory to maintain conversational context, remember past interactions, and retrieve relevant information. MAF provides a pluggable memory system, allowing integration with various storage solutions (e.g., Redis, vector databases). TurnState is used to manage transient state during a “turn” of interaction, while MemoryStorage or other persistent stores handle long-term memory.
Code Example: Persistent Chat History with Redis
Let’s modify our agent to use Redis for persistent chat history.
First, ensure Redis is installed and running, and your .env has Redis connection details.
# redis_agent.py (Building on tool_agent.py)
import os
import asyncio
import json
from typing import Any, Dict, List # Used by the state class annotations
from dotenv import load_dotenv
import redis.asyncio as redis # Use async Redis client
load_dotenv()
from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI # Async client, since our handlers await chat completions
from azure.identity import DefaultAzureCredential
from tools import SearchTool # Our custom tools
# --- Configuration ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
raise ValueError("Missing Azure OpenAI environment variables. Please check your .env file.")
try:
aoai_client = AsyncAzureOpenAI(
api_key=AZURE_OPENAI_API_KEY,
api_version="2024-02-15-preview",
azure_endpoint=AZURE_OPENAI_ENDPOINT,
)
print("Successfully connected to Azure OpenAI.")
except Exception as e:
print(f"Error connecting to Azure OpenAI: {e}")
exit(1)
# Initialize Redis client
# Note: For production, use connection pooling and proper error handling.
try:
redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
decode_responses=True # Decode responses to string
)
# Test connection
async def _test_redis_connection():
await redis_client.ping()
print("Successfully connected to Redis.")
asyncio.run(_test_redis_connection())
except Exception as e:
print(f"Error connecting to Redis: {e}")
# Proceed without Redis or exit based on your application's tolerance
redis_client = None
class AzureOpenAIChatClient:
def __init__(self, client, deployment_name):
self.client = client
self.deployment_name = deployment_name
async def complete(self, messages: list[dict], **kwargs) -> str:
try:
response = await self.client.chat.completions.create(
model=self.deployment_name,
messages=messages,
**kwargs
)
return response.choices[0].message.content
except Exception as e:
print(f"Error calling Azure OpenAI: {e}")
return "An error occurred while processing your request."
my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)
def get_tool_schemas():
"""Returns a list of tool descriptions in OpenAI function call format."""
return [
{
"type": "function",
"function": {
"name": "web_search",
"description": "Performs a web search to find relevant information.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query."
}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "get_current_time",
"description": "Retrieves the current date and time.",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
}
}
]
# --- Custom Redis-backed Memory Storage (Conceptual, as MAF might have its own) ---
# For demonstration, we'll implement a simple Redis-backed storage for chat history.
# MAF's official Redis memory module would abstract this further.
class RedisChatHistoryStorage(MemoryStorage):
def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
self.redis_client = redis_client
self.prefix = prefix
async def _get_key(self, conversation_id: str) -> str:
return f"{self.prefix}{conversation_id}"
async def read(self, conversation_id: str) -> dict:
key = await self._get_key(conversation_id)
raw_history = await self.redis_client.get(key)
if raw_history:
return json.loads(raw_history)
return {"history": []} # Default if no history
async def write(self, conversation_id: str, data: dict):
key = await self._get_key(conversation_id)
await self.redis_client.set(key, json.dumps(data))
# Optional: Set an expiry for the history
# await self.redis_client.expire(key, 3600) # 1 hour
# Our agent state now includes a way to load/save history
class PersistentToolUsingAgentState(TurnState):
history: List[Dict[str, Any]] = []
conversation_id: str = "default_conversation" # Placeholder, actual ID comes from TurnContext
# Use our Redis storage if available, otherwise fallback to in-memory
storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()
AGENT_APP = AgentApplication[PersistentToolUsingAgentState](
storage=storage_instance,
adapter=CloudAdapter()
)
async def _load_history(context: TurnContext, state: PersistentToolUsingAgentState):
if context.activity and context.activity.conversation and context.activity.conversation.id:
state.conversation_id = context.activity.conversation.id
if redis_client:
stored_data = await storage_instance.read(state.conversation_id)
state.history = stored_data.get("history", [])
print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")
async def _save_history(context: TurnContext, state: PersistentToolUsingAgentState):
if redis_client and state.conversation_id:
await storage_instance.write(state.conversation_id, {"history": state.history})
print(f"Saved history for {state.conversation_id}: {len(state.history)} items")
# Register these as pre/post handlers for activities
AGENT_APP.on_before_activity(_load_history)
AGENT_APP.on_after_activity(_save_history)
async def _welcome_and_help(context: TurnContext, state: PersistentToolUsingAgentState):
await context.send_activity(
"Hello! I am a persistent Agent with web search capabilities. "
"Ask me questions like 'What is the weather in London?' or 'What is the current time?'"
)
state.history.append({"role": "assistant", "content": "Hello! I am a persistent Agent with web search capabilities. Ask me questions like 'What is the weather in London?' or 'What is the current time?'"})
AGENT_APP.conversation_update("membersAdded")(_welcome_and_help)
AGENT_APP.message("/help")(_welcome_and_help)
@AGENT_APP.activity("message")
async def on_message_with_tools_persistent(context: TurnContext, state: PersistentToolUsingAgentState):
user_message = context.activity.text
state.history.append({"role": "user", "content": user_message})
messages_for_llm = [
{"role": "system", "content": "You are a helpful assistant. Use tools if necessary to answer questions."},
*state.history
]
try:
response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=messages_for_llm,
tools=get_tool_schemas(),
tool_choice="auto"
)
assistant_message = response.choices[0].message
# Append the assistant message (including any tool calls) as a plain dict,
# so it can be JSON-serialized to Redis and replayed to the API later.
state.history.append(assistant_message.model_dump(exclude_none=True))
if assistant_message.tool_calls:
for tool_call in assistant_message.tool_calls:
function_name = tool_call.function.name
arguments = tool_call.function.arguments
print(f"Agent wants to call tool: {function_name} with args: {arguments}")
tool_output = None
if function_name == "web_search":
query = json.loads(arguments).get("query")
tool_output = await SearchTool.web_search(query)
elif function_name == "get_current_time":
tool_output = await SearchTool.get_current_time()
else:
tool_output = f"Unknown tool: {function_name}"
state.history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": str(tool_output)
})
final_response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=state.history
)
final_assistant_message_content = final_response.choices[0].message.content
state.history.append({"role": "assistant", "content": final_assistant_message_content})
await context.send_activity(final_assistant_message_content)
else:
await context.send_activity(assistant_message.content)
except Exception as e:
print(f"Error in persistent tool-using agent: {e}")
error_msg = "An error occurred while processing your request with persistent memory."
await context.send_activity(error_msg)
state.history.append({"role": "assistant", "content": error_msg})
# --- Server Setup (same as before) ---
def start_server(agent_application: AgentApplication):
async def entry_point(req: Request) -> Response:
agent: AgentApplication = req.app["agent_app"]
adapter: CloudAdapter = req.app["adapter"]
return await start_agent_process(req, agent, adapter)
APP = Application()
APP.router.add_post("/api/messages", entry_point)
APP.router.add_get("/api/messages", lambda _: Response(status=200))
APP["agent_app"] = agent_application
APP["adapter"] = agent_application.adapter
try:
port = int(os.environ.get("PORT", 3978))
print(f"======== Running on http://localhost:{port} ========")
print("(Press CTRL+C to quit)")
run_app(APP, host="localhost", port=port)
except Exception as error:
print(f"Error starting server: {error}")
raise error
if __name__ == "__main__":
start_server(AGENT_APP)
To Run This Example:
- Ensure Redis is running (e.g., via Docker: `docker run -p 6379:6379 --name my-redis -d redis`).
- Configure `REDIS_HOST`, `REDIS_PORT`, and `REDIS_PASSWORD` in your `.env`.
- Save the agent code as `redis_agent.py`.
- Run: `python redis_agent.py`
- Use `teamsapptester`. Send messages, then restart the `redis_agent.py` script and send new messages in the same conversation. You should see that the history is loaded.
Exercises/Mini-Challenges:
- Implement a command `/clear_history` that clears the Redis history for the current conversation.
- Modify the `RedisChatHistoryStorage` to include a timestamp with each history entry and display it when loading.
3. Intermediate Topics
3.1 Workflows and Orchestration
Detailed Explanation: While a single agent can be powerful, real-world problems often require multiple agents collaborating. MAF excels at orchestration, allowing you to define how agents interact. This can be sequential (Agent A hands off to Agent B), concurrent (multiple agents work in parallel), group chat (agents debate), or more complex patterns. The framework simplifies the routing and state management between these agents.
MAF uses AgentWorkflowBuilder (in .NET) or similar concepts in Python to define these flows. The key idea is that a workflow itself can be treated as an agent.
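As a rough sketch of that idea (a hypothetical class, loosely patterned on the .NET AgentWorkflowBuilder; the Python surface may differ), a sequential workflow simply folds each agent's output into the next agent's input, and the composed object exposes the same run interface as a single agent, so workflows can nest:

```python
# Hypothetical sketch: sequential orchestration as a fold over agents.
class SequentialWorkflow:
    def __init__(self, *agents):
        self.agents = agents

    async def run(self, task: str) -> str:
        result = task
        for agent in self.agents:
            # Each agent receives the previous agent's output as its input.
            result = await agent.run(result)
        return result

# Usage sketch (writer_agent and editor_agent assumed to expose async run()):
# pipeline = SequentialWorkflow(writer_agent, editor_agent)
# final_text = await pipeline.run("Write an article about AI agent frameworks")
```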
Code Example: Sequential Multi-Agent Workflow
Let’s create a “Content Creation” workflow with a “Writer Agent” and an “Editor Agent.”
First, let’s refine our tools.py slightly for our content agents:
# tools.py (Updated with content-specific tools)
import asyncio
from typing import Dict, Any, List
class SearchTool:
@staticmethod
async def web_search(query: str) -> List[str]:
"""
Performs a simulated web search and returns relevant snippets.
Used by the writer for research.
"""
print(f"Executing web search for: {query}")
await asyncio.sleep(0.5)
if "AI agent framework" in query.lower():
return ["Microsoft Agent Framework unifies Semantic Kernel and AutoGen.", "LangChain is popular for LLM orchestration."]
return [f"Search results for '{query}': Example content related to the query."]
class ContentReviewTool:
@staticmethod
async def check_grammar(text: str) -> str:
"""
Checks the grammar of the provided text and suggests corrections.
Used by the editor.
"""
print(f"Executing grammar check for text: {text[:50]}...")
await asyncio.sleep(0.3)
if " an example" in text:
return text.replace(" an example", " a clear example") + "\n(Suggested: 'a clear example' instead of 'an example')"
return text + "\n(Grammar check: No significant issues found.)"
@staticmethod
async def analyze_sentiment(text: str) -> str:
"""
Analyzes the sentiment of the provided text.
Used by the editor.
"""
print(f"Executing sentiment analysis for text: {text[:50]}...")
await asyncio.sleep(0.3)
if "negative" in text.lower() or "bad" in text.lower():
return "Sentiment: Negative"
return "Sentiment: Neutral/Positive"
Now, the multi-agent workflow:
# multi_agent_workflow.py
import os
import asyncio
import json
from typing import Any, Dict, List # Used by agent and state type hints
from dotenv import load_dotenv
load_dotenv()
from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI # Async client, since our agents await chat completions
from tools import SearchTool, ContentReviewTool # Our updated tools
# --- Configuration (same as before) ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
raise ValueError("Missing Azure OpenAI environment variables. Please check your .env file.")
try:
aoai_client = AsyncAzureOpenAI(
api_key=AZURE_OPENAI_API_KEY,
api_version="2024-02-15-preview",
azure_endpoint=AZURE_OPENAI_ENDPOINT,
)
print("Successfully connected to Azure OpenAI.")
except Exception as e:
print(f"Error connecting to Azure OpenAI: {e}")
exit(1)
class AzureOpenAIChatClient:
def __init__(self, client, deployment_name):
self.client = client
self.deployment_name = deployment_name
async def complete(self, messages: list[dict], **kwargs) -> str:
try:
response = await self.client.chat.completions.create(
model=self.deployment_name,
messages=messages,
**kwargs
)
return response.choices[0].message.content
except Exception as e:
print(f"Error calling Azure OpenAI: {e}")
return "An error occurred while processing your request."
my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)
# --- Tool Definitions for LLM ---
def get_writer_tool_schemas():
return [
{
"type": "function",
"function": {
"name": "web_search",
"description": "Performs a web search to gather information for writing.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query."
}
},
"required": ["query"]
}
}
}
]
def get_editor_tool_schemas():
return [
{
"type": "function",
"function": {
"name": "check_grammar",
"description": "Checks the grammar of the provided text and suggests corrections.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "The text to check."
}
},
"required": ["text"]
}
}
},
{
"type": "function",
"function": {
"name": "analyze_sentiment",
"description": "Analyzes the sentiment of the provided text.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "The text to analyze."
}
},
"required": ["text"]
}
}
}
]
# --- Base Agent Class for MAF-like structure ---
class BaseMAFAgent:
def __init__(self, name: str, instructions: str, tool_schemas: List[Dict[str, Any]] = None):
self.name = name
self.instructions = instructions
self.tool_schemas = tool_schemas if tool_schemas is not None else []
self.aoai_client = aoai_client # Using global client for simplicity
async def run_step(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Runs a single step of the agent, potentially calling tools."""
current_messages = [{"role": "system", "content": self.instructions}] + messages
try:
response = await self.aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=current_messages,
tools=self.tool_schemas,
tool_choice="auto"
)
assistant_message = response.choices[0].message
output = {"content": assistant_message.content, "tool_calls": []}
if assistant_message.tool_calls:
output["tool_calls"] = assistant_message.tool_calls
return output
except Exception as e:
print(f"Error in {self.name} agent: {e}")
return {"content": f"An error occurred in {self.name}.", "tool_calls": []}
async def execute_tool_call(self, tool_call) -> str:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
print(f" {self.name} executing tool: {function_name} with args: {arguments}")
tool_output = f"Unknown tool: {function_name}"
if function_name == "web_search":
query = arguments.get("query")
tool_output = await SearchTool.web_search(query)
elif function_name == "check_grammar":
text = arguments.get("text")
tool_output = await ContentReviewTool.check_grammar(text)
elif function_name == "analyze_sentiment":
text = arguments.get("text")
tool_output = await ContentReviewTool.analyze_sentiment(text)
return str(tool_output)
# --- Define specific Agents ---
writer_agent = BaseMAFAgent(
name="Writer",
instructions="You are a creative writer. Your goal is to write engaging content based on user requests. Use the web_search tool for research.",
tool_schemas=get_writer_tool_schemas()
)
editor_agent = BaseMAFAgent(
name="Editor",
instructions="You are an expert editor. Your goal is to review and improve written content, focusing on grammar, engagement, and tone. Use check_grammar and analyze_sentiment tools.",
tool_schemas=get_editor_tool_schemas()
)
# --- Workflow Logic (Simulated by manual chaining for Python MAF SDK) ---
# In MAF Python SDK, this would typically involve an AgentWorkflowBuilder or similar
# for declarative workflow definition. For now, we simulate the sequential flow.
class ContentWorkflowState(TurnState):
history: List[Dict[str, Any]] = []
current_content: str = ""
user_request: str = ""
# A single AgentApplication to manage the workflow
WORKFLOW_AGENT_APP = AgentApplication[ContentWorkflowState](
storage=MemoryStorage(), # Using in-memory for this example
adapter=CloudAdapter()
)
async def _workflow_welcome(context: TurnContext, state: ContentWorkflowState):
await context.send_activity(
"Welcome to the Content Creation Workflow! I have a Writer and an Editor agent. "
"Tell me what content you want to create (e.g., 'Write an article about AI agent frameworks')."
)
state.history.append({"role": "assistant", "content": "Welcome to the Content Creation Workflow!"})
WORKFLOW_AGENT_APP.conversation_update("membersAdded")(_workflow_welcome)
WORKFLOW_AGENT_APP.message("/help")(_workflow_welcome)
@WORKFLOW_AGENT_APP.activity("message")
async def content_creation_workflow_handler(context: TurnContext, state: ContentWorkflowState):
user_message = context.activity.text
state.user_request = user_message
state.history.append({"role": "user", "content": user_message})
await context.send_activity(f"Starting content creation for: '{user_message}'...")
# --- Step 1: Writer Agent writes the content ---
await context.send_activity("Writer Agent is working...")
writer_messages = [{"role": "user", "content": f"Write content based on the request: {user_message}"}]
writer_output = await writer_agent.run_step(writer_messages)
content_draft = writer_output["content"]
# Handle writer's tool calls
if writer_output["tool_calls"]:
# The assistant message that requested the tool calls must precede the tool
# results in the message list, or the chat API will reject the follow-up call.
writer_messages.append({
"role": "assistant",
"content": writer_output["content"] or "",
"tool_calls": [tc.model_dump() for tc in writer_output["tool_calls"]]
})
for tool_call in writer_output["tool_calls"]:
tool_result = await writer_agent.execute_tool_call(tool_call)
# Send tool result back to writer for continued reasoning
writer_messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": tool_result})
re_writer_output = await writer_agent.run_step(writer_messages)
content_draft = re_writer_output["content"] # Use the refined content
await context.send_activity(f"Writer Draft:\n```\n{content_draft}\n```")
state.current_content = content_draft
state.history.append({"role": "assistant", "content": f"Writer produced draft: {content_draft}"})
# --- Step 2: Editor Agent reviews and refines ---
await context.send_activity("Editor Agent is reviewing...")
editor_messages = [{"role": "user", "content": f"Review and improve this content: {state.current_content}"}]
editor_output = await editor_agent.run_step(editor_messages)
final_content = editor_output["content"]
# Handle editor's tool calls
if editor_output["tool_calls"]:
# As with the writer, replay the assistant tool-call message before the results.
editor_messages.append({
"role": "assistant",
"content": editor_output["content"] or "",
"tool_calls": [tc.model_dump() for tc in editor_output["tool_calls"]]
})
for tool_call in editor_output["tool_calls"]:
tool_result = await editor_agent.execute_tool_call(tool_call)
editor_messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": tool_result})
re_editor_output = await editor_agent.run_step(editor_messages)
final_content = re_editor_output["content"] # Use the refined content
await context.send_activity(f"Editor Final Version:\n```\n{final_content}\n```")
state.history.append({"role": "assistant", "content": f"Editor finalized content: {final_content}"})
await context.send_activity("Content creation workflow completed!")
# --- Server Setup (same as before) ---
def start_server(agent_application: AgentApplication):
async def entry_point(req: Request) -> Response:
agent: AgentApplication = req.app["agent_app"]
adapter: CloudAdapter = req.app["adapter"]
return await start_agent_process(req, agent, adapter)
APP = Application()
APP.router.add_post("/api/messages", entry_point)
APP.router.add_get("/api/messages", lambda _: Response(status=200))
APP["agent_app"] = agent_application
APP["adapter"] = agent_application.adapter
try:
port = int(os.environ.get("PORT", 3978))
print(f"======== Running on http://localhost:{port} ========")
print("(Press CTRL+C to quit)")
run_app(APP, host="localhost", port=port)
except Exception as error:
print(f"Error starting server: {error}")
raise error
if __name__ == "__main__":
start_server(WORKFLOW_AGENT_APP)
To Run This Example:
- Update `tools.py` with the `ContentReviewTool`.
- Save the workflow code as `multi_agent_workflow.py`.
- Ensure your `.env` is set up for Azure OpenAI.
- Run: `python multi_agent_workflow.py`
- Use `teamsapptester`. Try prompts like:
  - "Write an article about AI agent frameworks."
  - "Generate a short story about a detective in space."
Exercises/Mini-Challenges:
- Introduce a "Fact-Checker Agent" that uses `SearchTool.web_search` to verify facts in the content produced by the Writer before the Editor reviews it.
- Change the workflow from sequential to a "group chat" where all agents (Writer, Editor, Fact-Checker) collaborate to refine content. (This requires more advanced orchestration patterns usually built into MAF's workflow capabilities or AutoGen's `GroupChatManager` if integrated.)
3.2 Authentication and Security (Microsoft Entra ID)
Detailed Explanation: For enterprise applications, robust authentication and authorization are non-negotiable. MAF integrates with Microsoft Entra ID (formerly Azure Active Directory) to assign unique identities to agents, enabling secure access to resources and services. This helps in managing “agent sprawl” and ensuring compliance. JWT (JSON Web Token) authorization middleware is a common mechanism.
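On the client side, acquiring a token for such a protected endpoint typically goes through the `azure-identity` library. A minimal sketch (the tenant/client values and the `api://.../.default` scope are placeholders for your own app registration):

```python
import requests
from azure.identity import ClientSecretCredential

# Placeholders: substitute your own app registration's values.
credential = ClientSecretCredential(
    tenant_id="your-azure-ad-tenant-id",
    client_id="your-client-app-id",
    client_secret="your-client-secret",
)
# Request a token whose audience matches the agent's API (api://<agent-client-id>).
token = credential.get_token("api://your-agent-client-id/.default")

resp = requests.post(
    "http://localhost:3978/api/messages",
    headers={"Authorization": f"Bearer {token.token}"},
    json={"type": "message", "text": "Hello, secured agent!"},
)
print(resp.status_code)  # expect success with a valid token, 401 without
```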
Conceptual Code Example: Protecting an Agent Endpoint with JWT (Entra ID)
While full Entra ID integration involves setting up app registrations in Azure and handling token acquisition, we can demonstrate how jwt_authorization_middleware would be used on the server side to protect our agent.
# secured_agent.py (Illustrates JWT Middleware usage, building on redis_agent.py)
import os
import asyncio
import json
from typing import Any, Dict, List # Used by the state class annotations
from dotenv import load_dotenv
import redis.asyncio as redis
load_dotenv()
from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process, jwt_authorization_middleware
from aiohttp.web import Request, Response, Application, run_app, HTTPUnauthorized, HTTPException
from openai import AsyncAzureOpenAI # Async client, since our handlers await chat completions
from azure.identity import DefaultAzureCredential, ClientSecretCredential
from tools import SearchTool
# --- Configuration (same as before) ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
# --- Microsoft Entra ID Configuration (for validation) ---
# For actual JWT validation, you'd configure the middleware with issuer, audience, etc.
# This is typically done via environment variables or a configuration object.
# These placeholders are for illustrating the *presence* of auth.
# REAL-WORLD: You would register an application in Azure Portal (App Registrations)
# and get these values.
# AZURE_TENANT_ID = os.getenv("AZURE_TENANT_ID")
# AZURE_CLIENT_ID = os.getenv("AZURE_CLIENT_ID")
# AZURE_CLIENT_SECRET = os.getenv("AZURE_CLIENT_SECRET") # If using client secret flow
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
raise ValueError("Missing Azure OpenAI environment variables. Please check your .env file.")
# AOAI Client and Redis Client initialization (same as redis_agent.py)
try:
aoai_client = AsyncAzureOpenAI(
api_key=AZURE_OPENAI_API_KEY,
api_version="2024-02-15-preview",
azure_endpoint=AZURE_OPENAI_ENDPOINT,
)
print("Successfully connected to Azure OpenAI.")
except Exception as e:
print(f"Error connecting to Azure OpenAI: {e}")
exit(1)
try:
redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
decode_responses=True
)
async def _test_redis_connection():
await redis_client.ping()
print("Successfully connected to Redis.")
asyncio.run(_test_redis_connection())
except Exception as e:
print(f"Error connecting to Redis: {e}")
redis_client = None
class AzureOpenAIChatClient:
def __init__(self, client, deployment_name):
self.client = client
self.deployment_name = deployment_name
async def complete(self, messages: list[dict], **kwargs) -> str:
try:
response = await self.client.chat.completions.create(
model=self.deployment_name,
messages=messages,
**kwargs
)
return response.choices[0].message.content
except Exception as e:
print(f"Error calling Azure OpenAI: {e}")
return "An error occurred while processing your request."
my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)
def get_tool_schemas():
return [
{
"type": "function",
"function": {
"name": "web_search",
"description": "Performs a web search to find relevant information.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query."
}
},
"required": ["query"]
}
}
}
]
class RedisChatHistoryStorage(MemoryStorage):
def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
self.redis_client = redis_client
self.prefix = prefix
async def _get_key(self, conversation_id: str) -> str:
return f"{self.prefix}{conversation_id}"
async def read(self, conversation_id: str) -> dict:
key = await self._get_key(conversation_id)
raw_history = await self.redis_client.get(key)
if raw_history:
return json.loads(raw_history)
return {"history": []}
async def write(self, conversation_id: str, data: dict):
key = await self._get_key(conversation_id)
await self.redis_client.set(key, json.dumps(data))
class PersistentToolUsingAgentState(TurnState):
history: List[Dict[str, Any]] = []
conversation_id: str = "default_conversation"
storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()
# --- Agent Application (same as redis_agent.py) ---
SECURE_AGENT_APP = AgentApplication[PersistentToolUsingAgentState](
storage=storage_instance,
adapter=CloudAdapter()
)
async def _load_history(context: TurnContext, state: PersistentToolUsingAgentState):
if context.activity and context.activity.conversation and context.activity.conversation.id:
state.conversation_id = context.activity.conversation.id
if redis_client:
stored_data = await storage_instance.read(state.conversation_id)
state.history = stored_data.get("history", [])
print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")
async def _save_history(context: TurnContext, state: PersistentToolUsingAgentState):
if redis_client and state.conversation_id:
await storage_instance.write(state.conversation_id, {"history": state.history})
print(f"Saved history for {state.conversation_id}: {len(state.history)} items")
SECURE_AGENT_APP.on_before_activity(_load_history)
SECURE_AGENT_APP.on_after_activity(_save_history)
async def _secure_welcome_and_help(context: TurnContext, state: PersistentToolUsingAgentState):
await context.send_activity(
"Hello! I am a **secured** persistent Agent with web search capabilities. "
"Your access is authenticated via JWT."
)
state.history.append({"role": "assistant", "content": "Hello! I am a secured persistent Agent."})
SECURE_AGENT_APP.conversation_update("membersAdded")(_secure_welcome_and_help)
SECURE_AGENT_APP.message("/help")(_secure_welcome_and_help)
@SECURE_AGENT_APP.activity("message")
async def on_secure_message(context: TurnContext, state: PersistentToolUsingAgentState):
# This handler is the same as in redis_agent.py, but now it's protected
user_message = context.activity.text
state.history.append({"role": "user", "content": user_message})
messages_for_llm = [
{"role": "system", "content": "You are a helpful assistant. Use tools if necessary to answer questions."},
*state.history
]
try:
response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=messages_for_llm,
tools=get_tool_schemas(),
tool_choice="auto"
)
assistant_message = response.choices[0].message
# Append the assistant message (including any tool calls) as a plain dict,
# so it can be JSON-serialized to Redis and replayed to the API later.
state.history.append(assistant_message.model_dump(exclude_none=True))
if assistant_message.tool_calls:
for tool_call in assistant_message.tool_calls:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
print(f"Agent wants to call tool: {function_name} with args: {arguments}")
tool_output = None
if function_name == "web_search":
query = arguments.get("query")
tool_output = await SearchTool.web_search(query)
else:
tool_output = f"Unknown tool: {function_name}"
state.history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": str(tool_output)
})
final_response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=state.history
)
final_assistant_message_content = final_response.choices[0].message.content
state.history.append({"role": "assistant", "content": final_assistant_message_content})
await context.send_activity(final_assistant_message_content)
else:
await context.send_activity(assistant_message.content)
except Exception as e:
print(f"Error in secured agent: {e}")
error_msg = "An error occurred in the secured agent."
await context.send_activity(error_msg)
state.history.append({"role": "assistant", "content": error_msg})
# --- Server Setup with JWT Middleware ---
# The jwt_authorization_middleware needs configuration.
# For simplicity, we'll use a dummy config. In a real scenario, this would involve
# proper Azure AD app registration details for token validation.
from microsoft.agents.hosting.core import AgentAuthConfiguration
class DummyAgentAuthConfiguration(AgentAuthConfiguration):
def __init__(self):
# In a real app, these would come from env vars or app settings
self.tenant_id = "your-azure-ad-tenant-id"
self.client_id = "your-azure-ad-client-id"
self.authority = f"https://login.microsoftonline.com/{self.tenant_id}"
self.audience = [f"api://{self.client_id}"] # The audience of your agent's API
self.is_anonymous_auth_enabled = False # Require authentication
DUMMY_AUTH_CONFIG = DummyAgentAuthConfiguration()
def start_server_with_auth(agent_application: AgentApplication, auth_config: AgentAuthConfiguration):
async def entry_point(req: Request) -> Response:
# After middleware, req['user_id'] or similar would be populated
# This is where you might add custom authorization checks if needed
agent: AgentApplication = req.app["agent_app"]
adapter: CloudAdapter = req.app["adapter"]
return await start_agent_process(req, agent, adapter)
# Apply the JWT middleware
# In a real app, this middleware would check for a valid JWT token in the Authorization header.
APP = Application(middlewares=[jwt_authorization_middleware(auth_config=auth_config)])
APP.router.add_post("/api/messages", entry_point)
APP.router.add_get("/api/messages", lambda _: Response(status=200)) # Health check
APP["agent_app"] = agent_application
APP["adapter"] = agent_application.adapter
APP["agent_configuration"] = auth_config # Store auth config for middleware
try:
port = int(os.environ.get("PORT", 3978))
print(f"======== Running on http://localhost:{port} (Secured with JWT) ========")
print("(Press CTRL+C to quit)")
run_app(APP, host="localhost", port=port)
except Exception as error:
print(f"Error starting secured server: {error}")
raise error
if __name__ == "__main__":
# Note: To test this locally with actual JWT, you'd need to acquire a token
# (e.g., using MSAL Python library or Azure CLI) and include it in your client's
# Authorization header. teamsapptester usually handles anonymous or specific auth.
    # For a quick test of the auth barrier, keep is_anonymous_auth_enabled = False in
    # DummyAgentAuthConfiguration (as shown) and send a request without a token - you should get a 401.
start_server_with_auth(SECURE_AGENT_APP, DUMMY_AUTH_CONFIG)
To Run This Example (and observe auth):
- Save the agent code as `secured_agent.py`.
- Configure your `.env` for OpenAI and Redis.
- For a quick test, run `python secured_agent.py`.
  - If `is_anonymous_auth_enabled` is `True` (or if you set `middlewares=[]` for `APP`), it will run unauthenticated.
  - If `is_anonymous_auth_enabled` is `False` in `DUMMY_AUTH_CONFIG` (as shown), then:
    - Try `curl -X POST http://localhost:3978/api/messages -H "Content-Type: application/json" -d '{"type": "message", "text": "test"}'`. You should get an `HTTPUnauthorized` (401) error.
    - To truly test with a valid JWT, you'd need to obtain an access token from Azure AD for your configured `client_id` and `tenant_id`, and send it in the `Authorization: Bearer <YOUR_JWT>` header. This goes beyond a simple `curl` example and is typically done through a proper client SDK or CLI.
Exercises/Mini-Challenges:
- Research how to obtain an Azure AD access token for a service principal using the `azure-identity` library in Python (a starter sketch follows below).
- Modify a client (e.g., a simple Python script or a Postman request) to include the obtained JWT and successfully send a message to `secured_agent.py`.
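As a starting point for the first exercise, here is a minimal sketch of acquiring a token for a service principal with the azure-identity library and calling the secured endpoint. The tenant ID, client ID, client secret, and scope below are placeholders for your own app registration values; the script is illustrative and is not part of MAF itself.
# get_token_and_call_agent.py (hypothetical client for the exercise)
import requests
from azure.identity import ClientSecretCredential

TENANT_ID = "your-azure-ad-tenant-id"   # placeholder
CLIENT_ID = "your-azure-ad-client-id"   # placeholder
CLIENT_SECRET = "your-client-secret"    # placeholder; load from env vars in practice

# Acquire a token whose audience matches the agent's API (api://<client_id>)
credential = ClientSecretCredential(TENANT_ID, CLIENT_ID, CLIENT_SECRET)
access_token = credential.get_token(f"api://{CLIENT_ID}/.default").token

# Send an authenticated message to the secured agent
resp = requests.post(
    "http://localhost:3978/api/messages",
    headers={"Authorization": f"Bearer {access_token}"},
    json={"type": "message", "text": "test with JWT"},
)
print(resp.status_code, resp.text)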
4. Advanced Topics and Best Practices
4.1 Retrieval Augmented Generation (RAG) with Azure Cognitive Search
Detailed Explanation: To keep LLMs current and grounded in specific, private data, Retrieval Augmented Generation (RAG) is essential. This involves retrieving relevant information from a knowledge base (like documents, databases, or specialized indexes) and injecting it into the LLM’s prompt. MAF fully supports RAG patterns. Azure Cognitive Search is Microsoft’s powerful search-as-a-service solution, perfect for building enterprise-grade RAG.
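Conceptually, the pattern is just "retrieve, then ground the prompt". Before wiring it into tool calling, here is a minimal, framework-agnostic sketch of that core loop; the `retrieve` callable and `chat_client` are stand-ins for whatever lookup and LLM client you use (they are not MAF APIs):
# Minimal RAG pattern: retrieve snippets, inject them as context, then generate.
async def answer_with_rag(question: str, retrieve, chat_client) -> str:
    snippets = await retrieve(question)  # e.g., top-3 document chunks from a search index
    context = "\n\n".join(snippets)
    messages = [
        {"role": "system", "content": (
            "Answer using ONLY the context below. "
            "If the context is insufficient, say so.\n\n" + context
        )},
        {"role": "user", "content": question},
    ]
    return await chat_client.complete(messages)  # e.g., the AzureOpenAIChatClient used in this guide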
Code Example: Integrating Azure Cognitive Search for RAG
Let’s enhance our agent with a RAG tool that queries Azure Cognitive Search.
First, you’ll need an Azure Cognitive Search instance and an index populated with some data. For this example, we’ll assume an index named product-docs with documents containing title and content fields.
# rag_tools.py
import os
import asyncio
from typing import List, Dict, Any
from azure.core.credentials import AzureKeyCredential
from azure.search.documents.aio import SearchClient # Use async client
class AzureSearchRAGTool:
def __init__(self, endpoint: str, api_key: str, index_name: str):
self.search_client = SearchClient(
endpoint=endpoint,
index_name=index_name,
credential=AzureKeyCredential(api_key)
)
print(f"Initialized Azure Cognitive Search client for index: {index_name}")
async def search_documents(self, query: str, top_n: int = 3) -> List[Dict[str, Any]]:
"""
Searches Azure Cognitive Search for relevant documents and returns snippets.
"""
print(f"Performing Azure Cognitive Search for query: '{query}'")
results = []
try:
async with self.search_client:
search_results = await self.search_client.search(
search_text=query,
select=["title", "content"], # Fields to retrieve
top=top_n
)
async for result in search_results:
results.append({
"title": result.get("title", "No Title"),
"content": result.get("content", "No Content")
})
except Exception as e:
print(f"Error during Azure Cognitive Search: {e}")
results.append({"error": f"Failed to perform search: {e}"})
return results
Now, integrate this into our agent.
# rag_agent.py (Building on redis_agent.py and using rag_tools.py)
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv
import redis.asyncio as redis
load_dotenv()
from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI  # async client, so chat completions can be awaited below
from rag_tools import AzureSearchRAGTool # Our new RAG tool
# --- Configuration ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
AZURE_SEARCH_ENDPOINT = os.getenv("AZURE_SEARCH_ENDPOINT")
AZURE_SEARCH_API_KEY = os.getenv("AZURE_SEARCH_API_KEY")
AZURE_SEARCH_INDEX_NAME = os.getenv("AZURE_SEARCH_INDEX_NAME")
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME,
AZURE_SEARCH_ENDPOINT, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME]):
raise ValueError("Missing required environment variables. Please check your .env file for OpenAI, Redis, and Azure Search.")
try:
    aoai_client = AsyncAzureOpenAI(
api_key=AZURE_OPENAI_API_KEY,
api_version="2024-02-15-preview",
azure_endpoint=AZURE_OPENAI_ENDPOINT,
)
print("Successfully connected to Azure OpenAI.")
except Exception as e:
print(f"Error connecting to Azure OpenAI: {e}")
exit(1)
try:
redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
decode_responses=True
)
async def _test_redis_connection():
await redis_client.ping()
print("Successfully connected to Redis.")
asyncio.run(_test_redis_connection())
except Exception as e:
print(f"Error connecting to Redis: {e}")
redis_client = None
# Initialize Azure Cognitive Search RAG tool
try:
azure_search_rag_tool = AzureSearchRAGTool(
endpoint=AZURE_SEARCH_ENDPOINT,
api_key=AZURE_SEARCH_API_KEY,
index_name=AZURE_SEARCH_INDEX_NAME
)
except Exception as e:
print(f"Error initializing AzureSearchRAGTool: {e}")
azure_search_rag_tool = None # Handle gracefully if search isn't critical
# --- ChatClient (same as in earlier examples) ---
class AzureOpenAIChatClient:
def __init__(self, client, deployment_name):
self.client = client
self.deployment_name = deployment_name
async def complete(self, messages: list[dict], **kwargs) -> str:
try:
response = await self.client.chat.completions.create(
model=self.deployment_name,
messages=messages,
**kwargs
)
return response.choices[0].message.content
except Exception as e:
print(f"Error calling Azure OpenAI: {e}")
return "An error occurred while processing your request."
my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)
# --- Tool Definitions with RAG ---
def get_tool_schemas_with_rag():
return [
{
"type": "function",
"function": {
"name": "search_documents",
"description": "Searches internal knowledge base (Azure Cognitive Search) for relevant documents to answer questions.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query for the knowledge base."
}
},
"required": ["query"]
}
}
}
]
# --- Agent State & Storage (same as redis_agent.py) ---
class RedisChatHistoryStorage(MemoryStorage):
def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
self.redis_client = redis_client
self.prefix = prefix
async def _get_key(self, conversation_id: str) -> str:
return f"{self.prefix}{conversation_id}"
async def read(self, conversation_id: str) -> dict:
key = await self._get_key(conversation_id)
raw_history = await self.redis_client.get(key)
if raw_history:
return json.loads(raw_history)
return {"history": []}
async def write(self, conversation_id: str, data: dict):
key = await self._get_key(conversation_id)
await self.redis_client.set(key, json.dumps(data))
class PersistentRAGAgentState(TurnState):
history: List[Dict[str, Any]] = []
conversation_id: str = "default_conversation"
storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()
RAG_AGENT_APP = AgentApplication[PersistentRAGAgentState](
storage=storage_instance,
adapter=CloudAdapter()
)
async def _load_history(context: TurnContext, state: PersistentRAGAgentState):
if context.activity and context.activity.conversation and context.activity.conversation.id:
state.conversation_id = context.activity.conversation.id
if redis_client:
stored_data = await storage_instance.read(state.conversation_id)
state.history = stored_data.get("history", [])
print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")
async def _save_history(context: TurnContext, state: PersistentRAGAgentState):
if redis_client and state.conversation_id:
await storage_instance.write(state.conversation_id, {"history": state.history})
print(f"Saved history for {state.conversation_id}: {len(state.history)} items")
RAG_AGENT_APP.on_before_activity(_load_history)
RAG_AGENT_APP.on_after_activity(_save_history)
async def _rag_welcome(context: TurnContext, state: PersistentRAGAgentState):
await context.send_activity(
"Hello! I am an Agent with RAG capabilities using Azure Cognitive Search. "
"Ask me about topics covered in our internal documents (e.g., 'What are the features of Product X?')."
)
state.history.append({"role": "assistant", "content": "Hello! I am a RAG Agent."})
RAG_AGENT_APP.conversation_update("membersAdded")(_rag_welcome)
RAG_AGENT_APP.message("/help")(_rag_welcome)
@RAG_AGENT_APP.activity("message")
async def on_rag_message(context: TurnContext, state: PersistentRAGAgentState):
user_message = context.activity.text
state.history.append({"role": "user", "content": user_message})
# Prepare system prompt to encourage RAG tool use
system_prompt = (
"You are a helpful assistant. If the user asks a question that likely requires "
"knowledge from internal documents, use the 'search_documents' tool. "
"Summarize the information found in the documents to answer the user's question. "
"If you don't find relevant information, state that clearly."
)
messages_for_llm = [
{"role": "system", "content": system_prompt},
*state.history
]
try:
response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=messages_for_llm,
tools=get_tool_schemas_with_rag(),
tool_choice="auto"
)
assistant_message = response.choices[0].message
        # Record the assistant turn, including any tool calls, so the follow-up
        # completion sees a valid assistant -> tool message sequence.
        state.history.append({
            "role": "assistant",
            "content": assistant_message.content,
            **({"tool_calls": [tc.model_dump() for tc in assistant_message.tool_calls]}
               if assistant_message.tool_calls else {}),
        })
if assistant_message.tool_calls:
for tool_call in assistant_message.tool_calls:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
print(f"Agent wants to call tool: {function_name} with args: {arguments}")
tool_output = None
if function_name == "search_documents":
query = arguments.get("query")
if azure_search_rag_tool:
docs = await azure_search_rag_tool.search_documents(query)
tool_output = "\n".join([f"Title: {d['title']}\nContent: {d['content']}" for d in docs])
if not docs:
tool_output = "No relevant documents found."
else:
tool_output = "Azure Search tool is not available."
else:
tool_output = f"Unknown tool: {function_name}"
state.history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": str(tool_output)
})
# Call LLM again with tool output for generation
final_response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=state.history
)
final_assistant_message_content = final_response.choices[0].message.content
state.history.append({"role": "assistant", "content": final_assistant_message_content})
await context.send_activity(final_assistant_message_content)
else:
await context.send_activity(assistant_message.content)
except Exception as e:
print(f"Error in RAG agent: {e}")
error_msg = "An error occurred in the RAG agent."
await context.send_activity(error_msg)
state.history.append({"role": "assistant", "content": error_msg})
# --- Server Setup (same as previous examples) ---
def start_server(agent_application: AgentApplication):
async def entry_point(req: Request) -> Response:
agent: AgentApplication = req.app["agent_app"]
adapter: CloudAdapter = req.app["adapter"]
return await start_agent_process(req, agent, adapter)
APP = Application()
APP.router.add_post("/api/messages", entry_point)
APP.router.add_get("/api/messages", lambda _: Response(status=200))
APP["agent_app"] = agent_application
APP["adapter"] = agent_application.adapter
try:
port = int(os.environ.get("PORT", 3978))
print(f"======== Running on http://localhost:{port} ========")
print("(Press CTRL+C to quit)")
run_app(APP, host="localhost", port=port)
except Exception as error:
print(f"Error starting server: {error}")
raise error
if __name__ == "__main__":
start_server(RAG_AGENT_APP)
To Run This Example:
- Set up Azure Cognitive Search: Create a service and an index (e.g., `product-docs`) with fields `title` and `content`. Populate it with some dummy data relevant to questions you might ask (see the upload sketch after this list).
- Configure `AZURE_SEARCH_ENDPOINT`, `AZURE_SEARCH_API_KEY`, and `AZURE_SEARCH_INDEX_NAME` in your `.env`.
- Save `rag_tools.py` and `rag_agent.py`.
- Run: `python rag_agent.py`
- Use `teamsapptester`. Ask questions that your Azure Cognitive Search index can answer (e.g., "What are the benefits of Product X?", "How do I troubleshoot error code 123?"). The agent should use `search_documents` and then synthesize an answer.
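If your index is empty, here is a minimal, hypothetical seeding script using the synchronous azure-search-documents client. It assumes the `product-docs` index already exists with a key field `id` plus `title` and `content` fields; adjust the field names to your actual schema.
# populate_index.py (hypothetical one-off script to seed the product-docs index)
import os
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient  # sync client is fine for a one-off script

client = SearchClient(
    endpoint=os.environ["AZURE_SEARCH_ENDPOINT"],
    index_name=os.environ.get("AZURE_SEARCH_INDEX_NAME", "product-docs"),
    credential=AzureKeyCredential(os.environ["AZURE_SEARCH_API_KEY"]),
)

docs = [
    {"id": "1", "title": "Product X Overview",
     "content": "Product X offers fast sync, offline mode, and enterprise SSO."},
    {"id": "2", "title": "Troubleshooting Error 123",
     "content": "Error 123 usually indicates an expired license key. Renew the key and restart."},
]
results = client.upload_documents(documents=docs)
print([(r.key, r.succeeded) for r in results])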
4.2 Database Querying Tool
Detailed Explanation: Agents can also interact with structured data in databases. This typically involves a tool that can translate natural language queries into SQL (or other database query language), execute the query, and return the results. This requires careful consideration of security and preventing SQL injection.
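On that security point: at a minimum, gate what model-generated SQL is allowed to do before executing it. The guard below is an illustrative sketch (not part of MAF, and not exhaustive) that rejects anything other than a single SELECT statement; in production, pair it with a dedicated read-only database role so it is defense in depth rather than the only control. You could call it inside the `query_database` tool shown next, just before the query is executed.
# sql_guard.py (illustrative allow-list style check for LLM-generated SQL)
import re

def assert_read_only(sql: str) -> str:
    """Raises ValueError unless `sql` looks like a single SELECT statement."""
    stmt = sql.strip().rstrip(";")
    if ";" in stmt:
        raise ValueError("Multiple statements are not allowed.")
    if not re.match(r"(?is)^\s*select\b", stmt):
        raise ValueError("Only SELECT statements are allowed.")
    if re.search(r"(?i)\b(insert|update|delete|drop|alter|grant|truncate)\b", stmt):
        raise ValueError("Statement contains a forbidden keyword.")
    return stmt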
Code Example: PostgreSQL Database Querying Tool
Let’s add a tool to query a PostgreSQL database.
# db_tools.py
import asyncio
import psycopg2 # or asyncpg for async
from typing import List, Dict, Any
class DatabaseTool:
def __init__(self, db_config: Dict[str, str]):
self.db_config = db_config
# For simplicity, we're using psycopg2 blocking, for async use asyncpg
# In a real async agent, you'd use an async DB driver
print("Initialized DatabaseTool. (Note: Using blocking psycopg2 for simplicity, consider asyncpg for production async apps)")
def _execute_query_sync(self, query: str) -> List[Dict[str, Any]]:
"""Synchronously executes a read-only SQL query."""
conn = None
result = []
try:
conn = psycopg2.connect(**self.db_config)
cur = conn.cursor()
cur.execute(query)
# Fetch column names
col_names = [desc[0] for desc in cur.description]
# Fetch rows
for row in cur.fetchall():
result.append(dict(zip(col_names, row)))
cur.close()
except Exception as e:
print(f"Database query error: {e}")
raise e
finally:
if conn:
conn.close()
return result
async def query_database(self, sql_query: str) -> List[Dict[str, Any]]:
"""
Executes a read-only SQL query against the database.
SECURITY WARNING: In a real application, NEVER allow arbitrary SQL queries directly.
Instead, use a SQL agent that generates parameterized queries or validates input carefully.
This is for demonstration purposes only.
"""
print(f"Executing database query: '{sql_query}'")
# Simulate async by running blocking call in thread pool
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, self._execute_query_sync, sql_query)
async def get_table_schema(self, table_name: str) -> str:
"""
Retrieves the schema for a specified table.
"""
print(f"Getting schema for table: {table_name}")
schema_query = f"""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = '{table_name}'
ORDER BY ordinal_position;
"""
try:
schema_data = await self.query_database(schema_query)
formatted_schema = [f"{col['column_name']} ({col['data_type']}, nullable: {col['is_nullable']})" for col in schema_data]
return f"Schema for '{table_name}':\n" + "\n".join(formatted_schema)
except Exception as e:
return f"Could not retrieve schema for '{table_name}': {e}"
Now, integrate this into our agent, including instructions to the LLM about the database.
# db_rag_agent.py (combines RAG and DB querying)
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv
import redis.asyncio as redis
import psycopg2 # for db_tools init
load_dotenv()
from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI  # async client, so chat completions can be awaited below
from rag_tools import AzureSearchRAGTool
from db_tools import DatabaseTool # Our new DB tool
# --- Configuration ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
AZURE_SEARCH_ENDPOINT = os.getenv("AZURE_SEARCH_ENDPOINT")
AZURE_SEARCH_API_KEY = os.getenv("AZURE_SEARCH_API_KEY")
AZURE_SEARCH_INDEX_NAME = os.getenv("AZURE_SEARCH_INDEX_NAME")
POSTGRES_DB = os.getenv("POSTGRES_DB")
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_HOST = os.getenv("POSTGRES_HOST")
POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", 5432))
# Verify all necessary env vars are set
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME,
AZURE_SEARCH_ENDPOINT, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME,
POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST]):
raise ValueError("Missing required environment variables. Please check your .env file for OpenAI, Redis, Azure Search, and PostgreSQL.")
try:
    aoai_client = AsyncAzureOpenAI(
api_key=AZURE_OPENAI_API_KEY,
api_version="2024-02-15-preview",
azure_endpoint=AZURE_OPENAI_ENDPOINT,
)
print("Successfully connected to Azure OpenAI.")
except Exception as e:
print(f"Error connecting to Azure OpenAI: {e}")
exit(1)
try:
redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
decode_responses=True
)
async def _test_redis_connection():
await redis_client.ping()
print("Successfully connected to Redis.")
asyncio.run(_test_redis_connection())
except Exception as e:
print(f"Error connecting to Redis: {e}")
redis_client = None
try:
azure_search_rag_tool = AzureSearchRAGTool(
endpoint=AZURE_SEARCH_ENDPOINT,
api_key=AZURE_SEARCH_API_KEY,
index_name=AZURE_SEARCH_INDEX_NAME
)
except Exception as e:
print(f"Error initializing AzureSearchRAGTool: {e}")
azure_search_rag_tool = None
# Initialize Database Tool
try:
db_config = {
"database": POSTGRES_DB,
"user": POSTGRES_USER,
"password": POSTGRES_PASSWORD,
"host": POSTGRES_HOST,
"port": POSTGRES_PORT
}
# Test DB connection
with psycopg2.connect(**db_config) as conn:
with conn.cursor() as cur:
cur.execute("SELECT 1")
print("Successfully connected to PostgreSQL.")
db_tool = DatabaseTool(db_config)
except Exception as e:
print(f"Error initializing DatabaseTool or connecting to PostgreSQL: {e}")
db_tool = None
class AzureOpenAIChatClient:  # same as in earlier examples
def __init__(self, client, deployment_name):
self.client = client
self.deployment_name = deployment_name
async def complete(self, messages: list[dict], **kwargs) -> str:
try:
response = await self.client.chat.completions.create(
model=self.deployment_name,
messages=messages,
**kwargs
)
return response.choices[0].message.content
except Exception as e:
print(f"Error calling Azure OpenAI: {e}")
return "An error occurred while processing your request."
my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)
# --- Tool Definitions with RAG and DB Querying ---
def get_tool_schemas_with_db_rag():
schemas = [
{
"type": "function",
"function": {
"name": "search_documents",
"description": "Searches internal knowledge base (Azure Cognitive Search) for relevant documents to answer questions.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query for the knowledge base."
}
},
"required": ["query"]
}
}
}
]
if db_tool: # Only add DB tools if connected
schemas.extend([
{
"type": "function",
"function": {
"name": "query_database",
"description": "Executes a read-only SQL query against the PostgreSQL database. Use this ONLY for SELECT statements. Be very careful with SQL injection. Available tables: 'users', 'products', 'orders'.",
"parameters": {
"type": "object",
"properties": {
"sql_query": {
"type": "string",
"description": "The SQL SELECT query to execute."
}
},
"required": ["sql_query"]
}
}
},
{
"type": "function",
"function": {
"name": "get_table_schema",
"description": "Retrieves the schema (column names and types) for a specified table in the database.",
"parameters": {
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "The name of the table (e.g., 'users', 'products', 'orders')."
}
},
"required": ["table_name"]
}
}
}
])
return schemas
# --- Agent State & Storage (same as redis_agent.py) ---
class RedisChatHistoryStorage(MemoryStorage):  # same as in redis_agent.py
def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
self.redis_client = redis_client
self.prefix = prefix
async def _get_key(self, conversation_id: str) -> str:
return f"{self.prefix}{conversation_id}"
async def read(self, conversation_id: str) -> dict:
key = await self._get_key(conversation_id)
raw_history = await self.redis_client.get(key)
if raw_history:
return json.loads(raw_history)
return {"history": []}
async def write(self, conversation_id: str, data: dict):
key = await self._get_key(conversation_id)
await self.redis_client.set(key, json.dumps(data))
class PersistentDBRAGAgentState(TurnState):
history: List[Dict[str, Any]] = []
conversation_id: str = "default_conversation"
storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()
DB_RAG_AGENT_APP = AgentApplication[PersistentDBRAGAgentState](
storage=storage_instance,
adapter=CloudAdapter()
)
async def _load_history(context: TurnContext, state: PersistentDBRAGAgentState):
if context.activity and context.activity.conversation and context.activity.conversation.id:
state.conversation_id = context.activity.conversation.id
if redis_client:
stored_data = await storage_instance.read(state.conversation_id)
state.history = stored_data.get("history", [])
print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")
async def _save_history(context: TurnContext, state: PersistentDBRAGAgentState):
if redis_client and state.conversation_id:
await storage_instance.write(state.conversation_id, {"history": state.history})
print(f"Saved history for {state.conversation_id}: {len(state.history)} items")
DB_RAG_AGENT_APP.on_before_activity(_load_history)
DB_RAG_AGENT_APP.on_after_activity(_save_history)
async def _db_rag_welcome(context: TurnContext, state: PersistentDBRAGAgentState):
await context.send_activity(
"Hello! I am an Agent with RAG and Database querying capabilities. "
"Ask me about internal documents or query the database (tables: users, products, orders)."
)
state.history.append({"role": "assistant", "content": "Hello! I am a RAG and DB Agent."})
DB_RAG_AGENT_APP.conversation_update("membersAdded")(_db_rag_welcome)
DB_RAG_AGENT_APP.message("/help")(_db_rag_welcome)
@DB_RAG_AGENT_APP.activity("message")
async def on_db_rag_message(context: TurnContext, state: PersistentDBRAGAgentState):
user_message = context.activity.text
state.history.append({"role": "user", "content": user_message})
# Prepare system prompt to encourage RAG and DB tool use
system_prompt = (
"You are a helpful assistant. "
"Use 'search_documents' for general knowledge from internal documents. "
"Use 'get_table_schema' to understand database table structure (tables: users, products, orders) "
"and 'query_database' to retrieve information directly from the database (only SELECT queries!). "
"Always prioritize database queries if the information is clearly structured and available there. "
"If you don't find relevant information, state that clearly."
)
messages_for_llm = [
{"role": "system", "content": system_prompt},
*state.history
]
try:
response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=messages_for_llm,
tools=get_tool_schemas_with_db_rag(),
tool_choice="auto"
)
assistant_message = response.choices[0].message
        # Record the assistant turn, including any tool calls, so the follow-up
        # completion sees a valid assistant -> tool message sequence.
        state.history.append({
            "role": "assistant",
            "content": assistant_message.content,
            **({"tool_calls": [tc.model_dump() for tc in assistant_message.tool_calls]}
               if assistant_message.tool_calls else {}),
        })
if assistant_message.tool_calls:
for tool_call in assistant_message.tool_calls:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
print(f"Agent wants to call tool: {function_name} with args: {arguments}")
tool_output = None
if function_name == "search_documents":
query = arguments.get("query")
if azure_search_rag_tool:
docs = await azure_search_rag_tool.search_documents(query)
tool_output = "\n".join([f"Title: {d['title']}\nContent: {d['content']}" for d in docs])
if not docs:
tool_output = "No relevant documents found."
else:
tool_output = "Azure Search tool is not available."
elif function_name == "query_database":
sql_query = arguments.get("sql_query")
if db_tool:
try:
db_results = await db_tool.query_database(sql_query)
tool_output = json.dumps(db_results, indent=2)
except Exception as ex:
tool_output = f"Database error: {ex}"
else:
tool_output = "Database tool is not available."
elif function_name == "get_table_schema":
table_name = arguments.get("table_name")
if db_tool:
tool_output = await db_tool.get_table_schema(table_name)
else:
tool_output = "Database tool is not available."
else:
tool_output = f"Unknown tool: {function_name}"
state.history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": str(tool_output)
})
# Call LLM again with tool output for generation
final_response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=state.history
)
final_assistant_message_content = final_response.choices[0].message.content
state.history.append({"role": "assistant", "content": final_assistant_message_content})
await context.send_activity(final_assistant_message_content)
else:
await context.send_activity(assistant_message.content)
except Exception as e:
print(f"Error in DB RAG agent: {e}")
error_msg = "An error occurred in the RAG/DB agent."
await context.send_activity(error_msg)
state.history.append({"role": "assistant", "content": error_msg})
# --- Server Setup (same as previous examples) ---
def start_server(agent_application: AgentApplication):
async def entry_point(req: Request) -> Response:
agent: AgentApplication = req.app["agent_app"]
adapter: CloudAdapter = req.app["adapter"]
return await start_agent_process(req, agent, adapter)
APP = Application()
APP.router.add_post("/api/messages", entry_point)
APP.router.add_get("/api/messages", lambda _: Response(status=200))
APP["agent_app"] = agent_application
APP["adapter"] = agent_application.adapter
try:
port = int(os.environ.get("PORT", 3978))
print(f"======== Running on http://localhost:{port} ========")
print("(Press CTRL+C to quit)")
run_app(APP, host="localhost", port=port)
except Exception as error:
print(f"Error starting server: {error}")
raise error
if __name__ == "__main__":
start_server(DB_RAG_AGENT_APP)
To Run This Example:
- Set up PostgreSQL: Install PostgreSQL locally or use Docker:
  docker run -p 5432:5432 --name my-postgres -e POSTGRES_PASSWORD=your_password -d postgres
- Create Sample Data: Connect to your PostgreSQL database and create some tables and data, e.g.:
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100) UNIQUE
);
INSERT INTO users (name, email) VALUES
    ('Alice Smith', 'alice@example.com'),
    ('Bob Johnson', 'bob@example.com');

CREATE TABLE products (
    product_id SERIAL PRIMARY KEY,
    product_name VARCHAR(255),
    price DECIMAL(10, 2),
    stock_quantity INT
);
INSERT INTO products (product_name, price, stock_quantity) VALUES
    ('Laptop', 1200.00, 50),
    ('Mouse', 25.00, 200);

CREATE TABLE orders (
    order_id SERIAL PRIMARY KEY,
    user_id INT REFERENCES users(id),
    product_id INT REFERENCES products(product_id),
    quantity INT,
    order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO orders (user_id, product_id, quantity) VALUES
    (1, 1, 1),
    (2, 2, 2);
- Configure `POSTGRES_DB`, `POSTGRES_USER`, `POSTGRES_PASSWORD`, `POSTGRES_HOST`, and `POSTGRES_PORT` in your `.env`.
- Ensure `rag_tools.py` and `db_tools.py` are in your project.
- Save the agent code as `db_rag_agent.py`.
- Run: `pip install psycopg2-binary` (if not already installed), then `python db_rag_agent.py`.
- Use `teamsapptester`. Try prompts like:
  - "What is the schema for the 'users' table?"
  - "How many products do we have in stock?" (The LLM should translate this to SQL: `SELECT SUM(stock_quantity) FROM products;`)
  - "Show me all users and their emails." (The LLM should translate this to SQL: `SELECT name, email FROM users;`)
  - "What are the features of Product X?" (This should trigger the RAG tool.)
5. Guided Projects
Here, we’ll implement two projects: one fresh MAF project and one migration from LangGraph.
Project 1: Fresh Project - “Intelligent Customer Support Agent”
Objective: Build a single, intelligent customer support agent using MAF that can answer common FAQs (via RAG), troubleshoot simple issues (using specific tools), and escalate to a human if necessary.
Problem Statement: A company wants to provide 24/7 AI-powered customer support to handle routine inquiries, reducing the workload on human agents.
Technologies Used:
- Microsoft Agent Framework (Python SDK)
- Azure OpenAI (for LLM)
- Redis (for chat history)
- Azure Cognitive Search (for FAQ/knowledge base RAG)
- A “mock” Human Escalation tool.
Step-by-Step Implementation:
Step 0: Setup (Already covered)
Ensure your .env is configured for Azure OpenAI, Redis, and Azure Cognitive Search.
Make sure you have rag_tools.py in your project; db_tools.py isn't used in this project, but it does no harm to keep it around for general capabilities.
Step 1: Define Tools for Customer Support
Let’s create support_tools.py. This will include our RAG tool and a simulated escalation tool.
# support_tools.py
import asyncio
from typing import List, Dict, Any
from azure.core.credentials import AzureKeyCredential
from azure.search.documents.aio import SearchClient # Use async client
class CustomerSupportTools:
def __init__(self, search_endpoint: str, search_api_key: str, search_index_name: str):
self.search_client = SearchClient(
endpoint=search_endpoint,
index_name=search_index_name,
credential=AzureKeyCredential(search_api_key)
)
print(f"Initialized Azure Cognitive Search client for customer support FAQs: {search_index_name}")
async def faq_lookup(self, query: str, top_n: int = 3) -> List[Dict[str, Any]]:
"""
Searches the customer support knowledge base (Azure Cognitive Search) for answers to frequently asked questions.
Useful for general inquiries about products, services, or policies.
"""
print(f"Executing FAQ lookup for: '{query}'")
results = []
try:
async with self.search_client:
search_results = await self.search_client.search(
search_text=query,
select=["question", "answer"], # Assuming your FAQ index has these fields
top=top_n
)
async for result in search_results:
results.append({
"question": result.get("question", "No Question"),
"answer": result.get("answer", "No Answer")
})
except Exception as e:
print(f"Error during FAQ lookup: {e}")
results.append({"error": f"Failed to perform FAQ search: {e}"})
return results
@staticmethod
async def reset_password(username: str) -> str:
"""
Simulates the process of initiating a password reset for a given username.
This would typically involve an internal system API call.
"""
print(f"Executing password reset for: {username}")
await asyncio.sleep(2)
return f"A password reset link has been sent to the registered email for user '{username}'. Please check your inbox."
@staticmethod
async def check_order_status(order_id: str) -> str:
"""
Checks the status of a specific order using its order ID.
This would typically integrate with an order management system.
"""
print(f"Executing order status check for order ID: {order_id}")
await asyncio.sleep(1.5)
# Simulate lookup
if order_id == "ORD123":
return "Order ORD123: Shipped, expected delivery tomorrow."
elif order_id == "ORD456":
return "Order ORD456: Processing, estimated ship date in 3-5 business days."
return f"Order ID {order_id} not found or invalid."
@staticmethod
async def escalate_to_human(issue_description: str, user_contact: str) -> str:
"""
Escalates the current customer inquiry to a human support agent.
Provides a summary of the issue and user contact information.
"""
print(f"Escalating to human: {issue_description} for {user_contact}")
await asyncio.sleep(1)
return f"Your issue '{issue_description}' has been escalated. A human agent will contact you shortly at {user_contact}."
Step 2: Create the Customer Support Agent
This agent will combine the CustomerSupportTools, Redis for memory, and Azure OpenAI for reasoning.
# customer_support_agent.py
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv
import redis.asyncio as redis
load_dotenv()
from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI  # async client, so chat completions can be awaited below
from support_tools import CustomerSupportTools # Our new support tools
# --- Configuration (from .env) ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
AZURE_SEARCH_ENDPOINT = os.getenv("AZURE_SEARCH_ENDPOINT")
AZURE_SEARCH_API_KEY = os.getenv("AZURE_SEARCH_API_KEY")
AZURE_SEARCH_FAQ_INDEX_NAME = os.getenv("AZURE_SEARCH_FAQ_INDEX_NAME", "customer-faqs") # New index for FAQs
# --- Initialize external services ---
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
raise ValueError("Missing Azure OpenAI environment variables.")
try:
    aoai_client = AsyncAzureOpenAI(
api_key=AZURE_OPENAI_API_KEY,
api_version="2024-02-15-preview",
azure_endpoint=AZURE_OPENAI_ENDPOINT,
)
print("Successfully connected to Azure OpenAI.")
except Exception as e:
print(f"Error connecting to Azure OpenAI: {e}")
exit(1)
try:
redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
decode_responses=True
)
async def _test_redis_connection():
await redis_client.ping()
print("Successfully connected to Redis.")
asyncio.run(_test_redis_connection())
except Exception as e:
print(f"Error connecting to Redis: {e}")
redis_client = None
try:
customer_support_tools = CustomerSupportTools(
search_endpoint=AZURE_SEARCH_ENDPOINT,
search_api_key=AZURE_SEARCH_API_KEY,
search_index_name=AZURE_SEARCH_FAQ_INDEX_NAME
)
except Exception as e:
print(f"Error initializing CustomerSupportTools: {e}")
customer_support_tools = None
# --- Tool Definitions for LLM ---
def get_support_tool_schemas():
schemas = []
# FAQ Lookup is always available if search is initialized
if customer_support_tools:
schemas.append({
"type": "function",
"function": {
"name": "faq_lookup",
"description": "Searches the customer support knowledge base for answers to frequently asked questions about products, services, or policies.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The user's question or keywords to search in the FAQs."
}
},
"required": ["query"]
}
}
})
# Specific troubleshooting tools
schemas.extend([
{
"type": "function",
"function": {
"name": "reset_password",
"description": "Initiates a password reset process for a user. Requires the user's exact username.",
"parameters": {
"type": "object",
"properties": {
"username": {
"type": "string",
"description": "The username for which to reset the password."
}
},
"required": ["username"]
}
}
},
{
"type": "function",
"function": {
"name": "check_order_status",
"description": "Checks the current status of a customer's order. Requires the exact order ID.",
"parameters": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "The unique identifier of the order."
}
},
"required": ["order_id"]
}
}
},
{
"type": "function",
"function": {
"name": "escalate_to_human",
"description": "Escalates the current customer issue to a human support agent. Provide a summary of the problem and the user's contact information.",
"parameters": {
"type": "object",
"properties": {
"issue_description": {
"type": "string",
"description": "A concise description of the issue the customer is facing."
},
"user_contact": {
"type": "string",
"description": "The customer's email or phone number for follow-up."
}
},
"required": ["issue_description", "user_contact"]
}
}
}
])
return schemas
# --- Agent State & Storage ---
class RedisChatHistoryStorage(MemoryStorage):  # same as in redis_agent.py
def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
self.redis_client = redis_client
self.prefix = prefix
async def _get_key(self, conversation_id: str) -> str:
return f"{self.prefix}{conversation_id}"
async def read(self, conversation_id: str) -> dict:
key = await self._get_key(conversation_id)
raw_history = await self.redis_client.get(key)
if raw_history:
return json.loads(raw_history)
return {"history": []}
async def write(self, conversation_id: str, data: dict):
key = await self._get_key(conversation_id)
await self.redis_client.set(key, json.dumps(data))
class CustomerSupportAgentState(TurnState):
history: List[Dict[str, Any]] = []
conversation_id: str = "default_conversation"
storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()
SUPPORT_AGENT_APP = AgentApplication[CustomerSupportAgentState](
storage=storage_instance,
adapter=CloudAdapter()
)
async def _load_history(context: TurnContext, state: CustomerSupportAgentState):
if context.activity and context.activity.conversation and context.activity.conversation.id:
state.conversation_id = context.activity.conversation.id
if redis_client:
stored_data = await storage_instance.read(state.conversation_id)
state.history = stored_data.get("history", [])
print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")
async def _save_history(context: TurnContext, state: CustomerSupportAgentState):
if redis_client and state.conversation_id:
await storage_instance.write(state.conversation_id, {"history": state.history})
print(f"Saved history for {state.conversation_id}: {len(state.history)} items")
SUPPORT_AGENT_APP.on_before_activity(_load_history)
SUPPORT_AGENT_APP.on_after_activity(_save_history)
async def _support_welcome(context: TurnContext, state: CustomerSupportAgentState):
welcome_message = (
"Hello! I am your AI Customer Support Assistant. "
"I can help with FAQs, order status, password resets, or escalate to a human. "
"How can I assist you today?"
)
await context.send_activity(welcome_message)
state.history.append({"role": "assistant", "content": welcome_message})
SUPPORT_AGENT_APP.conversation_update("membersAdded")(_support_welcome)
SUPPORT_AGENT_APP.message("/help")(_support_welcome)
@SUPPORT_AGENT_APP.activity("message")
async def on_support_message(context: TurnContext, state: CustomerSupportAgentState):
user_message = context.activity.text
state.history.append({"role": "user", "content": user_message})
system_prompt = (
"You are an AI Customer Support Assistant for a tech company. "
"Your goal is to assist users efficiently and politely. "
"Use the provided tools to answer questions, perform actions, or escalate. "
"When using `escalate_to_human`, always ask for user contact information (email or phone) if not provided, "
"and get a clear summary of the issue before escalating."
"Remember to be helpful and empathetic."
)
messages_for_llm = [
{"role": "system", "content": system_prompt},
*state.history
]
try:
response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=messages_for_llm,
tools=get_support_tool_schemas(),
tool_choice="auto"
)
assistant_message = response.choices[0].message
        # Record the assistant turn, including any tool calls, so the follow-up
        # completion sees a valid assistant -> tool message sequence.
        state.history.append({
            "role": "assistant",
            "content": assistant_message.content,
            **({"tool_calls": [tc.model_dump() for tc in assistant_message.tool_calls]}
               if assistant_message.tool_calls else {}),
        })
        if assistant_message.content:
            # Send initial non-tool response
            await context.send_activity(assistant_message.content)
if assistant_message.tool_calls:
for tool_call in assistant_message.tool_calls:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
print(f"Agent wants to call tool: {function_name} with args: {arguments}")
tool_output = None
if function_name == "faq_lookup" and customer_support_tools:
query = arguments.get("query")
docs = await customer_support_tools.faq_lookup(query)
tool_output = "\n".join([f"Q: {d['question']}\nA: {d['answer']}" for d in docs])
if not docs:
tool_output = "No relevant FAQ found."
elif function_name == "reset_password" and customer_support_tools:
username = arguments.get("username")
tool_output = await customer_support_tools.reset_password(username)
elif function_name == "check_order_status" and customer_support_tools:
order_id = arguments.get("order_id")
tool_output = await customer_support_tools.check_order_status(order_id)
elif function_name == "escalate_to_human" and customer_support_tools:
issue = arguments.get("issue_description")
contact = arguments.get("user_contact")
tool_output = await customer_support_tools.escalate_to_human(issue, contact)
else:
tool_output = f"Tool '{function_name}' is not available or unrecognized."
state.history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": str(tool_output)
})
# Call LLM again with tool output for final response
final_response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=state.history
)
final_assistant_message_content = final_response.choices[0].message.content
state.history.append({"role": "assistant", "content": final_assistant_message_content})
await context.send_activity(final_assistant_message_content)
# If LLM had no tool calls, but still provided content, that was already sent above
# If no content and no tool calls, it's an issue with LLM response
elif not assistant_message.content:
error_msg = "I'm having trouble understanding. Could you please rephrase?"
await context.send_activity(error_msg)
state.history.append({"role": "assistant", "content": error_msg})
except Exception as e:
print(f"Error in Customer Support Agent: {e}")
error_msg = "An unexpected error occurred in the customer support agent. Please try again later."
await context.send_activity(error_msg)
state.history.append({"role": "assistant", "content": error_msg})
# --- Server Setup (standard MAF Aiohttp server) ---
def start_server(agent_application: AgentApplication):
async def entry_point(req: Request) -> Response:
agent: AgentApplication = req.app["agent_app"]
adapter: CloudAdapter = req.app["adapter"]
return await start_agent_process(req, agent, adapter)
APP = Application()
APP.router.add_post("/api/messages", entry_point)
APP.router.add_get("/api/messages", lambda _: Response(status=200))
APP["agent_app"] = agent_application
APP["adapter"] = agent_application.adapter
try:
port = int(os.environ.get("PORT", 3978))
print(f"======== Running on http://localhost:{port} (Customer Support Agent) ========")
print("(Press CTRL+C to quit)")
run_app(APP, host="localhost", port=port)
except Exception as error:
print(f"Error starting server: {error}")
raise error
if __name__ == "__main__":
start_server(SUPPORT_AGENT_APP)
To Run Project 1:
- Azure Cognitive Search for FAQs: Create an Azure Cognitive Search instance and an index (e.g., `customer-faqs`). Populate it with some Q&A documents (you can adapt the upload sketch from Section 4.1).
  - Example data for the `customer-faqs` index (assuming `question` and `answer` fields):
[
  {"question": "How do I update my billing information?", "answer": "You can update your billing information by logging into your account settings on our website under the 'Billing' section."},
  {"question": "What are your return policies?", "answer": "We offer a 30-day money-back guarantee on all products. Items must be returned in their original condition. See our full policy page for details."},
  {"question": "My product isn't working.", "answer": "Please describe the issue in more detail, and I can try to help you troubleshoot, or you can request a human agent for technical support."},
  {"question": "What is the warranty for Product X?", "answer": "Product X comes with a 1-year limited warranty covering manufacturing defects. Extended warranty options are available."}
]
- Configure your `.env` for Azure OpenAI, Redis, and Azure Cognitive Search (`AZURE_SEARCH_FAQ_INDEX_NAME`).
- Save `support_tools.py` and `customer_support_agent.py`.
- Run: `python customer_support_agent.py`
- Interact using `teamsapptester` (or a custom client):
  - "How do I update my billing info?" (Should use `faq_lookup`)
  - "My product is broken. Can you help me?" (Should ask for more details or suggest escalation)
  - "I need to reset my password for user 'john.doe'." (Should use `reset_password`)
  - "What is the status of my order ORD123?" (Should use `check_order_status`)
  - "I need to talk to someone, my issue is complicated. My email is user@example.com." (Should use `escalate_to_human`)
Project 2: Migrating a LangGraph Agent to Microsoft Agent Framework
Objective: Take an existing LangGraph-based “Simple Travel Planner Agent” and integrate its core logic into the Microsoft Agent Framework, leveraging MAF’s hosting, memory, and tool integration.
Problem Statement: An existing LangGraph travel planner needs to be deployed within an enterprise context, benefiting from MAF’s enterprise-grade features like standardized hosting, observability, and authentication, without a complete rewrite of the travel planning logic.
Technologies Used:
- LangGraph (for the original agent logic)
- Microsoft Agent Framework (Python SDK)
- Azure OpenAI (for LLM)
- Redis (for MAF chat history)
- FastAPI (to expose the LangGraph core as a microservice/tool if needed, or directly as a Python callable).
Approach: We will encapsulate the LangGraph agent’s core planning and execution into a callable Python function. This function will then be exposed as a “tool” to a MAF agent. The MAF agent will use its LLM to decide when to call this travel planning tool.
Step 0: LangGraph Agent (Existing)
First, let’s define our existing LangGraph agent. For simplicity, we’ll create a very basic one.
# langgraph_travel_planner.py
import os
import operator
from typing import TypedDict, Annotated, List
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langgraph.graph import StateGraph, END
from langchain_openai import AzureChatOpenAI # Assuming AOAI for LLM
# --- LangGraph State ---
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], operator.add]
destination: str
dates: str
# Other potential fields like booking_confirmed, flight_options, hotel_options
# --- LangGraph Tools (for the original LangGraph agent) ---
# In LangGraph, you'd define tools for specific tasks.
# For our *existing* LangGraph agent, let's say it had a dummy flight search.
class LangGraphTravelTools:
@staticmethod
def get_dummy_flights(destination: str, dates: str) -> str:
"""
Retrieves dummy flight options for a given destination and dates.
"""
print(f"LangGraph tool: Searching dummy flights for {destination} on {dates}")
return f"Dummy flights to {destination} on {dates}: Flight LH456 (08:00 AM) - $500, Flight BA789 (11:00 AM) - $650."
# --- LangGraph Nodes ---
# LLM Node
def call_llm(state: AgentState):
    llm = AzureChatOpenAI(
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
        api_version="2024-02-15-preview"
    ).bind_tools([LangGraphTravelTools.get_dummy_flights])  # bind tools so this node can emit tool calls
    response = llm.invoke(state["messages"])
    return {"messages": [response]}
# Tool Node (Executes a dummy tool call for LangGraph)
def call_tool(state: AgentState):
last_message = state["messages"][-1]
tool_name = last_message.tool_calls[0].function.name
tool_args = last_message.tool_calls[0].function.arguments
if tool_name == "get_dummy_flights":
args = json.loads(tool_args)
result = LangGraphTravelTools.get_dummy_flights(args["destination"], args["dates"])
return {"messages": [HumanMessage(content=result, name=tool_name)]}
return {"messages": [HumanMessage(content=f"Unknown tool: {tool_name}", name="tool_error")]}
# --- LangGraph Logic (Planner/Router) ---
def should_continue(state: AgentState):
last_message = state["messages"][-1]
if not last_message.tool_calls:
return "end" # No tool calls, we're done with agent
return "continue" # Tool calls, continue
# --- Build LangGraph ---
def build_langgraph_travel_planner():
# Define the agent's function-calling tool
tools = [
LangGraphTravelTools.get_dummy_flights
]
# Bind tools to the LLM (specific to LangChain/LangGraph)
llm = AzureChatOpenAI(
azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
api_key=os.getenv("AZURE_OPENAI_API_KEY"),
azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
api_version="2024-02-15-preview"
).bind_tools(tools) # Here we bind the tools
workflow = StateGraph(AgentState)
workflow.add_node("llm", call_llm)
workflow.add_node("action", call_tool)
workflow.set_entry_point("llm")
workflow.add_conditional_edges(
"llm",
should_continue,
{
"continue": "action",
"end": END
}
)
workflow.add_edge("action", "llm") # After tool call, go back to LLM to summarize/plan next
app = workflow.compile()
return app, llm # Return app and the LLM with bound tools for MAF to use as reference
# --- Wrapper function to invoke LangGraph ---
async def invoke_langgraph_planner(destination: str, dates: str) -> str:
"""
Invokes the LangGraph travel planner to generate an itinerary.
"""
print(f"Invoking LangGraph Planner for {destination} on {dates}")
graph_app, llm_with_tools = build_langgraph_travel_planner() # Re-build graph for each invocation for simplicity
# We need to construct the prompt that will make LangGraph use its tools.
# This involves defining tools for the LLM specifically, not MAF's tools.
prompt_with_tools = [
{"role": "system", "content": f"""You are a travel planner. Plan a trip to {destination} on {dates}. Use the 'get_dummy_flights' tool to find flight options. Respond with a summary of the plan."""},
{"role": "user", "content": f"Plan a trip to {destination} on {dates}"}
]
    # LangGraph's state expects BaseMessage objects, so convert the role dicts accordingly.
    initial_messages = [
        SystemMessage(content=msg["content"]) if msg["role"] == "system" else HumanMessage(content=msg["content"])
        for msg in prompt_with_tools
    ]
inputs = {"messages": initial_messages, "destination": destination, "dates": dates}
try:
# Run the LangGraph application
final_state = await graph_app.ainvoke(inputs)
# Extract the final message from the LangGraph run
final_message = final_state["messages"][-1].content
return final_message
except Exception as e:
print(f"Error running LangGraph: {e}")
return f"LangGraph travel planner encountered an error: {e}"
Step 1: Expose LangGraph Logic as a MAF-Compatible Tool
We’ll define a new tool class that internally calls our invoke_langgraph_planner function.
# maf_langgraph_integration_tools.py
import asyncio
from typing import Dict, Any, List
# Import our LangGraph invocation function
from langgraph_travel_planner import invoke_langgraph_planner
class LangGraphIntegrationTools:
@staticmethod
async def plan_travel_with_langgraph(destination: str, dates: str) -> str:
"""
Uses an external LangGraph agent to plan a travel itinerary,
including finding flight options.
"""
print(f"MAF Tool: Calling LangGraph agent to plan travel for {destination} on {dates}")
# Call the async LangGraph wrapper function
result = await invoke_langgraph_planner(destination, dates)
return result
@staticmethod
async def get_booking_status(booking_id: str) -> str:
"""
Retrieves the status of a specific travel booking. (Dummy tool)
"""
print(f"MAF Tool: Checking booking status for ID: {booking_id}")
await asyncio.sleep(0.5)
if booking_id == "TRV789":
return "Booking TRV789: Confirmed, Flights booked, Hotel pending confirmation."
return f"Booking ID {booking_id} not found."
Step 2: Create a MAF Agent to Orchestrate LangGraph
This MAF agent will use Azure OpenAI to decide when to call the plan_travel_with_langgraph tool.
# maf_travel_agent.py (Integrating LangGraph)
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv
import redis.asyncio as redis
load_dotenv()
from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI  # async client, so chat completions can be awaited below
from maf_langgraph_integration_tools import LangGraphIntegrationTools # Our MAF-compatible tools
# --- Configuration (from .env) ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
# --- Initialize external services ---
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
raise ValueError("Missing Azure OpenAI environment variables.")
try:
    aoai_client = AsyncAzureOpenAI(
api_key=AZURE_OPENAI_API_KEY,
api_version="2024-02-15-preview",
azure_endpoint=AZURE_OPENAI_ENDPOINT,
)
print("Successfully connected to Azure OpenAI.")
except Exception as e:
print(f"Error connecting to Azure OpenAI: {e}")
exit(1)
try:
redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
decode_responses=True
)
async def _test_redis_connection():
await redis_client.ping()
print("Successfully connected to Redis.")
asyncio.run(_test_redis_connection())
except Exception as e:
print(f"Error connecting to Redis: {e}")
redis_client = None
# Initialize the MAF-compatible tools instance
langgraph_integration_tools_instance = LangGraphIntegrationTools()
# --- Tool Definitions for MAF LLM ---
def get_maf_travel_tool_schemas():
return [
{
"type": "function",
"function": {
"name": "plan_travel_with_langgraph",
"description": "Plans a detailed travel itinerary for a destination and dates, leveraging an existing LangGraph travel planning agent.",
"parameters": {
"type": "object",
"properties": {
"destination": {
"type": "string",
"description": "The desired travel destination."
},
"dates": {
"type": "string",
"description": "The travel dates (e.g., 'next week', 'July 2025')."
}
},
"required": ["destination", "dates"]
}
}
},
{
"type": "function",
"function": {
"name": "get_booking_status",
"description": "Retrieves the status of a specific travel booking.",
"parameters": {
"type": "object",
"properties": {
"booking_id": {
"type": "string",
"description": "The unique identifier of the travel booking."
}
},
"required": ["booking_id"]
}
}
}
]
# --- Agent State & Storage (same as previous Redis examples) ---
class RedisChatHistoryStorage(MemoryStorage):  # same as in redis_agent.py
def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
self.redis_client = redis_client
self.prefix = prefix
async def _get_key(self, conversation_id: str) -> str:
return f"{self.prefix}{conversation_id}"
async def read(self, conversation_id: str) -> dict:
key = await self._get_key(conversation_id)
raw_history = await self.redis_client.get(key)
if raw_history:
return json.loads(raw_history)
return {"history": []}
async def write(self, conversation_id: str, data: dict):
key = await self._get_key(conversation_id)
await self.redis_client.set(key, json.dumps(data))
class MAFTravelAgentState(TurnState):
history: List[Dict[str, Any]] = []
conversation_id: str = "default_conversation"
storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()
MAF_TRAVEL_AGENT_APP = AgentApplication[MAFTravelAgentState](
storage=storage_instance,
adapter=CloudAdapter()
)
async def _load_history(context: TurnContext, state: MAFTravelAgentState):
if context.activity and context.activity.conversation and context.activity.conversation.id:
state.conversation_id = context.activity.conversation.id
if redis_client:
stored_data = await storage_instance.read(state.conversation_id)
state.history = stored_data.get("history", [])
print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")
async def _save_history(context: TurnContext, state: MAFTravelAgentState):
if redis_client and state.conversation_id:
await storage_instance.write(state.conversation_id, {"history": state.history})
print(f"Saved history for {state.conversation_id}: {len(state.history)} items")
MAF_TRAVEL_AGENT_APP.on_before_activity(_load_history)
MAF_TRAVEL_AGENT_APP.on_after_activity(_save_history)
async def _maf_travel_welcome(context: TurnContext, state: MAFTravelAgentState):
    welcome_message = (
        "Hello! I am your AI Travel Agent, enhanced by a LangGraph planner. "
        "I can plan trips and check booking statuses. "
        "Try asking me to 'Plan a trip to Paris for next summer' or 'Check my booking TRV789'."
    )
    await context.send_activity(welcome_message)
    state.history.append({"role": "assistant", "content": welcome_message})

MAF_TRAVEL_AGENT_APP.conversation_update("membersAdded")(_maf_travel_welcome)
MAF_TRAVEL_AGENT_APP.message("/help")(_maf_travel_welcome)
@MAF_TRAVEL_AGENT_APP.activity("message")
async def on_maf_travel_message(context: TurnContext, state: MAFTravelAgentState):
user_message = context.activity.text
state.history.append({"role": "user", "content": user_message})
system_prompt = (
"You are an AI Travel Planner. Your primary goal is to help users plan trips. "
"Use the 'plan_travel_with_langgraph' tool for detailed trip planning, "
"and 'get_booking_status' to check existing bookings. "
"Extract 'destination' and 'dates' for trip planning. "
"Be helpful and confirm details before initiating planning or checking status."
)
messages_for_llm = [
{"role": "system", "content": system_prompt},
*state.history
]
try:
response = await aoai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_NAME,
messages=messages_for_llm,
tools=get_maf_travel_tool_schemas(),
tool_choice="auto"
)
        assistant_message = response.choices[0].message

        if assistant_message.tool_calls:
            # Record the assistant turn *including its tool calls* first: the
            # follow-up request is only valid if each "tool" message directly
            # follows the assistant message that requested it.
            state.history.append({
                "role": "assistant",
                "content": assistant_message.content,
                "tool_calls": [tc.model_dump() for tc in assistant_message.tool_calls],
            })
            for tool_call in assistant_message.tool_calls:
                function_name = tool_call.function.name
                arguments = json.loads(tool_call.function.arguments)
                print(f"MAF Agent wants to call tool: {function_name} with args: {arguments}")
                if function_name == "plan_travel_with_langgraph":
                    destination = arguments.get("destination")
                    dates = arguments.get("dates")
                    # This calls our wrapper, which in turn invokes LangGraph
                    tool_output = await langgraph_integration_tools_instance.plan_travel_with_langgraph(destination, dates)
                elif function_name == "get_booking_status":
                    booking_id = arguments.get("booking_id")
                    tool_output = await langgraph_integration_tools_instance.get_booking_status(booking_id)
                else:
                    tool_output = f"Unknown tool: {function_name}"
                state.history.append({
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": str(tool_output)
                })
            # Call the LLM again with the tool output(s) for the final response.
            final_response = await aoai_client.chat.completions.create(
                model=AZURE_OPENAI_DEPLOYMENT_NAME,
                messages=[{"role": "system", "content": system_prompt}, *state.history]
            )
            final_assistant_message_content = final_response.choices[0].message.content
            state.history.append({"role": "assistant", "content": final_assistant_message_content})
            await context.send_activity(final_assistant_message_content)
        elif assistant_message.content:
            state.history.append({"role": "assistant", "content": assistant_message.content})
            await context.send_activity(assistant_message.content)
        else:
            error_msg = "I'm having trouble understanding. Could you please rephrase?"
            await context.send_activity(error_msg)
            state.history.append({"role": "assistant", "content": error_msg})
    except Exception as e:
        print(f"Error in MAF Travel Agent: {e}")
        error_msg = "An unexpected error occurred in the MAF travel agent. Please try again later."
        await context.send_activity(error_msg)
        state.history.append({"role": "assistant", "content": error_msg})
# --- Server Setup (standard MAF Aiohttp server) ---
def start_server(agent_application: AgentApplication):
    async def entry_point(req: Request) -> Response:
        agent: AgentApplication = req.app["agent_app"]
        adapter: CloudAdapter = req.app["adapter"]
        return await start_agent_process(req, agent, adapter)

    async def health_check(_: Request) -> Response:
        # aiohttp requires coroutine handlers, so a plain lambda won't do here.
        return Response(status=200)

    APP = Application()
    APP.router.add_post("/api/messages", entry_point)
    APP.router.add_get("/api/messages", health_check)
    APP["agent_app"] = agent_application
    APP["adapter"] = agent_application.adapter

    try:
        port = int(os.environ.get("PORT", 3978))
        print(f"======== Running on http://localhost:{port} (MAF Travel Agent) ========")
        print("(Press CTRL+C to quit)")
        run_app(APP, host="localhost", port=port)
    except Exception as error:
        print(f"Error starting server: {error}")
        raise

if __name__ == "__main__":
    start_server(MAF_TRAVEL_AGENT_APP)
To Run Project 2:
- Configure your `.env` for Azure OpenAI and Redis.
- Ensure you have LangChain and LangGraph installed: `pip install langchain_core langchain_openai langgraph`
- Save `langgraph_travel_planner.py`, `maf_langgraph_integration_tools.py`, and `maf_travel_agent.py`.
- Run: `python maf_travel_agent.py`
- Interact using `teamsapptester` (or a custom client):
  - "Plan a trip to Rome for Christmas." (Should trigger `plan_travel_with_langgraph`, which calls LangGraph.)
  - "Check my travel booking TRV789." (Should trigger `get_booking_status`.)
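If you don't have `teamsapptester` handy, you can also smoke-test the endpoint directly. The snippet below is a hypothetical local test, not part of the project files: it POSTs a minimal Bot Framework-style Activity to `/api/messages` and assumes your local agent accepts unauthenticated requests, which is common in dev configurations but not guaranteed by default.

```python
# Hypothetical smoke test for local development only. Assumes the agent is
# running on port 3978 and accepts anonymous requests; production deployments
# require a bearer token from the channel service.
import requests

activity = {
    "type": "message",
    "text": "Plan a trip to Rome for Christmas.",
    "from": {"id": "user-1"},
    "recipient": {"id": "maf-travel-agent"},
    "conversation": {"id": "local-test-convo"},
    "channelId": "emulator",
    "serviceUrl": "http://localhost:3978",  # where the adapter would deliver replies
}

resp = requests.post("http://localhost:3978/api/messages", json=activity, timeout=60)
print(resp.status_code)  # 200/202 means the activity was accepted
```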
How this migration benefits you:
- Centralized Hosting & Management: The entire agent (including the LangGraph part) is now hosted and managed under a unified MAF application.
- Unified Memory: MAF’s Redis-backed memory handles the overall conversation history, encompassing interactions with the LangGraph sub-agent.
- Consistent Tooling: All tools, including the LangGraph “tool,” are exposed and managed in a consistent manner within MAF.
- Enterprise Integration: MAF can then layer on other enterprise features, such as authentication (Entra ID), detailed observability, and integration with Azure AI Foundry services, without modifying the core LangGraph logic. The LangGraph agent is effectively an isolated "black box" capability consumed by the MAF orchestrator.
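To make that "black box" point concrete, here is a minimal sketch of what the wrapper in `maf_langgraph_integration_tools.py` boils down to. The imported graph name and the state keys below are illustrative assumptions (a compiled LangGraph graph does expose `ainvoke`); the file you saved earlier is the source of truth.

```python
# Sketch only -- the imported graph name and the state keys ("destination",
# "dates", "itinerary") are assumptions, not the guide's verbatim code.
from langgraph_travel_planner import travel_planner_graph  # assumed name of the compiled graph

class LangGraphIntegrationTools:
    async def plan_travel_with_langgraph(self, destination: str, dates: str) -> str:
        # A compiled LangGraph graph is awaitable via ainvoke(); MAF never
        # sees the graph's internals, only this plain async function.
        result = await travel_planner_graph.ainvoke(
            {"destination": destination, "dates": dates}
        )
        return result.get("itinerary", str(result))

    async def get_booking_status(self, booking_id: str) -> str:
        # Stand-in lookup for the demo; a real implementation would query
        # a booking system or database.
        return f"Booking {booking_id}: confirmed (demo data)."
```

Because the MAF side depends only on this async-function surface, you could later swap LangGraph for any other planner without touching the agent code.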
6. Bonus Section: Further Learning and Resources
You’ve made significant progress! The Microsoft Agent Framework is a rapidly evolving area. Here are resources to continue your learning journey:
Recommended Online Courses/Tutorials
- Microsoft Learn Modules: Keep an eye on the official Microsoft Learn platform. New modules and tutorials specifically for the Microsoft Agent Framework (both .NET and Python) will be continuously released. Search for “Microsoft Agent Framework tutorial” on Microsoft Learn.
- Azure AI Fundamentals Certification: While not specific to MAF, understanding core Azure AI services (Azure OpenAI, Azure Cognitive Search) is highly beneficial.
- LangChain/LangGraph Tutorials: If you're interested in building complex agentic graphs, LangChain's official documentation and tutorials are excellent. MAF often takes inspiration from or integrates with these concepts.
Official Documentation
- Microsoft Agent Framework GitHub Repository: This is the primary source for the open-source code, samples, and the most up-to-date information.
- https://github.com/microsoft/agent-framework (Main repo)
- https://github.com/microsoft/Agents-for-python (Specific to Python SDK)
- Azure AI Foundry Blog: Provides announcements, best practices, and design patterns for agentic AI on Azure.
- Microsoft Azure AI Documentation: General documentation for all Azure AI services, which integrate heavily with MAF.
Blogs and Articles
- Microsoft AI Blog: Stay updated with the latest AI news and announcements from Microsoft.
- Microsoft Tech Community Blogs: Often contains deep dives and practical guides from Microsoft engineers and MVPs.
- Leading AI/ML Publications: Follow popular blogs like Towards Data Science, The Batch (DeepLearning.AI), and InfoQ for broader industry trends in AI agents.
YouTube Channels
- Microsoft Developer: Official channel with tutorials, announcements, and talks on Microsoft technologies, including AI.
- Azure: Official Azure channel for cloud-specific AI content.
- FreeCodeCamp.org / Traversy Media / Tech with Tim: Excellent channels for Python, FastAPI, and general programming tutorials that complement AI development.
Community Forums/Groups
- GitHub Issues/Discussions: The GitHub repositories for Microsoft Agent Framework, Semantic Kernel, and AutoGen are active places for questions, bug reports, and discussions.
- Stack Overflow: Use relevant tags like `microsoft-agent-framework`, `azure-openai`, and `python` for technical questions.
- Discord Servers: Look for official Microsoft AI communities or independent AI development servers.
Next Steps/Advanced Topics
- Explore Different Orchestration Patterns: Dive deeper into concurrent, group chat, and Magentic orchestration. The Python SDK will likely expand its declarative workflow builders.
- Advanced Memory Management: Implement vector stores (e.g., Azure AI Search vector store, Pinecone, Qdrant) for more sophisticated RAG scenarios and long-term knowledge retention.
- Observability & Monitoring: Integrate with OpenTelemetry for comprehensive tracing of agent interactions, tool calls, and LLM usage. Learn how to connect this to Azure Monitor or other APM tools.
- Deployment Strategies: Learn how to deploy MAF agents to Azure Container Apps, Azure Kubernetes Service, or Azure Functions for scalable and resilient production environments.
- Human-in-the-Loop (HITL): Implement mechanisms for human review and intervention in critical agent workflows, such as approval steps for sensitive actions.
- Custom LLM Integration: Learn how to integrate other LLM providers (e.g., custom hosted models, local models like Ollama) using the `IChatClient` abstraction.
- Multi-Modal Agents: Explore integrating vision, speech, and other modalities into your agents.
- Security Best Practices: Deepen your understanding of AI security, prompt injection prevention, data governance, and compliance within agentic systems.
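As a starting point for the observability item above, the sketch below shows plain OpenTelemetry tracing in Python with a console exporter. It is framework-agnostic, and the span and attribute names are illustrative choices rather than MAF conventions; swapping `ConsoleSpanExporter` for an OTLP or Azure Monitor exporter is how you would feed a real APM backend.

```python
# Generic OpenTelemetry sketch (pip install opentelemetry-sdk); not MAF-specific.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

# Configure a tracer that prints finished spans to stdout.
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("maf.travel_agent")

async def traced_tool_call(tool_name: str, **kwargs) -> str:
    # Wrap a tool invocation in a span so latency and arguments show up in traces.
    with tracer.start_as_current_span("tool_call") as span:
        span.set_attribute("tool.name", tool_name)
        span.set_attribute("tool.args", str(kwargs))
        return f"{tool_name} executed"  # replace with a real tool dispatch
```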
By continuously experimenting and building, you’ll master the Microsoft Agent Framework and be at the forefront of the AI agent revolution!