개요
Elasticsearch Scroll API를 이용하여 회원키를 추출하는 로직을 테스트 해보았다. 회사 기준 45만건 추출하는데 약 10~15초 정도 소요되었다. 이를 일은 보통 배치성으로 수행하니 Latency에 민감하지는 않고 잘 동작하는것 자체가 의미가 있다고 판단하였다. 다만 scroll API는 Keeping the search context alive의 특성을 타므로 아래의 예제에서 처럼 만료시간을 1분으로 설정했으면 1분내에는 완료를 하기 때문에 위의 소요시간과 비교하여 적절한 튜닝이 쓰는 사람 입장에서는 필요할듯 하다.
유의사항
필자가 Scroll API를 쓰는과정에서 당연히 매번 Scroll ID가 Pagination마다 바뀔것이라고 예상했다.
하지만 그런 개념이 아니라 Scroll ID는 Batch job ID가 하나 발급되는것과 같이 동작하였다. 즉, scrollID는 고정되어있고 scrollId로 Search요청을 보낼때마다 다른 데이터를 내려주었다.
코드
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ScrollTest {
@Autowired
Client client;
@Test
public void test() throws IOException {
String fileName = "test.csv";
BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
long start = System.currentTimeMillis();
TermQueryBuilder tb = QueryBuilders.termQuery("필드명", "필드값");
SearchResponse sr = client.prepareSearch("인덱스명").setTypes("타입명").setScroll(TimeValue.timeValueMinutes(1)).setQuery(tb).setSize(10000)
.addSort(SortBuilders.fieldSort("_doc")).get();
int totalCnt = (int) sr.getHits().totalHits;
int cnt = 0;
for (SearchHit hit : sr.getHits().getHits()) {
if (cnt < totalCnt) {
System.out.println(cnt);
cnt++;
out.write(hit.getId() + "\n");
}
}
while (cnt < totalCnt) {
SearchResponse sr2 = client.prepareSearchScroll(sr.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
for (SearchHit hit : sr2.getHits().getHits()) {
System.out.println(cnt);
cnt++;
out.write(hit.getId() + "\n");
}
}
long end = System.currentTimeMillis() - start;
System.out.println(cnt);
System.out.println("proceeding time = " + end);
out.close();
ClearScrollRequest csr = new ClearScrollRequest();
csr.addScrollId(sr.getScrollId());
client.clearScroll(csr).get();
}
}