Mastering the Microsoft Agent Framework: A Comprehensive Learning Guide

Welcome to the exciting world of AI agents! This document is designed to be your comprehensive guide to the Microsoft Agent Framework, a powerful, open-source SDK and runtime that simplifies the creation, deployment, and management of intelligent AI agents and complex multi-agent systems. Whether you’re a seasoned developer looking to dive into agentic AI or a complete beginner, this guide will walk you through everything you need to know, from the foundational concepts to building sophisticated, production-ready applications.

1. Introduction to Microsoft Agent Framework

What is the Microsoft Agent Framework?

The Microsoft Agent Framework is an open-source SDK and runtime for both .NET and Python, designed to be the unified foundation for building sophisticated AI agents. It effectively merges the strengths of two previously distinct Microsoft AI projects:

  1. Semantic Kernel: Known for its stable SDK, robust enterprise connectors, structured workflows, and advanced observability features.
  2. AutoGen: A research-driven framework that pioneered innovative multi-agent orchestration patterns and collaborative AI techniques.

The Microsoft Agent Framework (MAF) aims to address the challenges developers face in moving AI agent prototypes to production. It provides a consistent developer experience, offering tools to:

  • Build individual agents with minimal boilerplate.
  • Orchestrate complex multi-agent workflows with ease.
  • Host and Deploy agents using familiar development patterns.
  • Monitor and Manage agent behavior in production environments.

In essence, MAF allows you to create autonomous software components that can reason about goals, call tools and APIs, collaborate with other agents, and dynamically adapt to achieve objectives.

Why Learn the Microsoft Agent Framework?

Learning MAF offers significant benefits for developers and organizations venturing into agentic AI:

  • Unified Platform: You no longer have to choose between experimental innovation (AutoGen) and enterprise stability (Semantic Kernel); MAF offers the best of both in a single framework.
  • Open Standards & Interoperability: Built with open standards like Model Context Protocol (MCP), Agent-to-Agent (A2A), and OpenAPI, ensuring your agents are portable and can communicate across different frameworks and cloud environments. This is crucial for integrating with existing systems and avoiding vendor lock-in.
  • Research-to-Production Pipeline: Get access to cutting-edge orchestration patterns directly from Microsoft Research, hardened for enterprise use with features like durability, governance, and performance.
  • Enterprise Readiness: Out-of-the-box support for critical enterprise requirements:
    • Observability: Built-in telemetry and logging, with OpenTelemetry support for standardized tracing.
    • Security & Compliance: Microsoft Entra ID integration for agent identities, robust security features, compliance hooks (task adherence checks, prompt shields, PII detection), and approval flows for sensitive actions.
    • Durability: Agents can pause, resume, and recover from interruptions, supporting long-running workflows.
    • Memory Management: Pluggable memory modules (Redis, Pinecone, Postgres, etc.) for persistent context and personalized experiences.
  • Developer Experience: Offers a consistent, simplified experience across .NET and Python, with rich samples, VS Code integration, and the flexibility to swap AI models/providers without code changes.
  • Wide Use Cases: Enables the creation of powerful solutions for:
    • Automated customer service (intent triage, specialist agents)
    • Intelligent content generation (writing, editing, SEO)
    • Complex enterprise automation (sales proposals, compliance, IT support)
    • Research and data analysis
    • Personalized assistants (travel planning, budget management)

The AI agent market is projected for significant growth, and MAF positions you to build robust solutions that drive real business impact.

Setting Up Your Development Environment

To follow along with this guide, you’ll need to set up your environment. We will focus on Python, as it’s highly popular for AI/ML development.

Prerequisites:

  1. Python 3.9+: Ensure you have Python installed. You can download it from python.org.
    • Verify installation: python --version or python3 --version
  2. pip: Python’s package installer. It usually comes with Python.
    • Verify installation: pip --version or pip3 --version
  3. Virtual Environment (Recommended): Always use a virtual environment to manage project dependencies.
  4. Code Editor: Visual Studio Code is highly recommended due to excellent Python and AI tooling support.
    • Install the Python extension for VS Code.
    • Consider the VS Code AI Toolkit for streamlined agent development (currently more C#-focused but expanding).
  5. Azure Account (Optional but Recommended for Full Features): Many enterprise features like Azure OpenAI, Azure Cognitive Search, and Microsoft Entra ID require an Azure subscription.
  6. GitHub Personal Access Token (PAT): If you plan to use GitHub-hosted models (for early exploration or specific scenarios), you’ll need a PAT with models scope. Create one in your GitHub settings.

Step-by-Step Environment Setup:

  1. Create a Project Directory:

    mkdir my-maf-project
    cd my-maf-project
    
  2. Create and Activate a Virtual Environment:

    python -m venv .venv
    # On Windows:
    # .venv\Scripts\activate
    # On macOS/Linux:
    source .venv/bin/activate
    

    (You’ll see a (.venv) prefix in your terminal prompt after activation.)

  3. Install Microsoft Agent Framework SDKs: The Python SDK for the Microsoft Agent Framework is distributed as modular packages. As of this writing, the microsoft-agents-hosting-aiohttp package provides the server components needed to host an agent.

    pip install microsoft-agents-hosting-aiohttp python-dotenv openai azure-identity azure-core redis langchain
    
    • microsoft-agents-hosting-aiohttp: Core SDK for hosting agents using aiohttp.
    • python-dotenv: For managing environment variables.
    • openai: For interacting with OpenAI/Azure OpenAI models.
    • azure-identity: For Azure authentication.
    • azure-core: Core Azure library.
    • redis: For Redis connections (memory/history).
    • langchain: We’ll include this for our migration project, as MAF and LangChain can interoperate.
  4. Set Environment Variables (e.g., for Azure OpenAI): Create a .env file in your project root and add your credentials.

    # .env
    # Azure OpenAI Service details
    AZURE_OPENAI_API_KEY="YOUR_AZURE_OPENAI_API_KEY"
    AZURE_OPENAI_ENDPOINT="https://YOUR_AOAI_RESOURCE_NAME.openai.azure.com/"
    AZURE_OPENAI_DEPLOYMENT_NAME="YOUR_GPT_MODEL_DEPLOYMENT_NAME" # e.g., gpt-4o-mini-deployment
    
    # Redis connection (optional, for memory)
    REDIS_HOST="localhost"
    REDIS_PORT="6379"
    REDIS_PASSWORD="YOUR_REDIS_PASSWORD" # If you have one
    
    # Azure Cognitive Search (optional, for tool/RAG)
    AZURE_SEARCH_ENDPOINT="https://YOUR_AZURE_SEARCH_SERVICE.search.windows.net"
    AZURE_SEARCH_API_KEY="YOUR_AZURE_SEARCH_API_KEY"
    AZURE_SEARCH_INDEX_NAME="YOUR_SEARCH_INDEX"
    
    # Microsoft Entra ID (for Auth) - if using for service principals
    # AZURE_TENANT_ID="YOUR_TENANT_ID"
    # AZURE_CLIENT_ID="YOUR_CLIENT_ID"
    # AZURE_CLIENT_SECRET="YOUR_CLIENT_SECRET"
    

    Important: Replace placeholder values with your actual credentials. Never commit your .env file to version control.
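
    Before moving on, it’s worth sanity-checking that the variables load correctly. A minimal check script (assuming the variable names above; load_dotenv reads .env into the process environment):

    # check_env.py -- quick sanity check for required settings
    import os
    from dotenv import load_dotenv

    load_dotenv()  # reads .env from the current directory

    required = [
        "AZURE_OPENAI_API_KEY",
        "AZURE_OPENAI_ENDPOINT",
        "AZURE_OPENAI_DEPLOYMENT_NAME",
    ]
    missing = [name for name in required if not os.getenv(name)]
    if missing:
        print(f"Missing variables: {', '.join(missing)}")
    else:
        print("All required Azure OpenAI variables are set.")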

2. Core Concepts and Fundamentals

The Microsoft Agent Framework (MAF) builds upon several key concepts from Semantic Kernel and AutoGen. Understanding these building blocks is crucial for effective agent development.

2.1 Agents

At its core, an Agent in MAF is an autonomous entity capable of accomplishing objectives. Agents are empowered by:

  • Reasoning and Decision-Making: Powered by Large Language Models (LLMs), planning algorithms, or custom logic.
  • Tool Usage: The ability to call external functions, APIs, or services (referred to as “Tools” or “Skills”).
  • Context Awareness: Maintaining state and memory through chat history, threads, vector stores, and enterprise data.

In MAF, the AIAgent is a central abstraction. A common implementation is ChatClientAgent, which wraps an IChatClient (your LLM connector) and provides instructions.

Detailed Explanation: An agent acts on prompts, leveraging its LLM to decide on actions, use tools, and generate responses. The ChatClientAgent is a direct interface to an LLM, allowing you to imbue it with a persona and specific instructions.
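
To make the abstraction concrete, here is a minimal, framework-independent sketch of what a ChatClientAgent-style wrapper does: it pairs an LLM connector with instructions and a running history. The class and method names below are illustrative only, not the actual MAF API.

# Illustrative sketch only -- names do not match the real MAF API.
from dataclasses import dataclass, field

@dataclass
class SketchChatClientAgent:
    chat_client: object          # any LLM connector exposing: async complete(messages) -> str
    instructions: str            # the agent's persona / system prompt
    history: list = field(default_factory=list)

    async def run(self, user_message: str) -> str:
        # Prepend the persona, keep a running history, delegate to the LLM.
        self.history.append({"role": "user", "content": user_message})
        messages = [{"role": "system", "content": self.instructions}, *self.history]
        reply = await self.chat_client.complete(messages)
        self.history.append({"role": "assistant", "content": reply})
        return reply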

Code Example: Your First MAF Agent (Echo Agent with Azure OpenAI)

Let’s create a simple “Echo Agent” that uses Azure OpenAI to process a message and respond.

# echo_agent.py
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Microsoft Agent Framework Imports (Conceptual/Based on .NET structure)
# Note: The Python SDK is still evolving. This uses currently available modules
# or conceptually aligns with the .NET MAF structure and general Python SDK patterns.
from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process, jwt_authorization_middleware
from aiohttp.web import Request, Response, Application, run_app

# OpenAI / Azure OpenAI imports
from openai import AsyncAzureOpenAI  # async client, since our handlers await completions
from azure.identity import DefaultAzureCredential

# --- Configuration for Azure OpenAI ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")

if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
    raise ValueError("Missing Azure OpenAI environment variables. Please check your .env file.")

# Authenticate with Azure OpenAI
# For production, consider DefaultAzureCredential for managed identities
try:
    aoai_client = AsyncAzureOpenAI(
        api_key=AZURE_OPENAI_API_KEY,
        api_version="2024-02-15-preview", # Check latest API version
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
    )
    print("Successfully connected to Azure OpenAI.")
except Exception as e:
    print(f"Error connecting to Azure OpenAI: {e}")
    # Fallback or exit if essential AI service is unavailable
    exit(1)

# A simple wrapper for an LLM client that conforms to MAF agent expectations
class AzureOpenAIChatClient:
    def __init__(self, client, deployment_name):
        self.client = client
        self.deployment_name = deployment_name

    async def complete(self, messages: list[dict], **kwargs) -> str:
        try:
            response = await self.client.chat.completions.create(
                model=self.deployment_name,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error calling Azure OpenAI: {e}")
            return "An error occurred while processing your request."

# Initialize our custom chat client
my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)

# --- Define our Agent using MAF Python SDK (Echo Agent) ---
# The AgentApplication is the core entity that handles incoming activities
# and routes them to appropriate handlers (message, conversation_update, etc.)

# In MAF Python SDK, agent logic is often implemented as handlers for TurnContext activities.
# The TurnState can store conversational state.
class EchoAgentState(TurnState):
    # You can add state variables here
    pass

AGENT_APP = AgentApplication[EchoAgentState](
    storage=MemoryStorage(), # Simple in-memory storage for demonstration
    adapter=CloudAdapter()
)

# Handler for new conversations or members added
async def _welcome_and_help(context: TurnContext, _: EchoAgentState):
    await context.send_activity(
        "Hello! I am an Echo Agent powered by Azure OpenAI. "
        "Type /help for help or send me any message to hear it echoed back."
    )

AGENT_APP.conversation_update("membersAdded")(_welcome_and_help)
AGENT_APP.message("/help")(_welcome_and_help)

# Main message handler for the Echo Agent
@AGENT_APP.activity("message")
async def on_message(context: TurnContext, _):
    user_message = context.activity.text
    # Use our Azure OpenAI client to generate a more "intelligent" echo
    # In a real agent, this would involve more complex prompt engineering
    # or tool calls.
    llm_response = await my_chat_client.complete(
        messages=[
            {"role": "system", "content": "You are a helpful assistant. Echo the user's message, but add a friendly remark."},
            {"role": "user", "content": user_message}
        ]
    )
    await context.send_activity(f"You said: '{user_message}'\nAI says: {llm_response}")


# --- aiohttp server setup for local testing ---
# This part hosts the agent as an HTTP endpoint, typically for local testing
# or integration with a chatbot frontend.
def start_server(agent_application: AgentApplication):
    """
    Starts an aiohttp web server to host the AgentApplication.
    """
    async def entry_point(req: Request) -> Response:
        agent: AgentApplication = req.app["agent_app"]
        adapter: CloudAdapter = req.app["adapter"]
        return await start_agent_process(req, agent, adapter)

    APP = Application() # middlewares=[jwt_authorization_middleware] for auth
    APP.router.add_post("/api/messages", entry_point)

    async def health_check(_: Request) -> Response:
        # aiohttp route handlers should be coroutines
        return Response(status=200)

    APP.router.add_get("/api/messages", health_check) # Health check
    APP["agent_app"] = agent_application
    APP["adapter"] = agent_application.adapter
    try:
        port = int(os.environ.get("PORT", 3978))
        print(f"======== Running on http://localhost:{port} ========")
        print("(Press CTRL+C to quit)")
        run_app(APP, host="localhost", port=port)
    except Exception as error:
        print(f"Error starting server: {error}")
        raise error

if __name__ == "__main__":
    start_server(AGENT_APP)

To Run This Example:

  1. Make sure your .env file is configured with Azure OpenAI details.

  2. Run from your terminal: python echo_agent.py

  3. The agent will start on http://localhost:3978.

  4. To interact: You’ll typically need a client. Microsoft provides the teamsapptester CLI tool for local interaction, or you can use curl for basic POST requests to /api/messages.

    • Using teamsapptester (Node.js required):

      npm install -g @microsoft/teams-app-test-tool
      teamsapptester
      

      This will open a browser window connected to your agent.

    • Using curl (basic POST; the AI reply won’t appear in curl’s output, since responses are delivered asynchronously through the bot channel):

      curl -X POST http://localhost:3978/api/messages \
           -H "Content-Type: application/json" \
           -d '{
                 "type": "message",
                 "text": "Hello, Agent!",
                 "from": {"id": "user1", "name": "User"},
                 "conversation": {"id": "conv1"},
                 "recipient": {"id": "agent1", "name": "EchoAgent"}
               }'
      

      (Note: curl is for demonstrating the endpoint is active. For a full conversational experience, use teamsapptester or a custom client).
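
    • Using a Python script (a sketch using the requests library; it POSTs the same activity payload as the curl call above):

      # post_message.py -- minimal scripted client
      import requests

      activity = {
          "type": "message",
          "text": "Hello, Agent!",
          "from": {"id": "user1", "name": "User"},
          "conversation": {"id": "conv1"},
          "recipient": {"id": "agent1", "name": "EchoAgent"},
      }
      resp = requests.post("http://localhost:3978/api/messages", json=activity)
      print(resp.status_code)  # replies arrive asynchronously via the bot channel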

Exercises/Mini-Challenges:

  1. Modify the on_message handler to make the AI’s “friendly remark” vary based on the length of the user’s message.
  2. Add a new /status message handler that responds with a fixed message like “Agent is online and ready!”.

2.2 Tools (Skills)

Detailed Explanation: Agents become powerful when they can interact with the outside world. Tools (often called Skills in Semantic Kernel) are functions or APIs that an agent can call to perform specific tasks. This could be searching the web, querying a database, sending an email, or performing a complex calculation. MAF heavily emphasizes “Typed Tool Calls” for safety and ease of use, leveraging OpenAPI for discoverability.
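
To see why typed signatures matter, here is a small sketch that derives an OpenAI-style function schema from a typed Python function using the standard inspect module. It is a simplified version of what typed tool calling automates (only simple scalar parameters are handled):

# schema_from_signature.py -- derive a tool schema from type hints (sketch)
import inspect

PY_TO_JSON = {str: "string", int: "integer", float: "number", bool: "boolean"}

def tool_schema(func):
    sig = inspect.signature(func)
    props = {
        name: {"type": PY_TO_JSON.get(param.annotation, "string")}
        for name, param in sig.parameters.items()
    }
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": (func.__doc__ or "").strip(),
            "parameters": {"type": "object", "properties": props, "required": list(props)},
        },
    }

def web_search(query: str) -> list:
    """Performs a web search to find relevant information."""
    ...

print(tool_schema(web_search))  # matches the hand-written schema used below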

Code Example: Adding a Web Search Tool

Let’s integrate a simple web search tool. We’ll simulate this with a dummy function for now, but show how it would integrate if it were a real web call.

First, let’s create a tools.py file:

# tools.py
import asyncio
from typing import Dict, Any, List

class SearchTool:
    @staticmethod
    async def web_search(query: str) -> List[str]:
        """
        Performs a simulated web search and returns relevant snippets.
        In a real application, this would integrate with a search API (e.g., Bing, Google).
        """
        print(f"Executing web search for: {query}")
        # Simulate an API call
        await asyncio.sleep(1)
        if "weather" in query.lower():
            return ["Current weather in London: 15°C, cloudy. Expected rain tomorrow."]
        elif "capital of france" in query.lower():
            return ["The capital of France is Paris."]
        elif "python agent framework" in query.lower():
            return [
                "Microsoft Agent Framework (MAF) is an open-source SDK for building AI agents.",
                "LangChain and AutoGen are other popular Python agent frameworks.",
                "MAF combines Semantic Kernel and AutoGen capabilities."
            ]
        else:
            return [f"Search results for '{query}': No specific results found, but here's some general info."]

    @staticmethod
    async def get_current_time() -> str:
        """
        Retrieves the current date and time.
        """
        import datetime
        print("Executing get_current_time tool.")
        return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
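
You can exercise these tools directly, without any agent, to confirm they behave as expected:

# Quick standalone check for tools.py
import asyncio
from tools import SearchTool

async def main():
    print(await SearchTool.web_search("capital of France"))
    print(await SearchTool.get_current_time())

asyncio.run(main())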

Now, let’s update echo_agent.py to make use of this tool. The Python SDK uses a Planner concept to enable tool calling, similar to AutoGen’s function calling.

# tool_agent.py (Modified from echo_agent.py)
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI # async client, since handlers await completions
from azure.identity import DefaultAzureCredential

# Import our custom tools
from tools import SearchTool

# --- Configuration (same as before) ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")

if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
    raise ValueError("Missing Azure OpenAI environment variables. Please check your .env file.")

try:
    aoai_client = AsyncAzureOpenAI(
        api_key=AZURE_OPENAI_API_KEY,
        api_version="2024-02-15-preview", # Check latest API version
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
    )
    print("Successfully connected to Azure OpenAI.")
except Exception as e:
    print(f"Error connecting to Azure OpenAI: {e}")
    exit(1)

class AzureOpenAIChatClient:
    def __init__(self, client, deployment_name):
        self.client = client
        self.deployment_name = deployment_name

    async def complete(self, messages: list[dict], **kwargs) -> str:
        try:
            response = await self.client.chat.completions.create(
                model=self.deployment_name,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error calling Azure OpenAI: {e}")
            return "An error occurred while processing your request."

my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)

# --- Introduce Planner for Tool Calling ---
# In the MAF Python SDK, explicit tool registration and an LLM-driven planner
# are used to enable tool calling. This is where AutoGen's influence is clear.
# For simplicity, we'll create a basic planner that just decides if a search is needed.
# In a full MAF implementation, this would be handled more abstractly by the framework.

# We'll need a mechanism to describe our tools to the LLM so it knows when to use them.
# This often involves creating "function schemas" for the LLM.

def get_tool_schemas():
    """Returns a list of tool descriptions in OpenAI function call format."""
    return [
        {
            "type": "function",
            "function": {
                "name": "web_search",
                "description": "Performs a web search to find relevant information.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The search query."
                        }
                    },
                    "required": ["query"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "get_current_time",
                "description": "Retrieves the current date and time.",
                "parameters": {
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            }
        }
    ]

# --- Define our Agent with Tool-Calling Capability ---
class ToolUsingAgentState(TurnState):
    # We can store conversational history
    history: List[Dict[str, Any]] = []

AGENT_APP = AgentApplication[ToolUsingAgentState](
    storage=MemoryStorage(),
    adapter=CloudAdapter()
)

async def _welcome_and_help(context: TurnContext, state: ToolUsingAgentState):
    await context.send_activity(
        "Hello! I am an Agent with web search capabilities. "
        "Ask me questions like 'What is the weather in London?' or 'What is the current time?'"
    )
    state.history.append({"role": "assistant", "content": "Hello! I am an Agent with web search capabilities. Ask me questions like 'What is the weather in London?' or 'What is the current time?'"})

AGENT_APP.conversation_update("membersAdded")(_welcome_and_help)
AGENT_APP.message("/help")(_welcome_and_help)

@AGENT_APP.activity("message")
async def on_message_with_tools(context: TurnContext, state: ToolUsingAgentState):
    user_message = context.activity.text
    state.history.append({"role": "user", "content": user_message})

    messages_for_llm = [
        {"role": "system", "content": "You are a helpful assistant. Use tools if necessary to answer questions."},
        *state.history
    ]

    try:
        response = await aoai_client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT_NAME,
            messages=messages_for_llm,
            tools=get_tool_schemas(),
            tool_choice="auto" # Let the LLM decide if it wants to call a tool
        )

        assistant_message = response.choices[0].message
        # Keep the full assistant message (including tool_calls) in history so the
        # follow-up request after a tool call is a valid message sequence.
        state.history.append(assistant_message.model_dump(exclude_none=True))

        if assistant_message.tool_calls:
            for tool_call in assistant_message.tool_calls:
                function_name = tool_call.function.name
                arguments = tool_call.function.arguments

                print(f"Agent wants to call tool: {function_name} with args: {arguments}")

                # Execute the tool
                tool_output = None
                if function_name == "web_search":
                    query = json.loads(arguments).get("query") # arguments is a JSON string
                    tool_output = await SearchTool.web_search(query)
                elif function_name == "get_current_time":
                    tool_output = await SearchTool.get_current_time()
                else:
                    tool_output = f"Unknown tool: {function_name}"

                state.history.append({
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": str(tool_output)
                })

                # Call LLM again with tool output
                final_response = await aoai_client.chat.completions.create(
                    model=AZURE_OPENAI_DEPLOYMENT_NAME,
                    messages=state.history
                )
                final_assistant_message = final_response.choices[0].message.content
                state.history.append({"role": "assistant", "content": final_assistant_message})
                await context.send_activity(final_assistant_message)

        else:
            # If no tool call, just respond with the LLM's message
            await context.send_activity(assistant_message.content)

    except Exception as e:
        print(f"Error in tool-using agent: {e}")
        error_msg = "An error occurred while processing your request with tools."
        await context.send_activity(error_msg)
        state.history.append({"role": "assistant", "content": error_msg})


# --- aiohttp server setup (same as before) ---
def start_server(agent_application: AgentApplication):
    async def entry_point(req: Request) -> Response:
        agent: AgentApplication = req.app["agent_app"]
        adapter: CloudAdapter = req.app["adapter"]
        return await start_agent_process(req, agent, adapter)

    APP = Application()
    APP.router.add_post("/api/messages", entry_point)

    async def health_check(_: Request) -> Response:
        return Response(status=200)

    APP.router.add_get("/api/messages", health_check)
    APP["agent_app"] = agent_application
    APP["adapter"] = agent_application.adapter
    try:
        port = int(os.environ.get("PORT", 3978))
        print(f"======== Running on http://localhost:{port} ========")
        print("(Press CTRL+C to quit)")
        run_app(APP, host="localhost", port=port)
    except Exception as error:
        print(f"Error starting server: {error}")
        raise error

if __name__ == "__main__":
    start_server(AGENT_APP)

To Run This Example:

  1. Create tools.py as described.
  2. Save the agent code as tool_agent.py.
  3. Ensure your .env is set up for Azure OpenAI.
  4. Run: python tool_agent.py
  5. Use teamsapptester or a custom client. Try prompts like:
    • “What is the weather in London?”
    • “What is the current time?”
    • “Tell me about Python agent frameworks.”

Exercises/Mini-Challenges:

  1. Add a new tool MathTool with methods for add(a, b) and subtract(a, b). Update the get_tool_schemas and on_message_with_tools to allow the agent to use these math functions.
  2. Modify the web_search tool to return a more diverse set of results based on a predefined list or a more sophisticated keyword matching.

2.3 Memory and Context Management

Detailed Explanation: Agents need memory to maintain conversational context, remember past interactions, and retrieve relevant information. MAF provides a pluggable memory system, allowing integration with various storage solutions (e.g., Redis, vector databases). TurnState is used to manage transient state during a “turn” of interaction, while MemoryStorage or other persistent stores handle long-term memory.

Code Example: Persistent Chat History with Redis

Let’s modify our agent to use Redis for persistent chat history.

First, ensure Redis is installed and running, and your .env has Redis connection details.

# redis_agent.py (Building on tool_agent.py)
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv
import redis.asyncio as redis # Use async Redis client

load_dotenv()

from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI # async client, since handlers await completions
from azure.identity import DefaultAzureCredential

from tools import SearchTool # Our custom tools

# --- Configuration ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")

if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
    raise ValueError("Missing Azure OpenAI environment variables. Please check your .env file.")

try:
    aoai_client = AsyncAzureOpenAI(
        api_key=AZURE_OPENAI_API_KEY,
        api_version="2024-02-15-preview",
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
    )
    print("Successfully connected to Azure OpenAI.")
except Exception as e:
    print(f"Error connecting to Azure OpenAI: {e}")
    exit(1)

# Initialize Redis client
# Note: For production, use connection pooling and proper error handling.
try:
    redis_client = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        password=REDIS_PASSWORD,
        decode_responses=True # Decode responses to string
    )
    # Test the connection once at startup, then drop the pooled connection so
    # fresh connections are created inside the server's event loop later.
    async def _test_redis_connection():
        await redis_client.ping()
        await redis_client.connection_pool.disconnect()
        print("Successfully connected to Redis.")
    asyncio.run(_test_redis_connection())
except Exception as e:
    print(f"Error connecting to Redis: {e}")
    # Proceed without Redis or exit based on your application's tolerance
    redis_client = None

class AzureOpenAIChatClient:
    def __init__(self, client, deployment_name):
        self.client = client
        self.deployment_name = deployment_name

    async def complete(self, messages: list[dict], **kwargs) -> str:
        try:
            response = await self.client.chat.completions.create(
                model=self.deployment_name,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error calling Azure OpenAI: {e}")
            return "An error occurred while processing your request."

my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)

def get_tool_schemas():
    """Returns a list of tool descriptions in OpenAI function call format."""
    return [
        {
            "type": "function",
            "function": {
                "name": "web_search",
                "description": "Performs a web search to find relevant information.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The search query."
                        }
                    },
                    "required": ["query"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "get_current_time",
                "description": "Retrieves the current date and time.",
                "parameters": {
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            }
        }
    ]

# --- Custom Redis-backed Memory Storage (Conceptual, as MAF might have its own) ---
# For demonstration, we'll implement a simple Redis-backed storage for chat history.
# MAF's official Redis memory module would abstract this further.
class RedisChatHistoryStorage(MemoryStorage):
    def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
        self.redis_client = redis_client
        self.prefix = prefix

    async def _get_key(self, conversation_id: str) -> str:
        return f"{self.prefix}{conversation_id}"

    async def read(self, conversation_id: str) -> dict:
        key = await self._get_key(conversation_id)
        raw_history = await self.redis_client.get(key)
        if raw_history:
            return json.loads(raw_history)
        return {"history": []} # Default if no history

    async def write(self, conversation_id: str, data: dict):
        key = await self._get_key(conversation_id)
        await self.redis_client.set(key, json.dumps(data))
        # Optional: Set an expiry for the history
        # await self.redis_client.expire(key, 3600) # 1 hour

# Our agent state now includes a way to load/save history
class PersistentToolUsingAgentState(TurnState):
    history: List[Dict[str, Any]] = []
    conversation_id: str = "default_conversation" # Placeholder, actual ID comes from TurnContext

# Use our Redis storage if available, otherwise fallback to in-memory
storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()

AGENT_APP = AgentApplication[PersistentToolUsingAgentState](
    storage=storage_instance,
    adapter=CloudAdapter()
)

async def _load_history(context: TurnContext, state: PersistentToolUsingAgentState):
    if context.activity and context.activity.conversation and context.activity.conversation.id:
        state.conversation_id = context.activity.conversation.id
    
    if redis_client:
        stored_data = await storage_instance.read(state.conversation_id)
        state.history = stored_data.get("history", [])
        print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")

async def _save_history(context: TurnContext, state: PersistentToolUsingAgentState):
    if redis_client and state.conversation_id:
        await storage_instance.write(state.conversation_id, {"history": state.history})
        print(f"Saved history for {state.conversation_id}: {len(state.history)} items")


# Register these as pre/post handlers for activities.
# (Hook names are illustrative; check the SDK for the exact before/after-turn API.)
AGENT_APP.on_before_activity(_load_history)
AGENT_APP.on_after_activity(_save_history)


async def _welcome_and_help(context: TurnContext, state: PersistentToolUsingAgentState):
    await context.send_activity(
        "Hello! I am a persistent Agent with web search capabilities. "
        "Ask me questions like 'What is the weather in London?' or 'What is the current time?'"
    )
    state.history.append({"role": "assistant", "content": "Hello! I am a persistent Agent with web search capabilities. Ask me questions like 'What is the weather in London?' or 'What is the current time?'"})


AGENT_APP.conversation_update("membersAdded")(_welcome_and_help)
AGENT_APP.message("/help")(_welcome_and_help)


@AGENT_APP.activity("message")
async def on_message_with_tools_persistent(context: TurnContext, state: PersistentToolUsingAgentState):
    user_message = context.activity.text
    state.history.append({"role": "user", "content": user_message})

    messages_for_llm = [
        {"role": "system", "content": "You are a helpful assistant. Use tools if necessary to answer questions."},
        *state.history
    ]

    try:
        response = await aoai_client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT_NAME,
            messages=messages_for_llm,
            tools=get_tool_schemas(),
            tool_choice="auto"
        )

        assistant_message = response.choices[0].message
        # Store the full assistant message (including any tool_calls) in a
        # JSON-serializable form so it round-trips through Redis and keeps the
        # message sequence valid for the follow-up call after a tool result.
        state.history.append(assistant_message.model_dump(exclude_none=True))
        
        if assistant_message.tool_calls:
            for tool_call in assistant_message.tool_calls:
                function_name = tool_call.function.name
                arguments = tool_call.function.arguments

                print(f"Agent wants to call tool: {function_name} with args: {arguments}")

                tool_output = None
                if function_name == "web_search":
                    query = json.loads(arguments).get("query")
                    tool_output = await SearchTool.web_search(query)
                elif function_name == "get_current_time":
                    tool_output = await SearchTool.get_current_time()
                else:
                    tool_output = f"Unknown tool: {function_name}"

                state.history.append({
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": str(tool_output)
                })

                final_response = await aoai_client.chat.completions.create(
                    model=AZURE_OPENAI_DEPLOYMENT_NAME,
                    messages=state.history
                )
                final_assistant_message_content = final_response.choices[0].message.content
                state.history.append({"role": "assistant", "content": final_assistant_message_content})
                await context.send_activity(final_assistant_message_content)

        else:
            await context.send_activity(assistant_message.content)

    except Exception as e:
        print(f"Error in persistent tool-using agent: {e}")
        error_msg = "An error occurred while processing your request with persistent memory."
        await context.send_activity(error_msg)
        state.history.append({"role": "assistant", "content": error_msg})


# --- Server Setup (same as before) ---
def start_server(agent_application: AgentApplication):
    async def entry_point(req: Request) -> Response:
        agent: AgentApplication = req.app["agent_app"]
        adapter: CloudAdapter = req.app["adapter"]
        return await start_agent_process(req, agent, adapter)

    APP = Application()
    APP.router.add_post("/api/messages", entry_point)

    async def health_check(_: Request) -> Response:
        return Response(status=200)

    APP.router.add_get("/api/messages", health_check)
    APP["agent_app"] = agent_application
    APP["adapter"] = agent_application.adapter
    try:
        port = int(os.environ.get("PORT", 3978))
        print(f"======== Running on http://localhost:{port} ========")
        print("(Press CTRL+C to quit)")
        run_app(APP, host="localhost", port=port)
    except Exception as error:
        print(f"Error starting server: {error}")
        raise error

if __name__ == "__main__":
    start_server(AGENT_APP)

To Run This Example:

  1. Ensure Redis is running (e.g., via Docker: docker run -p 6379:6379 --name my-redis -d redis).
  2. Configure REDIS_HOST, REDIS_PORT, REDIS_PASSWORD in your .env.
  3. Save the agent code as redis_agent.py.
  4. Run: python redis_agent.py
  5. Use teamsapptester. Send messages, then restart the redis_agent.py script and send new messages in the same conversation. You should see that the history is loaded.
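
To verify what is actually stored, you can read the raw history back with a few lines of Python (assuming the chat_history: key prefix used above; conv1 is a placeholder conversation id):

# inspect_history.py -- peek at persisted history (assumes prefix "chat_history:")
import asyncio
import json
import redis.asyncio as redis

async def main():
    client = redis.Redis(host="localhost", port=6379, decode_responses=True)
    raw = await client.get("chat_history:conv1")  # "conv1" is a placeholder id
    history = json.loads(raw)["history"] if raw else []
    print(f"{len(history)} messages stored")
    await client.aclose()  # aclose() requires redis-py >= 5; use close() on older versions

asyncio.run(main())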

Exercises/Mini-Challenges:

  1. Implement a command /clear_history that clears the Redis history for the current conversation.
  2. Modify the RedisChatHistoryStorage to include a timestamp with each history entry and display it when loading.

3. Intermediate Topics

3.1 Workflows and Orchestration

Detailed Explanation: While a single agent can be powerful, real-world problems often require multiple agents collaborating. MAF excels at orchestration, allowing you to define how agents interact. This can be sequential (Agent A hands off to Agent B), concurrent (multiple agents work in parallel), group chat (agents debate), or more complex patterns. The framework simplifies the routing and state management between these agents.

MAF uses AgentWorkflowBuilder (in .NET) or similar concepts in Python to define these flows. The key idea is that a workflow itself can be treated as an agent.
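
The following is a minimal sketch of that idea in plain Python: a sequential workflow wrapped in the same run interface an individual agent exposes, so it can be dropped in wherever an agent is expected. Names are illustrative, not the MAF API.

# Illustrative sketch: a sequential workflow that itself looks like an agent.
class SequentialWorkflow:
    def __init__(self, agents: list):
        self.agents = agents  # each agent exposes: async run(text) -> str

    async def run(self, text: str) -> str:
        # Pipe each agent's output into the next one.
        for agent in self.agents:
            text = await agent.run(text)
        return text

# Usage: pipeline = SequentialWorkflow([writer, editor]); result = await pipeline.run(request)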

Code Example: Sequential Multi-Agent Workflow

Let’s create a “Content Creation” workflow with a “Writer Agent” and an “Editor Agent.”

First, let’s refine our tools.py slightly for our content agents:

# tools.py (Updated with content-specific tools)
import asyncio
from typing import Dict, Any, List

class SearchTool:
    @staticmethod
    async def web_search(query: str) -> List[str]:
        """
        Performs a simulated web search and returns relevant snippets.
        Used by the writer for research.
        """
        print(f"Executing web search for: {query}")
        await asyncio.sleep(0.5)
        if "AI agent framework" in query.lower():
            return ["Microsoft Agent Framework unifies Semantic Kernel and AutoGen.", "LangChain is popular for LLM orchestration."]
        return [f"Search results for '{query}': Example content related to the query."]

class ContentReviewTool:
    @staticmethod
    async def check_grammar(text: str) -> str:
        """
        Checks the grammar of the provided text and suggests corrections.
        Used by the editor.
        """
        print(f"Executing grammar check for text: {text[:50]}...")
        await asyncio.sleep(0.3)
        if " an example" in text:
            return text.replace(" an example", " a clear example") + "\n(Suggested: 'a clear example' instead of 'an example')"
        return text + "\n(Grammar check: No significant issues found.)"

    @staticmethod
    async def analyze_sentiment(text: str) -> str:
        """
        Analyzes the sentiment of the provided text.
        Used by the editor.
        """
        print(f"Executing sentiment analysis for text: {text[:50]}...")
        await asyncio.sleep(0.3)
        if "negative" in text.lower() or "bad" in text.lower():
            return "Sentiment: Negative"
        return "Sentiment: Neutral/Positive"

Now, the multi-agent workflow:

# multi_agent_workflow.py
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv

load_dotenv()

from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI # async client, since handlers await completions

from tools import SearchTool, ContentReviewTool # Our updated tools

# --- Configuration (same as before) ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")

if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
    raise ValueError("Missing Azure OpenAI environment variables. Please check your .env file.")

try:
    aoai_client = AsyncAzureOpenAI(
        api_key=AZURE_OPENAI_API_KEY,
        api_version="2024-02-15-preview",
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
    )
    print("Successfully connected to Azure OpenAI.")
except Exception as e:
    print(f"Error connecting to Azure OpenAI: {e}")
    exit(1)

class AzureOpenAIChatClient:
    def __init__(self, client, deployment_name):
        self.client = client
        self.deployment_name = deployment_name

    async def complete(self, messages: list[dict], **kwargs) -> str:
        try:
            response = await self.client.chat.completions.create(
                model=self.deployment_name,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error calling Azure OpenAI: {e}")
            return "An error occurred while processing your request."

my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)


# --- Tool Definitions for LLM ---
def get_writer_tool_schemas():
    return [
        {
            "type": "function",
            "function": {
                "name": "web_search",
                "description": "Performs a web search to gather information for writing.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The search query."
                        }
                    },
                    "required": ["query"]
                }
            }
        }
    ]

def get_editor_tool_schemas():
    return [
        {
            "type": "function",
            "function": {
                "name": "check_grammar",
                "description": "Checks the grammar of the provided text and suggests corrections.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "text": {
                            "type": "string",
                            "description": "The text to check."
                        }
                    },
                    "required": ["text"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "analyze_sentiment",
                "description": "Analyzes the sentiment of the provided text.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "text": {
                            "type": "string",
                            "description": "The text to analyze."
                        }
                    },
                    "required": ["text"]
                }
            }
        }
    ]

# --- Base Agent Class for MAF-like structure ---
class BaseMAFAgent:
    def __init__(self, name: str, instructions: str, tool_schemas: List[Dict[str, Any]] = None):
        self.name = name
        self.instructions = instructions
        self.tool_schemas = tool_schemas if tool_schemas is not None else []
        self.aoai_client = aoai_client # Using global client for simplicity

    async def run_step(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Runs a single step of the agent, potentially calling tools."""
        current_messages = [{"role": "system", "content": self.instructions}] + messages
        try:
            response = await self.aoai_client.chat.completions.create(
                model=AZURE_OPENAI_DEPLOYMENT_NAME,
                messages=current_messages,
                tools=self.tool_schemas,
                tool_choice="auto"
            )
            assistant_message = response.choices[0].message
            
            output = {"content": assistant_message.content, "tool_calls": []}
            if assistant_message.tool_calls:
                output["tool_calls"] = assistant_message.tool_calls
            return output
        except Exception as e:
            print(f"Error in {self.name} agent: {e}")
            return {"content": f"An error occurred in {self.name}.", "tool_calls": []}

    async def execute_tool_call(self, tool_call) -> str:
        function_name = tool_call.function.name
        arguments = json.loads(tool_call.function.arguments)

        print(f"  {self.name} executing tool: {function_name} with args: {arguments}")

        tool_output = f"Unknown tool: {function_name}"
        if function_name == "web_search":
            query = arguments.get("query")
            tool_output = await SearchTool.web_search(query)
        elif function_name == "check_grammar":
            text = arguments.get("text")
            tool_output = await ContentReviewTool.check_grammar(text)
        elif function_name == "analyze_sentiment":
            text = arguments.get("text")
            tool_output = await ContentReviewTool.analyze_sentiment(text)
        
        return str(tool_output)

# --- Define specific Agents ---
writer_agent = BaseMAFAgent(
    name="Writer",
    instructions="You are a creative writer. Your goal is to write engaging content based on user requests. Use the web_search tool for research.",
    tool_schemas=get_writer_tool_schemas()
)

editor_agent = BaseMAFAgent(
    name="Editor",
    instructions="You are an expert editor. Your goal is to review and improve written content, focusing on grammar, engagement, and tone. Use check_grammar and analyze_sentiment tools.",
    tool_schemas=get_editor_tool_schemas()
)

# --- Workflow Logic (Simulated by manual chaining for Python MAF SDK) ---
# In MAF Python SDK, this would typically involve an AgentWorkflowBuilder or similar
# for declarative workflow definition. For now, we simulate the sequential flow.

class ContentWorkflowState(TurnState):
    history: List[Dict[str, Any]] = []
    current_content: str = ""
    user_request: str = ""

# A single AgentApplication to manage the workflow
WORKFLOW_AGENT_APP = AgentApplication[ContentWorkflowState](
    storage=MemoryStorage(), # Using in-memory for this example
    adapter=CloudAdapter()
)

async def _workflow_welcome(context: TurnContext, state: ContentWorkflowState):
    await context.send_activity(
        "Welcome to the Content Creation Workflow! I have a Writer and an Editor agent. "
        "Tell me what content you want to create (e.g., 'Write an article about AI agent frameworks')."
    )
    state.history.append({"role": "assistant", "content": "Welcome to the Content Creation Workflow!"})

WORKFLOW_AGENT_APP.conversation_update("membersAdded")(_workflow_welcome)
WORKFLOW_AGENT_APP.message("/help")(_workflow_welcome)

@WORKFLOW_AGENT_APP.activity("message")
async def content_creation_workflow_handler(context: TurnContext, state: ContentWorkflowState):
    user_message = context.activity.text
    state.user_request = user_message
    state.history.append({"role": "user", "content": user_message})
    await context.send_activity(f"Starting content creation for: '{user_message}'...")

    # --- Step 1: Writer Agent writes the content ---
    await context.send_activity("Writer Agent is working...")
    writer_messages = [{"role": "user", "content": f"Write content based on the request: {user_message}"}]
    
    writer_output = await writer_agent.run_step(writer_messages)
    content_draft = writer_output["content"]
    
    # Handle the writer's tool calls: record the assistant tool-call message,
    # run each tool, then ask the writer to continue with the results in context.
    if writer_output["tool_calls"]:
        writer_messages.append({
            "role": "assistant",
            "content": writer_output["content"],
            "tool_calls": [tc.model_dump() for tc in writer_output["tool_calls"]],
        })
        for tool_call in writer_output["tool_calls"]:
            tool_result = await writer_agent.execute_tool_call(tool_call)
            writer_messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": tool_result})
        re_writer_output = await writer_agent.run_step(writer_messages)
        content_draft = re_writer_output["content"] # Use the refined content
    
    await context.send_activity(f"Writer Draft:\n```\n{content_draft}\n```")
    state.current_content = content_draft
    state.history.append({"role": "assistant", "content": f"Writer produced draft: {content_draft}"})

    # --- Step 2: Editor Agent reviews and refines ---
    await context.send_activity("Editor Agent is reviewing...")
    editor_messages = [{"role": "user", "content": f"Review and improve this content: {state.current_content}"}]
    
    editor_output = await editor_agent.run_step(editor_messages)
    final_content = editor_output["content"]

    # Handle the editor's tool calls: record the assistant tool-call message,
    # run each tool, then ask the editor to continue with the results in context.
    if editor_output["tool_calls"]:
        editor_messages.append({
            "role": "assistant",
            "content": editor_output["content"],
            "tool_calls": [tc.model_dump() for tc in editor_output["tool_calls"]],
        })
        for tool_call in editor_output["tool_calls"]:
            tool_result = await editor_agent.execute_tool_call(tool_call)
            editor_messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": tool_result})
        re_editor_output = await editor_agent.run_step(editor_messages)
        final_content = re_editor_output["content"] # Use the refined content
    
    await context.send_activity(f"Editor Final Version:\n```\n{final_content}\n```")
    state.history.append({"role": "assistant", "content": f"Editor finalized content: {final_content}"})
    await context.send_activity("Content creation workflow completed!")


# --- Server Setup (same as before) ---
def start_server(agent_application: AgentApplication):
    async def entry_point(req: Request) -> Response:
        agent: AgentApplication = req.app["agent_app"]
        adapter: CloudAdapter = req.app["adapter"]
        return await start_agent_process(req, agent, adapter)

    APP = Application()
    APP.router.add_post("/api/messages", entry_point)

    async def health_check(_: Request) -> Response:
        return Response(status=200)

    APP.router.add_get("/api/messages", health_check)
    APP["agent_app"] = agent_application
    APP["adapter"] = agent_application.adapter
    try:
        port = int(os.environ.get("PORT", 3978))
        print(f"======== Running on http://localhost:{port} ========")
        print("(Press CTRL+C to quit)")
        run_app(APP, host="localhost", port=port)
    except Exception as error:
        print(f"Error starting server: {error}")
        raise error

if __name__ == "__main__":
    start_server(WORKFLOW_AGENT_APP)

To Run This Example:

  1. Update tools.py with the ContentReviewTool.
  2. Save the workflow code as multi_agent_workflow.py.
  3. Ensure your .env is set up for Azure OpenAI.
  4. Run: python multi_agent_workflow.py
  5. Use teamsapptester. Try prompts like:
    • “Write an article about AI agent frameworks.”
    • “Generate a short story about a detective in space.”

Exercises/Mini-Challenges:

  1. Introduce a “Fact-Checker Agent” that uses SearchTool.web_search to verify facts in the content produced by the Writer before the Editor reviews it.
  2. Change the workflow from sequential to a “group chat” where all agents (Writer, Editor, Fact-Checker) collaborate to refine content. (This requires more advanced orchestration patterns usually built into MAF’s workflow capabilities or AutoGen’s GroupChatManager if integrated; a minimal round-robin sketch follows below.)
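
As a taste of what exercise 2 involves, here is a minimal round-robin group chat loop in plain Python: each agent sees the shared transcript so far and appends its contribution. This is an illustrative sketch of the pattern built on the BaseMAFAgent class above, not MAF’s or AutoGen’s group chat implementation (it ignores tool calls for brevity):

# Illustrative round-robin "group chat" sketch (not the MAF/AutoGen API).
async def group_chat(agents: list, task: str, rounds: int = 2) -> list:
    transcript = [{"role": "user", "content": task}]
    for _ in range(rounds):
        for agent in agents:
            # Each agent reasons over the whole shared transcript so far.
            output = await agent.run_step(transcript)
            transcript.append({
                "role": "assistant",
                "content": f"[{agent.name}] {output['content']}",
            })
    return transcript

# Usage: transcript = await group_chat([writer_agent, editor_agent], "Draft an intro")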

3.2 Authentication and Security (Microsoft Entra ID)

Detailed Explanation: For enterprise applications, robust authentication and authorization are non-negotiable. MAF integrates with Microsoft Entra ID (formerly Azure Active Directory) to assign unique identities to agents, enabling secure access to resources and services. This helps in managing “agent sprawl” and ensuring compliance. JWT (JSON Web Token) authorization middleware is a common mechanism.
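
On the client side, callers attach an Entra ID access token as a bearer token on each request. Here is a sketch using azure-identity (DefaultAzureCredential and get_token are real azure-identity APIs; the scope value is a placeholder for your agent app registration’s Application ID URI):

# client_token.py -- acquire a token and call the protected agent (sketch)
import requests
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Placeholder scope: use the Application ID URI of your agent's app registration.
token = credential.get_token("api://YOUR_AGENT_APP_ID/.default")

resp = requests.post(
    "http://localhost:3978/api/messages",
    headers={"Authorization": f"Bearer {token.token}"},
    json={"type": "message", "text": "Hello, secured agent!",
          "from": {"id": "user1"}, "conversation": {"id": "conv1"}},
)
print(resp.status_code)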

Conceptual Code Example: Protecting an Agent Endpoint with JWT (Entra ID)

While full Entra ID integration involves setting up app registrations in Azure and handling token acquisition, we can demonstrate how jwt_authorization_middleware would be used on the server side to protect our agent.

# secured_agent.py (Illustrates JWT Middleware usage, building on redis_agent.py)
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv
import redis.asyncio as redis

load_dotenv()

from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process, jwt_authorization_middleware
from aiohttp.web import Request, Response, Application, run_app, HTTPUnauthorized, HTTPException
from openai import AsyncAzureOpenAI # async client, since handlers await completions
from azure.identity import DefaultAzureCredential, ClientSecretCredential

from tools import SearchTool

# --- Configuration (same as before) ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")

# --- Microsoft Entra ID Configuration (for validation) ---
# For actual JWT validation, you'd configure the middleware with issuer, audience, etc.
# This is typically done via environment variables or a configuration object.
# These placeholders are for illustrating the *presence* of auth.
# REAL-WORLD: You would register an application in Azure Portal (App Registrations)
# and get these values.
# AZURE_TENANT_ID = os.getenv("AZURE_TENANT_ID")
# AZURE_CLIENT_ID = os.getenv("AZURE_CLIENT_ID")
# AZURE_CLIENT_SECRET = os.getenv("AZURE_CLIENT_SECRET") # If using client secret flow

if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
    raise ValueError("Missing Azure OpenAI environment variables. Please check your .env file.")

# AOAI Client and Redis Client initialization (same as redis_agent.py)
try:
    aoai_client = AsyncAzureOpenAI(
        api_key=AZURE_OPENAI_API_KEY,
        api_version="2024-02-15-preview",
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
    )
    print("Successfully connected to Azure OpenAI.")
except Exception as e:
    print(f"Error connecting to Azure OpenAI: {e}")
    exit(1)

try:
    redis_client = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        password=REDIS_PASSWORD,
        decode_responses=True
    )
    # Test the connection once at startup, then drop the pooled connection so
    # fresh connections are created inside the server's event loop later.
    async def _test_redis_connection():
        await redis_client.ping()
        await redis_client.connection_pool.disconnect()
        print("Successfully connected to Redis.")
    asyncio.run(_test_redis_connection())
except Exception as e:
    print(f"Error connecting to Redis: {e}")
    redis_client = None


class AzureOpenAIChatClient:
    def __init__(self, client, deployment_name):
        self.client = client
        self.deployment_name = deployment_name

    async def complete(self, messages: list[dict], **kwargs) -> str:
        try:
            response = await self.client.chat.completions.create(
                model=self.deployment_name,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error calling Azure OpenAI: {e}")
            return "An error occurred while processing your request."

my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)

def get_tool_schemas():
    return [
        {
            "type": "function",
            "function": {
                "name": "web_search",
                "description": "Performs a web search to find relevant information.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The search query."
                        }
                    },
                    "required": ["query"]
                }
            }
        }
    ]

class RedisChatHistoryStorage(MemoryStorage):
    def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
        self.redis_client = redis_client
        self.prefix = prefix

    async def _get_key(self, conversation_id: str) -> str:
        return f"{self.prefix}{conversation_id}"

    async def read(self, conversation_id: str) -> dict:
        key = await self._get_key(conversation_id)
        raw_history = await self.redis_client.get(key)
        if raw_history:
            return json.loads(raw_history)
        return {"history": []}

    async def write(self, conversation_id: str, data: dict):
        key = await self._get_key(conversation_id)
        await self.redis_client.set(key, json.dumps(data))

class PersistentToolUsingAgentState(TurnState):
    history: List[Dict[str, Any]] = []
    conversation_id: str = "default_conversation"

storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()

# --- Agent Application (same as redis_agent.py) ---
SECURE_AGENT_APP = AgentApplication[PersistentToolUsingAgentState](
    storage=storage_instance,
    adapter=CloudAdapter()
)

async def _load_history(context: TurnContext, state: PersistentToolUsingAgentState):
    if context.activity and context.activity.conversation and context.activity.conversation.id:
        state.conversation_id = context.activity.conversation.id
    if redis_client:
        stored_data = await storage_instance.read(state.conversation_id)
        state.history = stored_data.get("history", [])
        print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")

async def _save_history(context: TurnContext, state: PersistentToolUsingAgentState):
    if redis_client and state.conversation_id:
        await storage_instance.write(state.conversation_id, {"history": state.history})
        print(f"Saved history for {state.conversation_id}: {len(state.history)} items")

SECURE_AGENT_APP.on_before_activity(_load_history)
SECURE_AGENT_APP.on_after_activity(_save_history)

async def _secure_welcome_and_help(context: TurnContext, state: PersistentToolUsingAgentState):
    await context.send_activity(
        "Hello! I am a **secured** persistent Agent with web search capabilities. "
        "Your access is authenticated via JWT."
    )
    state.history.append({"role": "assistant", "content": "Hello! I am a secured persistent Agent."})

SECURE_AGENT_APP.conversation_update("membersAdded")(_secure_welcome_and_help)
SECURE_AGENT_APP.message("/help")(_secure_welcome_and_help)

@SECURE_AGENT_APP.activity("message")
async def on_secure_message(context: TurnContext, state: PersistentToolUsingAgentState):
    # This handler is the same as in redis_agent.py, but now it's protected
    user_message = context.activity.text
    state.history.append({"role": "user", "content": user_message})

    messages_for_llm = [
        {"role": "system", "content": "You are a helpful assistant. Use tools if necessary to answer questions."},
        *state.history
    ]

    try:
        response = await aoai_client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT_NAME,
            messages=messages_for_llm,
            tools=get_tool_schemas(),
            tool_choice="auto"
        )

        assistant_message = response.choices[0].message
        if assistant_message.content:
            state.history.append({"role": "assistant", "content": assistant_message.content})

        if assistant_message.tool_calls:
            # Record the tool-call request itself: the Chat Completions API
            # requires each "tool" message to follow an assistant message
            # containing the matching tool_calls.
            state.history.append({
                "role": "assistant",
                "content": None,
                "tool_calls": [tc.model_dump() for tc in assistant_message.tool_calls],
            })
            for tool_call in assistant_message.tool_calls:
                function_name = tool_call.function.name
                arguments = json.loads(tool_call.function.arguments)

                print(f"Agent wants to call tool: {function_name} with args: {arguments}")

                tool_output = None
                if function_name == "web_search":
                    query = arguments.get("query")
                    tool_output = await SearchTool.web_search(query)
                else:
                    tool_output = f"Unknown tool: {function_name}"

                state.history.append({
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": str(tool_output)
                })

                final_response = await aoai_client.chat.completions.create(
                    model=AZURE_OPENAI_DEPLOYMENT_NAME,
                    messages=state.history
                )
                final_assistant_message_content = final_response.choices[0].message.content
                state.history.append({"role": "assistant", "content": final_assistant_message_content})
                await context.send_activity(final_assistant_message_content)

        else:
            await context.send_activity(assistant_message.content)

    except Exception as e:
        print(f"Error in secured agent: {e}")
        error_msg = "An error occurred in the secured agent."
        await context.send_activity(error_msg)
        state.history.append({"role": "assistant", "content": error_msg})


# --- Server Setup with JWT Middleware ---
# The jwt_authorization_middleware needs configuration.
# For simplicity, we'll use a dummy config. In a real scenario, this would involve
# proper Azure AD app registration details for token validation.
from microsoft.agents.hosting.core import AgentAuthConfiguration

class DummyAgentAuthConfiguration(AgentAuthConfiguration):
    def __init__(self):
        # In a real app, these would come from env vars or app settings
        self.tenant_id = "your-azure-ad-tenant-id"
        self.client_id = "your-azure-ad-client-id"
        self.authority = f"https://login.microsoftonline.com/{self.tenant_id}"
        self.audience = [f"api://{self.client_id}"] # The audience of your agent's API
        self.is_anonymous_auth_enabled = False # Require authentication

DUMMY_AUTH_CONFIG = DummyAgentAuthConfiguration()

def start_server_with_auth(agent_application: AgentApplication, auth_config: AgentAuthConfiguration):
    async def entry_point(req: Request) -> Response:
        # After middleware, req['user_id'] or similar would be populated
        # This is where you might add custom authorization checks if needed
        agent: AgentApplication = req.app["agent_app"]
        adapter: CloudAdapter = req.app["adapter"]
        return await start_agent_process(req, agent, adapter)

    # Apply the JWT middleware. It reads its settings from the
    # "agent_configuration" key stored on the app (set below) and rejects
    # requests that lack a valid JWT in the Authorization header.
    APP = Application(middlewares=[jwt_authorization_middleware])
    APP.router.add_post("/api/messages", entry_point)
    APP.router.add_get("/api/messages", lambda _: Response(status=200)) # Health check
    APP["agent_app"] = agent_application
    APP["adapter"] = agent_application.adapter
    APP["agent_configuration"] = auth_config # Store auth config for middleware
    try:
        port = int(os.environ.get("PORT", 3978))
        print(f"======== Running on http://localhost:{port} (Secured with JWT) ========")
        print("(Press CTRL+C to quit)")
        run_app(APP, host="localhost", port=port)
    except Exception as error:
        print(f"Error starting secured server: {error}")
        raise error

if __name__ == "__main__":
    # Note: To test this locally with a real JWT, acquire a token (e.g., via the
    # MSAL Python library or the Azure CLI) and send it in your client's
    # Authorization header. teamsapptester usually handles anonymous or specific auth.
    # To observe the auth barrier quickly, keep is_anonymous_auth_enabled = False
    # in DummyAgentAuthConfiguration (as configured above) and send a request
    # without a token - you should get a 401.
    start_server_with_auth(SECURE_AGENT_APP, DUMMY_AUTH_CONFIG)

To Run This Example (and observe auth):

  1. Save the agent code as secured_agent.py.
  2. Configure your .env for OpenAI and Redis.
  3. For a quick test: Run python secured_agent.py.
    • If is_anonymous_auth_enabled is True (or if you set middlewares=[] for APP), it will run unauthenticated.
    • If is_anonymous_auth_enabled is False in DUMMY_AUTH_CONFIG (as shown), then:
      • Try curl -X POST http://localhost:3978/api/messages -H "Content-Type: application/json" -d '{"type": "message", "text": "test"}'. You should get an HTTPUnauthorized (401) error.
      • To truly test with a valid JWT, you’d need to obtain an access token from Azure AD for your configured client_id and tenant_id, and then send it in the Authorization: Bearer <YOUR_JWT> header. This is beyond a simple curl example and typically done through a proper client SDK or CLI.

Exercises/Mini-Challenges:

  1. Research how to obtain an Azure AD access token for a service principal using the azure-identity library in Python.
  2. Modify a client (e.g., a simple Python script or a Postman request) to include the obtained JWT and successfully send a message to the secured_agent.py.
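
As a head start on both exercises, here is a minimal sketch, assuming an Entra ID app registration whose tenant/client values match your DummyAgentAuthConfiguration and whose API exposes the api://<client-id>/.default scope:

# get_token_and_call.py (sketch: acquire a token, then call the secured agent)
import os
import requests
from azure.identity import ClientSecretCredential

credential = ClientSecretCredential(
    tenant_id=os.environ["AZURE_TENANT_ID"],
    client_id=os.environ["AZURE_CLIENT_ID"],
    client_secret=os.environ["AZURE_CLIENT_SECRET"],
)
# Request a token for the agent's audience (what the middleware validates).
token = credential.get_token(f"api://{os.environ['AZURE_CLIENT_ID']}/.default")

resp = requests.post(
    "http://localhost:3978/api/messages",
    headers={
        "Authorization": f"Bearer {token.token}",
        "Content-Type": "application/json",
    },
    json={"type": "message", "text": "test"},
)
print(resp.status_code, resp.text)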

4. Advanced Topics and Best Practices

4.1 Retrieval Augmented Generation (RAG) with Azure Cognitive Search

Detailed Explanation: To keep LLMs current and grounded in specific, private data, Retrieval Augmented Generation (RAG) is essential. RAG retrieves relevant information from a knowledge base (documents, databases, or specialized indexes) and injects it into the LLM’s prompt. MAF fully supports RAG patterns. Azure Cognitive Search (now Azure AI Search) is Microsoft’s search-as-a-service offering and a natural fit for enterprise-grade RAG.

Code Example: Integrating Azure Cognitive Search for RAG

Let’s enhance our agent with a RAG tool that queries Azure Cognitive Search.

First, you’ll need an Azure Cognitive Search instance and an index populated with some data. For this example, we’ll assume an index named product-docs with documents containing title and content fields.
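
If you don’t yet have such an index, the following one-off script is a minimal sketch for creating and seeding it; the index and field names simply mirror this example’s assumptions.

# seed_index.py (sketch: create and seed the assumed "product-docs" index)
import os
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex, SimpleField, SearchableField, SearchFieldDataType,
)

endpoint = os.environ["AZURE_SEARCH_ENDPOINT"]
credential = AzureKeyCredential(os.environ["AZURE_SEARCH_API_KEY"])

# Define a minimal index: a key plus the two fields the RAG tool selects.
index = SearchIndex(
    name="product-docs",
    fields=[
        SimpleField(name="id", type=SearchFieldDataType.String, key=True),
        SearchableField(name="title", type=SearchFieldDataType.String),
        SearchableField(name="content", type=SearchFieldDataType.String),
    ],
)
SearchIndexClient(endpoint, credential).create_or_update_index(index)

# Upload a couple of dummy documents to query against.
docs = [
    {"id": "1", "title": "Product X Overview",
     "content": "Product X offers real-time sync, offline mode, and SSO."},
    {"id": "2", "title": "Troubleshooting Error 123",
     "content": "Error 123 usually means an expired API key; rotate it in settings."},
]
SearchClient(endpoint, "product-docs", credential).upload_documents(documents=docs)
print("Index created and seeded.")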

# rag_tools.py
import os
import asyncio
from typing import List, Dict, Any

from azure.core.credentials import AzureKeyCredential
from azure.search.documents.aio import SearchClient # Use async client

class AzureSearchRAGTool:
    def __init__(self, endpoint: str, api_key: str, index_name: str):
        self.search_client = SearchClient(
            endpoint=endpoint,
            index_name=index_name,
            credential=AzureKeyCredential(api_key)
        )
        print(f"Initialized Azure Cognitive Search client for index: {index_name}")

    async def search_documents(self, query: str, top_n: int = 3) -> List[Dict[str, Any]]:
        """
        Searches Azure Cognitive Search for relevant documents and returns snippets.
        """
        print(f"Performing Azure Cognitive Search for query: '{query}'")
        results = []
        try:
            # Reuse the client across tool invocations; wrapping it in
            # "async with" here would close it after the first search.
            search_results = await self.search_client.search(
                search_text=query,
                select=["title", "content"], # Fields to retrieve
                top=top_n
            )
            async for result in search_results:
                results.append({
                    "title": result.get("title", "No Title"),
                    "content": result.get("content", "No Content")
                })
        except Exception as e:
            print(f"Error during Azure Cognitive Search: {e}")
            results.append({"error": f"Failed to perform search: {e}"})
        
        return results

Now, integrate this into our agent.

# rag_agent.py (Building on redis_agent.py and using rag_tools.py)
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv
import redis.asyncio as redis

load_dotenv()

from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI

from rag_tools import AzureSearchRAGTool # Our new RAG tool

# --- Configuration ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")

AZURE_SEARCH_ENDPOINT = os.getenv("AZURE_SEARCH_ENDPOINT")
AZURE_SEARCH_API_KEY = os.getenv("AZURE_SEARCH_API_KEY")
AZURE_SEARCH_INDEX_NAME = os.getenv("AZURE_SEARCH_INDEX_NAME")

if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME,
            AZURE_SEARCH_ENDPOINT, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME]):
    raise ValueError("Missing required environment variables. Please check your .env file for OpenAI, Redis, and Azure Search.")

try:
    aoai_client = AsyncAzureOpenAI(
        api_key=AZURE_OPENAI_API_KEY,
        api_version="2024-02-15-preview",
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
    )
    print("Successfully connected to Azure OpenAI.")
except Exception as e:
    print(f"Error connecting to Azure OpenAI: {e}")
    exit(1)

try:
    redis_client = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        password=REDIS_PASSWORD,
        decode_responses=True
    )
    async def _test_redis_connection():
        await redis_client.ping()
        print("Successfully connected to Redis.")
    asyncio.run(_test_redis_connection())
except Exception as e:
    print(f"Error connecting to Redis: {e}")
    redis_client = None

# Initialize Azure Cognitive Search RAG tool
try:
    azure_search_rag_tool = AzureSearchRAGTool(
        endpoint=AZURE_SEARCH_ENDPOINT,
        api_key=AZURE_SEARCH_API_KEY,
        index_name=AZURE_SEARCH_INDEX_NAME
    )
except Exception as e:
    print(f"Error initializing AzureSearchRAGTool: {e}")
    azure_search_rag_tool = None # Handle gracefully if search isn't critical

# --- ChatClient (same as in secured_agent.py) ---
class AzureOpenAIChatClient:
    def __init__(self, client, deployment_name):
        self.client = client
        self.deployment_name = deployment_name
    async def complete(self, messages: list[dict], **kwargs) -> str:
        try:
            response = await self.client.chat.completions.create(
                model=self.deployment_name,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error calling Azure OpenAI: {e}")
            return "An error occurred while processing your request."
my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)


# --- Tool Definitions with RAG ---
def get_tool_schemas_with_rag():
    return [
        {
            "type": "function",
            "function": {
                "name": "search_documents",
                "description": "Searches internal knowledge base (Azure Cognitive Search) for relevant documents to answer questions.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The search query for the knowledge base."
                        }
                    },
                    "required": ["query"]
                }
            }
        }
    ]

# --- Agent State & Storage (same as redis_agent.py) ---
class RedisChatHistoryStorage(MemoryStorage):
    def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
        self.redis_client = redis_client
        self.prefix = prefix
    async def _get_key(self, conversation_id: str) -> str:
        return f"{self.prefix}{conversation_id}"
    async def read(self, conversation_id: str) -> dict:
        key = await self._get_key(conversation_id)
        raw_history = await self.redis_client.get(key)
        if raw_history:
            return json.loads(raw_history)
        return {"history": []}
    async def write(self, conversation_id: str, data: dict):
        key = await self._get_key(conversation_id)
        await self.redis_client.set(key, json.dumps(data))

class PersistentRAGAgentState(TurnState):
    history: List[Dict[str, Any]] = []
    conversation_id: str = "default_conversation"

storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()

RAG_AGENT_APP = AgentApplication[PersistentRAGAgentState](
    storage=storage_instance,
    adapter=CloudAdapter()
)

async def _load_history(context: TurnContext, state: PersistentRAGAgentState):
    if context.activity and context.activity.conversation and context.activity.conversation.id:
        state.conversation_id = context.activity.conversation.id
    if redis_client:
        stored_data = await storage_instance.read(state.conversation_id)
        state.history = stored_data.get("history", [])
        print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")

async def _save_history(context: TurnContext, state: PersistentRAGAgentState):
    if redis_client and state.conversation_id:
        await storage_instance.write(state.conversation_id, {"history": state.history})
        print(f"Saved history for {state.conversation_id}: {len(state.history)} items")

RAG_AGENT_APP.on_before_activity(_load_history)
RAG_AGENT_APP.on_after_activity(_save_history)

async def _rag_welcome(context: TurnContext, state: PersistentRAGAgentState):
    await context.send_activity(
        "Hello! I am an Agent with RAG capabilities using Azure Cognitive Search. "
        "Ask me about topics covered in our internal documents (e.g., 'What are the features of Product X?')."
    )
    state.history.append({"role": "assistant", "content": "Hello! I am a RAG Agent."})

RAG_AGENT_APP.conversation_update("membersAdded")(_rag_welcome)
RAG_AGENT_APP.message("/help")(_rag_welcome)


@RAG_AGENT_APP.activity("message")
async def on_rag_message(context: TurnContext, state: PersistentRAGAgentState):
    user_message = context.activity.text
    state.history.append({"role": "user", "content": user_message})

    # Prepare system prompt to encourage RAG tool use
    system_prompt = (
        "You are a helpful assistant. If the user asks a question that likely requires "
        "knowledge from internal documents, use the 'search_documents' tool. "
        "Summarize the information found in the documents to answer the user's question. "
        "If you don't find relevant information, state that clearly."
    )
    messages_for_llm = [
        {"role": "system", "content": system_prompt},
        *state.history
    ]

    try:
        response = await aoai_client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT_NAME,
            messages=messages_for_llm,
            tools=get_tool_schemas_with_rag(),
            tool_choice="auto"
        )

        assistant_message = response.choices[0].message
        if assistant_message.content:
            state.history.append({"role": "assistant", "content": assistant_message.content})

        if assistant_message.tool_calls:
            # Record the tool-call request itself: the Chat Completions API
            # requires each "tool" message to follow an assistant message
            # containing the matching tool_calls.
            state.history.append({
                "role": "assistant",
                "content": None,
                "tool_calls": [tc.model_dump() for tc in assistant_message.tool_calls],
            })
            for tool_call in assistant_message.tool_calls:
                function_name = tool_call.function.name
                arguments = json.loads(tool_call.function.arguments)

                print(f"Agent wants to call tool: {function_name} with args: {arguments}")

                tool_output = None
                if function_name == "search_documents":
                    query = arguments.get("query")
                    if azure_search_rag_tool:
                        docs = await azure_search_rag_tool.search_documents(query)
                        tool_output = "\n".join(
                            f"Title: {d.get('title', '')}\nContent: {d.get('content', d.get('error', ''))}"
                            for d in docs
                        )
                        if not docs:
                            tool_output = "No relevant documents found."
                    else:
                        tool_output = "Azure Search tool is not available."
                else:
                    tool_output = f"Unknown tool: {function_name}"

                state.history.append({
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": str(tool_output)
                })

                # Call LLM again with tool output for generation
                final_response = await aoai_client.chat.completions.create(
                    model=AZURE_OPENAI_DEPLOYMENT_NAME,
                    messages=state.history
                )
                final_assistant_message_content = final_response.choices[0].message.content
                state.history.append({"role": "assistant", "content": final_assistant_message_content})
                await context.send_activity(final_assistant_message_content)

        else:
            await context.send_activity(assistant_message.content)

    except Exception as e:
        print(f"Error in RAG agent: {e}")
        error_msg = "An error occurred in the RAG agent."
        await context.send_activity(error_msg)
        state.history.append({"role": "assistant", "content": error_msg})

# --- Server Setup (same as previous examples) ---
def start_server(agent_application: AgentApplication):
    async def entry_point(req: Request) -> Response:
        agent: AgentApplication = req.app["agent_app"]
        adapter: CloudAdapter = req.app["adapter"]
        return await start_agent_process(req, agent, adapter)

    APP = Application()
    APP.router.add_post("/api/messages", entry_point)
    APP.router.add_get("/api/messages", lambda _: Response(status=200))
    APP["agent_app"] = agent_application
    APP["adapter"] = agent_application.adapter
    try:
        port = int(os.environ.get("PORT", 3978))
        print(f"======== Running on http://localhost:{port} ========")
        print("(Press CTRL+C to quit)")
        run_app(APP, host="localhost", port=port)
    except Exception as error:
        print(f"Error starting server: {error}")
        raise error

if __name__ == "__main__":
    start_server(RAG_AGENT_APP)

To Run This Example:

  1. Set up Azure Cognitive Search: Create a service and an index (e.g., product-docs) with fields title and content, and populate it with some dummy data relevant to questions you might ask (the seeding sketch shown earlier can help).
  2. Configure AZURE_SEARCH_ENDPOINT, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME in your .env (a sample .env follows this list).
  3. Save rag_tools.py and rag_agent.py.
  4. Run: python rag_agent.py
  5. Use teamsapptester. Ask questions that your Azure Cognitive Search index can answer (e.g., “What are the benefits of Product X?”, “How to troubleshoot error code 123?”). The agent should use search_documents and then synthesize an answer.
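
For reference, a .env for this example might look like the following (illustrative values only; replace the placeholders with your own resource settings):

# .env (sample)
AZURE_OPENAI_API_KEY=<your-aoai-key>
AZURE_OPENAI_ENDPOINT=https://<your-resource>.openai.azure.com/
AZURE_OPENAI_DEPLOYMENT_NAME=<your-deployment-name>
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
AZURE_SEARCH_ENDPOINT=https://<your-search-service>.search.windows.net
AZURE_SEARCH_API_KEY=<your-search-admin-key>
AZURE_SEARCH_INDEX_NAME=product-docs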

4.2 Database Querying Tool

Detailed Explanation: Agents can also interact with structured data in databases. This typically involves a tool that can translate natural language queries into SQL (or other database query language), execute the query, and return the results. This requires careful consideration of security and preventing SQL injection.
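
Before looking at the (deliberately permissive) demo tool below, keep in mind what a safer design looks like: expose narrow, parameterized operations instead of executing raw LLM-generated SQL. A hypothetical sketch, with illustrative names:

# safer_db_access.py (sketch of a parameterized alternative; names are illustrative)
import psycopg2

def get_user_by_email(db_config: dict, email: str) -> list[tuple]:
    # The %s placeholder lets psycopg2 escape the value safely, so user
    # input can never alter the query's structure.
    query = "SELECT id, name, email FROM users WHERE email = %s"
    with psycopg2.connect(**db_config) as conn:
        with conn.cursor() as cur:
            cur.execute(query, (email,))
            return cur.fetchall()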

Code Example: PostgreSQL Database Querying Tool

Let’s add a tool to query a PostgreSQL database.

# db_tools.py
import asyncio
import psycopg2 # or asyncpg for async
from typing import List, Dict, Any

class DatabaseTool:
    def __init__(self, db_config: Dict[str, str]):
        self.db_config = db_config
        # For simplicity, we're using psycopg2 blocking, for async use asyncpg
        # In a real async agent, you'd use an async DB driver
        print("Initialized DatabaseTool. (Note: Using blocking psycopg2 for simplicity, consider asyncpg for production async apps)")

    def _execute_query_sync(self, query: str) -> List[Dict[str, Any]]:
        """Synchronously executes a read-only SQL query."""
        conn = None
        result = []
        try:
            conn = psycopg2.connect(**self.db_config)
            cur = conn.cursor()
            cur.execute(query)
            # Fetch column names
            col_names = [desc[0] for desc in cur.description]
            # Fetch rows
            for row in cur.fetchall():
                result.append(dict(zip(col_names, row)))
            cur.close()
        except Exception as e:
            print(f"Database query error: {e}")
            raise e
        finally:
            if conn:
                conn.close()
        return result

    async def query_database(self, sql_query: str) -> List[Dict[str, Any]]:
        """
        Executes a read-only SQL query against the database.
        SECURITY WARNING: In a real application, NEVER allow arbitrary SQL queries directly.
        Instead, use a SQL agent that generates parameterized queries or validates input carefully.
        This is for demonstration purposes only.
        """
        print(f"Executing database query: '{sql_query}'")
        # Simulate async by running blocking call in thread pool
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._execute_query_sync, sql_query)

    async def get_table_schema(self, table_name: str) -> str:
        """
        Retrieves the schema for a specified table.
        """
        print(f"Getting schema for table: {table_name}")
        # Basic guard against SQL injection: only allow plain identifiers,
        # since the table name is interpolated into the query below.
        if not table_name.isidentifier():
            return f"Invalid table name: '{table_name}'"
        schema_query = f"""
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_name = '{table_name}'
        ORDER BY ordinal_position;
        """
        try:
            schema_data = await self.query_database(schema_query)
            formatted_schema = [f"{col['column_name']} ({col['data_type']}, nullable: {col['is_nullable']})" for col in schema_data]
            return f"Schema for '{table_name}':\n" + "\n".join(formatted_schema)
        except Exception as e:
            return f"Could not retrieve schema for '{table_name}': {e}"

Now, integrate this into our agent, including instructions to the LLM about the database.

# db_rag_agent.py (combines RAG and DB querying)
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv
import redis.asyncio as redis
import psycopg2 # for db_tools init

load_dotenv()

from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI

from rag_tools import AzureSearchRAGTool
from db_tools import DatabaseTool # Our new DB tool

# --- Configuration ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")

AZURE_SEARCH_ENDPOINT = os.getenv("AZURE_SEARCH_ENDPOINT")
AZURE_SEARCH_API_KEY = os.getenv("AZURE_SEARCH_API_KEY")
AZURE_SEARCH_INDEX_NAME = os.getenv("AZURE_SEARCH_INDEX_NAME")

POSTGRES_DB = os.getenv("POSTGRES_DB")
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_HOST = os.getenv("POSTGRES_HOST")
POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", 5432))

# Verify all necessary env vars are set
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME,
            AZURE_SEARCH_ENDPOINT, AZURE_SEARCH_API_KEY, AZURE_SEARCH_INDEX_NAME,
            POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST]):
    raise ValueError("Missing required environment variables. Please check your .env file for OpenAI, Redis, Azure Search, and PostgreSQL.")

try:
    aoai_client = AsyncAzureOpenAI(
        api_key=AZURE_OPENAI_API_KEY,
        api_version="2024-02-15-preview",
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
    )
    print("Successfully connected to Azure OpenAI.")
except Exception as e:
    print(f"Error connecting to Azure OpenAI: {e}")
    exit(1)

try:
    redis_client = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        password=REDIS_PASSWORD,
        decode_responses=True
    )
    async def _test_redis_connection():
        await redis_client.ping()
        print("Successfully connected to Redis.")
    asyncio.run(_test_redis_connection())
except Exception as e:
    print(f"Error connecting to Redis: {e}")
    redis_client = None

try:
    azure_search_rag_tool = AzureSearchRAGTool(
        endpoint=AZURE_SEARCH_ENDPOINT,
        api_key=AZURE_SEARCH_API_KEY,
        index_name=AZURE_SEARCH_INDEX_NAME
    )
except Exception as e:
    print(f"Error initializing AzureSearchRAGTool: {e}")
    azure_search_rag_tool = None

# Initialize Database Tool
try:
    db_config = {
        "database": POSTGRES_DB,
        "user": POSTGRES_USER,
        "password": POSTGRES_PASSWORD,
        "host": POSTGRES_HOST,
        "port": POSTGRES_PORT
    }
    # Test DB connection
    with psycopg2.connect(**db_config) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
        print("Successfully connected to PostgreSQL.")
    db_tool = DatabaseTool(db_config)
except Exception as e:
    print(f"Error initializing DatabaseTool or connecting to PostgreSQL: {e}")
    db_tool = None


class AzureOpenAIChatClient:  # (same as before)
    def __init__(self, client, deployment_name):
        self.client = client
        self.deployment_name = deployment_name
    async def complete(self, messages: list[dict], **kwargs) -> str:
        try:
            response = await self.client.chat.completions.create(
                model=self.deployment_name,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error calling Azure OpenAI: {e}")
            return "An error occurred while processing your request."
my_chat_client = AzureOpenAIChatClient(aoai_client, AZURE_OPENAI_DEPLOYMENT_NAME)


# --- Tool Definitions with RAG and DB Querying ---
def get_tool_schemas_with_db_rag():
    schemas = [
        {
            "type": "function",
            "function": {
                "name": "search_documents",
                "description": "Searches internal knowledge base (Azure Cognitive Search) for relevant documents to answer questions.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The search query for the knowledge base."
                        }
                    },
                    "required": ["query"]
                }
            }
        }
    ]
    if db_tool: # Only add DB tools if connected
        schemas.extend([
            {
                "type": "function",
                "function": {
                    "name": "query_database",
                    "description": "Executes a read-only SQL query against the PostgreSQL database. Use this ONLY for SELECT statements. Be very careful with SQL injection. Available tables: 'users', 'products', 'orders'.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "sql_query": {
                                "type": "string",
                                "description": "The SQL SELECT query to execute."
                            }
                        },
                        "required": ["sql_query"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "get_table_schema",
                    "description": "Retrieves the schema (column names and types) for a specified table in the database.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "table_name": {
                                "type": "string",
                                "description": "The name of the table (e.g., 'users', 'products', 'orders')."
                            }
                        },
                        "required": ["table_name"]
                    }
                }
            }
        ])
    return schemas

# --- Agent State & Storage (same as redis_agent.py) ---
class RedisChatHistoryStorage(MemoryStorage):  # (same as before)
    def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
        self.redis_client = redis_client
        self.prefix = prefix
    async def _get_key(self, conversation_id: str) -> str:
        return f"{self.prefix}{conversation_id}"
    async def read(self, conversation_id: str) -> dict:
        key = await self._get_key(conversation_id)
        raw_history = await self.redis_client.get(key)
        if raw_history:
            return json.loads(raw_history)
        return {"history": []}
    async def write(self, conversation_id: str, data: dict):
        key = await self._get_key(conversation_id)
        await self.redis_client.set(key, json.dumps(data))

class PersistentDBRAGAgentState(TurnState):
    history: List[Dict[str, Any]] = []
    conversation_id: str = "default_conversation"

storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()

DB_RAG_AGENT_APP = AgentApplication[PersistentDBRAGAgentState](
    storage=storage_instance,
    adapter=CloudAdapter()
)

async def _load_history(context: TurnContext, state: PersistentDBRAGAgentState):
    if context.activity and context.activity.conversation and context.activity.conversation.id:
        state.conversation_id = context.activity.conversation.id
    if redis_client:
        stored_data = await storage_instance.read(state.conversation_id)
        state.history = stored_data.get("history", [])
        print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")

async def _save_history(context: TurnContext, state: PersistentDBRAGAgentState):
    if redis_client and state.conversation_id:
        await storage_instance.write(state.conversation_id, {"history": state.history})
        print(f"Saved history for {state.conversation_id}: {len(state.history)} items")

DB_RAG_AGENT_APP.on_before_activity(_load_history)
DB_RAG_AGENT_APP.on_after_activity(_save_history)

async def _db_rag_welcome(context: TurnContext, state: PersistentDBRAGAgentState):
    await context.send_activity(
        "Hello! I am an Agent with RAG and Database querying capabilities. "
        "Ask me about internal documents or query the database (tables: users, products, orders)."
    )
    state.history.append({"role": "assistant", "content": "Hello! I am a RAG and DB Agent."})

DB_RAG_AGENT_APP.conversation_update("membersAdded")(_db_rag_welcome)
DB_RAG_AGENT_APP.message("/help")(_db_rag_welcome)


@DB_RAG_AGENT_APP.activity("message")
async def on_db_rag_message(context: TurnContext, state: PersistentDBRAGAgentState):
    user_message = context.activity.text
    state.history.append({"role": "user", "content": user_message})

    # Prepare system prompt to encourage RAG and DB tool use
    system_prompt = (
        "You are a helpful assistant. "
        "Use 'search_documents' for general knowledge from internal documents. "
        "Use 'get_table_schema' to understand database table structure (tables: users, products, orders) "
        "and 'query_database' to retrieve information directly from the database (only SELECT queries!). "
        "Always prioritize database queries if the information is clearly structured and available there. "
        "If you don't find relevant information, state that clearly."
    )
    messages_for_llm = [
        {"role": "system", "content": system_prompt},
        *state.history
    ]

    try:
        response = await aoai_client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT_NAME,
            messages=messages_for_llm,
            tools=get_tool_schemas_with_db_rag(),
            tool_choice="auto"
        )

        assistant_message = response.choices[0].message
        if assistant_message.content:
            state.history.append({"role": "assistant", "content": assistant_message.content})

        if assistant_message.tool_calls:
            # Record the tool-call request itself: the Chat Completions API
            # requires each "tool" message to follow an assistant message
            # containing the matching tool_calls.
            state.history.append({
                "role": "assistant",
                "content": None,
                "tool_calls": [tc.model_dump() for tc in assistant_message.tool_calls],
            })
            for tool_call in assistant_message.tool_calls:
                function_name = tool_call.function.name
                arguments = json.loads(tool_call.function.arguments)

                print(f"Agent wants to call tool: {function_name} with args: {arguments}")

                tool_output = None
                if function_name == "search_documents":
                    query = arguments.get("query")
                    if azure_search_rag_tool:
                        docs = await azure_search_rag_tool.search_documents(query)
                        tool_output = "\n".join(
                            f"Title: {d.get('title', '')}\nContent: {d.get('content', d.get('error', ''))}"
                            for d in docs
                        )
                        if not docs:
                            tool_output = "No relevant documents found."
                    else:
                        tool_output = "Azure Search tool is not available."
                elif function_name == "query_database":
                    sql_query = arguments.get("sql_query")
                    if db_tool:
                        try:
                            db_results = await db_tool.query_database(sql_query)
                            tool_output = json.dumps(db_results, indent=2)
                        except Exception as ex:
                            tool_output = f"Database error: {ex}"
                    else:
                        tool_output = "Database tool is not available."
                elif function_name == "get_table_schema":
                    table_name = arguments.get("table_name")
                    if db_tool:
                        tool_output = await db_tool.get_table_schema(table_name)
                    else:
                        tool_output = "Database tool is not available."
                else:
                    tool_output = f"Unknown tool: {function_name}"

                state.history.append({
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": str(tool_output)
                })

                # Call LLM again with tool output for generation
                final_response = await aoai_client.chat.completions.create(
                    model=AZURE_OPENAI_DEPLOYMENT_NAME,
                    messages=state.history
                )
                final_assistant_message_content = final_response.choices[0].message.content
                state.history.append({"role": "assistant", "content": final_assistant_message_content})
                await context.send_activity(final_assistant_message_content)

        else:
            await context.send_activity(assistant_message.content)

    except Exception as e:
        print(f"Error in DB RAG agent: {e}")
        error_msg = "An error occurred in the RAG/DB agent."
        await context.send_activity(error_msg)
        state.history.append({"role": "assistant", "content": error_msg})

# --- Server Setup (same as previous examples) ---
def start_server(agent_application: AgentApplication):
    async def entry_point(req: Request) -> Response:
        agent: AgentApplication = req.app["agent_app"]
        adapter: CloudAdapter = req.app["adapter"]
        return await start_agent_process(req, agent, adapter)

    APP = Application()
    APP.router.add_post("/api/messages", entry_point)
    APP.router.add_get("/api/messages", lambda _: Response(status=200))
    APP["agent_app"] = agent_application
    APP["adapter"] = agent_application.adapter
    try:
        port = int(os.environ.get("PORT", 3978))
        print(f"======== Running on http://localhost:{port} ========")
        print("(Press CTRL+C to quit)")
        run_app(APP, host="localhost", port=port)
    except Exception as error:
        print(f"Error starting server: {error}")
        raise error

if __name__ == "__main__":
    start_server(DB_RAG_AGENT_APP)

To Run This Example:

  1. Set up PostgreSQL: Install PostgreSQL locally or use Docker (docker run -p 5432:5432 --name my-postgres -e POSTGRES_PASSWORD=your_password -d postgres).

  2. Create Sample Data: Connect to your PostgreSQL database and create some tables and data, e.g.:

    CREATE TABLE users (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100),
        email VARCHAR(100) UNIQUE
    );
    INSERT INTO users (name, email) VALUES ('Alice Smith', 'alice@example.com'), ('Bob Johnson', 'bob@example.com');
    
    CREATE TABLE products (
        product_id SERIAL PRIMARY KEY,
        product_name VARCHAR(255),
        price DECIMAL(10, 2),
        stock_quantity INT
    );
    INSERT INTO products (product_name, price, stock_quantity) VALUES ('Laptop', 1200.00, 50), ('Mouse', 25.00, 200);
    
    CREATE TABLE orders (
        order_id SERIAL PRIMARY KEY,
        user_id INT REFERENCES users(id),
        product_id INT REFERENCES products(product_id),
        quantity INT,
        order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    INSERT INTO orders (user_id, product_id, quantity) VALUES (1, 1, 1), (2, 2, 2);
    
  3. Configure POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT in your .env.

  4. Ensure rag_tools.py and db_tools.py are in your project.

  5. Save the agent code as db_rag_agent.py.

  6. Run: pip install psycopg2-binary (if not already installed), then python db_rag_agent.py.

  7. Use teamsapptester. Try prompts like:

    • “What is the schema for the ‘users’ table?”
    • “How many products do we have in stock?” (LLM should translate to SQL: SELECT SUM(stock_quantity) FROM products;)
    • “Show me all users and their emails.” (LLM should translate to SQL: SELECT name, email FROM users;)
    • “What are the features of Product X?” (This should trigger the RAG tool).

5. Guided Projects

Here, we’ll implement two projects: one fresh MAF project and one migration from LangGraph.

Project 1: Fresh Project - “Intelligent Customer Support Agent”

Objective: Build a single, intelligent customer support agent using MAF that can answer common FAQs (via RAG), troubleshoot simple issues (using specific tools), and escalate to a human if necessary.

Problem Statement: A company wants to provide 24/7 AI-powered customer support to handle routine inquiries, reducing the workload on human agents.

Technologies Used:

  • Microsoft Agent Framework (Python SDK)
  • Azure OpenAI (for LLM)
  • Redis (for chat history)
  • Azure Cognitive Search (for FAQ/knowledge base RAG)
  • A “mock” Human Escalation tool.

Step-by-Step Implementation:

Step 0: Setup (already covered). Ensure your .env is configured for Azure OpenAI, Redis, and Azure Cognitive Search. Keep rag_tools.py and db_tools.py in the project (db_tools.py is not used in this project, but it is handy to have for general capabilities).

Step 1: Define Tools for Customer Support

Let’s create support_tools.py. This will include our RAG tool and a simulated escalation tool.

# support_tools.py
import asyncio
from typing import List, Dict, Any

from azure.core.credentials import AzureKeyCredential
from azure.search.documents.aio import SearchClient # Use async client

class CustomerSupportTools:
    def __init__(self, search_endpoint: str, search_api_key: str, search_index_name: str):
        self.search_client = SearchClient(
            endpoint=search_endpoint,
            index_name=search_index_name,
            credential=AzureKeyCredential(search_api_key)
        )
        print(f"Initialized Azure Cognitive Search client for customer support FAQs: {search_index_name}")

    async def faq_lookup(self, query: str, top_n: int = 3) -> List[Dict[str, Any]]:
        """
        Searches the customer support knowledge base (Azure Cognitive Search) for answers to frequently asked questions.
        Useful for general inquiries about products, services, or policies.
        """
        print(f"Executing FAQ lookup for: '{query}'")
        results = []
        try:
            # Reuse the client across tool invocations; wrapping it in
            # "async with" here would close it after the first lookup.
            search_results = await self.search_client.search(
                search_text=query,
                select=["question", "answer"], # Assuming your FAQ index has these fields
                top=top_n
            )
            async for result in search_results:
                results.append({
                    "question": result.get("question", "No Question"),
                    "answer": result.get("answer", "No Answer")
                })
        except Exception as e:
            print(f"Error during FAQ lookup: {e}")
            results.append({"error": f"Failed to perform FAQ search: {e}"})
        return results

    @staticmethod
    async def reset_password(username: str) -> str:
        """
        Simulates the process of initiating a password reset for a given username.
        This would typically involve an internal system API call.
        """
        print(f"Executing password reset for: {username}")
        await asyncio.sleep(2)
        return f"A password reset link has been sent to the registered email for user '{username}'. Please check your inbox."

    @staticmethod
    async def check_order_status(order_id: str) -> str:
        """
        Checks the status of a specific order using its order ID.
        This would typically integrate with an order management system.
        """
        print(f"Executing order status check for order ID: {order_id}")
        await asyncio.sleep(1.5)
        # Simulate lookup
        if order_id == "ORD123":
            return "Order ORD123: Shipped, expected delivery tomorrow."
        elif order_id == "ORD456":
            return "Order ORD456: Processing, estimated ship date in 3-5 business days."
        return f"Order ID {order_id} not found or invalid."

    @staticmethod
    async def escalate_to_human(issue_description: str, user_contact: str) -> str:
        """
        Escalates the current customer inquiry to a human support agent.
        Provides a summary of the issue and user contact information.
        """
        print(f"Escalating to human: {issue_description} for {user_contact}")
        await asyncio.sleep(1)
        return f"Your issue '{issue_description}' has been escalated. A human agent will contact you shortly at {user_contact}."

Step 2: Create the Customer Support Agent

This agent will combine the CustomerSupportTools, Redis for memory, and Azure OpenAI for reasoning.

# customer_support_agent.py
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv
import redis.asyncio as redis

load_dotenv()

from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI

from support_tools import CustomerSupportTools # Our new support tools

# --- Configuration (from .env) ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")

AZURE_SEARCH_ENDPOINT = os.getenv("AZURE_SEARCH_ENDPOINT")
AZURE_SEARCH_API_KEY = os.getenv("AZURE_SEARCH_API_KEY")
AZURE_SEARCH_FAQ_INDEX_NAME = os.getenv("AZURE_SEARCH_FAQ_INDEX_NAME", "customer-faqs") # New index for FAQs

# --- Initialize external services ---
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
    raise ValueError("Missing Azure OpenAI environment variables.")

try:
    aoai_client = AsyncAzureOpenAI(
        api_key=AZURE_OPENAI_API_KEY,
        api_version="2024-02-15-preview",
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
    )
    print("Successfully connected to Azure OpenAI.")
except Exception as e:
    print(f"Error connecting to Azure OpenAI: {e}")
    exit(1)

try:
    redis_client = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        password=REDIS_PASSWORD,
        decode_responses=True
    )
    async def _test_redis_connection():
        await redis_client.ping()
        print("Successfully connected to Redis.")
    asyncio.run(_test_redis_connection())
except Exception as e:
    print(f"Error connecting to Redis: {e}")
    redis_client = None

try:
    customer_support_tools = CustomerSupportTools(
        search_endpoint=AZURE_SEARCH_ENDPOINT,
        search_api_key=AZURE_SEARCH_API_KEY,
        search_index_name=AZURE_SEARCH_FAQ_INDEX_NAME
    )
except Exception as e:
    print(f"Error initializing CustomerSupportTools: {e}")
    customer_support_tools = None


# --- Tool Definitions for LLM ---
def get_support_tool_schemas():
    schemas = []
    # FAQ Lookup is always available if search is initialized
    if customer_support_tools:
        schemas.append({
            "type": "function",
            "function": {
                "name": "faq_lookup",
                "description": "Searches the customer support knowledge base for answers to frequently asked questions about products, services, or policies.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The user's question or keywords to search in the FAQs."
                        }
                    },
                    "required": ["query"]
                }
            }
        })
    
    # Specific troubleshooting tools
    schemas.extend([
        {
            "type": "function",
            "function": {
                "name": "reset_password",
                "description": "Initiates a password reset process for a user. Requires the user's exact username.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "username": {
                            "type": "string",
                            "description": "The username for which to reset the password."
                        }
                    },
                    "required": ["username"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "check_order_status",
                "description": "Checks the current status of a customer's order. Requires the exact order ID.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "order_id": {
                            "type": "string",
                            "description": "The unique identifier of the order."
                        }
                    },
                    "required": ["order_id"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "escalate_to_human",
                "description": "Escalates the current customer issue to a human support agent. Provide a summary of the problem and the user's contact information.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "issue_description": {
                            "type": "string",
                            "description": "A concise description of the issue the customer is facing."
                        },
                        "user_contact": {
                            "type": "string",
                            "description": "The customer's email or phone number for follow-up."
                        }
                    },
                    "required": ["issue_description", "user_contact"]
                }
            }
        }
    ])
    return schemas


# --- Agent State & Storage ---
class RedisChatHistoryStorage(MemoryStorage):  # (same as before)
    def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
        self.redis_client = redis_client
        self.prefix = prefix
    async def _get_key(self, conversation_id: str) -> str:
        return f"{self.prefix}{conversation_id}"
    async def read(self, conversation_id: str) -> dict:
        key = await self._get_key(conversation_id)
        raw_history = await self.redis_client.get(key)
        if raw_history:
            return json.loads(raw_history)
        return {"history": []}
    async def write(self, conversation_id: str, data: dict):
        key = await self._get_key(conversation_id)
        await self.redis_client.set(key, json.dumps(data))

class CustomerSupportAgentState(TurnState):
    history: List[Dict[str, Any]] = []
    conversation_id: str = "default_conversation"

storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()

SUPPORT_AGENT_APP = AgentApplication[CustomerSupportAgentState](
    storage=storage_instance,
    adapter=CloudAdapter()
)

async def _load_history(context: TurnContext, state: CustomerSupportAgentState):
    if context.activity and context.activity.conversation and context.activity.conversation.id:
        state.conversation_id = context.activity.conversation.id
    if redis_client:
        stored_data = await storage_instance.read(state.conversation_id)
        state.history = stored_data.get("history", [])
        print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")

async def _save_history(context: TurnContext, state: CustomerSupportAgentState):
    if redis_client and state.conversation_id:
        await storage_instance.write(state.conversation_id, {"history": state.history})
        print(f"Saved history for {state.conversation_id}: {len(state.history)} items")

SUPPORT_AGENT_APP.on_before_activity(_load_history)
SUPPORT_AGENT_APP.on_after_activity(_save_history)

async def _support_welcome(context: TurnContext, state: CustomerSupportAgentState):
    welcome_message = (
        "Hello! I am your AI Customer Support Assistant. "
        "I can help with FAQs, order status, password resets, or escalate to a human. "
        "How can I assist you today?"
    )
    await context.send_activity(welcome_message)
    state.history.append({"role": "assistant", "content": welcome_message})

SUPPORT_AGENT_APP.conversation_update("membersAdded")(_support_welcome)
SUPPORT_AGENT_APP.message("/help")(_support_welcome)


@SUPPORT_AGENT_APP.activity("message")
async def on_support_message(context: TurnContext, state: CustomerSupportAgentState):
    user_message = context.activity.text
    state.history.append({"role": "user", "content": user_message})

    system_prompt = (
        "You are an AI Customer Support Assistant for a tech company. "
        "Your goal is to assist users efficiently and politely. "
        "Use the provided tools to answer questions, perform actions, or escalate. "
        "When using `escalate_to_human`, always ask for user contact information (email or phone) if not provided, "
        "and get a clear summary of the issue before escalating."
        "Remember to be helpful and empathetic."
    )
    messages_for_llm = [
        {"role": "system", "content": system_prompt},
        *state.history
    ]

    try:
        response = await aoai_client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT_NAME,
            messages=messages_for_llm,
            tools=get_support_tool_schemas(),
            tool_choice="auto"
        )

        assistant_message = response.choices[0].message
        if assistant_message.content or assistant_message.tool_calls:
            history_entry = {"role": "assistant", "content": assistant_message.content}
            if assistant_message.tool_calls:
                # Keep tool_calls on the recorded assistant message: the follow-up
                # request is only valid if each "tool" message has a matching call.
                history_entry["tool_calls"] = [tc.model_dump() for tc in assistant_message.tool_calls]
            state.history.append(history_entry)
        if assistant_message.content:
            # Send initial non-tool response
            await context.send_activity(assistant_message.content)
        
        if assistant_message.tool_calls:
            for tool_call in assistant_message.tool_calls:
                function_name = tool_call.function.name
                arguments = json.loads(tool_call.function.arguments)

                print(f"Agent wants to call tool: {function_name} with args: {arguments}")

                tool_output = None
                if function_name == "faq_lookup" and customer_support_tools:
                    query = arguments.get("query")
                    docs = await customer_support_tools.faq_lookup(query)
                    tool_output = "\n".join([f"Q: {d['question']}\nA: {d['answer']}" for d in docs])
                    if not docs:
                        tool_output = "No relevant FAQ found."
                elif function_name == "reset_password" and customer_support_tools:
                    username = arguments.get("username")
                    tool_output = await customer_support_tools.reset_password(username)
                elif function_name == "check_order_status" and customer_support_tools:
                    order_id = arguments.get("order_id")
                    tool_output = await customer_support_tools.check_order_status(order_id)
                elif function_name == "escalate_to_human" and customer_support_tools:
                    issue = arguments.get("issue_description")
                    contact = arguments.get("user_contact")
                    tool_output = await customer_support_tools.escalate_to_human(issue, contact)
                else:
                    tool_output = f"Tool '{function_name}' is not available or unrecognized."

                state.history.append({
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": str(tool_output)
                })

            # After all tool outputs are recorded, call the LLM once more so
            # every tool_call has a matching "tool" message in the request
            final_response = await aoai_client.chat.completions.create(
                model=AZURE_OPENAI_DEPLOYMENT_NAME,
                messages=[{"role": "system", "content": system_prompt}, *state.history]
            )
            final_assistant_message_content = final_response.choices[0].message.content
            state.history.append({"role": "assistant", "content": final_assistant_message_content})
            await context.send_activity(final_assistant_message_content)

        # If the LLM returned content without tool calls, it was already sent above.
        # If it returned neither content nor tool calls, treat it as a bad response.
        elif not assistant_message.content:
            error_msg = "I'm having trouble understanding. Could you please rephrase?"
            await context.send_activity(error_msg)
            state.history.append({"role": "assistant", "content": error_msg})


    except Exception as e:
        print(f"Error in Customer Support Agent: {e}")
        error_msg = "An unexpected error occurred in the customer support agent. Please try again later."
        await context.send_activity(error_msg)
        state.history.append({"role": "assistant", "content": error_msg})

# --- Server Setup (standard MAF Aiohttp server) ---
def start_server(agent_application: AgentApplication):
    async def entry_point(req: Request) -> Response:
        agent: AgentApplication = req.app["agent_app"]
        adapter: CloudAdapter = req.app["adapter"]
        return await start_agent_process(req, agent, adapter)

    APP = Application()
    APP.router.add_post("/api/messages", entry_point)
    APP.router.add_get("/api/messages", lambda _: Response(status=200))
    APP["agent_app"] = agent_application
    APP["adapter"] = agent_application.adapter
    try:
        port = int(os.environ.get("PORT", 3978))
        print(f"======== Running on http://localhost:{port} (Customer Support Agent) ========")
        print("(Press CTRL+C to quit)")
        run_app(APP, host="localhost", port=port)
    except Exception as error:
        print(f"Error starting server: {error}")
        raise error

if __name__ == "__main__":
    start_server(SUPPORT_AGENT_APP)

To Run Project 1:

  1. Azure Cognitive Search for FAQs: Create an Azure Cognitive Search instance and an index (e.g., customer-faqs), then populate it with Q&A documents (a seeding sketch follows this list).
    • Example data for customer-faqs index (assuming question and answer fields):
      [
          {"question": "How do I update my billing information?", "answer": "You can update your billing information by logging into your account settings on our website under the 'Billing' section."},
          {"question": "What are your return policies?", "answer": "We offer a 30-day money-back guarantee on all products. Items must be returned in their original condition. See our full policy page for details."},
          {"question": "My product isn't working.", "answer": "Please describe the issue in more detail, and I can try to help you troubleshoot, or you can request a human agent for technical support."},
          {"question": "What is the warranty for Product X?", "answer": "Product X comes with a 1-year limited warranty covering manufacturing defects. Extended warranty options are available."}
      ]
      
  2. Configure your .env for Azure OpenAI, Redis, and Azure Cognitive Search (AZURE_SEARCH_FAQ_INDEX_NAME).
  3. Save support_tools.py and customer_support_agent.py.
  4. Run: python customer_support_agent.py
  5. Interact using teamsapptester (or a custom client):
    • “How do I update my billing info?” (Should use faq_lookup)
    • “My product is broken. Can you help me?” (Should ask for more details or suggest escalation)
    • “I need to reset my password for user ‘john.doe’.” (Should use reset_password)
    • “What is the status of my order ORD123?” (Should use check_order_status)
    • “I need to talk to someone, my issue is complicated. My email is user@example.com.” (Should use escalate_to_human)
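
If you prefer to seed the customer-faqs index from code rather than the Azure portal, here is a minimal sketch using the azure-search-documents package. It assumes an index whose fields are id (the key), question, and answer; the environment variable names AZURE_SEARCH_ENDPOINT and AZURE_SEARCH_API_KEY are illustrative, so match them to your own .env.

# seed_faqs.py (minimal seeding sketch; field and env var names are assumptions)
import os
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient

search_client = SearchClient(
    endpoint=os.environ["AZURE_SEARCH_ENDPOINT"],
    index_name=os.environ.get("AZURE_SEARCH_FAQ_INDEX_NAME", "customer-faqs"),
    credential=AzureKeyCredential(os.environ["AZURE_SEARCH_API_KEY"]),
)

faqs = [
    {"id": "1", "question": "How do I update my billing information?",
     "answer": "Log into your account settings under the 'Billing' section."},
    {"id": "2", "question": "What are your return policies?",
     "answer": "We offer a 30-day money-back guarantee on all products."},
]

# upload_documents performs an upsert against the index
result = search_client.upload_documents(documents=faqs)
print(f"Uploaded {sum(1 for r in result if r.succeeded)} of {len(faqs)} documents")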

Project 2: Migrating a LangGraph Agent to Microsoft Agent Framework

Objective: Take an existing LangGraph-based “Simple Travel Planner Agent” and integrate its core logic into the Microsoft Agent Framework, leveraging MAF’s hosting, memory, and tool integration.

Problem Statement: An existing LangGraph travel planner needs to be deployed within an enterprise context, benefiting from MAF’s enterprise-grade features like standardized hosting, observability, and authentication, without a complete rewrite of the travel planning logic.

Technologies Used:

  • LangGraph (for the original agent logic)
  • Microsoft Agent Framework (Python SDK)
  • Azure OpenAI (for LLM)
  • Redis (for MAF chat history)
  • FastAPI (optional: to expose the LangGraph core as a separate microservice; in this project we call it directly as a Python callable).

Approach: We will encapsulate the LangGraph agent’s core planning and execution into a callable Python function. This function will then be exposed as a “tool” to a MAF agent. The MAF agent will use its LLM to decide when to call this travel planning tool.

Step 0: LangGraph Agent (Existing)

First, let’s define our existing LangGraph agent. For simplicity, we’ll create a very basic one.

# langgraph_travel_planner.py
import os
import operator
from typing import TypedDict, Annotated, List
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, ToolMessage
from langgraph.graph import StateGraph, END
from langchain_openai import AzureChatOpenAI # Assuming AOAI for LLM

# --- LangGraph State ---
class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    destination: str
    dates: str
    # Other potential fields like booking_confirmed, flight_options, hotel_options

# --- LangGraph Tools (for the original LangGraph agent) ---
# In LangGraph, you'd define tools for specific tasks.
# For our *existing* LangGraph agent, let's say it had a dummy flight search.
class LangGraphTravelTools:
    @staticmethod
    def get_dummy_flights(destination: str, dates: str) -> str:
        """
        Retrieves dummy flight options for a given destination and dates.
        """
        print(f"LangGraph tool: Searching dummy flights for {destination} on {dates}")
        return f"Dummy flights to {destination} on {dates}: Flight LH456 (08:00 AM) - $500, Flight BA789 (11:00 AM) - $650."

# --- LangGraph Nodes ---
# LLM Node: bind the tool here so the model can actually emit tool calls
def call_llm(state: AgentState):
    llm = AzureChatOpenAI(
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
        api_version="2024-02-15-preview"
    ).bind_tools([LangGraphTravelTools.get_dummy_flights])
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# Tool Node (executes the dummy tool call for LangGraph)
def call_tool(state: AgentState):
    # LangChain exposes tool calls as dicts with "name", "args", and "id" keys;
    # "args" is already parsed, so no json.loads is needed.
    last_message = state["messages"][-1]
    tool_call = last_message.tool_calls[0]
    tool_name = tool_call["name"]
    tool_args = tool_call["args"]

    if tool_name == "get_dummy_flights":
        result = LangGraphTravelTools.get_dummy_flights(tool_args["destination"], tool_args["dates"])
        return {"messages": [ToolMessage(content=result, tool_call_id=tool_call["id"])]}
    return {"messages": [ToolMessage(content=f"Unknown tool: {tool_name}", tool_call_id=tool_call["id"])]}

# --- LangGraph Logic (Planner/Router) ---
def should_continue(state: AgentState):
    last_message = state["messages"][-1]
    if not last_message.tool_calls:
        return "end" # No tool calls, we're done with agent
    return "continue" # Tool calls, continue

# --- Build LangGraph ---
def build_langgraph_travel_planner():
    # The tool is bound inside call_llm, so the graph only needs its nodes and edges.
    workflow = StateGraph(AgentState)

    workflow.add_node("llm", call_llm)
    workflow.add_node("action", call_tool)

    workflow.set_entry_point("llm")
    workflow.add_conditional_edges(
        "llm",
        should_continue,
        {
            "continue": "action",
            "end": END
        }
    )
    workflow.add_edge("action", "llm") # After tool call, go back to LLM to summarize/plan next

    return workflow.compile()

# --- Wrapper function to invoke LangGraph ---
async def invoke_langgraph_planner(destination: str, dates: str) -> str:
    """
    Invokes the LangGraph travel planner to generate an itinerary.
    """
    print(f"Invoking LangGraph Planner for {destination} on {dates}")
    graph_app = build_langgraph_travel_planner() # Re-build graph for each invocation for simplicity

    # LangGraph's state expects BaseMessage objects, so build the system and
    # user messages directly instead of OpenAI-style dicts.
    initial_messages: List[BaseMessage] = [
        SystemMessage(content=(
            f"You are a travel planner. Plan a trip to {destination} on {dates}. "
            "Use the 'get_dummy_flights' tool to find flight options. "
            "Respond with a summary of the plan."
        )),
        HumanMessage(content=f"Plan a trip to {destination} on {dates}"),
    ]

    inputs = {"messages": initial_messages, "destination": destination, "dates": dates}
    try:
        # Run the LangGraph application
        final_state = await graph_app.ainvoke(inputs)
        # Extract the final message from the LangGraph run
        return final_state["messages"][-1].content
    except Exception as e:
        print(f"Error running LangGraph: {e}")
        return f"LangGraph travel planner encountered an error: {e}"

Step 1: Expose LangGraph Logic as a MAF-Compatible Tool

We’ll define a new tool class that internally calls our invoke_langgraph_planner function.

# maf_langgraph_integration_tools.py
import asyncio
from typing import Dict, Any, List

# Import our LangGraph invocation function
from langgraph_travel_planner import invoke_langgraph_planner

class LangGraphIntegrationTools:
    @staticmethod
    async def plan_travel_with_langgraph(destination: str, dates: str) -> str:
        """
        Uses an external LangGraph agent to plan a travel itinerary,
        including finding flight options.
        """
        print(f"MAF Tool: Calling LangGraph agent to plan travel for {destination} on {dates}")
        # Call the async LangGraph wrapper function
        result = await invoke_langgraph_planner(destination, dates)
        return result

    @staticmethod
    async def get_booking_status(booking_id: str) -> str:
        """
        Retrieves the status of a specific travel booking. (Dummy tool)
        """
        print(f"MAF Tool: Checking booking status for ID: {booking_id}")
        await asyncio.sleep(0.5)
        if booking_id == "TRV789":
            return "Booking TRV789: Confirmed, Flights booked, Hotel pending confirmation."
        return f"Booking ID {booking_id} not found."

Step 2: Create a MAF Agent to Orchestrate LangGraph

This MAF agent will use Azure OpenAI to decide when to call the plan_travel_with_langgraph tool.

# maf_travel_agent.py (Integrating LangGraph)
import os
import asyncio
import json
from typing import Any, Dict, List
from dotenv import load_dotenv
import redis.asyncio as redis

load_dotenv()

from microsoft.agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft.agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
from openai import AsyncAzureOpenAI # async client, so completions can be awaited

from maf_langgraph_integration_tools import LangGraphIntegrationTools # Our MAF-compatible tools

# --- Configuration (from .env) ---
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")

# --- Initialize external services ---
if not all([AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT_NAME]):
    raise ValueError("Missing Azure OpenAI environment variables.")

try:
    aoai_client = AsyncAzureOpenAI(
        api_key=AZURE_OPENAI_API_KEY,
        api_version="2024-02-15-preview",
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
    )
    print("Azure OpenAI client initialized.")
except Exception as e:
    print(f"Error initializing Azure OpenAI client: {e}")
    exit(1)

try:
    redis_client = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        password=REDIS_PASSWORD,
        decode_responses=True
    )
    async def _test_redis_connection():
        await redis_client.ping()
        print("Successfully connected to Redis.")
    asyncio.run(_test_redis_connection())
except Exception as e:
    print(f"Error connecting to Redis: {e}")
    redis_client = None

# Initialize the MAF-compatible tools instance
langgraph_integration_tools_instance = LangGraphIntegrationTools()

# --- Tool Definitions for MAF LLM ---
def get_maf_travel_tool_schemas():
    return [
        {
            "type": "function",
            "function": {
                "name": "plan_travel_with_langgraph",
                "description": "Plans a detailed travel itinerary for a destination and dates, leveraging an existing LangGraph travel planning agent.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "destination": {
                            "type": "string",
                            "description": "The desired travel destination."
                        },
                        "dates": {
                            "type": "string",
                            "description": "The travel dates (e.g., 'next week', 'July 2025')."
                        }
                    },
                    "required": ["destination", "dates"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "get_booking_status",
                "description": "Retrieves the status of a specific travel booking.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "booking_id": {
                            "type": "string",
                            "description": "The unique identifier of the travel booking."
                        }
                    },
                    "required": ["booking_id"]
                }
            }
        }
    ]

# --- Agent State & Storage (same as previous Redis examples) ---
class RedisChatHistoryStorage(MemoryStorage): # same Redis-backed storage as Project 1
    def __init__(self, redis_client: redis.Redis, prefix: str = "chat_history:"):
        self.redis_client = redis_client
        self.prefix = prefix
    async def _get_key(self, conversation_id: str) -> str:
        return f"{self.prefix}{conversation_id}"
    async def read(self, conversation_id: str) -> dict:
        key = await self._get_key(conversation_id)
        raw_history = await self.redis_client.get(key)
        if raw_history:
            return json.loads(raw_history)
        return {"history": []}
    async def write(self, conversation_id: str, data: dict):
        key = await self._get_key(conversation_id)
        await self.redis_client.set(key, json.dumps(data))

class MAFTravelAgentState(TurnState):
    history: List[Dict[str, Any]] = []
    conversation_id: str = "default_conversation"

storage_instance = RedisChatHistoryStorage(redis_client) if redis_client else MemoryStorage()

MAF_TRAVEL_AGENT_APP = AgentApplication[MAFTravelAgentState](
    storage=storage_instance,
    adapter=CloudAdapter()
)

async def _load_history(context: TurnContext, state: MAFTravelAgentState):
    if context.activity and context.activity.conversation and context.activity.conversation.id:
        state.conversation_id = context.activity.conversation.id
    if redis_client:
        stored_data = await storage_instance.read(state.conversation_id)
        state.history = stored_data.get("history", [])
        print(f"Loaded history for {state.conversation_id}: {len(state.history)} items")

async def _save_history(context: TurnContext, state: MAFTravelAgentState):
    if redis_client and state.conversation_id:
        await storage_instance.write(state.conversation_id, {"history": state.history})
        print(f"Saved history for {state.conversation_id}: {len(state.history)} items")

MAF_TRAVEL_AGENT_APP.on_before_activity(_load_history)
MAF_TRAVEL_AGENT_APP.on_after_activity(_save_history)

async def _maf_travel_welcome(context: TurnContext, state: MAFTravelAgentState):
    welcome_message = (
        "Hello! I am your AI Travel Agent, enhanced by a LangGraph planner. "
        "I can plan trips and check booking statuses. "
        "Try asking me to 'Plan a trip to Paris for next summer' or 'Check my booking TRV789'."
    )
    await context.send_activity(welcome_message)
    state.history.append({"role": "assistant", "content": welcome_message})

MAF_TRAVEL_AGENT_APP.conversation_update("membersAdded")(_maf_travel_welcome)
MAF_TRAVEL_AGENT_APP.message("/help")(_maf_travel_welcome)


@MAF_TRAVEL_AGENT_APP.activity("message")
async def on_maf_travel_message(context: TurnContext, state: MAFTravelAgentState):
    user_message = context.activity.text
    state.history.append({"role": "user", "content": user_message})

    system_prompt = (
        "You are an AI Travel Planner. Your primary goal is to help users plan trips. "
        "Use the 'plan_travel_with_langgraph' tool for detailed trip planning, "
        "and 'get_booking_status' to check existing bookings. "
        "Extract 'destination' and 'dates' for trip planning. "
        "Be helpful and confirm details before initiating planning or checking status."
    )
    messages_for_llm = [
        {"role": "system", "content": system_prompt},
        *state.history
    ]

    try:
        response = await aoai_client.chat.completions.create(
            model=AZURE_OPENAI_DEPLOYMENT_NAME,
            messages=messages_for_llm,
            tools=get_maf_travel_tool_schemas(),
            tool_choice="auto"
        )

        assistant_message = response.choices[0].message
        if assistant_message.content or assistant_message.tool_calls:
            history_entry = {"role": "assistant", "content": assistant_message.content}
            if assistant_message.tool_calls:
                # Keep the tool_calls on the assistant message so the follow-up
                # request is valid (each "tool" message needs its matching call).
                history_entry["tool_calls"] = [tc.model_dump() for tc in assistant_message.tool_calls]
            state.history.append(history_entry)
        if assistant_message.content:
            await context.send_activity(assistant_message.content)
        
        if assistant_message.tool_calls:
            for tool_call in assistant_message.tool_calls:
                function_name = tool_call.function.name
                arguments = json.loads(tool_call.function.arguments)

                print(f"MAF Agent wants to call tool: {function_name} with args: {arguments}")

                tool_output = None
                if function_name == "plan_travel_with_langgraph":
                    destination = arguments.get("destination")
                    dates = arguments.get("dates")
                    # This calls our wrapper, which in turn invokes LangGraph
                    tool_output = await langgraph_integration_tools_instance.plan_travel_with_langgraph(destination, dates)
                elif function_name == "get_booking_status":
                    booking_id = arguments.get("booking_id")
                    tool_output = await langgraph_integration_tools_instance.get_booking_status(booking_id)
                else:
                    tool_output = f"Unknown tool: {function_name}"

                state.history.append({
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": str(tool_output)
                })

            # After all tool outputs are recorded, call the LLM once more so
            # every tool_call has a matching "tool" message in the request
            final_response = await aoai_client.chat.completions.create(
                model=AZURE_OPENAI_DEPLOYMENT_NAME,
                messages=[{"role": "system", "content": system_prompt}, *state.history]
            )
            final_assistant_message_content = final_response.choices[0].message.content
            state.history.append({"role": "assistant", "content": final_assistant_message_content})
            await context.send_activity(final_assistant_message_content)

        elif not assistant_message.content:
            error_msg = "I'm having trouble understanding. Could you please rephrase?"
            await context.send_activity(error_msg)
            state.history.append({"role": "assistant", "content": error_msg})


    except Exception as e:
        print(f"Error in MAF Travel Agent: {e}")
        error_msg = "An unexpected error occurred in the MAF travel agent. Please try again later."
        await context.send_activity(error_msg)
        state.history.append({"role": "assistant", "content": error_msg})

# --- Server Setup (standard MAF Aiohttp server) ---
def start_server(agent_application: AgentApplication):
    async def entry_point(req: Request) -> Response:
        agent: AgentApplication = req.app["agent_app"]
        adapter: CloudAdapter = req.app["adapter"]
        return await start_agent_process(req, agent, adapter)

    APP = Application()
    APP.router.add_post("/api/messages", entry_point)
    APP.router.add_get("/api/messages", lambda _: Response(status=200))
    APP["agent_app"] = agent_application
    APP["adapter"] = agent_application.adapter
    try:
        port = int(os.environ.get("PORT", 3978))
        print(f"======== Running on http://localhost:{port} (MAF Travel Agent) ========")
        print("(Press CTRL+C to quit)")
        run_app(APP, host="localhost", port=port)
    except Exception as error:
        print(f"Error starting server: {error}")
        raise error

if __name__ == "__main__":
    start_server(MAF_TRAVEL_AGENT_APP)

To Run Project 2:

  1. Configure your .env for Azure OpenAI and Redis.
  2. Ensure you have the LangChain packages installed: pip install langchain-core langchain-openai langgraph
  3. Save langgraph_travel_planner.py, maf_langgraph_integration_tools.py, and maf_travel_agent.py.
  4. Run: python maf_travel_agent.py
  5. Interact using teamsapptester (or a custom client; a request sketch follows this list):
    • “Plan a trip to Rome for Christmas.” (Should trigger plan_travel_with_langgraph, which calls LangGraph)
    • “Check my travel booking TRV789.” (Should trigger get_booking_status)
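
If you'd rather script a custom client than use teamsapptester, the endpoint accepts Bot Framework-style activities. The sketch below is a rough local probe, assuming anonymous auth on localhost; real channels attach JWT bearer tokens and a proper serviceUrl, and every field value here is illustrative:

# client_probe.py (hypothetical local probe; field values are illustrative)
import requests

activity = {
    "type": "message",
    "text": "Plan a trip to Rome for Christmas.",
    "from": {"id": "user-1", "name": "Tester"},
    "recipient": {"id": "agent", "name": "MAF Travel Agent"},
    "conversation": {"id": "local-test-conversation"},
    "channelId": "test",
    "serviceUrl": "http://localhost:3978/",
}

response = requests.post("http://localhost:3978/api/messages", json=activity)
print(response.status_code)  # a 2xx status means the activity was accepted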

How this migration benefits:

  • Centralized Hosting & Management: The entire agent (including the LangGraph part) is now hosted and managed under a unified MAF application.
  • Unified Memory: MAF’s Redis-backed memory handles the overall conversation history, encompassing interactions with the LangGraph sub-agent.
  • Consistent Tooling: All tools, including the LangGraph “tool,” are exposed and managed in a consistent manner within MAF.
  • Enterprise Integration: The MAF framework can then easily integrate other enterprise features like authentication (Entra ID), detailed observability, and integration with Azure AI Foundry services without modifying the core LangGraph logic. The LangGraph agent is effectively an isolated “black box” capability consumed by the MAF orchestrator.

6. Bonus Section: Further Learning and Resources

You’ve made significant progress! The Microsoft Agent Framework is a rapidly evolving area. Here are resources to continue your learning journey:

  • Microsoft Learn Modules: Keep an eye on the official Microsoft Learn platform. New modules and tutorials specifically for the Microsoft Agent Framework (both .NET and Python) will be continuously released. Search for “Microsoft Agent Framework tutorial” on Microsoft Learn.
  • Azure AI Fundamentals Certification: While not specific to MAF, understanding core Azure AI services (Azure OpenAI, Azure Cognitive Search) is highly beneficial.
  • LangChain/LangGraph Tutorials: If you’re interested in building complex agentic graphs, LangChain’s official documentation and tutorials are excellent. The MAF often takes inspiration from or integrates with these concepts.

Official Documentation

  • Microsoft Agent Framework on GitHub (github.com/microsoft/agent-framework): source code, samples, and discussions for both the .NET and Python SDKs.
  • Microsoft Learn: the official Agent Framework documentation with concepts, quickstarts, and API references (search “Microsoft Agent Framework” on learn.microsoft.com).

Blogs and Articles

  • Microsoft AI Blog: Stay updated with the latest AI news and announcements from Microsoft.
  • Microsoft Tech Community Blogs: Often contains deep dives and practical guides from Microsoft engineers and MVPs.
  • Leading AI/ML Publications: Follow popular blogs like Towards Data Science, The Batch (DeepLearning.AI), and InfoQ for broader industry trends in AI agents.

YouTube Channels

  • Microsoft Developer: Official channel with tutorials, announcements, and talks on Microsoft technologies, including AI.
  • Azure: Official Azure channel for cloud-specific AI content.
  • FreeCodeCamp.org / Traversy Media / Tech with Tim: Excellent channels for Python, FastAPI, and general programming tutorials that complement AI development.

Community Forums/Groups

  • GitHub Issues/Discussions: The GitHub repositories for Microsoft Agent Framework, Semantic Kernel, and AutoGen are active places for questions, bug reports, and discussions.
  • Stack Overflow: Use relevant tags like microsoft-agent-framework, azure-openai, python for technical questions.
  • Discord Servers: Look for official Microsoft AI communities or independent AI development servers.

Next Steps/Advanced Topics

  1. Explore Different Orchestration Patterns: Dive deeper into concurrent, group chat, and Magentic orchestration. The Python SDK will likely expand its declarative workflow builders.
  2. Advanced Memory Management: Implement vector stores (e.g., Azure AI Search vector store, Pinecone, Qdrant) for more sophisticated RAG scenarios and long-term knowledge retention.
  3. Observability & Monitoring: Integrate with OpenTelemetry for comprehensive tracing of agent interactions, tool calls, and LLM usage, and learn how to connect this to Azure Monitor or other APM tools (a minimal tracing sketch follows this list).
  4. Deployment Strategies: Learn how to deploy MAF agents to Azure Container Apps, Azure Kubernetes Service, or Azure Functions for scalable and resilient production environments.
  5. Human-in-the-Loop (HITL): Implement mechanisms for human review and intervention in critical agent workflows, such as approval steps for sensitive actions.
  6. Custom LLM Integration: Learn how to integrate other LLM providers (e.g., custom hosted models, local models like Ollama) using the IChatClient abstraction.
  7. Multi-Modal Agents: Explore integrating vision, speech, and other modalities into your agents.
  8. Security Best Practices: Deepen your understanding of AI security, prompt injection prevention, data governance, and compliance within agentic systems.
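
As a small taste of topic 3 above, here is a minimal, framework-agnostic OpenTelemetry sketch (assuming pip install opentelemetry-sdk) that traces an awaited tool call and prints the span to the console; swapping ConsoleSpanExporter for an OTLP or Azure Monitor exporter sends the same spans to your APM backend. The tracer name and span attributes are illustrative choices, not MAF conventions.

# otel_tool_tracing.py (minimal sketch; tracer name and attributes are illustrative)
import asyncio
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Export spans to the console; swap in an OTLP exporter for production
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("maf.learning-guide")

async def traced_tool_call(tool_name: str, coro):
    # Wrap any awaitable tool invocation in a span
    with tracer.start_as_current_span(f"tool.{tool_name}") as span:
        result = await coro
        span.set_attribute("tool.name", tool_name)
        span.set_attribute("tool.output_chars", len(str(result)))
        return result

async def _demo():
    async def fake_tool():
        await asyncio.sleep(0.1)
        return "ok"
    print(await traced_tool_call("fake_tool", fake_tool()))

asyncio.run(_demo())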

By continuously experimenting and building, you’ll master the Microsoft Agent Framework and be at the forefront of the AI agent revolution!