Welcome, guest | Sign In | My Account | Store | Cart

This program makes your CPU run at a given usage level. It should also work on multi-processor machines. The program has been tested on Windows XP SP2 with Python 2.5.4.

The implementation works by adjusting the ratio of busy time to idle time in the main process until the measured CPU usage approaches the target rate.

Python, 158 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import sys
import threading
import ctypes
from ctypes import byref
from ctypes import Structure, Union
from ctypes.wintypes import * 

# Win32 type aliases that ctypes.wintypes does not provide.
LPVOID = ctypes.c_void_p
LPCVOID = LPVOID
# DWORD_PTR is pointer-sized (it is ULONG_PTR), not a 32-bit DWORD.  Aliasing
# it to DWORD shifts every SYSTEM_INFO field that follows
# dwActiveProcessorMask on 64-bit Windows, so dwNumberOfProcessors would be
# read from the wrong offset.  c_size_t has the width of a pointer on both
# 32-bit and 64-bit builds.
DWORD_PTR = ctypes.c_size_t
LONGLONG = ctypes.c_longlong
# PDH query and counter handles are represented as plain HANDLE values.
HCOUNTER = HQUERY = HANDLE

# error code
# Status value the PDH calls in this file are compared against for success.
Error_Success = 0

# macro
# Module-level shortcuts, bound once at import time: kernel32's Sleep
# function and the handle to the pdh DLL.
sleep = ctypes.windll.kernel32.Sleep
pdh = ctypes.windll.pdh

# structure definition
class Sysinfo_Struct(Structure):
    """Pair of WORD fields used as the 'struct' member of Sysinfo_Union."""

    _fields_ = [
        ('wProcessorArchitecture', WORD),
        ('wReserved', WORD),
    ]

class Sysinfo_Union(Union):
    """Overlay of a single DWORD (dwOemId) with a Sysinfo_Struct."""

    _fields_ = [
        ('dwOemId', DWORD),
        ('struct', Sysinfo_Struct),
    ]

class System_Info(Structure):
    """Buffer layout handed to kernel32.GetSystemInfo.

    Only dwNumberOfProcessors is read by this script; the remaining fields
    are declared so that it sits at the right offset.
    """

    _fields_ = [
        ('union', Sysinfo_Union),
        ('dwPageSize', DWORD),
        ('lpMinimumApplicationAddress', LPVOID),
        ('lpMaximumApplicationAddress', LPVOID),
        ('dwActiveProcessorMask', DWORD_PTR),
        ('dwNumberOfProcessors', DWORD),
        ('dwProcessorType', DWORD),
        ('dwAllocationGranularity', DWORD),
        ('wProcessorLevel', WORD),
        ('wProcessorRevision', WORD),
    ]

class PDH_Counter_Union(Union):
    """Value slot of PDH_FMT_COUNTERVALUE, one member per possible format.

    This script only ever reads longValue.
    """

    _fields_ = [
        ('longValue', LONG),
        ('doubleValue', ctypes.c_double),
        ('largeValue', LONGLONG),
        ('AnsiStringValue', LPCSTR),
        ('WideStringValue', LPCWSTR),
    ]

class PDH_FMT_COUNTERVALUE(Structure):
    """Output buffer for PdhGetFormattedCounterValue: a status DWORD
    followed by the formatted value union."""

    _fields_ = [
        ('CStatus', DWORD),
        ('union', PDH_Counter_Union),
    ]


# Exception definition
class TargetRateExceedError(Exception):
    """Raised by inputLoop when the entered rate is outside the accepted range."""


def getProcessorNumber():
    """Return the dwNumberOfProcessors value reported by GetSystemInfo."""
    info = System_Info()
    kernel32 = ctypes.windll.kernel32
    # GetSystemInfo fills the structure in place; it is passed by reference.
    kernel32.GetSystemInfo(byref(info))
    return info.dwNumberOfProcessors

