개요

  • 지난 포스팅에 이어 ES의 색인 과정의 소스코드를 분석해본다.
  • HTTP 방식으로 색인 요청이 들어왔을 때 처리 과정을 소스코드로 기술한다.
  • 글의 시작점은 링크이다.

색인 Flow

색인의 Flow는 아래 그림을 통해 쉽게 이해할 수 있을 듯하다. 자세한 배경지식은 링크를 참고하기 바란다.

index

RestIndexAction

지난 포스팅을 보았으면 색인에 대한 Http 요청은 RestIndexAction이 수행한다는 것을 알 수 있다.

시작점은 RestIndexAction.handleRequest()이다. IndexRequest를 준비한 뒤 client.index()를 호출한다.

/**
 * REST handler for single-document index requests.
 *
 * <p>It translates the HTTP parameters and body into an {@code IndexRequest} and hands it to
 * {@code client.index()}; the response is rendered back to the channel as an object built from
 * the resulting {@code IndexResponse}.
 */
public class RestIndexAction extends BaseRestHandler {

    /**
     * Registers this handler for the plain index endpoints and a {@code CreateHandler}
     * for the {@code _create} endpoints.
     */
    @Inject
    public RestIndexAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        controller.registerHandler(POST, "/{index}/{type}", this); // auto id creation
        controller.registerHandler(PUT, "/{index}/{type}/{id}", this);
        controller.registerHandler(POST, "/{index}/{type}/{id}", this);
        CreateHandler createHandler = new CreateHandler(settings, controller, client);
        controller.registerHandler(PUT, "/{index}/{type}/{id}/_create", createHandler);
        controller.registerHandler(POST, "/{index}/{type}/{id}/_create", createHandler);
    }

    /**
     * Builds an {@code IndexRequest} from the REST request and executes it through the client.
     *
     * <p>If {@code op_type} is present but cannot be parsed, a {@code BAD_REQUEST} response is sent
     * and the request is NOT executed.
     *
     * @param request the incoming REST request (path/query parameters and body)
     * @param channel the channel the response is written to
     * @param client  the client used to execute the index operation
     */
    @Override
    public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
        IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
        indexRequest.routing(request.param("routing"));
        indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
        indexRequest.timestamp(request.param("timestamp"));
        if (request.hasParam("ttl")) {
            indexRequest.ttl(request.param("ttl"));
        }
        indexRequest.source(request.content());
        indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
        indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));
        indexRequest.version(RestActions.parseVersion(request));
        indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
        String sOpType = request.param("op_type");
        if (sOpType != null) {
            try {
                indexRequest.opType(IndexRequest.OpType.fromString(sOpType));
            } catch (IllegalArgumentException eia) {
                try {
                    XContentBuilder builder = channel.newErrorBuilder();
                    channel.sendResponse(new BytesRestResponse(BAD_REQUEST, builder.startObject().field("error", eia.getMessage()).endObject()));
                } catch (IOException e1) {
                    logger.warn("Failed to send response", e1);
                }
                // The op_type is invalid: stop here whether or not the error response could be sent.
                // Falling through would execute the index operation anyway and attempt a second
                // response on the same channel.
                return;
            }
        }
        String consistencyLevel = request.param("consistency");
        if (consistencyLevel != null) {
            indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
        }
        client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
            @Override
            public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception {
                builder.startObject();
                ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
                builder.field(Fields._INDEX, response.getIndex())
                        .field(Fields._TYPE, response.getType())
                        .field(Fields._ID, response.getId())
                        .field(Fields._VERSION, response.getVersion());
                shardInfo.toXContent(builder, request);
                builder.field(Fields.CREATED, response.isCreated());
                builder.endObject();
                // A newly created document overrides the shard-derived status with CREATED.
                RestStatus status = shardInfo.status();
                if (response.isCreated()) {
                    status = CREATED;
                }
                return new BytesRestResponse(status, builder);
            }
        });
    }
}

AbstractClient.java

AbstractClient.index()에서는 IndexAction.INSTANCE로 execute하는데 IndexAction.INSTANCE는 IndexAction이다.

이 부분은 나중에 수행될 때 action이 IndexAction이 되는 것을 뜻한다.

/**
 * Client base class: every typed convenience call (such as {@link #index}) funnels into
 * {@link #execute}, which in turn delegates to the subclass-provided {@link #doExecute}.
 */
