"""Run asyncore/asynchat dispatchers in the GTK main event loop.
This module integrates asyncore/asynchat channels with the GTK main loop, by
automatically adding and maintaining gobject.io_add_watch() watches.
Usage: asyncore.socket_map = AutoWatch(asyncore.socket_map)
"""
import asyncore
import UserDict
import gobject
# The conditions reported by a gobject I/O watch are handed straight to
# asyncore.readwrite(), so the gobject.IO_* constants must have the same
# values as the corresponding select.POLL* constants. Verify that at import.
import select
for _flag in ('IN', 'PRI', 'OUT', 'ERR', 'HUP', 'NVAL'):
    _gobject_value = getattr(gobject, 'IO_' + _flag)
    _select_value = getattr(select, 'POLL' + _flag)
    assert _gobject_value == _select_value
# Leave no helper names behind in the module namespace.
del select, _flag, _gobject_value, _select_value

# Priority given to the I/O watches and to the idle callbacks installing them.
IO_PRIORITY = gobject.PRIORITY_DEFAULT_IDLE
class AutoWatch(UserDict.DictMixin):
    """Socket map that mirrors its entries as GTK main loop watches.

    Behaves like an fd -> channel dictionary, but every channel stored in it
    also gets a gobject.io_add_watch() whose condition mask is derived from
    the channel's readable()/writable() answers. When the watch fires, the
    channel is passed to asyncore.readwrite().
    """

    # Markers stored in place of an I/O mask while no io_add_watch() is
    # installed and _add_watch() is still pending as an idle/timeout source.
    _PENDING_IDLE = 'idle_add'
    _PENDING_TIMEOUT = 'timeout_add'

    def __init__(self, mapping=None, **kwargs):
        """Create the map, optionally pre-populated from mapping and kwargs."""
        # channel -> (source_id, mask-or-marker) map, for io_add_watch and
        # source_remove
        self._watch_map = dict()
        # fd -> channel map, for the dictionary interface
        self._fd_map = dict()
        if mapping is not None:
            self.update(mapping)
        if kwargs:
            self.update(kwargs)

    def __getitem__(self, fd):
        """Return the channel registered under fd (KeyError if none)."""
        return self._fd_map[fd]

    def __setitem__(self, fd, obj):
        """Register obj under fd and schedule a watch for it.

        A channel already registered under the same fd has its watch
        removed first.
        """
        if fd in self._fd_map:
            self._remove_watch(self._fd_map[fd])
        self._schedule_add_watch(obj)
        self._fd_map[fd] = obj

    def __delitem__(self, fd):
        """Unregister the channel under fd and remove its watch."""
        self._remove_watch(self._fd_map.pop(fd))

    def __contains__(self, fd):
        return fd in self._fd_map

    def __iter__(self):
        return iter(self._fd_map)

    def keys(self):
        return self._fd_map.keys()

    def iteritems(self):
        return self._fd_map.iteritems()

    def _schedule_add_watch(self, obj):
        """Queue _add_watch(obj) as an idle callback and record it as pending."""
        source_id = gobject.idle_add(self._add_watch, obj,
                                     priority=IO_PRIORITY)
        self._watch_map[obj] = (source_id, self._PENDING_IDLE)

    def _add_watch(self, obj):
        """Idle/timeout callback that installs the I/O watch for obj.

        Returns False to stop the calling idle/timeout source, or True to be
        called again by it.
        """
        mask = self._get_mask(obj)
        if mask:
            source_id = gobject.io_add_watch(obj, mask, self._handle_io,
                                             priority=IO_PRIORITY)
            self._watch_map[obj] = (source_id, mask)
            return False
        # This should be exceptional. The channel is still open, but is
        # neither readable nor writable. Retry until it is, but with a
        # timeout_add() to preserve CPU.
        if self._watch_map[obj][1] == self._PENDING_IDLE:
            source_id = gobject.timeout_add(200, self._add_watch, obj,
                                            priority=gobject.PRIORITY_LOW)
            self._watch_map[obj] = (source_id, self._PENDING_TIMEOUT)
            return False
        # Already running from the timeout source: keep it going.
        return True

    def _remove_watch(self, obj):
        """Forget obj's watch entry and remove its main loop source."""
        gobject.source_remove(self._watch_map.pop(obj)[0])

    def _get_mask(self, obj):
        """Return the I/O condition mask to watch obj with (0 for none)."""
        mask = 0
        if obj.readable():
            mask |= gobject.IO_IN | gobject.IO_PRI
        if obj.writable():
            mask |= gobject.IO_OUT
        # Only watch for errors if the channel is either readable or writable
        if mask:
            mask |= gobject.IO_ERR | gobject.IO_HUP | gobject.IO_NVAL
        return mask

    def _handle_io(self, obj, mask):
        """I/O watch callback: dispatch the event to obj via asyncore.

        Returns True to keep the current watch, False to drop it.
        """
        asyncore.readwrite(obj, mask)
        # Make sure channels removed during the readwrite() aren't re-added.
        # Compare by identity so that an fd which now belongs to a different
        # channel is not mistaken for this one.
        if self._fd_map.get(obj._fileno) is not obj:
            return False
        watched = self._watch_map[obj][1]
        if watched in (self._PENDING_IDLE, self._PENDING_TIMEOUT):
            # The channel was re-registered during the readwrite() (e.g. it
            # closed and reopened itself), so a fresh watch is already
            # pending. Scheduling another one here would leak the pending
            # source and end up with two I/O watches for the same channel.
            return False
        # If read-/writability has changed, change watch mask
        if self._get_mask(obj) != watched:
            self._schedule_add_watch(obj)
            return False
        return True