Programming,  Python

[Python] Multi-Thread Queue Polling

여러개의 thread 중 하나의 thread가 처리하게끔 만드는것을 polling이라고 한다. Linux의 경우엔 select()라는 함수가 존재하는데, 이를 Python에서 구현하면 다음과 같다.

import queue
import socket
import os

class PollableQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        if os.name == 'posix':
            self._putsocket, self._getsocket = socket.socketpair()
        else:
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._putsocket.connect(server.getsockname())
            self._getsocket, _ = server.accept()
            server.close()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()

if __name__ == '__main__':
    import select
    import threading
    import time

    def consumer(queues):
        while True:
            can_read, _, _ = select.select(queues,[],[])
            for r in can_read:
                item = r.get()
                print('Got:', item)

    q1 = PollableQueue()
    q2 = PollableQueue()
    q3 = PollableQueue()
    t = threading.Thread(target=consumer, args=([q1,q2,q3],))
    t.daemon = True
    t.start()

    q1.put(1)
    q2.put(10)
    q3.put('hello')
    q2.put(15)

    time.sleep(1)
Got: 1
Got: 10
Got: hello
Got: 15

운영체제 시스템에선 유저와 커널이 서로 데이터를 주고받을 수 있는 연결이 존재하는데 이를 socket이라고 부른다. 위 코드를 그림으로 그려보면 다음과 같다.

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)를 통해 socket을 만든다. 만약 descriptor에 3이 넘어오게 되면 kernel엔 구조체가 넘어오게 된다.
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)에서 또 다른 socket을 만들게 되며 descriptor가 4라고 가정한다. 이 때 3번 socket과 4번 socket이 연결이 되는데 server.accept()가 수행되면서 자동으로 5번 socket이 생성된다. 그리고 새로운 socket을 self._get_socket에 assign 한다.
그 후 server.close()를 하게되면 server=3이 사라지면서 4번과 5번 socket이 서로 연결된다.

Linux는 socketpair() 함수를 통해 한번에 할 수 있지만, Windows는 이런 기능이 없기 때문에 해당 코드를 통해 구현한 것이다.

q1 = PollableQueue()
q2 = PollableQueue()
q3 = PollableQueue()

3개의 instance를 만들었기 때문에 총 3 쌍의 socket pair가 생길 것이다.

can_read, ,  = select.select(queues,[],[])

위 코드는 queues인 q1, q2, q3 중 data가 넘어왔을 때, 즉 super().put(item)에 값이 전달되면서 r.get()을 했을 때 넣은 값이 리턴된다.

            for r in can_read:
                item = r.get()
                print('Got:', item)

지속적으로 for문을 수행하면서 polling 동작을 수행하게 된다.

전체적으로 봤을 때 보내는 sender는 다수지만, 받아서 처리하는 애는 단일이 된다.

Leave a Reply

Your email address will not be published. Required fields are marked *