首页   

1.背景:

最近公司项目的用户中心模块经常出现线上问题,测试人员需要将线上真实数据导入到测试环境中去复现Bug。公司有3套测试环境,来回切换并校验数据比较麻烦,于是就有了如下的数据库同步神器。

2.界面:

3.源代码:

# -*- coding: utf-8 -*-
@date:  2022/01/16 22:00
@author: Anker
@python:v3.6
import json
import tkinter as tk
import tkinter.filedialog
import tkinter.messagebox
import pymysql as pymysql
from tkinter import END
from pymysql.cursors import DictCursor
# 线上数据库
online_connect = "erdshggadkjhg2wrtyuic.sfgh.aliyuncs.com"
online_user = "Anker"
online_password = "123456"
online_port = "3306"
online_db_name = "user_center"
online_table = "uuc_business_user"
# 测试数据库
test_connect = "bm-cp134dfhdh5e7uho.mysql.rds.aliyuncs.com"
test_user = "root"
test_port = "3306"
test_password = "Vr9ybte9hQJMghjkljh"
test_table = "uuc_business_user"
# 创建一个窗口
window = tk.Tk()
window.title("数据库同步工具")
window.geometry('600x600')
# 将new_user_data变量初始化
new_user_data = None
def data_synchronization():
    # 定义一个全局变量new_user_data
    global new_user_data
    # 每次触发操作之前,清空多行文本框的内容
    text.delete("1.0", "end")
    # 定义两个变量name和phone,用来获取用户在文本框输入的内容
    name = entry1.get()
    phone = entry2.get()
    # 连接线上数据库数据库
    conn_online_sql = pymysql.connect(host=online_connect, user=online_user, cursorclass=DictCursor,
                                      password=online_password, database=online_db_name)
    curs_online = conn_online_sql.cursor()
    # 通过字典去读取不同的测试环境
    db_config = radio.get()
    config = {"0": "0", "test1环境": "user_center_ding", "test2环境": "user_center_ding_test2", "test3环境": "test3"}
    test_db_name = config[db_config]
        if not (entry1.get() and entry2.get()):
            tk.messagebox.showinfo(title="提示", message="用户姓名、用户手机为必填项!")
        elif radio.get() == "0":
            tk.messagebox.showinfo(title="提示", message="请选择数据库")
        elif radio.get() == "test3环境":
            tk.messagebox.showinfo(title="提示", message="目前没有该测试数据库")
        else:
            # 连接测试数据库数据库"
            conn_test2_sql = pymysql.connect(host=test_connect, user=test_user, password=test_password,
                                             database=test_db_name)
            curs2 = conn_test2_sql.cursor()
            # 读取线上数据库的用户信息
            read_online_sql = f'''select * from `{online_db_name}`.`{online_table}` 
                        where name = "{name}" and phone = "{phone}" '''
            # 读取测试数据库的用户信息
            read_test2_sql = f'''select * from `{test_db_name}`.`{test_table}` where name = 
                        "{name}" and phone = "{phone}" '''
            # 执行sql查询,查询线上数据库,将查询出来的结果保存在online_user_data中
            curs_online.execute(read_online_sql)
            online_user_data = curs_online.fetchone()
            print("线上数据库该用户数据为:" + '\n' + str(online_user_data) + '\n')
            # 判断线上数据库是否有该用户
            if not online_user_data:
                text.insert("insert", "线上数据库没有该用户!" + '\n')
            else:
                # 执行下面这句后data为{"content": "x", "created_time": "2019-06-19 13:28:30"},是str类型
                data = json.dumps(online_user_data, default=str, ensure_ascii=False)
                # 把data转换为字典类型
                data_result = json.loads(data)
                print("转换为dict类型后的数据为:", '\n', data_result, '\n')
                # 把data再次转为json类型
                json_result = json.dumps(data_result)
                print("转换为json类型后的数据为:" + '\n' + json_result + '\n')
                # 获取返回字典data_result值的所有value的列表
                new_user_data = tuple(data_result.values())
                print(new_user_data)
            # 执行sql查询,查询测试数据库,将查询出来的结果保存在test_user_data中
            curs2.execute(read_test2_sql)
            test2_user_data = curs2.fetchone()
            # 提交数据至线上数据库
            conn_online_sql.commit()
            # 判断测试数据库是否有该用户
            if test2_user_data:
                text.insert("insert", "测试数据库已存在该用户数据!" + '\n')
            # 如果线上数据库有该用户,测试数据库没有该用户,则执行同步操作
            if online_user_data and (not test2_user_data):
                    # 插入的SQL语句
                    insert_sql = f'''insert into `{test_db_name}`.`{test_table}` VALUES {new_user_data}'''
                    # 执行SQL,向测试数据库中插入数据
                    curs2.execute(insert_sql)
                    print(insert_sql)
                    text.insert("insert", "数据同步成功!" + '\n')
                except IOError:
                    text.insert("insert", "数据同步失败!" + '\n')
            conn_test2_sql.commit()
            curs_online.close()
            curs2.close()
    except IOError:
        print("数据插入失败")
        conn_online_sql.rollback()
        tkinter.filedialog.Directory()
    conn_online_sql.close()
