⚡ 使用 GitHub 模型的并发代理工作流程 (.NET)

📋 高性能并行处理教程

本笔记本演示了使用 Microsoft Agent Framework for .NET 和 GitHub 模型的并发工作流模式。您将学习如何构建高性能、并行处理工作流程,通过同时执行多个 AI 代理来最大限度地提高吞吐量,同时保持协调和数据一致性。

🎯 学习目标

🚀 并发处理基础知识

  • 并行代理执行:同时运行多个 AI 代理以获得最佳性能
  • 异步/等待模式:利用.NET的异步编程模型实现高效并发
  • GitHub 模型集成:协调对 GitHub 的 AI 模型推理服务的多个并发调用
  • 资源管理:跨并发操作高效管理 AI 模型资源

🏗️ 高级并发架构

  • 基于任务的并行:使用 .NET 任务并行库实现最佳并发执行
  • 同步模式:协调并发代理,同时避免竞争条件
  • 负载平衡:在可用的并发处理能力之间有效地分配工作
  • 容错:处理单个代理故障而不停止整个工作流程

🏢 企业并发应用

  • 大容量文档处理:同时处理多个文档
  • 实时内容分析:传入数据流的并发分析
  • 批处理优化:最大化大规模数据处理操作的吞吐量
  • 多模态分析:不同内容类型和格式的并行处理

⚙️ 先决条件和设置

📦 所需的 NuGet 包

高性能并发工作流程的基本包:

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- Core AI Framework with Async Support -->
<PackageReference Include="Microsoft.Extensions.AI" Version="9.9.0" />

<!-- Client Model Abstractions for API Communication -->
<PackageReference Include="System.ClientModel" Version="1.6.1.0" />

<!-- Azure Identity and Async LINQ for Advanced Operations -->
<PackageReference Include="Azure.Identity" Version="1.15.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.3" />

<!-- Local Agent Framework References -->
<!-- Microsoft.Agents.AI.dll - Core agent abstractions with async support -->
<!-- Microsoft.Agents.AI.OpenAI.dll - GitHub Models integration with concurrency -->

🔑 GitHub 模型配置

环境设置(.env 文件):

1
2
3
GITHUB_TOKEN=your_github_personal_access_token
GITHUB_ENDPOINT=https://models.inference.ai.azure.com
GITHUB_MODEL_ID=gpt-4o-mini

并发处理注意事项:

1
2
3
4
5
6
7
// Configure for concurrent operations
var clientOptions = new OpenAIClientOptions()
{
Endpoint = new Uri(githubEndpoint),
// Configure connection pooling for concurrent requests
NetworkTimeout = TimeSpan.FromMinutes(5)
};

🏗️ 并发工作流架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
graph TD
A[Workflow Input] --> B[Task Distribution]
B --> C[Concurrent Agent Pool]
C --> D[Agent Task 1]
C --> E[Agent Task 2]
C --> F[Agent Task 3]
C --> G[Agent Task N]

D --> H[Result Aggregation]
E --> H
F --> H
G --> H

H --> I[Synchronized Output]

J[GitHub Models API] --> D
J --> E
J --> F
J --> G

K[.NET Task Scheduler] --> C

关键组件:

  • 任务并行库:.NET 对并发操作的内置支持
  • 代理池:用于并行处理的多个代理实例
  • 结果聚合:并发代理结果的协调和合并
  • 同步点:确保并发操作之间的数据一致性

🎨 并发工作流设计模式

🔍 并行研究与分析

1
Research Topic → Concurrent Research Agents → Result Synthesis → Final Report

📊 多源数据处理

1
Data Sources → Parallel Processing Agents → Data Integration → Unified Output

🎭 内容生成管道

1
Content Requirements → Concurrent Content Generators → Quality Review → Final Content

🔄 扇出/扇入处理

1
Single Input → Multiple Concurrent Processors → Result Aggregation → Single Output

🏢 企业绩效效益

吞吐量和可扩展性

  • 线性性能扩展:添加更多并发代理以提高吞吐量
  • 资源利用率:可用人工智能模型容量的最大效率
  • 减少处理时间:通过并行执行显着减少时间
  • 弹性扩展:根据工作负载动态调整并发代理数量

🛡️ 可靠性和弹性

  • 故障隔离:单个代理故障不会影响其他并发操作
  • 优雅降级:系统在代理容量减少的情况下继续运行
  • 错误恢复:失败并发操作的自动重试机制
  • 负载分配:在可用代理之间均匀分配工作

📊 性能监控

  • 并发执行指标:跟踪所有并行操作的性能
  • 资源使用分析:监控 CPU、内存和网络利用率
  • 吞吐量分析:衡量并发处理的效率增益
  • 瓶颈检测:识别并解决性能限制

