freeleaps-ops/apps/gitea-webhook-ambassador-python/test_enhanced_features.py
Nicolas f6c515157c feat: 添加 Python 版本的 Gitea Webhook Ambassador
- 新增完整的 Python 实现,替代 Go 版本
- 添加 Web 登录界面和仪表板
- 实现 JWT 认证和 API 密钥管理
- 添加数据库存储功能
- 保持与 Go 版本一致的目录结构和启动脚本
- 包含完整的文档和测试脚本
2025-07-20 21:17:10 +08:00

226 lines
7.6 KiB
Python

#!/usr/bin/env python3
"""
Gitea Webhook Ambassador 增强版功能测试脚本
"""
import requests
import json
import time
BASE_URL = "http://localhost:8000"
ADMIN_SECRET_KEY = "admin-secret-key-change-in-production"
def test_health_check():
"""测试健康检查"""
print("🔍 测试健康检查...")
try:
response = requests.get(f"{BASE_URL}/health")
if response.status_code == 200:
data = response.json()
print(f"✅ 健康检查成功: {data['status']}")
print(f" 版本: {data['version']}")
print(f" 运行时间: {data['uptime']}")
print(f" 内存使用: {data['memory']}")
return True
else:
print(f"❌ 健康检查失败: {response.status_code}")
return False
except Exception as e:
print(f"❌ 健康检查异常: {e}")
return False
def test_login():
"""测试登录功能"""
print("\n🔐 测试登录功能...")
try:
# 测试登录 API
login_data = {"secret_key": ADMIN_SECRET_KEY}
response = requests.post(
f"{BASE_URL}/api/auth/login",
json=login_data,
headers={"Content-Type": "application/json"}
)
if response.status_code == 200:
data = response.json()
token = data.get("token")
print(f"✅ 登录成功,获得 JWT 令牌")
return token
else:
print(f"❌ 登录失败: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"❌ 登录异常: {e}")
return None
def test_api_key_management(token):
"""测试 API 密钥管理"""
print("\n🔑 测试 API 密钥管理...")
headers = {"Authorization": f"Bearer {token}"}
try:
# 创建 API 密钥
key_data = {"description": "测试 API 密钥"}
response = requests.post(
f"{BASE_URL}/api/keys",
json=key_data,
headers=headers
)
if response.status_code == 200:
data = response.json()
api_key = data["key"]
key_id = data["id"]
print(f"✅ 创建 API 密钥成功: {api_key[:20]}...")
# 列出 API 密钥
response = requests.get(f"{BASE_URL}/api/keys", headers=headers)
if response.status_code == 200:
keys_data = response.json()
print(f"✅ 列出 API 密钥成功,共 {len(keys_data['keys'])}")
# 删除 API 密钥
response = requests.delete(f"{BASE_URL}/api/keys/{key_id}", headers=headers)
if response.status_code == 200:
print(f"✅ 删除 API 密钥成功")
return True
else:
print(f"❌ 删除 API 密钥失败: {response.status_code}")
return False
else:
print(f"❌ 列出 API 密钥失败: {response.status_code}")
return False
else:
print(f"❌ 创建 API 密钥失败: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f"❌ API 密钥管理异常: {e}")
return False
def test_project_management(token):
"""测试项目管理"""
print("\n📁 测试项目管理...")
headers = {"Authorization": f"Bearer {token}"}
try:
# 创建项目
project_data = {
"name": "测试项目",
"jenkinsJob": "test-job",
"giteaRepo": "test-owner/test-repo"
}
response = requests.post(
f"{BASE_URL}/api/projects/",
json=project_data,
headers=headers
)
if response.status_code == 200:
data = response.json()
project_id = data["id"]
print(f"✅ 创建项目成功: {data['name']}")
# 列出项目
response = requests.get(f"{BASE_URL}/api/projects/", headers=headers)
if response.status_code == 200:
projects_data = response.json()
print(f"✅ 列出项目成功,共 {len(projects_data['projects'])}")
# 删除项目
response = requests.delete(f"{BASE_URL}/api/projects/{project_id}", headers=headers)
if response.status_code == 200:
print(f"✅ 删除项目成功")
return True
else:
print(f"❌ 删除项目失败: {response.status_code}")
return False
else:
print(f"❌ 列出项目失败: {response.status_code}")
return False
else:
print(f"❌ 创建项目失败: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f"❌ 项目管理异常: {e}")
return False
def test_stats(token):
"""测试统计信息"""
print("\n📊 测试统计信息...")
headers = {"Authorization": f"Bearer {token}"}
try:
response = requests.get(f"{BASE_URL}/api/stats", headers=headers)
if response.status_code == 200:
data = response.json()
print(f"✅ 获取统计信息成功:")
print(f" 总项目数: {data['total_projects']}")
print(f" API 密钥数: {data['total_api_keys']}")
print(f" 今日触发次数: {data['today_triggers']}")
print(f" 成功触发次数: {data['successful_triggers']}")
return True
else:
print(f"❌ 获取统计信息失败: {response.status_code}")
return False
except Exception as e:
print(f"❌ 统计信息异常: {e}")
return False
def test_logs(token):
"""测试日志功能"""
print("\n📝 测试日志功能...")
headers = {"Authorization": f"Bearer {token}"}
try:
response = requests.get(f"{BASE_URL}/api/logs", headers=headers)
if response.status_code == 200:
data = response.json()
print(f"✅ 获取日志成功,共 {len(data['logs'])} 条记录")
return True
else:
print(f"❌ 获取日志失败: {response.status_code}")
return False
except Exception as e:
print(f"❌ 日志功能异常: {e}")
return False
def main():
"""主测试函数"""
print("🚀 Gitea Webhook Ambassador 增强版功能测试")
print("=" * 50)
# 测试健康检查
if not test_health_check():
print("❌ 健康检查失败,服务可能未启动")
return
# 测试登录
token = test_login()
if not token:
print("❌ 登录失败,无法继续测试")
return
# 测试各项功能
test_api_key_management(token)
test_project_management(token)
test_stats(token)
test_logs(token)
print("\n" + "=" * 50)
print("🎉 增强版功能测试完成!")
print("\n📋 已实现的功能:")
print(" ✅ Web 登录界面")
print(" ✅ 数据库存储 API 密钥")
print(" ✅ 延长 JWT 有效期 (7天)")
print(" ✅ 前端仪表板")
print(" ✅ 项目管理")
print(" ✅ API 密钥管理")
print(" ✅ 日志查看")
print(" ✅ 健康状态监控")
print("\n🌐 访问地址:")
print(f" 登录页面: {BASE_URL}/login")
print(f" 仪表板: {BASE_URL}/dashboard")
print(f" 管理员密钥: {ADMIN_SECRET_KEY}")
if __name__ == "__main__":
main()