개요
- 지난 포스팅에 이어 ES의 조회(GET) 과정의 소스코드를 분석해본다.
- HTTP 방식으로 색인 요청이 들어왔을 때 처리 과정을 소스코드로 기술한다.
- 글의 시작점은 링크이다.
GET Flow
문서 조회의 Flow는 그림을 통해 잘 이해할 수 있을듯 하다. 자세한 배경지식은 링크 참고 바란다.
코드분석
RestGetAction.java
아래와 같이 RestGetAction에서는 /{index}/{type}/{id} 패턴에 대해 REST Request를 받을 수 있게 핸들러가 등록되어 있다.
handleRequest에서는 GetRequest를 생성하여 client.get()을 호출한다.
public class RestGetAction extends BaseRestHandler {
@Inject
public RestGetAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(GET, "/{index}/{type}/{id}", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
logger.info("RestGetAction.handleRequest");
final GetRequest getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
getRequest.operationThreaded(true);
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
getRequest.routing(request.param("routing")); // order is important, set it after routing, so it will set the routing
getRequest.parent(request.param("parent"));
getRequest.preference(request.param("preference"));
getRequest.realtime(request.paramAsBoolean("realtime", null));
getRequest.ignoreErrorsOnGeneratedFields(request.paramAsBoolean("ignore_errors_on_generated_fields", false));
String sField = request.param("fields");
if (sField != null) {
String[] sFields = Strings.splitStringByCommaToArray(sField);
if (sFields != null) {
getRequest.fields(sFields);
}
}
getRequest.version(RestActions.parseVersion(request));
getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));
getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));
client.get(getRequest, new RestBuilderListener<GetResponse>(channel) {
@Override
public RestResponse buildResponse(GetResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContent(builder, request);
builder.endObject();
if (!response.isExists()) {
return new BytesRestResponse(NOT_FOUND, builder);
} else {
return new BytesRestResponse(OK, builder);
}
}
});
}
}
AbstractClient.java
GetAction.INSTANCE는 new GetAction() 이다.
이부분 코드의 흐름을 설명하자면 AbstractClient.get()에서 AbstractClient.execute()를 호출한다.
AbstractClient.execute()에서는 doExecute()를 호출하는데 이는 NodeClient일수도 있고 TransportClient일수도 있다. 현재 예제에서는 NodeClient이므로 NodeClient 소스로 이동한다.
public abstract class AbstractClient extends AbstractComponent implements Client {
@Override
public void get(final GetRequest request, final ActionListener<GetResponse> listener) {
execute(GetAction.INSTANCE, request, listener);
}
@Override
public final <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
logger.info("AbstractClient.execute");
headers.applyTo(request);
listener = threadedWrapper.wrap(listener);
doExecute(action, request, listener);
}
}
NodeClient.java
NodeClient.doExecute()를 보면 결국 TransportAction.execute를 호출한다.
TransportAction<Request, Response> transportAction = actions.get(action); 이부분이 중요한데 TransportAction은 추상화 클래스로써 actions.get(action)로써 실제 상속받은 클래스의 인스턴스를 가져온다.
이를 위해 필요한 설명인 ActionModule을 아래에 기술한다.
위의 원리에 의해 여기서 transportAction.execute(request, listener);는 TransportGetAction.execute(request, listener);로 볼 수 있다.
또한 TransportGetAction.execute(request, listener)의 코드 위치는 부모 클래스인 TransportAction에 존재한다.
public class NodeClient extends AbstractClient {
private final ImmutableMap<GenericAction, TransportAction> actions;
@Inject
public NodeClient(Settings settings, ThreadPool threadPool, Headers headers, Map<GenericAction, TransportAction> actions) {
super(settings, threadPool, headers);
this.actions = ImmutableMap.copyOf(actions);
}
@Override
public void close() {
// nothing really to do
}
@SuppressWarnings("unchecked")
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
logger.info("NodeClient.doExecute " + action.name());
TransportAction<Request, Response> transportAction = actions.get(action);
if (transportAction == null) {
throw new IllegalStateException("failed to find action [" + action + "] to execute");
}
transportAction.execute(request, listener);
}
}
ActionModule.java
TransportAction은 추상화 클래스로써 action에 맞는 상속 클래스들을 가지고 있다.
ActionModule.configure()를 보면 GetAction.INSTANCE은 TransportGetAction이 바인딩 되어 있다.
즉, NodeClient.doExecute() 호출 시 결과적으로 TransportGetAction.execute()가 호출된다는 것을 알 수 있다.
public class ActionModule extends AbstractModule {
@Override
protected void configure() {
// 생략
registerAction(GetAction.INSTANCE, TransportGetAction.class);
// 생략
}
}
TransportAction.java
TransportAction.execute()를 따라가보면 결국 TransportAction.doExecute(task, request, listener)를 호출한다.
그런데 여기서 TransportAction.doExecute()는 추상화 클래스인 TransportAction에 추상화 메서드만 있고 실제 구현체는 이를 상속받은 다양한 클래스들에 존재한다.
public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
public final Task execute(Request request, final ActionListener<Response> listener) {
logger.info("TransportAction.execute");
/*
* While this version of execute could delegate to the TaskListener version of execute that'd add yet another layer of wrapping on
* the listener and prevent us from using the listener bare if there isn't a task. That just seems like too many objects. Thus the
* two versions of this method.
*/
final Task task = taskManager.register("transport", actionName, request);
if (task == null) {
execute(null, request, listener);
} else {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}
@Override
public void onFailure(Throwable e) {
taskManager.unregister(task);
listener.onFailure(e);
}
});
}
return task;
}
public final void execute(Task task, Request request, ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
if (filters.length == 0) {
try {
doExecute(task, request, listener);
} catch(Throwable t) {
logger.trace("Error during transport action execution.", t);
listener.onFailure(t);
}
} else {
RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
requestFilterChain.proceed(task, actionName, request, listener);
}
}
}
TransportSingleShardAction.java
갑자기 언급한 적도 없는 클래스가 등장하여 당황스러울 수 있다.
이는 TransportGetAction 클래스가 TransportSingleShardAction를 상속받았고 doExecute(request, listener)경우 코드 구현체가 TransportSingleShardAction에 위치하기 때문에 이 클래스를 살펴보아야 한다.
TransportSingleShardAction은 TransportGetAction을 비롯한 단일 샤드에서 읽기 연산을 수행하기 위해 필요한 연산을 모아놓은 추상화 클래스이다.
/**
* 단일 샤드에서 읽기 연산을 수행하기 위해 필요한 연산을 모아놓은 추상화 클래스
* 만약 연산이 실패하면 다른 샤드 카피에서 연산이 수행될수 있다.
*/
public abstract class TransportSingleShardAction<Request extends SingleShardRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncSingleAction(request, listener).start();
}
class AsyncSingleAction {
private final ActionListener<Response> listener;
private final ShardsIterator shardIt;
private final InternalRequest internalRequest;
private final DiscoveryNodes nodes;
private volatile Throwable lastFailure;
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
logger.info("TransportSingleShardAction.AsyncSingleAction.AsyncSingleAction");
this.listener = listener;
ClusterState clusterState = clusterService.state();
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
}
nodes = clusterState.nodes();
ClusterBlockException blockException = checkGlobalBlock(clusterState);
if (blockException != null) {
throw blockException;
}
String concreteSingleIndex;
if (resolveIndex(request)) {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request);
} else {
concreteSingleIndex = request.index();
}
this.internalRequest = new InternalRequest(request, concreteSingleIndex);
resolveRequest(clusterState, internalRequest);
blockException = checkRequestBlock(clusterState, internalRequest);
if (blockException != null) {
throw blockException;
}
this.shardIt = shards(clusterState, internalRequest);
}
public void start() {
if (shardIt == null) {
logger.info("shardIt null");
// just execute it on the local node
transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
});
} else {
logger.info("shardIt not null");
perform(null);
}
}
private void onFailure(ShardRouting shardRouting, Throwable e) {
if (logger.isTraceEnabled() && e != null) {
logger.trace("{}: failed to execute [{}]", e, shardRouting, internalRequest.request());
}
perform(e);
}
private void perform(@Nullable final Throwable currentFailure) {
Throwable lastFailure = this.lastFailure;
if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
final ShardRouting shardRouting = shardIt.nextOrNull();
if (shardRouting == null) {
Throwable failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
failure = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
} else {
if (logger.isDebugEnabled()) {
logger.debug("{}: failed to execute [{}]", failure, null, internalRequest.request());
}
}
listener.onFailure(failure);
return;
}
DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
internalRequest.request().internalShardId = shardRouting.shardId();
if (logger.isTraceEnabled()) {
logger.trace(
"sending request [{}] to shard [{}] on node [{}]",
internalRequest.request(),
internalRequest.request().internalShardId,
node
);
}
transportService.sendRequest(node, transportShardAction, internalRequest.request(), new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
}
});
}
}
}
}
To Be Continued
2부에서 이어간다.