This recipe is the core of a project called FDR (Full Duplex RPC). FDR has not been completed at this time, but the "spots" module already takes most of the network-communication work off the other modules. This is the second version of "spots"; it simplifies the ZSP class and gets rid of the base255 module.
'''Module for Send Python Object Through Socket (SPOTS).
This module implements the Zero SPOTS Protocol, the Query/Reply Protocol,
the Query/Reply Interface, and a shortcut for constructing QRI objects.'''
################################################################################
__version__ = '$Revision: 0 $'
__date__ = 'March 27, 2007'
__author__ = 'Stephen "Zero" Chappell <my.bios@gmail.com>'
__credits__ = '''\
T. Parker, for testing code that led to this module.
B. Brown, for teaching me some math courses.
G. Rossum, for allowing thread support in Python.'''
################################################################################
import cPickle as _cPickle
import sys as _sys
import thread as _thread
import time as _time
################################################################################
class ZSP:
    '''ZSP(socket) -> ZSP

    Zero SPOTS Protocol: moves whole Python objects across a socket by
    pickling them onto (and unpickling them from) a file object obtained
    from the socket.'''

    def __init__(self, socket):
        'Create the file object (mode "b", buffer size 0) used for all traffic.'
        stream = socket.makefile('b', 0)
        self.__file = stream

    def send(self, obj):
        'Pickle obj with the highest available protocol and write it out.'
        protocol = _cPickle.HIGHEST_PROTOCOL
        _cPickle.dump(obj, self.__file, protocol)

    def recv(self):
        'Read the next pickled object from the stream and return it.'
        # NOTE: unpickling can execute arbitrary code chosen by the sender,
        # so this protocol must only be used with trusted peers.
        obj = _cPickle.load(self.__file)
        return obj
################################################################################
class QRP:
    '''QRP(ZSP) -> QRP

    Query/Reply Protocol.  Packets travel over the given ZSP object as
    (is_reply, ID, obj) tuples.  A background thread reads every incoming
    packet and hands it to whichever caller is blocked in recv_Q (queries)
    or recv_R (replies, matched by ID).

    Once the background thread fails, every method raises EOFError (the
    stream ended) or IOError (any other failure).

    NOTE(review): a reply whose ID has no registered waiter at the moment
    it arrives is discarded, so recv_R should be called promptly after the
    matching send_Q.'''

    def __init__(self, ZSP):
        'Initialize the Query/Reply Protocol object and start the reader.'
        self.__ZSP = ZSP
        # Exception class to raise once the reader thread has died.
        self.__error = None
        # Callers blocked in recv_Q.  An "anchor" is a list whose first item
        # is a held lock; the result is appended before the lock is released.
        self.__Q_anchor = []
        # Queries that arrived while nobody was waiting, oldest first.
        self.__Q_packet = []
        # Callers blocked in recv_R, keyed by the reply ID they expect.
        self.__R_anchor = {}
        self.__Q_lock = _thread.allocate_lock()
        self.__R_lock = _thread.allocate_lock()
        # Serializes senders: one pickled packet may reach the socket in
        # several writes, which must not interleave with another packet.
        self.__S_lock = _thread.allocate_lock()
        _thread.start_new_thread(self.__thread, ())

    def send_Q(self, ID, obj):
        'Send one query.'
        if self.__error:
            raise self.__error
        self.__send((False, ID, obj))

    def recv_Q(self, timeout=None):
        '''Receive one query and return it as an (ID, obj) tuple.

        Queries are returned in the order they arrived.  With timeout=None
        the call blocks until a query arrives; otherwise Warning is raised
        after timeout seconds (0 means "do not wait").  Raises TypeError or
        ValueError for a bad timeout, EOFError or IOError if the connection
        has failed.'''
        if self.__error:
            raise self.__error
        self.__check_timeout(timeout)
        try:
            self.__Q_lock.acquire()
            try:
                if self.__Q_packet:
                    # Oldest buffered query first.
                    return self.__Q_packet.pop(0)
                anchor = [_thread.allocate_lock()]
                anchor[0].acquire()
                self.__Q_anchor.append(anchor)
            finally:
                self.__Q_lock.release()
        except AttributeError:
            # The reader thread deleted the queues while shutting down.
            raise self.__error
        if timeout is not None:
            _thread.start_new_thread(self.__Q_thread, (timeout, anchor))
        # Blocks until the reader, the timer, or the shutdown code releases.
        anchor[0].acquire()
        try:
            return anchor[1]
        except IndexError:
            # Released without a result: connection failure or timeout.
            if self.__error:
                raise self.__error
            raise Warning

    def send_R(self, ID, obj):
        'Send one reply.'
        if self.__error:
            raise self.__error
        self.__send((True, ID, obj))

    def recv_R(self, ID, timeout=None):
        '''Receive the reply carrying the given ID and return its object.

        With timeout=None the call blocks until the reply arrives; otherwise
        Warning is raised after timeout seconds (0 means "do not wait").
        If another caller is already waiting on the same ID it is displaced
        and woken with Warning instead of being left blocked.  Raises
        TypeError or ValueError for a bad timeout, EOFError or IOError if
        the connection has failed.'''
        if self.__error:
            raise self.__error
        self.__check_timeout(timeout)
        anchor = [_thread.allocate_lock()]
        anchor[0].acquire()
        try:
            self.__R_lock.acquire()
            try:
                displaced = self.__R_anchor.get(ID)
                self.__R_anchor[ID] = anchor
                if displaced is not None:
                    displaced[0].release()
            finally:
                self.__R_lock.release()
        except AttributeError:
            # The reader thread deleted the table while shutting down.
            raise self.__error
        if timeout is not None:
            _thread.start_new_thread(self.__R_thread, (timeout, ID, anchor))
        # Blocks until the reader, the timer, or the shutdown code releases.
        anchor[0].acquire()
        try:
            return anchor[1]
        except IndexError:
            # Released without a result: connection failure or timeout.
            if self.__error:
                raise self.__error
            raise Warning

    def __send(self, packet):
        'Private class method: write one packet while holding the send lock.'
        self.__S_lock.acquire()
        try:
            self.__ZSP.send(packet)
        finally:
            self.__S_lock.release()

    def __check_timeout(self, timeout):
        'Private class method: validate a timeout argument.'
        if timeout is None:
            return
        if not isinstance(timeout, (float, int, long)):
            raise TypeError('timeout must be of type float, int, or long')
        if not timeout >= 0:
            raise ValueError('timeout must be greater than or equal to 0')

    def __thread(self):
        'Private class method: read packets until the connection fails.'
        try:
            while True:
                R, ID, obj = self.__ZSP.recv()
                if R:
                    self.__R_lock.acquire()
                    try:
                        # An unhashable ID raises here; the finally clause
                        # keeps the lock from being leaked when it does.
                        if ID in self.__R_anchor:
                            anchor = self.__R_anchor.pop(ID)
                            anchor.append(obj)
                            anchor[0].release()
                    finally:
                        self.__R_lock.release()
                else:
                    self.__Q_lock.acquire()
                    try:
                        if self.__Q_anchor:
                            # Longest-waiting caller is served first.
                            anchor = self.__Q_anchor.pop(0)
                            anchor.append((ID, obj))
                            anchor[0].release()
                        else:
                            self.__Q_packet.append((ID, obj))
                    finally:
                        self.__Q_lock.release()
        except EOFError:
            self.__fail(EOFError)
        except Exception:
            self.__fail(IOError)

    def __fail(self, error):
        'Private class method: record the failure and wake every waiter.'
        # Set first so that woken waiters see the error instead of a timeout.
        self.__error = error
        self.__Q_lock.acquire()
        try:
            for anchor in self.__Q_anchor:
                anchor[0].release()
            # Deleting the containers makes late callers hit AttributeError,
            # which recv_Q and recv_R translate into the recorded error.
            del self.__Q_anchor
            del self.__Q_packet
        finally:
            self.__Q_lock.release()
        self.__R_lock.acquire()
        try:
            for key in self.__R_anchor:
                self.__R_anchor[key][0].release()
            del self.__R_anchor
        finally:
            self.__R_lock.release()

    def __Q_thread(self, timeout, anchor):
        'Private class method: time out one recv_Q waiter.'
        _time.sleep(timeout)
        self.__Q_lock.acquire()
        try:
            # Still listed means no query was delivered to this waiter.
            if not self.__error and anchor in self.__Q_anchor:
                self.__Q_anchor.remove(anchor)
                anchor[0].release()
        finally:
            self.__Q_lock.release()

    def __R_thread(self, timeout, ID, anchor):
        'Private class method: time out one recv_R waiter.'
        _time.sleep(timeout)
        self.__R_lock.acquire()
        try:
            # Compare by identity so that a later waiter reusing the same ID
            # is never cancelled by this (stale) timer.
            if not self.__error and self.__R_anchor.get(ID) is anchor:
                del self.__R_anchor[ID]
                anchor[0].release()
        finally:
            self.__R_lock.release()
