개요
Keras에서 대용량 데이터 Batch를 처리하는 방법을 찾아봤는데 깔끔한 이해가 되는 코드나 내용을 찾기가 쉽지 않았다. 또한 keras.utils.Sequence
가 아닌 yield
를 이용한 Generator를 만드는 코드가 많았다.
그러던 중 마음에 드는 외국 블로그 포스트의 주요 내용을 찾아 내용을 번역 및 정리한다.
Motivation
- 대용량 데이터셋 처리에 많은 메모리를 소비해서 Trick을 통해 알아서 매끄럽게 처리가 되길 바란적이 있는가?
- 최근의 SOTA방식의 대용량 데이터 처리는 충분히 효율적임
- 이번시간에는 실시간으로 멀티 코어에서 처리가능한 데이터셋을 생성하고 트레이닝하는 방법을 알아본다
- 이번 튜토리얼은 Keras의 high level 패키지를 사용하여 TF나 Theano 백엔드의 GPU가속을 가정한다.
Previous Situation
-
이글을 읽기전까지는 아마도 케라스 코드를 아래와 같이 작성했을것임
-
이 글은 전체 데이터셋을 한번에 읽어오는것을 바꾸는 방법에 대한 것이다.
-
실제로 한번에 로드하는 것은 메모리가 데이터셋보다 작으면 문제가 생김
import numpy as np
from keras.models import Sequential
# Load entire dataset
X, y = np.load('some_training_set_with_labels.npy')
# Design model
model = Sequential()
[...] # Your architecture
model.compile()
# Train model on your dataset
model.fit(x=X, y=y)
이를 해결하기 위해 스텝 바이 스텝으로 data generator를 반드는 것을 배워보자. 앞으로의 코드는 좋은 skeleton 코드가 되었으면 한다.
Notations (1/3)
- 시작하기 전에 대용량 데이터를 다룰 때 몇가지 유용한 Tip을 정리해보자.
-
ID
는 파이썬 문자열로 주어진 데이터셋을 정의함 -
partion
디렉토리를 만듦partition['train']
은 트레이닝 ID 리스트partition['validation']
은 validation ID 리스트
labels
디렉토리를 만들어 각ID
데이터셋에 상응하는labels[ID]를 지님
Notations (2/3)
- 예를 들어 id-1, id-2, id-3은 training이고 id-4는 validation이면 구성은 아래와 같음
- 거기에 상응하는 labels는 아래와 같음
Notations (3/3)
- 모듈화를 위해 케라스 코드와 커스터마이즈한 클래스를 분리된 파일과 디렉토리로 아래와 같이 구성해보자.
data
디렉토리는 데이터셋을 저장하는 용도
DataGenerator.init()
-
이제 파이썬 클래스
DataGenerator
에 대해 알아보고 Kera 모델에서 어떻게 실시간 데이터 피딩을 하는지 알아보자. -
우선, 클래스의 Initialization부터 알아보자.
- 이것을 나중에
keras.utils.Sequence
를 상속한 후 사용할 예정이다. -
위 클래스를 활용하면 멀티 프로세싱을 손쉽게 이용할 수 있다.
-
추가적인 정보인 dim, n_channel, n_classes, shuffle등을 지정하였음
def __init__(self, list_IDs, labels, batch_size=32, dim=(32,32,32), n_channels=1,
n_classes=10, shuffle=True):
'Initialization'
self.dim = dim
self.batch_size = batch_size
self.labels = labels
self.list_IDs = list_IDs
self.n_channels = n_channels
self.n_classes = n_classes
self.shuffle = shuffle
self.on_epoch_end()
DataGenerator.on_epoch_end()
on_epoch_end
메소드는 각 epoch의 맨처음과 맨 끝에 실행됨shuffle
파라미터가True
이면 각 epoch마다 새로운 order를 만들어냄- 필자주) 코드를 보면 단순 index를 shuffle하는 것임
- shuffle을 통해 각 batch마다 identical한 데이터셋을 학습시키는 것을 방지하여 모델을 좀더 robust하게 만듦
def on_epoch_end(self):
'Updates indexes after each epoch'
self.indexes = np.arange(len(self.list_IDs))
if self.shuffle == True:
np.random.shuffle(self.indexes)
DataGenerator.__data_generation()
__data_generation
는 generation process에서 core한 역할인 데이터의 batch를 생성함- data generation동안에 이 코드는
ID.npy
에 상응하는 example를 NumPy 배열로 만들어냄 - 코드가 multicore friendly 하기 때문에 차후에 더 복잡한 연산도 가능하다.(예: source 파일로 부터 계산)
keras.utils.to_categorical
함수를 통해 y에 저장되어 있는 숫자 label을 binary form(예: 6 클래스면 [0 0 1 0 0 0]) 으로 변환
def __data_generation(self, list_IDs_temp):
'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
# Initialization
X = np.empty((self.batch_size, *self.dim, self.n_channels))
y = np.empty((self.batch_size), dtype=int)
# Generate data
for i, ID in enumerate(list_IDs_temp):
# Store sample
X[i,] = np.load('data/' + ID + '.npy')
# Store class
y[i] = self.labels[ID]
return X, keras.utils.to_categorical(y, num_classes=self.n_classes)
DataGenerator.len()
- 각 call request는 배치 index 0 ~ 총 batch 크기 만큼 될 수 있다.
- 이부분이
__len__
을 통해 컨트롤 된다.
def __len__(self):
'Denotes the number of batches per epoch'
return int(np.floor(len(self.list_IDs) / self.batch_size))
- 통상적인 total batch size는 아래와 같이 정의됨
- #sample size / batch size
- 이를 통해 모델이 트레이닝 데이터를 epoch 한번에 거진 한번을 다 보는 효과를 만들어 줌
DataGenerator.getitem()
- batch 프로세싱이 주어진 index에 따라 호출 될 때 generator는
__getitem__
을 호출함 - 결국 batch size만큼의 entry를 계산해서 리턴해줌
- 예를 들어 batch size가 2이고 index가 10이라면 아래 코드에 의해 indexes에 10, 11이 리턴되고 이에 상응하는 list_IDs[10], list_IDs[11]이 list_IDs_temp에 리턴됨
- 이를 통해
__data_generation(list_IDs_temp)
를 통해 알맞은 X, y가 구해짐
def __getitem__(self, index):
'Generate one batch of data'
# Generate indexes of the batch
indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
# Find list of IDs
list_IDs_temp = [self.list_IDs[k] for k in indexes]
# Generate data
X, y = self.__data_generation(list_IDs_temp)
return X, y
전체 코드
- 위의 설명한 내용에 대한 전체 코드는 아래와 같다.
import numpy as np
import keras
class DataGenerator(keras.utils.Sequence):
'Generates data for Keras'
def __init__(self, list_IDs, labels, batch_size=32, dim=(32,32,32), n_channels=1,
n_classes=10, shuffle=True):
'Initialization'
self.dim = dim
self.batch_size = batch_size
self.labels = labels
self.list_IDs = list_IDs
self.n_channels = n_channels
self.n_classes = n_classes
self.shuffle = shuffle
self.on_epoch_end()
def __len__(self):
'Denotes the number of batches per epoch'
return int(np.floor(len(self.list_IDs) / self.batch_size))
def __getitem__(self, index):
'Generate one batch of data'
# Generate indexes of the batch
indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
# Find list of IDs
list_IDs_temp = [self.list_IDs[k] for k in indexes]
# Generate data
X, y = self.__data_generation(list_IDs_temp)
return X, y
def on_epoch_end(self):
'Updates indexes after each epoch'
self.indexes = np.arange(len(self.list_IDs))
if self.shuffle == True:
np.random.shuffle(self.indexes)
def __data_generation(self, list_IDs_temp):
'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
# Initialization
X = np.empty((self.batch_size, *self.dim, self.n_channels))
y = np.empty((self.batch_size), dtype=int)
# Generate data
for i, ID in enumerate(list_IDs_temp):
# Store sample
X[i,] = np.load('data/' + ID + '.npy')
# Store class
y[i] = self.labels[ID]
return X, keras.utils.to_categorical(y, num_classes=self.n_classes)
Keras Script
- 이제 위의
DataGenerator
를 통해 기존 Keras 코드를 수정해보자. model.fit()
대신에model.fit_generator()
를 호출하였는데n_workers
에 따라 batch가 병렬로 실행 가능하다.- 충분히 많은 worker는 CPU 연산을 효율적으로 관리함
- 이를 통해 적어도 병목이 CPU가 아니라 feed forward/back prop 과정에서의 GPU가 되게끔 만든다.
(필자주: 적어도 CPU 책임은 아니게 만든다는 뜻)
import numpy as np
from keras.models import Sequential
from my_classes import DataGenerator
# Parameters
params = {'dim': (32,32,32),
'batch_size': 64,
'n_classes': 6,
'n_channels': 1,
'shuffle': True}
# Datasets
partition = # IDs
labels = # Labels
# Generators
training_generator = DataGenerator(partition['train'], labels, **params)
validation_generator = DataGenerator(partition['validation'], labels, **params)
# Design model
model = Sequential()
[...] # Architecture
model.compile()
# Train model on dataset
model.fit_generator(generator=training_generator,
validation_data=validation_generator,
use_multiprocessing=True,
workers=6)