Two modules that run a BitTorrent server and use Twisted as a client to coordinate control-message passing and progress monitoring. The server can be run as a separate process or as a thread within the client -- the same messages are passed back and forth either way.
Control messages can cancel individual downloads (or the whole process), as well as pause downloading. Progress queries can be invoked through the client, which will ping the server and report back each download's progress.
# BitTorrent server launch
import logging,sys,pdb,time,traceback,os
from thread import get_ident
from operator import mod
#need this prior to BT imports
import gettext
gettext.install('bittorrent', 'locale')
from BitTorrent.launchmanycore import LaunchMany
from BitTorrent.defaultargs import get_defaults
from BitTorrent.parseargs import parseargs, printHelp
from BitTorrent import configfile
from BitTorrent import BTFailure
from BitTorrent.bencode import bdecode
from twisted.spread import pb
from twisted.internet import reactor
from twisted.internet import threads
class DataLock:
    """Recursive mutex exposing a uniform ``lock()`` / ``unlock()`` pair.

    The platform's native recursive lock is used when it is available
    (``qt.QMutex`` on win32, ``Foundation.NSRecursiveLock`` on darwin).  On
    every other platform, or when the native toolkit cannot be imported, a
    standard-library ``threading.RLock`` is used instead, so ``self.mutex``
    is always defined and ``lock()`` never fails with ``AttributeError``.
    """

    def __init__(self):
        self.mutex = None
        try:
            if sys.platform == 'win32':
                from qt import QMutex
                self.mutex = QMutex(True)
            elif sys.platform == 'darwin':
                from Foundation import NSRecursiveLock
                self.mutex = NSRecursiveLock.alloc().init()
        except ImportError:
            # Native toolkit not installed: fall through to the portable lock.
            self.mutex = None

        if self.mutex is not None:
            # The native locks spell their operations lock()/unlock().
            self._acquire = self.mutex.lock
            self._release = self.mutex.unlock
        else:
            # Local import, matching the function-scope imports used above.
            import threading
            self.mutex = threading.RLock()
            self._acquire = self.mutex.acquire
            self._release = self.mutex.release

    def lock(self):
        """Acquire the lock; the owning thread may acquire it repeatedly."""
        self._acquire()

    def unlock(self):
        """Release one level of ownership acquired by ``lock()``."""
        self._release()
def downloadDir():
    """Return the directory used for torrent downloads, creating it if needed.

    On win32 the directory is ``<HOMEPATH>/My Documents/TorrentServer``; on
    other platforms it is ``<HOME>/Documents/TorrentServer``.

    Returns:
        The download directory path.  When the relevant environment variable
        is unset or empty, ``'.'`` is returned.  When the directory cannot be
        created, the value of ``HOME`` is returned (or ``'.'`` if ``HOME`` is
        not set either), so callers always receive a usable string.
    """
    if sys.platform == 'win32':
        base = os.environ.get('HOMEPATH')
        parts = ('My Documents', 'TorrentServer')
    else:
        base = os.environ.get('HOME')
        parts = ('Documents', 'TorrentServer')

    if not base:
        return '.'

    ddir = os.path.join(base, *parts)
    try:
        if not os.path.isdir(ddir):
            os.makedirs(ddir)
    except OSError:
        # Could not create the directory: fall back to the home directory.
        return os.environ.get('HOME') or '.'
    return ddir
