개요

  • 지난 포스팅에 이어 ES의 색인 과정의 소스코드를 분석해본다.
  • HTTP 방식으로 색인 요청이 들어왔을 때 처리 과정을 소스코드로 기술한다.
  • 색인글의 시작점은 링크이다

TransportReplicationAction.java

TransportReplicationAction.doExecute()를 통해 ReroutePhase.doRun()이 실행된다.(부모 클래스를 따라가보면 결국 doRun()이 실행됨)

ReroutePhase는 라우팅과 프라이머리 노드에서 일어나는 operation 실패에 대응하는 역할을 한다. 타겟노드로 라우팅 되기 전 index 정보와 샤드 id를 해석한다.

indexNameExpressionResolver.concreteSingleIndex(state, request)를 통해 실제 index를 알아내고(index expression 정제하여 단일 index명을 파싱)

resolveRequest(state.metaData(), concreteIndex, request)를 통해 index가 속한 샤드 ID를 알아낸다.

state.getRoutingTable().shardRoutingTable()를 통해 샤드정보(primary 샤드, replication 샤드 등에 대한 정보)를 가진 IndexShardRoutingTable을 획득한다.

DiscoveryNode node = state.nodes().get(primary.currentNodeId())를 통해 primary 샤드가 속한 노드정보를 가져온다.

이후 ReroutePhase가 일어나는 노드가 Primary 샤드가 있는 노드인지 아닌지에 따라 분기가 일어난다.

Primary 샤드가 있는 노드가 아니라면 performAction(node, actionName, false)로 다시 Primary 샤드가 있는 노드로 요청을 중계한다.

현재노드가 Primary 샤드가 있는 노드라면 performAction(node, transportPrimaryAction, true)이 실행된다.

transportPrimaryAction은 생성자를 보면 transportService로 PrimaryOperationTransportHandler가 등록된것을 알 수 있다.

PrimaryOperationTransportHandler.messageReceived()가 처리한다.

이를 통해 Primary샤드 처리는 PrimaryPhase에서 시작한다.

