Welcome, guest | Sign In | My Account | Store | Cart

A quick script for sampling the behaviour of fcntl.flock(fcntl.LOCK_EX | fcntl.LOCK_NB) calls: with one file object (and descriptor) or with separate file objects (and different descriptors) pointing to the same filesystem path -- with or without threading or forking.

It is more an illustrative, educational piece of code than a utility script -- unless somebody has too few slots in their memory to remember that a flock lock is tied to the file descriptor (in which case a quick run of the script can save some web searching) :)

Python, 138 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/python
# Unix/Linux only (as the 'fcntl' module is). Python >=2.6/3.x compatible.
# Copyright (c) 2010 Jan Kaliszewski (zuo). Licensed under the MIT License.

"""
flocktests.py: fcntl.flock(LOCK_EX|LOCK_NB) behaviour sampling -- with one
file object or separate file objects (pointing to the same filesystem path),
with/without threading or forking.
"""

from __future__ import print_function

import os
import sys
import threading

from fcntl import flock, LOCK_EX, LOCK_UN, LOCK_NB
from os.path import basename


class lockpath(object):

    """Open a file, flock it, generate appropriate info, unflock if necessary.

    Plain ``lockpath(path)`` opens *path* in append mode, makes one
    non-blocking exclusive flock attempt (reported through ``_lockonly``)
    and closes the file again straight away.

    ``lockpath.keeping(path)`` does the same but, when the lock was
    obtained, keeps the file open so that the instance can be used as a
    context manager; the file is closed on leaving the with-block.

    Attributes:
      file   -- the file object opened for *path*
      locked -- True only while the file is kept open with the lock held
    """

    def __init__(self, path, _keep=False):
        "By default, unflock/close the file immediately after setting the lock"
        self.file = handle = open(path, 'a')
        try:
            acquired = _lockonly(handle)
        except Exception:
            # Do not leave the descriptor open if the attempt blew up.
            handle.close()
            raise
        # The file stays open only if the caller asked for that *and* the
        # lock was really obtained; otherwise there is nothing worth keeping.
        self.locked = bool(_keep and acquired)
        if not self.locked:
            handle.close()

    @classmethod
    def keeping(cls, path):
        "Constructor for with-blocks: tries to keep the file open and flocked"
        return cls(path, _keep=True)

    def __enter__(self):
        "Enter a with-block binding the file to the as-clause target"
        if self.locked:
            return self.file
        # __exit__ is not called when __enter__ raises, so make sure the
        # file is released here (closing an already closed file is a no-op).
        self.file.close()
        raise RuntimeError('lockpath().file is not locked '
                           '-- cannot enter the with-block')

    def __exit__(self, *args, **kwargs):
        "Leave the with-block closing the file (=> unflocking it)"
        self.file.close()
        # The file is closed now; a later attempt to re-enter must fail
        # instead of handing out a closed file.
        self.locked = False


def _help(*args):
    # Write the given lines to stderr as one message: lines are separated
    # by a newline plus a two-space indent, and a final newline is added.
    sys.stderr.write('\n  '.join(args) + '\n')

def _msg(*args):
    # Every message is prefixed with the id of the emitting process and the
    # name of the emitting thread, so the output shows who did what.
    origin = ('pid:{0}'.format(os.getpid()),
              threading.current_thread().name)
    print(*(origin + args))

def _lockonly(file):
    # Make one non-blocking attempt to take an exclusive flock on `file`,
    # report the descriptor number and the outcome through _msg(), and
    # return True when the lock was obtained, False when flock() raised
    # IOError.
    _msg('got file #', file.fileno())
    acquired = True
    try:
        flock(file, LOCK_EX | LOCK_NB)
    except IOError:
        acquired = False
    _msg('locked successfully' if acquired else 'failed to lock')
    return acquired

def lockfile(file):
    "Try to flock the given file; on success release the lock right away"
    if not _lockonly(file):
        # Nothing was locked, so there is nothing to release.
        return
    flock(file, LOCK_UN)
# Options

def n(path):
    "one file object + no concurrency"
    # NOTE: the docstring above is printed in the usage text -- keep it short.
    first_lock = lockpath.keeping(path)
    with first_lock as same_file:
        # The second flock attempt goes through the very same file object.
        lockfile(same_file)

def N(path):
    "separate file objects + no concurrency"
    # NOTE: the docstring above is printed in the usage text -- keep it short.
    first_lock = lockpath.keeping(path)
    with first_lock:
        # The second attempt opens the same path again, i.e. it uses a new
        # file object; that one is closed right after the attempt.
        lockpath(path)

