Enable concurrent execution of agents in GraphFlow by ekzhu · Pull Request #6545 · microsoft/autogen · GitHub

Conversation

@ekzhu
Collaborator

@ekzhu ekzhu commented May 16, 2025

Support concurrent execution in GraphFlow:

  • Updated BaseGroupChatManager.select_speaker to return either a single speaker name or a list of speaker names, and added logic to track currently activated speakers so that the next speaker(s) are selected only after all activated speakers have finished.
  • Updated existing teams (e.g., SelectorGroupChat) to the new signature, while still returning a single speaker in their implementations.
  • Updated GraphFlow to support selecting multiple speakers.
  • Refactored GraphFlow to use a queue and update the message thread via update_message_thread, reducing dictionary gymnastics.

Example: a fan-out graph:

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    agent_a = AssistantAgent("A", model_client=model_client, system_message="You are a helpful assistant.")
    agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to Chinese.")
    agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to Japanese.")

    # Create a directed graph with fan-out flow A -> (B, C).
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b).add_edge(agent_a, agent_c)
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="Write a short story about a cat."):
        print(event)


asyncio.run(main())

Resolves:
#6541
#6533

@ekzhu
Collaborator Author

ekzhu commented May 16, 2025

@abhinav-aegis pls take a look thx

@ekzhu ekzhu linked an issue May 16, 2025 that may be closed by this pull request
@codecov

codecov bot commented May 16, 2025

Codecov Report

Attention: Patch coverage is 81.55340% with 19 lines in your changes missing coverage. Please review.

Project coverage is 79.52%. Comparing base (2eadef4) to head (cede00f).
Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
...p_chat/_magentic_one/_magentic_one_orchestrator.py 42.85% 12 Missing ⚠️
...chat/teams/_group_chat/_base_group_chat_manager.py 88.46% 3 Missing ⚠️
...at/teams/_group_chat/_graph/_digraph_group_chat.py 93.33% 3 Missing ⚠️
...n_agentchat/teams/_group_chat/_swarm_group_chat.py 75.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #6545      +/-   ##
==========================================
+ Coverage   75.40%   79.52%   +4.12%     
==========================================
  Files         331      225     -106     
  Lines       19779    16623    -3156     
  Branches      406        0     -406     
==========================================
- Hits        14914    13220    -1694     
+ Misses       4592     3403    -1189     
+ Partials      273        0     -273     
Flag Coverage Δ
unittests 79.52% <81.55%> (+4.12%) ⬆️

Flags with carried forward coverage won't be shown.



@abhinav-aegis
Contributor

@ekzhu Nice. I will provide more comments next week if you still need.

But one item that is not clear to me is where does locking happen - I obviously don't know core functionality well enough to see this.

If multiple speakers are speaking in parallel, we won't know when they will finish executing. Can you guarantee that handle_agent_response will not be called by a second concurrently running agent before the first one's handle_agent_response has finished executing? This was never an issue previously for teams with a single speaker.

I think it would be very worthwhile adding a couple of tests to make sure this is working as expected; I saw a new test for saving state and resuming but did not see one for this particular scenario.

@jackgerrits Do you agree that this is a major change and probably needs a version bump? :)

@ekzhu
Collaborator Author

ekzhu commented May 17, 2025

The base class makes sure there is only one handling of the agent response -- the lock is in the base class SequentialRoutedAgent
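The serialization described here can be illustrated with a minimal sketch. This is not the actual SequentialRoutedAgent code; the class and field names below are hypothetical, and it only demonstrates the general pattern of guarding the handler with an asyncio lock so concurrent agent responses are processed one at a time.

```python
import asyncio
from typing import List


class SequentialHandlerSketch:
    """Illustrative only: serializes message handling so that responses from
    concurrently running agents are processed one at a time."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.handled: List[str] = []

    async def handle_agent_response(self, response: str) -> None:
        async with self._lock:
            # Critical section: even if many agents finish at the same time,
            # only one response is handled here at any moment.
            self.handled.append(response)
            await asyncio.sleep(0)  # stand-in for real async work in the handler


async def demo() -> List[str]:
    handler = SequentialHandlerSketch()
    # Fan-out: B and C finish concurrently; handling is still serialized.
    await asyncio.gather(
        handler.handle_agent_response("B"),
        handler.handle_agent_response("C"),
    )
    return handler.handled
```

With this pattern, the second handler invocation cannot start until the first releases the lock, which is the guarantee the question above asks about.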

@abhinav-aegis
Contributor

The base class makes sure there is only one handling of the agent response -- the lock is in the base class SequentialRoutedAgent

Thx. Will check it out and give any additional comments if required.

@ekzhu
Collaborator Author

ekzhu commented May 19, 2025

UPDATE: avoid breaking existing code by modifying select_speaker to return a union type.

@ekzhu ekzhu enabled auto-merge (squash) May 19, 2025 21:41
@ekzhu ekzhu merged commit f0b7344 into main May 19, 2025
63 checks passed
@ekzhu ekzhu deleted the ekzhu-parallel-graph-flow branch May 19, 2025 21:47

Development

Successfully merging this pull request may close these issues.

GraphFlow should support concurrent execution of multiple agents
Graphflow support assistant and system messages on history task

3 participants