この記事では、Python で単純なマルチタスク mysql を XML に実装する方法を主に紹介し、Python クエリ mysql の結果セットを XML 形式のデータ出力に関連する操作スキルをサンプルの形式で分析します。
この記事の例は、Python が単純なマルチタスクの mysql to XML メソッドを実装するものであると説明されています。参考のために皆さんと共有してください。詳細は次のとおりです: ニーズを満たすために、エクスポートされる形式は、navicat によってエクスポートされる XML と一致している必要があります。 gevent を使用すると、ファイル I/O 操作がブロックされるため、完全な非同期にはなりません。 1. mysql2xml.py:# -*- coding: utf-8 -*- ''' Created on 2014/12/27 @author: Yoki ''' import gevent import pymysql from pymysql.cursors import DictCursor import re import codecs db_conn = None def init_mysql_connect(*args, **kwargs): global db_conn db_conn = pymysql.connect(*args, **kwargs) def list_to_xml(result_cur, key_list): ''' mysql 结果集转xml,非xml标准导出方式; xml dom 不支持相同名字的node :param result_cur: :param key_list: :return: ''' content = '' content += '<?xml version="1.0" encoding="UTF-8" ?>\r\n' content += '<RECORDS>\r\n' # root节点 for item in result_cur: content += '\t<RECORD>\r\n' for k in key_list: v = item.get(k, '') real_value = v content += '\t\t<%s>%s</%s>\r\n' % (k, real_value, k) content += '\t</RECORD>\r\n' content += '</RECORDS>\r\n' return content def get_table_rows(tb_name): ''' 获取mysql表rows :param tb_name: :return: ''' global db_conn rows = [] cursor = db_conn.cursor(cursor=DictCursor) cursor.execute('select * from %s' % tb_name) for row in cursor: rows.append(row) return rows def get_table_keys(tb_name): ''' 获取表中字段,顺序 为创建表时的顺序 :param tb_name: :return: ''' global db_conn cursor = db_conn.cursor(cursor=DictCursor) cur = cursor.execute('show create table %s' % tb_name) if cur != 1: raise Exception for r in cursor: create_sql = r['Create Table'] fields = re.findall('`(.*?)`', create_sql) result = [] # 处理字段 for i in xrange(1, len(fields)): field = fields[i] if field in result: continue result.append(field) return result return [] def mysql_to_xml(tb_name, output_dir='xml', postfix='xml'): ''' mysql数据导出xml, :param tb_name: 数据库表名 :param output_dir: :param postfix: :return: ''' rows = get_table_rows(tb_name) keys = get_table_keys(tb_name) content = list_to_xml(rows, keys) fp = codecs.open('%s/%s.%s' % (output_dir, tb_name, postfix), 'w', 'utf-8') fp.write(content) fp.close() tb_list = [ 'tb_item', 'tb_state' ] if __name__ == '__main__': init_mysql_connect(host="localhost", user='user', password="password", database='test', port=3306, charset='utf8') jobs = [] for tb_name in tb_list: jobs.append(gevent.spawn(mysql_to_xml, tb_name)) gevent.joinall(jobs)
def list_to_xml(result_cur, key_list): fp = codecs.open('test.xml'), 'w', 'utf-8') fp.write('<?xml version="1.0" encoding="UTF-8" ?>\r\n') fp.write('<RECORDS>\r\n') for item in result_cur: fp.write('\t<RECORD>\r\n') for k in key_list: v = item.get(k, '') if v is None: real_value = '' else: if type(v) == unicode: real_value = cgi.escape(v) else: real_value = v fp.write('\t\t<%s>%s</%s>\r\n' % (k, real_value, k)) fp.write('\t</RECORD>\r\n') fp.write('</RECORDS>\r\n') fp.close()