개요

  • 지난 포스팅에 이어 ES의 조회(GET) 과정의 소스코드를 분석해본다.
  • HTTP 방식으로 조회(GET) 요청이 들어왔을 때 처리 과정을 소스코드로 기술한다.
  • 엘라스틱서치(ES) 조회(GET) 소스코드 분석글의 시작점은 링크이다.
  • ES 소스분석글의 시작점은 링크이다.

코드분석

AsyncSingleAction.java

AsyncSingleAction.start()부분을 살펴보기에 앞서 AsyncSingleAction의 생성자 부분에 대한 이해가 필요하다.

indexNameExpressionResolver.concreteSingleIndex()를 통해 복수 인덱스를 지정했더라도 실제 단일 인덱스를 파싱하여 가져온다.

resolveRequest(clusterState, internalRequest)는 추상화 메서드만 AsyncSingleAction에 존재하고 실제 구현체는 상속받은 클래스들에 있다. 여기서는 TransportGetAction.resolveRequest()이다.

TransportGetAction.resolveRequest()에서 실제 인덱스에 대한 라우팅 정보를 업데이트 한다.

shards(clusterState, internalRequest)도 같은 맥락으로 구현체는 TransportGetAction.shards(clusterState, internalRequest)이며, 클러스터 정보를 통해 실제 인덱스에서 해당 문서가 속한 샤드 정보를 받아온다.

shards(clusterState, internalRequest)는 호출될 때마다 primary 샤드와 replica 샤드를 랜덤하게 섞어(shuffle) shardIt를 정한다.

이제 start()를 살펴보면 shards()가 null을 리턴하면 로컬 노드를 뜻하므로 로컬에서 바로 요청을 수행하고, null이 아니라면 현재 노드는 coordinating 노드이므로 샤드가 있는 노드로 요청을 전송하기 위해 perform()을 수행한다.

perform()을 살펴보면 샤드 정보를 통해 실제 샤드를 가진 노드를 찾고(primary일 수도 있고 replica일 수도 있음) 이 노드로 요청을 전송한다.

요청을 받은 노드에서는 transportShardAction에 해당하는 핸들러가 수행됨

/**
 * Excerpt of the transport action base class; only the inner {@code AsyncSingleAction}
 * is shown here.
 */
public abstract class TransportSingleShardAction<Request extends SingleShardRequest, Response extends    ActionResponse> extends TransportAction<Request, Response> {
  /**
   * Drives a single request: the constructor resolves the concrete index and the shard
   * iterator, {@link #start()} dispatches the request, and {@code perform} walks the
   * iterator, moving to the next shard routing each time a send fails.
   */
  class AsyncSingleAction {

          private final ActionListener<Response> listener;
          // May be null; start() treats null as "execute on the local node".
          private final ShardsIterator shardIt;
          private final InternalRequest internalRequest;
          // Node set taken from the cluster state observed in the constructor.
          private final DiscoveryNodes nodes;
          // Failure that will be reported if every shard routing has been tried.
          private volatile Throwable lastFailure;

          /**
           * Reads the current cluster state and prepares the request against it.
           *
           * Steps, in order: global block check, concrete index resolution (only when
           * {@code resolveIndex(request)} is true, otherwise {@code request.index()} is used as is),
           * {@code resolveRequest}, request-level block check, and finally shard iterator lookup.
           * A non-null block exception from either check is thrown from the constructor.
           *
           * @param request  the incoming request
           * @param listener receives the response or the final failure
           */
          private AsyncSingleAction(Request request, ActionListener<Response> listener) {
              logger.info("TransportSingleShardAction.AsyncSingleAction.AsyncSingleAction");
              this.listener = listener;

              ClusterState clusterState = clusterService.state();
              if (logger.isTraceEnabled()) {
                  logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
              }
              nodes = clusterState.nodes();
              ClusterBlockException blockException = checkGlobalBlock(clusterState);
              if (blockException != null) {
                  throw blockException;
              }

              String concreteSingleIndex;
              if (resolveIndex(request)) {
                  concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request);
              } else {
                  concreteSingleIndex = request.index();
              }
              this.internalRequest = new InternalRequest(request, concreteSingleIndex);
              resolveRequest(clusterState, internalRequest);

