1. 首页 » 知识阅读

对比的句子(MySQL表数据对比小工具)

导读:笔者最近在采用 trino 代替旧有方案进行媒体接口数据拉取。为验证逻辑的准确性,需要将 trino 拉取的数据写入测试库,并与旧方案拉取到生产库中的数据进行对比。为提高数据对比的效率,笔者开发了一个数据对比小工具,在这里分享一下,希望对各位有所帮助。

思路分析

两张表进行数据对比时,需要考虑以下几点。首先,需要计算出两张表各自参与数据对比的行数。其次,数据匹配由主键进行关联,采用 Full join。对于部分字段如 insert_time 等并不要求一致,所以需要可灵活配置、过滤不参与对比的字段。数据对比需具体到每一个字段的对比,同时输出结果差异可一目了然。由于表量多且在对比数据时存在各种差异,因此程序应可灵活配置,主要负责基础配置如:数据库信息、指定对比的表等。而最后将对比逻辑生成为一条 SQL,这样便可实现在对比数据时可根据实际情况灵活改动。

Java 具体实现

引入 freemarker 依赖,这里使用 freemarker 来配置生成 SQL 的基础模板

 
<dependency>
    <groupId>org.freemarker</groupId>
    <artifactId>freemarker</artifactId>
    <version>2.3.31</version>
</dependency>
 

FreeMarkerTemplateUtils 工具类

/**
 * Static helper around a single shared FreeMarker {@link Configuration}.
 *
 * <p>Templates are resolved from the {@code /templates} classpath location relative to this
 * class. The configuration uses {@code NullCacheStorage}, i.e. FreeMarker's no-op cache storage.
 */
public class FreeMarkerTemplateUtils {

    /** Shared configuration, built once when the class is initialised. */
    private static final Configuration CONFIGURATION = buildConfiguration();

    private FreeMarkerTemplateUtils() {
        // Utility class: not meant to be instantiated.
    }

    /**
     * Creates the FreeMarker configuration used by every lookup in this class.
     *
     * @return a configuration that loads UTF-8 templates from {@code /templates} and rethrows
     *         template exceptions
     */
    private static Configuration buildConfiguration() {
        Configuration configuration = new Configuration(Configuration.VERSION_2_3_31);
        // Directory (on the classpath) that holds the template files.
        configuration.setTemplateLoader(
                new ClassTemplateLoader(FreeMarkerTemplateUtils.class, "/templates"));
        configuration.setDefaultEncoding("UTF-8");
        configuration.setTemplateExceptionHandler(TemplateExceptionHandler.RETHROW_HANDLER);
        configuration.setCacheStorage(NullCacheStorage.INSTANCE);
        return configuration;
    }

    /**
     * Looks up a template by name.
     *
     * @param templateName file name of the template under {@code /templates}
     * @return the loaded template
     * @throws IOException if the template cannot be found or read
     */
    public static Template getTemplate(String templateName) throws IOException {
        return CONFIGURATION.getTemplate(templateName);
    }

    /** Clears the template cache of the shared configuration. */
    public static void clearCache() {
        CONFIGURATION.clearTemplateCache();
    }
}

Table 实体

/**
 * Describes one table-comparison task: which table to compare, which columns to leave out,
 * and the date filter applied to both sides of the comparison.
 */
@AllArgsConstructor
@Data
public class Table {
    /**
     * Name of the table to compare.
     */
    private String tableName;

    /**
     * Column names that are excluded from the comparison.
     */
    private List<String> filterFields;

    /**
     * Date value substituted into the generated SQL as the filter literal.
     */
    private String dateStr;

    /**
     * Name of the time column used as the filter condition.
     */
    private String filterTimeField;
}

MediaSourceDataCompareUtils 核心处理类

/**
 * Generates, for each configured {@link Table}, a SQL file that compares the table's rows in the
 * test schema against the production schema.
 *
 * <p>The column list and primary key are read with {@code describe} from the test schema, the
 * ignored columns are removed, and the result is rendered through the
 * {@code CompareSqlTemp.ftl} FreeMarker template into {@code SQL_FILE_DIR}.
 */