public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {

    protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
                                     ClusterService clusterService, IndicesService indicesService,
                                     ThreadPool threadPool, ShardStateAction shardStateAction,
                                     MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
                                     IndexNameExpressionResolver indexNameExpressionResolver, Class<Request> request,
                                     Class<ReplicaRequest> replicaRequest, String executor) {
    super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
    this.transportService = transportService;
    this.clusterService = clusterService;
    this.indicesService = indicesService;
    this.shardStateAction = shardStateAction;
    this.mappingUpdatedAction = mappingUpdatedAction;

    this.transportPrimaryAction = actionName + "[p]";
    this.transportReplicaAction = actionName + "[r]";
    this.executor = executor;
    this.checkWriteConsistency = checkWriteConsistency();
    transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
    transportService.registerRequestHandler(transportPrimaryAction, request, executor, new PrimaryOperationTransportHandler());
    // we must never reject on because of thread pool capacity on replicas
    transportService.registerRequestHandler(transportReplicaAction, replicaRequest, executor, true, true, new ReplicaOperationTransportHandler());

    this.transportOptions = transportOptions();

    this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
    }

    @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    new ReroutePhase((ReplicationTask) task, request, listener).run();
    }

    final class ReroutePhase extends AbstractRunnable {
    private final ActionListener<Response> listener;
    private final Request request;
    private final ReplicationTask task;
    private final ClusterStateObserver observer;
    private final AtomicBoolean finished = new AtomicBoolean();

    ReroutePhase(ReplicationTask task, Request request, ActionListener<Response> listener) {
        this.request = request;
        if (task != null) {
            this.request.setParentTask(clusterService.localNode().getId(), task.getId());
        }
        this.listener = listener;
        this.task = task;
        this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
    }

    @Override
    public void onFailure(Throwable e) {
        finishWithUnexpectedFailure(e);
    }

    @Override
    protected void doRun() {
        setPhase(task, "routing");
        final ClusterState state = observer.observedState();
        ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel());
        if (blockException != null) {
            handleBlockException(blockException);
            return;
        }
        final String concreteIndex = resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request) : request.index();
        blockException = state.blocks().indexBlockedException(indexBlockLevel(), concreteIndex);
        if (blockException != null) {
            handleBlockException(blockException);
            return;
        }
        // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
        resolveRequest(state.metaData(), concreteIndex, request);
        assert request.shardId() != null : "request shardId must be set in resolveRequest";

        IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId().getIndex(), request.shardId().id());
        final ShardRouting primary = indexShard.primaryShard();
        if (primary == null || primary.active() == false) {
            logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), actionName, request, state.version());
            retryBecauseUnavailable(request.shardId(), "primary shard is not active");
            return;
        }
        if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
            logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
            retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
            return;
        }
        final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
        taskManager.registerChildTask(task, node.getId());
        if (primary.currentNodeId().equals(state.nodes().localNodeId())) {
            setPhase(task, "waiting_on_primary");
            if (logger.isTraceEnabled()) {
                logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}] ", transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
            }
            performAction(node, transportPrimaryAction, true);
        } else {
            if (state.version() < request.routedBasedOnClusterVersion()) {
                logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...", request.shardId(), request, state.version(), request.routedBasedOnClusterVersion());
                retryBecauseUnavailable(request.shardId(), "failed to find primary as current cluster state with version [" + state.version() + "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]");
                return;
            } else {
                // chasing the node with the active primary for a second hop requires that we are at least up-to-date with the current cluster state version
                // this prevents redirect loops between two nodes when a primary was relocated and the relocation target is not aware that it is the active primary shard already.
                request.routedBasedOnClusterVersion(state.version());
            }
            if (logger.isTraceEnabled()) {
                logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}]", actionName, request.shardId(), request, state.version(), primary.currentNodeId());
            }
            setPhase(task, "rerouted");
            performAction(node, actionName, false);
        }
    }

    private void handleBlockException(ClusterBlockException blockException) {
        if (blockException.retryable()) {
            logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
            retry(blockException);
        } else {
            finishAsFailed(blockException);
        }
    }

    private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction) {
        transportService.sendRequest(node, action, request, transportOptions, new BaseTransportResponseHandler<Response>() {

            @Override
            public Response newInstance() {
                return newResponseInstance();
            }

            @Override
            public String executor() {
                return ThreadPool.Names.SAME;
            }

            @Override
            public void handleResponse(Response response) {
                finishOnSuccess(response);
            }

            @Override
            public void handleException(TransportException exp) {
                try {
                    // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
                    if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
                        (isPrimaryAction && retryPrimaryException(exp.unwrapCause()))) {
                        logger.trace("received an error from node [{}] for request [{}], scheduling a retry", exp, node.id(), request);
                        request.setCanHaveDuplicates();
                        retry(exp);
                    } else {
                        finishAsFailed(exp);
                    }
                } catch (Throwable t) {
                    finishWithUnexpectedFailure(t);
                }
            }
        });
    }

    void retry(Throwable failure) {
        assert failure != null;
        if (observer.isTimedOut()) {
            // we running as a last attempt after a timeout has happened. don't retry
            finishAsFailed(failure);
            return;
        }
        setPhase(task, "waiting_for_retry");
        observer.waitForNextChange(new ClusterStateObserver.Listener() {
            @Override
            public void onNewClusterState(ClusterState state) {
                run();
            }

            @Override
            public void onClusterServiceClose() {
                finishAsFailed(new NodeClosedException(clusterService.localNode()));
            }

            @Override
            public void onTimeout(TimeValue timeout) {
                // Try one more time...
                run();
            }
        });
    }

    void finishAsFailed(Throwable failure) {
        if (finished.compareAndSet(false, true)) {
            setPhase(task, "failed");
            logger.trace("operation failed. action [{}], request [{}]", failure, actionName, request);
            listener.onFailure(failure);
        } else {
            assert false : "finishAsFailed called but operation is already finished";
        }
    }

    void finishWithUnexpectedFailure(Throwable failure) {
        logger.warn("unexpected error during the primary phase for action [{}], request [{}]", failure, actionName, request);
        if (finished.compareAndSet(false, true)) {
            setPhase(task, "failed");
            listener.onFailure(failure);
        } else {
            assert false : "finishWithUnexpectedFailure called but operation is already finished";
        }
    }

    void finishOnSuccess(Response response) {
        if (finished.compareAndSet(false, true)) {
            setPhase(task, "finished");
            if (logger.isTraceEnabled()) {
                logger.trace("operation succeeded. action [{}],request [{}]", actionName, request);
            }
            listener.onResponse(response);
        } else {
            assert false : "finishOnSuccess called but operation is already finished";
        }
    }

    void retryBecauseUnavailable(ShardId shardId, String message) {
        retry(new UnavailableShardsException(shardId, "{} Timeout: [{}], request: [{}]", message, request.timeout(), request));
    }
    }
}

