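AI-assisted real-time decision support system for financial risk control. The system produces explainable, auditable recommendations and does not make the final risk decision itself.
Version: V2.0 | Orchestration engine: LangGraph StateGraph v3 | Target throughput: 200K QPS (EKS deployment)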
.
├── README.md # Project overview (this file)
├── plan.md # Architecture evolution plan
├── CLAUDE.md # AI coding conventions
├── docker-compose.yml # Local full-stack environment
├── .env # Sensitive configuration (not committed to git)
│
├── risk-signal-producer/ # Risk signal producer (simulated upstream)
├── flink-feature-job/ # PyFlink real-time feature engineering job
│   └── job/feature_enrichment.py # Window aggregation + CEP + anomaly scoring
│
├── case-service/ # Case management service :8001
├── orchestrator-service/ # LangGraph orchestration core :8002
│   └── app/
│       ├── state.py # RiskAnalysisState TypedDict
│       ├── graph.py # StateGraph definition
│       └── nodes.py # 6 node functions
│
├── fast-path-router/ # Fast/slow path routing :8007
├── rules-explainer-agent/ # Rule explanation :8003 (Claude + Redis)
│   └── app/agent.py # Core LLM logic
├── score-analyst-agent/ # Score analysis :8004 (Claude + Redis)
│   └── app/agent.py
├── evidence-retriever-agent/ # Evidence retrieval :8005 (ES BM25 + Redis)
│   └── knowledge_base/kb.json # Raw knowledge-base data
├── graph-risk-agent/ # Graph relationship risk :8008 (Neo4j)
├── behavior-sequence-agent/ # Behavior sequence risk :8009
├── verifier-agent/ # Analysis quality verification :8006 (Claude + Redis)
├── policy-calibration-agent/ # Policy calibration :8010
│
├── k8s/ # AWS EKS deployment configuration
│   ├── deployments/ # 6 Deployments + Services
│   ├── hpa/ # 4 HPAs (scaled on CPU / Kafka lag)
│   └── ingress/ # AWS ALB Ingress
└── doc/
    ├── API_DOCS.md
    └── swagger.yaml # OpenAPI contract (single source of truth)
flowchart TD
subgraph ingest["Ingestion and feature layer"]
producer["RiskSignalProducer\nSimulated upstream signals"]
kafkaRaw["Kafka\nraw_risk_signals\n64 partitions"]
flink["Flink PyJob\nWindow aggregation · CEP · Anomaly scoring"]
kafkaEnrich["Kafka\nenriched_risk_signals"]
end
subgraph core["Decision core layer"]
case["case-service :8001\nKafka consumer · Redis idempotency\nPostgreSQL storage"]
router["fast-path-router :8007\nFast/slow path routing"]
orch["orchestrator-service :8002\nLangGraph StateGraph\n6 nodes · Claude LLM"]
end
subgraph agents["Agent layer (all with Redis cache)"]
rules["rules-explainer :8003\nv2.0 · Claude\nRule explanation + regulatory mapping"]
score["score-analyst :8004\nv2.0 · Claude\nFactor attribution + confidence interval"]
graph_a["graph-risk :8008\nv2.0 · Neo4j\nGraph relationship risk"]
behavior["behavior-sequence :8009\nv2.0\nBehavior sequence risk"]
evidence["evidence-retriever :8005\nv3.0 · ES BM25\nKnowledge-base RAG retrieval"]
verifier["verifier :8006\nv2.1 · Claude\nQuality cross-validation"]
policy["policy-calibration :8010\nv2.0\nPolicy calibration + final recommendation"]
end
subgraph storage["Storage layer"]
pg[("PostgreSQL\n:5432\nCase persistence")]
redis[("Redis\n:6379\nmaxmem 2gb")]
es[("Elasticsearch\n:9200\nrisk_knowledge")]
neo4j[("Neo4j\n:7474\nGraph relationships")]
end
producer -->|"JSON signal"| kafkaRaw
kafkaRaw --> flink
flink -->|"+computed_features"| kafkaEnrich
kafkaEnrich --> case
case -->|"Idempotency check"| redis
case -->|"Write"| pg
case --> router
router -->|"FAST path"| case
router -->|"SLOW path"| orch
orch --> rules & score & graph_a & behavior
orch --> evidence
orch --> verifier
orch --> policy
orch -->|"Cache read/write"| redis
rules & score & verifier -->|"Result cache"| redis
evidence -->|"BM25 retrieval"| es
evidence -->|"Result cache"| redis
graph_a --> neo4j
policy -->|"decision_support"| case
orchestrator-service uses a LangGraph StateGraph to orchestrate 6 nodes; all nodes share RiskAnalysisState.
flowchart LR
S(["START"]) --> route
route{"route_node\nChoose path"}
route -->|"fast_path=true\nrisk<0.45, single rule, no anomaly"| build
route -->|"fast_path=false\nHigh risk / complex signal"| parallel
subgraph slow["Slow Path"]
parallel["parallel_analysis_node\nasyncio.gather\n① rules-explainer\n② score-analyst\n③ graph-risk\n④ behavior-sequence"]
ev["evidence_node\n⑤ evidence-retriever\nES BM25 + Redis cache"]
vf["verify_node\n⑥ verifier\nClaude quality verification\n→ quality_score"]
pol["policy_node\n⑦ policy-calibration\nConfidence aggregation\n→ final_recommendation"]
parallel --> ev --> vf --> pol --> build
end
build["build_result_node\nAssemble decision_support\nJSON"]
build --> E(["END"])
style S fill:#555,color:#fff,stroke:none
style E fill:#555,color:#fff,stroke:none
style route fill:#d4860a,color:#fff
style parallel fill:#5b4fc9,color:#fff
style build fill:#1a7d4a,color:#fff
Fast-path trigger conditions (all 6 must hold):
| Condition | Threshold |
|---|---|
| `risk_score` | < 0.45 |
| `len(rules_hit)` | ≤ 1 |
| `geographic_anomaly` | = false |
| `velocity_score` | ≤ 0.6 |
| `cep_pattern_flag` | = false |
| `burst_flag` | = false |
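The six conditions above can be sketched as a single predicate. This is a minimal illustration, not the project's `route_node`: the function name `is_fast_path` is hypothetical, the placement of `geographic_anomaly` and `velocity_score` under `features` and of the two flags under `computed_features` follows the sample messages in this README, and the defaults used for missing fields are assumptions.

```python
from typing import Any, Mapping


def is_fast_path(signal: Mapping[str, Any]) -> bool:
    """Return True only when all six fast-path conditions hold."""
    features = signal.get("features", {})
    computed = signal.get("computed_features", {})
    # Assumption: a missing score forces the slow path, while a missing
    # Flink flag is treated as "not raised".
    return (
        signal.get("risk_score", 1.0) < 0.45
        and len(signal.get("rules_hit", [])) <= 1
        and features.get("geographic_anomaly", True) is False
        and features.get("velocity_score", 1.0) <= 0.6
        and computed.get("cep_pattern_flag", False) is False
        and computed.get("burst_flag", False) is False
    )
```

Because the conditions are joined with `and`, a single raised flag such as `burst_flag=True` is enough to send a low-score signal down the slow path.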
Write order of LangGraph state fields (RiskAnalysisState):
sequenceDiagram
participant Entry as GraphEntry
participant Route as route_node
participant Parallel as parallel_analysis_node
participant Evidence as evidence_node
participant Verify as verify_node
participant Policy as policy_node
participant Build as build_result_node
Entry->>Entry: init(case_id, signal, trace_id, start_ms)
Route->>Route: write(fast_path)
alt slow_path
Parallel->>Parallel: write(rules_explanation, score_analysis, graph_analysis, behavior_analysis)
Parallel->>Parallel: incr(cache_hits, llm_calls)
Evidence->>Evidence: write(evidence)
Verify->>Verify: write(verification.quality_score, verification.issues)
Policy->>Policy: write(policy_result, confidence_breakdown)
end
Build->>Build: write(decision_support)
Build->>Build: append(errors?) and finalize(audit fields)
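The fields written in the sequence above suggest a state shape along these lines. This is only a sketch: the field names come from the diagram, but every type annotation, the `total=False` choice, and the `new_state` helper are assumptions rather than the contents of `state.py`.

```python
import time
from typing import Any, Dict, List, TypedDict


class RiskAnalysisState(TypedDict, total=False):
    """Shared graph state; field types here are assumptions."""
    # Set once at graph entry.
    case_id: str
    signal: Dict[str, Any]
    trace_id: str
    start_ms: int
    # Written by route_node.
    fast_path: bool
    # Written by parallel_analysis_node (slow path only).
    rules_explanation: Dict[str, Any]
    score_analysis: Dict[str, Any]
    graph_analysis: Dict[str, Any]
    behavior_analysis: Dict[str, Any]
    cache_hits: int
    llm_calls: int
    # Written by evidence_node, verify_node and policy_node.
    evidence: Dict[str, Any]
    verification: Dict[str, Any]
    policy_result: Dict[str, Any]
    confidence_breakdown: Dict[str, float]
    # Written by build_result_node.
    decision_support: Dict[str, Any]
    errors: List[str]


def new_state(case_id: str, signal: Dict[str, Any], trace_id: str) -> RiskAnalysisState:
    """Hypothetical helper mirroring the init(...) step at graph entry."""
    return RiskAnalysisState(
        case_id=case_id,
        signal=signal,
        trace_id=trace_id,
        start_ms=int(time.time() * 1000),
        cache_hits=0,
        llm_calls=0,
        errors=[],
    )
```

Declaring the class with `total=False` lets slow-path fields stay absent on the fast path instead of being filled with placeholders.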
sequenceDiagram
autonumber
participant P as RiskSignalProducer
participant K1 as Kafka<br/>raw_risk_signals
participant FL as Flink PyJob
participant K2 as Kafka<br/>enriched_risk_signals
participant CS as case-service
participant RD as Redis
participant DB as PostgreSQL
participant RT as fast-path-router
participant OC as orchestrator<br/>(LangGraph)
participant AG as agents<br/>(4 concurrent)
participant EV as evidence-retriever
participant VF as verifier
P->>K1: publish(signal)
K1->>FL: consume(raw_signal)
FL->>FL: compute computed_features
FL->>K2: publish(enriched_signal)
K2->>CS: consume(enriched_signal)
CS->>RD: GET case:created:{id} (idempotency check)
alt Already processed
RD-->>CS: "1" → skip
else New signal
CS->>DB: INSERT case
CS->>RD: SETEX case:created:{id} 86400
CS->>RT: POST /route
alt FAST path
RT-->>CS: route=FAST + decision_support
CS->>DB: UPDATE decision_support
else SLOW path
RT-->>CS: route=SLOW
CS->>OC: POST /analyze
OC->>OC: route_node (decide fast/slow)
par Concurrent analysis
OC->>AG: POST /explain (rules-explainer)
OC->>AG: POST /analyze (score-analyst)
OC->>AG: POST /analyze-graph (graph-risk)
OC->>AG: POST /analyze-sequence (behavior-sequence)
end
OC->>EV: POST /retrieve (ES BM25 retrieval)
OC->>VF: POST /verify (quality_score evaluation)
OC->>OC: policy_node + build_result_node
OC-->>CS: decision_support JSON
CS->>DB: UPDATE decision_support
end
end
Function (risk-signal-producer): Simulates the upstream system and publishes raw risk signals to the Kafka topic raw_risk_signals.
Output (Kafka message):
{
"case_id": "CASE_000001",
"entity": { "type": "CUSTOMER", "id": "entity_001" },
"facts": {
"transaction_amount": 52000,
"transaction_count_24h": 18,
"country": "US",
"merchant_category": "HIGH_RISK"
},
"features": {
"velocity_score": 0.85,
"amount_deviation": 2.3,
"geographic_anomaly": true,
"time_pattern_risk": 0.72
},
"rules_hit": ["RULE_001", "RULE_005", "RULE_012"],
"risk_score": 0.78,
"snapshot_ref": "snap_001",
"ts": "2026-01-01T00:00:00Z"
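}
Function (flink-feature-job): A PyFlink DataStream job that consumes raw_risk_signals, enriches each signal with real-time features, and publishes the result to enriched_risk_signals.
Input: raw_risk_signals topic
Processing pipeline: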
flowchart LR
source[KafkaSource raw_risk_signals] --> parse[Parse JSON]
parse --> keyBy[KeyBy entity_id]
keyBy --> tw[TumblingWindow 60s]
keyBy --> sw[SlidingWindow 300s/60s]
keyBy --> sess[SessionWindow gap=120s]
keyBy --> cep[CEP Pattern 5min]
tw --> f1[tx_count_1min]
sw --> f2[velocity_delta]
sess --> f3[burst_flag]
cep --> f4[cep_pattern_flag]
keyBy --> f5[anomaly_score]
f1 --> merge[Merge computed_features]
f2 --> merge
f3 --> merge
f4 --> merge
f5 --> merge
merge --> sink[KafkaSink enriched_risk_signals]
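Added fields (computed_features):
| Field | Type | Algorithm | Description |
|---|---|---|---|
| `tx_count_1min` | int | TumblingWindow(60s) | Estimated transaction count in a 1-minute tumbling window |
| `velocity_delta` | float | Based on velocity_score | Deviation from the baseline velocity |
| `burst_flag` | bool | SessionWindow(gap=120s) | Whether a burst transaction pattern was detected |
| `anomaly_score` | float | Multi-factor weighted | Composite anomaly score in [0, 1] |
| `cep_pattern_flag` | bool | CEP 5min window | Whether RULE_001 and RULE_005 co-occur within 5 minutes |
| `enrichment_ts` | str | System time | Timestamp of the feature computation |
Output: enriched_risk_signals topic (the original signal plus a computed_features field)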
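To make the `cep_pattern_flag` definition concrete, the co-occurrence check can be sketched in plain Python. This is a stand-in for illustration only: it does not use the PyFlink CEP API, and the function name, the `Event` tuple shape, and the in-memory event list are all assumptions.

```python
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Tuple

# Hypothetical event shape: (event time, rules_hit for that signal).
Event = Tuple[datetime, List[str]]


def cep_pattern_flag(events: Iterable[Event],
                     window: timedelta = timedelta(minutes=5)) -> bool:
    """True if RULE_001 and RULE_005 fire within `window` of each other."""
    last_seen: Dict[str, datetime] = {}
    for ts, rules in sorted(events, key=lambda e: e[0]):
        # Remember the most recent time each rule of interest fired.
        for rule in ("RULE_001", "RULE_005"):
            if rule in rules:
                last_seen[rule] = ts
        # Both rules seen, and their latest firings are close enough.
        if len(last_seen) == 2 and \
                abs(last_seen["RULE_001"] - last_seen["RULE_005"]) <= window:
            return True
    return False
```

Tracking only the latest firing of each rule is sufficient here: when the later rule of a matching pair arrives, the other rule's most recent timestamp is at least as close as the one in that pair.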
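Function (case-service): Case lifecycle management. A Kafka consumer responsible for case creation, Redis-based idempotent deduplication, PostgreSQL persistence, triggering the analysis flow, and storing the final decision_support.
Kafka consumption: enriched_risk_signals topic, group_id=case-service-consumers
Consumption and branching logic: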
flowchart TD
msg[Consume enriched_risk_signals] --> idem{Redis key exists? case:created:id}
idem -- yes --> skip[Skip message]
idem -- no --> dbcheck{Case exists in DB?}
dbcheck -- yes --> repair[Repair Redis key and skip]
dbcheck -- no --> create[Insert case row + trace_id]
create --> mark[SETEX case:created:id 86400]
mark --> route[POST fast-path-router /route]
route --> path{path_type}
path -- FAST --> fastsave[Store decision_support directly]
path -- SLOW --> orch[POST orchestrator /analyze]
orch --> slowsave[Store decision_support from orchestrator]
fastsave --> done[Done]
slowsave --> done
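The idempotency branch of this flow can be sketched with an in-memory stand-in for Redis. This is an illustration under stated assumptions: `FakeRedis`, `handle_message`, and the `db_cases` set are hypothetical, and only the key format `case:created:{id}` and the 86400 s TTL come from this README. The stand-in mirrors the `get` / `setex(name, time, value)` call shape of redis-py.

```python
import time
from typing import Dict, Optional, Set, Tuple


class FakeRedis:
    """In-memory stand-in exposing only get/setex (for illustration)."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[str, float]] = {}

    def get(self, key: str) -> Optional[str]:
        item = self._data.get(key)
        if item is None or item[1] < time.monotonic():
            return None  # missing or expired
        return item[0]

    def setex(self, key: str, ttl_seconds: int, value: str) -> None:
        self._data[key] = (value, time.monotonic() + ttl_seconds)


def handle_message(case_id: str, cache: FakeRedis, db_cases: Set[str]) -> str:
    """Return which branch of the flowchart the message took."""
    key = f"case:created:{case_id}"
    if cache.get(key) is not None:
        return "skipped"            # Redis key exists
    if case_id in db_cases:
        cache.setex(key, 86400, "1")
        return "repaired"           # DB has the case, Redis key was lost
    db_cases.add(case_id)           # stands in for INSERT case
    cache.setex(key, 86400, "1")
    return "created"                # caller would now POST /route
```

The database check matters because the Redis key can expire or be evicted; without it, a replayed message after 24 hours would create a duplicate case.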
Key fields of the PostgreSQL cases table:
| Field | Type | Description |
|---|---|---|
| `case_id` | String | Unique identifier |
| `status` | OPEN / REVIEWED / ESCALATED | Case status |
| `raw_signal` | JSON | Full enriched_signal |
| `decision_support` | JSON | Final analysis result |
| `path_type` | FAST / SLOW | Path type |
| `trace_id` | String | XAI audit trace ID |
| `sla_ms` | Integer | End-to-end latency in ms |
| `confidence_breakdown` | JSON | Confidence breakdown |
API endpoints:
| Method | Path | Description |
|---|---|---|
| GET | `/cases` | List cases |
| GET | `/cases/{case_id}` | Case details |
| GET | `/cases/{case_id}/trace` | XAI audit trail |
| PATCH | `/cases/{case_id}/status` | Update status |
| GET | `/health` | Health check |
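Function (orchestrator-service): The multi-agent orchestration core, built on LangGraph StateGraph. It drives a 6-node directed graph and coordinates call order, concurrency, caching, and LLM enhancement across all agents.
Input (POST /analyze):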
{
"case_id": "CASE_000001",
"trace_id": "trace_001",
"signal": { /* full enriched_signal object */ }
}
Node overview:
| # | Node | Function | Redis cache |
|---|---|---|---|
| 1 | `route_node` | Chooses the fast or slow path (all 6 conditions met → fast) | None |
| 2 | `parallel_analysis_node` | Calls rules/score/graph/behavior concurrently via asyncio.gather, plus Claude enhancement | rules/score/graph/behavior_cache, TTL 300s |
| 3 | `evidence_node` | Calls evidence-retriever; ES BM25 retrieval over the knowledge base | evidence_cache, TTL 600s |
| 4 | `verify_node` | Calls verifier plus Claude cross-validation; produces quality_score | verify_cache, TTL 300s |
| 5 | `policy_node` | Aggregates confidence and calls policy-calibration for the final recommendation | None |
| 6 | `build_result_node` | Assembles the final decision_support JSON with full audit fields | None |
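The concurrent fan-out in `parallel_analysis_node` can be sketched with `asyncio.gather`. This is a minimal sketch, not the code in `nodes.py`: the function name, the `AgentCall` alias, and the use of `return_exceptions=True` to collect per-agent failures into `errors` are assumptions, and real agents would be HTTP calls rather than local coroutines.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical signature for one agent call: signal in, result dict out.
AgentCall = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def run_parallel_analysis(signal: Dict[str, Any],
                                agents: Dict[str, AgentCall]) -> Dict[str, Any]:
    """Call every agent concurrently and collect results per state field."""
    names = list(agents)
    results = await asyncio.gather(
        *(agents[name](signal) for name in names),
        return_exceptions=True,  # one failing agent must not cancel the rest
    )
    update: Dict[str, Any] = {"errors": []}
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            update["errors"].append(f"{name}: {result}")
            update[name] = None
        else:
            update[name] = result
    return update
```

With this shape, the four agents run for roughly as long as the slowest one rather than the sum of all four, and a failed agent shows up as an entry in `errors` instead of aborting the node.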
Output (decision_support):
{
"case_id": "CASE_000001",
"trace_id": "trace_001",
"recommendation": "ESCALATE",
"confidence": 0.73,
"confidence_breakdown": {
"risk_score_component": 0.78,
"graph_component": 0.62,
"behavior_component": 0.55,
"evidence_component": 0.80,
"verification_component": 1.0
},
"summary": "LangGraph multi-agent analysis: risk_score=0.78, ...",
"top_signals": [
{ "type": "rule", "id": "RULE_001", "severity": "high" }
],
"evidence": [ /* ES retrieval results */ ],
"verification": { "verified": true, "quality_score": 0.85, "issues": [] },
"audit": {
"workflow": "langgraph_stategraph_v3",
"steps": ["route", "parallel_analysis", "evidence", "verify", "policy", "build_result"],
"latency_ms": 1240,
"cache_hits": 3,
"llm_calls": 2,
"llm_enhanced": true
},
"xai": {
"why": "LangGraph StateGraph: parallel agent analysis → ES evidence RAG → LLM cross-validation → policy calibration",
"counterfactual_notes": "Low-risk signals (score<0.45, single rule) take fast path",
"policy_refs": ["BASE_DECISION_POLICY_V2", "VERIFICATION_GATE_V2"]
},
"decision_version": "3.0.0"
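}
Function (fast-path-router): Fast/slow path router. It uses risk_score, the number of rules hit, and the Flink feature flags to decide whether to skip deep analysis and return a lightweight recommendation directly (latency < 45 ms).
Input (POST /route):
| Field | Type | Description |
|---|---|---|
| `case_id` | str | Case ID |
| `risk_score` | float | Composite risk score |
| `rules_hit` | List[str] | Rules that fired |
| `features` | Dict | Risk features |
| `computed_features` | Dict | Flink-computed features |
Output: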
{
"route": "FAST",
"reason": "Low risk score, single rule, no anomaly indicators",
"decision_support": { "recommendation": "REVIEW", "confidence": 0.65, ... }
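}
Function (rules-explainer-agent): Explains the risk-control rules that fired, maps them to regulatory frameworks (AML/BSA/FATF/PCI-DSS), and assigns a severity rating. Supports Claude LLM enhancement; without an API key it falls back to a rule dictionary.
Input (POST /explain):
| Field | Type | Description |
|---|---|---|
| `rules_hit` | List[str] | IDs of the rules that fired, e.g. ["RULE_001", "RULE_005"] |
| `facts` | Dict | Transaction facts (amount, count, country, etc.) |
| `features` | Dict | Risk features (velocity_score, geo, etc.) |
Output (ExplainResponse):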
{
"explanations": [
{
"rule_id": "RULE_001",
"explanation": "Transaction amount $52,000 exceeds the $40,000 AML reporting threshold; a SAR must be filed",
"trigger_reason": "transaction_amount=52000 > threshold=40000",
"regulatory_ref": "BSA/AML - SAR filing requirement",
"severity": "HIGH"
}
]
}
Redis cache: rules_cache:{sha256(sorted_rules+facts)[:16]}, TTL=300s
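A content-hash key of this form could be derived as follows. This is a sketch under assumptions: the README only specifies `sha256(sorted_rules+facts)[:16]`, so the JSON serialization, the payload field names, and the function name `rules_cache_key` are illustrative choices.

```python
import hashlib
import json
from typing import Any, Dict, List


def rules_cache_key(rules_hit: List[str], facts: Dict[str, Any]) -> str:
    """Build a deterministic cache key from the rule set and the facts."""
    # Sorting the rules and the JSON keys makes the key independent of
    # input ordering (the serialization format itself is an assumption).
    payload = json.dumps(
        {"rules": sorted(rules_hit), "facts": facts},
        sort_keys=True,
        separators=(",", ":"),
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"rules_cache:{digest[:16]}"
```

Two requests with the same rules in a different order then map to the same key, so the second one can be served from the cache within the 300 s TTL.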
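Function (score-analyst-agent): Analyzes how each factor contributes to the risk score and outputs weight attribution, a confidence interval, and a natural-language interpretation. Claude can generate the interpretation text, and Flink computed_features (including anomaly_score) are supported.
Input (POST /analyze):
| Field | Type | Description |
|---|---|---|
| `risk_score` | float | Composite risk score in [0, 1] |
| `features` | Dict | Risk features (velocity_score, amount_deviation, etc.) |
| `computed_features` | Dict? | Flink-computed features (optional, includes anomaly_score) |
Output (AnalyzeResponse):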
{
"top_factors": [
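{ "feature": "velocity_score", "contribution": 0.30, "note": "High-frequency transaction pattern, above the normal threshold" },
{ "feature": "geographic_anomaly", "contribution": 0.25, "note": "Cross-border high-risk region" },
{ "feature": "amount_deviation", "contribution": 0.20, "note": "2.3 standard deviations from the mean" }
],
"uncertainty": ["Data window is only 24h; the long-term behavioral baseline is insufficient"],
"confidence_interval": { "low": 0.65, "high": 0.88 },
"interpretation": "Velocity, geographic, and amount anomalies together give a fairly confident risk score; manual review is recommended"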
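}
Redis cache: score_cache:{sha256(score+features)[:16]}, TTL=300s
Function (evidence-retriever-agent): Hybrid evidence retrieval. It first runs a BM25 full-text search against the Elasticsearch risk_knowledge index (built automatically from kb.json at startup) and falls back to local in-memory scoring when Elasticsearch is unavailable.
Input (POST /retrieve):
| Field | Type | Description |
|---|---|---|
| `rules_hit` | List[str] | Rules that fired (used for ES boosting) |
| `summary` | str | Case summary text (ES query_string) |
ES retrieval pipeline (with stacked boosts):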
flowchart LR
input[Input rules_hit + summary] --> cache{Redis evidence_cache hit?}
cache -- yes --> cached[Return cached top-k]
cache -- no --> bm25[ES BM25 multi_match]
bm25 --> boost_rule[Rule boost]
boost_rule --> boost_cep[CEP co-occurrence boost +0.20]
boost_cep --> avail{ES available?}
avail -- no --> local[Local lexical/vector fallback]
avail -- yes --> esres[ES candidates]
esres --> merge[Merge + dedupe + rank]
local --> merge
merge --> topk[Top-5 evidence]
topk --> writecache[Write evidence_cache TTL 600s]
writecache --> output[Return RetrieveResponse]
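The "Merge + dedupe + rank" and "Top-5" steps of the pipeline can be sketched as below. The README does not say how duplicates are resolved, so keeping the highest-scoring copy per `id` is an assumption, as are the function name `merge_evidence` and its signature.

```python
from typing import Any, Dict, Iterable, List

Evidence = Dict[str, Any]  # expects at least "id" and "score" keys


def merge_evidence(*sources: Iterable[Evidence], top_k: int = 5) -> List[Evidence]:
    """Merge candidate lists, dedupe by id, and return the top_k by score."""
    best: Dict[str, Evidence] = {}
    for source in sources:
        for item in source:
            current = best.get(item["id"])
            # Assumption: on duplicate ids, keep the higher-scoring copy.
            if current is None or item["score"] > current["score"]:
                best[item["id"]] = item
    ranked = sorted(best.values(), key=lambda e: e["score"], reverse=True)
    return ranked[:top_k]
```

For example, merging ES candidates with local fallback candidates that share `KB_006` yields a single `KB_006` entry carrying whichever score was higher.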
输出(RetrieveResponse):
{
"evidence": [
{
"id": "KB_006",
"title": "Combined rule co-occurrence",
"content": "A high-value transaction firing together with a velocity rule is a strong signal of money-laundering structuring/layering",
"score": 0.92,
"type": "cep_pattern"
}
],
"source": "elasticsearch",
"kb_size": 6
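}
Redis cache: evidence_cache:{sha256(rules+summary)[:16]}, TTL=600s
Function (graph-risk-agent): Analyzes entity-relationship risk using the Neo4j graph database. It identifies high-risk connections, fund-flow paths, and collusion patterns within the account network.
Input (POST /analyze-graph):
| Field | Type | Description |
|---|---|---|
| `entity` | Dict | Entity information (type, id) |
| `facts` | Dict | Transaction facts |
| `rules_hit` | List[str] | Rules that fired |
Output: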
{
"graph_risk_score": 0.62,
"patterns": [
{ "pattern": "SUSPICIOUS_NETWORK_CLUSTER", "confidence": 0.71 }
],
"connected_entities": 3,
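"risk_explanation": "Entity is linked to 3 known high-risk accounts"
}
Redis cache: graph_cache:{sha256(entity+rules)[:16]}, TTL=300s
Function (behavior-sequence-agent): Analyzes risk in transaction behavior sequences. It detects anomalous time patterns, operation sequences, and session behavior, and identifies signatures of account takeover and automated fraud tooling.
Input (POST /analyze-sequence):
| Field | Type | Description |
|---|---|---|
| `case_id` | str | Case ID |
| `facts` | Dict | Transaction facts (including transaction_count_24h) |
| `features` | Dict | Risk features (including time_pattern_risk) |
Output: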
{
"sequence_risk_score": 0.55,
"anomalous_patterns": ["RAPID_SUCCESSION", "OFF_HOURS_ACTIVITY"],
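"risk_explanation": "High-frequency operation sequence detected outside business hours"
}
Redis cache: behavior_cache:{sha256(case_id+facts)[:16]}, TTL=300s
Function (verifier-agent): Verifies the completeness and internal consistency of the multi-agent analysis. It combines rule-based checks with optional Claude cross-validation and outputs quality_score, which the LangGraph orchestrator uses for routing (< 0.5 triggers NEEDS_HUMAN_REVIEW).
Input (POST /verify):
| Field | Type | Description |
|---|---|---|
| `case_id` | str | Case ID |
| `rules_explanation` | Dict | rules-explainer output |
| `score_analysis` | Dict | score-analyst output |
| `evidence` | Dict | evidence-retriever output |
| `graph_analysis` | Dict? | graph-risk output (optional) |
| `behavior_analysis` | Dict? | behavior-sequence output (optional) |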
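Quality scoring rules (start at 1.0, deduct per defect):
| Defect | Deduction |
|---|---|
| No rule explanations | -0.25 |
| A rule is missing explanation or trigger_reason | -0.10 |
| No top_factors | -0.20 |
| No supporting evidence | -0.20 |
| All evidence relevance < 0.3 | -0.10 |
| Graph risk score > 0.7 but no evidence | -0.10 |
LLM blend weights: final_score = 0.6 × rule_score + 0.4 × llm_score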
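The deduction table and the blend formula can be worked through in code. This is a sketch, not the verifier's implementation: the function names are hypothetical, the per-rule deduction is assumed to apply once per incomplete rule entry, the clamp to [0, 1] and the rounding are assumptions, and the input shapes follow the sample agent outputs in this README.

```python
from typing import Any, Dict, Optional


def rule_quality_score(rules_explanation: Dict[str, Any],
                       score_analysis: Dict[str, Any],
                       evidence: Dict[str, Any],
                       graph_analysis: Optional[Dict[str, Any]] = None) -> float:
    """Start at 1.0 and subtract the deduction for each defect found."""
    score = 1.0
    explanations = rules_explanation.get("explanations", [])
    if not explanations:
        score -= 0.25
    for item in explanations:
        # Assumption: deducted once per rule entry that is incomplete.
        if not item.get("explanation") or not item.get("trigger_reason"):
            score -= 0.10
    if not score_analysis.get("top_factors"):
        score -= 0.20
    items = evidence.get("evidence", [])
    if not items:
        score -= 0.20
    elif all(e.get("score", 0.0) < 0.3 for e in items):
        score -= 0.10
    if graph_analysis and graph_analysis.get("graph_risk_score", 0.0) > 0.7 and not items:
        score -= 0.10
    return round(max(0.0, min(1.0, score)), 4)  # clamp is an assumption


def blend_quality(rule_score: float, llm_score: Optional[float] = None) -> float:
    """final_score = 0.6 * rule_score + 0.4 * llm_score (rule-only if no LLM)."""
    if llm_score is None:
        return rule_score
    return round(0.6 * rule_score + 0.4 * llm_score, 4)
```

As a worked case, an analysis with no rule explanations, no top_factors, and no evidence scores 1.0 - 0.25 - 0.20 - 0.20 = 0.35, which falls below the 0.5 threshold for NEEDS_HUMAN_REVIEW.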
Output (VerifyResponse):
{
"verified": true,
"issues": [],
"quality_score": 0.85
}
quality_score threshold routing:
flowchart TD
qs[quality_score] --> d1{score >= 0.9}
d1 -- yes --> excellent[Excellent: auto-pass verification]
d1 -- no --> d2{score >= 0.7}
d2 -- yes --> good[Good: keep model recommendation]
d2 -- no --> d3{score >= 0.5}
d3 -- yes --> acceptable[Acceptable: REVIEW with caution]
d3 -- no --> human[NEEDS_HUMAN_REVIEW]
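The threshold cascade above maps directly onto a small function. This is a sketch: the function name `quality_tier` is hypothetical, and the tier labels other than NEEDS_HUMAN_REVIEW are taken from the diagram's wording rather than from any documented enum.

```python
def quality_tier(quality_score: float) -> str:
    """Map a quality_score to the tier shown in the routing diagram."""
    if quality_score >= 0.9:
        return "EXCELLENT"           # auto-pass verification
    if quality_score >= 0.7:
        return "GOOD"                # keep model recommendation
    if quality_score >= 0.5:
        return "ACCEPTABLE"          # REVIEW with caution
    return "NEEDS_HUMAN_REVIEW"
```

The sample VerifyResponse value of 0.85 lands in the GOOD tier, so the model recommendation would be kept.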
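Redis cache: verify_cache:{sha256(all_inputs)[:16]}, TTL=300s
Function (policy-calibration-agent): Aggregates confidence across dimensions, calibrates the final recommendation (ESCALATE / REVIEW / NEEDS_HUMAN_REVIEW) against business policy rules, and records which policies were referenced.
Input (POST /calibrate):
| Field | Type | Description |
|---|---|---|
| `case_id` | str | Case ID |
| `base_recommendation` | str | Initial recommendation (risk_score ≥ 0.8 → ESCALATE) |
| `risk_score` | float | Composite risk score |
| `verification` | Dict | verifier output (including quality_score) |
| `evidence_count` | int | Number of evidence items |
| `confidence_breakdown` | Dict | Per-dimension confidence breakdown |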
Confidence breakdown fields:
| Dimension | Source |
|---|---|
| `risk_score_component` | Original risk score |
| `graph_component` | graph_risk_score from the graph-risk output |
| `behavior_component` | sequence_risk_score from the behavior-sequence output |
| `evidence_component` | Mean relevance score of the evidence |
| `verification_component` | verified=true → 1.0, otherwise → 0.4 |
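The breakdown can be assembled from the agent outputs as sketched below. The source of each component follows the table above; the function name, the argument shapes, and the 0.0 defaults for missing inputs are assumptions. How the five components are weighted into `calibrated_confidence` is not specified in this README, so the sketch stops at the breakdown.

```python
from typing import Any, Dict, List


def build_confidence_breakdown(risk_score: float,
                               graph_analysis: Dict[str, Any],
                               behavior_analysis: Dict[str, Any],
                               evidence: List[Dict[str, Any]],
                               verification: Dict[str, Any]) -> Dict[str, float]:
    """Derive each confidence component from its documented source."""
    scores = [item["score"] for item in evidence]
    return {
        "risk_score_component": risk_score,
        # Missing agent outputs default to 0.0 (assumption).
        "graph_component": graph_analysis.get("graph_risk_score", 0.0),
        "behavior_component": behavior_analysis.get("sequence_risk_score", 0.0),
        "evidence_component": sum(scores) / len(scores) if scores else 0.0,
        "verification_component": 1.0 if verification.get("verified") else 0.4,
    }
```

Note that `verification_component` depends only on the boolean `verified` flag, not on `quality_score`, so a verified analysis always contributes 1.0 here.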
Confidence aggregation:
flowchart LR
r[risk_score_component] --> agg[Confidence Aggregator]
g[graph_component] --> agg
b[behavior_component] --> agg
e[evidence_component] --> agg
v[verification_component] --> agg
agg --> conf[calibrated_confidence]
agg --> rec[final_recommendation]
agg --> refs[policy_refs]
Output:
{
"final_recommendation": "ESCALATE",
"calibrated_confidence": 0.73,
"policy_adjustment": "none",
"policy_refs": ["BASE_DECISION_POLICY_V2", "VERIFICATION_GATE_V2", "LANGGRAPH_WORKFLOW_V1"]
}
# Suggested dependencies:
# - Docker / Docker Compose Plugin
# - curl + jq
# - (optional) redis-cli
# Copy the environment variable template
cp .env.example .env
# LLM configuration (defaults to local Ollama)
# LLM_PROVIDER=ollama
# LLM_MODEL=qwen2.5:14b
# OLLAMA_BASE_URL=http://host.docker.internal:11434
#
# Optional: switch to OpenAI
# LLM_PROVIDER=openai
# LLM_MODEL=gpt-4o-mini
# OPENAI_API_KEY=sk-...
# OPENAI_BASE_URL=https://api.openai.com/v1
#
# Optional: switch to Claude
# LLM_PROVIDER=anthropic
# LLM_MODEL=claude-haiku-4-5-20251001
# ANTHROPIC_API_KEY=sk-ant-...
# On first run or after dependency changes, --build is recommended
docker compose up -d --build
# Check container status (ideally everything is Up/healthy)
docker compose ps
# Core services
curl http://localhost:8001/health # case-service
curl http://localhost:8002/health # orchestrator-service
curl http://localhost:8007/health # fast-path-router
# Agent services
curl http://localhost:8003/health # rules-explainer-agent
curl http://localhost:8004/health # score-analyst-agent
curl http://localhost:8005/health # evidence-retriever-agent
curl http://localhost:8006/health # verifier-agent
curl http://localhost:8008/health # graph-risk-agent
curl http://localhost:8009/health # behavior-sequence-agent
curl http://localhost:8010/health # policy-calibration-agent
# Infrastructure
curl http://localhost:9200/ # elasticsearch
redis-cli -p 6379 ping # expected: PONG
curl http://localhost:8081/overview # flink web UI
# Follow logs in real time
docker compose logs -f case-service orchestrator-service
# Restart a single service
docker compose restart case-service
# Stop all services
docker compose down
# Stop and delete data volumes (wipes local data)
docker compose down -v
curl http://localhost:8001/health | jq
curl http://localhost:8002/health | jq
curl http://localhost:8003/health | jq
curl http://localhost:8004/health | jq
curl http://localhost:8005/health | jq
curl http://localhost:8006/health | jq
curl http://localhost:8007/health | jq
# Wait 20 to 40 seconds for producer/flink/case-service to process a batch
curl http://localhost:8001/cases | jq '.[0]'
# Inspect the latest case and its audit trail
LATEST_CASE_ID=$(curl -s http://localhost:8001/cases | jq -r '.[0].case_id')
curl http://localhost:8001/cases/${LATEST_CASE_ID} | jq
curl http://localhost:8001/cases/${LATEST_CASE_ID}/trace | jq
curl -X POST http://localhost:8002/analyze \
-H "Content-Type: application/json" \
-d '{
"case_id": "TEST_SLOW_001",
"trace_id": "trace_001",
"signal": {
"case_id": "TEST_SLOW_001",
"entity": {"type": "CUSTOMER", "id": "entity_001"},
"facts": {"transaction_amount": 52000, "transaction_count_24h": 18,
"country": "US", "merchant_category": "HIGH_RISK"},
"features": {"velocity_score": 0.85, "amount_deviation": 2.3,
"geographic_anomaly": true, "time_pattern_risk": 0.72},
"rules_hit": ["RULE_001", "RULE_005", "RULE_012"],
"risk_score": 0.78,
"ts": "2026-01-01T00:00:00Z"
}
}' | jq
Expected checkpoints:
- audit.workflow = "langgraph_stategraph_v3"
- audit.steps contains route / parallel_analysis / evidence / verify / policy / build_result
- The response includes verification.quality_score
curl -X POST http://localhost:8007/route \
-H "Content-Type: application/json" \
-d '{
"case_id": "TEST_FAST_001",
"trace_id": "trace_fast_001",
"signal": {
"risk_score": 0.2,
"rules_hit": ["RULE_012"],
"features": {"velocity_score": 0.2, "geographic_anomaly": false}
}
}' | jq
Expected checkpoints:
- path_type = "FAST"
- target_sla_ms = 100
- decision_support in the response is non-empty
# Number of documents in the ES index (should be > 0)
curl http://localhost:9200/risk_knowledge/_count | jq
# Check Redis cache keys
redis-cli keys "rules_cache:*"
redis-cli keys "score_cache:*"
redis-cli keys "evidence_cache:*"
redis-cli keys "verify_cache:*"
| Format | Location |
|---|---|
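| Human-readable | doc/API_DOCS.md |
| OpenAPI contract | doc/swagger.yaml (single source of truth) |
| Runtime Swagger | http://localhost:<port>/docs |
Direct database access:
docker compose exec postgres psql -U postgres -d risk_control
Implemented:
- Kafka dual-topic architecture (raw_risk_signals → Flink → enriched_risk_signals)
- LangGraph StateGraph 6-node orchestration with fast/slow path routing (including the 6-condition fast-path check)
- Claude LLM enhancement: regulatory mapping for rule explanations plus quality cross-validation (enabled by setting ANTHROPIC_API_KEY)
- Multiple LLM providers: ollama / openai / anthropic (default: ollama + qwen2.5:14b)
- Elasticsearch BM25 knowledge-base retrieval (risk_knowledge index, built automatically at startup)
- Redis content-hash caching across the pipeline (6 key types; TTLs of 300s/600s/86400s)
- quality_score field (verifier → orchestrator, routing-aware)
- K8s HPA configuration (minReplicas=64 to match the Kafka partition count; target 200K QPS)
Still evolving:
- Stateful Flink window aggregation (currently a stateless per-event approximation)
- Human-in-the-loop workflow (manual review queue when quality_score < 0.5)
- High-load benchmark baselines and observability dashboards (Prometheus/Grafana)
- See plan.md for details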