[Python] Multi-Thread Queue Polling
여러개의 thread 중 하나의 thread가 처리하게끔 만드는것을 polling이라고 한다. Linux의 경우엔 select()
라는 함수가 존재하는데, 이를 Python에서 구현하면 다음과 같다.
import queue import socket import os class PollableQueue(queue.Queue): def __init__(self): super().__init__() if os.name == 'posix': self._putsocket, self._getsocket = socket.socketpair() else: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(('127.0.0.1', 0)) server.listen(1) self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._putsocket.connect(server.getsockname()) self._getsocket, _ = server.accept() server.close() def fileno(self): return self._getsocket.fileno() def put(self, item): super().put(item) self._putsocket.send(b'x') def get(self): self._getsocket.recv(1) return super().get() if __name__ == '__main__': import select import threading import time def consumer(queues): while True: can_read, _, _ = select.select(queues,[],[]) for r in can_read: item = r.get() print('Got:', item) q1 = PollableQueue() q2 = PollableQueue() q3 = PollableQueue() t = threading.Thread(target=consumer, args=([q1,q2,q3],)) t.daemon = True t.start() q1.put(1) q2.put(10) q3.put('hello') q2.put(15) time.sleep(1)
Got: 1 Got: 10 Got: hello Got: 15 |
운영체제 시스템에선 유저와 커널이 서로 데이터를 주고받을 수 있는 연결이 존재하는데 이를 socket이라고 부른다. 위 코드를 그림으로 그려보면 다음과 같다.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
를 통해 socket을 만든다. 만약 descriptor에 3이 넘어오게 되면 kernel엔 구조체가 넘어오게 된다. self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
에서 또 다른 socket을 만들게 되며 descriptor가 4라고 가정한다. 이 때 3번 socket과 4번 socket이 연결이 되는데 server.accept()
가 수행되면서 자동으로 5번 socket이 생성된다. 그리고 새로운 socket을 self._get_socket
에 assign 한다.
그 후 server.close()
를 하게되면 server=3
이 사라지면서 4번과 5번 socket이 서로 연결된다.
Linux는 socketpair()
함수를 통해 한번에 할 수 있지만, Windows는 이런 기능이 없기 때문에 해당 코드를 통해 구현한 것이다.
q1 = PollableQueue() q2 = PollableQueue() q3 = PollableQueue()
3개의 instance를 만들었기 때문에 총 3 쌍의 socket pair가 생길 것이다.
can_read, , = select.select(queues,[],[])
위 코드는 queues인 q1, q2, q3
중 data가 넘어왔을 때, 즉 super().put(item)
에 값이 전달되면서 r.get()
을 했을 때 넣은 값이 리턴된다.
for r in can_read: item = r.get() print('Got:', item)
지속적으로 for문을 수행하면서 polling 동작을 수행하게 된다.
전체적으로 봤을 때 보내는 sender는 다수지만, 받아서 처리하는 애는 단일이 된다.