개요

Keras에서 대용량 데이터 Batch를 처리하는 방법을 찾아봤는데 깔끔한 이해가 되는 코드나 내용을 찾기가 쉽지 않았다. 또한 keras.utils.Sequence가 아닌 yield를 이용한 Generator를 만드는 코드가 많았다.

그러던 중 마음에 드는 외국 블로그 포스트의 주요 내용을 찾아 내용을 번역 및 정리한다.


Motivation

  • 대용량 데이터셋 처리에 많은 메모리를 소비해서 Trick을 통해 알아서 매끄럽게 처리가 되길 바란적이 있는가?
  • 최근의 SOTA방식의 대용량 데이터 처리는 충분히 효율적임
  • 이번시간에는 실시간으로 멀티 코어에서 처리가능한 데이터셋을 생성하고 트레이닝하는 방법을 알아본다
  • 이번 튜토리얼은 Keras의 high level 패키지를 사용하여 TF나 Theano 백엔드의 GPU가속을 가정한다.

Previous Situation

  • 이글을 읽기전까지는 아마도 케라스 코드를 아래와 같이 작성했을것임

  • 이 글은 전체 데이터셋을 한번에 읽어오는것을 바꾸는 방법에 대한 것이다.

  • 실제로 한번에 로드하는 것은 메모리가 데이터셋보다 작으면 문제가 생김

import numpy as np
from keras.models import Sequential

# Load entire dataset
X, y = np.load('some_training_set_with_labels.npy')

# Design model
model = Sequential()
[...] # Your architecture
model.compile()

# Train model on your dataset
model.fit(x=X, y=y)

이를 해결하기 위해 스텝 바이 스텝으로 data generator를 반드는 것을 배워보자. 앞으로의 코드는 좋은 skeleton 코드가 되었으면 한다.


Notations (1/3)

  • 시작하기 전에 대용량 데이터를 다룰 때 몇가지 유용한 Tip을 정리해보자.
  • ID는 파이썬 문자열로 주어진 데이터셋을 정의함

  • partion 디렉토리를 만듦

    • partition['train']은 트레이닝 ID 리스트
    • partition['validation']은 validation ID 리스트
  • labels 디렉토리를 만들어 각 ID 데이터셋에 상응하는 labels[ID]를 지님

Notations (2/3)

  • 예를 들어 id-1, id-2, id-3은 training이고 id-4는 validation이면 구성은 아래와 같음
>>> partition
{'train': ['id-1', 'id-2', 'id-3'], 'validation': ['id-4']}
  • 거기에 상응하는 labels는 아래와 같음
>>> labels
{'id-1': 0, 'id-2': 1, 'id-3': 2, 'id-4': 1}

Notations (3/3)

  • 모듈화를 위해 케라스 코드와 커스터마이즈한 클래스를 분리된 파일과 디렉토리로 아래와 같이 구성해보자.
  • data 디렉토리는 데이터셋을 저장하는 용도
folder/
├── my_classes.py
├── keras_script.py
└── data/

DataGenerator.init()

  • 이제 파이썬 클래스 DataGenerator에 대해 알아보고 Kera 모델에서 어떻게 실시간 데이터 피딩을 하는지 알아보자.

  • 우선, 클래스의 Initialization부터 알아보자.

  • 이것을 나중에 keras.utils.Sequence를 상속한 후 사용할 예정이다.
  • 위 클래스를 활용하면 멀티 프로세싱을 손쉽게 이용할 수 있다.

  • 추가적인 정보인 dim, n_channel, n_classes, shuffle등을 지정하였음

def __init__(self, list_IDs, labels, batch_size=32, dim=(32,32,32), n_channels=1,
             n_classes=10, shuffle=True):
    'Initialization'
    self.dim = dim
    self.batch_size = batch_size
    self.labels = labels
    self.list_IDs = list_IDs
    self.n_channels = n_channels
    self.n_classes = n_classes
    self.shuffle = shuffle
    self.on_epoch_end()

DataGenerator.on_epoch_end()

  • on_epoch_end 메소드는 각 epoch의 맨처음과 맨 끝에 실행됨
  • shuffle 파라미터가 True이면 각 epoch마다 새로운 order를 만들어냄
  • 필자주) 코드를 보면 단순 index를 shuffle하는 것임
  • shuffle을 통해 각 batch마다 identical한 데이터셋을 학습시키는 것을 방지하여 모델을 좀더 robust하게 만듦
def on_epoch_end(self):
  'Updates indexes after each epoch'
  self.indexes = np.arange(len(self.list_IDs))
  if self.shuffle == True:
      np.random.shuffle(self.indexes)