public abstract class AbstractClient extends AbstractComponent implements Client {

    /**
     * Applies the client headers to the request, wraps the listener with the threaded wrapper
     * and forwards everything to {@link #doExecute}.
     */
    @Override
    public final <Request extends ActionRequest,
            Response extends ActionResponse,
            RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
    void execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
        logger.info("AbstractClient.execute");
        headers.applyTo(request);
        doExecute(action, request, threadedWrapper.wrap(listener));
    }

    /**
     * Performs the actual execution of {@code action}; implemented by concrete clients.
     */
    protected abstract <Request extends ActionRequest,
            Response extends ActionResponse,
            RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
    void doExecute(final Action<Request, Response, RequestBuilder> action, final Request request, ActionListener<Response> listener);

    /**
     * Indexes a document by executing the request under {@code IndexAction.INSTANCE}.
     */
    @Override
    public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
        execute(IndexAction.INSTANCE, request, listener);
    }
}

NodeClient.java

NodeClient.doExecute()가 실행되고 여기에서 transportAction.execute()가 수행된다.

이는 TransportAction을 상속받은 TransportXXXAction.java의 doExecute()가 수행되는 부분으로 여기서는 TransportIndexAction가 해당된다.

이 부분은 ActionModule.java의 configure()를 보면 registerAction(IndexAction.INSTANCE, TransportIndexAction.class)로 등록되어 있는 것을 확인할 수 있다.

/**
 * Client that executes actions on the local node by looking up the {@code TransportAction}
 * registered for each action.
 */
public class NodeClient extends AbstractClient {

    /** Immutable lookup table from action to the transport action that executes it. */
    private final ImmutableMap<GenericAction, TransportAction> actions;

    /**
     * @param actions the action registry; an immutable copy of it is kept
     */
    @Inject
    public NodeClient(Settings settings, ThreadPool threadPool, Headers headers, Map<GenericAction, TransportAction> actions) {
        super(settings, threadPool, headers);
        this.actions = ImmutableMap.copyOf(actions);
    }

    @Override
    public void close() {
        // nothing really to do
    }

    /**
     * Resolves the transport action registered for {@code action} and executes the request on it.
     *
     * @throws IllegalStateException if no transport action is registered for {@code action}
     */
    @SuppressWarnings("unchecked")
    @Override
    public <Request extends ActionRequest,
            Response extends ActionResponse,
            RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
    void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
        logger.info("NodeClient.doExecute");
        final TransportAction<Request, Response> handler = actions.get(action);
        if (handler != null) {
            handler.execute(request, listener);
            return;
        }
        throw new IllegalStateException("failed to find action [" + action + "] to execute");
    }
}

TransportIndexAction.java

TransportIndexAction.doExecute()에서는 인덱스가 생성되어 있지 않고 생성가능한 조건이면 인덱스를 먼저 생성한다.

이후, TransportIndexAction.doExecute()를 통해 TransportIndexAction.innerExecute()가 실행된다.

여기서는 다시 부모 클래스인 TransportReplicationAction의 doExecute()가 호출된다.

/**
 * Transport-level index action. Before delegating to the replication machinery of the parent
 * class it creates the target index when auto-creation applies.
 */
public class TransportIndexAction extends TransportReplicationAction<IndexRequest, IndexRequest, IndexResponse> {

    /**
     * Executes the index request, first issuing a create-index request when
     * {@code autoCreateIndex.shouldAutoCreate(...)} says so.
     */
    @Override
    protected void doExecute(final Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
        // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
        final ClusterState clusterState = clusterService.state();
        if (!autoCreateIndex.shouldAutoCreate(request.index(), clusterState)) {
            // No auto-creation needed: go straight to the replicated operation.
            innerExecute(task, request, listener);
            return;
        }

        final CreateIndexRequest createRequest = new CreateIndexRequest(request);
        createRequest.index(request.index());
        createRequest.cause("auto(index api)");
        createRequest.masterNodeTimeout(request.timeout());

        createIndexAction.execute(task, createRequest, new ActionListener<CreateIndexResponse>() {
            @Override
            public void onResponse(CreateIndexResponse result) {
                innerExecute(task, request, listener);
            }

            @Override
            public void onFailure(Throwable e) {
                final boolean alreadyExists = ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException;
                if (!alreadyExists) {
                    listener.onFailure(e);
                    return;
                }
                // we have the index, do it
                try {
                    innerExecute(task, request, listener);
                } catch (Throwable e1) {
                    listener.onFailure(e1);
                }
            }
        });
    }

