Spring Boot + 事务钩子函数,打造高效支付系统!

今天,我继续安利一个独门绝技:Spring 事务的钩子函数。

单纯的讲技术可能比较枯燥乏味。

接下来,我将以一个实际的案例来描述Spring事务钩子函数的正确使用姿势。

一、案例背景

拿支付系统相关的业务来举例。在支付系统中,我们需要记录每个账户的资金流水(记录用户A因为哪个操作扣了钱,因为哪个操作加了钱),这样我们才能对每个账户的账做到心中有数,对于支付系统而言,资金流水的数据可谓是最重要的。因此,为了防止支付系统的老大徇私舞弊,CTO提了一个流水存档的需求:

要求支付系统对每个账户的资金流水做一份存档,要求支付系统在写流水的时候,把流水相关的信息以消息的形式推送到kafka,由存档系统消费这个消息并落地到库里(这个库只有存档系统拥有写权限)。

整个需求的流程如下所示:

图片

整个需求的流程还是比较简单的,考虑到后续会有其他事业部也要进行数据存档操作,CTO建议支付系统团队内部开发一个二方库,这个二方库的主要功能就是发送消息到kafka中去。

二、确定方案

既然要求开发一个二方库,因此,我们需要考虑如下几件事情:

1、技术栈使用的springboot,因此,这里最好以starter的方式提供

2、二方库需要发送消息给kafka,最好是二方库内部基于kafka生产者的api创建生产者,不要使用Spring自带的kafkaTemplate,因为集成方有可能已经使用了kafkaTemplate。不能与集成方造成冲突。

3、减少对接方的集成难度、学习成本,最好是提供一个简单实用的api,业务侧能简单上手。

4、发送消息这个操作需要支持事务,尽量不影响主业务

在上述的几件事情中,最需要注意的应该就是第4点:发送消息这个操作需要支持事务,尽量不影响主业务。

这是什么意思呢?

首先,尽量不影响主业务,这个最简单的方式就是使用异步机制。

其次,需要支持事务是指:假设我们的api是在事务方法内部调用的,那么我们需要保证事务提交后再执行这个api。

那么,我们的流水落地api应该要有这样的功能:

图片

内部可以判断当前是否存在事务,如果存在事务,则需要等事务提交后再异步发送消息给kafka。

如果不存在事务则直接异步发送消息给kafka。

而且这样的判断逻辑得放在二方库内部才行。

那现在摆在我们面前的问题就是:我要如何判断当前是否存在事务,以及如何在事务提交后再触发我们自定义的逻辑呢?

三、TransactionSynchronizationManager显神威

这个类内部所有的变量、方法都是static修饰的,也就是说它其实是一个工具类。是一个事务同步器。

下述是流水落地API的伪代码,这段代码就解决了我们上述提到的疑问:

java复制代码private final ExecutorService executor = Executors.newSingleThreadExecutor();

public void sendLog() {
    // 判断当前是否存在事务
    if (!TransactionSynchronizationManager.isSynchronizationActive()) {
        // 无事务,异步发送消息给kafka
        
        executor.submit(() -> {
            // 发送消息给kafka
            try {
                // 发送消息给kafka
            } catch (Exception e) {
                // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
            }
        });
        return;
    }

    // 有事务,则添加一个事务同步器,并重写afterCompletion方法(此方法在事务提交后会做回调)
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

        @Override
        public void afterCompletion(int status) {
            if (status == TransactionSynchronization.STATUS_COMMITTED) {
                // 事务提交后,再异步发送消息给kafka
                executor.submit(() -> {
                    try {
                     // 发送消息给kafka
                    } catch (Exception e) {
                     // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
                    }
                });
            }
        }

    });

}

代码比较简单,其主要是TransactionSynchronizationManager的使用。

3.1、判断是否存在事务?TransactionSynchronizationManager.isSynchronizationActive() 方法显神威

我们先看下这个方法的源码:

java复制代码// TransactionSynchronizationManager.java类内部的部分代码

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
   new NamedThreadLocal<>("Transaction synchronizations");

public static boolean isSynchronizationActive() {
    return (synchronizations.get() != null);
}