🔧 开发与运营

  • 异步编程模型:利用.NET成熟的异步/等待模式
  • 任务协调:内置任务管理和协调功能
  • 异常处理:并发操作的综合错误处理
  • 调试支持:用于并发工作流程的 Visual Studio 调试工具

让我们使用 .NET 构建高性能并发 AI 工作流程! 🚀

💻 运行代码

完整的实现可在 03.dotnet-agent-framework-workflow-ghmodel-concurrent.cs 中找到。此文件演示了旅行计划的扇出/扇入并发工作流程

🏗️ 工作流程架构

1
User Request → ConcurrentStartExecutor → [Researcher Agent || Planner Agent] → ConcurrentAggregationExecutor → Final Output

关键组件:

  1. ConcurrentStartExecutor:同时向所有代理广播用户请求
  2. 研究代理:同时分析目的地和景点
  3. 规划代理:同时创建详细的旅行计划
  4. ConcurrentAggregationExecutor:收集并合并两个代理的结果

🎯 扇出/扇入模式

此工作流程演示了经典的 扇出/扇入 模式:

  • 扇出:一条输入消息同时广播给多个代理
  • 并发处理:多个代理并行处理同一任务
  • 扇入:收集所有代理的结果并将其聚合为单个输出

🚀 运行示例

1
2
3
4
5
# Make the script executable (Unix/Linux/macOS)
chmod +x 03.dotnet-agent-framework-workflow-ghmodel-concurrent.cs

# Run the concurrent workflow
./03.dotnet-agent-framework-workflow-ghmodel-concurrent.cs

或者在 Windows 上:

1
dotnet run 03.dotnet-agent-framework-workflow-ghmodel-concurrent.cs

📝 预期输出

工作流程将:

  1. 广播请求:向两个代理发送“计划 12 月去西雅图旅行”
  2. 并发处理:两个代理同时工作:
    • 研究人员确定景点和细节
    • 规划师创建行程和物流
  3. 聚合:将两个响应合并为综合输出
  4. 显示结果:显示合并后的旅行计划及其所有信息

🔧 自定义选项

添加更多并发代理:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Create additional specialized agents
AIAgent budgetAgent = openAIClient.GetChatClient(github_model_id).CreateAIAgent(
name: "Budget-Agent", instructions: "Calculate travel costs...");

// Add to fan-out
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [researcherAgent, plannerAgent, budgetAgent])
.AddFanInEdge(aggregationExecutor, sources: [researcherAgent, plannerAgent, budgetAgent])
.WithOutputFrom(aggregationExecutor)
.Build();

// Update aggregation count
if (this._messages.Count == 3) { ... }

修改代理指令:

1
2
const string ResearcherAgentInstructions = "Your custom instructions for research...";
const string PlanAgentInstructions = "Your custom instructions for planning...";

Change the Task:

1
2
3
4
StreamingRun run = await InProcessExecution.StreamAsync(
workflow,
"Plan a European vacation for 2 weeks in summer"
);

🎯 实际应用

这种并发模式非常适合:

  • 内容创建:多个作者同时创建不同的部分
  • 代码审查:多个审查者从不同角度分析代码
  • 市场研究:不同细分市场的并行分析
  • 文档处理:并发提取、分析和验证
  • 多视角分析:对同一输入获取不同的观点

🔍 了解自定义执行器

并发启动执行器:

  • 实现 IMessageHandler<string> 来接受字符串输入
  • 向所有连接的代理广播消息
  • 发送TurnToken触发并发处理

并发聚合执行器:

  • 实现 IMessageHandler<ChatMessage> 来接收代理响应
  • 以线程安全的方式收集消息
  • 当所有预期响应到达时聚合
  • 使用 context.YieldOutputAsync() 产生最终输出

⚡ 性能优势

并发与顺序:

  • 顺序:Agent1(30 秒)→ Agent2(30 秒)= 总共 60 秒
  • 并发:Agent1(30 秒)|| Agent2(30 秒)= 总共 30 秒

吞吐量改进:N 个并发代理的速度提高了 N 倍(取决于工作负载和资源)

🛡️ 错误处理

该工作流程可以优雅地处理单个代理故障:

  • 如果一个代理失败,其他代理继续处理
  • 聚合器可以实现超时逻辑
  • 如果需要,可以返回部分结果

📊 高级功能

动态代理计数:
修改聚合逻辑以支持可变代理计数:

1
2
3
4
5
6
7
8
9
10
11
private int _expectedAgentCount;
private readonly List<ChatMessage> _messages = [];

public async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context)
{
this._messages.Add(message);
if (this._messages.Count == _expectedAgentCount)
{
// Process aggregation
}
}

这种并发工作流程模式对于构建高性能、可扩展的人工智能代理系统至关重要!