    /** Hands the request to {@code TransportReplicationAction.doExecute}. */
    private void innerExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
        super.doExecute(task, request, listener);
    }

    /** Creates an empty response instance. */
    @Override
    protected IndexResponse newResponseInstance() {
        return new IndexResponse();
    }
}

TransportReplicationAction.java

TransportReplicationAction.doExecute()를 통해 ReroutePhase.doRun()이 실행된다.(부모 클래스를 따라가보면 결국 doRun()이 실행됨)

ReroutePhase는 라우팅과 프라이머리 노드에서 일어나는 operation 실패에 대응하는 역할을 한다. 타겟노드로 라우팅 되기 전 index 정보와 샤드 id를 해석한다.

indexNameExpressionResolver.concreteSingleIndex(state, request)를 통해 실제 index를 알아내고(index expression 정제하여 단일 index명을 파싱)

resolveRequest(state.metaData(), concreteIndex, request)를 통해 index가 속한 샤드 ID를 알아낸다.

state.getRoutingTable().shardRoutingTable()를 통해 샤드정보(primary 샤드, replication 샤드 등에 대한 정보)를 가진 IndexShardRoutingTable을 획득한다.

DiscoveryNode node = state.nodes().get(primary.currentNodeId())를 통해 primary 샤드가 속한 노드정보를 가져온다.

이후 ReroutePhase가 일어나는 노드가 Primary 샤드가 있는 노드인지 아닌지에 따라 분기가 일어난다.

Primary 샤드가 있는 노드가 아니라면 performAction(node, actionName, false)로 다시 Primary 샤드가 있는 노드로 요청을 중계한다.

현재노드가 Primary 샤드가 있는 노드라면 performAction(node, transportPrimaryAction, true)이 실행된다.

transportPrimaryAction은 생성자를 보면 transportService로 PrimaryOperationTransportHandler가 등록된것을 알 수 있다.

따라서 이 요청은 PrimaryOperationTransportHandler.messageReceived()가 처리한다.

이를 통해 Primary샤드 처리는 PrimaryPhase에서 시작한다.

/**
 * Base class for write actions that run on a primary shard and are then replicated.
 *
 * <p>This excerpt shows the constructor (which registers the transport handlers) and the
 * {@link ReroutePhase}, which resolves the target shard and forwards the request to the node
 * holding its primary. Field declarations and the other phases are not part of the excerpt.
 */