很明显,synchronizations是一个线程变量(ThreadLocal)。

那它是在什么时候set进去的呢?这里的话,可以参考下这个方法:org.springframework.transaction.support.TransactionSynchronizationManager#initSynchronization,其源码如下所示:

java复制代码/**
  * Activate transaction synchronization for the current thread.
  * Called by a transaction manager on transaction begin.
  * @throws IllegalStateException if synchronization is already active
  */
public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
        throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    synchronizations.set(new LinkedHashSet<>());
}

由源码中的注释也可以知道,它是在事务管理器开启事务时调用的。换句话说,只要我们的程序执行到带有事务特性的方法时,就会在线程变量中放入一个LinkedHashSet,用来标识当前存在事务。只要isSynchronizationActive返回true,则代表当前有事务。因此,结合这两个方法我们是指能解决我们最开始提出的疑问:**要如何判断当前是否存在事务**

3.2、如何在事务提交后触发自定义逻辑?TransactionSynchronizationManager.registerSynchronization()方法显神威

我们来看下这个方法的源代码:

java复制代码/**
  * Register a new transaction synchronization for the current thread.
  * Typically called by resource management code.
  * <p>Note that synchronizations can implement the
  * {@link org.springframework.core.Ordered} interface.
  * They will be executed in an order according to their order value (if any).
  * @param synchronization the synchronization object to register
  * @throws IllegalStateException if transaction synchronization is not active
  * @see org.springframework.core.Ordered
  */
public static void registerSynchronization(TransactionSynchronization synchronization)
    throws IllegalStateException {

    Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    if (!isSynchronizationActive()) {
        throw new IllegalStateException("Transaction synchronization is not active");
    }
    synchronizations.get().add(synchronization);
}

这里又使用到了synchronizations线程变量,我们在判断是否存在事务时,就是判断这个线程变量内部是否有值。

那我们现在想在事务提交后触发自定义逻辑和这个有什么关系呢?

我们在上面构建流水落地api的伪代码中有向synchronizations内部添加了一个TransactionSynchronizationAdapter,内部并重写了afterCompletion方法,其代码如下所示:

java复制代码TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

    @Override
    public void afterCompletion(int status) {
        if (status == TransactionSynchronization.STATUS_COMMITTED) {
            // 事务提交后,再异步发送消息给kafka
            executor.submit(() -> {
                    try {
                     // 发送消息给kafka
                    } catch (Exception e) {
                     // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
                    }
            });
        }
    }

});

我们结合registerSynchronization的源码来看,其实这段代码主要就是向线程变量内部的LinkedHashSet添加了一个对象而已,但就是这么一个操作,让Spring在事务执行的过程中变得“有事情可做”

这是什么意思呢?是因为Spring在执行事务方法时,对于操作事务的每一个阶段都有一个回调操作,比如:trigger系列的回调

图片

invoke系列的回调

图片

而我们现在的需求就是在事务提交后触发自定义的函数,那就是在invokeAfterCommit和invokeAfterCompletion这两个方法来选了。

首先,这两个方法都会拿到所有TransactionSynchronization的集合(其中会包括我们上述添加的TransactionSynchronizationAdapter)。

但是要注意一点:invokeAfterCommit只能拿到集合,invokeAfterCompletion除了集合还有一个int类型的参数,而这个int类型的参数其实是当前事务的一种状态。也就是说,如果我们重写了invokeAfterCompletion方法,我们除了能拿到集合外,还能拿到当前事务的状态。

因此,此时我们可以根据这个状态来做不同的事情,比如:可以在事务提交时做自定义处理,也可以在事务回滚时做自定义处理等等。

四、总结

上面有说到,我们判断当前是否存在事务、添加钩子函数都是依赖线程变量的。因此,我们在使用过程中,一定要避免切换线程。否则会出现不生效的情况。

最后说一句(求关注!别白嫖!)

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。

关注公众号:woniuxgg,在公众号中回复:笔记  就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/555565.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Nodejs 第六十四章(SSO单点登录)