              blockException = checkRequestBlock(clusterState, internalRequest);
              if (blockException != null) {
                  throw blockException;
              }

              this.shardIt = shards(clusterState, internalRequest);
          }

          /**
           * Dispatches the request. With no shard iterator the request is sent once to the local
           * node and its outcome is forwarded straight to the listener (no retry); otherwise the
           * shard iterator is walked via {@code perform(null)}.
           */
          public void start() {
              if (shardIt == null) {
                  // just execute it on the local node
                  transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {
                      @Override
                      public Response newInstance() {
                          return newResponse();
                      }

                      @Override
                      public String executor() {
                          return ThreadPool.Names.SAME;
                      }

                      @Override
                      public void handleResponse(final Response response) {
                          listener.onResponse(response);
                      }

                      @Override
                      public void handleException(TransportException exp) {
                          listener.onFailure(exp);
                      }
                  });
              } else {
                  perform(null);
              }
          }

          /**
           * Logs the failure for the given shard routing (trace level only) and moves on to the
           * next routing by calling {@code perform(e)}.
           */
          private void onFailure(ShardRouting shardRouting, Throwable e) {
              if (logger.isTraceEnabled() && e != null) {
                  logger.trace("{}: failed to execute [{}]", e, shardRouting, internalRequest.request());
              }
              perform(e);
          }

          /**
           * Tries the next shard routing from {@code shardIt}.
           *
           * When the iterator is exhausted the listener is failed: with a
           * {@code NoShardAvailableActionException} if there is no recorded failure or the recorded
           * one is a shard-not-available exception, otherwise with the recorded failure itself.
           *
           * @param currentFailure the failure from the previous attempt, or null on the first call
           */
          private void perform(@Nullable final Throwable currentFailure) {
              // Local copy shadows the volatile field; both are updated together below.
              Throwable lastFailure = this.lastFailure;
              // Keep the first recorded failure unless the current one is a read-override exception.
              if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
                  lastFailure = currentFailure;
                  this.lastFailure = currentFailure;
              }
              final ShardRouting shardRouting = shardIt.nextOrNull();
              if (shardRouting == null) {
                  // Nothing left to try: report the final failure.
                  Throwable failure = lastFailure;
                  if (failure == null || isShardNotAvailableException(failure)) {
                      failure = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
                  } else {
                      if (logger.isDebugEnabled()) {
                          logger.debug("{}: failed to execute [{}]", failure, null, internalRequest.request());
                      }
                  }
                  listener.onFailure(failure);
                  return;
              }
              DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
              if (node == null) {
                  // The routing's node is not in the node set captured by the constructor; try the next one.
                  onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
              } else {
                  // Stamp the target shard on the request so the receiving handler can read it.
                  internalRequest.request().internalShardId = shardRouting.shardId();
                  if (logger.isTraceEnabled()) {
                      logger.trace(
                              "sending request [{}] to shard [{}] on node [{}]",
                              internalRequest.request(),
                              internalRequest.request().internalShardId,
                              node
                      );
                  }
                  transportService.sendRequest(node, transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {

                      @Override
                      public Response newInstance() {
                          return newResponse();
                      }

                      @Override
                      public String executor() {
                          return ThreadPool.Names.SAME;
                      }

                      @Override
                      public void handleResponse(final Response response) {
                          listener.onResponse(response);
                      }

                      @Override
                      public void handleException(TransportException exp) {
                          // Unlike the local-node path in start(), a failure here retries on the next routing.
                          onFailure(shardRouting, exp);
                      }
                  });
              }
          }
      }
}

  /**
   * Excerpt of the GET transport action: the request-resolution and shard-selection hooks
   * that the single-shard base action invokes.
   */
  public class TransportGetAction extends TransportSingleShardAction<GetRequest, GetResponse> {
    /**
     * Fills in request defaults and resolves routing against the given cluster state.
     *
     * <ul>
     *   <li>Defaults {@code realtime} to this action's configured value when unset.</li>
     *   <li>Forces the {@code _primary} preference for realtime requests with no explicit
     *       preference on an index that uses shadow replicas.</li>
     *   <li>Resolves the routing value (the request index may be an alias).</li>
     * </ul>
     *
     * @param state   cluster state to resolve against
     * @param request the request wrapper holding the concrete index
     * @throws RoutingMissingException if routing is required for the type but none was resolved
     */
    @Override
    protected void resolveRequest(ClusterState state, InternalRequest request) {
        if (request.request().realtime == null) {
            request.request().realtime = this.realtime;
        }
        IndexMetaData indexMeta = state.getMetaData().index(request.concreteIndex());
        if (request.request().realtime && // if the realtime flag is set
                request.request().preference() == null && // the preference flag is not already set
                indexMeta != null && // and we have the index
                IndexMetaData.isIndexUsingShadowReplicas(indexMeta.getSettings())) { // and the index uses shadow replicas
            // set the preference for the request to use "_primary" automatically
            request.request().preference(Preference.PRIMARY.type());
        }
        // update the routing (request#index here is possibly an alias)
        request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index()));
        // Fail fast on the node that received the request.
        if (request.request().routing() == null && state.getMetaData().routingRequired(request.concreteIndex(), request.request().type())) {
            throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
        }
    }

    /**
     * Returns the shard iterator for the document addressed by the request.
     *
     * @param state   cluster state to route against
     * @param request the request wrapper holding the concrete index, type, id, routing and preference
     * @return iterator over the shard routings that can serve the document
     */
    @Override
    protected ShardIterator shards(ClusterState state, InternalRequest request) {
        // Route against the snapshot handed in by the caller rather than re-reading
        // clusterService.state(): the caller resolved the concrete index, the blocks and its
        // node set from that same snapshot, so a newer state here could yield a routing whose
        // node id is unknown to the caller.
        return clusterService.operationRouting()
                .getShards(state, request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing(), request.request().preference());
    }
}