PrimaryPhase.java

PrimaryPhase은 로컬 노드의 primary 샤드에 index를 기록하고 replication action을 replication 샤드가 있는 노드로 전송하는 역할을 한다.

즉, 다시 말하면 PrimaryPhase를 끝내고 ReplicationPhase를 실행하는 역할을 한다.

PrimaryPhases.doRun()에서 shardOperationOnPrimary()를 호출하는데 이는 TransportIndexAction.shardOperationOnPrimary()이다.

primary 샤드의 기록이 끝나면 finishAndMoveToReplication()를 수행한다.

public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
    final class PrimaryPhase extends AbstractRunnable {

        @Override
        protected void doRun() throws Exception {
            setPhase(task, "primary");
            // request shardID was set in ReroutePhase
            assert request.shardId() != null : "request shardID must be set prior to primary phase";
            final ShardId shardId = request.shardId();
            final String writeConsistencyFailure = checkWriteConsistency(shardId);
            if (writeConsistencyFailure != null) {
            finishBecauseUnavailable(shardId, writeConsistencyFailure);
            return;
            }
            final ReplicationPhase replicationPhase;
            try {
            indexShardReference = getIndexShardOperationsCounter(shardId);
            Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
            if (logger.isTraceEnabled()) {
                logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
            }
            replicationPhase = new ReplicationPhase(task, primaryResponse.v2(), primaryResponse.v1(), shardId, channel,
                indexShardReference);
            } catch (Throwable e) {
            request.setCanHaveDuplicates();
            if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
                if (logger.isTraceEnabled()) {
                    logger.trace("failed to execute [{}] on [{}]", e, request, shardId);
                }
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("failed to execute [{}] on [{}]", e, request, shardId);
                }
            }
            finishAsFailed(e);
            return;
            }
            finishAndMoveToReplication(replicationPhase);
        }
}

TransportIndexAction.java

TransportIndexAction.shardOperationOnPrimary()에서 executeIndexRequestOnPrimary()를 호출한다.

TransportIndexAction.executeIndexRequestOnPrimary()에서는 **operation.execute(indexShard)를 호출한다.

public class TransportIndexAction extends TransportReplicationAction<IndexRequest, IndexRequest, IndexResponse> {


    @Override
    protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Throwable {

    // validate, if routing is required, that we got routing
    IndexMetaData indexMetaData = metaData.index(request.shardId().getIndex());
    MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
    if (mappingMd != null && mappingMd.routing().required()) {
        if (request.routing() == null) {
            throw new RoutingMissingException(request.shardId().getIndex(), request.type(), request.id());
        }
    }

    IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
    IndexShard indexShard = indexService.shardSafe(request.shardId().id());

    final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(null, request, indexShard, mappingUpdatedAction);
    final IndexResponse response = result.response;
    final Translog.Location location = result.location;
    processAfterWrite(request.refresh(), indexShard, location);
    return new Tuple<>(response, request);
    }

    public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Throwable {
    Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
    Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
    final ShardId shardId = indexShard.shardId();
    if (update != null) {
        final String indexName = shardId.getIndex();
        mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
        operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
        update = operation.parsedDoc().dynamicMappingsUpdate();
        if (update != null) {
            throw new RetryOnPrimaryException(shardId,
                "Dynamic mappings are not available on the node that holds the primary yet");
        }
    }
    final boolean created = operation.execute(indexShard);

    // update the version on request so it will happen on the replicas
    final long version = operation.version();
    request.version(version);
    request.versionType(request.versionType().versionTypeForReplicationAndRecovery());

    assert request.versionType().validateVersionForWrites(request.version());

    return new WriteResult<>(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation());
    }
}

Engine.java

Engine.execute()가 호출되고 이를 통해 shard.index(this)가 호출된다.

public abstract class Engine implements Closeable {
    public static final class Index extends IndexingOperation {     
    @Override
    public boolean execute(IndexShard shard) {
        return shard.index(this);
    }
    }
}

IndexShard.java

IndexShard.index() 안에서 **engine().index(index)가 호출된다.

