Welcome, guest | Sign In | My Account | Store | Cart
__author__ = "Andrey Nikishaev"
__email__ = "creotiv@gmail.com"
 
import pymongo
from gevent.queue import PriorityQueue
import os
import time

class MongoPoolException(Exception):
    pass

class MongoPoolCantConnect(MongoPoolException):
    pass
    
class MongoPoolAutoReconnect(MongoPoolException):
    pass

class GPool(object):
    """
        Rewrited non-thread local implementation of pymongo.connection._Pool
    """

    __slots__ = ["sockets", "socket_factory", "pool_size","sock"]

    def __init__(self, socket_factory):
        object.__init__(self)
        self.pool_size = 1
        if not hasattr(self,"sock"):
            self.sock = None
        self.socket_factory = socket_factory
        if not hasattr(self, "sockets"):
            self.sockets = []

    def socket(self):
        # we store the pid here to avoid issues with fork /
        # multiprocessing - see
        # test.test_connection:TestConnection.test_fork for an example
        # of what could go wrong otherwise
        pid = os.getpid()
        if self.sock is not None and self.sock[0] == pid:
            return self.sock[1]

        try:
            self.sock = (pid, self.sockets.pop())
        except IndexError:
            self.sock = (pid, self.socket_factory())

        return self.sock[1]

    def return_socket(self):
        
        if self.sock is not None and self.sock[0] == os.getpid():
            # There's a race condition here, but we deliberately
            # ignore it.  It means that if the pool_size is 10 we
            # might actually keep slightly more than that.
            if len(self.sockets) < self.pool_size:
                self.sockets.append(self.sock[1])
            else:
                self.sock[1].close()
        self.sock = None

pymongo.connection._Pool = GPool

class MongoConnection(object):
    """Memcache pool auto-destruct connection"""
    def __init__(self,pool,conn):
        self.pool = pool
        self.conn = conn
        
    def getDB(self):
        return self.conn

    def __getattr__(self, name):
        return getattr(self.conn, name)
    
    def __getitem__(self, name):
        return self.conn[name]
                                             
    def __del__(self):
        self.pool.queue.put((time.time(),self.conn))
        del self.pool
        del self.conn


class Mongo(object):    
    """MongoDB Pool"""
    def __new__(cls,size=5,dbname='',*args,**kwargs):
        if not hasattr(cls,'_instance'):
            cls._instance = object.__new__(cls)
            cls._instance.dbname = dbname
            cls._instance.queue = PriorityQueue(size)
            for x in xrange(size):
                try:
                    cls._instance.queue.put(
                        (time.time(),pymongo.Connection(*args,**kwargs)[dbname])
                    )
                except Exception,e:
                    raise MongoPoolCantConnect('Can\'t connect to mongo servers: %s' % e)
                    
        return cls._instance     
        
    def get_conn(self,block=True,timeout=None):
        """Get Mongo connection wrapped in MongoConnection"""
        obj = MongoConnection
        return obj(self,self.queue.get(block,timeout)[1]) 
        
def autoreconnect(func,*args,**kwargs):
    while True
        try:
            result = func(*args,**kwargs)
        except pymongo.errors.AutoReconnect:
            raise MongoPoolAutoReconnect('Can\'t connect to DB, it may gone.')      
        else: 
            return result
            break

        
    
        

Diff to Previous Revision

--- revision 1 2010-12-08 07:53:35
+++ revision 2 2011-09-02 05:56:58
@@ -2,86 +2,118 @@
 __email__ = "creotiv@gmail.com"
  
 import pymongo
-from gevent.queue import Queue
- 
-class GeventMongoPool(object):
+from gevent.queue import PriorityQueue
+import os
+import time
+
+class MongoPoolException(Exception):
+    pass
+
+class MongoPoolCantConnect(MongoPoolException):
+    pass
+    
+class MongoPoolAutoReconnect(MongoPoolException):
+    pass
+
+class GPool(object):
     """
-    Rewrited connection pool for working with global connections.
+        Rewrited non-thread local implementation of pymongo.connection._Pool
     """