ShardTransportHandler

TransportSingleShardAction의 생성자에 transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());을 통해 ShardTransportHandler가 등록된 것을 알 수 있음

중요 로직은 shardOperation()를 호출하는것이다. 이부분은 앞서 여러차례 설명한 원리로 TransportGetAction.shardOperation()이 호출됨.

public abstract class TransportSingleShardAction<Request extends SingleShardRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
  /**
   * Transport handler registered for the shard-level action: runs the shard operation
   * for the shard stamped on the request and writes the result back on the channel.
   */
  private class ShardTransportHandler extends TransportRequestHandler<Request> {

    /**
     * Executes {@code shardOperation} for {@code request.internalShardId} and sends the
     * response through {@code channel}.
     */
    @Override
    public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
        final boolean traceOn = logger.isTraceEnabled();
        if (traceOn) {
            logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
        }
        channel.sendResponse(shardOperation(request, request.internalShardId));
    }
  }
}

shardOperation

크게 볼건 없고 indexShard.getService().get()을 호출함을 알 수 있다.

public class TransportGetAction extends TransportSingleShardAction<GetRequest, GetResponse> {
  /**
   * Performs the GET on the shard identified by {@code shardId}.
   *
   * The shard is refreshed first only when the request asks for a refresh and is not
   * realtime; the lookup itself is delegated to the shard's get service.
   *
   * @param request the GET request
   * @param shardId the shard to read from
   * @return the response wrapping the get result
   */
  @Override
  protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
      logger.info("TransportGetAction.shardOperation");
      final IndexShard shard = indicesService.indexServiceSafe(shardId.getIndex()).shardSafe(shardId.id());

      final boolean refreshBeforeRead = request.refresh() && !request.realtime();
      if (refreshBeforeRead) {
          shard.refresh("refresh_flag_get");
      }

      return new GetResponse(
              shard.getService().get(
                      request.type(),
                      request.id(),
                      request.fields(),
                      request.realtime(),
                      request.version(),
                      request.versionType(),
                      request.fetchSourceContext(),
                      request.ignoreErrorsOnGeneratedFields()));
  }
}

ShardGetService.java

ShardGetService.get()을 통해 ShardGetService.innerGet()을 호출한다.

innerGet()에서는 indexShard.get()으로 문서를 가져온다.

문서를 가져온 후 출처에 따라 분기처리한다.

docIdAndVersion(문서 ID와 버전)이 없으면 트랜스로그에서 가져온 것이므로, 트랜스로그의 source로부터 직접 결과를 만든다.

문서id, 버전이 있으면 문서가 존재하는 것으로 innerGetLoadFromStoredFields()로 문서를 가져온다.

