基于消息队列架构的代码助手系统,使用LangGraph和本地LLM服务实现。
┌─────────────────────────────────────────────────────────────────────────────┐
│ User Interface Layer │
│ (Web API / CLI / Direct Call) │
└─────────────────────┬───────────────────────────────┬───────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ LangGraph Application Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │
│ │ START │───▶│ CODER │───▶│ CHECKER │───▶│ DEBUG │ │
│ │ Node │ │ Agent │ │ Agent │ │ Agent │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └──────────────┘ │
└──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┘
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Message Queue Layer (Redis/RabbitMQ) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │
│ │ Producer │ │ Consumer │ │ Producer │ │ Consumer │ │
│ │ (Coder) │ │ (Checker) │ │ (Checker) │ │ (Debugger) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └──────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ Message Queue Topics/Channels │ │
│ │ • coder.requests • checker.requests • debugger.requests │ │
│ │ • coder.responses • checker.responses • debugger.responses │ │
│ │ • system.status • error.logs • metrics │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────┬───────────────────────────────┬───────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Local LLM Service Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │
│ │Coder Service│ │Checker Serv.│ │Debugger Serv│ │ Health Mon │ │
│ │(Port 8001) │ │(Port 8002) │ │(Port 8003) │ │ (Port 8004) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
- Docker 和 Docker Compose
- Python 3.9+
- Redis 或 RabbitMQ
# 启动所有服务
docker-compose up -d
# 查看服务状态
docker-compose ps
# 查看日志
docker-compose logs -f# Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine
# 或 RabbitMQ
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management-alpinepython simple_llm_server.py# 启动Coder服务
python local_llm_service_restful.py --service coder --port 8001
# 启动Checker服务
python local_llm_service_restful.py --service checker --port 8002
# 启动Debugger服务
python local_llm_service_restful.py --service debugger --port 8003python main_langgraph.py --queue-type redis --redis-host localhost --redis-port 6379curl -X POST http://localhost:8004/generate \
-H "Content-Type: application/json" \
-d '{
"question": "How do I build a RAG chain in LCEL?",
"context": "LCEL documentation",
"max_iterations": 3
}'响应示例:
{
"success": true,
"session_id": "session_1234567890",
"code_solution": {
"prefix": "A RAG chain implementation using LCEL",
"imports": "from langchain_core.prompts import ChatPromptTemplate\nfrom langchain_openai import ChatOpenAI",
"code": "# Build RAG chain\nrag_chain = prompt | model | parser"
},
"iterations": 2,
"execution_time": 5.23,
"error": null
}curl http://localhost:8004/health# 系统状态
curl http://localhost:8004/status
# 监控指标
curl http://localhost:8004/monitoring/metrics
# 告警信息
curl http://localhost:8004/monitoring/alerts本项目包含完整的测试套件,用于验证系统功能和性能。
# 运行所有测试
python test/run_tests.py --all
# 运行特定类型的测试
python test/run_tests.py --unit # 单元测试
python test/run_tests.py --integration # 集成测试
python test/run_tests.py --performance # 性能测试| 测试类型 | 成功率 | 关键指标 |
|---|---|---|
| Redis基础功能 | 100% | 4/4测试通过 |
| 消息队列功能 | 100% | 9条消息处理 |
| 简化系统测试 | 100% | 3/3测试通过 |
| 平均响应时间 | - | 0.20秒 |
- ✅ Redis连接和消息队列操作
- ✅ 发布订阅通信机制
- ✅ 多代理架构通信
- ✅ 代码生成和验证流程
- ✅ Mock LLM服务集成
- ✅ 错误处理和重试机制
- ✅ 系统监控和健康检查
详细测试报告请参考: test/TEST_SUMMARY.md
SERVICE_NAME: 服务名称 (coder/checker/debugger)LLM_SERVER_URL: 本地LLM服务器URLQUEUE_TYPE: 消息队列类型 (redis/rabbitmq)REDIS_HOST: Redis主机REDIS_PORT: Redis端口SERVICE_PORT: 服务端口号
QUEUE_TYPE: 消息队列类型REDIS_HOST: Redis主机REDIS_PORT: Redis端口MONITORING_PORT: 监控端口
运行完整的测试套件:
# 安装测试依赖
pip install pytest pytest-asyncio
# 运行所有测试
python -m pytest test_system.py -v
# 运行特定测试类别
python -m pytest test_system.py::TestIntegration -v # 集成测试
python -m pytest test_system.py::TestPerformance -v # 性能测试- 请求计数和延迟
- 错误率和类型
- 队列深度
- 系统资源使用率
预配置的仪表板包括:
- 系统概览
- 服务性能
- 错误分析
- 队列监控
访问:http://localhost:3000 (admin/admin)
所有服务都配置结构化日志,存储在:/app/logs/
- 自动检测服务故障
- 防止级联故障
- 自动恢复机制
- 指数退避重试
- 可配置重试策略
- 死信队列处理
- 请求超时保护
- 优雅降级
- 错误恢复
- 每个服务可以独立扩展
- 消息队列负载均衡
- 无状态服务设计
- 资源配置优化
- 批处理支持
- 缓存策略
.
├── message_queue_client.py # 消息队列客户端
├── local_llm_service_restful.py # LLM服务层
├── langgraph_message_queue_nodes.py # LangGraph节点
├── error_handling.py # 错误处理
├── monitoring.py # 监控和可观测性
├── main_langgraph.py # 主应用入口
├── simple_llm_server.py # 本地LLM服务器
├── test_system.py # 系统测试
├── docker-compose.yml # Docker编排
└── requirements-*.txt # 依赖文件
- 创建服务类继承
LocalLLMRestfulService - 实现
process_request方法 - 创建FastAPI应用
- 更新Docker配置
实现 MessageQueueClient 抽象类:
class CustomMessageQueue(MessageQueueClient):
def publish(self, topic: str, message: Message) -> None:
# 实现发布逻辑
pass
def subscribe(self, topic: str, callback: Callable[[Message], None]) -> None:
# 实现订阅逻辑
pass
def close(self) -> None:
# 实现关闭逻辑
pass- Fork项目
- 创建特性分支
- 提交更改
- 推送到分支
- 创建Pull Request
MIT License
- 检查网络配置
- 验证端口是否开放
- 查看服务日志
- 检查Redis/RabbitMQ状态
- 验证连接参数
- 检查防火墙设置
- 检查LLM服务器状态
- 验证API调用格式
- 查看错误日志
# 检查服务状态
curl http://localhost:8004/health
# 查看Prometheus指标
curl http://localhost:9090/metrics
# 检查队列状态
redis-cli ping
redis-cli pubsub channels如有问题,请:
- 查看日志文件
- 检查监控仪表板
- 提交Issue到项目仓库