"""Module for simple UDP broadcast support.
The Beacon class in this module is a stepping stone for building discoverable
services on a network. Server replies are to be handled by the importer."""
################################################################################
# Module metadata: author contact, date of this revision, and revision tag.
__author__ = 'Stephen "Zero" Chappell <Noctis.Skytower@gmail.com>'
__date__ = '22 November 2011'
__version__ = '$Revision: 2 $'
################################################################################
import socket
import _thread
import time
################################################################################
class Beacon:
    """UDP endpoint that sends and receives broadcasts on a single port.

    The underlying datagram socket is bound to '0.0.0.0' on ``port`` and
    sends to '255.255.255.255' on that same port. Release the socket with
    close(), or use the beacon as a context manager:

        with Beacon(50000) as beacon:
            beacon.connect(b'hello')
    """

    def __init__(self, port):
        """Initialize the beacon for sending and receiving data.

        port -- UDP port used both for binding and as the broadcast target.

        Any error raised while configuring or binding the socket propagates
        to the caller after the partially set-up socket has been closed.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
            sock.bind(('0.0.0.0', port))
        except BaseException:
            # Do not leak the descriptor when setup fails part-way through.
            sock.close()
            raise
        self.__sock = sock
        self.__addr = '255.255.255.255', port

    def __enter__(self):
        """Return the beacon itself so it can be used in a with-statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the socket on leaving the with-block; never hides errors."""
        self.close()

    def close(self):
        """Close the underlying socket; extra calls are harmless."""
        self.__sock.close()

    def accept(self, size):
        """Receive a broadcast through the underlying socket.

        size -- maximum number of bytes to read from one datagram.

        Returns whatever the socket's recvfrom() returns, i.e. a
        (data, address) pair.
        """
        return self.__sock.recvfrom(size)

    def connect(self, data):
        """Send a broadcast through the underlying socket.

        data -- bytes-like payload sent to the broadcast address and port
                chosen at construction time.
        """
        self.__sock.sendto(data, self.__addr)

    @property
    def timeout(self):
        """Timeout on blocking socket operations."""
        return self.__sock.gettimeout()

    @timeout.setter
    def timeout(self, value):
        self.__sock.settimeout(value)
################################################################################
def test():
    """Test the beacon broadcasting class.

    Creates one Beacon on port 50000, starts test_connect on a background
    thread to broadcast through it, and runs test_accept on the calling
    thread to print whatever arrives. Does not return.
    """
    beacon = Beacon(50000)
    sender_args = (beacon,)
    _thread.start_new_thread(test_connect, sender_args)
    test_accept(beacon)
def test_connect(b):
    """Test the beacon's connect method.

    Never returns: once per second it broadcasts the current UTC time,
    formatted as '%Y-%m-%dT%H:%M:%SZ' and encoded to bytes, via b.connect.
    """
    while True:
        utc_now = time.gmtime()
        stamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', utc_now)
        b.connect(stamp.encode())
        time.sleep(1)
def test_accept(b):
    """Test the beacon's accept method.

    Never returns: reads datagrams of up to 4096 bytes through b.accept and
    prints the sender address followed by the decoded payload.
    """
    while True:
        data, address = b.accept(1 << 12)
        # Any host can send to the broadcast port, so the payload is not
        # guaranteed to be valid UTF-8; substitute undecodable bytes instead
        # of letting UnicodeDecodeError end the receive loop.
        text = data.decode(errors='replace')
        print('From: {}\n{}\n'.format(address, text))
################################################################################
if __name__ == '__main__':
    # Start the broadcast demo only when run as a script, never on import.
    test()
Diff to Previous Revision
--- revision 2 2011-11-19 05:50:53
+++ revision 3 2011-11-23 06:01:19
@@ -6,8 +6,8 @@
################################################################################
__author__ = 'Stephen "Zero" Chappell <Noctis.Skytower@gmail.com>'
-__date__ = '18 November 2011'
-__version__ = '$Revision: 1 $'
+__date__ = '22 November 2011'
+__version__ = '$Revision: 2 $'
################################################################################
@@ -17,62 +17,54 @@
################################################################################
-class Client:
-
- "Client(port) -> Client instance"
-
- __slots__ = '__socket', '__address'
+class Beacon:
def __init__(self, port):
- "Initialize the client with a sending socket."
- self.__socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True)
- self.__address = '255.255.255.255', port
+ "Initialize the beacon for sending and receiving data."
+ self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, True)
+ self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
+ self.__sock.bind(('0.0.0.0', port))
+ self.__addr = '255.255.255.255', port
+
+ def accept(self, size):
+ "Receive a broadcast through the underlying socket."
+ return self.__sock.recvfrom(size)
def connect(self, data):
"Send a broadcast through the underlying socket."
- self.__socket.sendto(data, self.__address)
+ self.__sock.sendto(data, self.__addr)
+
+ def __gettimeout(self):
+ return self.__sock.gettimeout()
+
+ def __settimeout(self, value):
+ self.__sock.settimeout(value)
+
+ timeout = property(__gettimeout, __settimeout,
+ doc='Timeout on blocking socket operations.')
################################################################################
-class Server:
+def test():
+ "Test the beacon broadcasting class."
+ b = Beacon(50000)
+ _thread.start_new_thread(test_connect, (b,))
+ test_accept(b)
- "Server(port) -> Server instance"
-
- __slots__ = '__socket'
-
- def __init__(self, port):
- "Initialize the server with a receiving socket."
- self.__socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
- self.__socket.bind(('0.0.0.0', port))
-
- def accept(self, size):
- "Receive a broadcast through the underlying socket."
- return self.__socket.recvfrom(size)
-
-################################################################################
-
-def test(port):
- "Test the client and server broadcasting classes."
- _thread.start_new_thread(test_client, (port,))
- test_server(port)
-
-def test_client(port):
- "Test the client broadcasting class."
- c = Client(port)
+def test_connect(b):
+ "Test the beacon's connect method."
while True:
- c.connect(time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()).encode())
+ b.connect(time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()).encode())
time.sleep(1)
-def test_server(port):
- "Test the server broadcasting class."
- s = Server(port)
+def test_accept(b):
+ "Test the beacon's accept method."
while True:
- data, address = s.accept(1 << 10)
+ data, address = b.accept(1 << 12)
print('From: {}\n{}\n'.format(address, data.decode()))
################################################################################
if __name__ == '__main__':
- test(50000)
+ test()