This script will take all the tables from a given MySQL database and upload them to Amazon SimpleDB as Domains. The 'name' of each uploaded record is the value of the first primary key found in the table.
This was written to scratch a particular itch. I've tested it against a 500 MB database of my own, but I didn't try overly hard to make it a general-purpose tool.
#!/usr/bin/env python
import threading

import boto
import MySQLdb
from MySQLdb import cursors

access_key = ''
secret_key = ''
db_user = ''
db_name = ''

db = MySQLdb.connect(user=db_user, db=db_name, cursorclass=cursors.DictCursor)
c = db.cursor()
sdb = boto.connect_sdb(access_key, secret_key)

def get_or_create_domain(domain):
    try:
        d = sdb.get_domain(domain)
    except boto.exception.SDBResponseError:
        d = sdb.create_domain(domain)
    return d

def get_primary_key(table_name, cursor):
    """
    Returns the column name of the given table's primary key.
    """
    cursor.execute("SHOW INDEX FROM %s" % table_name)
    for row in cursor.fetchall():
        if row['Key_name'] == 'PRIMARY':
            return row['Column_name']
    raise ValueError("Table %s does not have a primary key" % table_name)

class BotoWorker(threading.Thread):
    def __init__(self, name, record, domain):
        self.domain = domain
        self.name = name
        self.record = record
        threading.Thread.__init__(self)

    def run(self):
        print "inserting %s" % self.name
        item = self.domain.new_item(self.name)
        for key, value in self.record.items():
            try:
                item[key] = value
            except UnicodeDecodeError:
                item[key] = 'unicode error'
        item.save()  # persist the attributes to SimpleDB

def main():
    c.execute("show tables;")
    for table in c.fetchall():
        table = table["Tables_in_%s" % db_name]
        print "loading data from %s" % table
        total = c.execute("select * from %s" % table)
        print "fetched %s items from mysql" % total
        for record in c.fetchall():
            name = record.pop(get_primary_key(table, c))
            # throttle: never run more than ten uploader threads at once
            thread_started = False
            while not thread_started:
                if threading.activeCount() < 10:
                    print "got a thread %s" % threading.activeCount()
                    BotoWorker(name=name, record=record, domain=get_or_create_domain(table)).start()
                    thread_started = True

if __name__ == '__main__':
    main()
Some MySQLdb notes:
Cursors are iterators, so you can loop over them like "for row in cursor" instead of "for row in cursor.fetchall()". You still need c.fetchall() in your outer loop because you are wiping out the result set in the inner loop. OTOH, you could just create a new cursor for your inner loop.
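For example, a rough sketch of the two-cursor approach (untested; it reuses the db connection and db_name from the script above):

outer = db.cursor()
inner = db.cursor()
outer.execute("show tables;")
for row in outer:  # cursors are iterators, no fetchall() needed here
    table = row["Tables_in_%s" % db_name]
    # inner queries get their own cursor, so the outer result set
    # is never wiped out by them
    inner.execute("SHOW INDEX FROM %s" % table)
    for index_row in inner:
        print index_row['Key_name']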
You can't trust cursor.execute() to return the number of rows; it doesn't work for all cursor types. The standard (PEP-249) way of doing this is to look at cursor.rowcount, but for the SSCursor classes, this will always be -1 because it can't be determined.
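A sketch of the PEP-249 pattern, using the same c cursor and table name as the script above:

c.execute("select * from %s" % table)
total = c.rowcount  # PEP-249; will be -1 when the count can't be determined
print "fetched %s items from mysql" % total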
Thanks for the feedback, Andy. This script may very well become obsolete soon; the AWS guys are predicting a bulk uploader by the end of the year.
item[key] = 'unicode error'
heh, nice one Chris. This will probably work:
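# assuming the values are UTF-8 coming out of MySQL: replace the bad
# bytes instead of throwing the whole value away
try:
    item[key] = value
except UnicodeDecodeError:
    item[key] = value.decode('utf-8', 'replace')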
Otherwise, cool stuff. I'm going to start with this code to try uploading a Postgres database.
You can use SDB Explorer to upload MySQL data in bulk to Amazon SimpleDB using a simple GUI. For more detail, see the documentation page: http://www.sdbexplorer.com/documentation/simpledb--how-to-upload-mysql-data-on-amazon-simpledb.html
To download SDB Explorer:
http://www.sdbexplorer.com/simpledb-download.html