# MongoDB connection pool built on pymongo and a gevent PriorityQueue.
# Defines a replacement socket pool (GPool), a pooled-connection wrapper
# (MongoConnection), the pool itself (Mongo) and the pool's exceptions.
__author__ = "Andrey Nikishaev"
__email__ = "creotiv@gmail.com"
import pymongo
from gevent.queue import PriorityQueue
import os
import time
class MongoPoolException(Exception):
    """Base class for every error raised by this Mongo pool module."""
class MongoPoolCantConnect(MongoPoolException):
    """Raised when the pool fails to open a connection to the servers."""
class MongoPoolAutoReconnect(MongoPoolException):
    """Raised in place of ``pymongo.errors.AutoReconnect`` by this module."""
class GPool(object):
    """
    Rewritten non-thread-local implementation of pymongo.connection._Pool.

    The pool keeps one "checked out" socket in ``self.sock`` (stored as a
    ``(pid, socket)`` tuple) plus a list of idle sockets in ``self.sockets``.
    Nothing here is thread-local: every caller of the same pool instance
    shares the same checked-out socket.
    """

    __slots__ = ["sockets", "socket_factory", "pool_size", "sock"]

    def __init__(self, socket_factory, pool_size=1):
        """Create a pool.

        :param socket_factory: zero-argument callable returning a new socket.
        :param pool_size: maximum number of idle sockets kept for reuse;
            defaults to 1, the value that used to be hard-coded.
        """
        object.__init__(self)
        self.pool_size = pool_size
        # The hasattr guards keep a value that is already present on the
        # instance (e.g. provided by a subclass) instead of overwriting it.
        if not hasattr(self, "sock"):
            self.sock = None
        self.socket_factory = socket_factory
        if not hasattr(self, "sockets"):
            self.sockets = []

    def socket(self):
        """Return the checked-out socket, acquiring one if necessary.

        An idle socket is reused when available; otherwise
        ``socket_factory`` is called to create a new one.
        """
        # we store the pid here to avoid issues with fork /
        # multiprocessing - see
        # test.test_connection:TestConnection.test_fork for an example
        # of what could go wrong otherwise
        pid = os.getpid()

        if self.sock is not None and self.sock[0] == pid:
            return self.sock[1]

        try:
            self.sock = (pid, self.sockets.pop())
        except IndexError:
            # No idle socket left: open a fresh one.
            self.sock = (pid, self.socket_factory())

        return self.sock[1]

    def return_socket(self):
        """Give the checked-out socket back to the pool.

        The socket is kept for reuse while fewer than ``pool_size`` idle
        sockets are stored, and closed otherwise. A socket checked out by a
        different process id is simply forgotten.
        """
        if self.sock is not None and self.sock[0] == os.getpid():
            # There's a race condition here, but we deliberately
            # ignore it. It means that if the pool_size is 10 we
            # might actually keep slightly more than that.
            if len(self.sockets) < self.pool_size:
                self.sockets.append(self.sock[1])
            else:
                self.sock[1].close()
        # Cleared unconditionally so the next socket() call acquires anew.
        self.sock = None
