본문으로 건너뛰기

[cpython] Python `subprocess` 테스트 최적화: `communicate()` 타임아웃 테스트 속도 향상

PR 링크: python/cpython#149004 상태: Merged | 변경: +None / -None

들어가며

소프트웨어 개발에서 테스트는 필수적인 부분이지만, 때로는 테스트 자체가 병목이 되어 개발 속도를 저해하기도 합니다. 특히 긴 시간을 소요하는 테스트는 CI/CD 파이프라인의 효율성을 떨어뜨리고, 개발자의 피드백 루프를 길게 만듭니다. 이번에 살펴볼 python/cpython 레포지토리의 PR은 subprocess 모듈의 test_communicate_timeout_large_input 테스트 케이스의 런타임을 획기적으로 단축하는 최적화를 담고 있습니다. 기존에는 약 30초가 걸리던 테스트가 1초 미만으로 줄어들어, 전체 테스트 스위트의 효율성을 크게 개선했습니다.

이 PR이 해결하고자 하는 핵심 문제는 subprocess.Popen.communicate() 메서드의 timeout 인자가 제대로 작동하는지 검증하는 테스트에서 발생했습니다. 이 테스트는 자식 프로세스가 stdin을 오랫동안 읽지 않도록 time.sleep(30)을 사용하여 의도적으로 지연시켰습니다. 문제는 communicate()TimeoutExpired 예외를 발생시킨 후에도, 부모 프로세스가 자식 프로세스의 stdin 드레인을 기다려야 했고, 이 과정에서 다시 30초의 불필요한 대기 시간이 발생했다는 점입니다. 이는 테스트의 목적(타임아웃 발생 여부 확인)과는 무관하게 테스트 런타임을 길게 만드는 요인이었습니다.

코드 분석: test_subprocess.py 변경사항

이 PR의 핵심 변경사항은 Lib/test/test_subprocess.py 파일의 test_communicate_timeout_large_input 메서드에 집중되어 있습니다. 기존에는 단순히 자식 프로세스에서 30초 동안 sleep을 호출하여 stdin 읽기를 지연시켰지만, 이제는 루프백 소켓을 통해 부모 프로세스가 자식 프로세스의 stdin 드레인을 제어할 수 있도록 변경되었습니다.

1. slow_reader 자식 프로세스 로직 변경