# Most recent CPU usage reading.  Written by QueryCPUUsageThread.run() and
# read by main(); it stays 0 until the first reading has been stored.
g_cpu_usage = 0
class QueryCPUUsageThread(threading.Thread):
    """Background sampler that keeps g_cpu_usage up to date.

    The thread owns one PDH query with a single counter,
    '\\Processor(_Total)\\% Processor Time', and stores each new reading in
    the module-level g_cpu_usage.
    """

    def __init__(self):
        """Open the PDH query and attach the processor-time counter.

        Raises:
            RuntimeError: if the query cannot be opened or the counter
                cannot be added.
        """
        super(QueryCPUUsageThread, self).__init__()
        # run() never returns.  As a daemon the thread cannot keep the
        # interpreter alive after the main thread has ended (for example
        # after Ctrl-C).  setDaemon() is used because the 'daemon' attribute
        # does not exist on Python 2.5.
        self.setDaemon(True)
        self.hQuery = HQUERY()
        self.hCounter = HCOUNTER()
        if pdh.PdhOpenQueryW(None, 0, byref(self.hQuery)) != Error_Success:
            raise RuntimeError('[QueryCPUUsageThread] Open query failed')
        status = pdh.PdhAddCounterW(self.hQuery,
                                    u'''\\Processor(_Total)\\% Processor Time''',
                                    0,
                                    byref(self.hCounter))
        if status != Error_Success:
            # Do not leak the query handle opened just above.
            pdh.PdhCloseQuery(self.hQuery)
            raise RuntimeError('[QueryCPUUsageThread] Add counter failed')

    def run(self):
        """Sample forever, publishing every reading in g_cpu_usage."""
        global g_cpu_usage
        while True:
            g_cpu_usage = self.getCPUUsage()

    def getCPUUsage(self):
        """Take one CPU usage reading; blocks for about one second.

        The query is collected twice, 1000 ms apart, and the counter is then
        formatted as a LONG.

        Returns:
            The counter value formatted as an integer.

        Raises:
            RuntimeError: if collecting the query data or formatting the
                counter value fails.
        """
        if pdh.PdhCollectQueryData(self.hQuery) != Error_Success:
            raise RuntimeError('[QueryCPUUsageThread] CollectQueryData failed')
        # Same kernel32 Sleep as before, through the module-level shortcut.
        sleep(1000)
        if pdh.PdhCollectQueryData(self.hQuery) != Error_Success:
            raise RuntimeError('[QueryCPUUsageThread] CollectQueryData failed')

        dwType = DWORD(0)
        value = PDH_FMT_COUNTERVALUE()
        status = pdh.PdhGetFormattedCounterValue(self.hCounter,
                                                 0x00000100,  # PDH_FMT_LONG
                                                 byref(dwType),
                                                 byref(value))
        if status != Error_Success:
            raise RuntimeError('[QueryCPUUsageThread] Get CounterValue failed')

        return value.union.longValue
                             
def main(targetRateForMainProcess, targetRate):
    """Alternate busy-waiting and sleeping so CPU usage tracks targetRate.

    Every cycle lasts 50 ms, split into a busy part and an idle part.  When
    the script was started without command-line arguments, the busy part is
    nudged by 0.1 ms per cycle (kept between 1 and 49 ms) in whichever
    direction moves g_cpu_usage towards targetRate, unless the reading is
    already within the tolerance of 2.  This function never returns.

    Args:
        targetRateForMainProcess: percentage of one cycle that is initially
            spent busy.
        targetRate: CPU usage value that g_cpu_usage is steered towards.
    """
    timeInterval = 50.0
    TOLERANCE = 2
    STEP = 0.1
    MIN_BUSY = 1
    MAX_BUSY = 49
    getTickCount = ctypes.windll.kernel32.GetTickCount

    busyTime = float(targetRateForMainProcess) * timeInterval / 100.0
    idleTime = timeInterval - busyTime
    while True:
        if len(sys.argv) <= 1:
            usage = g_cpu_usage
            if abs(usage - targetRate) >= TOLERANCE:
                if usage > targetRate:
                    busyTime = max(busyTime - STEP, MIN_BUSY)
                else:
                    busyTime = min(busyTime + STEP, MAX_BUSY)
                idleTime = timeInterval - busyTime
            #print busyTime, idleTime
        busyTicks = int(round(busyTime))
        startTime = getTickCount()
        # GetTickCount is a 32-bit millisecond counter that ctypes hands back
        # as a signed int.  When it changes sign or wraps around, the raw
        # difference becomes a huge negative number and the loop would spin
        # for weeks.  Reducing the difference modulo 2**32 keeps the elapsed
        # time correct across that boundary.
        while ((getTickCount() - startTime) & 0xFFFFFFFF) < busyTicks:
            pass
        sleep(int(round(idleTime)))

def subpMain():
    """Spin forever without sleeping; entry point of the child processes."""
    while True:
        continue

def inputLoop():
    while True:
        print 'please input target cpu usage: ',
        try:
            rate = float(raw_input().strip())
            if rate >= 100 or rate <=1:
                raise TargetRateExceedError
        except TargetRateExceedError:
            print 'please input a number in range 1-99'
        except Exception:
            print 'please input a target cpu usage rate, e.g. 70'
        else:
            return rate    

if __name__=='__main__':
    if len(sys.argv) <= 1:  # main process
        from subprocess import Popen

        targetRate = inputLoop()
        nProcessor = getProcessorNumber()
        fullPercentPerProcessor = 100.0 / float(nProcessor)
        targetRateForMainProcess = targetRate

        # Set up the PDH query before launching any child.  If this raises,
        # the script stops here instead of dying and leaving already
        # started children spinning at full load with no parent.
        queryCpuThread = QueryCPUUsageThread()

        # Each child keeps itself fully busy, so one is started for every
        # whole per-processor share contained in the target; the main
        # process covers the remainder.
        while targetRateForMainProcess > fullPercentPerProcessor:
            Popen([sys.executable, __file__, 'child'])
            targetRateForMainProcess -= fullPercentPerProcessor

        queryCpuThread.start()
        main(targetRateForMainProcess, targetRate)
    else: # subprocess
        subpMain()

I suspected that Python's global interpreter lock might affect how well the load on a multi-core CPU can be controlled. Therefore, I did not put queryCPUThread in the subprocesses; each subprocess has only one thread, which keeps itself as busy as possible.