# 清空按钮函数:用来清空输入文本框中的内容
def clear():
    entry1.delete(0, END)  # 清空Entry文本框中的内容
    entry2.delete(0, END)  # 清空Entry文本框中的内容
    text.delete("1.0", "end")  # 清空text多行文本框中的内容
    radio.set("0")  # 清空Radiobutton标签选项框
def selection():
    text.delete("1.0", "end")
    choose = str(radio.get())
    text.insert("insert", "您选择的是:" + choose + '\n')
# 创建一个标签
tk.Label(window, text="线上用户姓名:").place(x=10, y=50)
tk.Label(window, text="线上用户手机:").place(x=10, y=120)
# 创建一个Entry文本框:用来输入一行文本字符串
entry1 = tk.Entry(window, bd=3, width=62, highlightcolor='red')
entry1.place(x=120, y=50)
entry2 = tk.Entry(window, bd=3, width=62, highlightcolor='green')
entry2.place(x=120, y=120)
# 创建一个可以输入多行文字的文本框
text = tk.Text(window, height=20, width=75, bg='white', highlightcolor='blue', highlightthickness=1)
text.place(x=35, y=240)
# 创建两个按钮:选择文件夹、退出窗口
tk.Button(window, text='开始同步', bg='grey', fg='white', command=data_synchronization).place(x=130, y=520)
tk.Button(window, text='清空', bg='grey', fg='white', command=clear).place(x=290, y=520)
tk.Button(window, text='退出', bg='grey', fg='white', command=window.quit).place(x=430, y=520)
radio = tk.StringVar()
radio.set("0")
tk.Label(window, text='请选择数据库:', bg='green', fg='white', width=12).place(x=10, y=180)
# 创建3个radiobutton选项
radio1 = tk.Radiobutton(window, text='test1环境', variable=radio, command=selection, value="test1环境")
radio1.place(x=140, y=180)
radio2 = tk.Radiobutton(window, text='test2环境', variable=radio, command=selection, value="test2环境")
radio2.place(x=290, y=180)
radio3 = tk.Radiobutton(window, text='test3环境', variable=radio, command=selection, value="test3环境")
radio3.place(x=440, y=180)
# 主循环
window.mainloop()
                            
                            
# 读取配置文件
conf = configparser.RawConfigParser()
conf.read("D:\PycharmProjects\economic_relation\\venv\Include\control\conf.ini")
# 获取源数据库参数
class DBUtils():
  def __init__(self):
    self.conn = pymysql.connect(dbDict['test1'], dbUser, dbPassword)
    self.cursor
				
通常,进程之间彼此是完全孤立的,唯一的通信方式是队列或管道。但可以使用两个对象来表示共享数据。其实,这些对象使用了共享内存(通过mmap模块)使访问多个进程成为可能。Value( typecode, arg1, … argN, lock ) 在共享内容中常见ctypes对象。typecode要么是包含array模块使用的相同类型代码(如’i’,’d’等)的字符串,要么是来自ctypes模块的类型对
from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent import sys import threading import traceback import time import pymysql mysql_sett. 今天看到有网友在我的贴子下留言,说让我帮他分析一个商品网站,判断商品有货或者无货。写下这篇博客,记录下我的分析思路。 一.分析网页结构 这是一个卖奢侈品的网站,和其他电商网站一样,有商品的一些信息, import pypyodbc p_path = r'专题3.3 结构化信息采集\data\贷款利率.mdb' connStr = 'Driver={Microsoft Access Driver (*.mdb, *.accdb)};DBQ='+p_path+';' conn = pyodbc.connect(connStr) # 链接数据库 二、转化为DataFr...
© 2022 微搜