Python 大数据插入MySQL、数据库查询优化

材料清单

  • 操作系统: MacOS Catalina 14.5
  • 数据库:MySQL@5.6
  • Py版本:Python3.7

数据库设计:

  • 字段:
    字段名称 字段类型 长度
    ID int 0
    Phone bigint 11
    QQNumber bigint 11
  • 索引:

    名称 索引 索引类型 索引方法
    QQNumber qqNumber NORMAL BTREE
    Phone phone NORMAL BTREE
  • 数据量:5亿条

  • 查询时间:5秒内
  • 图片

查询速度



Python源代码

# -- coding:UTF-8 –
import random
import time
import pymysql

conn = pymysql.connect(host='localhost', user='root', password='admin141', db='Social', charset='utf8')
cur = conn.cursor()

# number_to_chinese, 单位-数字
num_dict = {0: "零", 1: "一", 2: "二", 3: "三", 4: "四",
            5: "五", 6: "六", 7: "七", 8: "八", 9: "九"}
unit_map = [["", "十", "百", "千"], ["万", "十万", "百万", "千万"],
            ["亿", "十亿", "百亿", "千亿"], ["兆", "十兆", "百兆", "千兆"]]
unit_step = ["万", "亿", "兆"]


class number_to_chinese():
    """
       参考: https://github.com/tyong920/a2c
    """

    def __init__(self):
        self.result = ""

    def number_to_str_10000(self, data_str):
        """一万以内的数转成大写"""
        res = []
        count = 0
        # 倒转
        str_rev = reversed(data_str)  # seq -- 要转换的序列,可以是 tuple, string, list 或 range。返回一个反转的迭代器。
        for i in str_rev:
            if i is not "0":
                count_cos = count // 4  # 行
                count_col = count % 4  # 列
                res.append(unit_map[count_cos][count_col])
                res.append(num_dict[int(i)])
                count += 1
            else:
                count += 1
                if not res:
                    res.append("零")
                elif res[-1] is not "零":
                    res.append("零")
        # 再次倒序,这次变为正序了
        res.reverse()
        # 去掉"一十零"这样整数的“零”
        if res[-1] is "零" and len(res) is not 1:
            res.pop()

        return "".join(res)

    def number_to_str(self, data):
        """分段转化"""
        assert type(data) == float or int
        data_str = str(data)
        len_data = len(str(data_str))
        count_cos = len_data // 4  # 行
        count_col = len_data - count_cos * 4  # 列
        if count_col > 0: count_cos += 1

        res = ""
        for i in range(count_cos):
            if i == 0:
                data_in = data_str[-4:]
            elif i == count_cos - 1 and count_col > 0:
                data_in = data_str[:count_col]
            else:
                data_in = data_str[-(i + 1) * 4:-(i * 4)]
            res_ = self.number_to_str_10000(data_in)
            res = res_ + unit_map[i][0] + res
        return res

    def decimal_chinese(self, data):
        assert type(data) == float or int
        data_str = str(data)
        if "." not in data_str:
            res = self.number_to_str(data_str)
        else:
            data_str_split = data_str.split(".")
            if len(data_str_split) is 2:
                res_start = self.number_to_str(data_str_split[0])
                res_end = "".join([num_dict[int(number)] for number in data_str_split[1]])
                res = res_start + random.sample(["点", "."], 1)[0] + res_end
            else:
                res = str(data)
        return res


def getDB(dataList):
    sql = "insert into qq(qqNumber, phone) values (%s,%s)"
    cur.executemany(sql, dataList)
    conn.commit()
    print("[+] 插入成功: " + str(len(dataList)) + "条")
    writeLog("[+] 插入成功: " + str(len(dataList)) + "条")

def writeLog(lineTextLog):
    with open("logs.txt", 'a+') as f:
        f.write(lineTextLog)


def AddList(filePath):
    dataList = []
    lineCount = 0
    AllCount = 0
    try:
        for line in open(filePath, "r"):
            try:
                lineCount = lineCount + 1
                resultLine = line.split('-')
                if len(dataList) == 1000000:
                    AllCount = AllCount + 1
                    getDB(dataList)
                    dataList.clear()
                    print("[+] 插入成功第" + str(AllCount) + "波数据,插入数据量为100万条")
                    print("[+] 总共插入(中文): " + num.decimal_chinese(lineCount))
                    print("[+] 总共插入(数字): " + num.decimal_chinese(lineCount))
                    print("[+] =============================")
                    writeLog("[+] 插入成功第" + str(AllCount) + "波数据,插入数据量为100万条")
                    writeLog("[+] 总共插入(中文): " + num.decimal_chinese(lineCount))
                    writeLog("[+] 总共插入(数字): " + num.decimal_chinese(lineCount))
                    writeLog("[+] =============================")

                    time.sleep(5)
                dataList.append([resultLine[0], resultLine[1]])
            except:
                print("[!] 插入出错,第: " + str(AllCount) + "波数据")
                print("[!] 插入出错,第: " + num.decimal_chinese(lineCount) + "条")
                writeLog("[!] 插入出错,第: " + str(AllCount) + "波数据")
                writeLog("[!] 插入出错,第: " + num.decimal_chinese(lineCount) + "条")
                continue
    except:
        print("[!] 读取文件错误,读取第" + str(lineCount) + "条时出错")
        writeLog("[!] 读取文件错误,读取第" + str(lineCount) + "条时出错")


if __name__ == '__main__':
    num = number_to_chinese()
    AddList("/Users/Apple/Downloads/tencentnew1.txt")
    cur.close()
    conn.close()

赞(0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址