DataGenerator.__data_generation()

  • __data_generation는 generation process에서 core한 역할인 데이터의 batch를 생성함
  • data generation동안에 이 코드는 ID.npy에 상응하는 example를 NumPy 배열로 만들어냄
  • 코드가 multicore friendly 하기 때문에 차후에 더 복잡한 연산도 가능하다.(예: source 파일로 부터 계산)
  • keras.utils.to_categorical 함수를 통해 y에 저장되어 있는 숫자 label을 binary form(예: 6 클래스면 [0 0 1 0 0 0]) 으로 변환
def __data_generation(self, list_IDs_temp):
  'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
  # Initialization
  X = np.empty((self.batch_size, *self.dim, self.n_channels))
  y = np.empty((self.batch_size), dtype=int)

  # Generate data
  for i, ID in enumerate(list_IDs_temp):
      # Store sample
      X[i,] = np.load('data/' + ID + '.npy')

      # Store class
      y[i] = self.labels[ID]

  return X, keras.utils.to_categorical(y, num_classes=self.n_classes)

DataGenerator.len()

  • 각 call request는 배치 index 0 ~ 총 batch 크기 만큼 될 수 있다.
  • 이부분이 __len__을 통해 컨트롤 된다.
def __len__(self):
  'Denotes the number of batches per epoch'
  return int(np.floor(len(self.list_IDs) / self.batch_size))

  • 통상적인 total batch size는 아래와 같이 정의됨
  • #sample size / batch size
  • 이를 통해 모델이 트레이닝 데이터를 epoch 한번에 거진 한번을 다 보는 효과를 만들어 줌

DataGenerator.getitem()

  • batch 프로세싱이 주어진 index에 따라 호출 될 때 generator는 __getitem__을 호출함
  • 결국 batch size만큼의 entry를 계산해서 리턴해줌
  • 예를 들어 batch size가 2이고 index가 10이라면 아래 코드에 의해 indexes에 10, 11이 리턴되고 이에 상응하는 list_IDs[10], list_IDs[11]이 list_IDs_temp에 리턴됨
  • 이를 통해 __data_generation(list_IDs_temp)를 통해 알맞은 X, y가 구해짐
def __getitem__(self, index):
  'Generate one batch of data'
  # Generate indexes of the batch
  indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

  # Find list of IDs
  list_IDs_temp = [self.list_IDs[k] for k in indexes]

  # Generate data
  X, y = self.__data_generation(list_IDs_temp)

  return X, y

전체 코드

  • 위의 설명한 내용에 대한 전체 코드는 아래와 같다.
import numpy as np
import keras

class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_IDs, labels, batch_size=32, dim=(32,32,32), n_channels=1,
                 n_classes=10, shuffle=True):
        'Initialization'
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        # Generate data
        X, y = self.__data_generation(list_IDs_temp)

        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
        # Initialization
        X = np.empty((self.batch_size, *self.dim, self.n_channels))
        y = np.empty((self.batch_size), dtype=int)

        # Generate data
        for i, ID in enumerate(list_IDs_temp):
            # Store sample
            X[i,] = np.load('data/' + ID + '.npy')

            # Store class
            y[i] = self.labels[ID]

        return X, keras.utils.to_categorical(y, num_classes=self.n_classes)

Keras Script

  • 이제 위의 DataGenerator를 통해 기존 Keras 코드를 수정해보자.
  • model.fit() 대신에 model.fit_generator()를 호출하였는데 n_workers에 따라 batch가 병렬로 실행 가능하다.
  • 충분히 많은 worker는 CPU 연산을 효율적으로 관리함
  • 이를 통해 적어도 병목이 CPU가 아니라 feed forward/back prop 과정에서의 GPU가 되게끔 만든다.
    (필자주: 적어도 CPU 책임은 아니게 만든다는 뜻)
import numpy as np

from keras.models import Sequential
from my_classes import DataGenerator

# Parameters
params = {'dim': (32,32,32),
          'batch_size': 64,
          'n_classes': 6,
          'n_channels': 1,
          'shuffle': True}

# Datasets
partition = # IDs
labels = # Labels

# Generators
training_generator = DataGenerator(partition['train'], labels, **params)
validation_generator = DataGenerator(partition['validation'], labels, **params)

# Design model
model = Sequential()
[...] # Architecture
model.compile()

# Train model on dataset
model.fit_generator(generator=training_generator,
                    validation_data=validation_generator,
                    use_multiprocessing=True,
                    workers=6)