public class IndexShard extends AbstractIndexShardComponent {
    public boolean index(Engine.Index index) {
    ensureWriteAllowed(index);
    markLastWrite();
    index = indexingService.preIndex(index);
    final boolean created;
    try {
        if (logger.isTraceEnabled()) {
            logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
        }
        created = engine().index(index);
        index.endTime(System.nanoTime());
    } catch (Throwable ex) {
        indexingService.postIndex(index, ex);
        throw ex;
    }
    indexingService.postIndex(index, created);
    return created;
    }
}

InternalEngine.java

InternalEngine.index()를 통해 innerIndex(index)를 호출한다.

InternalEngine.innerIndex()를 호출하는데 문서가 존재하지 않는 경우 루신의 indexWriter.addDocuments()가 호출된다.

indexWriter.addDocuments() 부터는 루신의 영역이므로 포스팅에서는 더 소스를 다루지 않는다.

public class InternalEngine extends Engine {
    @Override
    public boolean index(Index index) throws EngineException {
    final boolean created;
    try (ReleasableLock lock = readLock.acquire()) {
        ensureOpen();
        if (index.origin() == Operation.Origin.RECOVERY) {
            // Don't throttle recovery operations
            created = innerIndex(index);
        } else {
            try (Releasable r = throttle.acquireThrottle()) {
                created = innerIndex(index);
            }
        }
    } catch (OutOfMemoryError | IllegalStateException | IOException t) {
        maybeFailEngine("index", t);
        throw new IndexFailedEngineException(shardId, index.type(), index.id(), t);
    }
    checkVersionMapRefresh();
    return created;
    }

    private boolean innerIndex(Index index) throws IOException {
    try (Releasable ignored = acquireLock(index.uid())) {
        final long currentVersion;
        VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
        if (versionValue == null) {
            currentVersion = loadCurrentVersionFromIndex(index.uid());
        } else {
            if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() -
                    versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
                currentVersion = Versions.NOT_FOUND; // deleted, and GC
            } else {
                currentVersion = versionValue.version();
            }
        }

        long updatedVersion;
        long expectedVersion = index.version();
        if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
            if (index.origin() == Operation.Origin.RECOVERY) {
                return false;
            } else {
                throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
            }
        }
        updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);

        final boolean created;
        index.updateVersion(updatedVersion);
        if (currentVersion == Versions.NOT_FOUND) {
            // document does not exists, we can optimize for create
            created = true;
            if (index.docs().size() > 1) {
                indexWriter.addDocuments(index.docs());
            } else {
                indexWriter.addDocument(index.docs().get(0));
            }
        } else {
            if (versionValue != null) {
                created = versionValue.delete(); // we have a delete which is not GC'ed...
            } else {
                created = false;
            }
            if (index.docs().size() > 1) {
                indexWriter.updateDocuments(index.uid(), index.docs());
            } else {
                indexWriter.updateDocument(index.uid(), index.docs().get(0));
            }
        }
        Translog.Location translogLocation = translog.add(new Translog.Index(index));

        versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
        index.setTranslogLocation(translogLocation);
        indexingService.postIndexUnderLock(index);
        return created;
    }
    }
}

ReplicationPhase.java

PrimaryPhase가 끝날때 PrimaryPhase.finishAndMoveToReplication()를 호출하여 ReplicationPhase로 넘어간다.

ReplicationPhase는 Replication 샤드가 있는 노드들로 request(AsyncReplicaAction)를 보내는 역할을 한다.

ReplicationPhase.performOnReplica()를 호출하여 replication 샤드가 있는 노드로 요청을 보낸다.

transportReplicaAction은 TransportReplicationAction 생성자에서 ReplicaOperationTransportHandler로 요청을 받게끔 바인딩 되어 있다.

