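Overview

This post follows, at the source-code level, how Elasticsearch (ES from here on) starts up as a Java application.

Because ES startup touches a very large amount of code, the post narrows its scope to one question: once ES has started and brought up its HTTP server, how does a call to curl http://localhost:9200/ actually get served?

To read the series from the beginning, see the link.

The post lists the source files in the order they were visited in an IDE, starting from the entry point, and adds comments where they seemed necessary.

To keep the post at a manageable length, package declarations, imports, and methods that are not needed for the walkthrough have been removed. For the original source code, see the first post.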

Elasticsearch.java

This is where ES startup begins. The entry point is, simply enough, Elasticsearch.java.

Like any ordinary Java application, it runs from static void main. The key point is that main calls Bootstrap.init(args).

public final class Elasticsearch {

    /** no instantiation */
    private Elasticsearch() {}

    private static final ESLogger logger = Loggers.getLogger(Elasticsearch.class);

    /**
     * Main entry point for starting elasticsearch
     */
    public static void main(String[] args) throws StartupError {
        try {
            Bootstrap.init(args);
        } catch (Throwable t) {
            // format exceptions to the console in a special way
            // to avoid 2MB stacktraces from guice, etc.
            throw new StartupError(t);
        }
    }

    /**
     * Required method that's called by Apache Commons procrun when
     * running as a service on Windows, when the service is stopped.
     *
     * http://commons.apache.org/proper/commons-daemon/procrun.html
     *
     * NOTE: If this method is renamed and/or moved, make sure to update service.bat!
     */
    static void close(String[] args) {
        Bootstrap.stop();
    }
}
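
The shape of this entry point, a thin main that delegates and wraps anything thrown in a compact error type, can be sketched on its own. This is only an illustration: CompactStartupError, EntryPointSketch, init, and the three-frame limit are hypothetical names and choices, and the real StartupError applies its own formatting rules that are not shown in this post.

```java
import java.util.Objects;

// Hypothetical stand-in for StartupError: reports only the root cause and a
// few stack frames instead of the full (potentially huge) stack trace.
final class CompactStartupError extends RuntimeException {
    private static final int MAX_FRAMES = 3; // arbitrary limit for this sketch

    CompactStartupError(Throwable cause) {
        super(Objects.requireNonNull(cause));
    }

    String compactTrace() {
        // Walk down to the innermost cause.
        Throwable root = getCause();
        while (root.getCause() != null) {
            root = root.getCause();
        }
        StringBuilder sb = new StringBuilder(root.toString()).append('\n');
        StackTraceElement[] frames = root.getStackTrace();
        for (int i = 0; i < Math.min(MAX_FRAMES, frames.length); i++) {
            sb.append("\tat ").append(frames[i]).append('\n');
        }
        return sb.toString();
    }
}

final class EntryPointSketch {
    private EntryPointSketch() {}

    // Stand-in for Bootstrap.init(args); fails on request, for demonstration.
    static void init(String[] args) throws Exception {
        if (args.length > 0 && args[0].equals("--fail")) {
            throw new IllegalStateException("wiring failed",
                    new IllegalArgumentException("bad setting"));
        }
    }

    // Same shape as Elasticsearch.main: delegate, then wrap anything thrown.
    static void run(String[] args) {
        try {
            init(args);
        } catch (Throwable t) {
            throw new CompactStartupError(t);
        }
    }

    public static void main(String[] args) {
        try {
            run(new String[] {"--fail"});
        } catch (CompactStartupError e) {
            System.err.print(e.compactTrace());
        }
    }
}
```

Catching Throwable at the outermost level is reasonable here because nothing above main can handle the failure anyway; the wrapper only controls how it is reported.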

Bootstrap.java

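Bootstrap.init() parses the information obtained from the command line and the configuration files, and then starts the instance.

Internally it calls Bootstrap.start() (the INSTANCE.start() line), which in turn starts the Node.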

final class Bootstrap {

static void init(String[] args) throws Throwable {
    // Set the system property before anything has a chance to trigger its use
    System.setProperty("es.logger.prefix", "");

    BootstrapCLIParser bootstrapCLIParser = new BootstrapCLIParser();
    CliTool.ExitStatus status = bootstrapCLIParser.execute(args);

    if (CliTool.ExitStatus.OK != status) {
        exit(status.status());
    }

    INSTANCE = new Bootstrap();

    boolean foreground = !"false".equals(System.getProperty("es.foreground", System.getProperty("es-foreground")));
    // handle the wrapper system property, if its a service, don't run as a service
    if (System.getProperty("wrapper.service", "XXX").equalsIgnoreCase("true")) {
        foreground = false;
    }

    Environment environment = initialSettings(foreground);
    Settings settings = environment.settings();
    LogConfigurator.configure(settings, true);
    checkForCustomConfFile();

    if (environment.pidFile() != null) {
        PidFile.create(environment.pidFile(), true);
    }

    if (System.getProperty("es.max-open-files", "false").equals("true")) {
        ESLogger logger = Loggers.getLogger(Bootstrap.class);
        logger.info("max_open_files [{}]", ProcessProbe.getInstance().getMaxFileDescriptorCount());
    }

    // warn if running using the client VM
    if (JvmInfo.jvmInfo().getVmName().toLowerCase(Locale.ROOT).contains("client")) {
        ESLogger logger = Loggers.getLogger(Bootstrap.class);
        logger.warn("jvm uses the client vm, make sure to run `java` with the server vm for best performance by adding `-server` to the command line");
    }

    try {
        if (!foreground) {
            Loggers.disableConsoleLogging();
            closeSystOut();
        }

        // fail if using broken version
        JVMCheck.check();

        INSTANCE.setup(true, settings, environment);

        INSTANCE.start();

        if (!foreground) {
            closeSysError();
        }
    } catch (Throwable e) {
        // disable console logging, so user does not see the exception twice (jvm will show it already)
        if (foreground) {
            Loggers.disableConsoleLogging();
        }
        ESLogger logger = Loggers.getLogger(Bootstrap.class);
        if (INSTANCE.node != null) {
            logger = Loggers.getLogger(Bootstrap.class, INSTANCE.node.settings().get("name"));
        }
        // HACK, it sucks to do this, but we will run users out of disk space otherwise
        if (e instanceof CreationException) {
            // guice: log the shortened exc to the log file
            ByteArrayOutputStream os = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(os, false, "UTF-8");
            new StartupError(e).printStackTrace(ps);
            ps.flush();
            logger.error("Guice Exception: {}", os.toString("UTF-8"));
        } else {
            // full exception
            logger.error("Exception", e);
        }
        // re-enable it if appropriate, so they can see any logging during the shutdown process
        if (foreground) {
            Loggers.enableConsoleLogging();
        }

        throw e;
    }
}

private void start() {
    node.start();
    keepAliveThread.start();
}

}
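
Two details of Bootstrap are easy to miss: how the foreground flag is derived from system properties, and why start() also starts a keepAliveThread. The sketch below reproduces the flag logic from init() against a plain Properties object, and shows one plausible way a keep-alive thread can work: a non-daemon thread parked on a latch. The latch-based design is an assumption, since the construction of keepAliveThread is not part of the excerpt above, and BootstrapSketch and its methods are hypothetical names.

```java
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

final class BootstrapSketch {
    // Assumption: the keep-alive thread simply waits on a latch until shutdown.
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    private final Thread keepAliveThread;

    BootstrapSketch() {
        keepAliveThread = new Thread(() -> {
            try {
                keepAliveLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "keep-alive-sketch");
        // A non-daemon thread keeps the JVM alive after main() returns.
        keepAliveThread.setDaemon(false);
    }

    // Same decision as in Bootstrap.init(): foreground unless a property is
    // exactly "false", and never foreground when run under the service wrapper.
    static boolean isForeground(Properties props) {
        String value = props.getProperty("es.foreground", props.getProperty("es-foreground"));
        boolean foreground = !"false".equals(value);
        if ("true".equalsIgnoreCase(props.getProperty("wrapper.service", "XXX"))) {
            foreground = false;
        }
        return foreground;
    }

    void start() {
        keepAliveThread.start();
    }

    void stop() {
        keepAliveLatch.countDown();
    }

    boolean isAlive() {
        return keepAliveThread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("foreground = " + isForeground(System.getProperties()));
        BootstrapSketch bootstrap = new BootstrapSketch();
        bootstrap.start();
        bootstrap.stop();
        bootstrap.keepAliveThread.join();
    }
}
```

Note that "es.foreground" takes precedence over "es-foreground" because the latter is only used as the default value of the first lookup.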

Node.java

There are two main things to look at in Node: the constructor and Node.start().

The constructor uses Google Guice to create the Modules that are needed. Each Module overrides AbstractModule.configure() to bind the related classes as singletons.

Node.start() starts many services. Among them, the call injector.getInstance(HttpServer.class).start(); is where ES starts its HTTP service.

public class Node implements Releasable {

    protected Node(Environment tmpEnv, Version version, Collection<Class<? extends Plugin>> classpathPlugins) {
    Settings tmpSettings = settingsBuilder().put(tmpEnv.settings())
            .put(Client.CLIENT_TYPE_SETTING, CLIENT_TYPE).build();
    tmpSettings = TribeService.processSettings(tmpSettings);

    ESLogger logger = Loggers.getLogger(Node.class, tmpSettings.get("name"));
    logger.info("version[{}], pid[{}], build[{}/{}]", version, JvmInfo.jvmInfo().pid(), Build.CURRENT.hashShort(), Build.CURRENT.timestamp());

    logger.info("initializing ...");

    if (logger.isDebugEnabled()) {
        logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
                tmpEnv.configFile(), Arrays.toString(tmpEnv.dataFiles()), tmpEnv.logsFile(), tmpEnv.pluginsFile());
    }

    this.pluginsService = new PluginsService(tmpSettings, tmpEnv.modulesFile(), tmpEnv.pluginsFile(), classpathPlugins);
    this.settings = pluginsService.updatedSettings();
    // create the environment based on the finalized (processed) view of the settings
    this.environment = new Environment(this.settings());

    final NodeEnvironment nodeEnvironment;
    try {
        nodeEnvironment = new NodeEnvironment(this.settings, this.environment);
    } catch (IOException ex) {
        throw new IllegalStateException("Failed to created node environment", ex);
    }

    final ThreadPool threadPool = new ThreadPool(settings);
    NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();

    boolean success = false;
    try {
        ModulesBuilder modules = new ModulesBuilder();
        modules.add(new Version.Module(version));
        modules.add(new CircuitBreakerModule(settings));
        // plugin modules must be added here, before others or we can get crazy injection errors...
        for (Module pluginModule : pluginsService.nodeModules()) {
            modules.add(pluginModule);
        }
        modules.add(new PluginsModule(pluginsService));
        modules.add(new SettingsModule(this.settings));
        modules.add(new NodeModule(this));
        modules.add(new NetworkModule(namedWriteableRegistry));
        modules.add(new ScriptModule(this.settings));
        modules.add(new EnvironmentModule(environment));
        modules.add(new NodeEnvironmentModule(nodeEnvironment));
        modules.add(new ClusterNameModule(this.settings));
        modules.add(new ThreadPoolModule(threadPool));
        modules.add(new DiscoveryModule(this.settings));
        modules.add(new ClusterModule(this.settings));
        modules.add(new RestModule(this.settings));
        modules.add(new TransportModule(settings, namedWriteableRegistry));
        if (settings.getAsBoolean(HTTP_ENABLED, true)) {
            modules.add(new HttpServerModule(settings));
        }
        modules.add(new IndicesModule());
        modules.add(new SearchModule());
        modules.add(new ActionModule(false));
        modules.add(new MonitorModule(settings));
        modules.add(new GatewayModule(settings));
        modules.add(new NodeClientModule());
        modules.add(new ShapeModule());
        modules.add(new PercolatorModule());
        modules.add(new ResourceWatcherModule());
        modules.add(new RepositoriesModule());
        modules.add(new TribeModule());


        pluginsService.processModules(modules);

        injector = modules.createInjector();

        client = injector.getInstance(Client.class);
        threadPool.setNodeSettingsService(injector.getInstance(NodeSettingsService.class));
        success = true;
    } finally {
        if (!success) {
            nodeEnvironment.close();
            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
        }
    }

    logger.info("initialized");
    }

    /**
     * Start the node. If the node is already started, this method is no-op.
     */
    public Node start() {
    if (!lifecycle.moveToStarted()) {
        return this;
    }

    ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
    logger.info("starting ...");
    // hack around dependency injection problem (for now...)
    injector.getInstance(Discovery.class).setRoutingService(injector.getInstance(RoutingService.class));
    for (Class<? extends LifecycleComponent> plugin : pluginsService.nodeServices()) {
        injector.getInstance(plugin).start();
    }

    injector.getInstance(MappingUpdatedAction.class).setClient(client);
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndexingMemoryController.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(IndicesTTLService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SnapshotShardsService.class).start();
    injector.getInstance(RoutingService.class).start();
    injector.getInstance(SearchService.class).start();
    injector.getInstance(MonitorService.class).start();
    injector.getInstance(RestController.class).start();

    // TODO hack around circular dependencies problems
    injector.getInstance(GatewayAllocator.class).setReallocation(injector.getInstance(ClusterService.class), injector.getInstance(RoutingService.class));

    injector.getInstance(ResourceWatcherService.class).start();
    injector.getInstance(GatewayService.class).start();

    // Start the transport service now so the publish address will be added to the local disco node in ClusterService
    TransportService transportService = injector.getInstance(TransportService.class);
    transportService.start();
    injector.getInstance(ClusterService.class).start();

    // start after cluster service so the local disco is known
    DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();


    transportService.acceptIncomingRequests();
    discoService.joinClusterAndWaitForInitialState();

    if (settings.getAsBoolean("http.enabled", true)) {
        injector.getInstance(HttpServer.class).start();
    }
    injector.getInstance(TribeService.class).start();
    if (settings.getAsBoolean("node.portsfile", false)) {
        if (settings.getAsBoolean("http.enabled", true)) {
            HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
            writePortsFile("http", http.boundAddress());
        }
        TransportService transport = injector.getInstance(TransportService.class);
        writePortsFile("transport", transport.boundAddress());
    }
    logger.info("started");

    return this;
    }
}
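
The overall flow of Node (collect modules, build an injector, then pull services out of it and start them in order, exactly once) can be imitated without Guice. The sketch below is explicitly not Guice: TinyInjector is a hand-rolled registry of instances keyed by class, and TinyModule, TransportSketch, HttpSketch, and NodeSketch are all hypothetical names. It only illustrates the singleton lookups, the conditional HTTP module, and the "second start() is a no-op" behaviour that lifecycle.moveToStarted() provides in the real code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// NOT Guice: a minimal registry that maps a class to its single instance.
final class TinyInjector {
    private final Map<Class<?>, Object> singletons = new LinkedHashMap<>();

    <T> void bindInstance(Class<T> type, T instance) {
        if (singletons.putIfAbsent(type, instance) != null) {
            throw new IllegalStateException("already bound: " + type.getName());
        }
    }

    <T> T getInstance(Class<T> type) {
        Object instance = singletons.get(type);
        if (instance == null) {
            throw new IllegalStateException("no binding for " + type.getName());
        }
        return type.cast(instance);
    }
}

// Plays the role of a module's configure() method.
interface TinyModule {
    void configure(TinyInjector injector);
}

final class TransportSketch {
    private final List<String> log;
    TransportSketch(List<String> log) { this.log = log; }
    void start() { log.add("transport"); }
}

final class HttpSketch {
    private final List<String> log;
    HttpSketch(List<String> log) { this.log = log; }
    void start() { log.add("http"); }
}

final class NodeSketch {
    final List<String> startLog = new ArrayList<>();
    private final TinyInjector injector = new TinyInjector();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final boolean httpEnabled;

    NodeSketch(boolean httpEnabled) {
        this.httpEnabled = httpEnabled;
        List<TinyModule> modules = new ArrayList<>();
        modules.add(inj -> inj.bindInstance(TransportSketch.class, new TransportSketch(startLog)));
        if (httpEnabled) { // mirrors the HTTP_ENABLED check around HttpServerModule
            modules.add(inj -> inj.bindInstance(HttpSketch.class, new HttpSketch(startLog)));
        }
        for (TinyModule module : modules) {
            module.configure(injector);
        }
    }

    // Idempotent start: only the first call starts the services.
    NodeSketch start() {
        if (!started.compareAndSet(false, true)) {
            return this;
        }
        injector.getInstance(TransportSketch.class).start();
        if (httpEnabled) {
            injector.getInstance(HttpSketch.class).start();
        }
        return this;
    }

    public static void main(String[] args) {
        NodeSketch node = new NodeSketch(true).start();
        node.start(); // second call does nothing
        System.out.println(node.startLog);
    }
}
```

As in Node.start(), the HTTP layer is started after the transport layer, and only when it is enabled.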

HttpServer.java

In HttpServer.java, the constructor hands request dispatching over to a Dispatcher through transport.httpServerAdapter(new Dispatcher(this)).

HttpServer.start() starts the HTTP server; this is the call made from Node.start(). start() itself is inherited from AbstractLifecycleComponent, which invokes the doStart() method in the listing.

HttpServer.Dispatcher.dispatchRequest() calls HttpServer.internalDispatchRequest(), which dispatches through restController. In other words, it calls restController.dispatchRequest(request, responseChannel).

public class HttpServer extends AbstractLifecycleComponent<HttpServer> {

@Inject
public HttpServer(Settings settings, Environment environment, HttpServerTransport transport, RestController restController, NodeService nodeService,
                  CircuitBreakerService circuitBreakerService) {
    super(settings);
    this.environment = environment;
    this.transport = transport;
    this.restController = restController;
    this.nodeService = nodeService;
    this.circuitBreakerService = circuitBreakerService;
    nodeService.setHttpServer(this);

    this.disableSites = this.settings.getAsBoolean("http.disable_sites", false);

    transport.httpServerAdapter(new Dispatcher(this));
}

static class Dispatcher implements HttpServerAdapter {

    private final HttpServer server;

    Dispatcher(HttpServer server) {
        this.server = server;
    }

    @Override
    public void dispatchRequest(RestRequest request, RestChannel channel) {
        server.internalDispatchRequest(request, channel);
    }
}

@Override
protected void doStart() {
    transport.start();
    if (logger.isInfoEnabled()) {
        logger.info("{}", transport.boundAddress());
    }
    nodeService.putAttribute("http_address", transport.boundAddress().publishAddress().toString());
}

public void internalDispatchRequest(final RestRequest request, final RestChannel channel) {
    String rawPath = request.rawPath();
    if (rawPath.startsWith("/_plugin/")) {
        RestFilterChain filterChain = restController.filterChain(pluginSiteFilter);
        filterChain.continueProcessing(request, channel);
        return;
    } else if (rawPath.equals("/favicon.ico")) {
        handleFavicon(request, channel);
        return;
    }
    RestChannel responseChannel = channel;
    try {
        int contentLength = request.content().length();
        if (restController.canTripCircuitBreaker(request)) {
            inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
        } else {
            inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
        }
        // iff we could reserve bytes for the request we need to send the response also over this channel
        responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService);
        restController.dispatchRequest(request, responseChannel);
    } catch (Throwable t) {
        restController.sendErrorResponse(request, responseChannel, t);
    }
}

}
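
The relationship between the transport, the Dispatcher, and internalDispatchRequest() is a small adapter pattern, sketched below with strings in place of real requests and channels. All names (AdapterSketch, TransportStub, HttpFrontSketch) are hypothetical, and the circuit-breaker accounting from the real method is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Plays the role of HttpServerAdapter: the transport only knows this interface.
interface AdapterSketch {
    void dispatchRequest(String rawPath, List<String> out);
}

// Plays the role of HttpServerTransport: receives requests, forwards them to the adapter.
final class TransportStub {
    private AdapterSketch adapter;

    void httpServerAdapter(AdapterSketch adapter) {
        this.adapter = adapter;
    }

    // Simulates a request arriving on the network.
    void receive(String rawPath, List<String> out) {
        adapter.dispatchRequest(rawPath, out);
    }
}

final class HttpFrontSketch {
    HttpFrontSketch(TransportStub transport) {
        // Same move as in the HttpServer constructor: register a Dispatcher
        // that points back at this server.
        transport.httpServerAdapter(new Dispatcher(this));
    }

    static final class Dispatcher implements AdapterSketch {
        private final HttpFrontSketch server;

        Dispatcher(HttpFrontSketch server) {
            this.server = server;
        }

        @Override
        public void dispatchRequest(String rawPath, List<String> out) {
            server.internalDispatchRequest(rawPath, out);
        }
    }

    // Special paths are handled first; everything else goes to the REST layer.
    void internalDispatchRequest(String rawPath, List<String> out) {
        if (rawPath.startsWith("/_plugin/")) {
            out.add("plugin-site");
            return;
        } else if (rawPath.equals("/favicon.ico")) {
            out.add("favicon");
            return;
        }
        out.add("rest-controller");
    }

    public static void main(String[] args) {
        TransportStub transport = new TransportStub();
        new HttpFrontSketch(transport);
        List<String> out = new ArrayList<>();
        transport.receive("/", out);
        System.out.println(out);
    }
}
```

The indirection keeps the transport free of any dependency on the server class: it only needs something that implements the adapter interface.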

RestController.java

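RestController.dispatchRequest() calls RestController.executeHandler() directly when no filters are registered; otherwise the request passes through a filter chain first.

Inside RestController.executeHandler(), the call handler.handleRequest(request, channel) hands the request to whichever RestAction-style class (a subclass of the abstract class BaseRestHandler) is mapped to the request URI.

For "curl http://localhost:9200", the handler is RestMainAction. To verify this, a logging call was added to executeHandler, which prints a message like the following: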

[2016-11-30 11:52:00,134][INFO ][org.elasticsearch.rest ] [티파니] handler = org.elasticsearch.rest.action.main.RestMainAction@5a09ee63

public class RestController extends AbstractLifecycleComponent<RestController> {

    public void dispatchRequest(final RestRequest request, final RestChannel channel) throws Exception {
        if (!checkRequestParameters(request, channel)) {
            return;
        }

        if (filters.length == 0) {
            executeHandler(request, channel);
        } else {
            ControllerFilterChain filterChain = new ControllerFilterChain(handlerFilter);
            filterChain.continueProcessing(request, channel);
        }
    }

    void executeHandler(RestRequest request, RestChannel channel) throws Exception {
        final RestHandler handler = getHandler(request);
        // Added for this walkthrough only (not in the original source).
        // The placeholder form avoids a NullPointerException when no handler matches.
        logger.info("handler = {}", handler);
        if (handler != null) {
            handler.handleRequest(request, channel);
        } else {
            if (request.method() == RestRequest.Method.OPTIONS) {
                // when we have OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added)
                channel.sendResponse(new BytesRestResponse(OK));
            } else {
                channel.sendResponse(new BytesRestResponse(BAD_REQUEST, "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]"));
            }
        }
    }

    private RestHandler getHandler(RestRequest request) {
        String path = getPath(request);
        PathTrie<RestHandler> handlers = getHandlersForMethod(request.method());
        if (handlers != null) {
            return handlers.retrieve(path, request.params());
        } else {
            return null;
        }
    }
}
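
The lookup in getHandler() and the fallbacks in executeHandler() can be sketched with ordinary maps. This is a simplification under stated assumptions: the real code keeps one PathTrie per HTTP method, which also supports wildcard path segments, whereas this sketch uses an exact-match HashMap. MethodSketch, HandlerSketch, and RestControllerSketch are hypothetical names, and responses are reduced to strings.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

enum MethodSketch { GET, HEAD, OPTIONS }

interface HandlerSketch {
    String handle(String path);
}

final class RestControllerSketch {
    // One table per HTTP method; an exact-match map stands in for PathTrie.
    private final Map<MethodSketch, Map<String, HandlerSketch>> handlers =
            new EnumMap<>(MethodSketch.class);

    void registerHandler(MethodSketch method, String path, HandlerSketch handler) {
        handlers.computeIfAbsent(method, m -> new HashMap<>()).put(path, handler);
    }

    // Returns null when nothing is registered for this method and path.
    HandlerSketch getHandler(MethodSketch method, String path) {
        Map<String, HandlerSketch> byPath = handlers.get(method);
        return byPath == null ? null : byPath.get(path);
    }

    String executeHandler(MethodSketch method, String path) {
        HandlerSketch handler = getHandler(method, path);
        if (handler != null) {
            return handler.handle(path);
        }
        if (method == MethodSketch.OPTIONS) {
            return "200 OK"; // unmatched OPTIONS requests are answered with OK
        }
        return "400 No handler found for uri [" + path + "] and method [" + method + "]";
    }

    public static void main(String[] args) {
        RestControllerSketch controller = new RestControllerSketch();
        controller.registerHandler(MethodSketch.GET, "/", path -> "main action");
        System.out.println(controller.executeHandler(MethodSketch.GET, "/"));
        System.out.println(controller.executeHandler(MethodSketch.HEAD, "/"));
    }
}
```

Because handlers are keyed by method first, registering "/" for GET does not make HEAD on "/" succeed; each method needs its own registration.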

BaseRestHandler.java

When BaseRestHandler.handleRequest() runs, it calls the abstract three-argument handleRequest() overload. So what actually executes is the handleRequest implementation of whichever concrete class extends BaseRestHandler.

public abstract class BaseRestHandler extends AbstractComponent implements RestHandler {

    @Override
    public final void handleRequest(RestRequest request, RestChannel channel) throws Exception {
        handleRequest(request, channel, new HeadersAndContextCopyClient(client, request, controller.relevantHeaders()));
    }

    protected abstract void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception;
}
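
This is the template method pattern: a final public method does the shared preparation and then calls an abstract hook that subclasses fill in. A minimal sketch follows, with strings in place of requests; ClientSketch, BaseHandlerSketch, and MainHandlerSketch are hypothetical names, and the "+headers(...)" decoration merely stands in for what HeadersAndContextCopyClient does.

```java
// Stand-in for Client; a single method is enough for the illustration.
interface ClientSketch {
    String describe();
}

abstract class BaseHandlerSketch {
    private final ClientSketch client;

    BaseHandlerSketch(ClientSketch client) {
        this.client = client;
    }

    // Final entry point: subclasses cannot skip the wrapping step.
    public final String handleRequest(String request) {
        ClientSketch wrapped = () -> client.describe() + "+headers(" + request + ")";
        return handleRequest(request, wrapped);
    }

    // The hook each concrete handler implements.
    protected abstract String handleRequest(String request, ClientSketch client);
}

final class MainHandlerSketch extends BaseHandlerSketch {
    MainHandlerSketch(ClientSketch client) {
        super(client);
    }

    @Override
    protected String handleRequest(String request, ClientSketch client) {
        return "main handled " + request + " via " + client.describe();
    }
}

final class TemplateMethodDemo {
    public static void main(String[] args) {
        BaseHandlerSketch handler = new MainHandlerSketch(() -> "node-client");
        System.out.println(handler.handleRequest("GET /"));
    }
}
```

Marking the two-argument entry point final is what guarantees that every handler receives the wrapped client rather than the raw one.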

RestMainAction.java

Finally, RestMainAction. Its constructor registers the action with the RestController for GET and HEAD requests to "/", and RestMainAction.handleRequest assembles the required information and returns it as the response.

public class RestMainAction extends BaseRestHandler {
    private final Version version;
    private final ClusterName clusterName;
    private final ClusterService clusterService;

    @Inject
    public RestMainAction(Settings settings, Version version, RestController controller, ClusterName clusterName, Client client, ClusterService clusterService) {
    super(settings, controller, client);
    this.version = version;
    this.clusterName = clusterName;
    this.clusterService = clusterService;
    controller.registerHandler(GET, "/", this);
    controller.registerHandler(HEAD, "/", this);
    }

    @Override
    public void handleRequest(final RestRequest request, RestChannel channel, final Client client) throws Exception {

    RestStatus status = RestStatus.OK;
    ClusterState clusterState = clusterService.state();
    if (clusterState.blocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE)) {
        status = RestStatus.SERVICE_UNAVAILABLE;
    }
    if (request.method() == RestRequest.Method.HEAD) {
        channel.sendResponse(new BytesRestResponse(status));
        return;
    }

    XContentBuilder builder = channel.newBuilder();

    // Default to pretty printing, but allow ?pretty=false to disable
    if (!request.hasParam("pretty")) {
        builder.prettyPrint().lfAtEnd();
    }

    builder.startObject();
    if (settings.get("name") != null) {
        builder.field("name", settings.get("name"));
    }
    builder.field("cluster_name", clusterName.value());
    builder.field("cluster_uuid", clusterState.metaData().clusterUUID());
    builder.startObject("version")
            .field("number", version.number())
            .field("build_hash", Build.CURRENT.hash())
            .field("build_timestamp", Build.CURRENT.timestamp())
            .field("build_snapshot", version.snapshot)
            .field("lucene_version", version.luceneVersion.toString())
            .endObject();
    builder.field("tagline", "You Know, for Search");
    builder.endObject();

    channel.sendResponse(new BytesRestResponse(status, builder));
    }
}
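
The decisions inside handleRequest (status from the cluster block, an empty body for HEAD, pretty printing unless disabled) can be isolated in a small sketch. Several things here are assumptions: MainActionSketch and its Response type are hypothetical, the JSON is built by hand with only three fields and no escaping, and treating any pretty value other than "false" as "pretty" is a guess at behaviour that the listing above does not show.

```java
import java.util.Collections;
import java.util.Map;

final class MainActionSketch {
    static final int OK = 200;
    static final int SERVICE_UNAVAILABLE = 503;

    static final class Response {
        final int status;
        final String body;

        Response(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    static Response handle(String method, Map<String, String> params,
                           boolean globalBlock, String nodeName, String clusterName) {
        // A global block turns the status into 503 but a body is still produced.
        int status = globalBlock ? SERVICE_UNAVAILABLE : OK;
        if ("HEAD".equals(method)) {
            return new Response(status, ""); // HEAD: status only
        }
        // Assumption: pretty unless the parameter is literally "false".
        boolean pretty = !"false".equals(params.get("pretty"));
        String nl = pretty ? "\n" : "";
        String indent = pretty ? "  " : "";
        String sep = pretty ? " : " : ":";

        // Hand-rolled JSON for illustration; values are assumed to need no escaping.
        StringBuilder sb = new StringBuilder("{").append(nl);
        if (nodeName != null) {
            sb.append(indent).append("\"name\"").append(sep)
              .append('"').append(nodeName).append("\",").append(nl);
        }
        sb.append(indent).append("\"cluster_name\"").append(sep)
          .append('"').append(clusterName).append("\",").append(nl);
        sb.append(indent).append("\"tagline\"").append(sep)
          .append("\"You Know, for Search\"").append(nl);
        sb.append("}").append(nl);
        return new Response(status, sb.toString());
    }

    public static void main(String[] args) {
        Response r = handle("GET", Collections.<String, String>emptyMap(), false, "node-1", "apt4you");
        System.out.println(r.status);
        System.out.print(r.body);
    }
}
```

The "name" field is optional in the same way as in the real action: it is written only when a node name is available.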

Response Message

Running curl returns the response below; these are the fields assembled in RestMainAction.java.

$ curl localhost:9200
{
  "name" : "티파니",
  "cluster_name" : "apt4you",
  "cluster_uuid" : "fYcN-fmYSiiVkqlyjPfKvw",
  "version" : {
    "number" : "2.4.1",
    "build_hash" : "",
    "build_timestamp" : "NA",
    "build_snapshot" : false,
    "lucene_version" : "5.5.2"
  },
  "tagline" : "You Know, for Search"
}