/**
 * Excerpt of the shard-level get service: looks a document up through the index shard and
 * builds a {@code GetResult} either from the translog source or from stored fields.
 */
public final class ShardGetService extends AbstractIndexShardComponent {
  /**
   * Entry point for a get on this shard. Wraps {@code innerGet} with metrics:
   * {@code currentMetric} is incremented for the duration of the call, and the elapsed
   * nanoseconds are added to {@code existsMetric} or {@code missingMetric} depending on
   * whether the result exists.
   *
   * @return the result produced by {@code innerGet}
   */
  public GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, FetchSourceContext fetchSourceContext, boolean ignoreErrorsOnGeneratedFields) {
      logger.info("ShardGetService.get");
      currentMetric.inc();
      try {
          long now = System.nanoTime();
          GetResult getResult = innerGet(type, id, gFields, realtime, version, versionType, fetchSourceContext, ignoreErrorsOnGeneratedFields);

          if (getResult.isExists()) {
              existsMetric.inc(System.nanoTime() - now);
          } else {
              missingMetric.inc(System.nanoTime() - now);
          }
          return getResult;
      } finally {
          currentMetric.dec();
      }
  }

  /**
   * Looks the document up and builds the result.
   *
   * When {@code type} is null or {@code "_all"}, every type known to the mapper service is
   * probed in turn until one yields an existing document. A missing document, or a type with
   * no document mapper, produces a non-existing {@code GetResult} with version -1.
   *
   * For an existing document the path forks on {@code get.docIdAndVersion()}: non-null means
   * there is a document to load via {@code innerGetLoadFromStoredFields}; null means the result
   * came from the translog and only its source is available, so requested fields are extracted
   * from that source here.
   *
   * The engine result is released on every path, either explicitly before an early return or
   * in the final {@code finally}.
   *
   * @throws IllegalArgumentException if a requested field is an object field rather than a leaf
   * @throws ElasticsearchException   if the filtered source cannot be re-serialized
   */
  private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, FetchSourceContext fetchSourceContext, boolean ignoreErrorsOnGeneratedFields) {
      logger.info("innerGet");
      fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);

      Engine.GetResult get = null;
      if (type == null || type.equals("_all")) {
          for (String typeX : mapperService.types()) {
              get = indexShard.get(new Engine.Get(realtime, new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(typeX, id)))
                      .version(version).versionType(versionType));
              if (get.exists()) {
                  // Found: remember the concrete type; the result is released in the finally below.
                  type = typeX;
                  break;
              } else {
                  get.release();
              }
          }
          // get stays null when the loop body never ran (no types to probe).
          if (get == null) {
              return new GetResult(shardId.index().name(), type, id, -1, false, null, null);
          }
          if (!get.exists()) {
              // no need to release here as well..., we release in the for loop for non exists
              return new GetResult(shardId.index().name(), type, id, -1, false, null, null);
          }
      } else {
          get = indexShard.get(new Engine.Get(realtime, new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(type, id)))
                  .version(version).versionType(versionType));
          if (!get.exists()) {
              get.release();
              return new GetResult(shardId.index().name(), type, id, -1, false, null, null);
          }
      }

      DocumentMapper docMapper = mapperService.documentMapper(type);
      if (docMapper == null) {
          get.release();
          return new GetResult(shardId.index().name(), type, id, -1, false, null, null);
      }

      try {
          // break between having loaded it from translog (so we only have _source), and having a document to load
          if (get.docIdAndVersion() != null) {
              return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, docMapper, ignoreErrorsOnGeneratedFields);
          } else {
              Translog.Source source = get.source();

              // Both are created lazily, only once a value / a source lookup is actually needed.
              Map<String, GetField> fields = null;
              SearchLookup searchLookup = null;

              // we can only load scripts that can run against the source
              Set<String> neededFields = new HashSet<>();
              // add meta fields
              neededFields.add(RoutingFieldMapper.NAME);
              if (docMapper.parentFieldMapper().active()) {
                  neededFields.add(ParentFieldMapper.NAME);
              }
              if (docMapper.timestampFieldMapper().enabled()) {
                  neededFields.add(TimestampFieldMapper.NAME);
              }
              if (docMapper.TTLFieldMapper().enabled()) {
                  neededFields.add(TTLFieldMapper.NAME);
              }
              // add requested fields
              if (gFields != null) {
                  neededFields.addAll(Arrays.asList(gFields));
              }
              for (String field : neededFields) {
                  if (SourceFieldMapper.NAME.equals(field)) {
                      // dealt with when normalizing fetchSourceContext.
                      continue;
                  }
                  Object value = null;
                  // Meta fields are read directly from the translog entry; everything else from its source.
                  if (field.equals(RoutingFieldMapper.NAME)) {
                      value = source.routing;
                  } else if (field.equals(ParentFieldMapper.NAME) && docMapper.parentFieldMapper().active()) {
                      value = source.parent;
                  } else if (field.equals(TimestampFieldMapper.NAME) && docMapper.timestampFieldMapper().enabled()) {
                      value = source.timestamp;
                  } else if (field.equals(TTLFieldMapper.NAME) && docMapper.TTLFieldMapper().enabled()) {
                      // Call value for search with timestamp + ttl here to display the live remaining ttl value and be consistent with the search result display
                      if (source.ttl > 0) {
                          value = docMapper.TTLFieldMapper().valueForSearch(source.timestamp + source.ttl);
                      }
                  } else {
                      if (searchLookup == null) {
                          searchLookup = new SearchLookup(mapperService, null, new String[]{type});
                          searchLookup.source().setSource(source.source);
                      }

                      FieldMapper fieldMapper = docMapper.mappers().smartNameFieldMapper(field);
                      if (fieldMapper == null) {
                          if (docMapper.objectMappers().get(field) != null) {
                              // Only fail if we know it is a object field, missing paths / fields shouldn't fail.
                              throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
                          }
                      } else if (shouldGetFromSource(ignoreErrorsOnGeneratedFields, docMapper, fieldMapper)) {
                          List<Object> values = searchLookup.source().extractRawValues(field);
                          if (!values.isEmpty()) {
                              // Convert each raw value in place through the field type.
                              for (int i = 0; i < values.size(); i++) {
                                  values.set(i, fieldMapper.fieldType().valueForSearch(values.get(i)));
                              }
                              value = values;
                          }

                      }
                  }
                  if (value != null) {
                      if (fields == null) {
                          fields = newHashMapWithExpectedSize(2);
                      }
                      // A list is used as the field's values as is; a single value is wrapped in a singleton list.
                      if (value instanceof List) {
                          fields.put(field, new GetField(field, (List) value));
                      } else {
                          fields.put(field, new GetField(field, Collections.singletonList(value)));
                      }
                  }
              }

              // deal with source, but only if it's enabled (we always have it from the translog)
              BytesReference sourceToBeReturned = null;
              SourceFieldMapper sourceFieldMapper = docMapper.sourceMapper();
              if (fetchSourceContext.fetchSource() && sourceFieldMapper.enabled()) {

                  sourceToBeReturned = source.source;

                  // Cater for source excludes/includes at the cost of performance
                  // We must first apply the field mapper filtering to make sure we get correct results
                  // in the case that the fetchSourceContext white lists something that's not included by the field mapper

                  boolean sourceFieldFiltering = sourceFieldMapper.includes().length > 0 || sourceFieldMapper.excludes().length > 0;
                  boolean sourceFetchFiltering = fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0;
                  if (fetchSourceContext.transformSource() || sourceFieldFiltering || sourceFetchFiltering) {
                      // TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
                      Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source.source, true);
                      XContentType sourceContentType = typeMapTuple.v1();
                      Map<String, Object> sourceAsMap = typeMapTuple.v2();
                      // Order matters: transform, then mapper-level filtering, then request-level filtering.
                      if (fetchSourceContext.transformSource()) {
                          sourceAsMap = docMapper.transformSourceAsMap(sourceAsMap);
                      }
                      if (sourceFieldFiltering) {
                          sourceAsMap = XContentMapValues.filter(sourceAsMap, sourceFieldMapper.includes(), sourceFieldMapper.excludes());
                      }
                      if (sourceFetchFiltering) {
                          sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
                      }
                      try {
                          // Re-serialize in the same content type the source was parsed from.
                          sourceToBeReturned = XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap).bytes();
                      } catch (IOException e) {
                          throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "] with includes/excludes set", e);
                      }
                  }
              }

              return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), sourceToBeReturned, fields);
          }
      } finally {
          get.release();
      }
  }

      /**
       * Builds the result for a document that has a doc id and version, i.e. one that can be
       * read through the reader held by {@code get.docIdAndVersion()}.
       *
       * Stored fields (and the source, if the visitor collects it) are loaded through a fields
       * visitor. Requested fields that are neither stored nor generated are then extracted from
       * the source. Finally the source is dropped or filtered according to
       * {@code fetchSourceContext}.
       *
       * This method does not release {@code get}; the caller does.
       *
       * @throws ElasticsearchException   if reading the document or re-serializing the source fails
       * @throws IllegalArgumentException if a requested field is an object field rather than a leaf
       */
      private GetResult innerGetLoadFromStoredFields(String type, String id, String[] gFields, FetchSourceContext fetchSourceContext, Engine.GetResult get, DocumentMapper docMapper, boolean ignoreErrorsOnGeneratedFields) {
          logger.info("ShardGetService.innerGetLoadFromStoredFields");
          Map<String, GetField> fields = null;
          BytesReference source = null;
          Versions.DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
          FieldsVisitor fieldVisitor = buildFieldsVisitors(gFields, fetchSourceContext);
          // A null visitor means there is nothing to load from the document.
          if (fieldVisitor != null) {
              try {
                  docIdAndVersion.context.reader().document(docIdAndVersion.docId, fieldVisitor);
              } catch (IOException e) {
                  throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "]", e);
              }
              source = fieldVisitor.source();

              if (!fieldVisitor.fields().isEmpty()) {
                  fieldVisitor.postProcess(docMapper);
                  fields = new HashMap<>(fieldVisitor.fields().size());
                  for (Map.Entry<String, List<Object>> entry : fieldVisitor.fields().entrySet()) {
                      fields.put(entry.getKey(), new GetField(entry.getKey(), entry.getValue()));
                  }
              }
          }

          // now, go and do the script thingy if needed

          if (gFields != null && gFields.length > 0) {
              SearchLookup searchLookup = null;
              for (String field : gFields) {
                  Object value = null;
                  FieldMapper fieldMapper = docMapper.mappers().smartNameFieldMapper(field);
                  if (fieldMapper == null) {
                      if (docMapper.objectMappers().get(field) != null) {
                          // Only fail if we know it is a object field, missing paths / fields shouldn't fail.
                          throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
                      }
                  } else if (!fieldMapper.fieldType().stored() && !fieldMapper.isGenerated()) {
                      // Neither stored nor generated: fall back to extracting the value from the source.
                      if (searchLookup == null) {
                          searchLookup = new SearchLookup(mapperService, null, new String[]{type});
                          LeafSearchLookup leafSearchLookup = searchLookup.getLeafSearchLookup(docIdAndVersion.context);
                          searchLookup.source().setSource(source);
                          leafSearchLookup.setDocument(docIdAndVersion.docId);
                      }

                      List<Object> values = searchLookup.source().extractRawValues(field);
                      if (!values.isEmpty()) {
                          for (int i = 0; i < values.size(); i++) {
                              values.set(i, fieldMapper.fieldType().valueForSearch(values.get(i)));
                          }
                          value = values;
                      }
                  }

                  if (value != null) {
                      if (fields == null) {
                          fields = newHashMapWithExpectedSize(2);
                      }
                      if (value instanceof List) {
                          fields.put(field, new GetField(field, (List) value));
                      } else {
                          fields.put(field, new GetField(field, Collections.singletonList(value)));
                      }
                  }
              }
          }

          if (!fetchSourceContext.fetchSource()) {
              // Source was not requested: drop it even if the visitor loaded it.
              source = null;
          } else if (fetchSourceContext.transformSource() || fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
              Map<String, Object> sourceAsMap;
              XContentType sourceContentType = null;
              // TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
              Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
              sourceContentType = typeMapTuple.v1();
              sourceAsMap = typeMapTuple.v2();
              if (fetchSourceContext.transformSource()) {
                  sourceAsMap = docMapper.transformSourceAsMap(sourceAsMap);
              }
              sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
              try {
                  source = XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap).bytes();
              } catch (IOException e) {
                  throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "] with includes/excludes set", e);
              }
          }

          return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), source, fields);
      }
}

