'''
monte carlo tool for simple strategy, more generally for use with any
strategy that I want to back test...
Engine takes a class instance, derived from a base class wih two methods
initialise()
onsimulation()
aftersimulation()
ontrial()
finalise()
'''
import unittest, datetime
import numpy as np
from BackTest import MonteCarloModel, MonteCarloEngine, Simulation
import matplotlib.pyplot as plt
from random import randint
from scipy.optimize import fmin_powell
from hashlib import md5
from time import localtime
def Root2(r,verbose=True):
'''
simple wrapper routine for solver. returns the energy level for a given R=r.
the solver can use this method to minimise the energy by varying r as necessary.
'''
if r <= 0:
return 1e99
else:
e = MonteCarloEngine(moduleName='MonteCarloHeadsTailsExample', className='SimpleHeadTailModel')
losers = e.start(args={'wager_multiplier':r})
if verbose: print 'solving for r: ', r
return losers
class SimpleHeadTailModel(MonteCarloModel):
'''
model wager
previous bet success or not, impact to profit.
'''
def toss(self):
'''
1 - win
-1 - loss
Each toss determines whether the position is successful or not. This way no need to keep track of a decision
and an associated variable. Simply do I win or not.
'''
coin_toss = randint(1,2)
if coin_toss == 1:
return 1
else:
return -1
def initialise(self, context):
self.name = 'My Simple Heads And Tails Model'
# start with 10 USD bet
self.wager= 100
self.wager_initial= 100
self.starting_pot = 1000
self.previous_value = 1 # default to 1 on first round
self.simulations = 100 # MC simulation trials
self.trials = 100 # subintervals
self.r = np.zeros(shape=(self.simulations, self.trials), dtype=float) # matrix to hold all results
self.pnl = np.zeros(shape=(self.simulations, self.trials), dtype=float) # matrix to hold all results
print "simulations %d, trials %d starting pot %d " % (self.simulations, self.trials, self.starting_pot)
# Tell the engine where to associate the data to security.
context[self.name] = Simulation(self.simulations, self.trials, self.toss)
self.fig = plt.figure()
self.ax = self.fig.add_subplot(211)
self.ax1 = self.fig.add_subplot(212)
self.ax.autoscale_view(True,True,True)
def onsimulation(self, model, simulation, engine):
self.r[simulation,0] = 0
# assume starting pot here
self.pnl[simulation,0] = self.starting_pot
def aftersimulation(self, model, simulation, engine):
self.ax.plot(np.arange(0, self.trials, 1), self.r[simulation])
self.ax1.plot(np.arange(0, self.trials, 1), self.pnl[simulation])
def reset_wager(self):
self.wager = self.wager_initial
def ontrial(self, model, simulation, trial, value, engine, args):
'''
want to test some strategies for betting
set wager for each bet
if previous bet
value : float
sample from model
'''
# if we lost last time then double up
if self.previous_value == -1:
if args.has_key('args'):
self.wager = (self.wager*float(args['args']['wager_multiplier'][0]))
else:
self.wager = (self.wager*0.1)
# keep track of coin toss paths
self.r[simulation,trial] = self.r[simulation,trial-1] + value
if args.has_key('args'):
self.r0 = float(args['args']['wager_multiplier'][0])
else:
self.r0 =0.1
# if we won, add the wager
# else subtract the wager
if self.pnl[simulation,trial-1] > 0:
if value == 1 :
self.pnl[simulation,trial] = self.pnl[simulation,trial-1] + self.wager
else:
self.pnl[simulation,trial] = self.pnl[simulation,trial-1] - self.wager
else:
# no bet to be made here
self.pnl[simulation,trial] = self.pnl[simulation,trial-1]
# always reset wager
self.reset_wager()
# keep track of the previous value for next time around
self.previous_value = value
def add_prefix(self, filename):
from hashlib import md5
from time import localtime
return "%s_%s"%(md5(str(localtime())).hexdigest(), filename)
def finalise(self, model, engine):
'''
returns the value that we are trying to minimise, here the number of losers.
'''
# what is our survivability
number_of_losers = len([f for f in self.pnl if f[len(f)-1]<=0])
number_of_survivors = len([f for f in self.pnl if f[len(f)-1]>0])
number_of_participants = len(self.pnl)
print "participants [%d] survivors [%2.1f%%] losers [%2.1f%%] weight [%2.6f] "% (number_of_participants, float(number_of_survivors)/float(number_of_participants)*100, float(number_of_losers)/float(number_of_participants)*100, self.r0)
plt.title('Simulations %d Steps %d' % (int(self.simulations), int(self.trials)))
plt.xlabel('steps')
plt.ylabel('profit and loss')
plt.savefig("%s_%s"%(md5(str(localtime())).hexdigest(), 'model'))
return float(number_of_losers)/float(number_of_participants)*100
class TestNode(unittest.TestCase):
def setUp(self):
pass
def test_engine(self):
'''
example of how to launch the MontoCarloTestEngine
this is modelled on the quantopian style interface.
'''
e = MonteCarloEngine(moduleName='MonteCarloHeadsTailsExample', className='SimpleHeadTailModel')
e.start()
def test_minimise(self):
print '#################################'
print '# Test Equilibrium Loss Wager'
print '#################################'
wager_multiplier=fmin_powell(Root2, x0=1., maxiter=20)
print "highest survivability following loss, multiply wager by %2.4f %% "%(wager_multiplier*100)
if __name__ == '__main__':
unittest.main()
==================================================================
== BackTest module
==================================================================
'''
back testing tool for prediction strategy, more generally for use with any
strategy that I want to back test...
Engine takes a class instance, derived from a base class wih twomethods
initialise()
ondata()
'''
import unittest, time, datetime
from pandas import DataFrame
import numpy as np
from pylab import show
import random
from abc import ABCMeta, abstractmethod
import pandas as pd
#
# Simple python component wrapper
#
class Component(object):
__metaclass__ = ABCMeta
@abstractmethod
def start(self):
raise NotImplementedError("Should implement intialise()!")
class BackTestModel(object):
__metaclass__ = ABCMeta
@abstractmethod
def initialise(self, context):
raise NotImplementedError("Should implement intialise()!")
@abstractmethod
def ondata(self, sid, data):
raise NotImplementedError("Should implement ondata()!")
class MonteCarloModel(object):
__metaclass__ = ABCMeta
@abstractmethod
def initialise(self, context):
raise NotImplementedError("Should implement intialise()!")
@abstractmethod
def ontrial(self, model, simulation, trial, value, engine, args):
raise NotImplementedError("Should implement ontrial()!")
def onsimulation(self, model, simulation, engine):
raise NotImplementedError("Should implement onsimulation()!")
def aftersimulation(self, model, simulation, engine):
raise NotImplementedError("Should implement aftersimulation()!")
def finalise(self, model, engine):
raise NotImplementedError("Should implement finalise()!")
class Simulation(object):
'''
simple wrapper that describes a simulation
'''
def __init__(self, n, m, func):
'''
n integer
number of simulations
m integer
number of trials per simulation
func class method or function
used to sample the value
'''
self.number_of_simulations = n
self.number_of_trials = m
self.func = func
@property
def sample(self):
return self.func
class MonteCarloEngineException(Exception):
pass
class MonteCarloEngine(Component):
'''
twist on the Engine that will take a different type of context, this time
a Simulation class instance. This will be called
class ExampleModel(MonteCarloModel):
def initialise(self, context):
context['My Simple Model'] = Simulation(10, 100, standard_normal())
print 'setting My Simple Model'
def ondata(self, model, simulation, trial, value, engine):
print simulation, trial, value, engine
'''
def __init__(self, moduleName, className):
self.context = {}
self.obj = self.__generate__(moduleName, className)
#print isinstance(self.obj, MonteCarloModel), hasattr(self.obj, 'initialise')
if isinstance(self.obj, MonteCarloModel):
if hasattr(self.obj, 'initialise'):
self.obj.initialise(self.context)
print 'calling initialise'
else:
print 'no initialise'
#
# TODO: load data from somewhere for securities in context
#
print self.context
def __generate__(self, module_name, class_name):
module = __import__(module_name)
class_ = getattr(module, class_name)
instance = class_()
#print instance
return instance
def start(self, **args):
#print 'starting...', self.obj, self.context, args
if self.obj is None:
raise MonteCarloEngineException('No engine exists')
for name, model in self.context.items():
for simulation in np.arange(0, model.number_of_simulations): # number of MC simulations
# call to signal new simulation
if hasattr(self.obj, 'onsimulation'):
self.obj.onsimulation(model, simulation, self)
for trial in np.arange(1,model.number_of_trials): #trials per simulation
value = model.sample()
if hasattr(self.obj, 'ontrial'):
self.obj.ontrial(model, simulation, trial, value, self, args)
# call to signal after simulation
if hasattr(self.obj, 'aftersimulation'):
self.obj.aftersimulation(model, simulation, self)
#self.post_ondata(k, index2, value)
if hasattr(self.obj, 'finalise'):
return self.obj.finalise(model, self)
'''
class ExampleModel(BackTestModel):
def initialise(self, context):
context['ARM.L'] = 'Book1.csv'
print 'setting ARM.L'
def ondata(self, sid, data):
print sid, data
'''
class Engine(Component):
'''
responsible for handling instances of the back test models
'''
def __init__(self, moduleName, className):
self.context = {}
self.data = {}
self.orders = {}
self.positions = {}
self.pnl = {}
self.risk = {}
self.obj = self.__generate__(moduleName, className)
#print isinstance(self.obj, BackTestModel), hasattr(self.obj, 'initialise')
if isinstance(self.obj, BackTestModel):
if hasattr(self.obj, 'initialise'):
self.obj.initialise(self.context)
#print 'calling initialise'
else:
print 'no initialise'
#
# TODO: load data from somewhere for securities in context
#
#print self.context
for k in self.context.keys():
#print k
t = time.clock()
myfilename = self.context[k]
data = DataFrame.from_csv(myfilename,header=0,index_col=0,parse_dates=True)
print 'load data', time.clock()-t
self.data[k] = data
self.positions[k] = 0
self.pnl[k] = 0
self.risk[k] = 0
def order(self, sid, value):
# queue order to be processed
self.orders[sid] = (value, False)
@property
def position(self):
return self.positions
def __generate__(self, module_name, class_name):
module = __import__(module_name)
class_ = getattr(module, class_name)
instance = class_()
print instance
return instance
def post_ondata(self, sid, index, value):
# process orders
for k in self.context.keys():
if hasattr(self, 'orders'):
if len(self.orders) == 0:
print 'no orders'
break
m_size, m_processed = self.orders[k]
# check we have not processed this order already.
if not m_processed:
if not (self.positions[k]+ m_size <= 0):
self.positions[k] += m_size
print 'ordering %d of %s, total %d' % (m_size, k, self.positions[k])
self.orders[k] = None
else:
print 'no short selling'
# handle pnl and risk
# tick() charts
def start(self):
print 'starting...', self.obj, self.context
if not self.obj is None:
for k in self.context:
for i, (index, value) in enumerate(self.data[k]['value'].iteritems()):
index2 = datetime.datetime(pd.to_datetime(index).year
, pd.to_datetime(index).month
, pd.to_datetime(index).day)
self.obj.ondata(k
, index2
, value
, self.data[k]['value'][0:i]
, self)
self.post_ondata(k, index2, value)