This article covers I/O multiplexing: select(), poll(), and epoll(), explained in depth. How do servers handle thousands of connections? Learn readiness-based I/O with select/poll/epoll, common edge cases, and build a tiny event loop.
Imagine a server that needs to handle 10,000 concurrent clients. How do you efficiently manage all these I/O operations?
Naive approach (one thread per connection):

Thread 1      → Client 1      (mostly waiting on I/O)
Thread 2      → Client 2      (mostly waiting on I/O)
...
Thread 10,000 → Client 10,000 (mostly waiting on I/O)

Problem: 10,000 threads consuming memory and CPU for context switching
I/O Multiplexing approach:

Single thread monitors all 10,000 connections
        ↓
Only processes connections that have data ready
        ↓
Efficient use of CPU and memory
I/O multiplexing is a technique that allows a single thread to monitor multiple I/O streams (sockets, files, pipes) and process those that are ready for I/O operations.
Key benefits:
- A single thread (or a few) can service thousands of connections
- Far less memory and context-switch overhead than thread-per-connection
- The application only spends time on connections that are actually ready
Traditional (blocking):        I/O Multiplexing (event-driven):
───────────────────────        ────────────────────────────────
read(socket1)                  while true:
  ↓ BLOCKS                       wait_for_events([socket1, socket2, ...])
data arrives                         ↓
process data                     for each ready_socket:
read(socket2)                        read(ready_socket)
  ↓ BLOCKS                           process data
...                                  ↓
                                 continue monitoring
Understanding I/O multiplexing requires understanding the different I/O models.
The application waits (blocks) until the I/O operation completes.
Application                   Kernel
───────────                   ──────
read(fd) ───────────────────→ [waiting for data]
   │ BLOCKED                      │
   │                          [data arrives]
   │                          [copy data to app buffer]
   │ ←─────────────────────── [return]
process data
Pseudo code:
# Blocking I/O
socket = create_socket()
socket.connect(server_address)
# This blocks until data arrives
data = socket.recv(1024) # BLOCKS HERE
process(data)
Characteristics:
- Simplest model to write and reason about
- The calling thread can do nothing else while it waits
- Scales poorly: one thread per connection
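The blocking behavior above can be observed with a real socket pair. This is a minimal sketch (the helper thread and the 0.2 s delay exist only for the demo):

```python
import socket
import threading
import time

# A connected pair of sockets stands in for a client/server connection
a, b = socket.socketpair()

def peer():
    time.sleep(0.2)          # simulate a slow peer
    b.sendall(b"hello")

threading.Thread(target=peer).start()

start = time.monotonic()
data = a.recv(1024)          # BLOCKS until the peer sends
elapsed = time.monotonic() - start

print(data)                  # b'hello'
print(elapsed >= 0.2)        # True: the call really did block
```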
The application repeatedly checks whether I/O is ready; if it is not, the call returns immediately with an error instead of blocking.
Application                   Kernel
───────────                   ──────
read(fd) ───────────────────→ [no data ready]
   │ ←─────────────────────── [return EWOULDBLOCK]
read(fd) ───────────────────→ [no data ready]
   │ ←─────────────────────── [return EWOULDBLOCK]
read(fd) ───────────────────→ [data ready!]
   │ ←─────────────────────── [return data]
process data
Pseudo code:
# Non-blocking I/O
socket = create_socket()
socket.setblocking(False)  # Make non-blocking
socket.connect(server_address)

while True:
    try:
        data = socket.recv(1024)  # Returns immediately
        if data:
            process(data)
            break
    except WouldBlockError:
        # No data yet, do other work
        do_other_work()
        continue
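A runnable version of the loop above, using a local socket pair; BlockingIOError is how Python surfaces EAGAIN/EWOULDBLOCK:

```python
import socket

a, b = socket.socketpair()      # connected pair standing in for client/server
a.setblocking(False)            # make the receiving side non-blocking

attempts = 0
data = None
while data is None:
    try:
        data = a.recv(1024)     # returns immediately, ready or not
    except BlockingIOError:     # EAGAIN / EWOULDBLOCK: nothing to read yet
        attempts += 1
        if attempts == 3:       # "do other work": here, we finally send
            b.sendall(b"ping")

print(data)                     # b'ping'
```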
Characteristics:
- Calls return immediately; EWOULDBLOCK/EAGAIN means "not ready yet"
- Polling in a tight loop burns CPU
- A building block for multiplexing rather than a complete solution
The application uses a system call (select/poll/epoll) to wait for multiple I/O sources.
Application                   Kernel
───────────                   ──────
select([fd1, fd2, fd3])
   │ BLOCKED ───────────────→ [waiting for any fd]
   │                              │
   │                          [fd2 has data!]
   │ ←─────────────────────── [return: fd2 ready]
read(fd2) ──────────────────→ [copy data]
   │ ←─────────────────────── [return data]
process data
Pseudo code:
# I/O Multiplexing
sockets = [socket1, socket2, socket3, ...]

while True:
    # Block until at least one socket is ready
    ready_sockets = select(sockets)  # BLOCKS HERE
    for sock in ready_sockets:
        data = sock.recv(1024)  # Won't block, data is ready
        process(data)
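The same idea with Python's real select.select. A minimal sketch monitoring three local socket pairs, only one of which has data pending:

```python
import select
import socket

# Three connected pairs; we monitor the receiving end of each
pairs = [socket.socketpair() for _ in range(3)]
monitored = [recv_end for recv_end, _ in pairs]

# Only the second connection sends anything
pairs[1][1].sendall(b"hello")

# Block until at least one monitored socket is readable
ready, _, _ = select.select(monitored, [], [], 5.0)

# Only the socket with pending data is returned
data = ready[0].recv(1024)    # won't block: select said it is ready
print(data)                   # b'hello'
```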
Characteristics:
- One blocking call watches many fds at once
- The application still performs the read/write itself once notified
- A single thread can serve thousands of connections
The kernel sends a signal when I/O is ready.
Application                   Kernel
───────────                   ──────
setup_signal_handler()
fcntl(fd, F_SETSIG)
   │                          [waiting for data]
do_other_work()                   │
   │                          [data arrives]
   │                          [send SIGIO signal]
   │ ←───────────────────────
signal_handler():
    read(fd)
    process(data)
Characteristics:
- The kernel raises SIGIO when the fd becomes ready
- Hard to use correctly (signal coalescing, handler reentrancy)
- Rarely used in modern servers
The application initiates I/O, continues working, and is notified when complete.
Application                   Kernel
───────────                   ──────
aio_read(fd, buffer)
   │ ───────────────────────→ [initiate read]
   │                          [DMA transfer in background]
do_other_work()                   │
   │                          [data copied to buffer]
   │                          [notify completion]
   │ ←───────────────────────
completion_handler():
    process(buffer)
Pseudo code:
# Asynchronous I/O
def completion_callback(data):
    process(data)

# Initiate async read
aio_read(socket, buffer, completion_callback)

# Continue doing other work immediately
do_other_work()

# When data arrives, completion_callback is called
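True asynchronous I/O needs OS support (POSIX AIO, IOCP, io_uring), but the control flow can be sketched in plain Python by letting a worker thread play the kernel's role. Everything here is illustrative, not a real AIO API:

```python
import socket
import threading
from concurrent.futures import ThreadPoolExecutor

results = []
done = threading.Event()

def completion_callback(fut):
    # Invoked when the "async read" completes; the data is already read
    results.append(fut.result())
    done.set()

a, b = socket.socketpair()
pool = ThreadPoolExecutor(max_workers=1)

# Initiate "async read": a worker thread performs the blocking recv for us
future = pool.submit(a.recv, 1024)
future.add_done_callback(completion_callback)

# Continue doing other work immediately; the peer replies meanwhile
b.sendall(b"hello")

done.wait(timeout=5)    # demo only: wait so we can inspect the result
print(results)          # [b'hello']
```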
Characteristics:
- The kernel performs the I/O and copies data into the application buffer
- Notification means "done", not "ready"
- Examples: POSIX AIO, Windows IOCP, Linux io_uring
Blocking I/O:
─────────────────────────────────────────
App: [wait] → [process] → [wait] → ...
        ↑                    ↑
   (data ready)         (data ready)

I/O Multiplexing:
─────────────────────────────────────────
App: [wait for any] → [process ready ones] → [wait for any] → ...
           ↑                                        ↑
    (fd2, fd5 ready)                         (fd1, fd3 ready)

Asynchronous I/O:
─────────────────────────────────────────
App: [initiate] → [do work] → [do work] → [callback] → ...
         │                                     ↑
         [OS does I/O in background] ──────────┘
How it works: Pass file descriptor sets to the kernel, which returns the ready ones.
// Pseudo code
fd_set read_fds, write_fds, except_fds;

while (true) {
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);
    FD_SET(socket1, &read_fds);
    FD_SET(socket2, &read_fds);
    FD_SET(socket3, &read_fds);

    // Block until at least one fd is ready
    // (select modifies the sets, so they are rebuilt every iteration)
    int ready = select(max_fd + 1, &read_fds, &write_fds, &except_fds, NULL);

    if (FD_ISSET(socket1, &read_fds)) {
        handle_read(socket1);
    }
    if (FD_ISSET(socket2, &read_fds)) {
        handle_read(socket2);
    }
    // ... check all fds
}
Limitations:
- FD_SETSIZE caps the number of fds (typically 1024)
- O(n): the kernel scans every fd on each call, and so does the application
- The fd sets are modified in place and must be rebuilt before every call
Best for: Small number of file descriptors (<1000)
How it works: Similar to select but uses array of pollfd structures.
// Pseudo code
struct pollfd fds[MAX_CONNECTIONS];
int nfds = 0;

// Setup
fds[0].fd = socket1;
fds[0].events = POLLIN;  // Interested in read events
fds[1].fd = socket2;
fds[1].events = POLLIN;
nfds = 2;

while (true) {
    // Block until events occur
    int ready = poll(fds, nfds, -1);

    for (int i = 0; i < nfds; i++) {
        if (fds[i].revents & POLLIN) {
            handle_read(fds[i].fd);
        }
        if (fds[i].revents & POLLOUT) {
            handle_write(fds[i].fd);
        }
    }
}
Advantages over select:
- No FD_SETSIZE limit: the array can hold as many fds as memory allows
- Separate events/revents fields, so the array survives each call intact
Limitations:
- Still O(n): the whole array is copied to the kernel and scanned on every call
- No way to be told only about the fds whose state changed
Best for: Moderate number of connections (1000-10000)
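Python exposes the same API as select.poll (Unix only). A minimal sketch; note the timeout is in milliseconds, matching the C call:

```python
import select
import socket

a, b = socket.socketpair()
b.sendall(b"ready")

poller = select.poll()                    # Unix only; absent on Windows
poller.register(a.fileno(), select.POLLIN)

events = poller.poll(5000)                # timeout is in milliseconds
data = None
for fd, event in events:
    if event & select.POLLIN:
        data = a.recv(1024)               # ready, so this won't block
print(data)                               # b'ready'
```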
How it works: Kernel maintains interest list; only returns ready fds.
// Pseudo code
// Create epoll instance
int epfd = epoll_create1(0);

// Register interest in events
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = socket1;
epoll_ctl(epfd, EPOLL_CTL_ADD, socket1, &ev);

ev.data.fd = socket2;
epoll_ctl(epfd, EPOLL_CTL_ADD, socket2, &ev);

struct epoll_event events[MAX_EVENTS];

while (true) {
    // Block until events occur
    int nready = epoll_wait(epfd, events, MAX_EVENTS, -1);

    // Only iterate over ready fds!
    for (int i = 0; i < nready; i++) {
        int fd = events[i].data.fd;
        if (events[i].events & EPOLLIN) {
            handle_read(fd);
        }
        if (events[i].events & EPOLLOUT) {
            handle_write(fd);
        }
    }
}
Key features:
- The interest list lives in the kernel: register once with epoll_ctl
- epoll_wait returns only ready fds, so cost scales with activity, not with
  the total number of connections
- Supports both level-triggered and edge-triggered modes
Edge-triggered vs Level-triggered:
Level-triggered (default):
- epoll_wait keeps reporting the fd for as long as it is ready
- Can read partial data
- Similar to poll/select behavior

Edge-triggered (EPOLLET):
- epoll_wait reports only on state change (e.g. new data arriving)
- Must read all available data (until EAGAIN)
- More efficient but requires careful programming
Best for: High-performance servers with many connections
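The edge-triggered contract, read until EAGAIN, can be illustrated without epoll itself: on a non-blocking socket, Python raises BlockingIOError for EAGAIN, and an EPOLLET handler must drain exactly like this before returning to epoll_wait. A sketch using a local socket pair:

```python
import socket

a, b = socket.socketpair()
a.setblocking(False)

b.sendall(b"x" * 10000)   # more data than a single recv() will return
b.close()                 # sender done: the reader will eventually see EOF

total = 0
# Edge-triggered discipline: after one readiness notification,
# drain the socket completely before waiting for the next edge
while True:
    try:
        chunk = a.recv(4096)
        if not chunk:             # EOF: peer closed
            break
        total += len(chunk)
    except BlockingIOError:       # EAGAIN: nothing left to read right now
        break

print(total)
```

Forgetting this loop under EPOLLET means leftover bytes sit in the kernel buffer with no further notification coming.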
How it works: Similar to epoll but with kevent structures.
// Pseudo code
// Create kqueue
int kq = kqueue();

// Register events (one-time)
struct kevent changes[2];
EV_SET(&changes[0], socket1, EVFILT_READ, EV_ADD, 0, 0, NULL);
EV_SET(&changes[1], socket2, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(kq, changes, 2, NULL, 0, NULL);

struct kevent events[MAX_EVENTS];

while (true) {
    // Block until events occur
    int nev = kevent(kq, NULL, 0, events, MAX_EVENTS, NULL);

    for (int i = 0; i < nev; i++) {
        int fd = events[i].ident;
        if (events[i].filter == EVFILT_READ) {
            handle_read(fd);
        }
        if (events[i].filter == EVFILT_WRITE) {
            handle_write(fd);
        }
    }
}
Features:
- Unified interface for sockets, files, signals, timers, and processes
- O(1) event delivery like epoll
- Supports both edge- and level-triggered operation
Best for: macOS/BSD systems
How it works: True asynchronous I/O with completion notifications.
// Pseudo code
// Create completion port
HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

// Associate socket with completion port
CreateIoCompletionPort((HANDLE)socket, iocp, socket, 0);

// Initiate async operation
WSABUF buf;
DWORD flags = 0;
WSARecv(socket, &buf, 1, NULL, &flags, &overlapped, NULL);

while (true) {
    // Wait for completion
    DWORD bytes;
    ULONG_PTR key;
    OVERLAPPED* overlapped;
    GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped, INFINITE);

    // Operation completed
    int sock = (int)key;
    process_completion(sock, overlapped, bytes);

    // Initiate next async operation
    WSARecv(sock, &buf, 1, NULL, &flags, &overlapped, NULL);
}
Features:
- True asynchronous I/O: the kernel fills your buffer before notifying you
- Completion queue is drained by a pool of worker threads
- The foundation of high-performance Windows servers
Best for: Windows high-performance servers
| Feature | select | poll | epoll | kqueue | IOCP |
|---|---|---|---|---|---|
| Platform | Unix/Windows | Unix/Windows | Linux | BSD/macOS | Windows |
| FD limit | FD_SETSIZE | No limit | No limit | No limit | No limit |
| Performance | O(n) | O(n) | O(1) | O(1) | O(1) |
| Scalability | Poor | Moderate | Excellent | Excellent | Excellent |
| I/O Model | Sync | Sync | Sync | Sync | Async |
| Edge-triggered | No | No | Yes | Yes | N/A |
| Best for | <100 fds | <10K fds | >10K fds | >10K fds | Windows |
The Reactor pattern is a synchronous event demultiplexing and dispatching pattern. It waits for events to occur on multiple sources and dispatches them to appropriate handlers.
Core idea: "React to events as they arrive"
                 Reactor Pattern
                 ───────────────
            ┌──────────────┐
            │   Reactor    │
            │ (main loop)  │
            └──────┬───────┘
                   │ register
                   ↓
        ┌───────────────────────┐
        │   Synchronous Event   │
        │     Demultiplexer     │   (select/epoll/kqueue)
        │    (waits for I/O)    │
        └──────────┬────────────┘
                   │ notify
                   ↓
        ┌───────────────────────┐
        │    Event Handlers     │
        │   - AcceptHandler     │
        │   - ReadHandler       │
        │   - WriteHandler      │
        └───────────────────────┘
Flow:
1. Application registers event handlers with Reactor
2. Reactor uses demultiplexer (epoll/select) to wait
3. When event occurs, Reactor dispatches to handler
4. Handler processes event synchronously
5. Return to Reactor
1. Handle (File Descriptor)
import select
import socket

class Handle:
    """Represents a resource (socket, file, etc.)"""
    def __init__(self, fd):
        self.fd = fd

    def fileno(self):
        return self.fd
2. Synchronous Event Demultiplexer
class SynchronousEventDemultiplexer:
    """Uses select/epoll to wait for events"""
    def __init__(self):
        self.epoll = select.epoll()
        self.handlers = {}

    def register(self, handle, event_type, handler):
        """Register interest in events on a handle"""
        self.epoll.register(handle.fileno(), event_type)
        self.handlers[handle.fileno()] = handler

    def unregister(self, handle):
        """Remove handle from monitoring"""
        self.epoll.unregister(handle.fileno())
        del self.handlers[handle.fileno()]

    def select(self, timeout=-1):
        """Wait for events and return ready handlers"""
        events = self.epoll.poll(timeout)
        ready_handlers = []
        for fd, event in events:
            if fd in self.handlers:
                ready_handlers.append((self.handlers[fd], event))
        return ready_handlers
3. Event Handler (Abstract)
class EventHandler:
    """Abstract handler interface"""
    def handle_event(self, handle, event_type):
        """Process the event - must be implemented by subclasses"""
        raise NotImplementedError

    def get_handle(self):
        """Return the handle this handler is associated with"""
        raise NotImplementedError
4. Concrete Event Handlers
class AcceptHandler(EventHandler):
    """Handles new connection acceptance"""
    def __init__(self, server_socket, reactor):
        self.server_socket = server_socket
        self.reactor = reactor

    def handle_event(self, handle, event_type):
        # Accept new connection
        client_socket, addr = self.server_socket.accept()
        client_socket.setblocking(False)
        print(f"New connection from {addr}")
        # Create handler for this client
        handler = ReadHandler(client_socket, self.reactor)
        # Register with reactor
        self.reactor.register(
            Handle(client_socket.fileno()),
            select.EPOLLIN,
            handler
        )

    def get_handle(self):
        return Handle(self.server_socket.fileno())
class ReadHandler(EventHandler):
    """Handles reading from client"""
    def __init__(self, socket, reactor):
        self.socket = socket
        self.reactor = reactor
        self.buffer = b""

    def handle_event(self, handle, event_type):
        try:
            # Read data from socket
            data = self.socket.recv(4096)
            if not data:
                # Client closed connection
                print("Client disconnected")
                self.reactor.unregister(self.get_handle())
                self.socket.close()
                return
            # Process data
            self.buffer += data
            print(f"Received: {data.decode('utf-8', errors='ignore')}")
            # Echo back (switch to write handler)
            write_handler = WriteHandler(self.socket, self.reactor, data)
            self.reactor.modify(
                self.get_handle(),
                select.EPOLLOUT,
                write_handler
            )
        except Exception as e:
            print(f"Error reading: {e}")
            self.reactor.unregister(self.get_handle())
            self.socket.close()

    def get_handle(self):
        return Handle(self.socket.fileno())
class WriteHandler(EventHandler):
    """Handles writing to client"""
    def __init__(self, socket, reactor, data):
        self.socket = socket
        self.reactor = reactor
        self.data = data
        self.sent = 0

    def handle_event(self, handle, event_type):
        try:
            # Write data to socket
            n = self.socket.send(self.data[self.sent:])
            self.sent += n
            if self.sent >= len(self.data):
                # All data sent, switch back to read handler
                read_handler = ReadHandler(self.socket, self.reactor)
                self.reactor.modify(
                    self.get_handle(),
                    select.EPOLLIN,
                    read_handler
                )
        except Exception as e:
            print(f"Error writing: {e}")
            self.reactor.unregister(self.get_handle())
            self.socket.close()

    def get_handle(self):
        return Handle(self.socket.fileno())
5. Reactor (Main Event Loop)
class Reactor:
    """The main reactor that manages the event loop"""
    def __init__(self):
        self.demux = SynchronousEventDemultiplexer()
        self.running = False

    def register(self, handle, event_type, handler):
        """Register a handler for events on a handle"""
        self.demux.register(handle, event_type, handler)

    def unregister(self, handle):
        """Unregister a handle"""
        self.demux.unregister(handle)

    def modify(self, handle, event_type, handler):
        """Modify handler for a handle"""
        self.demux.unregister(handle)
        self.demux.register(handle, event_type, handler)

    def run(self):
        """Main event loop"""
        self.running = True
        while self.running:
            # Wait for events (blocking)
            ready_handlers = self.demux.select(timeout=1.0)
            # Dispatch events to handlers
            for handler, event_type in ready_handlers:
                try:
                    handler.handle_event(handler.get_handle(), event_type)
                except Exception as e:
                    print(f"Handler error: {e}")

    def stop(self):
        """Stop the reactor"""
        self.running = False
6. Complete Server Example
def main():
    # Create server socket
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', 8080))
    server.listen(128)
    server.setblocking(False)
    print("Server listening on port 8080")

    # Create reactor
    reactor = Reactor()

    # Create accept handler
    accept_handler = AcceptHandler(server, reactor)

    # Register accept handler
    reactor.register(
        Handle(server.fileno()),
        select.EPOLLIN,
        accept_handler
    )

    # Run event loop
    try:
        reactor.run()
    except KeyboardInterrupt:
        print("\nShutting down...")
        reactor.stop()
        server.close()

if __name__ == "__main__":
    main()
Synchronous:
- The handler itself performs the read/write once it is notified

Single-threaded (basic version):
- One loop dispatches all events; a slow handler stalls everyone else

Application reads/writes:
- The kernel only says "ready"; the data copy happens in the handler
Flow:
1. Reactor waits (epoll_wait/select)
2. Event occurs β Reactor notified
3. Reactor dispatches to handler
4. Handler reads/writes synchronously
5. Handler returns to reactor
6. Reactor continues waiting
Multi-threaded Reactor:
from concurrent.futures import ThreadPoolExecutor

class ThreadPoolReactor(Reactor):
    """Reactor with thread pool for handlers"""
    def __init__(self, num_threads=4):
        super().__init__()
        self.thread_pool = ThreadPoolExecutor(max_workers=num_threads)

    def run(self):
        self.running = True
        while self.running:
            ready_handlers = self.demux.select(timeout=1.0)
            for handler, event_type in ready_handlers:
                # Submit handler to thread pool
                self.thread_pool.submit(
                    handler.handle_event,
                    handler.get_handle(),
                    event_type
                )
Multiple Reactors (one per core):
        Main Reactor (accept only)
                  │
      ┌───────────┼───────────┐
      │           │           │
  ┌───┴───┐   ┌───┴───┐   ┌───┴───┐
  │  R1   │   │  R2   │   │  R3   │   Worker Reactors
  │ (CPU1)│   │ (CPU2)│   │ (CPU3)│   (handle connections)
  └───────┘   └───────┘   └───────┘
Benefits:
- Better CPU utilization
- Load balancing
- Cache locality
The Proactor pattern is an asynchronous event completion pattern. Instead of waiting for I/O readiness, it initiates I/O operations and is notified when they complete.
Core idea: "Initiate I/O, do other work, get notified when complete"
                Proactor Pattern
                ────────────────
            ┌──────────────┐
            │   Proactor   │
            │ (main loop)  │
            └──────┬───────┘
                   │ initiate_async_op
                   ↓
        ┌───────────────────────┐
        │     Asynchronous      │
        │  Operation Processor  │   (OS/kernel does I/O)
        │   (OS performs I/O)   │
        └──────────┬────────────┘
                   │ completion notification
                   ↓
        ┌───────────────────────┐
        │  Completion Handlers  │
        │   - ReadCompleted     │
        │   - WriteCompleted    │
        │   - AcceptCompleted   │
        └───────────────────────┘
Flow:
1. Application initiates async operation (aio_read)
2. OS/kernel performs I/O in background
3. Application continues other work
4. When I/O completes, OS notifies Proactor
5. Proactor dispatches to completion handler
6. Handler processes completed data
1. Asynchronous Operation
class AsyncOperation:
    """Represents an async I/O operation"""
    def __init__(self, op_type, handle, buffer, handler):
        self.op_type = op_type  # READ, WRITE, ACCEPT
        self.handle = handle
        self.buffer = buffer
        self.handler = handler
        self.bytes_transferred = 0
        self.error = None
2. Asynchronous Operation Processor (OS Layer)
import queue
import threading
import time

class AsynchronousOperationProcessor:
    """
    Simulates OS-level async I/O.
    In reality, this is done by:
    - IOCP on Windows
    - io_uring on Linux
    - kqueue on BSD
    """
    def __init__(self):
        self.pending_ops = []
        self.completion_queue = queue.Queue()
        self.worker_threads = []
        # Start worker threads to simulate async I/O
        for _ in range(4):
            thread = threading.Thread(target=self._worker, daemon=True)
            thread.start()
            self.worker_threads.append(thread)

    def initiate_async_read(self, handle, buffer, handler):
        """Initiate async read operation"""
        op = AsyncOperation('READ', handle, buffer, handler)
        self.pending_ops.append(op)
        return op

    def initiate_async_write(self, handle, buffer, handler):
        """Initiate async write operation"""
        op = AsyncOperation('WRITE', handle, buffer, handler)
        self.pending_ops.append(op)
        return op

    def initiate_async_accept(self, handle, handler):
        """Initiate async accept operation"""
        op = AsyncOperation('ACCEPT', handle, None, handler)
        self.pending_ops.append(op)
        return op

    def _worker(self):
        """Worker thread that performs the actual (blocking) I/O"""
        while True:
            if not self.pending_ops:
                time.sleep(0.001)
                continue
            op = self.pending_ops.pop(0)
            try:
                if op.op_type == 'READ':
                    # Perform blocking read (in background thread)
                    data = op.handle.recv(len(op.buffer))
                    op.buffer[:len(data)] = data
                    op.bytes_transferred = len(data)
                elif op.op_type == 'WRITE':
                    # Perform blocking write
                    n = op.handle.send(op.buffer)
                    op.bytes_transferred = n
                elif op.op_type == 'ACCEPT':
                    # Perform blocking accept
                    client, addr = op.handle.accept()
                    op.buffer = (client, addr)
                    op.bytes_transferred = 1
            except Exception as e:
                op.error = e
            # Put completed operation in completion queue
            self.completion_queue.put(op)

    def get_completion(self, timeout=None):
        """Get next completed operation"""
        try:
            return self.completion_queue.get(timeout=timeout)
        except queue.Empty:
            return None
3. Completion Handler (Abstract)
class CompletionHandler:
    """Abstract completion handler interface"""
    def handle_read_completion(self, bytes_transferred, buffer):
        """Called when async read completes"""
        raise NotImplementedError

    def handle_write_completion(self, bytes_transferred):
        """Called when async write completes"""
        raise NotImplementedError

    def handle_accept_completion(self, client_socket, addr):
        """Called when async accept completes"""
        raise NotImplementedError

    def handle_error(self, error):
        """Called when operation fails"""
        print(f"Error: {error}")
4. Concrete Completion Handlers
class EchoServerHandler(CompletionHandler):
    """Handler for echo server connections"""
    def __init__(self, socket, proactor):
        self.socket = socket
        self.proactor = proactor
        self.read_buffer = bytearray(4096)
        # Immediately initiate async read
        self.proactor.initiate_async_read(
            self.socket,
            self.read_buffer,
            self
        )

    def handle_read_completion(self, bytes_transferred, buffer):
        """Called when read completes"""
        if bytes_transferred == 0:
            # Connection closed
            print("Client disconnected")
            self.socket.close()
            return
        # Data received - echo it back
        data = bytes(buffer[:bytes_transferred])
        print(f"Received: {data.decode('utf-8', errors='ignore')}")
        # Initiate async write
        self.proactor.initiate_async_write(
            self.socket,
            data,
            self
        )

    def handle_write_completion(self, bytes_transferred):
        """Called when write completes"""
        print(f"Sent {bytes_transferred} bytes")
        # Continue reading
        self.proactor.initiate_async_read(
            self.socket,
            self.read_buffer,
            self
        )
class AcceptCompletionHandler(CompletionHandler):
    """Handler for accepting new connections"""
    def __init__(self, server_socket, proactor):
        self.server_socket = server_socket
        self.proactor = proactor
        # Immediately initiate async accept
        self.proactor.initiate_async_accept(
            self.server_socket,
            self
        )

    def handle_accept_completion(self, client_socket, addr):
        """Called when accept completes"""
        print(f"New connection from {addr}")
        # Create handler for this client
        client_handler = EchoServerHandler(client_socket, self.proactor)
        # Initiate next accept
        self.proactor.initiate_async_accept(
            self.server_socket,
            self
        )
5. Proactor (Main Event Loop)
class Proactor:
    """The proactor that manages async operations"""
    def __init__(self):
        self.async_processor = AsynchronousOperationProcessor()
        self.running = False

    def initiate_async_read(self, handle, buffer, handler):
        """Initiate async read operation"""
        self.async_processor.initiate_async_read(handle, buffer, handler)

    def initiate_async_write(self, handle, buffer, handler):
        """Initiate async write operation"""
        self.async_processor.initiate_async_write(handle, buffer, handler)

    def initiate_async_accept(self, handle, handler):
        """Initiate async accept operation"""
        self.async_processor.initiate_async_accept(handle, handler)

    def run(self):
        """Main event loop - waits for completions"""
        self.running = True
        while self.running:
            # Wait for completed operation
            completed_op = self.async_processor.get_completion(timeout=1.0)
            if completed_op is None:
                continue
            # Dispatch to completion handler
            try:
                handler = completed_op.handler
                if completed_op.error:
                    handler.handle_error(completed_op.error)
                elif completed_op.op_type == 'READ':
                    handler.handle_read_completion(
                        completed_op.bytes_transferred,
                        completed_op.buffer
                    )
                elif completed_op.op_type == 'WRITE':
                    handler.handle_write_completion(
                        completed_op.bytes_transferred
                    )
                elif completed_op.op_type == 'ACCEPT':
                    client, addr = completed_op.buffer
                    handler.handle_accept_completion(client, addr)
            except Exception as e:
                print(f"Handler error: {e}")

    def stop(self):
        """Stop the proactor"""
        self.running = False
6. Complete Server Example
import socket

def main():
    # Create server socket
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', 8080))
    server.listen(128)
    print("Server listening on port 8080")

    # Create proactor
    proactor = Proactor()

    # Create accept handler
    accept_handler = AcceptCompletionHandler(server, proactor)

    # Run event loop
    try:
        proactor.run()
    except KeyboardInterrupt:
        print("\nShutting down...")
        proactor.stop()
        server.close()

if __name__ == "__main__":
    main()
Asynchronous:
- The application never performs the read/write itself

OS does the work:
- The kernel (or a worker pool) transfers data into your buffer

Completion-based:
- Handlers receive finished results, not readiness notifications
Flow:
1. Application initiates async_read()
2. OS starts DMA transfer in background
3. Application continues doing other work
4. OS completes transfer
5. OS notifies Proactor
6. Proactor calls completion handler
7. Handler processes already-transferred data
REACTOR PATTERN                         PROACTOR PATTERN
───────────────                         ────────────────
┌─────────────────┐                     ┌─────────────────┐
│   Application   │                     │   Application   │
└────────┬────────┘                     └────────┬────────┘
         │ 1. register                           │ 1. initiate_async_read
         ↓                                       ↓
┌─────────────────┐                     ┌─────────────────┐
│     Reactor     │                     │    Proactor     │
│  (event loop)   │                     │  (event loop)   │
└────────┬────────┘                     └────────┬────────┘
         │ 2. wait                               │ 2. continue work
         ↓                                       │
┌─────────────────┐                              │
│  select/epoll   │                              │
│ (wait for read) │                              │
└────────┬────────┘                              │
         │ 3. readable                           │
         ↓                                       ↓
┌─────────────────┐                     ┌─────────────────┐
│   Application   │                     │       OS        │
│   reads data    │                     │   reads data    │
│   (sync read)   │                     │   (async DMA)   │
└────────┬────────┘                     └────────┬────────┘
         │ 4. process                            │ 3. completion
         ↓                                       ↓
┌─────────────────┐                     ┌─────────────────┐
│     Handler     │                     │     Handler     │
│  process(data)  │                     │  process(data)  │
└─────────────────┘                     └─────────────────┘
| Aspect | Reactor | Proactor |
|---|---|---|
| I/O Model | Synchronous non-blocking | Asynchronous |
| Who reads? | Application | OS/Kernel |
| Notification | "Ready to read" | "Read completed" |
| Data location | Still in kernel buffer | Already in app buffer |
| Handler sees | Socket ready for I/O | Completed data |
| Blocking | Handler may block on I/O | Handler never blocks |
| OS Support | select/poll/epoll/kqueue | IOCP/io_uring |
| Complexity | Simpler | More complex |
| Performance | Good | Better (less context switch) |
| Portability | High (Unix-like systems) | Lower (OS-specific) |
REACTOR - Handler reads:
class ReactorReadHandler:
    def handle_event(self, socket, event):
        # Socket is ready - NOW we read
        data = socket.recv(4096)  # Synchronous read
        if not data:
            # Connection closed
            self.close()
            return
        # Process data
        response = self.process(data)
        # Maybe write response
        socket.send(response)  # Synchronous write
PROACTOR - Handler receives completed data:
class ProactorReadHandler:
    def __init__(self, socket, proactor):
        self.socket = socket
        self.proactor = proactor
        self.buffer = bytearray(4096)
        # Immediately initiate async read
        self.proactor.async_read(self.socket, self.buffer, self)

    def handle_read_complete(self, bytes_transferred, buffer):
        # Data already read by the OS into buffer!
        if bytes_transferred == 0:
            # Connection closed
            self.close()
            return
        data = buffer[:bytes_transferred]
        # Process data
        response = self.process(data)
        # Initiate async write
        self.proactor.async_write(self.socket, response, self)

    def handle_write_complete(self, bytes_transferred):
        # Write completed, initiate next read
        self.proactor.async_read(self.socket, self.buffer, self)
Reactor is better when:
- Portability across Unix-like systems matters (select/poll/epoll/kqueue exist everywhere)
- Handlers are short and non-blocking anyway
- You want simpler code and easier debugging
Proactor is better when:
- The OS offers true async I/O (Windows IOCP, Linux io_uring)
- Raw throughput matters more than portability
- Handlers must never block on I/O
Reactor:

Timeline:
─────────────────────────────────────────────
t0: epoll_wait()          [blocked]
t1: epoll returns         [ready]
t2: read() system call    [context switch]
t3: data copied           [context switch back]
t4: process data
─────────────────────────────────────────────
Total: 2 context switches per operation

Proactor:

Timeline:
─────────────────────────────────────────────
t0: initiate async_read()
t1: continue application work
t2: OS does DMA transfer  [background]
t3: completion notification
t4: process data (already in buffer)
─────────────────────────────────────────────
Total: 1 context switch, data already in place
Reactor Pattern Used By:
- Nginx, Redis, HAProxy
- Node.js (libuv on Unix), Netty, Twisted
Proactor Pattern Used By:
- Windows servers built on IOCP
- Boost.Asio (when running on Windows)
- io_uring-based designs on modern Linux
import select
import socket
import errno
class Reactor:
    """Production-quality Reactor implementation"""
    def __init__(self):
        # Use epoll on Linux, kqueue on BSD/macOS, poll fallback
        if hasattr(select, 'epoll'):
            self.poller = select.epoll()
            self.poller_type = 'epoll'
        elif hasattr(select, 'kqueue'):
            self.poller = select.kqueue()
            self.poller_type = 'kqueue'
        else:
            self.poller = select.poll()
            self.poller_type = 'poll'
        self.handlers = {}  # fd -> handler
        self.running = False

    def register(self, fd, events, handler):
        """Register file descriptor for events"""
        self.handlers[fd] = handler
        if self.poller_type == 'epoll':
            self.poller.register(fd, events)
        elif self.poller_type == 'kqueue':
            # kqueue registration is different
            filters = []
            if events & select.POLLIN:
                filters.append(
                    select.kevent(fd, filter=select.KQ_FILTER_READ,
                                  flags=select.KQ_EV_ADD)
                )
            if events & select.POLLOUT:
                filters.append(
                    select.kevent(fd, filter=select.KQ_FILTER_WRITE,
                                  flags=select.KQ_EV_ADD)
                )
            self.poller.control(filters, 0)
        else:
            self.poller.register(fd, events)

    def unregister(self, fd):
        """Unregister file descriptor"""
        if fd in self.handlers:
            del self.handlers[fd]
        try:
            if self.poller_type == 'epoll':
                self.poller.unregister(fd)
            elif self.poller_type == 'kqueue':
                # Remove from kqueue
                self.poller.control([
                    select.kevent(fd, filter=select.KQ_FILTER_READ,
                                  flags=select.KQ_EV_DELETE),
                    select.kevent(fd, filter=select.KQ_FILTER_WRITE,
                                  flags=select.KQ_EV_DELETE)
                ], 0)
            else:
                self.poller.unregister(fd)
        except (OSError, ValueError):
            # Already unregistered
            pass

    def modify(self, fd, events, handler):
        """Modify events for file descriptor"""
        self.handlers[fd] = handler
        if self.poller_type == 'epoll':
            self.poller.modify(fd, events)
        else:
            # For poll/kqueue, unregister and re-register
            self.unregister(fd)
            self.register(fd, events, handler)

    def run(self):
        """Main event loop"""
        self.running = True
        while self.running:
            try:
                # Wait for events
                if self.poller_type == 'epoll':
                    events = self.poller.poll(timeout=1.0)
                elif self.poller_type == 'kqueue':
                    kevents = self.poller.control(None, 10, 1.0)
                    # Convert kqueue events to (fd, event) tuples
                    events = [(e.ident, self._kqueue_to_poll_event(e))
                              for e in kevents]
                else:
                    events = self.poller.poll(1000)  # ms

                # Dispatch events
                for fd, event in events:
                    if fd in self.handlers:
                        try:
                            self.handlers[fd].handle_event(fd, event)
                        except Exception as e:
                            print(f"Handler error: {e}")
                            self.unregister(fd)
            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"Event loop error: {e}")

    def stop(self):
        """Stop the reactor"""
        self.running = False

    def _kqueue_to_poll_event(self, kevent):
        """Convert a kqueue event to a poll-style event mask"""
        event = 0
        if kevent.filter == select.KQ_FILTER_READ:
            event |= select.POLLIN
        if kevent.filter == select.KQ_FILTER_WRITE:
            event |= select.POLLOUT
        return event
class ConnectionHandler:
    """Handler for client connections"""
    def __init__(self, client_socket, reactor):
        self.socket = client_socket
        self.reactor = reactor
        self.read_buffer = bytearray()
        self.write_buffer = bytearray()
        # Set non-blocking
        self.socket.setblocking(False)
        # Register for read events
        self.reactor.register(
            self.socket.fileno(),
            select.POLLIN,
            self
        )

    def handle_event(self, fd, event):
        """Handle I/O events"""
        try:
            if event & select.POLLIN:
                self._handle_read()
            if event & select.POLLOUT:
                self._handle_write()
            if event & (select.POLLERR | select.POLLHUP):
                self._close()
        except Exception as e:
            print(f"Connection error: {e}")
            self._close()

    def _handle_read(self):
        """Handle read event"""
        try:
            while True:
                data = self.socket.recv(4096)
                if not data:
                    # EOF - connection closed
                    self._close()
                    return
                # Process received data
                self.read_buffer.extend(data)
                # Process complete messages
                self._process_buffer()
        except socket.error as e:
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise

    def _handle_write(self):
        """Handle write event"""
        if not self.write_buffer:
            # Nothing to write, switch to read-only
            self.reactor.modify(
                self.socket.fileno(),
                select.POLLIN,
                self
            )
            return
        try:
            sent = self.socket.send(self.write_buffer)
            self.write_buffer = self.write_buffer[sent:]
            if not self.write_buffer:
                # All data sent, back to read-only
                self.reactor.modify(
                    self.socket.fileno(),
                    select.POLLIN,
                    self
                )
        except socket.error as e:
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise

    def _process_buffer(self):
        """Process received data"""
        # Echo back (simple example)
        if self.read_buffer:
            self.write_buffer.extend(self.read_buffer)
            self.read_buffer.clear()
            # Switch to write mode
            self.reactor.modify(
                self.socket.fileno(),
                select.POLLIN | select.POLLOUT,
                self
            )

    def _close(self):
        """Close connection"""
        self.reactor.unregister(self.socket.fileno())
        self.socket.close()
class AcceptHandler:
    """Handler for accepting connections"""
    def __init__(self, server_socket, reactor):
        self.socket = server_socket
        self.reactor = reactor

    def handle_event(self, fd, event):
        """Handle accept events"""
        try:
            while True:
                client, addr = self.socket.accept()
                print(f"Connection from {addr}")
                # Create handler for client
                ConnectionHandler(client, self.reactor)
        except socket.error as e:
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
def reactor_server(port=8080):
    """Start reactor-based server"""
    # Create server socket
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', port))
    server.listen(128)
    server.setblocking(False)
    print(f"Reactor server listening on port {port}")

    # Create reactor
    reactor = Reactor()

    # Register accept handler
    accept_handler = AcceptHandler(server, reactor)
    reactor.register(
        server.fileno(),
        select.POLLIN,
        accept_handler
    )

    # Run
    try:
        reactor.run()
    finally:
        server.close()
import threading
import queue
import socket
class AsyncOperation:
"""Represents an async I/O operation"""
READ = 1
WRITE = 2
ACCEPT = 3
CONNECT = 4
def __init__(self, op_type, handle, buffer, completion_handler):
self.op_type = op_type
self.handle = handle
self.buffer = buffer
self.completion_handler = completion_handler
self.bytes_transferred = 0
self.error = None
self.result_data = None
class AsyncOperationProcessor:
"""Simulates OS-level async I/O (like IOCP)"""
def __init__(self, num_threads=4):
self.pending_queue = queue.Queue()
self.completion_queue = queue.Queue()
self.workers = []
# Start worker threads
for i in range(num_threads):
worker = threading.Thread(
target=self._worker_thread,
args=(i,),
daemon=True
)
worker.start()
self.workers.append(worker)
def submit(self, operation):
"""Submit async operation"""
self.pending_queue.put(operation)
def get_completion(self, timeout=None):
"""Get completed operation"""
try:
return self.completion_queue.get(timeout=timeout)
except queue.Empty:
return None
def _worker_thread(self, worker_id):
    """Worker thread that performs I/O"""
    while True:
        op = None  # so the except clause below never sees a stale operation
        try:
            op = self.pending_queue.get()
            # Perform operation based on type
            if op.op_type == AsyncOperation.READ:
                self._do_read(op)
            elif op.op_type == AsyncOperation.WRITE:
                self._do_write(op)
            elif op.op_type == AsyncOperation.ACCEPT:
                self._do_accept(op)
            elif op.op_type == AsyncOperation.CONNECT:
                self._do_connect(op)
            # Put in completion queue
            self.completion_queue.put(op)
        except Exception as e:
            if op is not None:
                op.error = e
                self.completion_queue.put(op)
def _do_read(self, op):
"""Perform blocking read"""
try:
# This blocks in worker thread
data = op.handle.recv(len(op.buffer))
op.bytes_transferred = len(data)
op.buffer[:len(data)] = data
except Exception as e:
op.error = e
def _do_write(self, op):
"""Perform blocking write"""
try:
sent = op.handle.send(op.buffer)
op.bytes_transferred = sent
except Exception as e:
op.error = e
def _do_accept(self, op):
"""Perform blocking accept"""
try:
client, addr = op.handle.accept()
op.result_data = (client, addr)
op.bytes_transferred = 1
except Exception as e:
op.error = e
def _do_connect(self, op):
"""Perform blocking connect"""
try:
op.handle.connect(op.buffer) # buffer = address
op.bytes_transferred = 1
except Exception as e:
op.error = e
class Proactor:
"""Proactor event loop"""
def __init__(self, num_threads=4):
self.async_processor = AsyncOperationProcessor(num_threads)
self.running = False
def async_read(self, handle, buffer, completion_handler):
"""Initiate async read"""
op = AsyncOperation(
AsyncOperation.READ,
handle,
buffer,
completion_handler
)
self.async_processor.submit(op)
def async_write(self, handle, buffer, completion_handler):
"""Initiate async write"""
op = AsyncOperation(
AsyncOperation.WRITE,
handle,
buffer,
completion_handler
)
self.async_processor.submit(op)
def async_accept(self, handle, completion_handler):
"""Initiate async accept"""
op = AsyncOperation(
AsyncOperation.ACCEPT,
handle,
None,
completion_handler
)
self.async_processor.submit(op)
def run(self):
"""Main proactor loop"""
self.running = True
while self.running:
# Wait for completion
completed = self.async_processor.get_completion(timeout=1.0)
if completed is None:
continue
# Dispatch to completion handler
try:
handler = completed.completion_handler
if completed.error:
handler.handle_error(completed.error)
elif completed.op_type == AsyncOperation.READ:
handler.handle_read_complete(
completed.bytes_transferred,
completed.buffer
)
elif completed.op_type == AsyncOperation.WRITE:
handler.handle_write_complete(
completed.bytes_transferred
)
elif completed.op_type == AsyncOperation.ACCEPT:
client, addr = completed.result_data
handler.handle_accept_complete(client, addr)
except Exception as e:
print(f"Completion handler error: {e}")
def stop(self):
"""Stop proactor"""
self.running = False
class ProactorConnectionHandler:
"""Completion handler for client connections"""
def __init__(self, client_socket, proactor):
self.socket = client_socket
self.proactor = proactor
self.read_buffer = bytearray(4096)
# Initiate first async read
self.proactor.async_read(
self.socket,
self.read_buffer,
self
)
def handle_read_complete(self, bytes_transferred, buffer):
"""Called when async read completes"""
if bytes_transferred == 0:
# Connection closed
print("Client disconnected")
self.socket.close()
return
# Data received - echo it back
data = bytes(buffer[:bytes_transferred])
print(f"Received {bytes_transferred} bytes")
# Initiate async write
self.proactor.async_write(
self.socket,
data,
self
)
def handle_write_complete(self, bytes_transferred):
"""Called when async write completes"""
print(f"Sent {bytes_transferred} bytes")
# Initiate next read
self.proactor.async_read(
self.socket,
self.read_buffer,
self
)
def handle_error(self, error):
"""Called on error"""
print(f"Connection error: {error}")
self.socket.close()
class ProactorAcceptHandler:
"""Completion handler for accepting connections"""
def __init__(self, server_socket, proactor):
self.socket = server_socket
self.proactor = proactor
# Initiate first async accept
self.proactor.async_accept(
self.socket,
self
)
def handle_accept_complete(self, client_socket, addr):
"""Called when async accept completes"""
print(f"Connection from {addr}")
# Create handler for client
ProactorConnectionHandler(client_socket, self.proactor)
# Initiate next accept
self.proactor.async_accept(
self.socket,
self
)
def handle_error(self, error):
"""Called on error"""
print(f"Accept error: {error}")
def proactor_server(port=8080):
"""Start proactor-based server"""
# Create server socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', port))
server.listen(128)
print(f"Proactor server listening on port {port}")
# Create proactor
proactor = Proactor(num_threads=4)
# Create accept handler
accept_handler = ProactorAcceptHandler(server, proactor)
# Run
try:
proactor.run()
finally:
server.close()
Reactor (epoll-based):
Metrics (10,000 concurrent connections):
─────────────────────────────────────────
Throughput: ~50,000 requests/sec
Latency (avg): ~2ms
Latency (p99): ~10ms
CPU usage: 60-70%
Context switches: High (2 per I/O op)
Proactor (IOCP/io_uring):
Metrics (10,000 concurrent connections):
─────────────────────────────────────────
Throughput: ~80,000 requests/sec
Latency (avg): ~1.2ms
Latency (p99): ~6ms
CPU usage: 50-60%
Context switches: Lower (1 per I/O op)
Single-threaded Reactor:
Limitations:
- Single core utilization
- Handler blocks event loop
- Not suitable for CPU-intensive work
Good for:
- I/O-bound workloads
- Simple applications
- Low connection counts (<1000)
Multi-threaded Reactor:
Pattern:
        ┌──────────────┐
        │ Main Reactor │  (accept only)
        └──────┬───────┘
               │
     ┌─────┬───┴─────┬─────┐
     │     │         │     │
    R1    R2        R3    R4   (worker reactors)
Scales to: ~100K connections
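The main-reactor/worker-reactor split can be sketched with Python's `selectors` module. This is a minimal illustration under our own naming (`WorkerReactor`, `serve`, the self-pipe wakeup); it is not the article's earlier `Reactor` class:

```python
import itertools
import selectors
import socket
import threading

class WorkerReactor:
    """One selector loop per thread; echoes data on sockets handed to it."""
    def __init__(self):
        self.selector = selectors.DefaultSelector()
        # Self-pipe: add_connection() from another thread wakes this loop
        self.notify_r, self.notify_w = socket.socketpair()
        self.notify_r.setblocking(False)
        self.selector.register(self.notify_r, selectors.EVENT_READ)
        self.pending = []
        self.lock = threading.Lock()
        threading.Thread(target=self._run, daemon=True).start()

    def add_connection(self, conn):
        conn.setblocking(False)
        with self.lock:
            self.pending.append(conn)
        self.notify_w.send(b"\0")  # wake the selector

    def _run(self):
        while True:
            for key, _ in self.selector.select():
                if key.fileobj is self.notify_r:
                    self.notify_r.recv(64)  # clear wakeup bytes
                    with self.lock:
                        conns, self.pending = self.pending, []
                    for conn in conns:
                        self.selector.register(conn, selectors.EVENT_READ)
                else:
                    conn = key.fileobj
                    data = conn.recv(4096)
                    if data:
                        conn.sendall(data)  # echo (demo skips partial-write handling)
                    else:
                        self.selector.unregister(conn)
                        conn.close()

def serve(port=0, num_workers=4):
    """Accept on one thread, hand sockets to worker reactors round-robin."""
    workers = itertools.cycle([WorkerReactor() for _ in range(num_workers)])
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(("127.0.0.1", port))
    server.listen(128)

    def accept_loop():
        while True:
            conn, _ = server.accept()
            next(workers).add_connection(conn)

    threading.Thread(target=accept_loop, daemon=True).start()
    return server
```

The self-pipe is the key detail: `selector.select()` only wakes on registered file descriptors, so the acceptor pokes each worker through a socketpair after queuing a new connection.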
Reactor + Thread Pool:
Pattern:
┌──────────────┐
│   Reactor    │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ Thread Pool  │  (for heavy processing)
└──────────────┘
Best for: Mixed I/O and CPU work
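The reactor-plus-thread-pool pattern needs one ingredient the diagram doesn't show: a way for pool threads to hand results back to the event loop without touching its state concurrently. A common trick is again a self-pipe. A sketch under assumed names (`ThreadPoolReactor`, `offload`):

```python
import concurrent.futures
import selectors
import socket
import threading

class ThreadPoolReactor:
    """Event loop that offloads heavy work and collects results via a self-pipe."""
    def __init__(self, max_workers=4):
        self.selector = selectors.DefaultSelector()
        self.pool = concurrent.futures.ThreadPoolExecutor(max_workers)
        self.wake_r, self.wake_w = socket.socketpair()
        self.wake_r.setblocking(False)
        self.selector.register(self.wake_r, selectors.EVENT_READ, self._drain)
        self.done = []            # (callback, result) pairs ready to dispatch
        self.lock = threading.Lock()

    def offload(self, fn, callback, *args):
        """Run fn(*args) in the pool; run callback(result) back on the loop thread."""
        def work():
            result = fn(*args)        # heavy work happens off the event loop
            with self.lock:
                self.done.append((callback, result))
            self.wake_w.send(b"\0")   # wake the selector from the worker thread
        self.pool.submit(work)

    def _drain(self):
        self.wake_r.recv(4096)        # clear wakeup bytes
        with self.lock:
            ready, self.done = self.done, []
        for callback, result in ready:
            callback(result)          # callbacks run on the loop thread

    def run_once(self, timeout=None):
        for key, _ in self.selector.select(timeout):
            key.data()                # dispatch; here only _drain is registered
```

Callbacks always execute on the loop thread, so handlers never need their own locking around connection state.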
Reactor:
Per connection:
- Socket buffer: ~8KB (kernel)
- App buffer: ~4KB
- Handler object: ~1KB
Total: ~13KB per connection
10K connections = ~130MB
Proactor:
Per connection:
- Socket buffer: ~8KB (kernel)
- App buffer: ~8KB (pre-allocated)
- Handler object: ~1KB
- Operation state: ~500B
Total: ~17.5KB per connection
10K connections = ~175MB
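A quick back-of-the-envelope check of the totals quoted above:

```python
# Sanity-check the per-connection estimates (all sizes in KB)
reactor_kb = 8 + 4 + 1            # kernel socket buffer + app buffer + handler object
proactor_kb = 8 + 8 + 1 + 0.5     # adds a pre-allocated buffer and operation state
connections = 10_000

print(reactor_kb * connections)   # 130000 KB, i.e. ~130 MB
print(proactor_kb * connections)  # 175000.0 KB, i.e. ~175 MB
```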
System call overhead:
Reactor (per request):
- epoll_wait: 1 syscall
- read: 1 syscall
- write: 1 syscall
Total: 3 syscalls
Proactor (per request):
- GetQueuedCompletionStatus: 1 syscall
- (read/write done by OS)
Total: 1 syscall
Savings: 66% reduction in syscalls
class HTTPRequest:
"""Parse HTTP request"""
def __init__(self, data):
lines = data.decode('utf-8').split('\r\n')
request_line = lines[0].split()
self.method = request_line[0]
self.path = request_line[1]
self.headers = {}
for line in lines[1:]:
if ':' in line:
key, value = line.split(':', 1)
self.headers[key.strip()] = value.strip()
class HTTPHandler:
"""HTTP request handler using Reactor"""
def __init__(self, sock, reactor):
    self.socket = sock
    self.reactor = reactor
    self.buffer = bytearray()
    self.write_buffer = b""  # filled once a complete request is parsed
    self.reactor.register(
        self.socket.fileno(),
        select.POLLIN,
        self
    )
def handle_event(self, fd, event):
if event & select.POLLIN:
self._handle_read()
elif event & select.POLLOUT:
self._handle_write()
def _handle_read(self):
data = self.socket.recv(4096)
if not data:
self._close()
return
self.buffer.extend(data)
# Check if we have complete request
if b'\r\n\r\n' in self.buffer:
request = HTTPRequest(self.buffer)
response = self._process_request(request)
# Switch to write mode
self.write_buffer = response
self.reactor.modify(
self.socket.fileno(),
select.POLLOUT,
self
)
def _process_request(self, request):
    """Process HTTP request"""
    if request.path == '/':
        status = b"200 OK"
        body = b"<h1>Hello, World!</h1>"
    else:
        status = b"404 Not Found"
        body = b"<h1>Not Found</h1>"
    response = (
        b"HTTP/1.1 " + status + b"\r\n"
        b"Content-Length: " + str(len(body)).encode() + b"\r\n"
        b"Content-Type: text/html\r\n"
        b"\r\n" + body
    )
    return response
def _handle_write(self):
sent = self.socket.send(self.write_buffer)
self.write_buffer = self.write_buffer[sent:]
if not self.write_buffer:
# Response sent, close connection
self._close()
def _close(self):
self.reactor.unregister(self.socket.fileno())
self.socket.close()
class DatabaseQuery:
"""Async database query"""
def __init__(self, sql, params, callback):
self.sql = sql
self.params = params
self.callback = callback
class DatabaseConnectionPool:
"""Async database connection pool"""
def __init__(self, proactor, num_connections=10):
self.proactor = proactor
self.connections = queue.Queue()
self.pending_queries = queue.Queue()
# Create connections; _create_connection() is driver-specific and
# must be supplied by a concrete implementation
for _ in range(num_connections):
    conn = self._create_connection()
    self.connections.put(conn)
# Start query processor
threading.Thread(
target=self._process_queries,
daemon=True
).start()
def execute_async(self, sql, params, callback):
"""Execute query asynchronously"""
query = DatabaseQuery(sql, params, callback)
self.pending_queries.put(query)
def _process_queries(self):
"""Process queries in background"""
while True:
query = self.pending_queries.get()
conn = self.connections.get()
try:
# Execute query
result = conn.execute(query.sql, query.params)
# Call completion callback
query.callback(result, None)
except Exception as e:
query.callback(None, e)
finally:
self.connections.put(conn)
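The pool above leaves `_create_connection` unspecified. As a runnable sketch, here is a self-contained variant backed by `sqlite3` (the driver choice and the name `SQLiteAsyncPool` are ours, not the article's):

```python
import queue
import sqlite3
import threading

class SQLiteAsyncPool:
    """Self-contained async query pool backed by sqlite3."""
    def __init__(self, num_connections=4):
        self.connections = queue.Queue()
        self.pending = queue.Queue()
        for _ in range(num_connections):
            # check_same_thread=False: connections migrate between threads here
            self.connections.put(sqlite3.connect(":memory:", check_same_thread=False))
        threading.Thread(target=self._worker, daemon=True).start()

    def execute_async(self, sql, params, callback):
        """Queue a query; callback(rows, error) fires from the worker thread."""
        self.pending.put((sql, params, callback))

    def _worker(self):
        while True:
            sql, params, callback = self.pending.get()
            conn = self.connections.get()
            try:
                rows = conn.execute(sql, params).fetchall()
                callback(rows, None)
            except Exception as e:
                callback(None, e)
            finally:
                self.connections.put(conn)

# Usage: block on an Event so the caller can observe completion
pool = SQLiteAsyncPool()
done = threading.Event()
out = {}

def on_result(rows, error):
    out["rows"], out["error"] = rows, error
    done.set()

pool.execute_async("SELECT ? + ?", (2, 3), on_result)
done.wait(timeout=5)
print(out["rows"])  # [(5,)]
```

In a real server the callback would not block an event; it would post the result back to the reactor (for example through the self-pipe trick shown earlier).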
1. Keep handlers fast:
# Bad - blocks event loop
def handle_event(self, fd, event):
data = self.socket.recv(4096)
result = expensive_computation(data) # BLOCKS!
self.socket.send(result)
# Good - offload to thread pool
def handle_event(self, fd, event):
data = self.socket.recv(4096)
self.thread_pool.submit(
self._process_in_background,
data
)
2. Handle partial reads/writes:
# Good - handle EAGAIN
def _handle_write(self):
while self.write_buffer:
try:
sent = self.socket.send(self.write_buffer)
self.write_buffer = self.write_buffer[sent:]
except socket.error as e:
if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
# Would block - wait for next event
break
raise
3. Use edge-triggered mode carefully:
# Edge-triggered: must read ALL data
def handle_event(self, fd, event):
while True:
try:
data = self.socket.recv(4096)
if not data:
break
self.buffer.extend(data)
except socket.error as e:
if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
break # No more data
raise
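The drain loop can be demonstrated end-to-end with a socketpair. Python surfaces `EAGAIN`/`EWOULDBLOCK` on a nonblocking socket as `BlockingIOError`, so no explicit errno check is needed:

```python
import socket
import time

a, b = socket.socketpair()
a.sendall(b"x" * 10000)
time.sleep(0.1)  # give emulated socketpairs (e.g. on Windows) time to deliver
b.setblocking(False)

# An edge-triggered consumer must drain until the kernel reports EAGAIN
chunks = bytearray()
while True:
    try:
        data = b.recv(4096)
    except BlockingIOError:   # EAGAIN/EWOULDBLOCK: nothing left to read
        break                 # safe to wait for the next readiness edge
    if not data:
        break                 # EOF - peer closed
    chunks.extend(data)

print(len(chunks))  # 10000
a.close()
b.close()
```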
1. Pre-allocate buffers:
# Good - reuse buffers
class ConnectionHandler:
def __init__(self, socket, proactor):
self.socket = socket
self.proactor = proactor
self.read_buffer = bytearray(4096) # Pre-allocated
# Start reading
self._initiate_read()
def _initiate_read(self):
# Reuse same buffer
self.proactor.async_read(
self.socket,
self.read_buffer,
self
)
2. Chain operations:
# Good - chain async operations
def handle_read_complete(self, bytes_transferred, buffer):
# Process data
response = self.process(buffer[:bytes_transferred])
# Immediately initiate write
self.proactor.async_write(
self.socket,
response,
self
)
def handle_write_complete(self, bytes_transferred):
# Immediately initiate next read
self._initiate_read()
3. Handle errors gracefully:
import logging

def handle_error(self, error):
    """Handle operation errors"""
    if isinstance(error, ConnectionResetError):
        # Client disconnected - cleanup
        self.socket.close()
    elif isinstance(error, TimeoutError):
        # Timeout - retry or close
        self._handle_timeout()
    else:
        # Unexpected error - log and close
        logging.error(f"Unexpected error: {error}")
        self.socket.close()
1. Set socket options:
# Optimize socket performance (sock is a socket object; calling
# setsockopt on the socket module itself would fail)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)  # Linux/BSD only
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setblocking(False)
2. Monitor performance:
import logging
import time
from collections import defaultdict

class MonitoredReactor(Reactor):
    def __init__(self):
        super().__init__()
        self.metrics = defaultdict(list)

    def run(self):
        self.running = True
        while self.running:
            start = time.time()
            # select.poll takes its timeout in milliseconds
            events = self.poller.poll(1000)
            poll_time = time.time() - start
            # Track metrics
            self.metrics['poll_time'].append(poll_time)
            self.metrics['events'].append(len(events))
            # Dispatch events
            for fd, event in events:
                handler_start = time.time()
                self.handlers[fd].handle_event(fd, event)
                handler_time = time.time() - handler_start
                # Warn if a handler hogs the event loop
                if handler_time > 0.1:
                    logging.warning(f"Slow handler: {handler_time:.3f}s")
3. Implement graceful shutdown:
def shutdown_gracefully(self):
    """Graceful shutdown"""
    # Stop accepting new connections
    self.reactor.unregister(self.server_socket.fileno())
    self.server_socket.close()
    # Wait for existing connections to drain, bounded by a deadline
    deadline = time.time() + 30  # 30 second grace period
    while self.active_connections and time.time() < deadline:
        time.sleep(0.1)
    # Force-close whatever remains (copy the list: closing may mutate it)
    for conn in list(self.active_connections):
        conn.close()
    # Stop reactor
    self.reactor.stop()
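Graceful shutdown is usually triggered from a signal handler. A minimal sketch, assuming the server object exposes `shutdown_gracefully()` as above:

```python
import signal

def install_shutdown_handlers(server):
    """Route SIGINT/SIGTERM into the graceful-shutdown path.
    `server` is assumed to expose shutdown_gracefully()."""
    def on_signal(signum, frame):
        print(f"Signal {signum} received, shutting down")
        server.shutdown_gracefully()
    # Handlers can only be installed from the main thread
    signal.signal(signal.SIGINT, on_signal)
    signal.signal(signal.SIGTERM, on_signal)
```

Note that Python runs signal handlers on the main thread between bytecode instructions, so a handler that only flips `self.running` (letting the loop exit on its next iteration) is the safest variant.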
I/O multiplexing patterns are fundamental to high-performance network programming:
Reactor Pattern:
- Readiness-based: the loop waits until a descriptor is ready, then the handler performs the I/O itself
- Maps naturally onto select/poll/epoll (and kqueue)
Proactor Pattern:
- Completion-based: I/O is initiated up front and a completion handler runs once the OS (or a worker pool) finishes it
- Maps naturally onto Windows IOCP and Linux io_uring
Choose based on:
- Target platform and the asynchronous APIs it exposes
- Workload: readiness models suit I/O-bound servers; completion models cut syscall overhead under heavy load
- Memory budget (the proactor pre-allocates a buffer per outstanding operation)
Both patterns enable building scalable servers that handle thousands of concurrent connections efficiently.