Wrote some simple implementation of pool for pymongo package under gevent coroutine library.
Base bug here was with pymongo.connection.Pool because in the original package it is thread-local, so when you spawn new greenlet and trying to get already open connection, it creates new connection because in this greenlet pool is empty. So if you will implement your own pool don’t forget about this.
Example of use:
# Create Pool.
db = Mongo('test_db',10)
# Get connection from pool
conn = db.get_conn()
# Get raw connection for GridFS
raw_conn = conn.getDB
#Mongo is a singleton. So if you want to get connection in another part of application just type
db = Mongo()
conn = db.get_conn()
#Connection will get back to pool when context will be closed.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | __author__ = "Andrey Nikishaev"
__email__ = "creotiv@gmail.com"
import pymongo
from gevent.queue import PriorityQueue
import os
import time
class MongoPoolException(Exception):
pass
class MongoPoolCantConnect(MongoPoolException):
pass
class MongoPoolAutoReconnect(MongoPoolException):
pass
class GPool(object):
"""
Rewrited non-thread local implementation of pymongo.connection._Pool
"""
__slots__ = ["sockets", "socket_factory", "pool_size","sock"]
def __init__(self, socket_factory):
object.__init__(self)
self.pool_size = 1
if not hasattr(self,"sock"):
self.sock = None
self.socket_factory = socket_factory
if not hasattr(self, "sockets"):
self.sockets = []
def socket(self):
# we store the pid here to avoid issues with fork /
# multiprocessing - see
# test.test_connection:TestConnection.test_fork for an example
# of what could go wrong otherwise
pid = os.getpid()
if self.sock is not None and self.sock[0] == pid:
return self.sock[1]
try:
self.sock = (pid, self.sockets.pop())
except IndexError:
self.sock = (pid, self.socket_factory())
return self.sock[1]
def return_socket(self):
if self.sock is not None and self.sock[0] == os.getpid():
# There's a race condition here, but we deliberately
# ignore it. It means that if the pool_size is 10 we
# might actually keep slightly more than that.
if len(self.sockets) < self.pool_size:
self.sockets.append(self.sock[1])
else:
self.sock[1].close()
self.sock = None
pymongo.connection._Pool = GPool
class MongoConnection(object):
"""Memcache pool auto-destruct connection"""
def __init__(self,pool,conn):
self.pool = pool
self.conn = conn
def getDB(self):
return self.conn
def __getattr__(self, name):
return getattr(self.conn, name)
def __getitem__(self, name):
return self.conn[name]
def __del__(self):
self.pool.queue.put((time.time(),self.conn))
del self.pool
del self.conn
class Mongo(object):
"""MongoDB Pool"""
def __new__(cls,size=5,dbname='',*args,**kwargs):
if not hasattr(cls,'_instance'):
cls._instance = object.__new__(cls)
cls._instance.dbname = dbname
cls._instance.queue = PriorityQueue(size)
for x in xrange(size):
try:
cls._instance.queue.put(
(time.time(),pymongo.Connection(*args,**kwargs)[dbname])
)
except Exception,e:
raise MongoPoolCantConnect('Can\'t connect to mongo servers: %s' % e)
return cls._instance
def get_conn(self,block=True,timeout=None):
"""Get Mongo connection wrapped in MongoConnection"""
obj = MongoConnection
return obj(self,self.queue.get(block,timeout)[1])
def autoreconnect(func,*args,**kwargs):
while True
try:
result = func(*args,**kwargs)
except pymongo.errors.AutoReconnect:
raise MongoPoolAutoReconnect('Can\'t connect to DB, it may gone.')
else:
return result
break
|