################################################################################
class QRI:
    '''QRI(QRP) -> QRI

    Query/Reply Interface: a thin layer over a QRP object that generates
    the query IDs itself, so that a query and its reply form one call.'''

    def __init__(self, QRP):
        'Store the QRP object and start the ID counter at zero.'
        self.__lock = _thread.allocate_lock()
        self.__ID = 0
        self.__QRP = QRP

    def call(self, obj, timeout=None):
        'Send obj as a query and return the reply carrying the same ID.'
        # Claim the next counter value; the counter wraps around at 2 ** 32.
        self.__lock.acquire()
        serial = self.__ID
        self.__ID = (serial + 1) % (2 ** 32)
        self.__lock.release()
        # The ID is the counter as four characters, most significant first.
        octets = [chr((serial >> shift) & 0xFF) for shift in (24, 16, 8, 0)]
        ID = ''.join(octets)
        self.__QRP.send_Q(ID, obj)
        return self.__QRP.recv_R(ID, timeout)

    def query(self, timeout=None):
        'Return the next incoming query, as delivered by the QRP object.'
        incoming = self.__QRP.recv_Q(timeout)
        return incoming

    def reply(self, ID, obj):
        'Send obj as the reply to the query that carried ID.'
        self.__QRP.send_R(ID, obj)
################################################################################
def qri(socket):
    'Build the ZSP -> QRP -> QRI stack on top of socket; return the QRI.'
    transport = ZSP(socket)
    protocol = QRP(transport)
    return QRI(protocol)
################################################################################
if __name__ == '__main__':
    # Run as a script: print a plain-text content header followed by this
    # file's own source (presumably so the module can be served as CGI).
    _sys.stdout.write('Content-Type: text/plain\n\n')
    source = open(_sys.argv[0])
    try:
        _sys.stdout.write(source.read())
    finally:
        # Close the handle explicitly rather than relying on garbage
        # collection to do it.
        source.close()
|
The ZSP class is demonstrated in "Paint 2.0" to show one simple use of the "spots" module. This recipe demonstrates how a simple layer that converts data on the fly can make network communication easier in a Python program.