FranzLy/Multi-agent-Risk-Control


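Multi-Agent Risk Control Decision Support Platform (V2.0)

An AI-assisted, real-time decision support system for financial risk control. The system produces explainable, auditable recommendations and does not make the final risk decision itself.

Version: V2.0 | Orchestration engine: LangGraph StateGraph v3 | Target throughput: 200K QPS (EKS deployment)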


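Contents

  1. Directory structure
  2. System architecture diagram
  3. Agent orchestration flow
  4. End-to-end data flow
  5. Module descriptions
  6. Startup and operation
  7. Testing
  8. API documentation
  9. Version scope

1. Directory structure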

.
├── README.md                        # Project overview (this file)
├── plan.md                          # Architecture evolution plan
├── CLAUDE.md                        # AI coding guidelines
├── docker-compose.yml               # Local full-stack environment
├── .env                             # Sensitive configuration (not committed to git)
│
├── risk-signal-producer/            # Risk signal producer (simulates upstream)
├── flink-feature-job/               # PyFlink real-time feature engineering job
│   └── job/feature_enrichment.py   #   Window aggregation + CEP + anomaly scoring
│
├── case-service/                    # Case management service :8001
├── orchestrator-service/            # LangGraph orchestration core :8002
│   └── app/
│       ├── state.py                 #   RiskAnalysisState TypedDict
│       ├── graph.py                 #   StateGraph definition
│       └── nodes.py                 #   6 node functions
│
├── fast-path-router/                # Fast/slow path routing :8007
├── rules-explainer-agent/           # Rule explanation :8003 (Claude + Redis)
│   └── app/agent.py                 #   Core LLM logic
├── score-analyst-agent/             # Score analysis :8004 (Claude + Redis)
│   └── app/agent.py
├── evidence-retriever-agent/        # Evidence retrieval :8005 (ES BM25 + Redis)
│   └── knowledge_base/kb.json       #   Raw knowledge base data
├── graph-risk-agent/                # Graph relationship risk :8008 (Neo4j)
├── behavior-sequence-agent/         # Behavior sequence risk :8009
├── verifier-agent/                  # Analysis quality verification :8006 (Claude + Redis)
├── policy-calibration-agent/        # Policy calibration :8010
│
├── k8s/                             # AWS EKS deployment configuration
│   ├── deployments/                 #   6 Deployments + Services
│   ├── hpa/                         #   4 HPAs (by CPU / Kafka lag)
│   └── ingress/                     #   AWS ALB Ingress
└── doc/
    ├── API_DOCS.md
    └── swagger.yaml                 # OpenAPI contract (single source of truth)

2. System architecture diagram

flowchart TD
    subgraph ingest["Ingestion and feature layer"]
        producer["RiskSignalProducer\nSimulated upstream signals"]
        kafkaRaw["Kafka\nraw_risk_signals\n64 partitions"]
        flink["Flink PyJob\nWindow aggregation · CEP · Anomaly scoring"]
        kafkaEnrich["Kafka\nenriched_risk_signals"]
    end

    subgraph core["Decision core layer"]
        case["case-service :8001\nKafka consumer · Redis idempotency\nPostgreSQL storage"]
        router["fast-path-router :8007\nFast/slow path routing"]
        orch["orchestrator-service :8002\nLangGraph StateGraph\n6 nodes · Claude LLM"]
    end

    subgraph agents["Agent layer (all with Redis caching)"]
        rules["rules-explainer :8003\nv2.0 · Claude\nRule explanation + regulatory mapping"]
        score["score-analyst :8004\nv2.0 · Claude\nFactor attribution + confidence interval"]
        graph_a["graph-risk :8008\nv2.0 · Neo4j\nGraph relationship risk"]
        behavior["behavior-sequence :8009\nv2.0\nBehavior sequence risk"]
        evidence["evidence-retriever :8005\nv3.0 · ES BM25\nKnowledge base RAG retrieval"]
        verifier["verifier :8006\nv2.1 · Claude\nQuality cross-validation"]
        policy["policy-calibration :8010\nv2.0\nPolicy calibration + final recommendation"]
    end

    subgraph storage["Storage layer"]
        pg[("PostgreSQL\n:5432\nCase persistence")]
        redis[("Redis\n:6379\nmaxmem 2gb")]
        es[("Elasticsearch\n:9200\nrisk_knowledge")]
        neo4j[("Neo4j\n:7474\nGraph relationships")]
    end

    producer -->|"JSON signal"| kafkaRaw
    kafkaRaw --> flink
    flink -->|"+computed_features"| kafkaEnrich
    kafkaEnrich --> case
    case -->|"Idempotency check"| redis
    case -->|"Write"| pg
    case --> router
    router -->|"FAST path"| case
    router -->|"SLOW path"| orch

    orch --> rules & score & graph_a & behavior
    orch --> evidence
    orch --> verifier
    orch --> policy
    orch -->|"Cache read/write"| redis

    rules & score & verifier -->|"Result cache"| redis
    evidence -->|"BM25 search"| es
    evidence -->|"Result cache"| redis
    graph_a --> neo4j

    policy -->|"decision_support"| case

3. Agent orchestration flow (LangGraph)

orchestrator-service uses a LangGraph StateGraph to orchestrate 6 nodes; all nodes share RiskAnalysisState.

flowchart LR
    S(["START"]) --> route

    route{"route_node\nChoose path"}
    route -->|"fast_path=true\nrisk<0.45, single rule, no anomaly"| build

    route -->|"fast_path=false\nHigh risk / complex signal"| parallel

    subgraph slow["Slow Path"]
        parallel["parallel_analysis_node\nasyncio.gather\n① rules-explainer\n② score-analyst\n③ graph-risk\n④ behavior-sequence"]
        ev["evidence_node\n⑤ evidence-retriever\nES BM25 + Redis cache"]
        vf["verify_node\n⑥ verifier\nClaude quality verification\n→ quality_score"]
        pol["policy_node\n⑦ policy-calibration\nConfidence aggregation\n→ final_recommendation"]
        parallel --> ev --> vf --> pol --> build
    end

    build["build_result_node\nAssemble decision_support\nJSON"]
    build --> E(["END"])

    style S fill:#555,color:#fff,stroke:none
    style E fill:#555,color:#fff,stroke:none
    style route fill:#d4860a,color:#fff
    style parallel fill:#5b4fc9,color:#fff
    style build fill:#1a7d4a,color:#fff

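Fast-path trigger conditions (all 6 must hold):

| Condition | Threshold |
| --- | --- |
| risk_score | < 0.45 |
| len(rules_hit) | ≤ 1 |
| geographic_anomaly | = false |
| velocity_score | ≤ 0.6 |
| cep_pattern_flag | = false |
| burst_flag | = false |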
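The six conditions above can be read as a single predicate over the enriched signal. The following is a minimal sketch, not the project's route_node: the function name `is_fast_path` and the defaults used for missing fields are assumptions, and it assumes `velocity_score` and `geographic_anomaly` live under `features` while the two flags live under `computed_features`, as in the sample messages in this README.

```python
from typing import Any, Mapping


def is_fast_path(signal: Mapping[str, Any]) -> bool:
    """Return True only when all six fast-path conditions hold.

    Defaults for missing fields are an assumption of this sketch:
    a missing score or geo flag is treated conservatively (slow path),
    while missing Flink flags are treated as "not raised".
    """
    features = signal.get("features", {})
    computed = signal.get("computed_features", {})
    return (
        signal.get("risk_score", 1.0) < 0.45
        and len(signal.get("rules_hit", [])) <= 1
        and features.get("geographic_anomaly", True) is False
        and features.get("velocity_score", 1.0) <= 0.6
        and computed.get("cep_pattern_flag", False) is False
        and computed.get("burst_flag", False) is False
    )
```

With the low-risk payload from the fast-path test in section 7.4 this returns True; raising any single flag, or adding a second rule, sends the signal down the slow path.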

Write order of LangGraph state fields (RiskAnalysisState):

sequenceDiagram
    participant Entry as GraphEntry
    participant Route as route_node
    participant Parallel as parallel_analysis_node
    participant Evidence as evidence_node
    participant Verify as verify_node
    participant Policy as policy_node
    participant Build as build_result_node

    Entry->>Entry: init(case_id, signal, trace_id, start_ms)
    Route->>Route: write(fast_path)
    alt slow_path
        Parallel->>Parallel: write(rules_explanation, score_analysis, graph_analysis, behavior_analysis)
        Parallel->>Parallel: incr(cache_hits, llm_calls)
        Evidence->>Evidence: write(evidence)
        Verify->>Verify: write(verification.quality_score, verification.issues)
        Policy->>Policy: write(policy_result, confidence_breakdown)
    end
    Build->>Build: write(decision_support)
    Build->>Build: append(errors?) and finalize(audit fields)
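The fields written in the sequence above suggest a state shape along the following lines. This is a hedged sketch rather than the contents of state.py: the field names come from the diagram, but every type annotation and the `init_state` helper are assumptions.

```python
from typing import Any, Dict, List, TypedDict


class RiskAnalysisState(TypedDict, total=False):
    # Set once at graph entry
    case_id: str
    signal: Dict[str, Any]
    trace_id: str
    start_ms: int
    # Written by route_node
    fast_path: bool
    # Written by parallel_analysis_node (slow path only)
    rules_explanation: Dict[str, Any]
    score_analysis: Dict[str, Any]
    graph_analysis: Dict[str, Any]
    behavior_analysis: Dict[str, Any]
    cache_hits: int
    llm_calls: int
    # Written by evidence_node, verify_node, policy_node
    evidence: Dict[str, Any]
    verification: Dict[str, Any]
    policy_result: Dict[str, Any]
    confidence_breakdown: Dict[str, float]
    # Written by build_result_node
    decision_support: Dict[str, Any]
    errors: List[str]


def init_state(case_id: str, signal: Dict[str, Any],
               trace_id: str, start_ms: int) -> RiskAnalysisState:
    """Hypothetical helper mirroring the init(...) step in the diagram."""
    return RiskAnalysisState(
        case_id=case_id, signal=signal, trace_id=trace_id,
        start_ms=start_ms, cache_hits=0, llm_calls=0, errors=[],
    )
```

Declaring the TypedDict with `total=False` lets slow-path fields stay absent on the fast path instead of being filled with placeholder values.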

4. End-to-end data flow

sequenceDiagram
    autonumber
    participant P as RiskSignalProducer
    participant K1 as Kafka<br/>raw_risk_signals
    participant FL as Flink PyJob
    participant K2 as Kafka<br/>enriched_risk_signals
    participant CS as case-service
    participant RD as Redis
    participant DB as PostgreSQL
    participant RT as fast-path-router
    participant OC as orchestrator<br/>(LangGraph)
    participant AG as agents<br/>(4 concurrent)
    participant EV as evidence-retriever
    participant VF as verifier

    P->>K1: publish(signal)
    K1->>FL: consume(raw_signal)
    FL->>FL: compute computed_features
    FL->>K2: publish(enriched_signal)

    K2->>CS: consume(enriched_signal)
    CS->>RD: GET case:created:{id} (idempotency check)
    alt Already processed
        RD-->>CS: "1" → skip
    else New signal
        CS->>DB: INSERT case
        CS->>RD: SETEX case:created:{id} 86400
        CS->>RT: POST /route

        alt FAST path
            RT-->>CS: route=FAST + decision_support
            CS->>DB: UPDATE decision_support
        else SLOW path
            RT-->>CS: route=SLOW
            CS->>OC: POST /analyze

            OC->>OC: route_node (choose fast/slow)
            par Concurrent analysis
                OC->>AG: POST /explain (rules-explainer)
                OC->>AG: POST /analyze (score-analyst)
                OC->>AG: POST /analyze-graph (graph-risk)
                OC->>AG: POST /analyze-sequence (behavior-sequence)
            end
            OC->>EV: POST /retrieve (ES BM25 search)
            OC->>VF: POST /verify (quality_score evaluation)
            OC->>OC: policy_node + build_result_node

            OC-->>CS: decision_support JSON
            CS->>DB: UPDATE decision_support
        end
    end

5. Module descriptions

5.1 RiskSignalProducer

Function: Simulates the upstream system and publishes raw risk signals to the Kafka topic raw_risk_signals.

Output (Kafka message):

{
  "case_id": "CASE_000001",
  "entity": { "type": "CUSTOMER", "id": "entity_001" },
  "facts": {
    "transaction_amount": 52000,
    "transaction_count_24h": 18,
    "country": "US",
    "merchant_category": "HIGH_RISK"
  },
  "features": {
    "velocity_score": 0.85,
    "amount_deviation": 2.3,
    "geographic_anomaly": true,
    "time_pattern_risk": 0.72
  },
  "rules_hit": ["RULE_001", "RULE_005", "RULE_012"],
  "risk_score": 0.78,
  "snapshot_ref": "snap_001",
  "ts": "2026-01-01T00:00:00Z"
}

5.2 Flink Feature Job

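Function: A PyFlink DataStream job that consumes raw_risk_signals, enriches each signal with real-time features, and publishes the result to enriched_risk_signals.

Input: raw_risk_signals topic

Processing pipeline: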

flowchart LR
    source[KafkaSource raw_risk_signals] --> parse[Parse JSON]
    parse --> keyBy[KeyBy entity_id]
    keyBy --> tw[TumblingWindow 60s]
    keyBy --> sw[SlidingWindow 300s/60s]
    keyBy --> sess[SessionWindow gap=120s]
    keyBy --> cep[CEP Pattern 5min]

    tw --> f1[tx_count_1min]
    sw --> f2[velocity_delta]
    sess --> f3[burst_flag]
    cep --> f4[cep_pattern_flag]
    keyBy --> f5[anomaly_score]

    f1 --> merge[Merge computed_features]
    f2 --> merge
    f3 --> merge
    f4 --> merge
    f5 --> merge
    merge --> sink[KafkaSink enriched_risk_signals]

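Added fields (computed_features):

| Field | Type | Algorithm | Description |
| --- | --- | --- | --- |
| tx_count_1min | int | TumblingWindow(60s) | Estimated transaction count in a 1-minute tumbling window |
| velocity_delta | float | Based on velocity_score | Deviation from the baseline velocity |
| burst_flag | bool | SessionWindow(gap=120s) | Whether a burst transaction pattern was detected |
| anomaly_score | float | Multi-factor weighting | Composite anomaly score in [0, 1] |
| cep_pattern_flag | bool | CEP 5min window | Whether RULE_001 and RULE_005 co-occur within 5 minutes |
| enrichment_ts | str | System time | Timestamp of feature computation |

Output: enriched_risk_signals topic (original signal plus the computed_features field)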
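To make the cep_pattern_flag definition concrete, here is a plain-Python illustration of "RULE_001 and RULE_005 co-occur within 5 minutes" per entity. It is not the PyFlink job and does not use Flink CEP; the class name `CepCooccurrence` is hypothetical, and the sketch assumes events for one entity arrive in timestamp order.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Iterable, Tuple

WINDOW_SECONDS = 300                      # 5-minute window from the table
PATTERN_RULES = ("RULE_001", "RULE_005")  # rules that must co-occur


class CepCooccurrence:
    """Tracks pattern-rule hits per entity inside a sliding time window."""

    def __init__(self, window_seconds: float = WINDOW_SECONDS) -> None:
        self.window = window_seconds
        self._seen: Dict[str, Deque[Tuple[float, str]]] = defaultdict(deque)

    def update(self, entity_id: str, ts: float, rules_hit: Iterable[str]) -> bool:
        """Record one event and return the cep_pattern_flag for it."""
        events = self._seen[entity_id]
        for rule in rules_hit:
            if rule in PATTERN_RULES:
                events.append((ts, rule))
        # Evict hits that fell out of the window (assumes ordered timestamps).
        while events and events[0][0] < ts - self.window:
            events.popleft()
        return {rule for _, rule in events} >= set(PATTERN_RULES)
```

Feeding RULE_001 at t=0 and RULE_005 at t=120 for the same entity raises the flag on the second event; an unrelated event at t=500 clears it because both hits have left the window.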


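5.3 case-service (:8001)

Function: Case lifecycle management. A Kafka consumer responsible for case creation, Redis-based idempotent deduplication, PostgreSQL persistence, triggering the analysis flow, and storing the final decision_support.

Kafka consumption: enriched_risk_signals topic, group_id=case-service-consumers

Consumption and branching logic: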

flowchart TD
    msg[Consume enriched_risk_signals] --> idem{Redis key exists? case:created:id}
    idem -- yes --> skip[Skip message]
    idem -- no --> dbcheck{Case exists in DB?}
    dbcheck -- yes --> repair[Repair Redis key and skip]
    dbcheck -- no --> create[Insert case row + trace_id]
    create --> mark[SETEX case:created:id 86400]
    mark --> route[POST fast-path-router /route]
    route --> path{path_type}
    path -- FAST --> fastsave[Store decision_support directly]
    path -- SLOW --> orch[POST orchestrator /analyze]
    orch --> slowsave[Store decision_support from orchestrator]
    fastsave --> done[Done]
    slowsave --> done
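The idempotency branch of this flow can be sketched as follows. To keep the example self-contained it uses an in-memory stand-in whose `get` and `setex` methods mirror the redis-py calls of the same names, plus a plain set in place of the PostgreSQL table; `InMemoryKV`, `handle_message`, and the returned status strings are all assumptions made for illustration.

```python
from typing import Any, Dict, Optional, Set

IDEMPOTENCY_TTL_SECONDS = 86400  # matches SETEX case:created:{id} 86400


class InMemoryKV:
    """Stand-in for Redis GET/SETEX; the TTL is accepted but not enforced."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self.data.get(key)

    def setex(self, key: str, ttl_seconds: int, value: str) -> None:
        self.data[key] = value


def handle_message(signal: Dict[str, Any], kv: InMemoryKV,
                   db_case_ids: Set[str]) -> str:
    """Apply the skip / repair / create branches from the flowchart."""
    case_id = signal["case_id"]
    key = f"case:created:{case_id}"
    if kv.get(key) is not None:
        return "skipped"                      # Redis already knows the case
    if case_id in db_case_ids:
        kv.setex(key, IDEMPOTENCY_TTL_SECONDS, "1")
        return "repaired"                     # DB has it, Redis key was lost
    db_case_ids.add(case_id)                  # stands in for INSERT case
    kv.setex(key, IDEMPOTENCY_TTL_SECONDS, "1")
    return "created"                          # caller would now POST /route
```

The database check behind the Redis check is what keeps a redelivered message from creating a duplicate case after the 24-hour key has expired or been evicted.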

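Key fields of the PostgreSQL cases table:

| Field | Type | Description |
| --- | --- | --- |
| case_id | String | Unique identifier |
| status | OPEN / REVIEWED / ESCALATED | Case status |
| raw_signal | JSON | Full enriched_signal |
| decision_support | JSON | Final analysis result |
| path_type | FAST / SLOW | Path type |
| trace_id | String | XAI audit trace ID |
| sla_ms | Integer | End-to-end latency in ms |
| confidence_breakdown | JSON | Confidence breakdown |

API endpoints:

| Method | Path | Description |
| --- | --- | --- |
| GET | /cases | Case list |
| GET | /cases/{case_id} | Case details |
| GET | /cases/{case_id}/trace | XAI audit trail |
| PATCH | /cases/{case_id}/status | Update status |
| GET | /health | Health check |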

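5.4 orchestrator-service (:8002)

Function: Multi-agent orchestration core built on LangGraph StateGraph. Drives a 6-node directed graph and coordinates call order, concurrency, caching, and LLM enhancement across all agents.

Input (POST /analyze):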

{
  "case_id": "CASE_000001",
  "trace_id": "trace_001",
  "signal": { /* full enriched_signal object */ }
}

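Node overview:

| # | Node | Function | Redis cache |
| --- | --- | --- | --- |
| 1 | route_node | Decides fast or slow path (all 6 conditions met → fast) | |
| 2 | parallel_analysis_node | Calls rules/score/graph/behavior concurrently via asyncio.gather, with Claude enhancement | rules/score/graph/behavior_cache, TTL 300s |
| 3 | evidence_node | Calls evidence-retriever; ES BM25 search over the knowledge base | evidence_cache, TTL 600s |
| 4 | verify_node | Calls verifier plus Claude cross-validation; produces quality_score | verify_cache, TTL 300s |
| 5 | policy_node | Aggregates confidence; calls policy-calibration to output the final recommendation | |
| 6 | build_result_node | Assembles the final decision_support JSON with full audit fields | |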
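The concurrent fan-out in parallel_analysis_node can be illustrated with plain asyncio. This is a sketch under assumptions, not the code in nodes.py: the agents are passed in as async callables keyed by the state field they fill, and the choice to record failures in `errors` and continue with an empty result is an assumption of this example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical type: one async callable per agent, taking the enriched signal.
AgentCall = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def parallel_analysis(signal: Dict[str, Any],
                            agents: Dict[str, AgentCall]) -> Dict[str, Any]:
    """Call every agent concurrently and collect a partial state update."""
    names = list(agents)
    results = await asyncio.gather(
        *(agents[name](signal) for name in names),
        return_exceptions=True,  # one failing agent must not cancel the rest
    )
    update: Dict[str, Any] = {"errors": []}
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            update["errors"].append(f"{name}: {result}")
            update[name] = {}
        else:
            update[name] = result
    return update
```

In a real node each callable would wrap an HTTP POST to the corresponding agent (/explain, /analyze, /analyze-graph, /analyze-sequence) and consult the Redis cache first.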

Output (decision_support):

{
  "case_id": "CASE_000001",
  "trace_id": "trace_001",
  "recommendation": "ESCALATE",
  "confidence": 0.73,
  "confidence_breakdown": {
    "risk_score_component": 0.78,
    "graph_component": 0.62,
    "behavior_component": 0.55,
    "evidence_component": 0.80,
    "verification_component": 1.0
  },
  "summary": "LangGraph multi-agent analysis: risk_score=0.78, ...",
  "top_signals": [
    { "type": "rule", "id": "RULE_001", "severity": "high" }
  ],
  "evidence": [ /* ES search results */ ],
  "verification": { "verified": true, "quality_score": 0.85, "issues": [] },
  "audit": {
    "workflow": "langgraph_stategraph_v3",
    "steps": ["route", "parallel_analysis", "evidence", "verify", "policy", "build_result"],
    "latency_ms": 1240,
    "cache_hits": 3,
    "llm_calls": 2,
    "llm_enhanced": true
  },
  "xai": {
    "why": "LangGraph StateGraph: parallel agent analysis → ES evidence RAG → LLM cross-validation → policy calibration",
    "counterfactual_notes": "Low-risk signals (score<0.45, single rule) take fast path",
    "policy_refs": ["BASE_DECISION_POLICY_V2", "VERIFICATION_GATE_V2"]
  },
  "decision_version": "3.0.0"
}

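5.5 fast-path-router (:8007)

Function: Fast/slow path router. Based on risk_score, the number of rules hit, and the Flink feature flags, it decides whether to skip deep analysis and return a lightweight recommendation directly (latency < 45ms).

Input (POST /route):

| Field | Type | Description |
| --- | --- | --- |
| case_id | str | Case ID |
| risk_score | float | Overall risk score |
| rules_hit | List[str] | Triggered rules |
| features | Dict | Risk features |
| computed_features | Dict | Flink-computed features |

Output: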

{
  "route": "FAST",
  "reason": "Low risk score, single rule, no anomaly indicators",
  "decision_support": { "recommendation": "REVIEW", "confidence": 0.65, ... }
}

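5.6 rules-explainer-agent (:8003) v2.0

Function: Explains the risk-control rules that fired, maps them to regulatory frameworks (AML/BSA/FATF/PCI-DSS), and assigns a severity rating. Supports Claude LLM enhancement and falls back to a rule dictionary when no API key is configured.

Input (POST /explain):

| Field | Type | Description |
| --- | --- | --- |
| rules_hit | List[str] | IDs of triggered rules, e.g. ["RULE_001", "RULE_005"] |
| facts | Dict | Transaction facts (amount, count, country, etc.) |
| features | Dict | Risk features (velocity_score, geo, etc.) |

Output (ExplainResponse):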

{
  "explanations": [
    {
      "rule_id": "RULE_001",
      "explanation": "Transaction amount $52,000 exceeds the $40,000 AML reporting threshold; a SAR must be filed",
      "trigger_reason": "transaction_amount=52000 > threshold=40000",
      "regulatory_ref": "BSA/AML - SAR filing requirement",
      "severity": "HIGH"
    }
  ]
}

Redis cache: rules_cache:{sha256(sorted_rules+facts)[:16]}, TTL=300s
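A content-hash key of this shape can be derived as shown below. The README only states the pattern (SHA-256 over the sorted rules plus facts, truncated to 16 hex characters); serializing the inputs as canonical JSON before hashing, and the helper names `content_cache_key` and `rules_cache_key`, are assumptions of this sketch.

```python
import hashlib
import json
from typing import Any, Dict, List


def content_cache_key(prefix: str, *parts: Any) -> str:
    """Build '<prefix>:<first 16 hex chars of sha256(canonical JSON)>'."""
    payload = json.dumps(parts, sort_keys=True, separators=(",", ":"),
                         ensure_ascii=False)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{prefix}:{digest[:16]}"


def rules_cache_key(rules_hit: List[str], facts: Dict[str, Any]) -> str:
    # Sorting the rule IDs makes the key independent of hit order.
    return content_cache_key("rules_cache", sorted(rules_hit), facts)
```

Because dictionary keys are sorted during serialization, two requests with the same facts in a different key order map to the same cache entry. The same helper would cover the other prefixes (score_cache, evidence_cache, and so on) with their respective inputs.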


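5.7 score-analyst-agent (:8004) v2.0

Function: Analyzes factor contributions to the risk score and outputs weight attribution, a confidence interval, and a natural-language interpretation. Can use the Claude LLM to generate the interpretation text, and accepts Flink computed_features (including anomaly_score).

Input (POST /analyze):

| Field | Type | Description |
| --- | --- | --- |
| risk_score | float | Overall risk score in [0, 1] |
| features | Dict | Risk features (velocity_score, amount_deviation, etc.) |
| computed_features | Dict? | Flink-computed features (optional, includes anomaly_score) |

Output (AnalyzeResponse):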

{
  "top_factors": [
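    { "feature": "velocity_score",   "contribution": 0.30, "note": "High-frequency transaction pattern above the normal threshold" },
    { "feature": "geographic_anomaly","contribution": 0.25, "note": "Cross-border high-risk region" },
    { "feature": "amount_deviation",  "contribution": 0.20, "note": "2.3 standard deviations from the mean" }
  ],
  "uncertainty": ["Data window covers only 24h; long-term behavioral baseline is insufficient"],
  "confidence_interval": { "low": 0.65, "high": 0.88 },
  "interpretation": "Velocity, geographic, and amount anomalies together give high confidence in the risk score; manual review is recommended"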
}

Redis cache: score_cache:{sha256(score+features)[:16]}, TTL=300s


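5.8 evidence-retriever-agent (:8005) v3.0

Function: Hybrid evidence retrieval. Prefers BM25 full-text search against the Elasticsearch risk_knowledge index (built automatically from kb.json at startup) and falls back to local in-memory scoring when search is unavailable.

Input (POST /retrieve):

| Field | Type | Description |
| --- | --- | --- |
| rules_hit | List[str] | Triggered rules (used for ES boost) |
| summary | str | Case summary text (ES query_string) |

ES retrieval pipeline (with layered boosts):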

flowchart LR
    input[Input rules_hit + summary] --> cache{Redis evidence_cache hit?}
    cache -- yes --> cached[Return cached top-k]
    cache -- no --> bm25[ES BM25 multi_match]
    bm25 --> boost_rule[Rule boost]
    boost_rule --> boost_cep[CEP co-occurrence boost +0.20]
    boost_cep --> avail{ES available?}
    avail -- no --> local[Local lexical/vector fallback]
    avail -- yes --> esres[ES candidates]
    esres --> merge[Merge + dedupe + rank]
    local --> merge
    merge --> topk[Top-5 evidence]
    topk --> writecache[Write evidence_cache TTL 600s]
    writecache --> output[Return RetrieveResponse]
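The "ES available?" branch amounts to a try-then-fall-back pattern. The sketch below shows that pattern only: `es_search` is a hypothetical callable standing in for the real Elasticsearch query, the token-overlap scorer is a deliberately simple substitute for the local lexical/vector fallback, and the `"local"` source label is an assumption. Rule and CEP boosts and the Redis cache are left out.

```python
from typing import Any, Callable, Dict, List

Evidence = Dict[str, Any]
TOP_K = 5  # the pipeline returns the top 5 evidence items


def local_lexical_search(query: str, kb: List[Evidence]) -> List[Evidence]:
    """Score each KB entry by the fraction of query terms it contains."""
    terms = set(query.lower().split())
    scored = []
    for doc in kb:
        words = set(str(doc["content"]).lower().split())
        overlap = len(terms & words)
        if overlap:
            scored.append({**doc, "score": overlap / len(terms)})
    return scored


def retrieve(query: str, es_search: Callable[[str], List[Evidence]],
             kb: List[Evidence]) -> Dict[str, Any]:
    """Try Elasticsearch first; fall back to local scoring if it fails."""
    try:
        candidates, source = es_search(query), "elasticsearch"
    except Exception:  # connection errors, timeouts, missing index, ...
        candidates, source = local_lexical_search(query, kb), "local"
    ranked = sorted(candidates, key=lambda d: d["score"], reverse=True)[:TOP_K]
    return {"evidence": ranked, "source": source, "kb_size": len(kb)}
```

Returning the `source` field alongside the results lets downstream consumers and audits tell which retrieval path actually produced the evidence.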

Output (RetrieveResponse):

{
  "evidence": [
    {
      "id": "KB_006",
      "title": "Combined rule co-occurrence",
      "content": "A high-value transaction rule firing together with a velocity rule is a strong signal of money-laundering structuring/layering",
      "score": 0.92,
      "type": "cep_pattern"
    }
  ],
  "source": "elasticsearch",
  "kb_size": 6
}

Redis cache: evidence_cache:{sha256(rules+summary)[:16]}, TTL=600s


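5.9 graph-risk-agent (:8008) v2.0

Function: Analyzes entity-relationship risk using the Neo4j graph database. Identifies high-risk connections, fund-flow paths, and collusion patterns in the account network.

Input (POST /analyze-graph):

| Field | Type | Description |
| --- | --- | --- |
| entity | Dict | Entity information (type, id) |
| facts | Dict | Transaction facts |
| rules_hit | List[str] | Triggered rules |

Output: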

{
  "graph_risk_score": 0.62,
  "patterns": [
    { "pattern": "SUSPICIOUS_NETWORK_CLUSTER", "confidence": 0.71 }
  ],
  "connected_entities": 3,
  "risk_explanation": "Entity is linked to 3 known high-risk accounts"
}

Redis cache: graph_cache:{sha256(entity+rules)[:16]}, TTL=300s


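5.10 behavior-sequence-agent (:8009) v2.0

Function: Analyzes risk in transaction behavior sequences. Detects anomalous time patterns, operation sequences, and session behavior, and identifies behavioral signatures such as account takeover and automated fraud tooling.

Input (POST /analyze-sequence):

| Field | Type | Description |
| --- | --- | --- |
| case_id | str | Case ID |
| facts | Dict | Transaction facts (including transaction_count_24h, etc.) |
| features | Dict | Risk features (including time_pattern_risk) |

Output: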

{
  "sequence_risk_score": 0.55,
  "anomalous_patterns": ["RAPID_SUCCESSION", "OFF_HOURS_ACTIVITY"],
  "risk_explanation": "Detected a high-frequency operation sequence outside business hours"
}

Redis cache: behavior_cache:{sha256(case_id+facts)[:16]}, TTL=300s


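5.11 verifier-agent (:8006) v2.1

Function: Verifies the completeness and internal consistency of the multi-agent analysis. Combines rule-based checks with optional Claude cross-validation and outputs a quality_score that the LangGraph orchestrator uses for routing (< 0.5 triggers NEEDS_HUMAN_REVIEW).

Input (POST /verify):

| Field | Type | Description |
| --- | --- | --- |
| case_id | str | Case ID |
| rules_explanation | Dict | rules-explainer output |
| score_analysis | Dict | score-analyst output |
| evidence | Dict | evidence-retriever output |
| graph_analysis | Dict? | graph-risk output (optional) |
| behavior_analysis | Dict? | behavior-sequence output (optional) |

Quality scoring rules (start at 1.0 and deduct per defect):

| Defect | Deduction |
| --- | --- |
| No rule explanations | -0.25 |
| A rule is missing explanation or trigger_reason | -0.10 |
| No top_factors | -0.20 |
| No supporting evidence | -0.20 |
| All evidence relevance < 0.3 | -0.10 |
| Graph risk score > 0.7 but no evidence | -0.10 |

LLM blending weights: final_score = 0.6 × rule_score + 0.4 × llm_score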
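The deduction table and the blending formula translate into a few lines of Python. This sketch makes several assumptions that the README does not settle: the per-rule deduction is applied once for each incomplete rule, the score is clamped at 0 and rounded to two decimals, the rule score is used unchanged when no LLM score is available, and the function names and issue strings are hypothetical.

```python
from typing import Any, Dict, List, Optional, Tuple


def rule_based_quality(rules_explanation: Dict[str, Any],
                       score_analysis: Dict[str, Any],
                       evidence: Dict[str, Any],
                       graph_analysis: Optional[Dict[str, Any]] = None,
                       ) -> Tuple[float, List[str]]:
    """Start from 1.0 and subtract the deductions listed in the table."""
    score, issues = 1.0, []
    explanations = rules_explanation.get("explanations", [])
    if not explanations:
        score -= 0.25
        issues.append("no rule explanations")
    for item in explanations:
        if not item.get("explanation") or not item.get("trigger_reason"):
            score -= 0.10
            issues.append(f"incomplete explanation for {item.get('rule_id')}")
    if not score_analysis.get("top_factors"):
        score -= 0.20
        issues.append("no top_factors")
    items = evidence.get("evidence", [])
    if not items:
        score -= 0.20
        issues.append("no supporting evidence")
    elif all(e.get("score", 0.0) < 0.3 for e in items):
        score -= 0.10
        issues.append("all evidence relevance < 0.3")
    if graph_analysis and graph_analysis.get("graph_risk_score", 0.0) > 0.7 and not items:
        score -= 0.10
        issues.append("graph risk > 0.7 without evidence")
    return max(0.0, round(score, 2)), issues


def blend(rule_score: float, llm_score: Optional[float] = None) -> float:
    """final_score = 0.6 * rule_score + 0.4 * llm_score when an LLM score exists."""
    if llm_score is None:
        return rule_score
    return round(0.6 * rule_score + 0.4 * llm_score, 2)
```

An analysis with no explanations, no top_factors, and no evidence scores 0.35 under these rules, which is already below the 0.5 threshold for human review.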

Output (VerifyResponse):

{
  "verified": true,
  "issues": [],
  "quality_score": 0.85
}

quality_score threshold routing:

flowchart TD
    qs[quality_score] --> d1{score >= 0.9}
    d1 -- yes --> excellent[Excellent: auto-pass verification]
    d1 -- no --> d2{score >= 0.7}
    d2 -- yes --> good[Good: keep model recommendation]
    d2 -- no --> d3{score >= 0.5}
    d3 -- yes --> acceptable[Acceptable: REVIEW with caution]
    d3 -- no --> human[NEEDS_HUMAN_REVIEW]
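The threshold ladder above reduces to a small lookup. In this sketch only NEEDS_HUMAN_REVIEW is a label taken from the README; the other tier names and the function name `quality_tier` are assumptions chosen to mirror the diagram.

```python
def quality_tier(quality_score: float) -> str:
    """Map a quality_score to the tier shown in the routing diagram."""
    if quality_score >= 0.9:
        return "EXCELLENT"           # auto-pass verification
    if quality_score >= 0.7:
        return "GOOD"                # keep model recommendation
    if quality_score >= 0.5:
        return "ACCEPTABLE"          # REVIEW with caution
    return "NEEDS_HUMAN_REVIEW"
```

The sample VerifyResponse with quality_score 0.85 falls in the second tier, so the model recommendation is kept.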

Redis cache: verify_cache:{sha256(all_inputs)[:16]}, TTL=300s


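5.12 policy-calibration-agent (:8010) v2.0

Function: Aggregates confidence across dimensions, calibrates the final recommendation (ESCALATE / REVIEW / NEEDS_HUMAN_REVIEW) against business policy rules, and records the policy references used.

Input (POST /calibrate):

| Field | Type | Description |
| --- | --- | --- |
| case_id | str | Case ID |
| base_recommendation | str | Initial recommendation (risk_score ≥ 0.8 → ESCALATE) |
| risk_score | float | Overall risk score |
| verification | Dict | verifier output (includes quality_score) |
| evidence_count | int | Number of evidence items |
| confidence_breakdown | Dict | Per-dimension confidence breakdown |

Confidence breakdown fields:

| Dimension | Source |
| --- | --- |
| risk_score_component | Original risk score |
| graph_component | graph_risk_score from graph-risk |
| behavior_component | sequence_risk_score from behavior-sequence |
| evidence_component | Mean evidence relevance score |
| verification_component | verified=true → 1.0, otherwise → 0.4 |

Confidence aggregation: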

flowchart LR
    r[risk_score_component] --> agg[Confidence Aggregator]
    g[graph_component] --> agg
    b[behavior_component] --> agg
    e[evidence_component] --> agg
    v[verification_component] --> agg

    agg --> conf[calibrated_confidence]
    agg --> rec[final_recommendation]
    agg --> refs[policy_refs]
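The five inputs to the aggregator can be assembled from the agent outputs as described in the breakdown table. The sketch below covers only that assembly step; how the aggregator weights the components into calibrated_confidence is not specified in this README, so it is deliberately left out. The function name, the 0.0 defaults for missing values, and the two-decimal rounding are assumptions.

```python
from typing import Any, Dict


def build_confidence_breakdown(risk_score: float,
                               graph_analysis: Dict[str, Any],
                               behavior_analysis: Dict[str, Any],
                               evidence: Dict[str, Any],
                               verification: Dict[str, Any]) -> Dict[str, float]:
    """Derive each confidence component from its documented source."""
    items = evidence.get("evidence", [])
    mean_relevance = sum(e["score"] for e in items) / len(items) if items else 0.0
    return {
        "risk_score_component": risk_score,
        "graph_component": graph_analysis.get("graph_risk_score", 0.0),
        "behavior_component": behavior_analysis.get("sequence_risk_score", 0.0),
        "evidence_component": round(mean_relevance, 2),
        "verification_component": 1.0 if verification.get("verified") else 0.4,
    }
```

With risk_score 0.78, graph_risk_score 0.62, sequence_risk_score 0.55, two evidence items scored 0.92 and 0.68, and verified=true, this reproduces the confidence_breakdown shown in the orchestrator output example.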

Output:

{
  "final_recommendation": "ESCALATE",
  "calibrated_confidence": 0.73,
  "policy_adjustment": "none",
  "policy_refs": ["BASE_DECISION_POLICY_V2", "VERIFICATION_GATE_V2", "LANGGRAPH_WORKFLOW_V1"]
}

6. Startup and operation

6.1 Before you start

# Recommended dependencies:
# - Docker / Docker Compose Plugin
# - curl + jq
# - (optional) redis-cli

# Copy the environment variable template
cp .env.example .env

# LLM configuration (defaults to local Ollama)
# LLM_PROVIDER=ollama
# LLM_MODEL=qwen2.5:14b
# OLLAMA_BASE_URL=http://host.docker.internal:11434
#
# Optional: switch to OpenAI
# LLM_PROVIDER=openai
# LLM_MODEL=gpt-4o-mini
# OPENAI_API_KEY=sk-...
# OPENAI_BASE_URL=https://api.openai.com/v1
#
# Optional: switch to Claude
# LLM_PROVIDER=anthropic
# LLM_MODEL=claude-haiku-4-5-20251001
# ANTHROPIC_API_KEY=sk-ant-...

6.2 Start the services

# On first run or after dependency changes, --build is recommended
docker compose up -d --build

# Check container status (ideally all Up/healthy)
docker compose ps

6.3 Runtime status checks (run in order)

# Core services
curl http://localhost:8001/health   # case-service
curl http://localhost:8002/health   # orchestrator-service
curl http://localhost:8007/health   # fast-path-router

# Agent services
curl http://localhost:8003/health   # rules-explainer-agent
curl http://localhost:8004/health   # score-analyst-agent
curl http://localhost:8005/health   # evidence-retriever-agent
curl http://localhost:8006/health   # verifier-agent
curl http://localhost:8008/health   # graph-risk-agent
curl http://localhost:8009/health   # behavior-sequence-agent
curl http://localhost:8010/health   # policy-calibration-agent

# Infrastructure
curl http://localhost:9200/         # elasticsearch
redis-cli -p 6379 ping              # expected: PONG
curl http://localhost:8081/overview # flink web UI

6.4 Common commands

# Follow logs in real time
docker compose logs -f case-service orchestrator-service

# Restart a single service
docker compose restart case-service

# Stop all services
docker compose down

# Stop and remove data volumes (wipes local data)
docker compose down -v

7. Testing

7.1 Smoke test (are the services up?)

curl http://localhost:8001/health | jq
curl http://localhost:8002/health | jq
curl http://localhost:8003/health | jq
curl http://localhost:8004/health | jq
curl http://localhost:8005/health | jq
curl http://localhost:8006/health | jq
curl http://localhost:8007/health | jq

7.2 End-to-end streaming test (Producer -> Kafka -> Case -> Router/Orchestrator)

# Wait 20-40 seconds for producer/flink/case-service to process a batch
curl http://localhost:8001/cases | jq '.[0]'

# Inspect the latest case and its audit trail
LATEST_CASE_ID=$(curl -s http://localhost:8001/cases | jq -r '.[0].case_id')
curl http://localhost:8001/cases/${LATEST_CASE_ID} | jq
curl http://localhost:8001/cases/${LATEST_CASE_ID}/trace | jq

7.3 Manual API test (slow path)

curl -X POST http://localhost:8002/analyze \
  -H "Content-Type: application/json" \
  -d '{
    "case_id": "TEST_SLOW_001",
    "trace_id": "trace_001",
    "signal": {
      "case_id": "TEST_SLOW_001",
      "entity": {"type": "CUSTOMER", "id": "entity_001"},
      "facts": {"transaction_amount": 52000, "transaction_count_24h": 18,
                "country": "US", "merchant_category": "HIGH_RISK"},
      "features": {"velocity_score": 0.85, "amount_deviation": 2.3,
                   "geographic_anomaly": true, "time_pattern_risk": 0.72},
      "rules_hit": ["RULE_001", "RULE_005", "RULE_012"],
      "risk_score": 0.78,
      "ts": "2026-01-01T00:00:00Z"
    }
  }' | jq

Expected checkpoints:

  • audit.workflow = "langgraph_stategraph_v3"
  • audit.steps contains route / parallel_analysis / evidence / verify / policy / build_result
  • The response includes verification.quality_score

7.4 Fast-path test (low risk)

curl -X POST http://localhost:8007/route \
  -H "Content-Type: application/json" \
  -d '{
    "case_id": "TEST_FAST_001",
    "trace_id": "trace_fast_001",
    "signal": {
      "risk_score": 0.2,
      "rules_hit": ["RULE_012"],
      "features": {"velocity_score": 0.2, "geographic_anomaly": false}
    }
  }' | jq

Expected checkpoints:

  • path_type = "FAST"
  • target_sla_ms = 100
  • decision_support in the response is non-empty

7.5 Cache and retrieval verification

# ES index document count (should be > 0)
curl http://localhost:9200/risk_knowledge/_count | jq

# Check Redis cache keys
redis-cli keys "rules_cache:*"
redis-cli keys "score_cache:*"
redis-cli keys "evidence_cache:*"
redis-cli keys "verify_cache:*"

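8. API documentation

| Format | Location |
| --- | --- |
| Human-readable | doc/API_DOCS.md |
| OpenAPI contract | doc/swagger.yaml (single source of truth) |
| Runtime Swagger | http://localhost:<port>/docs |

Direct database access: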

docker compose exec postgres psql -U postgres -d risk_control

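9. Version scope (V2.0)

Implemented:

  • Kafka dual-topic architecture (raw_risk_signals → Flink → enriched_risk_signals)
  • LangGraph StateGraph orchestration with 6 nodes and fast/slow path routing (6-condition fast-path check)
  • Claude LLM enhancement: regulatory mapping for rule explanations plus quality cross-validation (activated by setting ANTHROPIC_API_KEY)
  • Multiple LLM providers: ollama / openai / anthropic (default: ollama + qwen2.5:14b)
  • Elasticsearch BM25 knowledge base retrieval (risk_knowledge index, built automatically at startup)
  • End-to-end Redis content-hash caching (6 key types; TTLs of 300s / 600s / 86400s)
  • quality_score field (verifier → orchestrator, routing-aware)
  • K8s HPA configuration (minReplicas=64 to match the Kafka partition count; target 200K QPS)

Still evolving:

  • Stateful Flink window aggregation (currently a stateless per-event approximation)
  • Human-in-the-loop workflow (manual review queue when quality_score < 0.5)
  • High-load benchmark baselines and observability dashboards (Prometheus/Grafana)
  • See plan.md for details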