public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
    final class ReplicationPhase extends AbstractRunnable {
        private final ReplicationTask task;
        private final ReplicaRequest replicaRequest;
        private final Response finalResponse;
        private final TransportChannel channel;
        private final ShardId shardId;
        private final List<ShardRouting> shards;
        private final DiscoveryNodes nodes;
        private final boolean executeOnReplica;
        private final String indexUUID;
        private final AtomicBoolean finished = new AtomicBoolean();
        private final AtomicInteger success = new AtomicInteger(1); // We already wrote into the primary shard
        private final ConcurrentMap<String, Throwable> shardReplicaFailures = ConcurrentCollections.newConcurrentMap();
        private final AtomicInteger pending;
        private final int totalShards;
        private final Releasable indexShardReference;

        public ReplicationPhase(ReplicationTask task, ReplicaRequest replicaRequest, Response finalResponse, ShardId shardId,
                            TransportChannel channel, Releasable indexShardReference) {
            this.task = task;
            this.replicaRequest = replicaRequest;
            this.channel = channel;
            this.finalResponse = finalResponse;
            this.indexShardReference = indexShardReference;
            this.shardId = shardId;

            // we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
            // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
            // to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
            // If the index gets deleted after primary operation, we skip replication
            final ClusterState state = clusterService.state();
            final IndexRoutingTable index = state.getRoutingTable().index(shardId.getIndex());
            final IndexShardRoutingTable shardRoutingTable = (index != null) ? index.shard(shardId.id()) : null;
            final IndexMetaData indexMetaData = state.getMetaData().index(shardId.getIndex());
            this.shards = (shardRoutingTable != null) ? shardRoutingTable.shards() : Collections.<ShardRouting>emptyList();
            this.executeOnReplica = (indexMetaData == null) || shouldExecuteReplication(indexMetaData.getSettings());
            this.indexUUID = (indexMetaData != null) ? indexMetaData.getIndexUUID() : null;
            this.nodes = state.getNodes();

            if (shards.isEmpty()) {
            logger.debug("replication phase for request [{}] on [{}] is skipped due to index deletion after primary operation", replicaRequest, shardId);
            }

            // we calculate number of target nodes to send replication operations, including nodes with relocating shards
            int numberOfIgnoredShardInstances = 0;
            int numberOfPendingShardInstances = 0;
            for (ShardRouting shard : shards) {
            if (shard.state() != ShardRoutingState.STARTED) {
                replicaRequest.setCanHaveDuplicates();
            }
            if (shard.primary() == false && executeOnReplica == false) {
                numberOfIgnoredShardInstances++;
            } else if (shard.unassigned()) {
                numberOfIgnoredShardInstances++;
            } else {
                if (shard.currentNodeId().equals(nodes.localNodeId()) == false) {
                    numberOfPendingShardInstances++;
                }
                if (shard.relocating()) {
                    numberOfPendingShardInstances++;
                }
            }
            }
            // one for the local primary copy
            this.totalShards = 1 + numberOfPendingShardInstances + numberOfIgnoredShardInstances;
            this.pending = new AtomicInteger(numberOfPendingShardInstances);
            if (logger.isTraceEnabled()) {
            logger.trace("replication phase started. pending [{}], action [{}], request [{}], cluster state version used [{}]", pending.get(),
                transportReplicaAction, replicaRequest, state.version());
            }
        }

        /**
         * total shard copies
         */
        int totalShards() {
            return totalShards;
        }

        /**
         * total successful operations so far
         */
        int successful() {
            return success.get();
        }

        /**
         * number of pending operations
         */
        int pending() {
            return pending.get();
        }

        @Override
        public void onFailure(Throwable t) {
            logger.error("unexpected error while replicating for action [{}]. shard [{}]. ", t, actionName, shardId);
            forceFinishAsFailed(t);
        }

        /**
         * start sending replica requests to target nodes
         */
        @Override
        protected void doRun() {
            setPhase(task, "replicating");
            if (pending.get() == 0) {
            doFinish();
            return;
            }
            for (ShardRouting shard : shards) {
            if (shard.primary() == false && executeOnReplica == false) {
                // If the replicas use shadow replicas, there is no reason to
                // perform the action on the replica, so skip it and
                // immediately return

                // this delays mapping updates on replicas because they have
                // to wait until they get the new mapping through the cluster
                // state, which is why we recommend pre-defined mappings for
                // indices using shadow replicas
                continue;
            }
            if (shard.unassigned()) {
                continue;
            }
            // we index on a replica that is initializing as well since we might not have got the event
            // yet that it was started. We will get an exception IllegalShardState exception if its not started
            // and that's fine, we will ignore it

            // we never execute replication operation locally as primary operation has already completed locally
            // hence, we ignore any local shard for replication
            if (nodes.localNodeId().equals(shard.currentNodeId()) == false) {
                performOnReplica(shard);
            }
            // send operation to relocating shard
            if (shard.relocating()) {
                performOnReplica(shard.buildTargetRelocatingShard());
            }
            }
        }

        /**
         * send replica operation to target node
         */
        void performOnReplica(final ShardRouting shard) {
            // if we don't have that node, it means that it might have failed and will be created again, in
            // this case, we don't have to do the operation, and just let it failover
            final String nodeId = shard.currentNodeId();
            if (!nodes.nodeExists(nodeId)) {
            logger.trace("failed to send action [{}] on replica [{}] for request [{}] due to unknown node [{}]", transportReplicaAction, shard.shardId(), replicaRequest, nodeId);
            onReplicaFailure(nodeId, null);
            return;
            }
            if (logger.isTraceEnabled()) {
            logger.trace("send action [{}] on replica [{}] for request [{}] to [{}]", transportReplicaAction, shard.shardId(), replicaRequest, nodeId);
            }

            final DiscoveryNode node = nodes.get(nodeId);
            transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                @Override
                public void handleResponse(TransportResponse.Empty vResponse) {
                    onReplicaSuccess();
                }

                @Override
                public void handleException(TransportException exp) {
                    onReplicaFailure(nodeId, exp);
                    logger.trace("[{}] transport failure during replica request [{}], action [{}]", exp, node, replicaRequest, transportReplicaAction);
                    if (mustFailReplica(exp)) {
                        assert ignoreReplicaException(exp) == false;
                        logger.warn("{} failed to perform {} on node {}", exp, shardId, transportReplicaAction, node);
                        shardStateAction.shardFailed(shard, indexUUID, "failed to perform " + actionName + " on replica on node " + node, exp);
                    }
                }
            }
            );
        }


        void onReplicaFailure(String nodeId, @Nullable Throwable e) {
            // Only version conflict should be ignored from being put into the _shards header?
            if (e != null && ignoreReplicaException(e) == false) {
            shardReplicaFailures.put(nodeId, e);
            }
            decPendingAndFinishIfNeeded();
        }

        void onReplicaSuccess() {
            success.incrementAndGet();
            decPendingAndFinishIfNeeded();
        }

        private void decPendingAndFinishIfNeeded() {
            if (pending.decrementAndGet() <= 0) {
            doFinish();
            }
        }

        private void forceFinishAsFailed(Throwable t) {
            setPhase(task, "failed");
            if (finished.compareAndSet(false, true)) {
            Releasables.close(indexShardReference);
            try {
                channel.sendResponse(t);
            } catch (IOException responseException) {
                logger.warn("failed to send error message back to client for action [{}]", responseException, transportReplicaAction);
                logger.warn("actual Exception", t);
            }
            }
        }

        private void doFinish() {
            if (finished.compareAndSet(false, true)) {
            setPhase(task, "finished");
            Releasables.close(indexShardReference);
            final ActionWriteResponse.ShardInfo.Failure[] failuresArray;
            if (!shardReplicaFailures.isEmpty()) {
                int slot = 0;
                failuresArray = new ActionWriteResponse.ShardInfo.Failure[shardReplicaFailures.size()];
                for (Map.Entry<String, Throwable> entry : shardReplicaFailures.entrySet()) {
                    RestStatus restStatus = ExceptionsHelper.status(entry.getValue());
                    failuresArray[slot++] = new ActionWriteResponse.ShardInfo.Failure(
                        shardId.getIndex(), shardId.getId(), entry.getKey(), entry.getValue(), restStatus, false
                    );
                }
            } else {
                failuresArray = ActionWriteResponse.EMPTY;
            }
            finalResponse.setShardInfo(new ActionWriteResponse.ShardInfo(
                    totalShards,
                    success.get(),
                    failuresArray

                )
            );
            try {
                channel.sendResponse(finalResponse);
            } catch (IOException responseException) {
                logger.warn("failed to send error message back to client for action [" + transportReplicaAction + "]", responseException);
            }
            if (logger.isTraceEnabled()) {
                logger.trace("action [{}] completed on all replicas [{}] for request [{}]", transportReplicaAction, shardId, replicaRequest);
            }
            }
        }

        }

    class ReplicaOperationTransportHandler extends TransportRequestHandler<ReplicaRequest> {
    @Override
    public void messageReceived(final ReplicaRequest request, final TransportChannel channel) throws Exception {
        throw new UnsupportedOperationException("the task parameter is required for this operation");
    }

    @Override
    public void messageReceived(ReplicaRequest request, TransportChannel channel, Task task) throws Exception {
        new AsyncReplicaAction(request, channel, (ReplicationTask) task).run();
    }
    }
}