public class MediaSourceDataCompareUtils {

    private static final String DRIVER = "com.mysql.jdbc.Driver";
    private static final String USER = "username";
    private static final String PASSWORD = "password";
    // Database connection info
    private static final String URL = "jdbc:mysql://xxxxxxxxxxxxxxxx:3306";

    /**
     * Configuration
     */
    private static final String PROD_SCHEMA = "tdatamedia";
    private static final String TEST_SCHEMA = "datamedia_test";
    private static final String TEMPLATE_NAME = "CompareSqlTemp.ftl";
    private static final String SQL_FILE_DIR = "G://sqls/";

    /**
     * Entry point: builds the list of comparison tasks and runs them one by one.
     *
     * @param args unused
     * @throws Exception declared for compatibility; per-table failures are reported by
     *                   {@link #check(Table)} and do not abort the remaining tasks
     */
    public static void main(String[] args) throws Exception {
        // Register the comparison tasks
        List<Table> tables = Lists.newArrayList(
                new Table("ods_uc_campaign_log", Lists.newArrayList("is_cost"), "2021-07-07", "cost_date_hour")
        );
        for (Table table : tables) {
            check(table);
        }
    }

    /**
     * Runs one comparison task on its own connection and reports the outcome on stdout.
     *
     * <p>Failures are reported and swallowed on purpose so that the next table is still
     * processed.
     *
     * @param table the task to run
     */
    private static void check(Table table) throws Exception {
        System.out.println("数据核对任务开始!核对表名:" + table.getTableName());
        // try-with-resources closes the connection even when it was never fully opened,
        // instead of dereferencing a possibly-null handle in a finally block.
        try (Connection conn = createConnection()) {
            compare(conn, table);
            System.out.println("数据核对任务结束!");
        } catch (Exception e) {
            System.out.println("执行异常。异常信息:" + e.getMessage());
            // Keep the stack trace: the message alone is often null or too terse to debug.
            e.printStackTrace();
        }
    }

    /**
     * Reads the table structure and renders the comparison SQL file.
     *
     * @param conn  open connection used to describe the table
     * @param table the task being processed
     * @throws IllegalStateException if the table has no primary key, since rows could not be matched
     * @throws Exception             on any JDBC, template or file error
     */
    private static void compare(Connection conn, Table table) throws Exception {
        // Collect the column names and the primary-key columns
        List<String> pks = new ArrayList<>();
        List<String> fields = new ArrayList<>();
        // Schema/table are identifiers and cannot be bound as parameters; they come from the
        // hard-coded task list above, not from external input.
        String describeSql = "describe " + TEST_SCHEMA + "." + table.getTableName();
        try (PreparedStatement ps = conn.prepareStatement(describeSql);
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                // Column 1 is read as the column name, column 4 as the key marker.
                String column = rs.getString(1);
                if ("PRI".equals(rs.getString(4))) {
                    pks.add(column);
                }
                fields.add(column);
            }
        }
        System.out.println("主键为:" + pks.toString());
        System.out.println("列名为:" + fields.toString());

        if (pks.isEmpty()) {
            throw new IllegalStateException("No primary key found for table "
                    + TEST_SCHEMA + "." + table.getTableName()
                    + "; cannot build the join condition");
        }

        // Drop the columns we do not want to compare
        fields = filter(fields, table.getFilterFields());

        // Template model
        Map<String, Object> dataMap = new HashMap<>();
        dataMap.put("PK_STR", String.join(",", pks));
        dataMap.put("FIELDS", fields);
        dataMap.put("PROD_SCHEMA", PROD_SCHEMA);
        dataMap.put("TEST_SCHEMA", TEST_SCHEMA);
        dataMap.put("DATE_STR", table.getDateStr());
        dataMap.put("TABLE_NAME", table.getTableName());
        dataMap.put("FILTER_TIME_FIELD", table.getFilterTimeField());