- 
-    # Non thread-locals
-    __slots__ = ["sockets", "socket_factory"]
-    sock = None
- 
+
+    __slots__ = ["sockets", "socket_factory", "pool_size","sock"]
+
     def __init__(self, socket_factory):
+        object.__init__(self)
+        self.pool_size = 1
+        if not hasattr(self,"sock"):
+            self.sock = None
         self.socket_factory = socket_factory
         if not hasattr(self, "sockets"):
             self.sockets = []
- 
+
     def socket(self):
         # we store the pid here to avoid issues with fork /
         # multiprocessing - see
         # test.test_connection:TestConnection.test_fork for an example
         # of what could go wrong otherwise
         pid = os.getpid()
- 
         if self.sock is not None and self.sock[0] == pid:
             return self.sock[1]
- 
+
         try:
             self.sock = (pid, self.sockets.pop())
         except IndexError:
             self.sock = (pid, self.socket_factory())
- 
+
         return self.sock[1]
- 
+
     def return_socket(self):
+        
         if self.sock is not None and self.sock[0] == os.getpid():
-            self.sockets.append(self.sock[1])
+            # There's a race condition here, but we deliberately
+            # ignore it.  It means that if the pool_size is 10 we
+            # might actually keep slightly more than that.
+            if len(self.sockets) < self.pool_size:
+                self.sockets.append(self.sock[1])
+            else:
+                self.sock[1].close()
         self.sock = None
- 
-pymongo.connection.Pool = GeventMongoPool
- 
+
+pymongo.connection._Pool = GPool
+
 class MongoConnection(object):
     """Memcache pool auto-destruct connection"""
     def __init__(self,pool,conn):
         self.pool = pool
         self.conn = conn
- 
+        
     def getDB(self):
         return self.conn
- 
+
     def __getattr__(self, name):
         return getattr(self.conn, name)
- 
+    
     def __getitem__(self, name):
         return self.conn[name]
- 
+                                             
     def __del__(self):
-        self.pool.queue.put(self.conn)
+        self.pool.queue.put((time.time(),self.conn))
         del self.pool
         del self.conn
- 
+
+
 class Mongo(object):    
     """MongoDB Pool"""
-    def __new__(cls,db_name,size=5,*args,**kwargs):
+    def __new__(cls,size=5,dbname='',*args,**kwargs):
         if not hasattr(cls,'_instance'):
-	    # use your own config library
             cls._instance = object.__new__(cls)
-            cls._instance.queue = Queue(size)
+            cls._instance.dbname = dbname
+            cls._instance.queue = PriorityQueue(size)
             for x in xrange(size):
                 try:
-		    # use your own config library
                     cls._instance.queue.put(
-                        pymongo.Connection(*args,**kwargs)[db_name]
+                        (time.time(),pymongo.Connection(*args,**kwargs)[dbname])
                     )
-                except:
-                    sys.exc_clear()
-                    error('Can\'t connect to mongo servers')
- 
+                except Exception,e:
+                    raise MongoPoolCantConnect('Can\'t connect to mongo servers: %s' % e)
+                    
         return cls._instance     
- 
+        
     def get_conn(self,block=True,timeout=None):
-        """Get Mongo connection wrapped in MongoConnection object"""
+        """Get Mongo connection wrapped in MongoConnection"""
         obj = MongoConnection
-        return obj(self,self.queue.get(block,timeout))
+        return obj(self,self.queue.get(block,timeout)[1]) 
+        
+def autoreconnect(func,*args,**kwargs):
+    while True
+        try:
+            result = func(*args,**kwargs)
+        except pymongo.errors.AutoReconnect:
+            raise MongoPoolAutoReconnect('Can\'t connect to DB, it may gone.')      
+        else: 
+            return result
+            break
+
+        
+    
+        

History