public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {

    /**
     * Stores the collaborators and registers three transport handlers:
     * <ul>
     *   <li>{@code actionName} - handled by {@code OperationTransportHandler} on the SAME thread pool;</li>
     *   <li>{@code actionName + "[p]"} - handled by {@code PrimaryOperationTransportHandler} on {@code executor};</li>
     *   <li>{@code actionName + "[r]"} - handled by {@code ReplicaOperationTransportHandler} on {@code executor}.</li>
     * </ul>
     * The default write consistency level is read from the {@code action.write_consistency}
     * setting and falls back to {@code "quorum"}.
     */
    protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
                                         ClusterService clusterService, IndicesService indicesService,
                                         ThreadPool threadPool, ShardStateAction shardStateAction,
                                         MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
                                         IndexNameExpressionResolver indexNameExpressionResolver, Class<Request> request,
                                         Class<ReplicaRequest> replicaRequest, String executor) {
        super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
        this.transportService = transportService;
        this.clusterService = clusterService;
        this.indicesService = indicesService;
        this.shardStateAction = shardStateAction;
        this.mappingUpdatedAction = mappingUpdatedAction;

        this.transportPrimaryAction = actionName + "[p]";
        this.transportReplicaAction = actionName + "[r]";
        this.executor = executor;
        this.checkWriteConsistency = checkWriteConsistency();
        transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
        transportService.registerRequestHandler(transportPrimaryAction, request, executor, new PrimaryOperationTransportHandler());
        // we must never reject on because of thread pool capacity on replicas
        transportService.registerRequestHandler(transportReplicaAction, replicaRequest, executor, true, true, new ReplicaOperationTransportHandler());

        this.transportOptions = transportOptions();

        this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
    }

    /** Starts the operation by running a new {@link ReroutePhase}. */
    @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        new ReroutePhase((ReplicationTask) task, request, listener).run();
    }

    /**
     * Resolves the concrete index and shard for the request and sends it to the node that holds
     * the primary: with {@code transportPrimaryAction} when that node is the local node, with
     * {@code actionName} otherwise. Block exceptions, inactive/unknown primaries and selected
     * transport failures are retried on the next cluster state change until the request times out.
     */
    final class ReroutePhase extends AbstractRunnable {
        private final ActionListener<Response> listener;
        private final Request request;
        private final ReplicationTask task;
        private final ClusterStateObserver observer;
        // Guards the listener so that it is completed at most once.
        private final AtomicBoolean finished = new AtomicBoolean();

        ReroutePhase(ReplicationTask task, Request request, ActionListener<Response> listener) {
            this.request = request;
            if (task != null) {
                this.request.setParentTask(clusterService.localNode().getId(), task.getId());
            }
            this.listener = listener;
            this.task = task;
            this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
        }

        @Override
        public void onFailure(Throwable e) {
            finishWithUnexpectedFailure(e);
        }

        /**
         * Performs one routing attempt against the currently observed cluster state.
         */
        @Override
        protected void doRun() {
            setPhase(task, "routing");
            final ClusterState state = observer.observedState();
            ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel());
            if (blockException != null) {
                handleBlockException(blockException);
                return;
            }
            final String concreteIndex = resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request) : request.index();
            blockException = state.blocks().indexBlockedException(indexBlockLevel(), concreteIndex);
            if (blockException != null) {
                handleBlockException(blockException);
                return;
            }
            // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
            resolveRequest(state.metaData(), concreteIndex, request);
            assert request.shardId() != null : "request shardId must be set in resolveRequest";

            IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId().getIndex(), request.shardId().id());
            final ShardRouting primary = indexShard.primaryShard();
            if (primary == null || primary.active() == false) {
                logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), actionName, request, state.version());
                retryBecauseUnavailable(request.shardId(), "primary shard is not active");
                return;
            }
            if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
                logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
                retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
                return;
            }
            final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
            taskManager.registerChildTask(task, node.getId());
            if (primary.currentNodeId().equals(state.nodes().localNodeId())) {
                // The primary lives on this node: dispatch to the primary handler.
                setPhase(task, "waiting_on_primary");
                if (logger.isTraceEnabled()) {
                    logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}] ", transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
                }
                performAction(node, transportPrimaryAction, true);
            } else {
                if (state.version() < request.routedBasedOnClusterVersion()) {
                    logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state version [{}] is older than on sending node (version [{}]), scheduling a retry...", request.shardId(), request, state.version(), request.routedBasedOnClusterVersion());
                    retryBecauseUnavailable(request.shardId(), "failed to find primary as current cluster state with version [" + state.version() + "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "])");
                    return;
                } else {
                    // chasing the node with the active primary for a second hop requires that we are at least up-to-date with the current cluster state version
                    // this prevents redirect loops between two nodes when a primary was relocated and the relocation target is not aware that it is the active primary shard already.
                    request.routedBasedOnClusterVersion(state.version());
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}]", actionName, request.shardId(), request, state.version(), primary.currentNodeId());
                }
                setPhase(task, "rerouted");
                performAction(node, actionName, false);
            }
        }

        /** Retries on a retryable block, otherwise fails the request with the block exception. */
        private void handleBlockException(ClusterBlockException blockException) {
            if (blockException.retryable()) {
                logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
                retry(blockException);
            } else {
                finishAsFailed(blockException);
            }
        }

        /**
         * Sends the request to {@code node} under the given transport action name and completes
         * (or retries) this phase from the response handler.
         *
         * @param isPrimaryAction whether {@code action} is the primary action; only then is
         *                        {@code retryPrimaryException} consulted for retry decisions
         */
        private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction) {
            transportService.sendRequest(node, action, request, transportOptions, new BaseTransportResponseHandler<Response>() {

                @Override
                public Response newInstance() {
                    return newResponseInstance();
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }

                @Override
                public void handleResponse(Response response) {
                    finishOnSuccess(response);
                }

                @Override
                public void handleException(TransportException exp) {
                    try {
                        // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
                        if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
                                (isPrimaryAction && retryPrimaryException(exp.unwrapCause()))) {
                            logger.trace("received an error from node [{}] for request [{}], scheduling a retry", exp, node.id(), request);
                            request.setCanHaveDuplicates();
                            retry(exp);
                        } else {
                            finishAsFailed(exp);
                        }
                    } catch (Throwable t) {
                        finishWithUnexpectedFailure(t);
                    }
                }
            });
        }

        /**
         * Re-runs this phase on the next cluster state change (or once more on timeout); fails
         * with {@code failure} if the observer has already timed out.
         */
        void retry(Throwable failure) {
            assert failure != null;
            if (observer.isTimedOut()) {
                // we running as a last attempt after a timeout has happened. don't retry
                finishAsFailed(failure);
                return;
            }
            setPhase(task, "waiting_for_retry");
            observer.waitForNextChange(new ClusterStateObserver.Listener() {
                @Override
                public void onNewClusterState(ClusterState state) {
                    run();
                }

                @Override
                public void onClusterServiceClose() {
                    finishAsFailed(new NodeClosedException(clusterService.localNode()));
                }

                @Override
                public void onTimeout(TimeValue timeout) {
                    // Try one more time...
                    run();
                }
            });
        }

        /** Completes the listener with {@code failure}; asserts that nothing completed it before. */
        void finishAsFailed(Throwable failure) {
            if (finished.compareAndSet(false, true)) {
                setPhase(task, "failed");
                logger.trace("operation failed. action [{}], request [{}]", failure, actionName, request);
                listener.onFailure(failure);
            } else {
                assert false : "finishAsFailed called but operation is already finished";
            }
        }

        /** Like {@link #finishAsFailed} but logs the failure at warn level first. */
        void finishWithUnexpectedFailure(Throwable failure) {
            logger.warn("unexpected error during the reroute phase for action [{}], request [{}]", failure, actionName, request);
            if (finished.compareAndSet(false, true)) {
                setPhase(task, "failed");
                listener.onFailure(failure);
            } else {
                assert false : "finishWithUnexpectedFailure called but operation is already finished";
            }
        }

        /** Completes the listener with {@code response}; asserts that nothing completed it before. */
        void finishOnSuccess(Response response) {
            if (finished.compareAndSet(false, true)) {
                setPhase(task, "finished");
                if (logger.isTraceEnabled()) {
                    logger.trace("operation succeeded. action [{}],request [{}]", actionName, request);
                }
                listener.onResponse(response);
            } else {
                assert false : "finishOnSuccess called but operation is already finished";
            }
        }

        /** Schedules a retry carrying an {@code UnavailableShardsException} built from {@code message}. */
        void retryBecauseUnavailable(ShardId shardId, String message) {
            retry(new UnavailableShardsException(shardId, "{} Timeout: [{}], request: [{}]", message, request.timeout(), request));
        }
    }
}

