Integrating with Existing Agent Frameworks

One of the most compelling features of Agentic Lightning is its ability to train and optimize any AI agent, regardless of the framework it was built with. This means you don’t have to throw away your existing LangChain, AutoGen, OpenAI Agent SDK, or custom agents. Instead, you can “light them up” by wrapping them with a LitAgent and integrating them into the Agentic Lightning training pipeline.

This chapter will guide you through the strategies for integrating agents from popular frameworks. The core idea is to encapsulate your existing agent’s run or invoke method within Agentic Lightning’s training_rollout method and then define how to extract a reward.

The General Integration Strategy

The process of integrating an external agent framework typically involves these steps:

  1. Identify the Core Execution Logic: Pinpoint the method in your existing agent that takes an input (e.g., a query, a task) and produces an output (e.g., a response, a result). This is usually a run, invoke, or agent.chat method.
  2. Create a Custom LitAgent: Subclass agentlightning.LitAgent.
  3. Implement training_rollout: Inside your custom LitAgent’s training_rollout method:
    • Instantiate your external agent (if not already done).
    • Call the external agent’s core execution method with the task.context.
    • Capture its output and any intermediate steps (if you want to use tracers later).
    • Based on the output and the task (which can contain ground truth or evaluation criteria), calculate and return a reward.
    • Optionally, propagate resources from Agentic Lightning to your agent (e.g., updated prompts).
  4. Define Tasks: Prepare a set of AgentLightningTask objects that your agent will be trained on. These tasks should contain the input for your agent and any information needed to calculate a reward.
  5. Run with the Trainer: Use the AgentLightningServer and Trainer to orchestrate training, just like with the SimpleMathAgent.
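Before diving into framework specifics, here is a minimal sketch of what such a wrapper usually looks like. It only illustrates the shape of the pattern: your_agent.run and is_correct are placeholders for your framework-specific call and evaluation logic, not real functions.

# generic_litagent_skeleton.py (illustrative sketch of the general pattern)
from agentlightning.litagent import LitAgent
from agentlightning.types import AgentLightningTask, AgentResource

class MyFrameworkAgent(LitAgent):
    async def training_rollout(
        self,
        task: AgentLightningTask,
        rollout_id: str,
        resources: dict[str, AgentResource],
    ) -> float:
        # 1. Optionally apply resources (e.g., an updated prompt) to your agent.
        # 2. Call your existing agent's core method with the task input.
        output = await your_agent.run(task.context)  # placeholder for your framework's run/invoke
        # 3. Score the output against whatever ground truth the task carries.
        return 1.0 if is_correct(output, task) else 0.0  # is_correct is a placeholder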

Let’s look at specific examples.

1. Integrating with LangChain Agents

LangChain is a widely adopted framework for building LLM-powered applications, including agents. Its agents typically involve an LLM, a set of tools, and a prompt.

LangChain Agent Overview

A simple LangChain agent usually involves:

  • An LLM (e.g., ChatOpenAI)
  • Tools (e.g., Tool objects wrapping functions)
  • An AgentExecutor to orchestrate the LLM and tools.
# langchain_agent_example.py
import os
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage

# Set your OpenAI API key
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# For this example, we'll use a mock LLM if no API key is set for local testing.

class MockChatOpenAI:
    """A mock ChatOpenAI for local testing without an actual API key."""
    async def ainvoke(self, messages):
        last_message = messages[-1].content
        if "add" in last_message.lower():
            try:
                parts = last_message.lower().replace("add", "").replace("and", "").split()
                numbers = [int(s) for s in parts if s.isdigit()]
                if len(numbers) == 2:
                    return {"content": f"The sum is {sum(numbers)}"}
            except ValueError:
                pass
        if "tell me a joke" in last_message.lower():
            return {"content": "Why don't scientists trust atoms? Because they make up everything!"}
        return {"content": "I couldn't process that request."}

@tool
def add_two_numbers(num1: int, num2: int) -> int:
    """Adds two integers and returns their sum."""
    return num1 + num2

# Define the LLM (use mock if no API key, otherwise ChatOpenAI)
llm = ChatOpenAI(temperature=0) if os.getenv("OPENAI_API_KEY") else MockChatOpenAI()

# Define the tools the agent can use
tools = [add_two_numbers]

# Define the prompt template
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful AI assistant. You have access to tools."),
        MessagesPlaceholder("chat_history", optional=True),
        ("human", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ]
)

# Create the agent executor only when a real LLM is available; the mock cannot
# drive create_openai_functions_agent, so the mock path answers directly below.
if isinstance(llm, ChatOpenAI):
    lc_agent_runnable = create_openai_functions_agent(llm, tools, prompt)
    lc_agent_executor = AgentExecutor(agent=lc_agent_runnable, tools=tools, verbose=True)
else:
    lc_agent_executor = None

async def run_langchain_agent(input_str: str) -> str:
    """Helper function to run the LangChain agent (or the mock fallback)."""
    if lc_agent_executor is None:
        response = await llm.ainvoke([HumanMessage(content=input_str)])
        return response["content"]
    response = await lc_agent_executor.ainvoke({"input": input_str})
    return response["output"]

# Simple local test (run this file directly)
async def main_lc_test():
    print("--- LangChain Agent Local Test ---")
    print("\nAdding numbers:")
    result = await run_langchain_agent("Add 10 and 20")
    print(f"Agent response: {result}")

    print("\nAsking a joke:")
    result = await run_langchain_agent("Tell me a joke")
    print(f"Agent response: {result}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main_lc_test())

Note: If you have an OPENAI_API_KEY set in your environment, ChatOpenAI and the full AgentExecutor are used. Otherwise, the MockChatOpenAI fallback answers directly (without the AgentExecutor), which is enough for local testing.

Wrapping a LangChain Agent with LitAgent

Now, let’s create a LitAgent that can run this LangChain agent and assign a reward.

# langchain_litagent.py
import asyncio
import re
from agentlightning.litagent import LitAgent
from agentlightning.types import AgentLightningTask, AgentResource
from agentlightning.trainer import Trainer
from langchain_agent_example import run_langchain_agent # Import the agent runner

class LangChainOptimizedAgent(LitAgent):
    """
    A LitAgent that wraps a LangChain agent for optimization with Agentic Lightning.
    """
    async def training_rollout(
        self,
        task: AgentLightningTask,
        rollout_id: str,
        resources: dict[str, AgentResource],
    ) -> float:
        print(f"[{rollout_id}] LangChain Agent received task: {task.name} - '{task.context}'")

        # Extract the actual question for the LangChain agent
        question_match = re.search(r"Question: (.*?)(?: \(Expected: (.*?)\))?$", task.context)
        if not question_match:
            print(f"[{rollout_id}] Could not parse question from task context: {task.context}")
            return 0.0
        
        question = question_match.group(1).strip()
        expected_answer_str = question_match.group(2) if question_match.group(2) else None

        # --- IMPORTANT: How resources are passed to LangChain ---
        # In a real-world scenario, 'resources' might contain updated prompt templates
        # or tool configurations. You would then dynamically update your LangChain agent's
        # components based on these resources before running it.
        # For simplicity, we'll assume the base LangChain agent logic is fixed for now,
        # but this is where you'd inject optimizable parameters.
        # Example: if "updated_prompt" in resources, rebuild the ChatPromptTemplate with
        #   resources["updated_prompt"].value and recreate the agent/executor before running.

        # Run the actual LangChain agent
        try:
            agent_output = await run_langchain_agent(question)
            print(f"[{rollout_id}] LangChain Agent output: {agent_output}")

            # --- Reward Calculation Logic ---
            reward = 0.0
            if expected_answer_str:
                if "sum is" in agent_output.lower():
                    actual_num_match = re.search(r"the sum is (\d+)", agent_output.lower())
                    if actual_num_match:
                        actual_answer = int(actual_num_match.group(1))
                        try:
                            expected_answer = int(expected_answer_str)
                            if actual_answer == expected_answer:
                                reward = 1.0
                                print(f"[{rollout_id}] Correct! Expected: {expected_answer}, Actual: {actual_answer}")
                            else:
                                print(f"[{rollout_id}] Incorrect. Expected: {expected_answer}, Actual: {actual_answer}")
                        except ValueError:
                            print(f"[{rollout_id}] Could not parse expected answer '{expected_answer_str}' as integer.")
                elif "joke" in question.lower() and "joke" in agent_output.lower():
                    reward = 1.0
                    print(f"[{rollout_id}] Agent told a joke as expected.")
                else:
                    print(f"[{rollout_id}] Agent did not provide expected numeric sum or joke for task '{question}'.")
            else:
                print(f"[{rollout_id}] No explicit expected answer, defaulting to 0 reward for now.")
                # For tasks without specific expected answers, you might define
                # other reward criteria, e.g., if it successfully used a tool.

            return reward
        except Exception as e:
            print(f"[{rollout_id}] Error running LangChain agent: {e}")
            return 0.0

# Example of how you would run this LitAgent with the Trainer (conceptual)
async def main_lc_litagent_test():
    # Make sure you have the AgentLightningServer running in a separate terminal:
    # agentlightning server start --host 0.0.0.0 --port 8000

    backend_url = "http://localhost:8000"
    num_workers = 1

    trainer = Trainer(n_workers=num_workers) # Trainer imported from agentlightning.trainer

    langchain_wrapper_agent = LangChainOptimizedAgent()

    # Define some tasks for the trainer
    lc_tasks = [
        AgentLightningTask(name="LC Task 1", context="Question: Add 15 and 25 (Expected: 40)"),
        AgentLightningTask(name="LC Task 2", context="Question: Add 100 and 7 (Expected: 107)"),
        AgentLightningTask(name="LC Task 3", context="Question: Add 5 and 5 (Expected: 11)"), # Incorrect
        AgentLightningTask(name="LC Task 4", context="Question: Tell me a joke (Expected: None)"), # For joke task
    ]

    print("\n--- Running LangChain LitAgent via trainer.dev() for demonstration ---")
    for i, task in enumerate(lc_tasks):
        print(f"\nProcessing Task {i+1}: {task.name}")
        rollout_result = await trainer.dev(agent=langchain_wrapper_agent, task=task, resources={})
        print(f"Task '{task.name}' completed with reward: {rollout_result.final_reward}")

if __name__ == "__main__":
    import asyncio
    # Make sure to run langchain_agent_example.py first or ensure its functions are available
    # Then ensure the AgentLightningServer is running (as mentioned in the main_lc_litagent_test)
    # Finally, run this script.
    asyncio.run(main_lc_litagent_test())

To run this example:

  1. Save the LangChain agent code as langchain_agent_example.py.
  2. Save the LitAgent wrapper code as langchain_litagent.py.
  3. Ensure your Agentic Lightning virtual environment is active.
  4. Important: Start the AgentLightningServer in a separate terminal:
    agentlightning server start --host 0.0.0.0 --port 8000
    
  5. Then, run the langchain_litagent.py script:
    python langchain_litagent.py
    

Exercise 1: Dynamic Prompt Updates for LangChain

Enhance the LangChainOptimizedAgent:

  1. Assume resources might contain an AgentResource named "math_prompt" with an updated system message for the LangChain agent.
  2. Modify the training_rollout method to:
    • Dynamically create a new ChatPromptTemplate using this resource if it exists.
    • Rebuild the agent and executor around the new prompt before calling the agent (re-run create_openai_functions_agent and AgentExecutor with the updated ChatPromptTemplate).
  3. Create a sample AgentLightningTask and an AgentResource to demonstrate this, and use trainer.dev() to test.
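If you get stuck, here is one minimal sketch of the rebuilding step. It assumes a resource named "math_prompt" whose .value holds the new system message and a real ChatOpenAI configured in langchain_agent_example; rebuild_langchain_executor is a hypothetical helper, not part of the example above.

# exercise1_hint.py (sketch, not a definitive solution)
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_agent_example import llm, tools

def rebuild_langchain_executor(system_message: str) -> AgentExecutor:
    """Recreate the prompt and executor around an updated system message."""
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_message),
            MessagesPlaceholder("chat_history", optional=True),
            ("human", "{input}"),
            MessagesPlaceholder("agent_scratchpad"),
        ]
    )
    agent = create_openai_functions_agent(llm, tools, prompt)
    return AgentExecutor(agent=agent, tools=tools, verbose=True)

# Inside training_rollout, before running the agent:
# if "math_prompt" in resources:
#     executor = rebuild_langchain_executor(resources["math_prompt"].value)
#     response = await executor.ainvoke({"input": question})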

2. Integrating with AutoGen Agents

AutoGen, another powerful framework from Microsoft, focuses on multi-agent conversations. Integrating AutoGen involves running a conversation between AutoGen agents and then evaluating the conversation’s outcome.

AutoGen Agent Overview

AutoGen agents communicate by sending messages to each other. A typical setup involves:

  • A UserProxyAgent (simulating user input).
  • An AssistantAgent (performing tasks, possibly using tools).
  • A groupchat or direct chat to orchestrate interactions.
# autogen_agent_example.py
import os
import autogen

# Set your OpenAI API key
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

# Ensure you have a config list for AutoGen
# For local testing, you might use a mock LLM.
config_list_openai = [
    {
        "model": "gpt-4o", # or "gpt-3.5-turbo"
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
]

# Fallback for local testing without an API key.
# NOTE: monkey-patching autogen's legacy Completion API would not affect
# AssistantAgent (it calls the model through OpenAIWrapper), so instead we
# register a scripted reply function on the assistant further below.
USE_MOCK_LLM = not os.getenv("OPENAI_API_KEY")
if USE_MOCK_LLM:
    print("WARNING: OPENAI_API_KEY not found. Using a scripted mock reply for the AutoGen example.")
    config_list_openai = [
        {
            "model": "gpt-4o",
            "api_key": "dummy",  # placeholder; the scripted reply intercepts before any API call
        }
    ]

def mock_assistant_reply(recipient, messages=None, sender=None, config=None):
    """Scripted reply used in place of a real LLM call during local testing."""
    last_user_message = (messages[-1].get("content") or "") if messages else ""
    lowered = last_user_message.lower()
    if "add" in lowered:
        numbers = [int(s) for s in lowered.replace("add", " ").replace("and", " ").replace(".", " ").split() if s.isdigit()]
        if len(numbers) == 2:
            return True, f"The sum is {sum(numbers)}. TERMINATE"
    return True, "I couldn't fulfill that request with my current tools. TERMINATE"

# Create an assistant agent
assistant = autogen.AssistantAgent(
    name="Assistant",
    llm_config={"config_list": config_list_openai},
    system_message="You are a helpful AI assistant. You can perform arithmetic operations. Reply TERMINATE when the task is done.",
)

if USE_MOCK_LLM:
    # Registered replies run before the built-in LLM reply, so no API call is made.
    assistant.register_reply([autogen.Agent, None], mock_assistant_reply)

# Create a user proxy agent
user_proxy = autogen.UserProxyAgent(
    name="User",
    human_input_mode="NEVER", # Never ask for human input for automated training
    max_consecutive_auto_reply=10,
    is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"),
    code_execution_config={"work_dir": "autogen_scratchpad", "use_docker": False}, # Set work_dir
)

async def run_autogen_task(task_description: str) -> str:
    """Helper function to run an AutoGen conversation."""
    user_proxy.initiate_chat(assistant, message=task_description)
    # The last message in the chat history from the assistant is usually the final answer
    last_message = assistant.last_message()["content"]
    return last_message

# Simple local test (run this file directly)
async def main_autogen_test():
    print("--- AutoGen Agent Local Test ---")
    print("\nAdding numbers:")
    response = await run_autogen_task("Add 10 and 15.")
    print(f"AutoGen response: {response}")

    print("\nAsking for something it cannot do:")
    response = await run_autogen_task("Write a poem about space.")
    print(f"AutoGen response: {response}")

if __name__ == "__main__":
    import asyncio
    # Create the autogen_scratchpad directory if it doesn't exist
    if not os.path.exists("autogen_scratchpad"):
        os.makedirs("autogen_scratchpad")
    asyncio.run(main_autogen_test())

Wrapping an AutoGen Agent with LitAgent

# autogen_litagent.py
import asyncio
import re
from agentlightning.litagent import LitAgent
from agentlightning.types import AgentLightningTask, AgentResource
from agentlightning.trainer import Trainer
from autogen_agent_example import run_autogen_task # Import the AutoGen runner

class AutoGenOptimizedAgent(LitAgent):
    """
    A LitAgent that wraps an AutoGen conversation for optimization.
    """
    def __init__(self, autogen_agent_factory_func):
        super().__init__()
        self.autogen_agent_factory_func = autogen_agent_factory_func

    async def training_rollout(
        self,
        task: AgentLightningTask,
        rollout_id: str,
        resources: dict[str, AgentResource],
    ) -> float:
        print(f"[{rollout_id}] AutoGen Agent received task: {task.name} - '{task.context}'")

        # Parse the question and expected answer from the task context
        question_match = re.search(r"Question: (.*?)(?: \(Expected: (.*?)\))?$", task.context)
        if not question_match:
            print(f"[{rollout_id}] Could not parse question from task context: {task.context}")
            return 0.0
        
        question = question_match.group(1).strip()
        expected_answer_str = question_match.group(2) if question_match.group(2) else None
        if expected_answer_str == "None":
            expected_answer_str = None  # "(Expected: None)" marks a task with no numeric answer

        # --- IMPORTANT: How resources are passed to AutoGen ---
        # If 'resources' contain updated system messages, tool definitions,
        # or LLM configurations, you would regenerate or update your AutoGen agents here.
        # For instance, if you have a resource named "assistant_system_message",
        # you would update the assistant's system_message before initiate_chat.
        # For this example, we'll assume the factory function handles initial setup.

        # Run the AutoGen conversation
        try:
            autogen_output = await run_autogen_task(question)
            print(f"[{roll_id}] AutoGen Agent final message: {autogen_output}")

            # --- Reward Calculation Logic ---
            reward = 0.0
            if expected_answer_str:
                if "sum is" in autogen_output.lower():
                    actual_num_match = re.search(r"the sum is (\d+)", autogen_output.lower())
                    if actual_num_match:
                        actual_answer = int(actual_num_match.group(1))
                        try:
                            expected_answer = int(expected_answer_str)
                            if actual_answer == expected_answer:
                                reward = 1.0
                                print(f"[{rollout_id}] Correct! Expected: {expected_answer}, Actual: {actual_answer}")
                            else:
                                print(f"[{rollout_id}] Incorrect. Expected: {expected_answer}, Actual: {actual_answer}")
                        except ValueError:
                            print(f"[{rollout_id}] Could not parse expected answer '{expected_answer_str}' as integer.")
                else:
                    print(f"[{rollout_id}] AutoGen did not provide expected numeric sum for task '{question}'.")
            else:
                # For tasks without a specific numeric answer, you might reward based on
                # whether the agent correctly responded (e.g., acknowledged inability).
                if "couldn't fulfill" in autogen_output.lower() and "poem" in question.lower():
                    reward = 0.5 # Partial reward for acknowledging limitations
                    print(f"[{rollout_id}] AutoGen correctly stated it couldn't fulfill the non-math request.")
                else:
                    print(f"[{rollout_id}] No explicit expected answer or other reward criteria met.")


            return reward
        except Exception as e:
            print(f"[{rollout_id}] Error running AutoGen agent: {e}")
            return 0.0

# Define a factory function if needed to re-initialize agents with new resources
def create_autogen_agents():
    # You would typically re-initialize or configure AutoGen agents here
    # with updated resources if available. For this example, we use the pre-configured
    # agents from autogen_agent_example.
    from autogen_agent_example import user_proxy, assistant # Re-import or pass them
    return user_proxy, assistant


async def main_autogen_litagent_test():
    # Make sure you have the AgentLightningServer running in a separate terminal:
    # agentlightning server start --host 0.0.0.0 --port 8000

    backend_url = "http://localhost:8000"
    num_workers = 1

    trainer = Trainer(n_workers=num_workers) # Trainer imported from agentlightning.trainer

    autogen_wrapper_agent = AutoGenOptimizedAgent(create_autogen_agents)

    # Define some tasks for the trainer
    ag_tasks = [
        AgentLightningTask(name="AG Task 1", context="Question: Add 30 and 40 (Expected: 70)"),
        AgentLightningTask(name="AG Task 2", context="Question: Subtract 10 from 50 (Expected: 40)"), # Will fail with our mock LLM
        AgentLightningTask(name="AG Task 3", context="Question: Write a short story (Expected: None)"), # For the 'cannot fulfill' scenario
    ]

    print("\n--- Running AutoGen LitAgent via trainer.dev() for demonstration ---")
    for i, task in enumerate(ag_tasks):
        print(f"\nProcessing Task {i+1}: {task.name}")
        rollout_result = await trainer.dev(agent=autogen_wrapper_agent, task=task, resources={})
        print(f"Task '{task.name}' completed with reward: {rollout_result.final_reward}")

if __name__ == "__main__":
    import asyncio
    # Make sure to run autogen_agent_example.py once to create the scratchpad,
    # or ensure autogen_agent_example's functions are available.
    # Then ensure the AgentLightningServer is running.
    # Finally, run this script.
    asyncio.run(main_autogen_litagent_test())

To run this example:

  1. Save the AutoGen agent code as autogen_agent_example.py.
  2. Save the LitAgent wrapper code as autogen_litagent.py.
  3. Ensure your Agentic Lightning virtual environment is active.
  4. Important: Start the AgentLightningServer in a separate terminal:
    agentlightning server start --host 0.0.0.0 --port 8000
    
  5. Then, run the autogen_litagent.py script:
    python autogen_litagent.py
    

Exercise 2: Implementing Tool-Based Reward for AutoGen

Modify AutoGenOptimizedAgent:

  1. Assume autogen_agent_example could use a “calculator” tool. If the task requires calculation, and the AutoGen output indicates that the add_two_numbers tool was successfully called and produced a correct result, award 1.0.
  2. Adjust the reward logic to prioritize correct tool usage for mathematical tasks.
  3. (Optional, advanced) Explore how to get tool call information from AutoGen’s conversation history to refine the reward. You might need to parse assistant.chat_messages for tool calls.
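As a rough, hedged sketch of part 3: AutoGen keeps per-conversation histories in assistant.chat_messages, and tool or function invocations (when they occur) show up as message entries with "tool_calls" or "function_call" fields, depending on the AutoGen version. The helper below only counts such entries and is illustrative, not a drop-in solution.

# exercise2_hint.py (sketch; message field names vary across AutoGen versions)
from autogen_agent_example import assistant, user_proxy

def count_tool_calls() -> int:
    """Roughly count tool/function calls recorded in the assistant's chat history."""
    calls = 0
    for message in assistant.chat_messages.get(user_proxy, []):
        if message.get("tool_calls") or message.get("function_call"):
            calls += 1
    return calls

# Reward idea inside training_rollout (placeholders, not working code):
# if task_requires_math and count_tool_calls() > 0 and answer_is_correct:
#     reward = 1.0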

3. Integrating with OpenAI Agent SDK

The OpenAI Agent SDK (often used with Assistants API) provides robust functionality for building agents with persistent threads, function calling, and state management.

OpenAI Agent SDK Overview

Key components:

  • An Assistant (configured with an LLM, instructions, and tools).
  • A Thread (for maintaining conversation history).
  • Messages (user and assistant interactions).
  • Runs (orchestrating the assistant’s actions).

Since the OpenAI Assistants API manages state externally (threads, runs), our LitAgent would typically interact with that API directly. For local testing without an actual OpenAI Assistant set up, this is harder to mock robustly, but the principle remains the same.
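For reference, the basic Assistants API loop looks roughly like the sketch below (using the v1 openai Python client; assistant_id is a placeholder for a pre-created Assistant, and exact field names may differ across SDK versions). The conceptual wrapper that follows sidesteps this and simulates the agent with plain Chat Completions instead.

# assistants_flow_sketch.py (rough sketch of the Assistants API loop)
import time
from openai import OpenAI

client = OpenAI()  # requires OPENAI_API_KEY
assistant_id = "asst_..."  # placeholder for a pre-created Assistant

def ask_assistant(question: str) -> str:
    """Create a thread, post a message, run the assistant, and return its reply."""
    thread = client.beta.threads.create()
    client.beta.threads.messages.create(thread_id=thread.id, role="user", content=question)
    run = client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant_id)
    while run.status in ("queued", "in_progress"):
        time.sleep(1)  # simple polling; production code would add timeouts/backoff
        run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
    messages = client.beta.threads.messages.list(thread_id=thread.id)
    # Messages are returned newest-first by default; take the latest assistant reply.
    return messages.data[0].content[0].text.value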

Conceptual openai_litagent.py structure (requires actual OpenAI API key for real functionality):

# openai_litagent.py (Conceptual - requires OpenAI API key and Assistant setup)
import os
import re
import asyncio
from openai import AsyncOpenAI
from agentlightning.litagent import LitAgent
from agentlightning.types import AgentLightningTask, AgentResource
from agentlightning.trainer import Trainer

# Ensure your OpenAI API key is set
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

# This is a placeholder. In a real scenario, you'd have
# a pre-created OpenAI Assistant ID and potentially Tool definitions.
# For simplicity, we'll try to simulate using a basic chat completion.

class OpenAIBasicAgent(LitAgent):
    """
    A LitAgent that wraps interaction with the OpenAI Chat Completions API
    to simulate an OpenAI Agent.
    """
    def __init__(self):
        super().__init__()
        self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))  # async client so the API call below can be awaited

    async def training_rollout(
        self,
        task: AgentLightningTask,
        rollout_id: str,
        resources: dict[str, AgentResource],
    ) -> float:
        print(f"[{rollout_id}] OpenAI Basic Agent received task: {task.name} - '{task.context}'")

        question_match = re.search(r"Question: (.*?)(?: \(Expected: (.*?)\))?$", task.context)
        if not question_match:
            print(f"[{rollout_id}] Could not parse question from task context: {task.context}")
            return 0.0
        
        question = question_match.group(1).strip()
        expected_answer_str = question_match.group(2) if question_match.group(2) else None

        # A very basic "system message" that could be optimized via resources
        system_message = "You are a helpful assistant that tries to answer questions."
        if "system_message" in resources:
            system_message = resources["system_message"].value # Dynamically update system message

        try:
            # Simulate a basic chat completion
            chat_completion = await self.client.chat.completions.create(
                messages=[
                    {"role": "system", "content": system_message},
                    {"role": "user", "content": question}
                ],
                model="gpt-3.5-turbo", # Or your preferred model
                temperature=0.0
            )
            agent_output = chat_completion.choices[0].message.content
            print(f"[{rollout_id}] OpenAI Basic Agent output: {agent_output}")

            reward = 0.0
            if expected_answer_str:
                # Simple keyword matching for demo. More robust evaluation is needed.
                if expected_answer_str.lower() in agent_output.lower():
                    reward = 1.0
                    print(f"[{rollout_id}] Expected answer '{expected_answer_str}' found in output.")
                else:
                    print(f"[{rollout_id}] Expected answer '{expected_answer_str}' NOT found in output.")
            return reward

        except Exception as e:
            print(f"[{rollout_id}] Error with OpenAI API: {e}")
            return 0.0

async def main_openai_litagent_test():
    # Set your OpenAI API key as an environment variable or directly in the code for testing.
    # This example requires a valid OPENAI_API_KEY to function.
    if not os.getenv("OPENAI_API_KEY"):
        print("Please set your OPENAI_API_KEY environment variable to run this example.")
        return

    # Make sure you have the AgentLightningServer running in a separate terminal:
    # agentlightning server start --host 0.0.0.0 --port 8000

    backend_url = "http://localhost:8000"
    num_workers = 1

    trainer = Trainer(n_workers=num_workers) # Trainer imported from agentlightning.trainer

    openai_wrapper_agent = OpenAIBasicAgent()

    # Define some tasks
    openai_tasks = [
        AgentLightningTask(name="Math Q", context="Question: What is 2 + 2? (Expected: 4)"),
        AgentLightningTask(name="Fact Q", context="Question: Who discovered penicillin? (Expected: Alexander Fleming)"),
        AgentLightningTask(name="Greeting", context="Question: Say hello. (Expected: Hello)"), # Simple greeting
    ]

    print("\n--- Running OpenAI Basic LitAgent via trainer.dev() for demonstration ---")
    for i, task in enumerate(openai_tasks):
        print(f"\nProcessing Task {i+1}: {task.name}")
        rollout_result = await trainer.dev(agent=openai_wrapper_agent, task=task, resources={})
        print(f"Task '{task.name}' completed with reward: {rollout_result.final_reward}")

if __name__ == "__main__":
    asyncio.run(main_openai_litagent_test())

To run this conceptual example:

  1. Save the code as openai_litagent.py.
  2. Set your OPENAI_API_KEY environment variable.
  3. Ensure your Agentic Lightning virtual environment is active.
  4. Important: Start the AgentLightningServer in a separate terminal:
    agentlightning server start --host 0.0.0.0 --port 8000
    
  5. Then, run the openai_litagent.py script:
    python openai_litagent.py
    

Exercise 3: Advanced OpenAI Agent Integration (Conceptual)

If you have experience with the actual OpenAI Assistants API:

  1. Modify OpenAIBasicAgent to use the Assistants API (create/retrieve assistant, create thread, add messages, run assistant, poll for status, retrieve run steps/messages).
  2. Assume you have an Assistant with a pre-defined tool (e.g., a “calculator” tool). Design tasks that require tool usage.
  3. The reward function should then evaluate not just the final answer but also whether the correct tool was called. You might need to parse rollout_result.traces if AgentLightningServer is configured to capture Assistant API calls. (This is an advanced exercise requiring familiarity with both APIs).
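For part 3, one way to see which tools a run actually invoked is to list its run steps, roughly as sketched below (again using the v1 openai Python client; the field names are from that SDK and may change):

# exercise3_hint.py (sketch: inspecting tool calls made during an Assistants API run)
from openai import OpenAI

client = OpenAI()

def tool_names_used(thread_id: str, run_id: str) -> list[str]:
    """Return the names of function tools invoked during a run."""
    names = []
    steps = client.beta.threads.runs.steps.list(thread_id=thread_id, run_id=run_id)
    for step in steps.data:
        if step.step_details.type == "tool_calls":
            for call in step.step_details.tool_calls:
                if call.type == "function":
                    names.append(call.function.name)
    return names

# Reward idea: full reward only if the answer is correct AND your "calculator"
# tool's name appears in tool_names_used(thread_id, run_id).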

4. Integrating with CrewAI Agents

CrewAI focuses on orchestrating multiple, role-playing AI agents to collaboratively solve tasks. The integration pattern is similar: run a CrewAI task within training_rollout and define the reward.

CrewAI Agent Overview

A CrewAI setup typically involves:

  • Agent objects (with roles, goals, backstories, tools).
  • Task objects (with descriptions, expected outputs).
  • A Crew object (orchestrating agents and tasks).

Conceptual crewai_agent_example.py (requires crewai installation and an LLM configured):

# crewai_agent_example.py (Conceptual - requires CrewAI and an LLM setup)
import os
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI # Using LangChain for LLM in CrewAI

# Set your OpenAI API key
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

# CrewAI drives its agents through a LangChain-compatible chat model, so a minimal
# stub is not enough to satisfy the full interface. Without an API key we only warn
# here; for genuinely local testing you would typically point ChatOpenAI at a local
# OpenAI-compatible endpoint via its base_url parameter.
if not os.getenv("OPENAI_API_KEY"):
    print("WARNING: OPENAI_API_KEY not found. CrewAI calls will fail unless ChatOpenAI "
          "is pointed at a local OpenAI-compatible endpoint (base_url) or a key is provided.")

# Configure LLM for CrewAI (a dummy key keeps construction working without a real key)
crew_llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=os.getenv("OPENAI_API_KEY", "dummy"))

# Define Agents
researcher = Agent(
    role='Senior Research Analyst',
    goal='Uncover critical information on specified topics',
    backstory='A skilled analyst with a knack for finding hidden data and insights.',
    verbose=True,
    allow_delegation=False,
    llm=crew_llm
)

writer = Agent(
    role='Content Strategist',
    goal='Craft compelling and informative articles based on research',
    backstory='A wordsmith who turns raw data into engaging narratives.',
    verbose=True,
    allow_delegation=False,
    llm=crew_llm
)

async def run_crewai_task(topic: str) -> str:
    """Helper function to run a CrewAI task."""

    # Define Tasks
    research_task = Task(
        description=f"Conduct thorough research on {topic}.",
        expected_output="A concise summary of key findings about the topic.",
        agent=researcher
    )

    write_task = Task(
        description=f"Write a short, engaging article (approx. 100 words) based on the research summary about {topic}.",
        expected_output=f"A short article about {topic}.",
        agent=writer
    )

    # Instantiate your crew with a sequential process
    project_crew = Crew(
        agents=[researcher, writer],
        tasks=[research_task, write_task],
        verbose=True,  # Enable detailed logging (newer CrewAI versions expect a bool here)
        process=Process.sequential
    )

    # Kick off the crew's work. Crew.kickoff() is synchronous; newer CrewAI versions
    # also provide kickoff_async() if you prefer to await it.
    result = project_crew.kickoff()
    return str(result)  # normalize the result (a CrewOutput in newer versions) to a plain string

# Simple local test (run this file directly)
async def main_crewai_test():
    print("--- CrewAI Agent Local Test ---")
    print("\nResearching and writing about AI ethics:")
    response = await run_crewai_task("AI ethics")
    print(f"CrewAI final output: {response}")

    print("\nResearching and writing about quantum computing:")
    response = await run_crewai_task("quantum computing")
    print(f"CrewAI final output: {response}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main_crewai_test())

Wrapping a CrewAI Agent with LitAgent

# crewai_litagent.py
import asyncio
import re
from agentlightning.litagent import LitAgent
from agentlightning.types import AgentLightningTask, AgentResource
from agentlightning.trainer import Trainer
from crewai_agent_example import run_crewai_task # Import the CrewAI runner

class CrewAIOptimizedAgent(LitAgent):
    """
    A LitAgent that wraps a CrewAI agent for optimization.
    """
    async def training_rollout(
        self,
        task: AgentLightningTask,
        rollout_id: str,
        resources: dict[str, AgentResource],
    ) -> float:
        print(f"[{rollout_id}] CrewAI Agent received task: {task.name} - '{task.context}'")

        # Parse the topic and expected keywords from the task context
        topic_match = re.search(r"Topic: (.*?)(?: \(Expected Keywords: (.*?)\))?$", task.context)
        if not topic_match:
            print(f"[{rollout_id}] Could not parse topic from task context: {task.context}")
            return 0.0
        
        topic = topic_match.group(1).strip()
        expected_keywords_str = topic_match.group(2) if topic_match.group(2) else ""
        expected_keywords = [kw.strip().lower() for kw in expected_keywords_str.split(',') if kw.strip()]

        # --- IMPORTANT: How resources are passed to CrewAI ---
        # If 'resources' contain updated agent roles, goals, or prompt modifications,
        # you would dynamically re-initialize your CrewAI agents and tasks here.
        # This allows the trainer to optimize these parameters.
        # Example:
        # if "researcher_goal" in resources:
        #   updated_goal = resources["researcher_goal"].value
        #   researcher.goal = updated_goal
        # You'd need a mechanism to access/reconfigure the CrewAI components.

        try:
            crew_output = await run_crewai_task(topic)
            print(f"[{rollout_id}] CrewAI final output: {crew_output}")

            # --- Reward Calculation Logic ---
            reward = 0.0
            if expected_keywords:
                # Check if all expected keywords are present in the final output
                output_lower = crew_output.lower()
                all_keywords_present = all(kw in output_lower for kw in expected_keywords)

                if all_keywords_present:
                    reward = 1.0
                    print(f"[{rollout_id}] All expected keywords found: {expected_keywords}")
                else:
                    missing_keywords = [kw for kw in expected_keywords if kw not in output_lower]
                    reward = 1 - (len(missing_keywords) / len(expected_keywords)) # Partial reward
                    print(f"[{rollout_id}] Missing keywords: {missing_keywords}. Partial reward: {reward}")
            else:
                # For tasks without specific keywords, you might reward based on completion
                # or length of output, assuming some quality criteria.
                if len(crew_output) > 50: # Basic check for meaningful output
                    reward = 0.5
                    print(f"[{rollout_id}] Task completed with significant output, no specific keywords expected.")
            
            return reward

        except Exception as e:
            print(f"[{rollout_id}] Error running CrewAI: {e}")
            return 0.0

async def main_crewai_litagent_test():
    # Make sure you have the AgentLightningServer running in a separate terminal:
    # agentlightning server start --host 0.0.0.0 --port 8000

    backend_url = "http://localhost:8000"
    num_workers = 1

    trainer = Trainer(n_workers=num_workers) # Trainer imported from agentlightning.trainer

    crewai_wrapper_agent = CrewAIOptimizedAgent()

    # Define some tasks for the trainer
    crewai_tasks = [
        AgentLightningTask(name="AI Safety Report", context="Topic: AI Safety (Expected Keywords: ethics, bias, alignment)"),
        AgentLightningTask(name="Future Tech Article", context="Topic: Future of Quantum Computing (Expected Keywords: superposition, entanglement, qubits)"),
        AgentLightningTask(name="Climate Change Summary", context="Topic: Climate Change Impacts (Expected Keywords: sea levels, extreme weather, carbon emissions, deforestation)"),
    ]

    print("\n--- Running CrewAI LitAgent via trainer.dev() for demonstration ---")
    for i, task in enumerate(crewai_tasks):
        print(f"\nProcessing Task {i+1}: {task.name}")
        rollout_result = await trainer.dev(agent=crewai_wrapper_agent, task=task, resources={})
        print(f"Task '{task.name}' completed with reward: {rollout_result.final_reward}")

if __name__ == "__main__":
    import asyncio
    # Make sure to install crewai: pip install crewai
    # Then ensure the AgentLightningServer is running.
    # Finally, run this script.
    asyncio.run(main_crewai_litagent_test())

To run this conceptual example:

  1. Save the CrewAI agent code as crewai_agent_example.py.
  2. Save the LitAgent wrapper code as crewai_litagent.py.
  3. Ensure your Agentic Lightning virtual environment is active.
  4. Install crewai: pip install crewai langchain-openai
  5. Set your OPENAI_API_KEY environment variable (or point ChatOpenAI at a local OpenAI-compatible endpoint via base_url).
  6. Important: Start the AgentLightningServer in a separate terminal:
    agentlightning server start --host 0.0.0.0 --port 8000
    
  7. Then, run the crewai_litagent.py script:
    python crewai_litagent.py
    

Exercise 4: Optimizing CrewAI Agent Roles

Extend CrewAIOptimizedAgent:

  1. Assume resources might contain AgentResource objects named "researcher_backstory" and "writer_goal".
  2. Modify the run_crewai_task (or directly within training_rollout if you prefer to re-instantiate the Crew there) to dynamically update the researcher agent’s backstory and the writer agent’s goal using these resources.
  3. Design tasks and a reward function that specifically benefits from optimized agent roles (e.g., a backstory for the researcher that emphasizes “novel findings” and a goal for the writer that emphasizes “persuasive language”).
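A possible shape for steps 1 and 2, assuming the researcher and writer Agent objects from crewai_agent_example can be imported and their attributes reassigned (CrewAI agents are pydantic models, so plain attribute assignment is assumed to be allowed here); apply_crew_resources is a hypothetical helper:

# exercise4_hint.py (sketch, not a definitive solution)
from crewai_agent_example import researcher, writer

def apply_crew_resources(resources: dict) -> None:
    """Overwrite optimizable agent attributes from the provided resources."""
    if "researcher_backstory" in resources:
        researcher.backstory = resources["researcher_backstory"].value
    if "writer_goal" in resources:
        writer.goal = resources["writer_goal"].value

# Inside training_rollout, before calling run_crewai_task(topic):
# apply_crew_resources(resources)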

Moving Forward

These examples illustrate the power and flexibility of Agentic Lightning. By wrapping your existing agents in a LitAgent and defining a reward function, you can seamlessly integrate them into the optimization pipeline. The resources mechanism is key to enabling the Trainer to dynamically modify your agents’ behavior (e.g., updating prompts, tool configurations, or even model parameters) to improve performance over time.

In the next chapter, we will delve deeper into the critical concept of rollouts and rewards, which are the lifeblood of agent optimization in Agentic Lightning. Understanding how to design effective tasks and reward signals is fundamental to training truly intelligent agents.