Project 2: Enhancing a LangChain Agent with Reinforcement Learning

This project delves into a more advanced scenario: taking an existing agent built with a popular framework (LangChain) and enhancing its performance using Reinforcement Learning (RL) via Agentic Lightening. Instead of just tuning prompts, we’ll focus on optimizing the agent’s decision-making and tool-use strategy in a simulated interactive environment.

Clear Objective: To integrate a LangChain agent into Agentic Lightening and conceptually train it with RL to improve its ability to solve multi-step problems requiring tool usage.

Problem Statement: Our LangChain agent, while capable of using tools, might make suboptimal choices in a multi-step problem-solving scenario, leading to inefficient or incorrect outcomes. We want to use RL to guide the agent towards better strategies for tool selection and task completion.

Project Structure

We’ll break this project into these steps:

  1. Define the Base LangChain Agent: Create a LangChain agent that uses simple arithmetic tools.
  2. Define a LitAgent Wrapper: Adapt the LangChain agent to fit the LitAgent interface.
  3. Define RL-Oriented Tasks: Create tasks that require sequential decisions and tool use.
  4. Implement an RL-Friendly Reward Function: Design a reward that provides feedback on correctness and efficiency of tool usage.
  5. Simulate the RL Training Loop with VERL (Conceptual): Demonstrate how Agentic Lightening would use VERL to train this agent.
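
These steps map onto four files in the project directory (the reward function from Step 4 lives inside the wrapper from Step 2):

agentic_rl_project/
├── langchain_rl_agent.py     # Step 1: base LangChain agent and tools
├── rl_litagent_wrapper.py    # Steps 2 & 4: LitAgent wrapper and reward function
├── rl_tasks.py               # Step 3: RL-oriented task definitions
└── run_rl_optimization.py    # Step 5: mock RL training loop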

Step 1: Define the Base LangChain Agent

Our LangChain agent will be equipped with simple tools: an add tool and a subtract tool. It will need to use these tools to solve arithmetic puzzles.

Create a new directory for this project, e.g., agentic_rl_project. Inside, create a file named langchain_rl_agent.py:

# langchain_rl_agent.py
import os
import re
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_core.messages import AIMessage
from langchain_core.runnables import Runnable

# Set your OpenAI API key
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# For this example, we'll use a mock LLM if no API key is set.

class MockChatOpenAI(Runnable):
    """A mock ChatOpenAI for local testing without an actual API key.
    It crudely simulates tool calling by pattern-matching the prompt text,
    and mainly serves to keep these scripts importable and runnable without a key.
    """
    def invoke(self, input, config=None, **kwargs):
        # Required by the Runnable interface; the examples below only use the async path.
        raise NotImplementedError("MockChatOpenAI only supports ainvoke().")

    async def ainvoke(self, input, config=None, **kwargs):
        # The LCEL pipeline hands us a ChatPromptValue; fall back to a raw message list.
        messages = input.to_messages() if hasattr(input, "to_messages") else input
        last_message_content = messages[-1].content if messages and messages[-1].content else ""
        
        # Simple rule-based tool calling simulation
        if "add" in last_message_content.lower() and "two numbers" in last_message_content.lower():
            # Extract numbers
            numbers = re.findall(r'\d+', last_message_content)
            if len(numbers) >= 2:
                num1, num2 = int(numbers[0]), int(numbers[1])
                return AIMessage(
                    content="",
                    tool_calls=[
                        {
                            "id": "call_add_tool_mock_id",
                            "name": "add_two_numbers",
                            "args": {"num1": num1, "num2": num2},
                        }
                    ]
                )
        if "subtract" in last_message_content.lower() and "from" in last_message_content.lower():
            numbers = re.findall(r'\d+', last_message_content)
            if len(numbers) >= 2:
                num1, num2 = int(numbers[0]), int(numbers[1])
                # Assume "subtract X from Y" means Y - X
                return AIMessage(
                    content="",
                    tool_calls=[
                        {
                            "id": "call_subtract_tool_mock_id",
                            "name": "subtract_two_numbers",
                            "args": {"num1": num2, "num2": num1}, # num1 from num2
                        }
                    ]
                )
        
        # If no tool call, just respond
        if "what is 5 plus 3" in last_message_content.lower():
            return AIMessage(content="The answer is 8.")
        return AIMessage(content="I need a tool to answer that or I cannot process it.")

# Define the tools
@tool
def add_two_numbers(num1: int, num2: int) -> int:
    """Adds two integers and returns their sum. Useful for addition tasks."""
    print(f"  [Tool: add_two_numbers] Called with {num1}, {num2}")
    return num1 + num2

@tool
def subtract_two_numbers(num1: int, num2: int) -> int:
    """Subtracts the second integer from the first. Useful for subtraction tasks."""
    print(f"  [Tool: subtract_two_numbers] Called with {num1}, {num2}")
    return num1 - num2

# Define the LLM (use mock if no API key, otherwise ChatOpenAI)
llm = ChatOpenAI(temperature=0, model="gpt-4o") if os.getenv("OPENAI_API_KEY") else MockChatOpenAI()

# Define the tools the agent can use
tools = [add_two_numbers, subtract_two_numbers]

# Define the prompt template
base_prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are an intelligent arithmetic assistant. Use the available tools to solve math problems. Respond with the final answer only."),
        MessagesPlaceholder("chat_history", optional=True),
        ("human", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ]
)

# Create the agent
lc_agent_runnable = create_openai_functions_agent(llm, tools, base_prompt_template)
lc_agent_executor = AgentExecutor(agent=lc_agent_runnable, tools=tools, verbose=False, handle_parsing_errors=True) # verbose=True for debugging

async def run_lc_math_agent(question: str, history=None) -> str:
    """Helper function to run the LangChain agent."""
    try:
        if history is None:
            history = []
        response = await lc_agent_executor.ainvoke({"input": question, "chat_history": history})
        return response["output"]
    except Exception as e:
        return f"Agent Error: {e}"

# Simple local test (run this file directly)
async def main_lc_math_test():
    print("--- LangChain Math Agent Local Test ---")
    
    print("\nTest 1: Simple Addition")
    result = await run_lc_math_agent("What is 5 plus 3?")
    print(f"Agent response: {result}") # Mock LLM should answer directly here
    
    print("\nTest 2: Tool-based Addition")
    result = await run_lc_math_agent("Add 10 and 5.")
    print(f"Agent response: {result}")
    
    print("\nTest 3: Tool-based Subtraction")
    result = await run_lc_math_agent("Subtract 7 from 20.")
    print(f"Agent response: {result}")

    print("\nTest 4: Multi-step (conceptual - mock LLM won't do this automatically)")
    result = await run_lc_math_agent("Start with 10. Add 5, then subtract 3.")
    print(f"Agent response: {result}") # Will likely struggle without a true multi-step LLM

if __name__ == "__main__":
    import asyncio
    asyncio.run(main_lc_math_test())

Note: MockChatOpenAI only loosely approximates the ChatOpenAI interface and simulates tool calling with simple pattern matching, so it will not reliably drive end-to-end tool execution through the AgentExecutor. For a real LangChain agent with tool use, you need an actual OpenAI API key and a function-calling-capable model (e.g., gpt-4o, gpt-4, or gpt-3.5-turbo-0613). Setting verbose=True on the AgentExecutor is very helpful for debugging the agent's intermediate steps.

Step 2: Define a LitAgent Wrapper

Now, we’ll wrap our LangChain agent in a LitAgent so Agentic Lightening can interact with it. This LitAgent will manage the LangChain agent’s execution within a rollout.

Create a file named rl_litagent_wrapper.py in the same directory:

# rl_litagent_wrapper.py
import re

from langchain.agents import AgentExecutor, create_openai_functions_agent

from agentlightning.litagent import LitAgent
from agentlightning.types import AgentLightningTask, AgentResource
from langchain_rl_agent import base_prompt_template, llm, tools  # Reuse the base agent's LLM, tools, and prompt

class LangChainRLOptimizedAgent(LitAgent):
    """
    A LitAgent that wraps our LangChain math agent for RL optimization.
    """
    def __init__(self):
        super().__init__()
        # We need to re-initialize the LangChain agent here if we expect its
        # components (like prompt) to be dynamically updated by resources.
        # For this example, we'll assume the base LangChain agent setup is fixed initially.
        self.lc_agent_executor = AgentExecutor(
            agent=create_openai_functions_agent(llm, tools, base_prompt_template),
            tools=tools,
            verbose=False, # Set to True for detailed LangChain internal logging during rollout
            handle_parsing_errors=True
        )
        self.current_prompt_resource_value = None # Store the current prompt for tracing
        print("LangChainRLOptimizedAgent initialized.")

    async def training_rollout(
        self,
        task: AgentLightningTask,
        rollout_id: str,
        resources: dict[str, AgentResource],
    ) -> float:
        print(f"[{rollout_id}] LangChain RL Agent received task: {task.name} - '{task.context}'")
        self.trace("rollout_start", {"task_context": task.context})

        # --- Dynamic Prompt Update (if applicable for RL) ---
        # If the RL algorithm updates prompts, we'd reconstruct the LangChain agent
        # or update its prompt here. For this project, we're assuming the RL
        # will primarily learn to choose tools, but dynamic prompts are possible.
        if "lc_system_prompt" in resources:
            new_prompt_value = resources["lc_system_prompt"].value
            if new_prompt_value != self.current_prompt_resource_value:
                # This would involve rebuilding the agent with a new prompt
                # For simplicity in this project, we just log and acknowledge.
                self.current_prompt_resource_value = new_prompt_value
                print(f"[{rollout_id}] Agent fetched new system prompt: '{new_prompt_value}'")
                # In a real system, you would rebuild self.lc_agent_executor with a
                # ChatPromptTemplate built from new_prompt_value (see the sketch after this file).
        
        # Parse question and expected numerical answer from task context
        # Format: "Problem: <math_problem> (Expected: <numerical_answer>)"
        problem_match = re.search(r"Problem: (.*?)\s*\(Expected:\s*(\-?\d+)\)", task.context, re.IGNORECASE)
        if not problem_match:
            print(f"[{rollout_id}] Error: Task context format invalid. Expected '(Expected: <number>)'")
            self.trace("rollout_end", {"error": "Invalid task format"})
            return 0.0

        problem_question = problem_match.group(1).strip()
        expected_answer = int(problem_match.group(2))
        self.trace("parsed_task", {"question": problem_question, "expected_answer": expected_answer})

        # Run the LangChain agent
        lc_output = ""
        try:
            # We explicitly use self.lc_agent_executor's ainvoke method
            # If verbose=True is set on AgentExecutor, its internal steps will print.
            response_dict = await self.lc_agent_executor.ainvoke({"input": problem_question})
            lc_output = response_dict.get("output", "").strip()
            self.trace("lc_agent_output", {"raw_output": lc_output})
        except Exception as e:
            lc_output = f"Error during LangChain execution: {e}"
            print(f"[{roll_id}] LangChain Error: {e}")
            self.trace("lc_agent_error", {"error_message": str(e)})

        print(f"[{roll_id}] LangChain Agent final output: '{lc_output}'")

        # --- Reward Calculation (Step 4) ---
        reward = self._calculate_rl_reward(lc_output, expected_answer)
        
        self.trace("rollout_end", {"final_reward": reward})
        print(f"[{roll_id}] Final Reward: {reward:.2f}")
        return reward

    def _calculate_rl_reward(self, agent_output: str, expected_answer: int) -> float:
        """
        Calculates a reward for RL training.
        - Full reward for correct numerical answer.
        - Penalty for incorrect numerical answer.
        - Negative reward for errors or inability to answer.
        - (Conceptual) Small penalty for excessive tool use if traces were available.
        """
        try:
            # Extract the last number in the agent's output,
            # assuming the final answer appears at the end of the response.
            numbers_found = re.findall(r"-?\d+", agent_output)
            if numbers_found:
                actual_answer = int(numbers_found[-1])
                if actual_answer == expected_answer:
                    print(f"  [Reward] Correct Answer! {actual_answer} == {expected_answer}")
                    return 1.0  # Full reward for correctness
                else:
                    print(f"  [Reward] Incorrect Answer. Expected {expected_answer}, Got {actual_answer}")
                    return -0.5 # Penalty for incorrect answer
            else:
                # If no numerical answer found in output
                print(f"  [Reward] No numerical answer found in agent output: '{agent_output}'")
                return -1.0 # Significant penalty for not answering numerically
        except ValueError:
            print(f"  [Reward] Error parsing agent output for number. Output: '{agent_output}'")
            return -1.0 # Penalty for unparsable output
        except Exception as e:
            print(f"  [Reward] Unexpected error in reward calculation: {e}")
            return -1.0
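
The training_rollout above only logs a new lc_system_prompt resource. If you want that resource to actually change the agent's behavior, one option is to rebuild the AgentExecutor whenever the prompt changes. The helper below is a minimal sketch of that idea under the assumptions of this project; the name rebuild_executor is ours and is not part of Agentic Lightening or LangChain.

# rebuild_executor_sketch.py (hypothetical helper, reusing the components defined in langchain_rl_agent.py)
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

from langchain_rl_agent import llm, tools

def rebuild_executor(agent, new_system_prompt: str) -> None:
    """Replace agent.lc_agent_executor so a new system prompt from resources takes effect."""
    updated_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", new_system_prompt),
            MessagesPlaceholder("chat_history", optional=True),
            ("human", "{input}"),
            MessagesPlaceholder("agent_scratchpad"),
        ]
    )
    agent.lc_agent_executor = AgentExecutor(
        agent=create_openai_functions_agent(llm, tools, updated_prompt),
        tools=tools,
        verbose=False,
        handle_parsing_errors=True,
    )

# Inside training_rollout, instead of only logging the new prompt, you would call:
#     rebuild_executor(self, new_prompt_value)
#     self.current_prompt_resource_value = new_prompt_value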

Step 3: Define RL-Oriented Tasks

For RL, we need tasks that often require multiple steps and where the “best” sequence of actions (tool calls) isn’t immediately obvious. Our arithmetic puzzles fit this, though a more complex RL task would involve dynamic environments.

Create a file named rl_tasks.py in the same directory:

# rl_tasks.py
from agentlightning.types import AgentLightningTask

rl_math_tasks = [
    AgentLightningTask(name="Simple Add", context="Problem: What is 10 plus 5? (Expected: 15)"),
    AgentLightningTask(name="Simple Subtract", context="Problem: Subtract 7 from 20. (Expected: 13)"),
    AgentLightningTask(name="Combined 1", context="Problem: Start with 12. Add 8, then subtract 5. (Expected: 15)"),
    AgentLightningTask(name="Combined 2", context="Problem: Calculate: 25 minus 10, then add 3. (Expected: 18)"),
    AgentLightningTask(name="Negative Result", context="Problem: What is 5 minus 10? (Expected: -5)"),
    AgentLightningTask(name="Complex Combination", context="Problem: From 50, subtract 15, then add 7, then subtract 12. (Expected: 30)"),
]
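
Hand-writing the (Expected: N) values is error-prone for longer chains. As an optional convenience (not something Agentic Lightening requires), you can generate chained tasks programmatically so the expected answer is computed from the same operations that appear in the problem text. A small sketch, assuming the "Problem: ... (Expected: N)" format used above; the helper name make_chained_task is ours:

# make_rl_task.py (optional, hypothetical helper)
from agentlightning.types import AgentLightningTask

def make_chained_task(name: str, start: int, operations: list[tuple[str, int]]) -> AgentLightningTask:
    """Build a chained add/subtract task whose expected answer is computed, not hand-typed."""
    parts = [f"Start with {start}."]
    result = start
    for i, (op, value) in enumerate(operations):
        if op not in ("add", "subtract"):
            raise ValueError(f"Unsupported operation: {op}")
        verb = "Add" if op == "add" else "Subtract"
        parts.append(f"{verb} {value}," if i == 0 else f"then {verb.lower()} {value},")
        result = result + value if op == "add" else result - value
    problem = " ".join(parts).rstrip(",") + "."
    return AgentLightningTask(name=name, context=f"Problem: {problem} (Expected: {result})")

# Example: reproduces the "Combined 1" task above with a computed expected value.
# make_chained_task("Combined 1", 12, [("add", 8), ("subtract", 5)])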

Step 4: Implement an RL-Friendly Reward Function

The _calculate_rl_reward method is already part of our LangChainRLOptimizedAgent in rl_litagent_wrapper.py.

  • Reward Design Principles for RL:
    • Correctness (High Reward): 1.0 for perfectly correct answers.
    • Incorrectness (Penalty): -0.5 for numerically incorrect answers.
    • Failure to Answer/Parse Error (Strong Penalty): -1.0 if the agent cannot produce a numerical answer or encounters an error. This strongly discourages “giving up” or nonsensical outputs.
    • (Future Improvement) Efficiency Penalty: For more advanced RL, you might integrate a penalty for excessive tool calls or overly long chains of thought (which would require parsing traces, covered conceptually in the next step); a hedged sketch of such a penalty follows this list.
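
To make the efficiency idea concrete, here is one way the reward could be extended if a tool-call count were available (for example, extracted from rollout traces). This is a sketch only: the num_tool_calls argument and the penalty constants are illustrative assumptions, not something the project code above already provides.

# Sketch of an efficiency-aware reward (hypothetical extension of _calculate_rl_reward)
import re

def calculate_reward_with_efficiency(
    agent_output: str,
    expected_answer: int,
    num_tool_calls: int,
    optimal_tool_calls: int = 2,
    per_call_penalty: float = 0.05,
) -> float:
    """Correctness reward as before, minus a small penalty per tool call beyond the optimum."""
    numbers_found = re.findall(r"-?\d+", agent_output)
    if not numbers_found:
        return -1.0  # No numerical answer at all
    base = 1.0 if int(numbers_found[-1]) == expected_answer else -0.5
    excess_calls = max(0, num_tool_calls - optimal_tool_calls)
    return max(-1.0, base - per_call_penalty * excess_calls)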

Step 5: Simulate the RL Training Loop with VERL (Conceptual)

Implementing a full RL training loop with VERL is beyond a simple code example here, as it involves specialized RL algorithms, data buffering, and policy updates often requiring GPU resources. However, we can conceptually demonstrate how Agentic Lightening would orchestrate this using a mock RLOptimizer.

The key idea is that the Trainer collects LitRollout objects, the RLOptimizer processes them, and the optimizer then proposes updates (e.g., to the agent’s prompt, or even a fine-tuned LLM) through AgentResource objects.

Create a file named run_rl_optimization.py in the same directory:

# run_rl_optimization.py
import asyncio
import random
from agentlightning.trainer import Trainer
from agentlightning.types import AgentLightningTask, AgentResource, LitRollout
from rl_litagent_wrapper import LangChainRLOptimizedAgent
from rl_tasks import rl_math_tasks

# --- Mock RL Optimizer (Simulates VERL) ---
class MockRLOptimizer:
    """
    A mock RL Optimizer that simulates learning by gradually improving a "policy_prompt".
    It updates the prompt to be more specific if rewards improve.
    """
    def __init__(self):
        self.current_policy_prompt = "You are an intelligent arithmetic assistant. Use the available tools."
        self.iteration = 0
        self.best_policy_prompt = self.current_policy_prompt
        self.highest_avg_reward = -100.0
        self.prompt_improvement_history = []

    async def optimize_step(self, rollout_results: list[LitRollout], resources_version: str) -> dict:
        self.iteration += 1
        print(f"\n--- Mock RL Optimizer: Iteration {self.iteration} ---")
        
        if not rollout_results:
            print("RL Optimizer: No rollouts received.")
            return {"version": resources_version, "resources": {}}

        avg_reward = sum(r.final_reward for r in rollout_results) / len(rollout_results)
        print(f"RL Optimizer: Current prompt '{self.current_policy_prompt}' resulted in Avg Reward: {avg_reward:.2f}")

        self.prompt_improvement_history.append((self.current_policy_prompt, avg_reward))

        # Simulate policy improvement based on rewards
        new_prompt_suggestion = self.current_policy_prompt
        if avg_reward > self.highest_avg_reward:
            self.highest_avg_reward = avg_reward
            self.best_policy_prompt = self.current_policy_prompt
            print(f"RL Optimizer: New best policy prompt: '{self.best_policy_prompt}' (Avg Reward: {self.highest_avg_reward:.2f})")
            
            # Simple simulation: if doing well, make prompt more explicit about tool use
            if "always consider tool use" not in self.current_policy_prompt.lower():
                new_prompt_suggestion = "You are a highly intelligent arithmetic assistant. ALWAYS consider using the available tools to solve math problems. Respond with the final answer only."
            elif "prioritize tool usage" not in self.current_policy_prompt.lower():
                new_prompt_suggestion = "You are an expert arithmetic solver. PRIORITIZE tool usage for calculations. Respond with the final answer only."
        elif avg_reward < self.highest_avg_reward * 0.8 and self.iteration > 1:
            # If performance degrades significantly, revert or try a different approach
            print("RL Optimizer: Performance decreased. Reverting to best prompt.")
            new_prompt_suggestion = self.best_policy_prompt
        
        # Random exploration: occasionally try a slightly different prompt to explore
        if random.random() < 0.2 and self.iteration < 5: # Small chance to explore
            print("RL Optimizer: Randomly exploring a new prompt variant.")
            new_prompt_suggestion = random.choice([
                "Solve math problems step-by-step using tools.",
                "Arithmetic expert. Provide solutions using calculation tools.",
                self.best_policy_prompt # Can also explore the best known prompt
            ])

        self.current_policy_prompt = new_prompt_suggestion
        
        print(f"RL Optimizer: Proposing new policy prompt for next iteration: '{self.current_policy_prompt}'")

        return {
            "version": f"v_policy_{self.iteration}",
            "resources": {
                "lc_system_prompt": AgentResource(name="lc_system_prompt", value=self.current_policy_prompt),
            }
        }


async def main_rl_optimization():
    # Make sure you have the AgentLightningServer running in a separate terminal:
    # agentlightning server start --host 0.0.0.0 --port 8000
    
    backend_url = "http://localhost:8000"  # Only needed when connecting to a running AgentLightningServer; unused by trainer.dev below
    num_workers = 2 # Simulate multiple parallel rollouts for faster data collection

    trainer = Trainer(n_workers=num_workers)
    langchain_rl_agent = LangChainRLOptimizedAgent()
    rl_optimizer = MockRLOptimizer()

    current_resources = {} # Resources dictionary, starts empty

    print("--- Starting LangChain Agent Optimization with RL ---")
    
    # Simulate RL training for a number of epochs
    num_epochs = 10 
    for epoch in range(num_epochs):
        print(f"\n========== RL Optimization Epoch {epoch + 1}/{num_epochs} ==========")
        
        # Pass the current policy prompt from the optimizer to the agent via resources
        current_resources["lc_system_prompt"] = AgentResource(
            name="lc_system_prompt", 
            value=rl_optimizer.current_policy_prompt # Use the prompt proposed by the optimizer
        )
        
        print(f"Agent will use system prompt: '{current_resources['lc_system_prompt'].value}'")

        collected_rollouts = []
        # For each epoch, run the agent against all defined tasks
        # In a true RL setup, tasks might be sampled, not all run every epoch.
        for i, task in enumerate(rl_math_tasks):
            print(f"\n  Running task {i+1}/{len(rl_math_tasks)}: {task.name}")
            rollout_result = await trainer.dev(
                agent=langchain_rl_agent,
                task=task,
                resources=current_resources
            )
            collected_rollouts.append(rollout_result)
        
        # The RL optimizer processes the rollouts and proposes new resources for the next epoch.
        # Its return value is not used here: the optimizer updates its own current_policy_prompt,
        # and the next epoch reads rl_optimizer.current_policy_prompt directly when it rebuilds
        # current_resources above.
        await rl_optimizer.optimize_step(collected_rollouts, f"v_epoch_{epoch}")

    print("\n--- LangChain RL Optimization Completed ---")
    print(f"Final Best Policy Prompt: '{rl_optimizer.best_policy_prompt}' with highest average reward: {rl_optimizer.highest_avg_reward:.2f}")
    
    print("\nPrompt Improvement History:")
    for prompt, reward in rl_optimizer.prompt_improvement_history:
        print(f"  Prompt: '{prompt}' | Avg Reward: {reward:.2f}")


if __name__ == "__main__":
    # Optional: Start AgentLightningServer in a separate terminal.
    # It's highly recommended to have it running for real VERL integration.
    # agentlightning server start --host 0.0.0.0 --port 8000
    
    asyncio.run(main_rl_optimization())

To Run Project 2:

  1. Create Project Directory: Create a folder named agentic_rl_project.
  2. Save Files: Save langchain_rl_agent.py, rl_litagent_wrapper.py, rl_tasks.py, and run_rl_optimization.py into this directory.
  3. Activate Environment: Ensure your Agentic Lightening virtual environment is active.
  4. Install LangChain and OpenAI (if not already):
    pip install langchain-openai langchain
    
  5. Set OpenAI API Key (Optional but Recommended): If you have an OpenAI API key, set it as an environment variable (export OPENAI_API_KEY="sk-..."). This will use a real LLM for the LangChain agent. If not, the mock LLM will be used, which has limited capabilities.
  6. (Optional but Recommended) Start Server: In a separate terminal, start the AgentLightningServer:
    agentlightning server start --host 0.0.0.0 --port 8000
    
  7. Run Optimization: In your primary terminal (in agentic_rl_project directory with env active):
    python run_rl_optimization.py
    

Expected Output:

You will see output for each epoch, showing:

  • Which system prompt the LangChain agent is currently using.
  • The agent’s execution for each task, potentially including simulated tool calls.
  • The MockRLOptimizer evaluating the average reward and proposing a new system prompt for the next epoch.
  • The average reward for each epoch, which may fluctuate but should show a general upward trend as the optimizer refines the prompt to encourage better tool use and correct answers.
  • Finally, a summary of the best policy prompt found and the full reward history.

Exercises/Mini-Challenges for Project 2:

  1. Real Tool Usage (Requires OpenAI API):

    • If you have an OpenAI API key, modify langchain_rl_agent.py to use ChatOpenAI(model="gpt-4o", temperature=0) (or gpt-3.5-turbo-0613 for older function calling) instead of MockChatOpenAI.
    • Observe how the actual agent performs tool calls and how this impacts rewards.
    • This will give you a much more realistic simulation of RL optimization for tool use.
  2. Add Efficiency Penalty to Reward:

    • Modify _calculate_rl_reward in rl_litagent_wrapper.py.
    • This requires leveraging traces: You would need to enable verbose=True on AgentExecutor and then (conceptually, as full trace parsing is advanced) check the number of tool_code or function_call entries in rollout_result.traces (if they were captured).
    • Introduce a small penalty (e.g., -0.05) for each tool call beyond a certain optimal number (e.g., more than 2 tool calls for a simple 2-step problem). This encourages the agent to find the most efficient solution.
  3. Advanced Multi-Step Task and Policy Update:

    • (Advanced) Design a more complex multi-step task (e.g., “Calculate the final balance after deposit and withdrawal” which might involve multiple additions and subtractions).
    • Modify MockRLOptimizer so that, instead of just updating the prompt, it simulates updating a simple internal “tool usage policy” (e.g., a dictionary mapping problem types to preferred tools). The LitAgent would then consult this policy for its initial tool selection, mimicking how an RL algorithm might update a learnable component beyond just the prompt. A starting-point sketch appears after this list.
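
As a starting point for the last challenge, the “tool usage policy” could be as simple as a dictionary keyed by problem type, which the optimizer mutates between epochs and the LitAgent consults before invoking the LLM. The sketch below is purely illustrative; the problem-type keys, the classify_problem helper, and the update rule are all assumptions, not part of Agentic Lightening.

# Sketch of a learnable tool-usage policy (all names here are illustrative)
import random

tool_usage_policy = {
    "addition": "add_two_numbers",
    "subtraction": "subtract_two_numbers",
    "combined": "add_two_numbers",  # Which tool to try first on multi-step problems
}

def classify_problem(question: str) -> str:
    """Very rough problem-type classifier used only to index the policy."""
    q = question.lower()
    has_add = "add" in q or "plus" in q
    has_sub = "subtract" in q or "minus" in q
    if has_add and has_sub:
        return "combined"
    return "subtraction" if has_sub else "addition"

def update_policy(avg_reward: float, best_reward: float) -> None:
    """If the latest epoch underperformed, randomly flip the first-choice tool for combined problems."""
    if avg_reward < best_reward:
        tool_usage_policy["combined"] = random.choice(["add_two_numbers", "subtract_two_numbers"])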

This project demonstrates the immense potential of integrating powerful frameworks like LangChain with the optimization capabilities of Agentic Lightening. You’ve now seen how to build, wrap, and conceptually optimize an agent using RL, opening doors to highly adaptive and intelligent AI agents.

In our final section, we’ll provide a curated list of further learning resources to continue your journey with Agentic Lightening and the broader field of agentic AI.