# Monkey-patch: rebind pymongo's private ``_Pool`` name to GPool at import
# time. NOTE(review): this only affects code that looks the name up on
# ``pymongo.connection`` after this line runs, and relies on a private
# pymongo attribute -- confirm against the installed pymongo version.
pymongo.connection._Pool = GPool
class MongoConnection(object):
    """Mongo pool auto-destruct connection.

    Thin proxy around a pooled connection object: attribute and item access
    are forwarded to the wrapped object, and when the wrapper is finalized
    the wrapped object is put back on ``pool.queue``.
    """

    def __init__(self, pool, conn):
        """Wrap ``conn``, remembering the ``pool`` it must be returned to.

        :param pool: object exposing a ``queue`` with a ``put`` method.
        :param conn: the pooled object being handed out.
        """
        self.pool = pool
        self.conn = conn

    def getDB(self):
        """Return the wrapped object itself."""
        return self.conn

    def __getattr__(self, name):
        """Forward unknown attribute lookups to the wrapped object.

        :raises AttributeError: if the wrapper no longer holds (or never
            held) a wrapped object.
        """
        # __getattr__ only runs when normal lookup failed. Reading
        # ``self.conn`` here while ``conn`` is unset would re-enter
        # __getattr__ forever, so go through the instance dict instead.
        try:
            target = self.__dict__["conn"]
        except KeyError:
            raise AttributeError(name)
        return getattr(target, name)

    def __getitem__(self, name):
        """Forward ``wrapper[name]`` to the wrapped object."""
        return self.conn[name]

    def __del__(self):
        """Return the wrapped object to the pool's queue.

        The entry is queued as ``(time.time(), conn)``. Safe to run more
        than once and on a partially initialised instance: the references
        are removed on the first run and later runs do nothing.
        """
        pool = self.__dict__.pop("pool", None)
        conn = self.__dict__.pop("conn", None)
        if pool is not None and conn is not None:
            pool.queue.put((time.time(), conn))
class Mongo(object):
    """MongoDB Pool.

    Singleton: the first successful construction opens ``size`` connections
    and stores them on a bounded PriorityQueue; every later construction
    returns that same instance and ignores its arguments.
    """

    def __new__(cls, size=5, dbname='', *args, **kwargs):
        """Create the pool on first use, or return the existing one.

        :param size: number of connections to open and queue capacity.
        :param dbname: database name looked up on each new connection.
        :param args: positional arguments passed to ``pymongo.Connection``.
        :param kwargs: keyword arguments passed to ``pymongo.Connection``.
        :raises MongoPoolCantConnect: if opening any connection fails.
        """
        if not hasattr(cls, '_instance'):
            # Build into a local first; the singleton is only published
            # once every connection is open, so a failed attempt does not
            # leave a half-filled pool behind for later callers.
            instance = object.__new__(cls)
            instance.dbname = dbname
            instance.queue = PriorityQueue(size)
            for _ in range(size):
                try:
                    # Entries are (timestamp, database) pairs.
                    instance.queue.put(
                        (time.time(),
                         pymongo.Connection(*args, **kwargs)[dbname])
                    )
                except Exception as e:
                    raise MongoPoolCantConnect(
                        'Can\'t connect to mongo servers: %s' % e)
            cls._instance = instance

        return cls._instance

    def get_conn(self, block=True, timeout=None):
        """Get Mongo connection wrapped in MongoConnection.

        :param block: passed to the queue's ``get``.
        :param timeout: passed to the queue's ``get``.
        :returns: a MongoConnection that re-queues the connection when it
            is finalized.
        """
        # Queue entries are (timestamp, database); only the database is
        # handed to the wrapper.
        entry = self.queue.get(block, timeout)
        return MongoConnection(self, entry[1])