AsyncReplicaAction.java

AsyncReplicaAction.shardOperationOnReplica(request)를 호출한다.

public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
    private final class AsyncReplicaAction extends AbstractRunnable {
    private final ReplicaRequest request;
    private final TransportChannel channel;
    /**
     * The task on the node with the replica shard.
     */
    private final ReplicationTask task;
    // important: we pass null as a timeout as failing a replica is
    // something we want to avoid at all costs
    private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);

    AsyncReplicaAction(ReplicaRequest request, TransportChannel channel, ReplicationTask task) {
        this.request = request;
        this.channel = channel;
        this.task = task;
    }

    @Override
    public void onFailure(Throwable t) {
        if (t instanceof RetryOnReplicaException) {
            logger.trace("Retrying operation on replica, action [{}], request [{}]", t, transportReplicaAction, request);
            observer.waitForNextChange(new ClusterStateObserver.Listener() {
                @Override
                public void onNewClusterState(ClusterState state) {
                    // Forking a thread on local node via transport service so that custom transport service have an
                    // opportunity to execute custom  logic before the replica operation begins
                    String extraMessage = "action [" + transportReplicaAction  + "], request[" + request + "]";
                    TransportChannelResponseHandler<TransportResponse.Empty> handler = TransportChannelResponseHandler.emptyResponseHandler(logger, channel, extraMessage);
                    transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, handler);
                }

                @Override
                public void onClusterServiceClose() {
                    responseWithFailure(new NodeClosedException(clusterService.localNode()));
                }

                @Override
                public void onTimeout(TimeValue timeout) {
                    throw new AssertionError("Cannot happen: there is not timeout");
                }
            });
        } else {
            try {
                failReplicaIfNeeded(t);
            } catch (Throwable unexpected) {
                logger.error("{} unexpected error while failing replica", unexpected, request.shardId().id());
            } finally {
                responseWithFailure(t);
            }
        }
    }

    private void failReplicaIfNeeded(Throwable t) {
        String index = request.shardId().getIndex();
        int shardId = request.shardId().id();
        logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request);
        if (mustFailReplica(t)) {
            assert ignoreReplicaException(t) == false;
            IndexService indexService = indicesService.indexService(index);
            if (indexService == null) {
                logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
                return;
            }
            IndexShard indexShard = indexService.shard(shardId);
            if (indexShard == null) {
                logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
                return;
            }
            indexShard.failShard(actionName + " failed on replica", t);
        }
    }

    protected void responseWithFailure(Throwable t) {
        try {
            channel.sendResponse(t);
        } catch (IOException responseException) {
            logger.warn("failed to send error message back to client for action [" + transportReplicaAction + "]", responseException);
            logger.warn("actual Exception", t);
        }
    }

    @Override
    protected void doRun() throws Exception {
        setPhase(task, "replica");
        assert request.shardId() != null : "request shardId must be set";
        try (Releasable ignored = getIndexShardOperationsCounter(request.shardId())) {
            shardOperationOnReplica(request);
            if (logger.isTraceEnabled()) {
                logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request);
            }
        }
        setPhase(task, "finished");
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }
    }
}

