Overview
This post analyzes the hive-bridge module of Apache Atlas, based on version 1.0.0-SNAPSHOT.
The module plugs an agent into Hive: when Hive executes a query at runtime, it inspects the operation (creating, altering, or dropping a database or table), derives the Data Lineage (the dependencies between entities) from it, and sends that information to Atlas as notification messages (published to Kafka rather than sent over REST, as the comments in the code below note).
The class that performs this role is org.apache.atlas.hive.hook.HiveHook.java, which is analyzed and summarized here.
Even if you do not use Apache Atlas, this code is a well-organized example of how to hook and handle Hive execution events, so it should be useful to others as well.
Hook Example
Given a create table query such as the one below, the hook analyzes it and produces the Data Lineage shown in the figure below. For example, a CREATE TABLE ... AS SELECT over an existing table yields a lineage from the source table, through the Hive process that ran the query, to the newly created table.
Code Analysis
I went through the full implementation, but only the key parts of the code are summarized here.
Properties
The configuration properties involved are summarized below; they are read in the static initializer of the code that follows.
- atlas.hook.hive.numRetries: number of retries when notifying Atlas
- atlas.hook.hive.database.name.cache.count: size of the known-database LRU cache (default 10000; 0 or less disables it)
- atlas.hook.hive.table.name.cache.count: size of the known-table LRU cache (default 10000; 0 or less disables it)
- atlas.cluster.name: cluster name reported by the hook (default "primary")
Key Points
The HiveHook class implements the org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface so that it can receive events at the pre/post points of Hive query execution.
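The same pattern works without Atlas. Below is a minimal sketch of a hook that only logs the operation name; the class name com.example.LoggingHook is hypothetical, and it assumes the jar is on Hive's classpath and the class is registered in hive-site.xml (e.g. via hive.exec.post.hooks).

package com.example;

import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Minimal example hook (not part of Atlas): Hive calls run() once per query
 * at the points where the class is registered (hive.exec.pre.hooks,
 * hive.exec.post.hooks or hive.exec.failure.hooks).
 */
public class LoggingHook implements ExecuteWithHookContext {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingHook.class);

    @Override
    public void run(HookContext hookContext) throws Exception {
        // same entry point HiveHook uses; the context carries the operation name and execution details
        LOG.info("Hive executed operation: {}", hookContext.getOperationName());
    }
}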
Full Code
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.hook;
import org.apache.atlas.hive.hook.events.*;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.utils.LruCache;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE;
/**
* Implements ExecuteWithHookContext to hook Hive execution events and, whenever a
* database/table create/alter/drop operation occurs, sends the corresponding
* information to Atlas via Kafka.
*
* @modifier lks21c
*/
public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
public static final String CONF_PREFIX = "atlas.hook.hive.";
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
public static final String HOOK_DATABASE_NAME_CACHE_COUNT = CONF_PREFIX + "database.name.cache.count";
public static final String HOOK_TABLE_NAME_CACHE_COUNT = CONF_PREFIX + "table.name.cache.count";
public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
private static final String clusterName;
/* known databases and tables */
private static final Map<String, Long> knownDatabases;
private static final Map<String, Long> knownTables;
static {
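// build a lookup from the operation name string to the HiveOperation enum,
// so run() can dispatch on hookContext.getOperationName()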
for (HiveOperation hiveOperation : HiveOperation.values()) {
OPERATION_MAP.put(hiveOperation.getOperationName(), hiveOperation);
}
/* cache sizes for database and table names */
int dbNameCacheCount = atlasProperties.getInt(HOOK_DATABASE_NAME_CACHE_COUNT, 10000);
int tblNameCacheCount = atlasProperties.getInt(HOOK_TABLE_NAME_CACHE_COUNT, 10000);
clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
knownDatabases = dbNameCacheCount > 0 ? Collections.synchronizedMap(new LruCache<String, Long>(dbNameCacheCount, 0)) : null;
knownTables = tblNameCacheCount > 0 ? Collections.synchronizedMap(new LruCache<String, Long>(tblNameCacheCount, 0)) : null;
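// a cache count of 0 or less leaves the map null (caching disabled);
// isKnownDatabase/isKnownTable below then always return false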
}
public HiveHook() {
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
@Override
public void run(HookContext hookContext) throws Exception {
// with debug level enabled you can confirm that the hive hook is actually being invoked.
if (LOG.isDebugEnabled()) {
LOG.debug("==> HiveHook.run({})", hookContext.getOperationName());
}
try {
HiveOperation oper = OPERATION_MAP.get(hookContext.getOperationName());
// hook context wrapped by Atlas
AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext);
BaseHiveEvent event = null;
// for the create/alter/drop operations below, build the matching event and send its information to Atlas.
switch (oper) {
case CREATEDATABASE:
event = new CreateDatabase(context);
break;
case DROPDATABASE:
event = new DropDatabase(context);
break;
case ALTERDATABASE:
case ALTERDATABASE_OWNER:
event = new AlterDatabase(context);
break;
case CREATETABLE:
event = new CreateTable(context, true);
break;
case DROPTABLE:
case DROPVIEW:
event = new DropTable(context);
break;
case CREATETABLE_AS_SELECT:
case CREATEVIEW:
case ALTERVIEW_AS:
case LOAD:
case EXPORT:
case IMPORT:
case QUERY:
case TRUNCATETABLE:
event = new CreateHiveProcess(context);
break;
case ALTERTABLE_FILEFORMAT:
case ALTERTABLE_CLUSTER_SORT:
case ALTERTABLE_BUCKETNUM:
case ALTERTABLE_PROPERTIES:
case ALTERVIEW_PROPERTIES:
case ALTERTABLE_SERDEPROPERTIES:
case ALTERTABLE_SERIALIZER:
case ALTERTABLE_ADDCOLS:
case ALTERTABLE_REPLACECOLS:
case ALTERTABLE_PARTCOLTYPE:
case ALTERTABLE_LOCATION:
event = new AlterTable(context);
break;
case ALTERTABLE_RENAME:
case ALTERVIEW_RENAME:
event = new AlterTableRename(context);
break;
case ALTERTABLE_RENAMECOL:
event = new AlterTableRenameCol(context);
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("HiveHook.run({}): operation ignored", hookContext.getOperationName());
}
break;
}
// if an event was built, send its notification messages to the Kafka topic.
// using Kafka here rather than REST looks like a deliberate choice for flow control and scale-out.
if (event != null) {
super.notifyEntities(event.getNotificationMessages());
}
} catch (Throwable t) {
LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HiveHook.run({})", hookContext.getOperationName());
}
}
public String getClusterName() {
return clusterName;
}
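// the known-database/known-table helpers below expose the LRU caches initialized above;
// presumably the event classes use them to avoid re-sending entities that Atlas already knows about.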
public boolean isKnownDatabase(String dbQualifiedName) {
return knownDatabases != null && dbQualifiedName != null ? knownDatabases.containsKey(dbQualifiedName) : false;
}
public boolean isKnownTable(String tblQualifiedName) {
return knownTables != null && tblQualifiedName != null ? knownTables.containsKey(tblQualifiedName) : false;
}
public void addToKnownEntities(Collection<AtlasEntity> entities) {
if (knownDatabases != null || knownTables != null) { // caching should be enabled at least for one
if (entities != null) {
for (AtlasEntity entity : entities) {
if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_DB)) {
addToKnownDatabase((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
} else if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_TABLE)) {
addToKnwnTable((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
}
}
}
}
public void addToKnownDatabase(String dbQualifiedName) {
if (knownDatabases != null && dbQualifiedName != null) {
knownDatabases.put(dbQualifiedName, System.currentTimeMillis());
}
}
public void addToKnwnTable(String tblQualifiedName) {
if (knownTables != null && tblQualifiedName != null) {
knownTables.put(tblQualifiedName, System.currentTimeMillis());
}
}
public void removeFromKnownDatabase(String dbQualifiedName) {
if (knownDatabases != null && dbQualifiedName != null) {
knownDatabases.remove(dbQualifiedName);
}
}
public void removeFromKnownTable(String tblQualifiedName) {
if (knownTables != null && tblQualifiedName != null) {
knownTables.remove(tblQualifiedName);
}
}
}