본문으로 건너뛰기

[cpython] Python subprocess.communicate() 타임아웃 성능 개선: 느린 자식 프로세스 응답 방식 변경

PR 링크: python/cpython#149003 상태: Merged | 변경: +None / -None

들어가며

Python의 subprocess 모듈은 외부 프로세스를 실행하고 관리하는 강력한 도구입니다. 특히 communicate() 메서드는 자식 프로세스와 상호작용하며 표준 입력(stdin), 표준 출력(stdout), 표준 에러(stderr)를 처리할 때 유용하게 사용됩니다. 하지만 communicate() 메서드에 타임아웃을 설정했을 때, 특정 상황에서 테스트가 예상보다 훨씬 오래 걸리는 문제가 있었습니다. 바로 test_communicate_timeout_large_input 테스트 케이스에서 발생하는 'long tail' 문제입니다.

이 테스트는 자식 프로세스가 상당 시간 동안 입력을 처리하지 않고 대기하는 시나리오를 시뮬레이션합니다. 기존 구현에서는 자식 프로세스가 자체적으로 time.sleep(30)을 통해 30초간 대기하도록 하여, 타임아웃이 발생하더라도 실제 테스트 완료까지 30초 이상 소요되었습니다. 이는 테스트 실행 시간을 불필요하게 늘리는 요인이었습니다.

이번 PR (gh-141473)은 이 문제를 해결하여 test_communicate_timeout_large_input 테스트의 최악 실행 시간을 약 30초에서 1초 미만으로 획기적으로 단축시키는 것을 목표로 합니다. 이를 위해 자식 프로세스가 스스로 깨어나기를 기다리는 대신, 부모 프로세스가 제어하는 방식으로 변경하여 타임아웃 발생 시 즉시 테스트를 정상적으로 종료하도록 개선했습니다.

코드 변경사항 분석

이번 PR의 핵심 변경은 Lib/test/test_subprocess.py 파일 내 TestSubprocess.test_communicate_timeout_large_input 메서드에 집중되어 있습니다. 기존의 느리고 비효율적인 자식 프로세스 대기 방식을 더 빠르고 효율적인 방식으로 대체했습니다.

1. 느린 자식 프로세스 대기 방식 변경

Before:

기존에는 자식 프로세스가 표준 입력(stdin)을 읽기 전에 30초 동안 time.sleep(30)을 호출하여 단순히 대기했습니다. 이 방식은 타임아웃이 발생하더라도 자식 프로세스가 스스로 잠에서 깨어날 때까지 기다려야 했기 때문에 테스트 시간이 길어졌습니다.

--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -1044,19 +1045,49 @@ def test_communicate_timeout_large_input(self):
         # On Windows, stdin writing must also honor the timeout rather than
         # blocking indefinitely when the pipe buffer fills.
 
-        # Input larger than typical pipe buffer (4-64KB on Windows)
-        input_data = b"x" * (128 * 1024)
+        input_data = b"x" * (128 * 1024)  # > typical pipe buffer
 
+        # Cross-platform wake mechanism: the slow reader connects to a
+        # loopback socket and blocks in select() on it (capped at 9s
+        # as a safety net we don't expect to hit). After phase 1 raises
+        # TimeoutExpired, the parent sends a byte to release the child so
+        # it drains stdin. A socket (rather than a raw pipe) is required
+        # because Windows select() only supports sockets, not arbitrary
+        # file descriptors.
+        server = socket.create_server(('127.0.0.1', 0), backlog=1)
+        server.settimeout(10)  # bound the accept() if the child fails to start
+        port = server.getsockname()[1]
+        # The child sends one byte (low byte of its PID) first so the parent
+        # can detect the rare case of an unrelated process on the same host
+        # connecting to our ephemeral port before our child does. A single
+        # byte gives 1/256 collision odds, which is plenty for flake-prevention.
+        slow_reader = (
+            "import os, socket, sys, select; "
+            f"s = socket.create_connection(('127.0.0.1', {port}), timeout=9); "
+            "s.sendall(bytes([os.getpid() & 0xff])); "
+            "select.select([s], [], [], 9); "
+            "sys.stdout.buffer.write(sys.stdin.buffer.read())"
+        )
 
         p = subprocess.Popen(
-            [sys.executable, "-c",
-             "import sys, time; "
-             "time.sleep(30); "  # Don't read stdin for a long time
-             "sys.stdout.buffer.write(sys.stdin.buffer.read())"],
+            [sys.executable, "-c", slow_reader],
             stdin=subprocess.PIPE,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE)
 
+        conn = None
         try:
+            conn, _ = server.accept()
+            server.close()
+            server = None
+
+            conn.settimeout(5) # bound the recv() if the child fails to send handshake
+            peer_byte = conn.recv(1)
+            conn.settimeout(None)
+            self.assertEqual(peer_byte, bytes([p.pid & 0xff]),
+                f"loopback handshake byte {peer_byte!r} != "
+                f"low byte of child PID {p.pid} ({p.pid & 0xff:#x})")
+
             timeout = 0.2
             start = time.monotonic()
             try:
@@ -1065,7 +1095,7 @@ def test_communicate_timeout_large_input(self):
                 elapsed = time.monotonic() - start
                 self.fail(
                     f"TimeoutExpired not raised. communicate() completed in "
-                    f"{elapsed:.2f}s, but subprocess sleeps for 30s. "
+                    f"{elapsed:.2f}s, but slow reader stalls for up to 9s. "
                     "Stdin writing blocked without enforcing timeout.")
             except subprocess.TimeoutExpired:
                 elapsed = time.monotonic() - start