TransportIndexAction.java

TransportIndexAction.shardOperationOnReplica()를 호출하여 executeIndexRequestOnReplica()를 호출한다.

이후 operation.execute()를 호출하는것은 Primary 샤드를 가진 노드에서 수행한것과 동일하다.

public class TransportIndexAction extends TransportReplicationAction<IndexRequest, IndexRequest, IndexResponse> {
    @Override
    protected void shardOperationOnReplica(IndexRequest request) {
    final ShardId shardId = request.shardId();
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.shardSafe(shardId.id());
    final Engine.IndexingOperation operation = executeIndexRequestOnReplica(request, indexShard);
    processAfterWrite(request.refresh(), indexShard, operation.getTranslogLocation());
    }

    public static Engine.IndexingOperation executeIndexRequestOnReplica(IndexRequest request, IndexShard indexShard) {
    final ShardId shardId = indexShard.shardId();
    SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).index(shardId.getIndex()).type(request.type()).id(request.id())
        .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());

    final Engine.IndexingOperation operation;
    if (request.opType() == IndexRequest.OpType.INDEX) {
        operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.canHaveDuplicates());
    } else {
        assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
        operation = indexShard.prepareCreateOnReplica(sourceToParse, request.version(), request.versionType(), request.canHaveDuplicates(), request.autoGeneratedId());
    }

    Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
    if (update != null) {
        throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
    }

    operation.execute(indexShard);
    return operation;
    }
}