Welcome, guest | Sign In | My Account | Store | Cart
import md5
import time

class DigestAuth:
    def __init__(self, realm, users):
        """Incomplete Python implementation of Digest Authentication.

        For the full specification. http://www.faqs.org/rfcs/rfc2617.html

        realm = AuthName in httpd.conf
        users = a dict of users containing {username:password}
        """
        self.realm = realm
        self.users = users
        self._headers= []
        self.params = {}

    def H(self, data):
        return md5.md5(data).hexdigest()

    def KD(self, secret, data):
        return self.H(secret + ":" + data)

    def A1(self):
        # If the "algorithm" directive's value is "MD5" or is
        # unspecified, then A1 is:
        # A1 = unq(username-value) ":" unq(realm-value) ":" passwd
        username = self.params["username"]
        passwd = self.users.get(username, "")
        return "%s:%s:%s" % (username, self.realm, passwd)
        # This is A1 if qop is set
        # A1 = H( unq(username-value) ":" unq(realm-value) ":" passwd )
        #         ":" unq(nonce-value) ":" unq(cnonce-value)

    def A2(self):
        # If the "qop" directive's value is "auth" or is unspecified, then A2 is:
        # A2 = Method ":" digest-uri-value
        return self.method + ":" + self.uri
        # Not implemented
        # If the "qop" value is "auth-int", then A2 is:
        # A2 = Method ":" digest-uri-value ":" H(entity-body)

    def response(self):
        if self.params.has_key("qop"):
            # Check? and self.params["qop"].lower()=="auth":
            # If the "qop" value is "auth" or "auth-int":
            # request-digest  = <"> < KD ( H(A1),     unq(nonce-value)
            #                              ":" nc-value
            #                              ":" unq(cnonce-value)
            #                              ":" unq(qop-value)
            #                              ":" H(A2)
            #                      ) <">
            return self.KD(self.H(self.A1()), \
                           self.params["nonce"] 
                           + ":" + self.params["nc"]
                           + ":" + self.params["cnonce"]
                           + ":" + self.params["qop"]                    
                           + ":" + self.H(self.A2()))
        else:
            # If the "qop" directive is not present (this construction is
            # for compatibility with RFC 2069):
            # request-digest  =
            #         <"> < KD ( H(A1), unq(nonce-value) ":" H(A2) ) > <">
            return self.KD(self.H(self.A1()), \
                           self.params["nonce"] + ":" + self.H(self.A2()))
    
    def _parseHeader(self, authheader):
        try:
            n = len("Digest ")
            authheader = authheader[n:].strip()
            items = authheader.split(", ")
            keyvalues = [i.split("=", 1) for i in items]
            keyvalues = [(k.strip(), v.strip().replace('"', '')) for k, v in keyvalues]
            self.params = dict(keyvalues)
        except:
            self.params = []

    def _returnTuple(self, code):
        return (code, self._headers, self.params.get("username", ""))

    def _createNonce(self):
        return md5.md5("%d:%s" % (time.time(), self.realm)).hexdigest()

    def createAuthheaer(self):
        self._headers.append((
            "WWW-Authenticate", 
            'Digest realm="%s", nonce="%s", algorithm="MD5", qop="auth"'
            % (self.realm, self._createNonce())
            ))

    def authenticate(self, method, uri, authheader=""):
        """ Check the response for this method and uri with authheader

        returns a tuple with:
          - HTTP_CODE
          - a tuple with header info (key, value) or None
          - and the username which was authenticated or None
        """
        self.method = method
        self.uri = uri
        if authheader.strip() == "":
            self.createAuthheaer()
            return self._returnTuple(401)
        self._parseHeader(authheader)
        if not len(self.params):
            return self._returnTuple(400)
        # Check for required parameters
        required = ["username", "realm", "nonce", "uri", "response"]
        for k in required:
            if not self.params.has_key(k):
                return self._returnTuple(400)
        # If the user is unknown we can deny access right away
        if not self.users.has_key(self.params["username"]):
            self.createAuthheaer()
            return self._returnTuple(401)
        # If qop is sent then cnonce and cn MUST be present
        if self.params.has_key("qop"):
            if not self.params.has_key("cnonce") \
               and self.params.has_key("cn"):
                return self._returnTuple(400)
        # All else is OK, now check the response.
        if self.response() == self.params["response"]:
            return self._returnTuple(200)
        else:
            self.createAuthheaer()
            return self._returnTuple(401)

History

  • revision 4 (19 years ago)
  • previous revisions are not available