def t(path):
    "one file object + threading"
    # NOTE: the docstring above is printed in the usage text -- keep it short.
    first_lock = lockpath.keeping(path)
    with first_lock as shared_file:
        # The second flock attempt is made from another thread, on the same
        # file object; wait for it before leaving the with-block.
        worker = threading.Thread(target=lockfile, args=(shared_file,))
        worker.start()
        worker.join()

def T(path):
    "separate file objects + threading"
    # NOTE: the docstring above is printed in the usage text -- keep it short.
    first_lock = lockpath.keeping(path)
    with first_lock:
        # The second attempt runs in another thread and opens the path
        # anew; wait for it before leaving the with-block.
        worker = threading.Thread(target=lockpath, args=(path,))
        worker.start()
        worker.join()

def f(path):
    "one file object + forking"
    # NOTE: the docstring above is printed in the usage text -- keep it short.
    with lockpath.keeping(path) as file:
        # Flush before forking: anything still sitting in the stdio buffers
        # would be copied into the child and emitted a second time when the
        # child flushes (visible when the output is redirected).
        sys.stdout.flush()
        sys.stderr.flush()
        if os.fork():
            # Parent: wait until the child has finished its attempt.
            os.wait()
        else:
            # Child: make the second flock attempt on the inherited file
            # object, then terminate right here via os._exit() so that the
            # child never continues into the rest of the parent's code.
            status = 1
            try:
                lockfile(file)
                status = 0
            except Exception:
                import traceback
                traceback.print_exc()
            finally:
                # os._exit() does not flush stdio buffers by itself.
                sys.stdout.flush()
                sys.stderr.flush()
                os._exit(status)

def F(path):
    "separate file objects + forking"
    # NOTE: the docstring above is printed in the usage text -- keep it short.
    with lockpath.keeping(path):
        # Flush before forking: anything still sitting in the stdio buffers
        # would be copied into the child and emitted a second time when the
        # child flushes (visible when the output is redirected).
        sys.stdout.flush()
        sys.stderr.flush()
        if os.fork():
            # Parent: wait until the child has finished its attempt.
            os.wait()
        else:
            # Child: open the path anew and make the second flock attempt,
            # then terminate right here via os._exit() so that the child
            # never continues into the rest of the parent's code.
            status = 1
            try:
                lockpath(path)
                status = 0
            except Exception:
                import traceback
                traceback.print_exc()
            finally:
                # os._exit() does not flush stdio buffers by itself.
                sys.stdout.flush()
                sys.stderr.flush()
                os._exit(status)


# One-letter names of the available tests.  main() looks the chosen letter
# up in globals() to find the function to run, and walks over this string
# to build the option list of the usage text.
OPTIONS = 'nNtTfF'

def main(program, option='', path='test.flock'):
    """Do one of the tests or print a short help.

    program -- the script name or path (as in sys.argv[0]); only its
               basename is shown in the usage line
    option  -- one of the OPTIONS letters, optionally preceded by dashes
    path    -- the file to be opened and flocked (default: 'test.flock')

    If `option` is not exactly one of the OPTIONS letters, the module
    docstring and a usage summary are written out through _help() and no
    test is run.
    """
    flocktests = globals()
    option = option.lstrip('-')
    # OPTIONS is a string, so a bare `in` is a substring test: multi-letter
    # input such as 'nN' or 'tT' would pass it and then fail with a KeyError
    # on the lookup below.  Accept exactly one letter only.
    if len(option) == 1 and option in OPTIONS:
        run_test = flocktests[option]
        run_test(path)
    else:
        # __doc__ is None when docstrings are stripped (python -OO).
        _help((__doc__ or '').lstrip())
        _help('Usage: {0} OPTION [PATH]'.format(basename(program)),
              'Default PATH: test.flock', 'OPTIONS:',
              *('-{0}  {1}'.format(letter, flocktests[letter].__doc__)
                for letter in OPTIONS))


if __name__ == '__main__':
    # main() takes at most three positional arguments (program name, OPTION
    # and PATH).  Surplus command-line arguments would make the call fail
    # with a TypeError traceback, so in that case call main() without an
    # option, which makes it print the help/usage text instead.
    if len(sys.argv) > 3:
        main(sys.argv[0])
    else:
        main(*sys.argv)