单点登录 单点登录&#xff08;Single Sign-On&#xff0c;简称SSO&#xff09;是一种身份认证和访问控制的机制&#xff0c;允许用户使用一组凭据&#xff08;如用户名和密码&#xff09;登录到多个应用程序或系统&#xff0c;而无需为每个应用程序单独提供凭据 SSO的主要优…

openGauss学习笔记-266 openGauss性能调优-TPCC性能调优测试指导-文件系统配置

文章目录 openGauss学习笔记-266 openGauss性能调优-TPCC性能调优测试指导-文件系统配置266.1 查看当前数据盘的文件系统类型266.2 对于需要修改的磁盘&#xff0c;备份所需的数据至其他磁盘或其他服务器266.3 格式化磁盘为xfs文件系统266.4 执行**步骤一** openGauss学习笔记-…

【Keil MDK5新建工程】STM32F103C8T6

一、参数及片上外设 二、系统结构及引脚定义 三、工程架构及新建工程步骤 四、GPIO模式 一、参数及片上外设 二、系统结构及引脚定义 三、工程架构及新建工程步骤 建立工程文件夹&#xff0c;Keil中新建工程&#xff0c;选择型号 工程文件夹里建立Core、Library、User等文件夹…

2024年华中杯数学建模竞赛ABC题思路分析

简单分析一下各个题目可能需要用到的方法和模型&#xff0c;完整代码和成品论文见文末 A题 太阳能路灯光伏板的朝向设计问题: 1. 球面几何、天文学相关知识,如赤纬角、太阳高度角、时角等概念和公式 2. 太阳辐射模型,根据太阳能辐射强度、大气衰减系数等计算地表太阳辐射强度…

绝对隔离+底层限制,成就猎鹰蜜罐“牢不可破”的立体化安全

前言 自网络诞生以来&#xff0c;攻击威胁事件层出不穷&#xff0c;网络攻防对抗已成为信息时代背景下的无硝烟战争。然而&#xff0c;传统的网络防御技术如防火墙、入侵检测技术等都是一种敌暗我明的被动防御&#xff0c;难以有效应对攻击者随时随地发起的无处不在的攻击和威胁…

【学习笔记】Vue3源码解析:第五部分 - 实现渲染(1)

课程地址&#xff1a;【已完结】全网最详细Vue3源码解析&#xff01;&#xff08;一行行带你手写Vue3源码&#xff09; 第五部分-&#xff1a;&#xff08;对应课程的第29-32节&#xff09; 第29节&#xff1a;《实现渲染的createRender方法》 1、通过createApp()方法得到一个…

AI热潮下,公链基础设施赛道都有哪些变化?

最近在一级市场&#xff0c;最火热的赛道无疑是AI&#xff0c;其次是BTC&#xff0c;每天聊的项目80%都集中在这两个赛道&#xff0c;我个人最多的时候一天可以聊5&#xff0c;6个AI项目。 可以预见的是AI泡沫会在明后年达到顶峰&#xff0c;随着数以百计的AI新项目上线&#…

QT实现客户端断开连接

Widget.cpp #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget), socket(new QTcpSocket(this)) {ui->setupUi(this);//初始化界面ui->msgEdit->setEnabled(false); //不可用ui-…

SQL Server Management Studio 显示行号

前言 在使用 SQL Server Management Studio (SSMS) 进行数据库管理和查询时&#xff0c;能够看到代码的行号是非常有用的。这可以帮助您更容易地定位代码错误、讨论特定的代码行&#xff0c;或者在执行长查询时快速找到特定行。在本文中&#xff0c;我将向您展示如何在 SSMS 中…

AIDE:自动驾驶目标检测的自动数据引擎

AIDE&#xff1a;自动驾驶目标检测的自动数据引擎 摘要IntroductionRelated WorksMethodData FeederModel Updater4 Experiments 摘要 自动驾驶车辆&#xff08;AV&#xff09;系统依赖于健壮的感知模型作为安全保证的基石。然而&#xff0c;道路上遇到的物体表现出长尾分布&a…

图像生成模型浅析(Stable Diffusion、DALL-E、Imagen)

