DataHub와의 상호작용을 위한 데이터 관련 유틸리티 패키지입니다.
utils/data/
├── __pycache__/
├── datahub_services/
│ ├── __pycache__/
│ ├── base_client.py # 기본 클라이언트
│ ├── glossary_service.py # 용어집 서비스
│ ├── metadata_service.py # 메타데이터 서비스
│ ├── query_service.py # 쿼리 서비스
│ ├── __init__.py # 패키지 초기화 및 exports
│ └── README.md # 서비스 상세 문서
├── datahub_source.py # 통합 메타데이터 페처
└── queries.py # GraphQL 쿼리 모음
DataHub 메타데이터 페처 - 리팩토링된 버전으로 기존 DatahubMetadataFetcher의 모든 기능을 유지하면서 내부적으로는 분리된 서비스 모듈들을 사용합니다.
클래스:
DatahubMetadataFetcher: 기존 인터페이스를 유지하는 통합 페처
초기화:
from utils.data.datahub_source import DatahubMetadataFetcher
fetcher = DatahubMetadataFetcher(gms_server="http://localhost:8080", extra_headers={})주요 메서드:
필터를 적용하여 데이터셋의 URN 목록을 가져옵니다.
반환값:
- URN 목록
URN에 대한 테이블 이름을 가져옵니다.
파라미터:
urn(str): 데이터셋 URN
반환값:
str: 테이블 이름 (database.table 형태) 또는 None
URN에 대한 테이블 설명을 가져옵니다.
파라미터:
urn(str): 데이터셋 URN
반환값:
str: 테이블 설명 또는 None
URN에 대한 컬럼 이름 및 설명을 가져옵니다.
파라미터:
urn(str): 데이터셋 URN
반환값:
list: 컬럼 정보 리스트 (각 항목: column_name, column_description, column_type)
URN에 대한 DOWNSTREAM/UPSTREAM lineage entity를 가져옵니다.
파라미터:
urn(str): 데이터셋 URNcounts(int): 가져올 엔티티 수 (기본값: 100)direction(str): 리니지 방향 ("DOWNSTREAM" 또는 "UPSTREAM", 기본값: "DOWNSTREAM")degree_values(list): 필터링할 degree 값 리스트 (기본값: ["1", "2"])
반환값:
tuple: (urn, lineage_result)
URN에 대한 UPSTREAM lineage의 column source를 가져옵니다.
파라미터:
urn(str): 데이터셋 URN
반환값:
dict: 컬럼 리니지 정보downstream_dataset: 다운스트림 데이터셋 이름lineage_by_upstream_dataset: 업스트림 데이터셋별 컬럼 매핑
lineage 중 최소 degree만 가져옵니다.
파라미터:
lineage_result:get_table_lineage의 반환값
반환값:
dict: 테이블별 최소 degree 매핑
테이블 단위로 통합 메타데이터를 생성합니다.
파라미터:
urn(str): 데이터셋 URNmax_degree(int): 최대 degree 제한 (기본값: 2)sort_by_degree(bool): degree 기준 정렬 여부 (기본값: True)
반환값:
dict: 통합 메타데이터table_name: 테이블 이름description: 테이블 설명columns: 컬럼 정보 리스트lineage: 리니지 정보
특정 URN에 대한 모든 관련 정보를 가져옵니다.
파라미터:
urn(str): 조회할 데이터셋 URN
반환값:
dict: URN에 대한 전체 메타데이터 정보
URN 메타데이터를 보기 좋게 출력하는 내부 함수입니다.
DataHub에서 루트 용어집 노드를 가져옵니다.
반환값:
dict: 루트 용어집 노드 정보
특정 URN의 용어집 노드 및 그 자식 항목을 가져옵니다.
파라미터:
urn(str): 용어집 노드의 URN
반환값:
dict: 용어집 노드 정보와 자식 항목
용어집 노드의 기본 정보를 딕셔너리로 반환합니다.
파라미터:
node(dict): 용어집 노드 정보index(int): 노드의 인덱스
반환값:
dict: 노드의 기본 정보
자식 엔티티(용어 또는 노드)의 정보를 딕셔너리로 반환합니다.
파라미터:
entity(dict): 자식 엔티티 정보index(int): 엔티티의 인덱스
반환값:
dict: 엔티티 정보
노드의 상세 정보를 처리하고 딕셔너리로 반환합니다.
파라미터:
node(dict): 용어집 노드 정보
반환값:
dict: 노드의 상세 정보
용어집 노드 결과를 처리하고 딕셔너리로 반환합니다.
파라미터:
result(dict): API 응답 결과
반환값:
dict: 처리된 용어집 노드 데이터
DataHub에서 전체 용어집 데이터를 가져와 처리합니다.
반환값:
dict: 처리된 용어집 데이터
특정 데이터셋 URN의 glossary terms를 조회합니다.
파라미터:
dataset_urn(str): 데이터셋 URN
반환값:
dict: glossary terms 정보
DataHub에서 쿼리 목록을 가져옵니다.
파라미터:
start(int): 시작 인덱스 (기본값: 0)count(int): 반환할 쿼리 수 (기본값: 10)query(str): 필터링에 사용할 쿼리 문자열 (기본값: "*")filters(list): 추가 필터 (기본값: None)
반환값:
dict: 쿼리 목록 정보
쿼리 목록 결과를 처리하고 간소화된 형태로 반환합니다.
파라미터:
result(dict): API 응답 결과
반환값:
dict: 처리된 쿼리 목록 데이터
DataHub에서 쿼리 목록을 가져와 처리합니다.
파라미터:
start(int): 시작 인덱스 (기본값: 0)count(int): 반환할 쿼리 수 (기본값: 10)query(str): 필터링에 사용할 쿼리 문자열 (기본값: "*")filters(list): 추가 필터 (기본값: None)
반환값:
dict: 처리된 쿼리 목록 데이터
특정 데이터셋 URN과 연관된 쿼리들을 조회합니다.
파라미터:
dataset_urn(str): 데이터셋 URN
반환값:
dict: 연관된 쿼리 목록
의존성:
utils.data.datahub_services.base_client.DataHubBaseClient: 기본 클라이언트utils.data.datahub_services.metadata_service.MetadataService: 메타데이터 서비스utils.data.datahub_services.query_service.QueryService: 쿼리 서비스utils.data.datahub_services.glossary_service.GlossaryService: 용어집 서비스
사용 예시:
from utils.data.datahub_source import DatahubMetadataFetcher
# 페처 초기화
fetcher = DatahubMetadataFetcher(gms_server="http://localhost:8080")
# 테이블 메타데이터 조회
urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,db.schema.table,TABLE)"
metadata = fetcher.get_urn_info(urn)
# 용어집 조회
glossary_data = fetcher.get_glossary_data()
# 쿼리 조회
queries = fetcher.get_query_data(start=0, count=10)import 되어 사용되는 위치:
utils/llm/tools/datahub.py:from utils.data.datahub_source import DatahubMetadataFetcher
DataHub GraphQL 쿼리 모음 파일입니다. DataHub GMS 서버와 통신하기 위한 모든 GraphQL 쿼리 문자열을 포함합니다.
주요 쿼리:
루트 용어집 노드를 조회하는 쿼리입니다. 4단계 계층 구조까지 포함합니다.
사용 위치:
utils.data.datahub_services.glossary_service.GlossaryService.get_root_glossary_nodes()
특정 URN의 용어집 노드 상세 정보를 조회하는 쿼리입니다. 자식 노드, 용어, 소유권, 권한 등의 상세 정보를 포함합니다.
파라미터:
urn(str): 용어집 노드 URN
사용 위치:
utils.data.datahub_services.glossary_service.GlossaryService.get_glossary_node_by_urn()
쿼리 목록을 조회하는 쿼리입니다. URN, 이름, 설명, SQL 문, 데이터셋 정보 등을 포함합니다.
파라미터:
input(dict): ListQueriesInputstart(int): 시작 인덱스count(int): 반환할 항목 수query(str): 검색 쿼리filters(list, optional): 추가 필터
사용 위치:
utils.data.datahub_services.query_service.QueryService.get_queries()
특정 URN과 연관된 쿼리를 조회하는 단순화된 쿼리입니다.
파라미터:
input(dict): ListQueriesInput
사용 위치:
utils.data.datahub_services.query_service.QueryService.get_queries_by_urn()
특정 데이터셋 URN의 glossary terms를 조회하는 쿼리입니다.
파라미터:
urn(str): 데이터셋 URN
사용 위치:
utils.data.datahub_services.glossary_service.GlossaryService.get_glossary_terms_by_urn()utils.data.datahub_services.query_service.QueryService.get_glossary_terms_by_urn()
import 되어 사용되는 위치:
utils.data.datahub_services.glossary_service:from utils.data.queries import ...utils.data.datahub_services.query_service:from utils.data.queries import ...
DataHub와의 상호작용을 위한 서비스 모듈들을 제공하는 서브패키지입니다.
주요 구성요소:
base_client.py: 기본 연결 및 통신glossary_service.py: 용어집 서비스metadata_service.py: 메타데이터 서비스query_service.py: 쿼리 서비스__init__.py: 패키지 초기화 및 exports
상세한 문서는 datahub_services/README.md를 참조하세요.
from utils.data.datahub_source import DatahubMetadataFetcher
# 페처 초기화
fetcher = DatahubMetadataFetcher(gms_server="http://localhost:8080")
# 테이블 메타데이터 조회
urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,db.schema.table,TABLE)"
metadata = fetcher.get_urn_info(urn)
# 용어집 조회
glossary_data = fetcher.get_glossary_data()
# 쿼리 조회
queries = fetcher.get_query_data(start=0, count=10)from utils.data.datahub_services import (
DataHubBaseClient,
MetadataService,
QueryService,
GlossaryService
)
# 클라이언트 초기화
client = DataHubBaseClient(gms_server="http://localhost:8080")
# 서비스 초기화
metadata_service = MetadataService(client)
query_service = QueryService(client)
glossary_service = GlossaryService(client)
# 메타데이터 조회
urn = "urn:li:dataset:(...)"
metadata = metadata_service.build_table_metadata(urn)
# 쿼리 조회
queries = query_service.get_query_data(start=0, count=10)
# 용어집 조회
glossary_data = glossary_service.get_glossary_data()from utils.data.datahub_source import DatahubMetadataFetcher
fetcher = DatahubMetadataFetcher(gms_server="http://localhost:8080")
# 테이블 정보 조회
urn = "urn:li:dataset:(...)"
table_name = fetcher.get_table_name(urn)
description = fetcher.get_table_description(urn)
columns = fetcher.get_column_names_and_descriptions(urn)
# 리니지 조회
downstream_lineage = fetcher.get_table_lineage(urn, direction="DOWNSTREAM")
upstream_lineage = fetcher.get_table_lineage(urn, direction="UPSTREAM")
column_lineage = fetcher.get_column_lineage(urn)
# 최소 degree 필터링
min_degree = fetcher.min_degree_lineage(downstream_lineage)requests: HTTP 요청 처리datahub: DataHub Python SDKdatahub.emitter.rest_emitter.DatahubRestEmitterdatahub.ingestion.graph.client.DatahubClientConfigdatahub.ingestion.graph.client.DataHubGraphdatahub.metadata.schema_classes
utils.data.queries: GraphQL 쿼리 정의 모듈utils.data.datahub_services.*: DataHub 서비스 레이어
DatahubMetadataFetcher는 하위 호환성을 위해 분리된 서비스들을 내부적으로 사용합니다.- 모든 서비스는
DataHubBaseClient를 필요로 합니다. - GMS 서버 URL은 초기화 시 유효성 검사가 수행됩니다.
- GraphQL 쿼리는
queries.py에 중앙 집중식으로 정의되어 있습니다. - 오류 발생 시 dict 형태로
{"error": True, "message": "..."}구조로 반환됩니다. utils/llm/tools/datahub.py에서DatahubMetadataFetcher를 import하여 LLM 도구로 사용됩니다.