        // Load the template before opening the file so a missing template leaves no empty file.
        Template template = FreeMarkerTemplateUtils.getTemplate(TEMPLATE_NAME);
        String suffix = ".sql";
        String path = SQL_FILE_DIR + table.getTableName() + suffix;
        try (FileOutputStream fos = new FileOutputStream(new File(path));
             Writer out = new BufferedWriter(new OutputStreamWriter(fos, "utf-8"), 10240)) {
            template.process(dataMap, out);
        }
        System.out.println("生成 SQL 文件路径:" + path);
    }

    /**
     * Opens a new database connection.
     *
     * @return the open connection; the caller is responsible for closing it
     * @throws Exception if the driver class cannot be loaded or the connection fails
     */
    private static Connection createConnection() throws Exception {
        Class.forName(DRIVER);
        return DriverManager.getConnection(URL, USER, PASSWORD);
    }

    /**
     * Returns the distinct elements of {@code listA} that are not contained in {@code listB},
     * in the order in which they first appear in {@code listA}.
     *
     * @param listA candidate elements; {@code null} is treated as empty
     * @param listB elements to exclude; {@code null} is treated as empty
     * @param <T>   element type
     * @return a new mutable list; never {@code null}
     */
    public static <T> List<T> filter(List<T> listA, List<T> listB) {
        List<T> kept = new ArrayList<>();
        if (listA == null) {
            return kept;
        }
        HashSet<T> excluded = listB == null ? new HashSet<T>() : new HashSet<T>(listB);
        HashSet<T> seen = new HashSet<>();
        for (T item : listA) {
            // seen.add(...) is false for repeats, which keeps the result free of duplicates.
            if (!excluded.contains(item) && seen.add(item)) {
                kept.add(item);
            }
        }
        return kept;
    }
}

CompareSqlTemp.ftl SQL 模板

这里需注意,笔者的数据库采用阿里云的 ADB,因此在语法上与普通 MySQL 数据库有些小差异。

<#--
  Comparison SQL template.
  Model: FIELDS (list of column names to compare), PK_STR (comma-separated primary-key columns),
  TEST_SCHEMA, PROD_SCHEMA, TABLE_NAME, FILTER_TIME_FIELD, DATE_STR.
  Each selected column is null when both sides are equal, otherwise "test <=> prod".
  NOTE(review): "=" / "!=" evaluate to NULL when either side is NULL in standard SQL, so a
  NULL-vs-value mismatch on a non-key column may not be matched by the WHERE predicate below;
  confirm against the target engine.
-->
SELECT
<#if FIELDS??>
    <#list FIELDS as field>
        if(test.${field} = prod.${field}, null, ifnull(test.${field}, 'null') || " <=> " || ifnull(prod.${field}, 'null')) "${field}(test <=> prod)"<#if field_has_next>,</#if>
    </#list>
</#if>
-- SELECT sum(if(test.${FILTER_TIME_FIELD} is null, 0, 1)) test_total, sum(if(prod.${FILTER_TIME_FIELD} is null, 0, 1)) prod_total
from (SELECT * FROM ${TEST_SCHEMA}.${TABLE_NAME} where ${FILTER_TIME_FIELD} = '${DATE_STR}') test
FULL OUTER JOIN  (SELECT * FROM ${PROD_SCHEMA}.${TABLE_NAME} where ${FILTER_TIME_FIELD} = '${DATE_STR}') prod USING (${PK_STR})
WHERE
test.${FILTER_TIME_FIELD} is null or prod.${FILTER_TIME_FIELD} is null
<#list FIELDS as condition>
    or test.${condition} != prod.${condition}
</#list>
limit 1000

对比结果

运行程序后会生成一条 SQL 语句,把 SQL 语句放到数据库运行可得出分析结果(关于生成 SQL 逻辑这里不再赘述)。

MySQL表数据对比小工具

差异字段