# Perspective Broker root object: holds the torrent directory, the control
# status and per-torrent progress, launches BitTorrent's LaunchMany, and
# exposes the state to a Twisted client through its remote_* methods.
# NOTE(review): this listing has lost its indentation and several lines appear
# to be missing (the `finally:` / `except` clauses below have no visible
# `try:`, and no call to torrentLock.lock() is shown). Restore the structure
# before running; the notes below describe only what is visible.
class TorrentServer(pb.Root):
# Class-level state shared by all users of the class:
#   torrentPort - TCP port handed to reactor.listenTCP() in listen()
#   log         - logger, assigned by main()
#   tServer     - the instance most recently constructed (set in __init__)
#   isProcess   - tested in runTorrents(); presumably True when the server
#                 runs as its own process -- TODO confirm
torrentPort = 11989
log = None
tServer = None
isProcess = True
# Store the torrent directory `td` and initialise empty status/progress state.
# Also registers this instance as TorrentServer.tServer.
def __init__(self, td):
TorrentServer.tServer = self
self.launcher = None
self.torrentDir = td
self.torrentLock = DataLock()
self.status = ''
self.torrents = {}
self.stoppedTorrents = {}
self.moduloCount = 0
# Publish this object as the PB root on torrentPort.
# NOTE(review): no reactor.run() is visible here -- confirm who starts the reactor.
def listen(self):
from twisted.internet import reactor
TorrentServer.log.debug('init(%s): listening '%get_ident())
reactor.listenTCP( TorrentServer.torrentPort, pb.PBServerFactory(self))
# Remote entry point: apply a control update sent by the client.
# `execStatus` is read as a dict with optional keys 'torrent_dir', 'status'
# and 'torrents'. If a torrent_dir is supplied that differs from the current
# non-empty one, the directory is switched and status is forced to 'restart';
# if no directory was set yet, the supplied one is simply adopted.
# NOTE(review): the debug call below is cut off (its argument tuple is not
# visible) and the `finally:` has no visible `try:` or lock() call.
def remote_UpdateExecutionStatus(self, execStatus):
TorrentServer.log.debug('remote_UpdateExecutionStatus(%s): %s' %\
td = self.torrentDir
ttdd = execStatus.get('torrent_dir',td)
self.status = execStatus.get('status','')
self.torrents = execStatus.get('torrents',{})
if td and ttdd!=td:
self.torrentDir = ttdd
self.status = 'restart'
elif not td: self.torrentDir = ttdd
finally: self.torrentLock.unlock()
# Remote entry point: return {'torrents': ..., 'status': ...} for the client.
# NOTE(review): unlock() runs in `finally`, but the matching lock() is not visible.
def remote_ReportProgressStatus(self):
#TorrentServer.log.debug('remote_ReportProgressStatus: ')
try: return {'torrents':self.torrents, 'status': self.status}
finally: self.torrentLock.unlock()
# Build the 'btlaunchmany' configuration, point it at self.torrentDir, shorten
# the directory rescan interval, then start the torrents if a directory is set.
# NOTE(review): in the error handler the `%` operator is applied to the return
# value of log.error(...), not to the format string, so the handler itself
# would raise; the `except` also has no visible `try:`.
def initTorrent(self):
uiname = 'btlaunchmany'
defaults = get_defaults(uiname)
config, args = configfile.parse_configuration_and_args(defaults, uiname, [], 0, 1)
#config, args = configfile.parse_configuration_and_args(defaults, uiname, sys.argv[1:], 0, 1)
config['torrent_dir'] = self.torrentDir
config['parse_dir_interval'] = 20 #make the dir scan happen @20 seconds, not default of 60
self.config = config
except BTFailure, e:
TorrentServer.log.error(_("%s\nrun with no args for parameter explanations")) % str(e)
if self.torrentDir: self.runTorrents()
# Construct LaunchMany with this object as its second argument (presumably the
# output/feedback object, which would explain display/message/exception below
# -- TODO confirm against LaunchMany). After it returns, a 'restart' status
# re-enters runTorrents().
# NOTE(review): the bodies of the 'quit' / isProcess branches are not visible,
# and the restart branch uses the bare name `log`, which this module does not
# define in the visible code (TorrentServer.log is used everywhere else).
def runTorrents(self):
TorrentServer.log.debug('runTorrents(%s): LaunchMany... %s'%\
(get_ident(), self.torrentDir))
self.launcher = LaunchMany(self.config, self, 'btlaunchmany')
TorrentServer.log.debug('runTorrents(%s): DONE with torrents...'%get_ident())
if self.status=='quit':
if TorrentServer.isProcess:
if self.status=='restart':
log.debug('torrentServer(): Will restart %s '%self.torrentDir)
if self.torrentDir: self.runTorrents()
# Progress callback. Each element of `data` is unpacked as a 14-tuple
# (name, status, progress, ...). Returns True when status is 'quit' or
# 'restart' (presumably asking the launcher to stop -- TODO confirm), and
# False otherwise. For each torrent: if the client marked it 'cancel' or
# 'stop', the file `name` is removed and its entry dropped; progress is
# recorded as ['progress', progress].
# NOTE(review): the body of the `while ... 'paused'` loop is not visible;
# `modulo` is computed but unused in the visible code; `status is not
# 'downloading'` compares identity rather than equality; as shown, the entry
# deleted for a cancelled torrent is immediately re-created on the next line
# (an `else:` is probably missing); the `finally:` has no visible `try:`.
def display(self, data):
if self.status == 'quit': return True
if self.status=='restart': return True
while self.status=='paused':
#TorrentServer.log.debug( 'display(%s): is paused' % (get_ident()))
self.moduloCount += 1
modulo = mod(self.moduloCount, 3)
for xx in data:
( name, status, progress, peers, seeds, seedsmsg, dist,
uprate, dnrate, upamt, dnamt, size, t, msg ) = xx
if status is not 'downloading':
pass #TorrentServer.log.debug( 'display(%s): %s: %s (%s)' % (get_ident(),status, name, progress))
stopstatus = self.torrents.get(name)
if stopstatus and (stopstatus[0]=='cancel' or stopstatus[0]=='stop'):
try: os.remove(name)
except: traceback.print_exc()
del self.torrents[name]
self.torrents[name] = ['progress',progress]
del data
return False
finally: self.torrentLock.unlock()
# Log an informational message at debug level.
def message(self, str):
TorrentServer.log.debug('FeedbackReporter.message(): %s'%str)
# Log a reported exception at warning level.
def exception(self, str):
TorrentServer.log.warn('FeedbackReporter: exception=%s'%str)
# Log-only callback; presumably the success callback of the worker-thread
# deferred -- TODO confirm where it is attached.
def didEndTorrentThread(self,foo=None):
TorrentServer.log.debug('didEndTorrentThread(): %s'%foo)
# Log-only callback; presumably the matching error callback -- TODO confirm.
def didEndTorrentThreadErr(self,foo=None):
TorrentServer.log.debug('didEndTorrentThreadErr(): %s'%foo)
# Server entry point: configure logging, create the TorrentServer for
# downloadDir(), and run its initTorrent() on a worker thread through
# threads.deferToThread. `args` is not used in the visible code.
# NOTE(review): as shown, `level` is always overwritten with logging.WARN (an
# `else:` appears to be missing); `handler` is created but never attached to
# the logger, while propagate = 0 stops records reaching the root handler set
# up by basicConfig; nothing visible adds callbacks to `dfr`, calls listen(),
# or runs the reactor. Confirm against the original recipe.
def main(args):
if __debug__:
level = logging.DEBUG
level = logging.WARN
logging.basicConfig(level=level, format='%(asctime)s %(levelname)s %(message)s')
handler = logging.StreamHandler()
TorrentServer.log = logging.getLogger('TorrentServer')
TorrentServer.log.propagate = 0
TorrentServer.log.debug('torrentServer.main(): will load config')
TorrentServer.tServer = TorrentServer(downloadDir())
dfr = threads.deferToThread(TorrentServer.tServer.initTorrent)
# NOTE(review): the body of this guard is not visible in this listing
# (presumably it calls main(...) -- TODO confirm).
if __name__=="__main__":
# Twisted-based client
import pdb,os,time,traceback
from twisted.internet import threads
from twisted.spread import pb
from twisted.internet import reactor
from torrentServer import *
# Client-side controller: tracks the torrents the application cares about,
# launches the torrent server, and receives progress reports from a pinger.
# NOTE(review): this listing has lost its indentation and several statements
# appear to be missing (empty method bodies, an `except` with no `try:`).
# The notes below describe only what is visible.
class Torrenting(object):
# Convenience constructor exposed as a static method.
# NOTE(review): the new instance is bound to a local and not returned.
def start(doLaunch=True):
theTorrenter = Torrenting(doLaunch)
start = staticmethod(start)
# Initialise empty bookkeeping and, unless doLaunch is false, call launch().
def __init__(self, doLaunch=True):
self.filenames = {}
self.didQuit = self.isPaused = False
self.torrents = {}
self.pinger = None
if doLaunch: self.launch()
# Mark this controller, and the in-process server object if one exists, as quit.
def willQuit(self,event=None):
self.didQuit = True
if TorrentServer.tServer:
TorrentServer.tServer.didQuit = True
# Worker-thread body used when the server runs inside the client process.
# NOTE(review): `log` is a bare name not defined in the visible client code.
def _threadLaunch(self):
TorrentServer.tServer = TorrentServer(downloadDir())
TorrentServer.log = log
# NOTE(review): the bodies of the next two callbacks are not visible.
def didEndTorrentThread(self,foo=None):
def didEndTorrentThreadErr(self,foo=None):
# Start the server and pick the matching pinger: TorrentPingerRemote when
# TorrentServer.isProcess is true, TorrentPingerLocal otherwise.
# NOTE(review): `launched` is never assigned in the visible code, the
# statement that would start 'torrentServer.exe' is missing, and the `except`
# has no visible `try:`. The thread fallback sets isProcess to False and
# defers _threadLaunch to a worker thread.
def launch(self):
if not __debug__:
if sys.platform=='win32':
TorrentServer.isProcess = True
dir,fn = os.path.split(sys.argv[0])
path = dir+os.sep+'torrentServer.exe'
except: traceback.print_exc()
if not launched:
TorrentServer.isProcess = False
dfr = threads.deferToThread(self._threadLaunch)
if launched:
if TorrentServer.isProcess:
self.pinger = TorrentPingerRemote(self)
else: self.pinger = TorrentPingerLocal(self)
# Pinger callback. `progress` is read as a dict whose 'torrents' entry maps a
# file name to a status list; entries not present in self.torrents are
# skipped. A 'progress' status of exactly 100.0 triggers didDownloadTorrent();
# any other progress value is printed.
# NOTE(review): the body of the 'failed' branch is not visible.
def gotProgress(self, progress):
torrents = progress.get('torrents',{})
for fn,status in torrents.iteritems():
if not self.torrents.has_key(fn): continue
if status[0]=='progress':
progressf = float(status[1])
if progressf==100.0: self.didDownloadTorrent(fn)
else: print 'gotProgress: %s, %.0f' % (fn, progressf)
elif status[0]=='failed':
# Begin tracking `filename` with zero progress.
def addTorrent(self,filename):
self.torrents[filename] = ['progress',0.0]
# Mark `filename` as cancelled, then drop it from the tracking table.
# NOTE(review): as shown, the 'cancel' marker is deleted on the very next
# line; a statement sending it to the server may be missing in between.
def didCancelDownload(self,filename):
self.torrents[filename] = ['cancel']
try: del self.torrents[filename]
except: pass
# Forget a finished download. Errors (e.g. unknown key) are ignored.
# NOTE(review): this removes from self.filenames, whereas addTorrent() fills
# self.torrents -- confirm which table is intended.
def didDownloadTorrent(self,filename=None):
try: del self.filenames[filename]
except: pass
# Forget a failed download; same handling as didDownloadTorrent().
def didNotDownloadTorrent(self,filename=None):
try: del self.filenames[filename]
except: pass
# Cancel everything: delete each tracked file from disk (printing a traceback
# on failure) and reset the tracking table.
def didCancelBuild(self):
for tt in self.torrents.keys():
try: os.remove(tt)
except: traceback.print_exc()
self.torrents = {}
# Flip the local paused flag.
# NOTE(review): no message to the server is visible here.
def togglePause(self):
self.isPaused = not self.isPaused
class TorrentPinger:
    """Base class for objects that poll the torrent server for progress.

    It builds the command dictionaries sent to the server and keeps the
    handle of the next scheduled progress poll in ``progressPing``.
    Subclasses override ``queryProgress`` (and the command methods) to
    deliver the commands to a server.

    Attributes:
        delegate: optional object whose ``gotProgress(progress)`` method is
            called with every progress report.
        progressPing: the pending call returned by ``reactor.callLater``, or
            ``None`` when no poll is scheduled.
    """

    def __init__(self, delegate=None):
        self.delegate = delegate
        self.progressPing = None

    def _cancelPing(self):
        """Cancel the scheduled progress poll, if any, and forget it."""
        ping, self.progressPing = self.progressPing, None
        if not ping:
            return
        try:
            # Only a call that has not fired or been cancelled may be
            # cancelled; dropping the reference alone would let it fire.
            if ping.active():
                ping.cancel()
        except Exception:
            traceback.print_exc()

    def _didFail(self, foo=None):
        """Error callback: stop polling after a failed server request."""
        self._cancelPing()

    def didPause(self, foo=None):
        """Callback after a pause/resume command; no-op in the base class."""
        pass

    def didQuit(self, foo=None):
        """Callback after a quit command; no-op in the base class."""
        pass

    def didUpdateTorrents(self, foo=None):
        """Callback after a torrents update; no-op in the base class."""
        pass

    def updateTorrentsExecutionStatus(self, torrents):
        """Return the command dict carrying the per-torrent table."""
        return {'torrents': torrents}

    def quitTorrents(self):
        """Return the command dict that asks the server to quit."""
        return {'status': 'quit'}

    def pauseTorrents(self, yn):
        """Return the command dict that pauses (true) or resumes (false)."""
        if yn:
            return {'status': 'paused'}
        return {'status': 'active'}

    def queryProgress(self):
        """Ask the server for progress; subclasses provide the transport."""
        pass

    def _progressStatus(self, progress):
        """Forward a progress report to the delegate and schedule the next poll."""
        if self.delegate:
            self.delegate.gotProgress(progress)
        # Poll again in one second.
        self.progressPing = reactor.callLater(1, self.queryProgress)

    def cancelTorrents(self):
        """Stop polling the server for progress."""
        self._cancelPing()
# Pinger used when the server runs in a thread of the client process: it
# reaches the server object directly through TorrentServer.tServer instead of
# going over the network.
# NOTE(review): most of this class is missing from the listing. The callback
# methods and quitTorrents() have no visible body, updateTorrentsExecutionStatus()
# builds `cmds` but nothing visible sends it, the reactor.callLater(...) call in
# pauseTorrents() is cut off before its final argument, and queryProgress()
# fetches `status` but nothing visible passes it on (presumably to
# _progressStatus -- TODO confirm). Restore from the original recipe.
class TorrentPingerLocal(TorrentPinger):
# Delegate handling is inherited unchanged from TorrentPinger.
def __init__( self, delegate=None ):
TorrentPinger.__init__(self, delegate)
def _didFail(self,foo=None):
def didPause(self,foo=None):
def didQuit(self,foo=None):
def didUpdateTorrents(self,foo=None):
# Builds the command dict with the base-class helper (note `self` is passed
# explicitly to the unbound base method).
def updateTorrentsExecutionStatus(self, torrents):
cmds = TorrentPinger.updateTorrentsExecutionStatus(self,torrents)
def quitTorrents(self):
def pauseTorrents(self, yn):
dfr = reactor.callLater(0,TorrentServer.tServer.remote_UpdateExecutionStatus,
def queryProgress( self ):
status = TorrentServer.tServer.remote_ReportProgressStatus()
class TorrentPingerRemote(TorrentPinger):
    """Pinger that talks to a TorrentServer over Perspective Broker.

    ``start()`` connects to ``TorrentServer.torrentPort`` and stores the
    server's root object in ``torrentServerRoot``; the command methods then
    send the dictionaries built by ``TorrentPinger`` through
    ``callRemote('UpdateExecutionStatus', ...)``.  The command methods must
    not be used before the connection has been established.
    """

    def __init__(self, delegate=None):
        TorrentPinger.__init__(self, delegate)
        self.torrentServerRoot = None

    def start(self):
        """Connect to the server and request its root object."""
        try:
            client = pb.PBClientFactory()
            # Fourth positional argument of connectTCP is the timeout.
            reactor.connectTCP('', TorrentServer.torrentPort, client, 20)
            dfr = client.getRootObject()
            dfr.addCallbacks(self._gotRemote, self._didFail)
        except Exception:
            traceback.print_exc()

    def _gotRemote(self, remote):
        """Callback: remember the server's root object.

        NOTE(review): nothing here starts the progress polling; confirm
        whether queryProgress() should be kicked off once connected.
        """
        try:
            self.torrentServerRoot = remote
        except Exception:
            traceback.print_exc()

    def _sendStatus(self, cmds, onSuccess):
        """Send `cmds` to the server's UpdateExecutionStatus method."""
        dfr = self.torrentServerRoot.callRemote('UpdateExecutionStatus', cmds)
        dfr.addCallbacks(onSuccess, self._didFail)

    def updateTorrentsExecutionStatus(self, torrents):
        """Send the per-torrent table to the server."""
        # The base-class builders are called unbound, so `self` must be
        # passed explicitly.
        cmds = TorrentPinger.updateTorrentsExecutionStatus(self, torrents)
        self._sendStatus(cmds, self.didUpdateTorrents)

    def quitTorrents(self):
        """Ask the server to quit."""
        cmds = TorrentPinger.quitTorrents(self)
        self._sendStatus(cmds, self.didQuit)

    def pauseTorrents(self, yn):
        """Ask the server to pause (true) or resume (false) downloading."""
        cmds = TorrentPinger.pauseTorrents(self, yn)
        self._sendStatus(cmds, self.didPause)

    def queryProgress(self):
        """Request a progress report; the reply goes to _progressStatus()."""
        dfr = self.torrentServerRoot.callRemote('ReportProgressStatus')
        dfr.addCallbacks(self._progressStatus, self._didFail)
# Stand-alone client entry point: creates a remote pinger with no delegate.
# NOTE(review): nothing visible here calls png.start() or runs the reactor --
# confirm against the original recipe.
if __name__=="__main__":
png = TorrentPingerRemote()
These two modules will make integrating BitTorrent into your python app much easier. The server code is independent, and good to go. You'll want to replace the data lock object with something from your environment, and check that your download directory is set properly.
The client is designed to interface with a UI controller, which displays progress, and gives user control to pause or cancel the downloading. Individual downloads can also be cancelled, although this is as easy as deleting the .torrent file.
Note you can automatically launch the server process from the client. Either way, you need to decide if it'll run in a thread, or as a process.
This is some complicated code, so you may have to play with it to understand what it does. However, it's much better than writing it from scratch!
Good luck.
Borrowed Some Code. From this recipe:
This no longer works. As of BitTorrent 4.1 or so, BT natively supports Twisted; as such, the above recipe no longer works without some heavy modification. Currently the best idea is to subclass LaunchMany, or to just add your own layer of MultiTorrent.
History of BitTorrent vs. Twisted. As per the comments on metamoof.
The design of BitTorrent predates Twisted, and BitTorrent's rawserver looks eerily similar to Twisted's reactor. Of course, Twisted has grown well beyond what rawserver ever tried to achieve.
In fact many bugs in twisted were debugged by Greg Hazel who was responsible for integrating twisted into BitTorrent.