동시성 문제해결

동시성 문제해결

본격적으로 Celery, Isolate를 이용하여 채점 기능을 구현하면서 발생한 문제에 대해서 설명해 보려고 한다.
순차적으로 채점을 수행하는 것과 다르게 동시에 여러개의 채점이 수행되다 보니 예상하지 못했던 문제가 발생했었다. 이 문제를 Redis의 원자적 연산을 이용하여 해결하였다.

제한 사항 및 해결 방안

  1. 제한 사항

    • 각 제출에 대한 채점은 isolate의 -b 옵션으로 box id를 지정해야 한다.
    • 채점은 비동기 태스크 큐인 celery를 통해 병렬로 수행된다
      • celery는 기본적으로 컴퓨터의 CPU 코어 수에 해당하는 태스크(채점)을 동시에 수행할 수 있다.
    • 각각의 box는 채점 시작 전 —-init, 종료 후 —-cleanup을 해줘야 한다.
  2. 문제점

    • 2개 이상의 채점이 동시에 시작될 때 같은 isolate box id 를 지정하는 문제가 발생한다.
  3. 해결 방안

    • box id의 범위를 제한하고 사용중인 box id 를 저장하여 관리한다.
    • 채점이 시작될 때 box id의 범위를 순회하며 사용가능한 box id를 지정받는다.
    • 기본적인 구성
    box_id=0;
    box_id_range=8;
    while True:
    	if box_id가 사용중인가?
    		해당 box_id를 '사용중'으로 변경
    		break
    	else:
    		time.sleep(2)
    		box_id=(box_id+1)%box_id_range
    • DB를 활용한 box id 관리
      • box id를 위한 공간을 생성한다.
      • 해당하는 공간의 무결성을 지키면서 지정받은 box id를 저장한다.
        • MySQL : Lock 기능을 활용할 수 있다.
        • Redis : 기본적인 연산은 하나의 쓰레드만 사용하기에 연산의 원자성이 보장된다.
      • 공간에서 해당하는 box id 가 존재한다면 사용중이라는 뜻으로 활용한다.

구현

MySQL로도 구현할 수 있을 것이라고 생각되지만 인메모리 DB로써 연산의 속도가 빠른 Redis를 활용하여 문제 해결을 위한 기능을 구현해보았다.

구현 1 - 초기 구현

def get_box_id(timelimit:int)->int|None:
	"""
    현재 사용가능한 isolate box id를 폴링으로 획득

    params
    - timelimit : 최대 polling 시간
    -----------------------------------------------------
    return
    - 사용가능한 isolate box id 리턴
    - timelimit 동안 획득 실패시 None 리턴
    """
    box_id=1
    timeover=0
		with redis.StrictRedis(host=os.getenv('REDIS_HOST'), port=os.getenv('REDIS_PORT'), db=os.getenv('REDIS_DB')) as conn:
				while timeover<timelimit:#폴링으로 box가 사용가능한지 계속 확인
				    if conn.set(str(box_id),1,nx=True):
				        return box_id
				    box_id=((box_id+1)%int(os.getenv("CELERY_CONCURRENCY"))+1)
				    if box_id==0:
				        box_id=1
				    time.sleep(0.1)
				    timeover+=1
				else:
						return None
  • conn.set(str(box_id),1,nx=True) : str(box_id)의 값을 1로 설정한다. 만약 str(box_id)가 존재한다면 None을 리턴한다.
  • 단순하게 폴링을 사용하여 사용가능한 box id만을 리턴하는 함수이다.
  • box id 획득과 box 초기화를 해주도록 개선해보겠다.

구현 2 - 개선 1

def get_isolate(timelimit:int)->int|None:
    """
    현재 사용가능한 isolate box를 초기화하고 id를 획득

    params
    - timelimit : 최대 polling 시간
    -----------------------------------------------------
    return
    - 사용가능한 isolate box id 리턴
    - timelimit 동안 획득 실패시 None 리턴
    """
    box_id=1
    timeover=0
		with redis.StrictRedis(host=os.getenv('REDIS_HOST'), port=os.getenv('REDIS_PORT'), db=os.getenv('REDIS_DB')) as conn:
				while timeover<timelimit:#폴링으로 box가 사용가능한지 계속 확인
				    if conn.set(str(box_id),1,nx=True):
								#isolate box 초기화
                subprocess.run(['isolate','--cg', '-b',str(box_id),'--init'],capture_output=True,text=True)
				        return box_id
				    box_id=((box_id+1)%int(os.getenv("CELERY_CONCURRENCY"))+1)
				    if box_id==0:
				        box_id=1
				    time.sleep(0.1)
				    timeover+=1
				else:
						return None
  • box id 획득과 box 초기화를 해주도록 했다.
  • 이 경우 채점 도중 오류가 발생했을 시에 box --cleanup과 사용가능 플래그 설정이 작동하지 않을 수도 있다. 이를 개선해보았다.

구현 3 - 개선 2

@contextmanager
def get_isolate(timelimit:int)->int|None:
    """
    현재 사용가능한 isolate box id context를 폴링으로 획득 및 isolate box 초기화
    사용 종료 시 isolate box 삭제 및 box id 를 사용가능으로 변경

    redis는 기본적으로 싱글 쓰레드로 동작하기 때문에 해당 사용중인 box id를 redis에 저장해두고 해당 id가 존재하는지 확인함으로써
    사용중인 isolate box를 다른 celery worker가 공유하지 않도록 구현

    params
    - timelimit : 최대 polling 시간
    -----------------------------------------------------
    return
    - 사용가능한 isolate box id 리턴
    - timelimit 동안 획득 실패시 None 리턴
    """
    box_id=1
    timeover=0
    with redis.StrictRedis(host=os.getenv('REDIS_HOST'), port=os.getenv('REDIS_PORT'), db=os.getenv('REDIS_DB')) as conn:
        try:
            while timeover<timelimit:#폴링으로 box가 사용가능한지 계속 확인
                if conn.set(str(box_id),1,nx=True):
                    #isolate box 초기화
                    subprocess.run(['isolate','--cg', '-b',str(box_id),'--init'],capture_output=True,text=True)
                    time.sleep(0.2)#격리 공간 생기는 거 기다리기
                    yield box_id
                    break
                
                box_id=((box_id+1)%int(os.getenv("CELERY_CONCURRENCY"))+1)
                if box_id==0:
                    box_id=1
                time.sleep(0.1)
                timeover+=1
            else:
                box_id=None
                yield box_id
        finally:
            if box_id:
                #사용가능한 box id를 획득했다면 사용종료 후 isolate box 삭제
                subprocess.run(['isolate', '--cg','-b',str(box_id),'--cleanup'],capture_output=True)
            #isolate box id 사용가능으로 설정하기 위해 box id를 삭제
            conn.delete(str(box_id))
  • contextmanger를 사용하여 채점이 어떻게든 종료되었을 때 box --cleanup과 사용가능플래그 설정을 하도록 개선하였다.

정리

위 함수를 통해서 같은 box 내에서 채점이 진행되는 문제를 해결할 수 있었다.

보통의 비동기 작업 처리는 HTTP 연결에 대해서만 경험을 해봤어서 이러한 문제를 직면했을 때 애를 많이 먹은 것 같다. CPU내에서 연산이 동작하는 방식과 프로세스와 쓰레드의 작업들이 수행되는 과정같은 CS지식들이 활용될 수 있던 경험이었다.

Last updated on