TransportReplicationAction.java (PrimaryPhase)

PrimaryPhase는 로컬 노드의 primary 샤드에 index를 기록하고 replication action을 replication 샤드가 있는 노드로 전송하는 역할을 한다.

즉, 다시 말하면 PrimaryPhase를 끝내고 ReplicationPhase를 실행하는 역할을 한다.

PrimaryPhase.doRun()에서 shardOperationOnPrimary()를 호출하는데 이는 TransportIndexAction.shardOperationOnPrimary()이다.

primary 샤드의 기록이 끝나면 finishAndMoveToReplication()를 수행한다.

public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
    final class PrimaryPhase extends AbstractRunnable {

        @Override
        protected void doRun() throws Exception {
            setPhase(task, "primary");
            // request shardID was set in ReroutePhase
            assert request.shardId() != null : "request shardID must be set prior to primary phase";
            final ShardId shardId = request.shardId();
            final String writeConsistencyFailure = checkWriteConsistency(shardId);
            if (writeConsistencyFailure != null) {
            finishBecauseUnavailable(shardId, writeConsistencyFailure);
            return;
            }
            final ReplicationPhase replicationPhase;
            try {
            indexShardReference = getIndexShardOperationsCounter(shardId);
            Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
            if (logger.isTraceEnabled()) {
                logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
            }
            replicationPhase = new ReplicationPhase(task, primaryResponse.v2(), primaryResponse.v1(), shardId, channel,
                indexShardReference);
            } catch (Throwable e) {
            request.setCanHaveDuplicates();
            if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
                if (logger.isTraceEnabled()) {
                    logger.trace("failed to execute [{}] on [{}]", e, request, shardId);
                }
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("failed to execute [{}] on [{}]", e, request, shardId);
                }
            }
            finishAsFailed(e);
            return;
            }
            finishAndMoveToReplication(replicationPhase);
        }
}