MySQL表数据对比小工具

无差异字段

MySQL表数据对比小工具

由于是生成了 SQL 语句,在使用时如果有一些特别的需求如新增过滤条件等可直接修改 SQL 达到灵活变更,从而实现数据对比。

简洁的 Python 版本

import pymysql
from mako.template import Template

# Mako template for the comparison SQL. Each selected column is null when the test and prod
# values are equal, otherwise "test <=> prod"; the WHERE clause keeps rows missing on either
# side or differing in any compared column.
text = """
    SELECT
    %for index,field in enumerate(fields):
        if(test.${field} = prod.${field}, null, ifnull(test.${field}, 'null') || " <=> " || ifnull(prod.${field}, 'null')) "${field}(test <=> prod)" ${',' if index < len(fields)-1 else ''}
    %endfor
    -- SELECT sum(if(test.${FILTER_TIME_FIELD} is null, 0, 1)) test_total, sum(if(prod.${FILTER_TIME_FIELD} is null, 0, 1)) prod_total
    from (SELECT * FROM ${TEST_SCHEMA}.${TABLE_NAME} where ${FILTER_TIME_FIELD} = '${DATE_STR}') test
    FULL OUTER JOIN  (SELECT * FROM ${PROD_SCHEMA}.${TABLE_NAME} where ${FILTER_TIME_FIELD} = '${DATE_STR}') prod USING (${PK_STR})
    WHERE
    test.${FILTER_TIME_FIELD} is null or prod.${FILTER_TIME_FIELD} is null
    %for index,field in enumerate(fields):
        or test.${field} != prod.${field}
    %endfor
    limit 1000
"""
mytemplate = Template(text=text)

# Table to compare
TABLE_NAME = 'ods_tencent_ad_his_log'
# Columns excluded from the comparison
no_compare_field = ['platform']
# Time column used as the filter condition
FILTER_TIME_FIELD = 'ad_created_time'
# Date value for the filter
DATE_STR = '2021-07-07'
TEST_SCHEMA = 'datamedia_test'
PROD_SCHEMA = 'datamedia'

db = pymysql.connect(host='xxxxxxxxxxxxxxxxx',
                     port=3306,
                     user='username',
                     password='password',
                     charset='utf8')

# Read the table structure; always release the cursor and connection, even if the query fails.
try:
    cursor = db.cursor()
    try:
        cursor.execute("describe " + TEST_SCHEMA + "." + TABLE_NAME)
        data = cursor.fetchall()
    finally:
        cursor.close()
finally:
    db.close()

# Collect the primary-key columns and all column names
# (row[0] is read as the column name, row[3] as the key marker).
pks = [row[0] for row in data if row[3] == "PRI"]
fields = [row[0] for row in data]

# Without a primary key the USING (...) clause would be empty and the SQL invalid.
if not pks:
    raise SystemExit("No primary key found for " + TEST_SCHEMA + "." + TABLE_NAME)

PK_STR = ",".join(pks)

# Drop the columns that should not be compared, keeping the table's column order
FIELDS = [field for field in fields if field not in no_compare_field]

print(
    mytemplate.render(
        fields=FIELDS,
        TEST_SCHEMA=TEST_SCHEMA,
        TABLE_NAME=TABLE_NAME,
        FILTER_TIME_FIELD=FILTER_TIME_FIELD,
        DATE_STR=DATE_STR,
        PROD_SCHEMA=PROD_SCHEMA,
        PK_STR=PK_STR
    )
)

最后

以上就是笔者进行数据对比的小工具,通过该工具进行简单配置后生成 SQL 并于数据库运行得出对比结果。同时提供了 Java 实现与 Python 实现,希望对各位有所帮助。

感谢您的阅读,如果喜欢本文欢迎关注和转发,转载需注明出处,本头条号将持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。

声明:本文由"麦兜"发布,不代表"知识分享"立场,转载联系作者并注明出处:https://www.029ipr.com/zhishi/22686.html