Before:

        p = subprocess.Popen(
            [sys.executable, "-c",
             "import sys, time; "
             "time.sleep(30); "  # Don't read stdin for a long time
             "sys.stdout.buffer.write(sys.stdin.buffer.read())"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)

After:

        # Cross-platform wake mechanism: the slow reader connects to a
        # loopback TCP socket and blocks in select() on it (capped at 9s
        # as a safety net we don't expect to hit). After phase 1 raises
        # TimeoutExpired, the parent sends a byte to release the child so
        # it drains stdin. A socket (rather than a raw pipe) is required
        # because Windows select() only supports sockets, not arbitrary
        # file descriptors.
        server = socket.create_server(('127.0.0.1', 0), backlog=1)
        server.settimeout(10)  # bound the accept() if the child fails to start
        port = server.getsockname()[1]
        # The child sends one byte (low byte of its PID) first so the parent
        # can detect the rare case of an unrelated process on the same host
        # connecting to our ephemeral port before our child does. A single
        # byte gives 1/256 collision odds, which is plenty for flake-prevention.
        slow_reader = (
            "import os, socket, sys, select; "
            f"s = socket.create_connection(('127.0.0.1', {port}), timeout=9); "
            "s.sendall(bytes([os.getpid() & 0xff])); "
            "select.select([s], [], [], 9); "
            "sys.stdout.buffer.write(sys.stdin.buffer.read())"
        )

        p = subprocess.Popen(
            [sys.executable, "-c", slow_reader],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)

가장 큰 변화는 자식 프로세스의 time.sleep(30)select.select([s], [], [], 9)로 대체된 것입니다. 여기서 s는 부모 프로세스와 연결된 루프백 TCP 소켓입니다. 자식 프로세스는 이 소켓에서 데이터가 오기를 최대 9초 동안 기다립니다. 부모 프로세스가 TimeoutExpired 예외를 받은 후, 이 소켓에 데이터를 전송하여 자식 프로세스를 깨웁니다. 이로써 자식 프로세스는 stdin을 즉시 드레인하고 종료할 수 있게 됩니다.

또한, socket.create_server를 사용하여 임시 포트를 할당하고, 자식 프로세스가 연결되면 자신의 PID의 하위 1바이트를 전송하여 부모가 올바른 자식과 통신하고 있는지 확인할 수 있도록 하는 flake-prevention 메커니즘도 추가되었습니다. 이는 테스트의 견고성을 높이는 좋은 사례입니다.

2. 부모 프로세스의 소켓 통신 및 제어 로직 추가

Before:

        try:
            timeout = 0.2
            start = time.monotonic()
            try:
                p.communicate(input_data, timeout=timeout)
                elapsed = time.monotonic() - start
                self.fail(
                    f"TimeoutExpired not raised. communicate() completed in "
                    f"{elapsed:.2f}s, but subprocess sleeps for 30s. "
                    "Stdin writing blocked without enforcing timeout.")
            except subprocess.TimeoutExpired:
                elapsed = time.monotonic() - start
                self.assertLess(elapsed, 5.0,
                    f"TimeoutExpired raised after {elapsed:.2f}s; expected ~{timeout}s. "
                    "Stdin writing blocked without checking timeout.")

            # After timeout, continue communication. The remaining input
            # should be sent and we should receive all data back.
            stdout, stderr = p.communicate()
            self.assertEqual(p.returncode, 0)
            self.assertEqual(len(stdout), len(input_data),
                f"Expected {len(input_data)} bytes output but got {len(stdout)}")
            self.assertEqual(stdout, input_data)
        finally:
            p.kill()
            p.wait()

After:

        conn = None
        try:
            conn, _ = server.accept()
            server.close()
            server = None

            conn.settimeout(5)
            peer_byte = conn.recv(1)
            conn.settimeout(None)
            self.assertEqual(peer_byte, bytes([p.pid & 0xff]),
                f"loopback handshake byte {peer_byte!r} != "
                f"low byte of child PID {p.pid} ({p.pid & 0xff:#x})")

            timeout = 0.2
            start = time.monotonic()
            try:
                p.communicate(input_data, timeout=timeout)
                elapsed = time.monotonic() - start
                self.fail(
                    f"TimeoutExpired not raised. communicate() completed in "
                    f"{elapsed:.2f}s, but slow reader stalls for up to 9s. "
                    "Stdin writing blocked without enforcing timeout.")
            except subprocess.TimeoutExpired:
                elapsed = time.monotonic() - start
                self.assertLess(elapsed, 5.0,
                    f"TimeoutExpired raised after {elapsed:.2f}s; expected ~{timeout}s. "
                    "Stdin writing blocked without checking timeout.")

            # Release the slow reader so it stops blocking and drains stdin.
            conn.sendall(b'go')
            conn.close()
            conn = None

            # After timeout, continue communication. The remaining input
            # should be sent and we should receive all data back.
            stdout, stderr = p.communicate()
            self.assertEqual(p.returncode, 0)
            self.assertEqual(len(stdout), len(input_data),
                f"Expected {len(input_data)} bytes output but got {len(stdout)}")
            self.assertEqual(stdout, input_data)
        finally:
            if conn is not None:
                conn.close()
            if server is not None:
                server.close()
            p.kill()
            p.wait()

부모 프로세스는 server.accept()를 통해 자식 프로세스의 연결을 기다리고, 연결이 수립되면 자식 프로세스가 보낸 PID 바이트를 확인하여 flake를 방지합니다. p.communicate(input_data, timeout=timeout) 호출 후 TimeoutExpired 예외가 발생하면, 부모 프로세스는 conn.sendall(b'go')를 호출하여 자식 프로세스를 깨웁니다. 이로써 자식 프로세스는 select.select()에서 벗어나 stdin을 읽고 종료할 수 있게 됩니다. finally 블록에서 소켓을 적절히 닫아 리소스 누수를 방지하는 것도 중요합니다.

리뷰 댓글에서는 server.settimeout(10)conn.settimeout(5)와 같은 타임아웃 설정이 accept()recv() 호출이 무한정 블록되는 것을 방지하여 테스트의 견고성을 높인다는 점이 긍정적으로 평가되었습니다. 또한, serverconn 변수를 finally 블록에서 안전하게 닫기 위해 None으로 초기화하고 if not None 체크를 하는 패턴도 좋은 관행으로 언급되었습니다.

왜 이게 좋은 최적화인가?

이 변경사항은 subprocess.communicate()timeout 기능을 테스트하는 방식에 대한 근본적인 개선이며, 여러 면에서 좋은 최적화로 평가할 수 있습니다.

  1. 테스트 런타임 획기적 단축: 가장 명확한 이점은 테스트 런타임이 약 30초에서 1초 미만으로 줄어들었다는 것입니다. 이는 CI/CD 파이프라인의 속도를 크게 향상시키고, 개발자가 더 빠르게 피드백을 받을 수 있도록 합니다. time.sleep()과 같은 고정된 긴 대기 시간 대신, 이벤트 기반(소켓 통신)으로 자식 프로세스의 동작을 제어함으로써 필요한 최소한의 시간만 소요하게 됩니다.

  2. 정확한 테스트 목적 달성: 이 테스트의 핵심은 communicate()timeout 인자가 예상대로 TimeoutExpired 예외를 발생시키는지 확인하는 것입니다. 기존 방식은 타임아웃 이후 자식 프로세스가 stdin을 드레인하는 데 30초를 기다려야 했지만, 새로운 방식은 부모가 자식을 즉시 깨워 이 불필요한 대기 시간을 제거합니다. 이는 테스트가 본래의 목적에 더 충실하게 작동하도록 만듭니다.

  3. 플랫폼 독립적인 해결책: select.select()가 Windows에서 소켓만 지원한다는 점을 고려하여 루프백 TCP 소켓을 사용한 것은 크로스 플랫폼 호환성을 보장하는 현명한 선택입니다. 이는 Python이 다양한 운영체제에서 일관되게 작동해야 하는 핵심 라이브러리임을 감안할 때 매우 중요합니다.

  4. 견고성 향상: 자식 프로세스의 PID를 이용한 flake-prevention 메커니즘은 드물게 발생할 수 있는 포트 충돌 문제를 방지하여 테스트의 신뢰성을 높입니다. 또한, server.settimeout()conn.settimeout()을 통해 accept()recv() 호출이 무한정 블록되는 것을 방지하여 테스트 자체가 hang되는 것을 막습니다.

일반적 교훈

이 최적화는 테스트 작성 시 다음과 같은 중요한 교훈을 제공합니다.

  • 불필요한 대기 시간 제거: 테스트에서 time.sleep()과 같은 고정된 대기 시간은 가능한 한 피해야 합니다. 대신, 이벤트 기반 메커니즘(예: 소켓, 파이프, 스레드 동기화 프리미티브)을 사용하여 필요한 시점에만 다음 단계로 진행하도록 만드는 것이 효율적입니다.
  • 테스트의 목적에 집중: 테스트는 검증하고자 하는 특정 동작에만 집중해야 합니다. 부수적인 동작으로 인해 테스트 런타임이 길어지거나 복잡해지는 것을 경계해야 합니다.
  • 플랫폼 호환성 고려: 특히 시스템 레벨의 기능을 테스트할 때는 다양한 운영체제에서의 동작 차이를 고려하여 플랫폼 독립적인 솔루션을 모색해야 합니다.
  • 테스트 견고성 확보: 드물게 발생할 수 있는 flake를 방지하기 위한 메커니즘(예: 임시 포트 사용 시 충돌 방지)을 고려하여 테스트의 신뢰성을 높여야 합니다.

이 PR은 단순히 코드 몇 줄을 바꾼 것이 아니라, 테스트의 본질과 효율성에 대한 깊은 이해를 바탕으로 이루어진 모범적인 최적화 사례라고 할 수 있습니다.

References

참고 자료

⚠️ 알림: 이 분석은 AI가 실제 코드 diff를 기반으로 작성했습니다.

댓글

관련 포스트

PR Analysis 의 다른글