Overview

I took a look at the Apache Atlas hive-bridge module.

The analysis is based on version 1.0.0-SNAPSHOT. The module works as an agent plugged into Hive: as Hive executes at runtime, it inspects each operation (creating/altering/dropping a database or table itself), derives the Data Lineage (the dependencies between entities) from those operations, and forwards that information to Atlas (as notification messages over Kafka rather than direct REST calls, as the code below shows). Here I analyze and summarize org.apache.atlas.hive.hook.HiveHook.java, which performs this role.

Even if you do not use Apache Atlas, this code is a well-organized example of how to hook and handle Hive execution events, so I am sharing it in case it is useful to others as well.

Hook Example

Given a create table query like the one below, the hook analyzes it and builds a Data Lineage like the figure below.

create table t2 as select id, name from T1
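
For this query to actually reach the hook, the hook has to be registered with Hive first. The snippet below is a minimal sketch assuming the hive-site.xml registration described in the Atlas hive-bridge setup; the hook can also be enabled per session with SET hive.exec.post.hooks=... .

<!-- hive-site.xml: register the Atlas hook as a post-execution hook -->
<property>
  <name>hive.exec.post.hooks</name>
  <value>org.apache.atlas.hive.hook.HiveHook</value>
</property>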

Code Analysis

I went through the whole implementation, but here I only briefly note the key pieces of the code.

Configuration Properties

The related configuration properties are summarized below (a sample configuration follows the list).

  • atlas.hook.hive.numRetries: number of retries the hook uses when sending notifications to Atlas
  • atlas.hook.hive.database.name.cache.count: size of the LRU cache of known database names (default 10000)
  • atlas.hook.hive.table.name.cache.count: size of the LRU cache of known table names (default 10000)
  • atlas.cluster.name: cluster name used by the hook (default "primary")
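
A minimal sketch of how these might be set in atlas-application.properties on the Hive side. The cache sizes and cluster name simply mirror the defaults in the code further down; the numRetries value is only illustrative, since its default is resolved in AtlasHook and is not shown here.

# illustrative values only
atlas.hook.hive.numRetries=3
atlas.hook.hive.database.name.cache.count=10000
atlas.hook.hive.table.name.cache.count=10000
atlas.cluster.name=primary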

Core of the Code

The HiveHook class implements the org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface so that it can receive events at the pre/post points of Hive query execution.
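
As a standalone illustration of that mechanism (with no Atlas dependency), a minimal hook could look like the sketch below. The package and class names are made up; getOperationName() is the accessor HiveHook itself uses, and getHookType(), which reports whether the call is the pre-, post-, or failure-time invocation, is assumed here from Hive's HookContext API.

// hypothetical package and class name, for illustration only
package com.example.hive.hooks;

import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;

// Minimal hook: logs which operation just ran.
// Enabled via hive.exec.post.hooks (and/or hive.exec.pre.hooks).
public class LoggingHook implements ExecuteWithHookContext {
    @Override
    public void run(HookContext hookContext) throws Exception {
        System.out.println("hookType=" + hookContext.getHookType()
                + ", operation=" + hookContext.getOperationName());
    }
}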

Full Code

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas.hive.hook;

import org.apache.atlas.hive.hook.events.*;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.utils.LruCache;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_DB;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.HIVE_TYPE_TABLE;

/**
 * Implements ExecuteWithHookContext to hook into Hive execution events and,
 * when a database/table create/alter/drop operation occurs, send the
 * corresponding information to Atlas through Kafka.
 *
 * @modifier lks21c
 */
public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
    private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);

    public static final String CONF_PREFIX                    = "atlas.hook.hive.";
    public static final String HOOK_NUM_RETRIES               = CONF_PREFIX + "numRetries";
    public static final String HOOK_DATABASE_NAME_CACHE_COUNT = CONF_PREFIX + "database.name.cache.count";
    public static final String HOOK_TABLE_NAME_CACHE_COUNT    = CONF_PREFIX + "table.name.cache.count";
    public static final String CONF_CLUSTER_NAME              = "atlas.cluster.name";

    public static final String DEFAULT_CLUSTER_NAME = "primary";

    private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
    private static final String                     clusterName;

    /* known databases and tables */
    private static final Map<String, Long>          knownDatabases;
    private static final Map<String, Long>          knownTables;

    static {
        for (HiveOperation hiveOperation : HiveOperation.values()) {
            OPERATION_MAP.put(hiveOperation.getOperationName(), hiveOperation);
        }

        /* cache sizes for database and table names */
        int dbNameCacheCount  = atlasProperties.getInt(HOOK_DATABASE_NAME_CACHE_COUNT, 10000);
        int tblNameCacheCount = atlasProperties.getInt(HOOK_TABLE_NAME_CACHE_COUNT, 10000);

        clusterName    = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
        knownDatabases = dbNameCacheCount > 0 ? Collections.synchronizedMap(new LruCache<String, Long>(dbNameCacheCount, 0)) : null;
        knownTables    = tblNameCacheCount > 0 ? Collections.synchronizedMap(new LruCache<String, Long>(tblNameCacheCount, 0)) : null;
    }

    public HiveHook() {
    }

    @Override
    protected String getNumberOfRetriesPropertyKey() {
        return HOOK_NUM_RETRIES;
    }

    @Override
    public void run(HookContext hookContext) throws Exception {
        // with debug logging enabled, you can verify that the hive hook is actually being invoked.
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> HiveHook.run({})", hookContext.getOperationName());
        }

        try {
            HiveOperation        oper    = OPERATION_MAP.get(hookContext.getOperationName());

            // hook context wrapped by Atlas.
            AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext);

            BaseHiveEvent event = null;

            // for the create/alter/drop operations below, build the corresponding event and send its information to Atlas.
            switch (oper) {
                case CREATEDATABASE:
                    event = new CreateDatabase(context);
                break;

                case DROPDATABASE:
                    event = new DropDatabase(context);
                break;

                case ALTERDATABASE:
                case ALTERDATABASE_OWNER:
                    event = new AlterDatabase(context);
                break;

                case CREATETABLE:
                    event = new CreateTable(context, true);
                break;

                case DROPTABLE:
                case DROPVIEW:
                    event = new DropTable(context);
                break;

                case CREATETABLE_AS_SELECT:
                case CREATEVIEW:
                case ALTERVIEW_AS:
                case LOAD:
                case EXPORT:
                case IMPORT:
                case QUERY:
                case TRUNCATETABLE:
                    event = new CreateHiveProcess(context);
                break;

                case ALTERTABLE_FILEFORMAT:
                case ALTERTABLE_CLUSTER_SORT:
                case ALTERTABLE_BUCKETNUM:
                case ALTERTABLE_PROPERTIES:
                case ALTERVIEW_PROPERTIES:
                case ALTERTABLE_SERDEPROPERTIES:
                case ALTERTABLE_SERIALIZER:
                case ALTERTABLE_ADDCOLS:
                case ALTERTABLE_REPLACECOLS:
                case ALTERTABLE_PARTCOLTYPE:
                case ALTERTABLE_LOCATION:
                    event = new AlterTable(context);
                break;

                case ALTERTABLE_RENAME:
                case ALTERVIEW_RENAME:
                    event = new AlterTableRename(context);
                break;

                case ALTERTABLE_RENAMECOL:
                    event = new AlterTableRenameCol(context);
                break;

                default:
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("HiveHook.run({}): operation ignored", hookContext.getOperationName());
                    }
                break;
            }

            // if an event was built, send it to the Kafka topic.
            // going through Kafka instead of direct REST calls was presumably chosen with flow control and scale-out in mind.
            if (event != null) {
                super.notifyEntities(event.getNotificationMessages());
            }
        } catch (Throwable t) {
            LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== HiveHook.run({})", hookContext.getOperationName());
        }
    }

    public String getClusterName() {
        return clusterName;
    }

    public boolean isKnownDatabase(String dbQualifiedName) {
        return knownDatabases != null && dbQualifiedName != null ? knownDatabases.containsKey(dbQualifiedName) : false;
    }

    public boolean isKnownTable(String tblQualifiedName) {
        return knownTables != null && tblQualifiedName != null ? knownTables.containsKey(tblQualifiedName) : false;
    }

    public void addToKnownEntities(Collection<AtlasEntity> entities) {
        if (knownDatabases != null || knownTables != null) { // caching should be enabled at least for one
            if (entities != null) {
                for (AtlasEntity entity : entities) {
                    if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_DB)) {
                        addToKnownDatabase((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
                    } else if (StringUtils.equalsIgnoreCase(entity.getTypeName(), HIVE_TYPE_TABLE)) {
                        addToKnwnTable((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
                    }
                }
            }
        }
    }

    public void addToKnownDatabase(String dbQualifiedName) {
        if (knownDatabases != null && dbQualifiedName != null) {
            knownDatabases.put(dbQualifiedName, System.currentTimeMillis());
        }
    }

    public void addToKnwnTable(String tblQualifiedName) {
        if (knownTables != null && tblQualifiedName != null) {
            knownTables.put(tblQualifiedName, System.currentTimeMillis());
        }
    }

    public void removeFromKnownDatabase(String dbQualifiedName) {
        if (knownDatabases != null && dbQualifiedName != null) {
            knownDatabases.remove(dbQualifiedName);
        }
    }

    public void removeFromKnownTable(String tblQualifiedName) {
        if (knownTables != null && tblQualifiedName != null) {
            knownTables.remove(tblQualifiedName);
        }
    }
}