1. 개요

지난 AWS 전체 계정에 MFA 적용하기 글을 통해 회사에서 관리하는 전체 계정에 MFA를 강제하게 만들었다.

그럼에도 불구하고 사용자가 MFA 설정을 하지 않는 경우 이를 강제할 방법이 없어 고민하다가 1일 1회 MFA 적용 권고 메일을 보내기로 한다.
(AWS 기본 기능으로 제공하지 않는듯 하다.)

이를 위해 lambda를 통해 serverless로 1일 1회 실행하면 될 것 같아 내용을 진행 한 뒤 정리해둔다.

2. 기술 구성

비용, 관리면에서 효율적이라고 판단하여 아래와 같이 구성하였다.

  • lambda
    • IAM 유저 리스트 중 MFA 미설정자를 알아내어 SES로 전송
    • cron으로 1일 1회 실행 trigger
  • SES
    • 메일송신
  • SNS
    • kafka topic 처럼 CloudWatch Event발생시 이쪽으로 msg publish
  • CloudWatch
    • lambda error log watch 후 에러 발생시 SNS로 publish

3. 기술 적용 정리

위의 기술 구성대로 세세하게 하나씩 살펴보자.

3.1. Lambda 개발

3.1.1. Lambda 코드 작성

백문이 불여일견 코드를 보자면 boto3만 가볍게 import 하여 user의 MFA여부를 체크한 뒤 SES로 비활성자에게 메일을 쏜다.

코드 개발시는 커플링을 제거하기 위해 클래스 단위로 만들었으나 lambda script에 넣을때의 편의를 위해 하나의 파일에 모두 집어넣었다.

import json

import boto3


class IamManager(object):
    def __init__(self):
        self.cli = boto3.client('iam')

    def get_users(self):
        resp = self.cli.list_users()
        un_list = [u['UserName'] for u in resp['Users']]
        return un_list

    def get_none_mfa_users(self):
        none_mfa_un_list = []
        for un in self.get_users():
            resp = self.cli.list_mfa_devices(UserName=un)
            if not resp['MFADevices']:
                none_mfa_un_list.append(un)
        return none_mfa_un_list


class SesManager(object):
    def __init__(self):
        print('send mail')
        self.cli = boto3.client('ses')
        self.BODY = '<p>AWS 계정의 MFA 인증이 필요합니다.</p>'\
                    '<p><b>AWS 계정명은 회사 포탈과 동일합니다.</b></p>'\
                    '<p>자세한 내용은 <a href="https://sla-bda.slack.com/archives/C02KZHZK3EX/p1643865600856759">가이드</a> 참고 부탁 드립니다.</p>'

    def send_email(self, email_list):
        for email in email_list:
            resp = self.cli.send_email(
                Source='lks21c@gmail.com',
                Destination={
                                'ToAddresses': [ f'{email}' ]
                            },
                Message={
                            'Subject': {
                            'Data': 'MFA 인증이 필요합니다.',
                            'Charset': 'UTF-8'
                            },
                            'Body': {
                                'Html': {
                                    'Data': self.BODY,
                                    'Charset': 'UTF-8'
                                }
                            }
                        },
                )


class IamSliConverter(object):
    @staticmethod
    def getSliEmail(iam_user_name):
        if 'hydra01' == iam_user_name:
            return 'lks21c.lee@kwangsiklee.com'
        else:
            return f'{iam_user_name}@kwangsiklee.com'


def lambda_handler(event, context):
    im = IamManager()
    email_list = [IamSliConverter.getSliEmail(u) for u in im.get_none_mfa_users()]

    sm = SesManager()
    sm.send_email(email_list)
    return {
    'statusCode': 200,
    'body': json.dumps('Email has send successfully.')
    }

3.1.2. Lambda Role 설정

IAM List, MFA Device 여부 확인들을 하기 위해 IAM 등의 필요 권한이 발생한다.

권한이 제대로 설정되지 않으면 아래와 같이 에러가 난다.

필요한 만큼 권한을 설정하여 Role을 만든 뒤 Lambda에서 Role을 수정해준다.

Role이 제대로 적용되면 아래와 같이 lambda 가 정상 실행된다. 더불어 lambda 수행 제한시간이 너무 짧게 잡혀있으면 timeout에 의해 error status가 리턴 될 수 있으니 이럴 경우 제한시간을 늘려주자.

모든게 제대로 설정되고 다시 Test 해보면 정상 수행된다.

3.1.3. Lambda 메일 수신 확인

람다 실행 후 메일을 확인해 보면 아래와 같이 정상적으로 메일이 도착한것을 알 수 있다.

유의사항 #1
SES는 최초 생성 시 sandbox 모드로 동작하는데 verified된 email whitelist에 속하지 않으면 메일 송/수신에 제한이 있다.
내용 참고해서 필요시 sandbox 해제 신청을 하면 된다.
필자는 회사에서의 사용 목적을 자세히 적어 sandbox를 해제했다.