def autoreconnect(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and translate pymongo reconnect errors.

    Makes a single attempt; no retry is performed.

    :param func: callable to invoke.
    :param args: positional arguments for ``func``.
    :param kwargs: keyword arguments for ``func``.
    :returns: whatever ``func`` returns.
    :raises MongoPoolAutoReconnect: if ``func`` raises
        ``pymongo.errors.AutoReconnect``. Any other exception propagates
        unchanged.
    """
    # The former ``while True`` wrapper never ran a second iteration (each
    # path either returned or raised), so the call is made directly.
    try:
        return func(*args, **kwargs)
    except pymongo.errors.AutoReconnect:
        raise MongoPoolAutoReconnect('Can\'t connect to DB, it may gone.')
Diff to Previous Revision
--- revision 1 2010-12-08 07:53:35
+++ revision 2 2011-09-02 05:56:58
@@ -2,86 +2,118 @@
__email__ = "creotiv@gmail.com"
import pymongo
-from gevent.queue import Queue
-
-class GeventMongoPool(object):
+from gevent.queue import PriorityQueue
+import os
+import time
+
+class MongoPoolException(Exception):
+ pass
+
+class MongoPoolCantConnect(MongoPoolException):
+ pass
+
+class MongoPoolAutoReconnect(MongoPoolException):
+ pass
+
+class GPool(object):
"""
- Rewrited connection pool for working with global connections.
+ Rewrited non-thread local implementation of pymongo.connection._Pool
"""
-
- # Non thread-locals
- __slots__ = ["sockets", "socket_factory"]
- sock = None
-
+
+ __slots__ = ["sockets", "socket_factory", "pool_size","sock"]
+
def __init__(self, socket_factory):
+ object.__init__(self)
+ self.pool_size = 1
+ if not hasattr(self,"sock"):
+ self.sock = None
self.socket_factory = socket_factory
if not hasattr(self, "sockets"):
self.sockets = []
-
+
def socket(self):
# we store the pid here to avoid issues with fork /
# multiprocessing - see
# test.test_connection:TestConnection.test_fork for an example
# of what could go wrong otherwise
pid = os.getpid()
-
if self.sock is not None and self.sock[0] == pid:
return self.sock[1]
-
+
try:
self.sock = (pid, self.sockets.pop())
except IndexError:
self.sock = (pid, self.socket_factory())
-
+
return self.sock[1]
-
+
def return_socket(self):
+
if self.sock is not None and self.sock[0] == os.getpid():
- self.sockets.append(self.sock[1])
+ # There's a race condition here, but we deliberately
+ # ignore it. It means that if the pool_size is 10 we
+ # might actually keep slightly more than that.
+ if len(self.sockets) < self.pool_size:
+ self.sockets.append(self.sock[1])
+ else:
+ self.sock[1].close()
self.sock = None
-
-pymongo.connection.Pool = GeventMongoPool
-
+
+pymongo.connection._Pool = GPool
+
class MongoConnection(object):
"""Memcache pool auto-destruct connection"""
def __init__(self,pool,conn):
self.pool = pool
self.conn = conn
-
+
def getDB(self):
return self.conn
-
+
def __getattr__(self, name):
return getattr(self.conn, name)
-
+
def __getitem__(self, name):
return self.conn[name]
-
+
def __del__(self):
- self.pool.queue.put(self.conn)
+ self.pool.queue.put((time.time(),self.conn))
del self.pool
del self.conn
-
+
+
class Mongo(object):
"""MongoDB Pool"""
- def __new__(cls,db_name,size=5,*args,**kwargs):
+ def __new__(cls,size=5,dbname='',*args,**kwargs):
if not hasattr(cls,'_instance'):
- # use your own config library
cls._instance = object.__new__(cls)
- cls._instance.queue = Queue(size)
+ cls._instance.dbname = dbname
+ cls._instance.queue = PriorityQueue(size)
for x in xrange(size):
try:
- # use your own config library
cls._instance.queue.put(
- pymongo.Connection(*args,**kwargs)[db_name]
+ (time.time(),pymongo.Connection(*args,**kwargs)[dbname])
)
- except:
- sys.exc_clear()
- error('Can\'t connect to mongo servers')
-
+ except Exception,e:
+ raise MongoPoolCantConnect('Can\'t connect to mongo servers: %s' % e)
+
return cls._instance
-
+
def get_conn(self,block=True,timeout=None):
- """Get Mongo connection wrapped in MongoConnection object"""
+ """Get Mongo connection wrapped in MongoConnection"""
obj = MongoConnection
- return obj(self,self.queue.get(block,timeout))
+ return obj(self,self.queue.get(block,timeout)[1])
+
+def autoreconnect(func,*args,**kwargs):
+ while True
+ try:
+ result = func(*args,**kwargs)
+ except pymongo.errors.AutoReconnect:
+ raise MongoPoolAutoReconnect('Can\'t connect to DB, it may gone.')
+ else:
+ return result
+ break
+
+
+
+