@@ -1073,11 +1103,16 @@ def test_communicate_timeout_large_input(self):
             # Timeout should occur close to the specified timeout value, not after
             # waiting for the subprocess to finish sleeping.
             # Allow generous margin for slow CI, but must be well under
-            # the subprocess sleep time.
+            # the slow-reader's stall cap.
             self.assertLess(elapsed, 5.0,
                 f"TimeoutExpired raised after {elapsed:.2f}s; expected ~{timeout}s. "
                 "Stdin writing blocked without checking timeout.")
 
+            # Release the slow reader so it stops blocking and drains stdin.
+            conn.sendall(b'go')
+            conn.close()
+            conn = None
+
             # After timeout, continue communication. The remaining input
             # should be sent and we should receive all data back.
             stdout, stderr = p.communicate()
@@ -1087,6 +1122,10 @@ def test_communicate_timeout_large_input(self):
                 f"Expected {len(input_data)} bytes output but got {len(stdout)}")
             self.assertEqual(stdout, input_data)
         finally:
+            if conn is not None:
+                conn.close()
+            if server is not None:
+                server.close()
             p.kill()
             p.wait()
 

After:

새로운 방식은 다음과 같은 특징을 가집니다:

  1. Loopback Socket 사용: 부모 프로세스와 자식 프로세스 간 통신을 위해 로컬호스트(127.0.0.1)의 임시 포트를 사용하는 TCP 소켓을 생성합니다. Windows 환경에서는 select.select()가 파일 디스크립터 대신 소켓만 지원하기 때문에 이 방식이 필요합니다. socket.create_serversocket.create_connection을 사용하여 소켓을 설정합니다.
  2. 자식 프로세스의 능동적 대기: 자식 프로세스는 time.sleep() 대신 select.select([s], [], [], 9)를 사용하여 소켓에서 데이터가 오기를 최대 9초간 기다립니다. 이 9초는 안전망(safety net)으로, 실제로는 부모 프로세스가 신호를 보내면 즉시 해제됩니다.
  3. 부모 프로세스의 제어: subprocess.Popen으로 자식 프로세스를 생성한 후, 부모 프로세스는 server.accept()를 통해 자식 프로세스와의 연결을 기다립니다. 연결이 수립되면, 자식 프로세스는 자신의 PID의 하위 바이트를 부모에게 보내 핸드셰이크를 수행합니다. 이 핸드셰이크는 드물게 발생할 수 있는 포트 충돌을 방지하기 위함입니다.
  4. 즉각적인 해제: communicate() 호출 시 타임아웃이 발생하면, 부모 프로세스는 연결된 소켓에 conn.sendall(b'go')를 보내 자식 프로세스를 즉시 깨웁니다. 자식 프로세스는 이 신호를 받아 select.select에서 벗어나 표준 입력을 처리하고 종료합니다.

변경된 자식 프로세스 실행 코드는 다음과 같습니다:

# Before
sys.executable, "-c",
"import sys, time; "
"time.sleep(30); "  # Don't read stdin for a long time
"sys.stdout.buffer.write(sys.stdin.buffer.read())"

# After
sys.executable, "-c", slow_reader # slow_reader는 위에서 정의된 복잡한 문자열

slow_reader 변수에 할당된 문자열은 위에서 설명한 소켓 통신 및 select.select를 이용한 대기 로직을 포함합니다.

2. 타임아웃 검증 로직 개선

타임아웃 발생 후, 실제 경과 시간을 검증하는 로직도 개선되었습니다.

Before:

# (기존 코드에서는 타임아웃 발생 후 elapsed 시간을 검증하는 부분이 명시적으로 보이지 않음)
# 하지만 내부적으로는 자식 프로세스가 30초 대기 후 종료되기를 기다렸을 것임

After:

            except subprocess.TimeoutExpired:
                elapsed = time.monotonic() - start
                self.assertLess(elapsed, 5.0,
                    f"TimeoutExpired raised after {elapsed:.2f}s; expected ~{timeout}s. "
                    "Stdin writing blocked without checking timeout.")

            # Release the slow reader so it stops blocking and drains stdin.
            conn.sendall(b'go')
            conn.close()
            conn = None

타임아웃 예외가 발생한 후, 경과 시간(elapsed)이 예상치(약 timeout 값)에 훨씬 못 미치는 5.0초 이내인지 확인합니다. 또한, 타임아웃 발생 직후 conn.sendall(b'go')를 호출하여 자식 프로세스를 즉시 해제하고, 이후 p.communicate()를 호출하여 남은 입력을 처리하고 프로세스가 정상적으로 종료되도록 합니다.

왜 이게 좋은가?

1. 테스트 실행 시간 대폭 단축

가장 큰 이점은 테스트 실행 시간의 획기적인 단축입니다. 기존에는 test_communicate_timeout_large_input 테스트가 최악의 경우 30초 이상 소요되었으나, 이 PR 이후에는 1초 미만으로 줄어들었습니다. 이는 전체 테스트 스위트의 실행 시간을 크게 개선하여 개발 및 CI/CD 파이프라인의 효율성을 높입니다.

PR 설명에 따르면,

참고 자료

⚠️ 알림: 이 분석은 AI가 실제 코드 diff를 기반으로 작성했습니다.

댓글

관련 포스트

PR Analysis 의 다른글