Welcome, guest | Sign In | My Account | Store | Cart

A simple connection pool implementation for the pymongo package, for use with the gevent coroutine library.

The underlying problem is that pymongo.connection._Pool in the original package is thread-local: when you spawn a new greenlet and try to reuse an already-open connection, a new connection is created instead, because the pool is empty inside that greenlet. If you implement your own pool, don't forget about this.

Example of use:

# Create Pool. 
db = Mongo(10, 'test_db')
# Get connection from pool
conn = db.get_conn()
# Get raw connection for GridFS
raw_conn = conn.getDB()

# Mongo is a singleton, so to get a connection in another part of the application just type:
db = Mongo()
conn = db.get_conn()

# The connection is returned to the pool when the wrapper object is garbage-collected (e.g. when it goes out of scope).
Python, 115 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Module metadata: author name and contact e-mail address.
__author__ = "Andrey Nikishaev"
__email__ = "creotiv@gmail.com"
 
import pymongo
from gevent.queue import PriorityQueue
import os
import time

class MongoPoolException(Exception):
    """Base class for every error raised by this pool module."""


class MongoPoolCantConnect(MongoPoolException):
    """Raised when a connection for the pool could not be created."""


class MongoPoolAutoReconnect(MongoPoolException):
    """Raised in place of pymongo's AutoReconnect error."""

class GPool(object):
    """
        Socket pool modelled on pymongo.connection._Pool, but storing its
        state on the instance itself rather than in thread-local storage.

        ``sock`` holds the currently checked-out socket as a ``(pid, socket)``
        pair (or None); ``sockets`` is the list of idle sockets kept for reuse.
    """

    __slots__ = ["sockets", "socket_factory", "pool_size","sock"]

    def __init__(self, socket_factory):
        """Create a pool that builds new sockets by calling ``socket_factory()``."""
        object.__init__(self)
        self.pool_size = 1
        self.socket_factory = socket_factory
        # Only fill in the state that is not already present on the object.
        for slot, initial in (("sock", None), ("sockets", [])):
            if not hasattr(self, slot):
                setattr(self, slot, initial)

    def socket(self):
        """Return the checked-out socket, taking or creating one if needed."""
        # The owning pid is remembered next to the socket so that a socket
        # checked out before a fork / multiprocessing spawn is not reused by
        # the child process - see
        # test.test_connection:TestConnection.test_fork for an example
        # of what could go wrong otherwise
        current_pid = os.getpid()
        held = self.sock
        if held is not None and held[0] == current_pid:
            return held[1]

        # Prefer an idle socket; fall back to opening a fresh one.
        if self.sockets:
            chosen = self.sockets.pop()
        else:
            chosen = self.socket_factory()
        self.sock = (current_pid, chosen)

        return chosen

    def return_socket(self):
        """Give the checked-out socket back to the idle list, or close it."""
        held = self.sock
        if held is not None and held[0] == os.getpid():
            released = held[1]
            # There's a race condition here, but it is deliberately
            # ignored.  It means that if the pool_size is 10 we
            # might actually keep slightly more than that.
            if len(self.sockets) >= self.pool_size:
                released.close()
            else:
                self.sockets.append(released)
        # A socket recorded under another pid is simply forgotten.
        self.sock = None

# Monkey-patch: replace pymongo's own _Pool class with GPool at import time.
# NOTE(review): presumably pymongo looks this name up when a connection is
# created, so only connections built after this line use GPool - confirm
# against the installed pymongo version.
pymongo.connection._Pool = GPool

class MongoConnection(object):
    """Mongo pool connection wrapper that returns itself to the pool.

    Wraps a connection object taken from a pool.  Attribute and item access
    that the wrapper does not define itself is forwarded to the wrapped
    connection.  When the wrapper is destroyed, the wrapped connection is put
    back on ``pool.queue``.
    """
    def __init__(self,pool,conn):
        """Remember the owning ``pool`` and the wrapped ``conn``."""
        self.pool = pool
        self.conn = conn

    def getDB(self):
        """Return the raw wrapped connection object."""
        return self.conn

    def __getattr__(self, name):
        """Forward unknown attribute lookups to the wrapped connection."""
        # __getattr__ only runs when normal lookup failed.  If 'conn' (or
        # 'pool') itself is missing - instance created without __init__, or
        # already released by __del__ - delegating through self.conn would
        # recurse forever, so fail with a plain AttributeError instead.
        if name in ('pool', 'conn'):
            raise AttributeError(name)
        return getattr(self.conn, name)

    def __getitem__(self, name):
        """Forward item lookups to the wrapped connection."""
        return self.conn[name]

    def __del__(self):
        """Put the wrapped connection back on the pool queue.

        The entry is queued as ``(time.time(), conn)``.  Safe to call more
        than once and on partially initialised instances: the connection is
        returned at most once, and nothing happens if either attribute is
        missing.
        """
        state = self.__dict__
        if 'pool' not in state or 'conn' not in state:
            return
        # Detach both references first so a repeated call cannot queue the
        # same connection twice.
        pool = state.pop('pool')
        conn = state.pop('conn')
        pool.queue.put((time.time(), conn))


class Mongo(object):
    """MongoDB Pool

    Singleton: the first instantiation opens ``size`` connections and stores
    them in a bounded ``PriorityQueue``; every later instantiation ignores
    its arguments and returns that same instance.
    """
    def __new__(cls,size=5,dbname='',*args,**kwargs):
        """Create the pool on first use, otherwise return the existing one.

        size   -- number of connections to open and capacity of the queue.
        dbname -- name used to index each new ``pymongo.Connection``.
        Remaining positional and keyword arguments are passed to
        ``pymongo.Connection``.

        Raises MongoPoolCantConnect if any connection cannot be created.  In
        that case no singleton is stored, so a later call can try again.
        """
        if not hasattr(cls,'_instance'):
            instance = object.__new__(cls)
            instance.dbname = dbname
            instance.queue = PriorityQueue(size)
            for _ in range(size):
                try:
                    database = pymongo.Connection(*args,**kwargs)[dbname]
                except Exception as e:
                    raise MongoPoolCantConnect('Can\'t connect to mongo servers: %s' % e)
                # The timestamp is the queue priority of the entry.
                instance.queue.put((time.time(),database))
            # Publish the singleton only once it is fully populated, so a
            # failed connect never leaves a half-built pool behind.
            cls._instance = instance

        return cls._instance

    def get_conn(self,block=True,timeout=None):
        """Get Mongo connection wrapped in MongoConnection

        ``block`` and ``timeout`` are passed straight to ``queue.get``.
        """
        _, database = self.queue.get(block,timeout)
        return MongoConnection(self,database)
        
def autoreconnect(func,*args,**kwargs):
    """Call ``func(*args, **kwargs)`` and return its result.

    A ``pymongo.errors.AutoReconnect`` raised by the call is translated into
    MongoPoolAutoReconnect; any other exception propagates unchanged.
    Despite the name, the call is attempted exactly once - no retry is made.
    """
    try:
        return func(*args,**kwargs)
    except pymongo.errors.AutoReconnect:
        raise MongoPoolAutoReconnect('Can\'t connect to DB, it may gone.')