import collections
import errno
import functools
import select
import socket
'''Asynchronous socket service inspired by the basic design of Boost ASIO.
This service currently supports TCP sockets only, and supports asynchronous
versions of common client operations (connect, read, write) and server operations
(accept).
This implementation supports the use of select, poll, epoll, or kqueue as the
underlying poll system call.
Aaron Riekenberg
aaron.riekenberg@gmail.com
'''
class AsyncException(Exception):
def __init__(self, value):
super(AsyncException, self).__init__()
self.__value = value
def __str__(self):
return repr(self.__value)
class AsyncSocket(object):
'''Socket class supporting asynchronous operations.'''
def __init__(self, asyncIOService, sock = None):
super(AsyncSocket, self).__init__()
self.__asyncIOService = asyncIOService
self.__acceptCallback = None
self.__connectCallback = None
self.__readCallback = None
self.__writeAllCallback = None
self.__writeBuffer = b''
self.__maxReadBytes = 0
self.__closed = False
if sock:
self.__socket = sock
else:
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__socket.setblocking(0)
asyncIOService.addAsyncSocket(self)
def __str__(self):
return ('AsyncSocket [ fileno = %d ]' % self.fileno())
def getsockname(self):
return self.__socket.getsockname()
def getpeername(self):
return self.__socket.getpeername()
def closed(self):
return self.__closed
def getSocket(self):
return self.__socket
def fileno(self):
return self.__socket.fileno()
def setReuseAddress(self):
self.__socket.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
def listen(self, backlog = socket.SOMAXCONN):
self.__socket.listen(backlog)
def bind(self, addr):
self.__socket.bind(addr)
def asyncConnect(self, address, callback):
if (self.__acceptCallback):
raise AsyncException('Accept already in progress')
if (self.__connectCallback):
raise AsyncException('Connect already in progress')
if (self.__readCallback):
raise AsyncException('Read already in progress')
if (self.__writeAllCallback):
raise AsyncException('Write all already in progress')
if (self.__closed):
raise AsyncException('AsyncSocket closed')
err = self.__socket.connect_ex(address)
if err in (errno.EINPROGRESS, errno.EWOULDBLOCK):
self.__connectCallback = callback
self.__asyncIOService.registerAsyncSocketForWrite(self)
else:
self.__asyncIOService.invokeLater(
functools.partial(callback, err = err))
def asyncAccept(self, callback):
if (self.__acceptCallback):
raise AsyncException('Accept already in progress')
if (self.__connectCallback):
raise AsyncException('Connect already in progress')
if (self.__readCallback):
raise AsyncException('Read already in progress')
if (self.__writeAllCallback):
raise AsyncException('Write all already in progress')
if (self.__closed):
raise AsyncException('AsyncSocket closed')
try:
(newSocket, addr) = self.__socket.accept()
asyncSocket = AsyncSocket(self.__asyncIOService, newSocket)
self.__asyncIOService.invokeLater(
functools.partial(callback, sock = asyncSocket, err = 0))
except socket.error as e:
if e.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
self.__acceptCallback = callback
self.__asyncIOService.registerAsyncSocketForRead(self)
else:
self.__asyncIOService.invokeLater(
functools.partial(callback, sock = None, err = e.args[0]))
def asyncRead(self, maxBytes, callback):
if (self.__acceptCallback):
raise AsyncException('Accept already in progress')
if (self.__connectCallback):
raise AsyncException('Connect already in progress')
if (self.__readCallback):
raise AsyncException('Read already in progress')
if (self.__closed):
raise AsyncException('AsyncSocket closed')
self.__maxReadBytes = maxBytes
try:
data = self.__socket.recv(self.__maxReadBytes)
self.__asyncIOService.invokeLater(
functools.partial(callback, data = data, err = 0))
except socket.error as e:
if e.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
self.__readCallback = callback
self.__asyncIOService.registerAsyncSocketForRead(self)
else:
self.__asyncIOService.invokeLater(
functools.partial(callback, data = data, err = e.args[0]))
def asyncWriteAll(self, data, callback):
if (self.__acceptCallback):
raise AsyncException('Accept already in progress')
if (self.__connectCallback):
raise AsyncException('Connect already in progress')
if (self.__writeAllCallback):
raise AsyncException('Write all already in progress')
if (self.__closed):
raise AsyncException('AsyncSocket closed')
self.__writeBuffer += data
writeWouldBlock = False
try:
bytesSent = self.__socket.send(self.__writeBuffer)
self.__writeBuffer = self.__writeBuffer[bytesSent:]
if (len(self.__writeBuffer) == 0):
self.__asyncIOService.invokeLater(
functools.partial(callback, err = 0))
else:
writeWouldBlock = True
except socket.error as e:
if e.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
writeWouldBlock = True
else:
self.__asyncIOService.invokeLater(
functools.partial(callback, err = e.args[0]))
if (writeWouldBlock):
self.__writeAllCallback = callback
self.__asyncIOService.registerAsyncSocketForWrite(self)
def close(self):
if self.__closed:
return
self.__asyncIOService.removeAsyncSocket(self)
self.__socket.close()
self.__closed = True
if self.__acceptCallback:
self.__asyncIOService.invokeLater(
functools.partial(self.__acceptCallback, sock = None, err = errno.EBADF))
self.__acceptCallback = None
if self.__connectCallback:
self.__asyncIOService.invokeLater(
functools.partial(self.__connectCallback, err = errno.EBADF))
self.__connectCallback = None
if self.__readCallback:
self.__asyncIOService.invokeLater(
functools.partial(self.__readCallback, data = None, err = errno.EBADF))
self.__readCallback = None
if self.__writeAllCallback:
self.__asyncIOService.invokeLater(
functools.partial(self.__writeAllCallback, err = errno.EBADF))
self.__writeAllCallback = None
def handleError(self):
err = self.__socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if (self.__connectCallback):
self.__asyncIOService.unregisterAsyncSocketForWrite(self)
self.__asyncIOService.invokeLater(
functools.partial(self.__connectCallback, err = err))
self.__connectCallback = None
if (self.__acceptCallback):
self.__asyncIOService.unregisterAsyncSocketForRead(self)
self.__asyncIOService.invokeLater(
functools.partial(self.__acceptCallback, sock = None, err = err))
self.__acceptCallback = None
if (self.__readCallback):
self.__asyncIOService.unregisterAsyncSocketForRead(self)
self.__asyncIOService.invokeLater(
functools.partial(self.__readCallback, data = None, err = err))
self.__readCallback = None
if (self.__writeAllCallback):
self.__asyncIOService.unregisterAsyncSocketForWrite(self)
self.__asyncIOService.invokeLater(
functools.partial(self.__writeAllCallback, err = err))
self.__writeAllCallback = None
def handleRead(self):
if (self.__acceptCallback):
try:
(newSocket, addr) = self.__socket.accept()
asyncSocket = AsyncSocket(self.__asyncIOService, newSocket)
self.__asyncIOService.unregisterAsyncSocketForRead(self)
self.__asyncIOService.invokeLater(
functools.partial(self.__acceptCallback, sock = asyncSocket, err = 0))
self.__acceptCallback = None
except socket.error as e:
if e.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
pass
else:
self.__asyncIOService.unregisterAsyncSocketForRead(self)
self.__asyncIOService.invokeLater(
functools.partial(self.__acceptCallback, sock = None, err = e.args[0]))
self.__acceptCallback = None
if (self.__readCallback):
try:
data = self.__socket.recv(self.__maxReadBytes)
self.__asyncIOService.unregisterAsyncSocketForRead(self)
self.__asyncIOService.invokeLater(
functools.partial(self.__readCallback, data = data, err = 0))
self.__readCallback = None
except socket.error as e:
if e.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
pass
else:
self.__asyncIOService.unregisterAsyncSocketForRead(self)
self.__asyncIOService.invokeLater(
functools.partial(self.__readCallback, data = None, err = e.args[0]))
self.__readCallback = None
def handleWrite(self):
if (self.__connectCallback):
err = self.__socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
self.__asyncIOService.unregisterAsyncSocketForWrite(self)
self.__asyncIOService.invokeLater(
functools.partial(self.__connectCallback, err = err))
self.__connectCallback = None
if (self.__writeAllCallback):
try:
bytesSent = self.__socket.send(self.__writeBuffer)
self.__writeBuffer = self.__writeBuffer[bytesSent:]
if (len(self.__writeBuffer) == 0):
self.__asyncIOService.unregisterAsyncSocketForWrite(self)
self.__asyncIOService.invokeLater(
functools.partial(self.__writeAllCallback, err = 0))
self.__writeAllCallback = None
except socket.error as e:
if e.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
pass
else:
self.__asyncIOService.unregisterAsyncSocketForWrite(self)
self.__asyncIOService.invokeLater(
functools.partial(self.__writeAllCallback, err = e.args[0]))
self.__writeAllCallback = None
class AsyncIOService(object):
'''Service used to poll asynchronous sockets.'''
def __init__(self):
self.__fdToAsyncSocket = {}
self.__fdsRegisteredForRead = set()
self.__fdsRegisteredForWrite = set()
self.__eventQueue = collections.deque()
def createAsyncSocket(self):
return AsyncSocket(asyncIOService = self)
def addAsyncSocket(self, asyncSocket):
self.__fdToAsyncSocket[asyncSocket.fileno()] = asyncSocket
def removeAsyncSocket(self, asyncSocket):
fileno = asyncSocket.fileno()
if fileno in self.__fdToAsyncSocket:
del self.__fdToAsyncSocket[fileno]
if ((fileno in self.__fdsRegisteredForRead) or
(fileno in self.__fdsRegisteredForWrite)):
self.unregisterForEvents(asyncSocket)
self.__fdsRegisteredForRead.discard(fileno)
self.__fdsRegisteredForWrite.discard(fileno)
def invokeLater(self, event):
self.__eventQueue.append(event)
def registerAsyncSocketForRead(self, asyncSocket):
fileno = asyncSocket.fileno()
if fileno not in self.__fdsRegisteredForRead:
if fileno in self.__fdsRegisteredForWrite:
self.modifyRegistrationForEvents(asyncSocket, readEvents = True, writeEvents = True)
else:
self.registerForEvents(asyncSocket, readEvents = True, writeEvents = False)
self.__fdsRegisteredForRead.add(fileno)
def unregisterAsyncSocketForRead(self, asyncSocket):
fileno = asyncSocket.fileno()
if fileno in self.__fdsRegisteredForRead:
if fileno in self.__fdsRegisteredForWrite:
self.modifyRegistrationForEvents(asyncSocket, readEvents = False, writeEvents = True)
else:
self.unregisterForEvents(asyncSocket)
self.__fdsRegisteredForRead.discard(fileno)
def registerAsyncSocketForWrite(self, asyncSocket):
fileno = asyncSocket.fileno()
if fileno not in self.__fdsRegisteredForWrite:
if fileno in self.__fdsRegisteredForRead:
self.modifyRegistrationForEvents(asyncSocket, readEvents = True, writeEvents = True)
else:
self.registerForEvents(asyncSocket, readEvents = False, writeEvents = True)
self.__fdsRegisteredForWrite.add(fileno)
def unregisterAsyncSocketForWrite(self, asyncSocket):
fileno = asyncSocket.fileno()
if fileno in self.__fdsRegisteredForWrite:
if fileno in self.__fdsRegisteredForRead:
self.modifyRegistrationForEvents(asyncSocket, readEvents = True, writeEvents = False)
else:
self.unregisterForEvents(asyncSocket)
self.__fdsRegisteredForWrite.discard(fileno)
def getReadFDSet(self):
return self.__fdsRegisteredForRead
def getWriteFDSet(self):
return self.__fdsRegisteredForWrite
def getNumFDs(self):
return len(self.__fdToAsyncSocket)
def registerForEvents(self, asyncSocket, readEvents, writeEvents):
raise NotImplementedError
def modifyRegistrationForEvents(self, asyncSocket, readEvents, writeEvents):
raise NotImplementedError
def unregisterForEvents(self, asyncSocket):
raise NotImplementedError
def doPoll(self, block):
raise NotImplementedError
def run(self):
while True:
# As we process events in self.__eventQueue, more events are likely
# to be added to it by invokeLater. We don't want to starve events
# coming in from doPoll, so we limit the number of events processed
# from self.__eventQueue to the initial size of the queue. After this if
# the queue is still not empty, set doPoll to be non blocking so we get
# back to processing events in the queue in a timely manner.
initialQueueLength = len(self.__eventQueue)
eventsProcessed = 0
while ((len(self.__eventQueue) > 0) and
(eventsProcessed < initialQueueLength)):
event = self.__eventQueue.popleft()
event()
eventsProcessed += 1
if ((len(self.__eventQueue) == 0) and
(len(self.__fdsRegisteredForRead) == 0) and
(len(self.__fdsRegisteredForWrite) == 0)):
break
block = True
if (len(self.__eventQueue) > 0):
block = False
self.doPoll(block = block)
def handleEventForFD(self, fd, readReady, writeReady, errorReady):
if fd in self.__fdToAsyncSocket:
asyncSocket = self.__fdToAsyncSocket[fd]
if (readReady):
asyncSocket.handleRead()
if (writeReady):
asyncSocket.handleWrite()
if (errorReady):
asyncSocket.handleError()
class EPollAsyncIOService(AsyncIOService):
def __init__(self):
super(EPollAsyncIOService, self).__init__()
self.__poller = select.epoll()
def __str__(self):
return ('EPollAsyncIOService [ fileno = %d ]' % self.__poller.fileno())
def registerForEvents(self, asyncSocket, readEvents, writeEvents):
fileno = asyncSocket.fileno()
eventMask = 0
if (readEvents):
eventMask |= select.EPOLLIN
if (writeEvents):
eventMask |= select.EPOLLOUT
self.__poller.register(fileno, eventMask)
def modifyRegistrationForEvents(self, asyncSocket, readEvents, writeEvents):
fileno = asyncSocket.fileno()
eventMask = 0
if (readEvents):
eventMask |= select.EPOLLIN
if (writeEvents):
eventMask |= select.EPOLLOUT
self.__poller.modify(fileno, eventMask)
def unregisterForEvents(self, asyncSocket):
fileno = asyncSocket.fileno()
self.__poller.unregister(fileno)
def doPoll(self, block):
readyList = self.__poller.poll(-1 if block else 0)
for (fd, eventMask) in readyList:
readReady = ((eventMask & select.EPOLLIN) != 0)
writeReady = ((eventMask & select.EPOLLOUT) != 0)
errorReady = ((eventMask &
(select.EPOLLERR | select.EPOLLHUP)) != 0)
self.handleEventForFD(fd = fd,
readReady = readReady,
writeReady = writeReady,
errorReady = errorReady)
class KQueueAsyncIOService(AsyncIOService):
def __init__(self):
super(KQueueAsyncIOService, self).__init__()
self.__kqueue = select.kqueue()
def __str__(self):
return ('KQueueAsyncIOService [ fileno = %d ]' % self.__kqueue.fileno())
def registerForEvents(self, asyncSocket, readEvents, writeEvents):
fileno = asyncSocket.fileno()
if readEvents:
readKE = select.kevent(ident = fileno,
filter = select.KQ_FILTER_READ,
flags = select.KQ_EV_ADD)
else:
readKE = select.kevent(ident = fileno,
filter = select.KQ_FILTER_READ,
flags = (select.KQ_EV_ADD | select.KQ_EV_DISABLE))
if writeEvents:
writeKE = select.kevent(ident = fileno,
filter = select.KQ_FILTER_WRITE,
flags = select.KQ_EV_ADD)
else:
writeKE = select.kevent(ident = fileno,
filter = select.KQ_FILTER_WRITE,
flags = (select.KQ_EV_ADD | select.KQ_EV_DISABLE))
# Should be able to put readKE and writeKE in a list in
# one call to kqueue.control, but this is broken due to Python issue 5910
self.__kqueue.control([readKE], 0, 0)
self.__kqueue.control([writeKE], 0, 0)
def modifyRegistrationForEvents(self, asyncSocket, readEvents, writeEvents):
fileno = asyncSocket.fileno()
if readEvents:
readKE = select.kevent(ident = fileno,
filter = select.KQ_FILTER_READ,
flags = select.KQ_EV_ENABLE)
else:
readKE = select.kevent(ident = fileno,
filter = select.KQ_FILTER_READ,
flags = select.KQ_EV_DISABLE)
if writeEvents:
writeKE = select.kevent(ident = fileno,
filter = select.KQ_FILTER_WRITE,
flags = select.KQ_EV_ENABLE)
else:
writeKE = select.kevent(ident = fileno,
filter = select.KQ_FILTER_WRITE,
flags = select.KQ_EV_DISABLE)
# Should be able to put readKE and writeKE in a list in
# one call to kqueue.control, but this is broken due to Python issue 5910
self.__kqueue.control([readKE], 0, 0)
self.__kqueue.control([writeKE], 0, 0)
def unregisterForEvents(self, asyncSocket):
fileno = asyncSocket.fileno()
readKE = select.kevent(ident = fileno,
filter = select.KQ_FILTER_READ,
flags = select.KQ_EV_DELETE)
writeKE = select.kevent(ident = fileno,
filter = select.KQ_FILTER_WRITE,
flags = select.KQ_EV_DELETE)
# Should be able to put readKE and writeKE in a list in
# one call to kqueue.control, but this is broken due to Python issue 5910
self.__kqueue.control([readKE], 0, 0)
self.__kqueue.control([writeKE], 0, 0)
def doPoll(self, block):
eventList = self.__kqueue.control(
None,
self.getNumFDs() * 2,
None if block else 0)
for ke in eventList:
fd = ke.ident
readReady = (ke.filter == select.KQ_FILTER_READ)
writeReady = (ke.filter == select.KQ_FILTER_WRITE)
errorReady = ((ke.flags & select.KQ_EV_EOF) != 0)
self.handleEventForFD(fd = fd,
readReady = readReady,
writeReady = writeReady,
errorReady = errorReady)
class PollAsyncIOService(AsyncIOService):
def __init__(self):
super(PollAsyncIOService, self).__init__()
self.__poller = select.poll()
def __str__(self):
return 'PollAsyncIOService'
def registerForEvents(self, asyncSocket, readEvents, writeEvents):
fileno = asyncSocket.fileno()
eventMask = 0
if (readEvents):
eventMask |= select.POLLIN
if (writeEvents):
eventMask |= select.POLLOUT
self.__poller.register(fileno, eventMask)
def modifyRegistrationForEvents(self, asyncSocket, readEvents, writeEvents):
fileno = asyncSocket.fileno()
eventMask = 0
if (readEvents):
eventMask |= select.POLLIN
if (writeEvents):
eventMask |= select.POLLOUT
self.__poller.modify(fileno, eventMask)
def unregisterForEvents(self, asyncSocket):
fileno = asyncSocket.fileno()
self.__poller.unregister(fileno)
def doPoll(self, block):
readyList = self.__poller.poll(None if block else 0)
for (fd, eventMask) in readyList:
readReady = ((eventMask & select.POLLIN) != 0)
writeReady = ((eventMask & select.POLLOUT) != 0)
errorReady = ((eventMask &
(select.POLLERR | select.POLLHUP | select.POLLNVAL)) != 0)
self.handleEventForFD(fd = fd,
readReady = readReady,
writeReady = writeReady,
errorReady = errorReady)
class SelectAsyncIOService(AsyncIOService):
def __init__(self):
super(SelectAsyncIOService, self).__init__()
def __str__(self):
return 'SelectAsyncIOService'
def registerForEvents(self, asyncSocket, readEvents, writeEvents):
pass
def modifyRegistrationForEvents(self, asyncSocket, readEvents, writeEvents):
pass
def unregisterForEvents(self, asyncSocket):
pass
def doPoll(self, block):
allFDSet = self.getReadFDSet() | self.getWriteFDSet()
(readList, writeList, exceptList) = \
select.select(self.getReadFDSet(), self.getWriteFDSet(), allFDSet,
None if block else 0)
for fd in allFDSet:
readReady = fd in readList
writeReady = fd in writeList
errorReady = fd in exceptList
if (readReady or writeReady or errorReady):
self.handleEventForFD(fd = fd,
readReady = readReady,
writeReady = writeReady,
errorReady = errorReady)
def createAsyncIOService(allow_epoll = True,
allow_kqueue = True,
allow_poll = True):
'''Create an AsyncIOService supported by the platform and parameters.'''
if (allow_epoll and hasattr(select, 'epoll')):
return EPollAsyncIOService()
elif (allow_kqueue and hasattr(select, 'kqueue')):
return KQueueAsyncIOService()
elif (allow_poll and hasattr(select, 'poll')):
return PollAsyncIOService()
else:
return SelectAsyncIOService()