首页 > 用户投稿

查看kafka消费组的消息数 如何使用消息队列解决分布式事务?

如何使用消息队列解决分布式事务?

消息事务是指一系列的生产、消费操作可以要么都完成,要么都失败,类似数据库的事务。在此用kafka为例做进一步解说吧!

一、基本概念

为了支持事务,kafka0.11.0版本引入以下概念:

查看kafka消费组的消息数 如何使用消息队列解决分布式事务?

1.事务协调者:类似于消费组负载均衡的协调者,每一个实现事务的生产端都被分配到一个事务协调者(transactioncoordinator)。

2.引入一个内部kafkatopic作为事务log:类似于消费管理offset的topic,事务topic本身也是持久化的,日志信息记录事务状态信息,由事务协调者写入。

3.引入控制消息(controlmessages):这些消息是客户端产生的并写入到主题的特殊消息,但对于使用者来说不可见。它们是用来让broker告知消费者之前拉取的消息是否被原子性提交。

4.引入transactionid:不同生产实例使用同一个transactionid表示是同一个事务,可以跨session的数据幂等发送。当具有相同transactionid的新的producer实例被创建且工作时,旧的且拥有相同transactionid的producer将不再工作,避免事务僵死。

id:每个新的producer在初始化的时候会被分配一个唯一的pid,这个pid对用户是不可见的。主要是为提供幂等性时引入的。

numbler。(对于每个pid,该producer发送数据的每个lttopic,partitiongt都对应一个从0开始单调递增的sequencenumber。

7.每个生产者增加一个epoch:用于标识同一个事务id在一次事务中的epoch,每次初始化事务时会递增,从而让服务端可以知道生产者请求是否旧的请求。

8.幂等性:保证发送单个分区的消息只会发送一次,不会出现重复消息。增加一个幂等性的开关,可以独立与事务使用,即可以只开启幂等但不开启事务

二、事务流程

1、查找事务协调者

生产者会首先发起一个查找事务协调者的请求(findcoordinatorrequest)。协调者会负责分配一个pid给生产者。类似于消费组的协调者。

2、获取produceid

在知道事务协调者后,生产者需要往协调者发送初始化pid请求(initpidrequest)。这个请求分两种情况:

●不带transactionid

这种情况下直接生成一个新的produceid即可,返回给客户端

●带transactionid

这种情况下,kafka根据transactionalid获取对应的pid,这个对应关系是保存在事务日志中(上图2a)。这样可以确保相同的transactionid返回相同的pid,用于恢复或者终止之前未完成的事务。

3、启动事务

生产者通过调用begintransaction接口启动事务,此时只是内部的状态记录为事务开始,但是事务协调者认为事务开始只有当生产者开始发送第一条消息才开始。

4、消费和生产配合过程

这一步是消费和生成互相配合完成事务的过程,其中涉及多个请求:

●增加分区到事务请求

当生产者有新分区要写入数据,则会发送addpartitiontotxnrequest到事务协调者。协调者会处理请求,主要做的事情是更新事务元数据信息,并把信息写入到事务日志中(事务topic)。

●生产请求

生产者通过调用send接口发送数据到分区,这些请求新增pid,epoch和sequencenumber字段。

●增加消费offset到事务

生产者通过新增的snedoffsetstotransaction接口,会发送某个分区的offset信息到事务协调者。协调者会把分区信息增加到事务中。

●事务提交offset请求

当生产者调用事务提交offset接口后,会发送一个txnoffsetcommitrequest请求到消费组协调者,消费组协调者会把offset存储在__consumer-offsetstopic中。协调者会根据请求的pid和epoch验证生产者是否允许发起这个请求。消费offset只有当事务提交后才对外可见。

5、提交或回滚事务

用户通过调用committransaction或aborttranssaction方法提交或回滚事务。

●endtxnrequest

当生产者完成事务后,客户端需要显式调用结束事务或者回滚事务。前者会使得消息对消费者可见,后者会对生产数据标记为abort状态,使得消息对消费者不可见。无论是提交或者回滚,都是发送一个endtnxrequest请求到事务协调者,写入prepare_commit或者prepare_abort信息到事务记录日志中(5.1a)。

●writetxnmarkerrequest

这个请求是事务协调者向事务中每个topicpartition的leader发送的。每个broker收到请求后会写入commit(pid)或者abort(pid)控制信息到数据日志中(5.2a)。

这个信息用于告知消费者当前消息是哪个事务,消息是否应该接受或者丢弃。而对于未提交消息,消费者会缓存该事务的消息直到提交或者回滚。

这里要注意,如果事务也涉及到__consumer_offsets,即该事务中有消费数据的操作且将该消费的offset存于__consumer_offsets中,transactioncoordinator也需要向该内部topic的各partition的leader发送writetxnmarkerrequest从而写入commit(pid)或commit(pid)控制信息(5.2a左边)。

●写入最终提交或回滚信息

当提交和回滚信息写入数据日子后,事务协调者会往事务日志中写入最终的提交或者终止信息以表示事务已经完成(图5.3),此时大部分于事务有关系的消息都可以被删除(通过标记后面在日志压缩时会被移除),我们只需要保留事务id以及其时间戳即可。

apachekafka是什么开源的系统?

apachekafka是一个开源消息系统,由scala写成。是由apache软件基金会开发的一个开源消息系统项目。

kafka最初是由linkedin开发,并于2011年初开源。2012年10月从apacheincubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。

kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于jms的特性,但是在设计实现上完全不同,此外它并不是jms规范的实现。

kafka对消息保存时根据topic进行归类,发送消息者称为producer,消息接受者称为consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。

无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性

事务消息协调者生产者pid

原文标题:查看kafka消费组的消息数 如何使用消息队列解决分布式事务?,如若转载,请注明出处:https://www.shcrbfchs.com/shc2/493.html
免责声明:此资讯系转载自合作媒体或互联网其它网站,「泰福润金」登载此文出于传递更多信息之目的,并不意味着赞同其观点或证实其描述,文章内容仅供参考。