유의사항 #2
source email에 필자의 회사메일을 썼을 경우 제대로 송신이 되지 않아 개인 메일로 지정하였다.
이건 회사마다 이메일 서버 포함 보안이 다를수 있으니 참고하자.

3.1.4. Lambda 유닛 테스트 작성

Lambda를 개발할 때 로컬에서 실행 및 테스트 하기 위해 아래와 같이 unittest 및 nosetests 실행을 위한 환경을 만들었다.

import sys
import logging
import unittest
from unittest import mock

from mfa_notifier import IamManager
from mfa_notifier import SesManager
from mfa_notifier import lambda_handler
from mfa_notifier import IamSliConverter

class TestIamManager(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.im = IamManager()
        logging.basicConfig(level=logging.INFO)
        cls.logger = logging.getLogger('TestIamManager')
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        cls.logger.addHandler(handler)

    def test_get_users(self):
        un_list = self.im.get_users()
        self.logger.info(un_list)
        self.assertIsNotNone(un_list)

    def test_get_users2(self):
        none_mfa_un_list = self.im.get_none_mfa_users()
        self.logger.info(none_mfa_un_list)
        self.assertIsNotNone(none_mfa_un_list)


class TestIamSliConverter(unittest.TestCase):
    def test_get_sli_email(self):
        self.assertEqual('lks21c.lee@kwangsiklee.com', IamSliConverter.getSliEmail('hydra01'))


class TestSesManager(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.sm = SesManager()
        logging.basicConfig(level=logging.INFO)
        cls.logger = logging.getLogger('TestSesManager')
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        cls.logger.addHandler(handler)

    def testSendEmail(self):
        self.sm.send_email(['lks21c@gmail.com', 'lks21c.lee@kwangsiklee.com'])


class TestLambda(unittest.TestCase):
    def test_lambda_handler(self):
        ret = lambda_handler(None, None)
        self.assertEqual(200, ret['statusCode'])

위 코드 기준으로 nosetests 수행도 해보면 커버리지가 98% 나온다.

3.2. Lambda Batch 적용하기

이제 잘 실행되는 람다를 1일 1회 실행되게 만들기 위한 방법을 알아보자.

람다 함수 페이지 상단 함수 개요에서 트리거 추가를 클릭한다.

여기서 EventBridge로 cron expression을 적용해두면 된다.

여기서는 매일 오전 08:30 기준 실행되게 지정해 두었다. KRT가 아닌 UTC라 감안해서 입력하면 된다.

아래와 같이 함수 개요가 바뀌면 된다.

3.3. Lambda Error 감지하기

3.2. 까지만 하더라도 정상적으로 람다가 잘 동작한다. 그러나 람다 실행이 실패했을때는 자동화된 형태로 적극적으로 감지하는 형태로 고도화 시켜보자.

최종적으로 운영에 드는 비용을 줄이기 위함이다.

3.3.1. Flow 요약

아래와 같이 동작 시키고자 한다.

lambda 실행 시 error log 발생 -> Cloud Watch 감지 -> SNS로 전송 -> SNS에서 Email로 전송

3.3.2. SNS 설정

SNS 메뉴에서 create topic 을 클릭한다.

아래와 같이 토픽을 만든다.

토픽을 만든 뒤 subscription을 생성해보자.

아래와 같이 SNS로 메세지가 수신되면 지정한 이메일로 송신하게 설정한다.

subscription 설정한 이메일로 verification 안내 메일이 전송된다. 이걸 수락해야 이후 정상적인 subscription을 할 수 있다.

3.3.3. CloudWatch 설정하기

3.3.2. 에서 노티를 받을곳을 설정했으니 이제 cloud watch로 lambda error시 이벤트를 감지하게 설정해보자.

cloudwatch alarm 메뉴에서 add alaram을 눌러 아래와 같이 입력한다.

select metric을 클릭한다.

기존에 작성한 람다 이름중 일부인 mfa를 입력해본다.

기존 작성한 람다인 mfa-notifier의 error 로그로 지정한다.

alaram 조건은 5분 내 sum 기준으로 1건이라도 발생하면 알람이 발생하게 만들었다.

Alarm state trigger는 설명이 처음에 이해가 안되었는데 노티를 보낼때 알람 상태를 무엇이라고 정의할 것이가 이다.

웬만하면 기본에서 안건드리면 된다. 더불어 notification 전송을 SNS에 미리 만들어둔 곳으로 보내게 만든다.

3.3.4. 알림 검증해보기

만든게 제대로 작동하는지 테스트해보자.

람다 코드에 의도적으로 에러를 심어서 이메일로 노티가 오는지 확인해보면 된다.

에러 코드 삽입 후 실행 시 바로 에러가 난다.

얼마 이후 이메일을 보면 알람이 도착해있다.

세부 내용에서 수치를 볼 수 있고 verbose한 알람을 막기 위해 바로 알람을 중지하는 기능도 포함되어있다.

이런 부분들이 직접 노티모듈을 개발하는 것 대비 제공되는 부분이 많아 개발 운영 비용을 줄일 수 있는듯 하다.