TransportIndexAction.java

TransportIndexAction.shardOperationOnPrimary()에서 executeIndexRequestOnPrimary()를 호출한다.

TransportIndexAction.executeIndexRequestOnPrimary()에서는 operation.execute(indexShard)를 호출한다.

public class TransportIndexAction extends TransportReplicationAction<IndexRequest, IndexRequest, IndexResponse> {

    /**
     * Executes the index request on the primary shard.
     *
     * @return the response paired with the request that is passed on for replication
     * @throws RoutingMissingException if the type's mapping requires routing and the request has none
     */
    @Override
    protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Throwable {

        // validate, if routing is required, that we got routing
        final IndexMetaData indexMetaData = metaData.index(request.shardId().getIndex());
        final MappingMetaData typeMapping = indexMetaData.mappingOrDefault(request.type());
        final boolean routingRequired = typeMapping != null && typeMapping.routing().required();
        if (routingRequired && request.routing() == null) {
            throw new RoutingMissingException(request.shardId().getIndex(), request.type(), request.id());
        }

        final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
        final IndexShard indexShard = indexService.shardSafe(request.shardId().id());

        final WriteResult<IndexResponse> writeResult = executeIndexRequestOnPrimary(null, request, indexShard, mappingUpdatedAction);
        processAfterWrite(request.refresh(), indexShard, writeResult.location);
        return new Tuple<>(writeResult.response, request);
    }

    /**
     * Prepares and executes the index operation on {@code indexShard}.
     *
     * <p>If parsing produced a dynamic mapping update, the update is first sent to the master
     * synchronously and the operation is prepared again; a mapping update still pending after
     * that results in a {@code RetryOnPrimaryException}. After execution the request is stamped
     * with the operation's version so replicas apply the same version.
     */
    public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Throwable {
        Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
        Mapping pendingMapping = operation.parsedDoc().dynamicMappingsUpdate();
        final ShardId shardId = indexShard.shardId();
        if (pendingMapping != null) {
            mappingUpdatedAction.updateMappingOnMasterSynchronously(shardId.getIndex(), request.type(), pendingMapping);
            // Parse again now that the master has been told about the new mapping.
            operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
            pendingMapping = operation.parsedDoc().dynamicMappingsUpdate();
            if (pendingMapping != null) {
                throw new RetryOnPrimaryException(shardId,
                        "Dynamic mappings are not available on the node that holds the primary yet");
            }
        }
        final boolean created = operation.execute(indexShard);

        // update the version on request so it will happen on the replicas
        request.version(operation.version());
        request.versionType(request.versionType().versionTypeForReplicationAndRecovery());

        assert request.versionType().validateVersionForWrites(request.version());

        final IndexResponse response = new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created);
        return new WriteResult<>(response, operation.getTranslogLocation());
    }
}

Engine.java

Engine.Index.execute()가 호출되고 이를 통해 shard.index(this)가 호출된다.

public abstract class Engine implements Closeable {

    /** Index operation; executing it delegates to {@code IndexShard.index}. */
    public static final class Index extends IndexingOperation {

        /**
         * Runs this operation against {@code shard}.
         *
         * @return whatever {@code shard.index(this)} returns
         */
        @Override
        public boolean execute(IndexShard shard) {
            final boolean created = shard.index(this);
            return created;
        }
    }
}

IndexShard.java

IndexShard.index() 안에서 engine().index(index)가 호출된다.

public class IndexShard extends AbstractIndexShardComponent {

    /**
     * Indexes the operation through the engine, notifying the indexing service before the
     * attempt and afterwards (with the result on success, with the exception on failure).
     *
     * @return the value returned by {@code engine().index(...)}
     */
    public boolean index(Engine.Index index) {
        ensureWriteAllowed(index);
        markLastWrite();
        // preIndex may hand back a different operation instance; use that one from here on.
        final Engine.Index operation = indexingService.preIndex(index);
        final boolean created;
        try {
            if (logger.isTraceEnabled()) {
                logger.trace("index [{}][{}]{}", operation.type(), operation.id(), operation.docs());
            }
            created = engine().index(operation);
            operation.endTime(System.nanoTime());
        } catch (Throwable failure) {
            indexingService.postIndex(operation, failure);
            throw failure;
        }
        indexingService.postIndex(operation, created);
        return created;
    }
}

