#!python.exe # encoding: utf-8 from __future__ import with_statement import os, ctypes, sys from ctypes.wintypes import DWORD, INT, LPWSTR, LONG, WORD, BYTE, WCHAR # Some utility wrappers for pointer stuff class c_void(ctypes.Structure): # c_void_p is a buggy return type, converting to int, so # POINTER(None) == c_void_p is actually written as # POINTER(c_void), so it can be treated as a real pointer. _fields_ = [('dummy', ctypes.c_int)] def __init__(self, value=None): if value is None: value = 0 super(c_void, self).__init__(value) def POINTER(obj): ptr = ctypes.POINTER(obj) # Convert None to a real NULL pointer to work around bugs # in how ctypes handles None on 64-bit platforms if not isinstance(ptr.from_param, classmethod): def from_param(cls, x): if x is None: return cls() return x ptr.from_param = classmethod(from_param) return ptr # Shadow built-in c_void_p LPVOID = c_void_p = POINTER(c_void) #Globals NULL = LPVOID() kernel32 = ctypes.WinDLL('kernel32') advapi32 = ctypes.WinDLL('advapi32') _obtained_privileges = [] # Aliases to functions/classes, and utility lambdas cast = ctypes.cast byref = ctypes.byref sizeof = ctypes.sizeof addrof = ctypes.addressof WinError = ctypes.WinError hasflag = lambda val, flag: (val & flag) == flag # Configuration constants isx64 = ctypes.sizeof(ctypes.c_void_p) == ctypes.sizeof(ctypes.c_ulonglong) # Constants derived from C TRUE = 1 FALSE = 0 INVALID_HANDLE_VALUE = -1 # Desired access for OpenProcessToken TOKEN_ADJUST_PRIVILEGES = 0x0020 # SE Privilege Names SE_RESTORE_NAME = 'SeRestorePrivilege' SE_BACKUP_NAME = 'SeBackupPrivilege' # SE Privilege Attributes SE_PRIVILEGE_ENABLED = 0x00000002L # Access FILE_ANY_ACCESS = 0 # CreateFile flags FILE_FLAG_BACKUP_SEMANTICS = 0x02000000 FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000 FILE_FLAG_REPARSE_BACKUP = FILE_FLAG_OPEN_REPARSE_POINT | FILE_FLAG_BACKUP_SEMANTICS # Generic access GENERIC_READ = 0x80000000L GENERIC_WRITE = 0x40000000L GENERIC_RW = GENERIC_READ | GENERIC_WRITE # File shared access FILE_SHARE_READ = 0x00000001 FILE_SHARE_WRITE = 0x00000002 FILE_SHARE_DELETE = 0x00000004 FILE_SHARE_READ_WRITE = FILE_SHARE_READ | FILE_SHARE_WRITE FILE_SHARE_ALL = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE # File stuff OPEN_EXISTING = 3 FILE_ATTRIBUTE_REPARSE_POINT = 0x00000400 FILE_DEVICE_FILE_SYSTEM = 0x00000009 # Utility lambdas for figuring out ctl codes CTL_CODE = lambda devtype, func, meth, acc: (devtype << 16) | (acc << 14) | (func << 2) | meth # Methods METHOD_BUFFERED = 0 # Reparse Point Tags IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003L IO_REPARSE_TAG_SYMBOLIC_LINK = 0xA000000CL # WinIoCtl Codes FSCTL_GET_REPARSE_POINT = CTL_CODE(FILE_DEVICE_FILE_SYSTEM, 42, METHOD_BUFFERED, FILE_ANY_ACCESS) FSCTL_DELETE_REPARSE_POINT = CTL_CODE(FILE_DEVICE_FILE_SYSTEM, 43, METHOD_BUFFERED, FILE_ANY_ACCESS) # Reparse Point buffer constants MAX_NAME_LENGTH = 1024 MAX_REPARSE_BUFFER = 16 * MAX_NAME_LENGTH REPARSE_POINT_HEADER_SIZE = sizeof(DWORD) + (2 * sizeof(WORD)) REPARSE_GUID_DATA_BUFFER_HEADER_SIZE = REPARSE_POINT_HEADER_SIZE + (sizeof(DWORD) * 4) # For our generic reparse buffer MAX_GENERIC_REPARSE_BUFFER = MAX_REPARSE_BUFFER - REPARSE_GUID_DATA_BUFFER_HEADER_SIZE MAX_SYMLINK_REPARSE_BUFFER = MAX_GENERIC_REPARSE_BUFFER - sizeof(DWORD) - (sizeof(WORD) * 4) # Type aliases UCHAR = ctypes.c_ubyte ULONG_PTR = ctypes.c_ssize_t LPDWORD = POINTER(DWORD) # CTypes-based wrapper classes class BOOL(INT): """ Wrapper around ctypes.wintypes.INT (ctypes.c_int) to make BOOL act a bit more like a boolean. """ @classmethod def from_param(cls, value): if not value or value is None: return INT(0) elif isinstance(value, ctypes._SimpleCData): return value def __eq__(self, other): value = bool(self.value) if isinstance(other, bool): return value and other elif isinstance(other, ctypes._SimpleCData): return value and bool(other.value) else: return value and bool(other) def __hash__(self): return hash(self._as_parameter_) class HANDLE(ULONG_PTR): """ Wrapper around the numerical representation of a pointer to add checks for INVALID_HANDLE_VALUE """ NULL = None INVALID = None def __init__(self, value=None): if value is None: value = 0 super(HANDLE, self).__init__(value) self.autoclose = False @classmethod def from_param(cls, value): if value is None: return HANDLE(0) elif isinstance(value, ctypes._SimpleCData): return value else: return HANDLE(value) def close(self): if bool(self): try: CloseHandle(self) except: pass def __enter__(self): self.autoclose = True return self def __exit__(self, exc_typ, exc_val, trace): self.close() return False def __del__(self): if hasattr(self, 'autoclose') and self.autoclose: CloseHandle(self) def __nonzero__(self): return super(HANDLE, self).__nonzero__() and \ self.value != HANDLE.INVALID.value class GUID(ctypes.Structure): """ Borrowed small parts of this from the comtypes module. """ NULL = None _fields_ = [ ('Data1', DWORD), ('Data2', WORD), ('Data3', WORD), ('Data4', (BYTE * 8)), ] def __nonzero__(self): return self != GUID.NULL def __eq__(self, other): return isinstance(other, GUID) and \ binary(self) == binary(other) def __hash__(self): return hash(binary(self)) def __cmp__(self, other): if isinstance(other, GUID): return cmp(binary(self), binary(other)) return -1 GUID.NULL = GUID() # Ctypes Structures class LUID(ctypes.Structure): _fields_ = [ ('LowPart', DWORD), ('HighPart', LONG), ] class LUID_AND_ATTRIBUTES(LUID): _fields_ = [ ('Attributes', DWORD), ] class ReparsePointBuffer(ctypes.Union): class SymbolicLinkBuffer(ctypes.Structure): _fields_ = [ ('SubstituteNameOffset', WORD), ('SubstituteNameLength', WORD), ('PrintNameOffset', WORD), ('PrintNameLength', WORD), ('Flags', DWORD), ('PathBuffer', WCHAR * MAX_SYMLINK_REPARSE_BUFFER) ] class MountPointBuffer(ctypes.Structure): _fields_ = [ ('SubstituteNameOffset', WORD), ('SubstituteNameLength', WORD), ('PrintNameOffset', WORD), ('PrintNameLength', WORD), ('PathBuffer', WCHAR * MAX_SYMLINK_REPARSE_BUFFER) ] class GenericReparseBuffer(ctypes.Structure): _fields_ = [ ('PathBuffer', UCHAR * MAX_GENERIC_REPARSE_BUFFER) ] _fields_ = [ ('SymbolicLink', SymbolicLinkBuffer), ('MountPoint', MountPointBuffer), ('Generic', GenericReparseBuffer) ] class ReparsePoint(ctypes.Structure): _fields_ = [ ('ReparseTag', DWORD), ('ReparseDataLength', WORD), ('Reserved', WORD), ('ReparseGuid', GUID), ('_Buffer', ReparsePointBuffer) ] @property def Buffer(self): if self.ReparseTag == IO_REPARSE_TAG_MOUNT_POINT: return self._Buffer.MountPoint elif self.ReparseTag == IO_REPARSE_TAG_SYMBOLIC_LINK: return self._Buffer.SymbolicLink else: return self._Buffer # Common uses of HANDLE HANDLE.NULL = HANDLE() HANDLE.INVALID = HANDLE(INVALID_HANDLE_VALUE) LPHANDLE = POINTER(HANDLE) # C Function Prototypes CreateFile = kernel32.CreateFileW CreateFile.restype = HANDLE CreateFile.argtypes = [ LPWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE ] GetFileAttributes = kernel32.GetFileAttributesW GetFileAttributes.restype = DWORD GetFileAttributes.argtypes = [ LPWSTR ] RemoveDirectory = kernel32.RemoveDirectoryW RemoveDirectory.restype = BOOL RemoveDirectory.argtypes = [ LPWSTR ] CloseHandle = kernel32.CloseHandle CloseHandle.restype = BOOL CloseHandle.argtypes = [ HANDLE ] GetCurrentProcess = kernel32.GetCurrentProcess GetCurrentProcess.restype = HANDLE GetCurrentProcess.argtypes = [] OpenProcessToken = advapi32.OpenProcessToken OpenProcessToken.restype = BOOL OpenProcessToken.argtypes = [ HANDLE, DWORD, LPHANDLE ] LookupPrivilegeValue = advapi32.LookupPrivilegeValueW LookupPrivilegeValue.restype = BOOL LookupPrivilegeValue.argtypes = [ LPWSTR, LPWSTR, POINTER(LUID_AND_ATTRIBUTES) ] AdjustTokenPrivileges = advapi32.AdjustTokenPrivileges AdjustTokenPrivileges.restype = BOOL AdjustTokenPrivileges.argtypes = [ HANDLE, BOOL, LPVOID, DWORD, LPVOID, LPDWORD ] _DeviceIoControl = kernel32.DeviceIoControl _DeviceIoControl.restype = BOOL _DeviceIoControl.argtypes = [ HANDLE, DWORD, LPVOID, DWORD, LPVOID, DWORD, LPDWORD, LPVOID ] def DeviceIoControl(hDevice, dwCtrlCode, lpIn, szIn, lpOut, szOut, lpOverlapped=None): """ Wrapper around the real DeviceIoControl to return a tuple containing a bool indicating success, and a number containing the size of the bytes returned. (Also, lpOverlapped to default to NULL) """ dwRet = DWORD(0) return bool( _DeviceIoControl(hDevice, dwCtrlCode, lpIn, szIn, lpOut, szOut, byref(dwRet), lpOverlapped) ), dwRet.value def obtain_privileges(privileges): """ Given a list of SE privilege names (eg: [ SE_CREATE_TOKEN_NAME, SE_BACKUP_NAME ]), lookup the privilege values for each and then attempt to acquire them for the current process. """ global _obtained_privileges privileges = filter(lambda priv: priv not in _obtained_privileges, list(set(privileges))) privcount = len(privileges) if privcount == 0: return class TOKEN_PRIVILEGES(ctypes.Structure): #noinspection PyTypeChecker _fields_ = [ ('PrivilegeCount', DWORD), ('Privileges', LUID_AND_ATTRIBUTES * privcount), ] with HANDLE() as hToken: tp = TOKEN_PRIVILEGES() tp.PrivilegeCount = privcount hProcess = GetCurrentProcess() if not OpenProcessToken(hProcess, TOKEN_ADJUST_PRIVILEGES, byref(hToken)): raise WinError() for i, privilege in enumerate(privileges): tp.Privileges[i].Attributes = SE_PRIVILEGE_ENABLED if not LookupPrivilegeValue(None, privilege, byref(tp.Privileges[i])): raise Exception('LookupPrivilegeValue failed for privilege: {0}'.format(privilege)) if not AdjustTokenPrivileges(hToken, False, byref(tp), sizeof(TOKEN_PRIVILEGES), None, None): raise WinError() _obtained_privileges.extend(privileges) def open_file(filepath, flags=FILE_FLAG_OPEN_REPARSE_POINT, autoclose=False): """ Open file for read & write, acquiring the SE_BACKUP & SE_RESTORE privileges. """ obtain_privileges([ SE_BACKUP_NAME, SE_RESTORE_NAME ]) if (flags & FILE_FLAG_BACKUP_SEMANTICS) != FILE_FLAG_BACKUP_SEMANTICS: flags |= FILE_FLAG_BACKUP_SEMANTICS hFile = CreateFile(filepath, GENERIC_RW, FILE_SHARE_ALL, NULL, OPEN_EXISTING, flags, HANDLE.NULL) if not hFile: raise WinError() if autoclose: hFile.autoclose = True return hFile def get_buffer(filepath, hFile=HANDLE.INVALID): """ Get a reparse point buffer. """ if not hFile: hFile = open_file(filepath, autoclose = True) obj = ReparsePoint() result, dwRet = DeviceIoControl(hFile, FSCTL_GET_REPARSE_POINT, None, 0L, byref(obj), MAX_REPARSE_BUFFER) return obj if result else None def delete_reparse_point(filepath): """ Remove the reparse point folder at filepath. """ dwRet = 0 with open_file(filepath) as hFile: # Try to delete it first without the reparse GUID info = ReparsePoint() info.ReparseTag = 0 result, dwRet = DeviceIoControl(hFile, FSCTL_DELETE_REPARSE_POINT, byref(info), REPARSE_GUID_DATA_BUFFER_HEADER_SIZE , None, 0L) if not result: # If the first try fails, we'll set the GUID and try again buffer = get_buffer(filepath, hFile) info.ReparseTag = buffer.ReparseTag info.ReparseGuid = info.ReparseGuid result, dwRet = DeviceIoControl(hFile, FSCTL_DELETE_REPARSE_POINT, byref(info), REPARSE_GUID_DATA_BUFFER_HEADER_SIZE, None, 0L) if not result: raise WinError() if not RemoveDirectory(filepath): raise WinError() return dwRet def is_reparse_point(filepath): """ Check whether or not filepath refers to a reparse point. """ return hasflag(GetFileAttributes(filepath), FILE_ATTRIBUTE_REPARSE_POINT) def rmtree(filepath, ignore_errors=False, onerror=None): """ Re-implementation of shutil.rmtree to checking for reparse points (junctions/symbolic links) before iterating folders. """ def rm(fn, childpath): try: fn(childpath) except: if not ignore_errors: if onerror is None: raise else: onerror(fn, childpath, sys.exc_info()[0]) def visit_files(root, targets): for target in targets: rm(os.unlink, os.path.join(root, target)) def visit_dirs(root, targets): for target in targets: childpath = os.path.join(root, target) if rmtree(childpath): rm(os.rmdir, childpath) if is_reparse_point(filepath): rm(delete_reparse_point, filepath) return False for root, dirs, files in os.walk(path): visit_files(root, files) visit_dirs(root, dirs) return True