IndexShard.java

다시 IndexShard를 보면 engine().get(get)를 호출한다.

public class IndexShard extends AbstractIndexShardComponent {
  /**
   * Runs a get against this shard's engine after the {@code readAllowed()} check.
   *
   * @param get the engine-level get to execute
   * @return whatever the engine returns for {@code get}
   */
  public Engine.GetResult get(Engine.Get get) {
      logger.info("IndexShard.get");
      readAllowed();
      final Engine.GetResult engineResult = engine().get(get);
      return engineResult;
  }
}

InternalEngine.java

InternalEngine.get()을 보면 트랜스로그를 먼저 뒤져보고 트랜스로그에 없으면 직접 문서를 검색한다.

트랜스로그에서 읽었을때는 source를 가져온다.

getFromSearcher()를 통해서는 searcher, 문서 ID, 문서 VERSION만 가져온다.

realtime값은 기본이 true이기 때문에 따로 설정을 바꾼게 아니라면 트랜스로그부터 확인하는 것을 알 수 있다.

public class InternalEngine extends Engine {
    /**
     * Resolves a get under the read lock.
     *
     * For a realtime get the version map is consulted first: a delete entry yields
     * {@code GetResult.NOT_EXISTS}, a version conflict throws, and an entry whose translog
     * operation can be read is answered from that operation's source. Every other case falls
     * through to {@code getFromSearcher}.
     *
     * @throws VersionConflictEngineException if the requested version conflicts with the version map entry
     */
    @Override
    public GetResult get(Get get) throws EngineException {
        logger.info("InternalEngine.get");
        try (ReleasableLock lock = readLock.acquire()) {
            ensureOpen();

            // Only realtime gets look at the version map; otherwise behave as if there were no entry.
            final VersionValue latest = get.realtime() ? versionMap.getUnderLock(get.uid().bytes()) : null;
            if (latest != null) {
                if (latest.delete()) {
                    return GetResult.NOT_EXISTS;
                }
                final boolean conflict = get.versionType().isVersionConflictForReads(latest.version(), get.version());
                if (conflict) {
                    Uid uid = Uid.createUid(get.uid().text());
                    throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), latest.version(), get.version());
                }
                final Translog.Operation translogOp = translog.read(latest.translogLocation());
                if (translogOp != null) {
                    logger.info("translog not null");
                    return new GetResult(true, latest.version(), translogOp.getSource());
                }
            }