InternalEngine.java

거의 다 왔다. InternalEngine.index()를 통해 innerIndex(index)를 호출한다.

InternalEngine.innerIndex()를 호출하는데 문서가 존재하지 않는 경우 루신의 indexWriter.addDocuments()가 호출된다.

indexWriter.addDocuments() 부터는 루신의 영역이므로 포스팅에서는 더 소스를 다루지 않는다.

public class InternalEngine extends Engine {
    /**
     * Indexes a document through {@link #innerIndex} while holding the engine read lock.
     *
     * Operations whose origin is RECOVERY bypass the throttle; all others acquire it first.
     * An OutOfMemoryError, IllegalStateException or IOException is reported via
     * maybeFailEngine("index", ...) and rethrown wrapped in an IndexFailedEngineException.
     *
     * @param index the index operation to apply
     * @return the value returned by innerIndex (whether the document counts as created)
     * @throws EngineException if the operation fails
     */
    @Override
    public boolean index(Index index) throws EngineException {
    final boolean created;
    try (ReleasableLock lock = readLock.acquire()) {
        ensureOpen();
        if (index.origin() == Operation.Origin.RECOVERY) {
            // Don't throttle recovery operations
            created = innerIndex(index);
        } else {
            try (Releasable r = throttle.acquireThrottle()) {
                created = innerIndex(index);
            }
        }
    } catch (OutOfMemoryError | IllegalStateException | IOException t) {
        maybeFailEngine("index", t);
        throw new IndexFailedEngineException(shardId, index.type(), index.id(), t);
    }
    // Runs only after a successful operation, outside the read lock.
    checkVersionMapRefresh();
    return created;
    }

    /**
     * Applies the index operation under the per-uid lock: resolves the current version, checks
     * for a version conflict, writes to the Lucene IndexWriter, appends to the translog and
     * records the new version in the version map.
     *
     * @return true when the document did not exist (current version is NOT_FOUND) or when the
     *         version map entry is a delete; false otherwise, including a version conflict on
     *         a RECOVERY operation
     * @throws VersionConflictEngineException on a version conflict for a non-RECOVERY operation
     * @throws IOException declared by this method; presumably from the writer/translog calls
     */
    private boolean innerIndex(Index index) throws IOException {
    try (Releasable ignored = acquireLock(index.uid())) {
        // Step 1: determine the current version - from the version map if present,
        // otherwise loaded from the index.
        final long currentVersion;
        VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
        if (versionValue == null) {
            currentVersion = loadCurrentVersionFromIndex(index.uid());
        } else {
            // A delete entry older than the GC-deletes interval is treated as "not found".
            if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() -
                    versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
                currentVersion = Versions.NOT_FOUND; // deleted, and GC
            } else {
                currentVersion = versionValue.version();
            }
        }

        // Step 2: version conflict check. Recovery operations return false on a conflict
        // instead of failing.
        long updatedVersion;
        long expectedVersion = index.version();
        if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
            if (index.origin() == Operation.Origin.RECOVERY) {
                return false;
            } else {
                throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
            }
        }
        updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);

        // Step 3: write to Lucene - add when the document is new, update (by uid) otherwise.
        final boolean created;
        index.updateVersion(updatedVersion);
        if (currentVersion == Versions.NOT_FOUND) {
            // document does not exists, we can optimize for create
            created = true;
            if (index.docs().size() > 1) {
                indexWriter.addDocuments(index.docs());
            } else {
                indexWriter.addDocument(index.docs().get(0));
            }
        } else {
            if (versionValue != null) {
                created = versionValue.delete(); // we have a delete which is not GC'ed...
            } else {
                created = false;
            }
            if (index.docs().size() > 1) {
                indexWriter.updateDocuments(index.uid(), index.docs());
            } else {
                indexWriter.updateDocument(index.uid(), index.docs().get(0));
            }
        }
        // Step 4: append to the translog, then publish the new version together with the
        // translog location in the version map.
        Translog.Location translogLocation = translog.add(new Translog.Index(index));

        versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
        index.setTranslogLocation(translogLocation);
        indexingService.postIndexUnderLock(index);
        return created;
    }
    }
}

이어지는 내용은 #2에서 포스팅한다.