目录 前言1. 速览图像生成模型1.1 VAE1.2 Flow-based Model1.3 Diffusion Model1.4 GAN1.5 对比速览 2. Diffusion Model3. Stable Diffusion3.1 Text Encoder3.2 Decoder3.3 Generation Model 总结参考 前言 简单学习下图像生成模型的相关知识&#x1f917; 以下内容来自于李宏…

vue3+elment复杂详情页面打开后,再打开其他页面都显示空白,控制台也没什么特殊报错

页面使用了el-tabs 、 el-tab-pane、el-table 等标签 但是经测试不是这些问题导致的 js也使用了onMounted &#xff0c;但是除掉也时空白页面 反正之前人写的页面可乱&#xff0c;尤其是js这块&#xff0c;穿插引用import一大堆 主题页面样式布局如下 最后看到页面代码太乱…

古籍数字化平台:精校功能介绍

一、平台介绍 古籍数字化平台&#xff0c;本着公益性、低成本、合作共赢的三大原则&#xff0c;功能涵盖古籍OCR识别、族谱县志OCR识别、民国报纸OCR识别、图文逐字校对、数据著录、智能标点分段、精编排版、智能白话译文等&#xff0c;是一站式线上整理全流程平台。 平台集成…

备战面试K8S

备战面试&&K8S Kubernetes关于DockerDocker的优缺点分析 WebAssemblyWebAssembly与Container比较 CtrCrictlCtr和CriCtl的区别 Pod生命周期PodConditions容器状态Pod容器组成生命周期的流程 Kubelet EFK日志采集工具的优缺点 Kubernetes 容器运行接口 Container Runti…

2024年免费云服务器推荐,小编亲测好用!

随着云计算技术的飞速发展&#xff0c;云服务器以其弹性、高效、安全的特性&#xff0c;成为众多企业和个人用户的首选。尽管市面上有众多收费的云服务器产品&#xff0c;但免费的云服务器仍然吸引着大量用户&#xff0c;尤其是初学者和预算有限的用户。下面&#xff0c;我们就…

从API到Agent:洞悉LangChain工程化设计

作者&#xff1a;范志东 原文&#xff1a;https://mp.weixin.qq.com/s/zGS9N92R6dsc9Jk57pmYSg 本文作者试着从工程角度去理解LangChain的设计和使用。大家可以将此文档作为LangChain的“10分钟快速上手”手册&#xff0c;希望帮助需要的同学实现AI工程的Bootstrap。 我想做一…

[Vision Board创客营]学习片上Flash移植FAL

文章目录 [Vision Board创客营]学习片上Flash移植FAL介绍环境搭建使用组件测试porbeerasewriteread 结语 [Vision Board创客营]学习片上Flash移植FAL 水平较菜&#xff0c;大佬轻喷。&#x1f630;&#x1f630;&#x1f630; 介绍 &#x1f680;&#x1f680;Vision-Board 开…

安全开发实战(3)--存活探测与端口扫描

目录 安全开发专栏 前言 存活探测 端口扫描 方式一: 1.3.1 One 1.3.2 Two 1.3.3 批量监测 方式二: 1.3.1 One 1.3.2 Two 1.3.3 Three 1.3.4 扫描ip地址,提取出开放端口和协议 ​编辑 1.3.5 批量扫描(最终完成版) 总结 安全开发专栏 安全开发实战​http://t.csd…

MySql数据库从0-1学习-第五天事务和索引

事务 事务 是一组操作的集合&#xff0c;它是一个不可分割的工作单位。事务会把所有的操作作为一个整体一起向系统提交或撤销操作请求&#xff0c;即这些操作 要么同时成功&#xff0c;要么同时失败。 注意事项,默认事务是自动提交的,也就是说,当执行一条DML语句,MySql会立即隐…

【Java开发指南 | 第八篇】Java变量、构造方法、创建对象

读者可订阅专栏&#xff1a;Java开发指南 |【CSDN秋说】 文章目录 Java变量构造方法创建对象 Java变量 在Java中&#xff0c;变量用于存储数据值。它们是程序中用于保存信息的一种基本方式。变量在程序执行过程中可以被赋予不同的值&#xff0c;并且这些值可以在程序的不同部分…
最新文章