            // no version, get the version from the index, we know that we refresh on flush
            return getFromSearcher(get);
        }
    }
}

Engine.java

Engine.getFromSearcher()에서는 Searcher를 통해 문서가 검색되면 문서 ID와 VERSION을 반환한다.

public abstract class Engine implements Closeable {
    /**
     * Looks the document up through a freshly acquired searcher.
     *
     * On success the searcher is handed to the returned {@code GetResult} and is NOT closed
     * here; the caller releases it through {@code GetResult.release}. On every other path
     * (lookup failure, version conflict, document not found) the searcher is closed before
     * returning or throwing.
     *
     * @throws EngineException                if the doc id / version lookup itself fails
     * @throws VersionConflictEngineException if the found version conflicts with the requested one
     */
    final protected GetResult getFromSearcher(Get get) throws EngineException {
        logger.info("Engine.getFromSearcher");
        final Searcher searcher = acquireSearcher("get");
        final Versions.DocIdAndVersion found;
        try {
            found = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
        } catch (Throwable e) {
            Releasables.closeWhileHandlingException(searcher);
            //TODO: A better exception goes here
            throw new EngineException(shardId, "Couldn't resolve version", e);
        }

        // Guard clause: nothing found, so the searcher is no longer needed.
        if (found == null) {
            Releasables.close(searcher);
            return GetResult.NOT_EXISTS;
        }

        logger.info("docIdAndVersion.version = " + found.version);
        final boolean conflict = get.versionType().isVersionConflictForReads(found.version, get.version());
        if (conflict) {
            Releasables.close(searcher);
            Uid uid = Uid.createUid(get.uid().text());
            throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), found.version, get.version());
        }

        // don't release the searcher on this path, it is the
        // responsibility of the caller to call GetResult.release
        return new GetResult(searcher, found);
    }
}