Star's Blog

Keep learning, Keep improving


  • 首页

  • 分类

  • 关于

  • 标签

  • 归档

  • 搜索

RocketMQ 事务机制的实现流程

发表于 2020-01-05 | 分类于 中间件

1、解决消息丢失的第一个问题:订单系统推送消息领丢失

既然我们已经明确了消息在基于MQ传输的过程中可能丢失的几个地方,那么我们接着就得一步一步考虑如何去解决各个环节丢失消息的问题,首先要解决的第一个问题,就是订单系统推送消息到MQ的过程中,可能消息就丢失了。

之前我们也说过了,可能在订单系统推送消息到MQ的过程中,就因为常见的网络故障之类的问题,导致消息就丢失了,这里我们可以看一下下图中的示意。

而在RocketMQ中,有一个非常强悍有力的功能,就是事务消息的功能,凭借这个事务级的消息机制,就可以让我们确保订单系统推送给出去的消息一定会成功写入MQ里,绝对不会半路就搞丢了。

今天我们就来系统的分析一下RocketMQ的事务消息机制的原理。

2、发送half消息到MQ去,试探一下MQ是否正常

首先作为我们的订单系统而言,假设他收到了一个订单支付成功的通知之后,他必然是需要在自己的订单数据库里做一些增删改操作的,比如更新订单状态之类的。

可能有的朋友会觉得,订单系统不就是先在自己数据库里做一些增删改操作,然后就直接发个消息到MQ去,让其他关注这个订单支付成功消息的系统去从MQ获取消息做对应的处理就可以了么?

事实上还真不是这么简单。

在基于RocketMQ的事务消息机制中,我们首先要让订单系统去发送一条half消息到MQ去,这个half消息本质就是一个订单支付成功的消息,只不过你可以理解为他这个消息的状态是half状态,这个时候红包系统是看不见这个half消息的,然后我们去等待接收这个half消息写入成功的响应通知

我们看下面的图

看到这儿可能有的朋友就开始有点郁闷了,可能有的人觉得你没事儿先发个half消息给MQ干什么?

大家先别着急,你可以想一下,假设你二话不说让订单系统直接做了本地的数据库操作,比如订单状态都更新为了已完成,然后你再发送消息给MQ,结果报出一堆异常,发现MQ挂了。

这个时候,必然导致你没法通过消息通知到红包系统去派发红包,那用户一定会发现自己订单支付了,结果红包没收到。

所以,在这里我们首先第一件事,不是先让订单系统做一些增删改操作,而是先发一个half消息给MQ以及收到他的成功的响应,初步先跟MQ做个联系和沟通

大概这个意思就是说,确认一下MQ还活着,MQ也知道你后续可能想发送一条很关键的不希望丢失的消息给他了!

3、万一要是half消息写入失败了呢?

这里我们先来分析第一种情况,万一你订单系统写half消息给MQ就失败了呢?

可能你发现报错了,可能MQ就挂了,或者这个时候网络就是故障了,所以导致你的half消息都没发送成功,总之你现在肯定没法跟MQ通信了。

这个时候你的订单系统就应该执行一系列的回滚操作,比如对订单状态做一个更新,让状态变成“关闭交易”,同时通知支付系统自动进行退款,这才是正确的做法

因为你订单虽然支付了,但是包括派发红包、发送优惠券之类的后续操作是无法执行的,所以此时必然应该把钱款退还给用户,说交易失败了。

这里给大家插播一个我曾经亲身经历过的一个事情,曾经有一次在一家便利店进行购物的时候,我这里都已经显示扫码支付成功了,但是店员那边说在等待他们系统确认

结果等了一会儿,系统显示后台系统有异常,交易失败了,然后过了一会儿就让支付宝自动退款给我了。

其实这就是类似的例子。

阅读全文 »

数据库和缓存双写一致性方案

发表于 2020-01-01 | 分类于 系统架构

引言

为什么写这篇文章?

首先,缓存由于其高并发和高性能的特性,已经在项目中被广泛使用。在读取缓存方面,大家没啥疑问,都是按照下图的流程来进行业务操作

但是在更新缓存方面,对于更新完数据库,是更新缓存呢,还是删除缓存。又或者是先删除缓存,再更新数据库,其实大家存在很大的争议。

先做一个说明,从理论上来说,给缓存设置过期时间,是保证最终一致性的解决方案。这种方案下,我们可以对存入缓存的数据设置过期时间,所有的写操作以数据库为准,对缓存操作只是尽最大努力即可。也就是说如果数据库写成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存。因此,接下来讨论的思路不依赖于给缓存设置过期时间这个方案。

在这里,我们讨论四种更新策略:

  1. 先更新数据库,再更新缓存
  2. 先更新缓存,再更新数据库
  3. 先删除缓存,再更新数据库
  4. 先更新数据库,再删除缓存(推荐)

这里逐一分析。

先更新数据库,再更新缓存

这套方案,大家是普遍反对的。为什么呢?有如下两点原因:线程安全角度和业务场景角度。

线程安全角度

同时有请求 A 和请求 B 进行更新操作,那么会出现

  1. 线程A更新了数据库
  2. 线程B更新了数据库
  3. 线程B更新了缓存
  4. 线程A更新了缓存

这就出现请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑。

业务场景角度

  1. 如果你是一个写数据库场景比较多,而读数据场景比较少的业务需求,采用这种方案就会导致,数据压根还没读到,缓存就被频繁的更新,浪费性能。
  2. 如果你写入数据库的值,并不是直接写入缓存的,而是要经过一系列复杂的计算再写入缓存。 那么,每次写入数据库后,都再次计算写入缓存的值,无疑是浪费性能的。显然,删除缓存更为适合。

先更新缓存,再更新数据库

存在线程安全问题么?

存在的,假设这会同时有请求A和请求B进行更新操作,那么会出现:

  1. 线程A更新了缓存
  2. 线程B更新了缓存
  3. 线程B更新了数据库
  4. 线程A更新了数据库

请求A更新数据库应该比请求B更新数据库早才对,但是因为网络等原因,B却比A更早更新了数据库。这就导致了脏数据,因此不考虑。

可是,这时候有一个细心的读者,给博主举了一个反例。该例子出自《从P1到P7——我在淘宝这7年》这篇博客, 博主偷个懒,直接贴一下该博客的原话:

在【招财进宝】项目中有一个技术的细节值得拿出来说说,淘宝商品详情页面每天的流量在10亿以上,里面的内容都是放在缓存里的,做【招财进宝】的时候,我们要给卖家显示他的商品被浏览的次数,这个数字必须实时更新,而用缓存的话一般都是异步更新的。于是商品表里面增加了这样一个字段,每增加一个PV这个字段就要更新一次。发布上去一个小时数据库就挂掉了,撑不住这么高的update。数据库撑不住怎么办?一般的缓存策略是不支持实时更新的,这时候多隆大神想了个办法,在apache上面写了一个模块,这个数字根本不经过下层的web容器(只经过apache)就写入一个集中式的缓存区了,这个缓存区的数据再异步更新到数据库。好像什么问题,到了多隆手里,总能迎刃而解。

上面巴拉巴拉一堆,就是说,当时他们有一个读多写多的场景,然后多隆大神用了先更缓存,再异步更新数据库的策略。

难道淘宝的大神没发现线程安全问题?

不是的,上面提到的场景具有一个特殊性。我们先摘取关键一句话:

于是商品表里面增加了这样一个字段,每增加一个PV这个字段就要更新一次

PV是page view,页面浏览量的意思。

博主斗胆猜测,他们做的应该是用户每次点击,数据库里的这个字段就加一的操作。

那我们这时的SQL一般是这么写:

1
update product_tb set number = number+1 where product_id =xxx

大家注意到了么,并发执行这句SQL并不需要关心执行顺序。哪个更新线程先执行加一的SQL语句 ,与操作顺序有什么关系呢?

再说的通俗一点,假设我们同时有请求A和请求B进行更新操作,那么会出现:

  1. 线程A更新了缓存
  2. 线程B更新了缓存
  3. 线程B更新了数据库
  4. 线程A更新了数据库

因为他们这个时候执行的sql是无序的,所以上面的步骤3和步骤4哪一个步骤先执行,并没有关系。最终结果一定是一致的。

阅读全文 »

谈谈高并发系统的限流

发表于 2020-01-01 | 分类于 算法

开涛大神在博客中说过:在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。本文结合作者的一些经验介绍限流的相关概念、算法和常规的实现方式。

缓存

缓存比较好理解,在大型高并发系统中,如果没有缓存数据库将分分钟被爆,系统也会瞬间瘫痪。使用缓存不单单能够提升系统访问速度、提高并发访问量,也是保护数据库、保护系统的有效方式。大型网站一般主要是“读”,缓存的使用很容易被想到。在大型“写”系统中,缓存也常常扮演者非常重要的角色。比如累积一些数据批量写入,内存里面的缓存队列(生产消费),以及HBase写数据的机制等等也都是通过缓存提升系统的吞吐量或者实现系统的保护措施。甚至消息中间件,你也可以认为是一种分布式的数据缓存。

降级

服务降级是当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行。降级往往会指定不同的级别,面临不同的异常等级执行不同的处理。根据服务方式:可以拒接服务,可以延迟服务,也有时候可以随机服务。根据服务范围:可以砍掉某个功能,也可以砍掉某些模块。总之服务降级需要根据不同的业务需求采用不同的降级策略。主要的目的就是服务虽然有损但是总比没有好。

限流

限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

限流的算法

常见的限流算法有:计数器、漏桶和令牌桶算法。

计数器

计数器是最简单粗暴的算法。比如某个服务最多只能每秒钟处理100个请求。我们可以设置一个1秒钟的滑动窗口,窗口中有10个格子,每个格子100毫秒,每100毫秒移动一次,每次移动都需要记录当前服务请求的次数。内存中需要保存10次的次数。可以用数据结构LinkedList来实现。格子每次移动的时候判断一次,当前访问次数和LinkedList中最后一个相差是否超过100,如果超过就需要限流了。

很明显,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//服务访问次数,可以放在Redis中,实现分布式系统的访问计数
Long counter = 0L;
//使用LinkedList来记录滑动窗口的10个格子。
LinkedList<Long> ll = new LinkedList<Long>();

public static void main(String[] args)
{
Counter counter = new Counter();

counter.doCheck();
}

private void doCheck()
{
while (true)
{
ll.addLast(counter);

if (ll.size() > 10)
{
ll.removeFirst();
}

//比较最后一个和第一个,两者相差一秒
if ((ll.peekLast() - ll.peekFirst()) > 100)
{
//To limit rate
}

Thread.sleep(100);
}
}

漏桶算法

漏桶算法即leaky bucket是一种非常常用的限流算法,可以用来实现流量整形(Traffic Shaping)和流量控制(Traffic Policing)。

贴了一张维基百科上示意图帮助大家理解:

阅读全文 »

Redis面试连环问

发表于 2020-01-01 | 分类于 数据库

Redis是什么

面试官:你先来说下redis是什么吧

我:(这不就是总结下redis的定义和特点嘛)Redis是C语言开发的一个开源的(遵从BSD协议)高性能键值对(key-value)的内存数据库,可以用作数据库、缓存、消息中间件等。它是一种NoSQL(not-only sql,泛指非关系型数据库)的数据库。

我顿了一下,接着说:Redis作为一个内存数据库。1、性能优秀,数据在内存中,读写速度非常快,支持并发10W QPS;2、单进程单线程,是线程安全的,采用IO多路复用机制;3、丰富的数据类型,支持字符串(strings)、散列(hashes)、列表(lists)、集合(sets)、有序集合(sorted sets)等;4、支持数据持久化。可以将内存中数据保存在磁盘中,重启时加载;5、主从复制,哨兵,高可用;6、可以用作分布式锁;7、可以作为消息中间件使用,支持发布订阅

五种数据类型

面试官:总结的不错,看来是早有准备啊。刚来听你提到redis支持五种数据类型,那你能简单说下这五种数据类型吗?

我:当然可以,但是在说之前,我觉得有必要先来了解下Redis内部内存管理是如何描述这5种数据类型的。说着,我拿着笔给面试官画了一张图:

我:首先redis内部使用一个redisObject对象来表示所有的key和value,redisObject最主要的信息如上图所示:type表示一个value对象具体是何种数据类型,encoding是不同数据类型在redis内部的存储方式。比如:type=string表示value存储的是一个普通字符串,那么encoding可以是raw或者int。

我顿了一下,接着说:下面我简单说下5种数据类型:

1、string是redis最基本的类型,可以理解成与memcached一模一样的类型,一个key对应一个value。value不仅是string,也可以是数字。string类型是二进制安全的,意思是redis的string类型可以包含任何数据,比如jpg图片或者序列化的对象。string类型的值最大能存储512M。

2、Hash是一个键值(key-value)的集合。redis的hash是一个string的key和value的映射表,Hash特别适合存储对象。常用命令:hget,hset,hgetall等。

3、list列表是简单的字符串列表,按照插入顺序排序。可以添加一个元素到列表的头部(左边)或者尾部(右边) 常用命令:lpush、rpush、lpop、rpop、lrange(获取列表片段)等。

应用场景:list应用场景非常多,也是Redis最重要的数据结构之一,比如twitter的关注列表,粉丝列表都可以用list结构来实现。

数据结构:list就是链表,可以用来当消息队列用。redis提供了List的push和pop操作,还提供了操作某一段的api,可以直接查询或者删除某一段的元素。

实现方式:redis list的是实现是一个双向链表,既可以支持反向查找和遍历,更方便操作,不过带来了额外的内存开销。

4、set是string类型的无序集合。集合是通过hashtable实现的。set中的元素是没有顺序的,而且是没有重复的。

常用命令:sdd、spop、smembers、sunion等。

应用场景:redis set对外提供的功能和list一样是一个列表,特殊之处在于set是自动去重的,而且set提供了判断某个成员是否在一个set集合中。

5、zset和set一样是string类型元素的集合,且不允许重复的元素。常用命令:zadd、zrange、zrem、zcard等。

使用场景:sorted set可以通过用户额外提供一个优先级(score)的参数来为成员排序,并且是插入有序的,即自动排序。当你需要一个有序的并且不重复的集合列表,那么可以选择sorted set结构。和set相比,sorted set关联了一个double类型权重的参数score,使得集合中的元素能够按照score进行有序排列,redis正是通过分数来为集合中的成员进行从小到大的排序。

实现方式:Redis sorted set的内部使用HashMap和跳跃表(skipList)来保证数据的存储和有序,HashMap里放的是成员到score的映射,而跳跃表里存放的是所有的成员,排序依据是HashMap里存的score,使用跳跃表的结构可以获得比较高的查找效率,并且在实现上比较简单。

我:我之前总结了一张图,关于数据类型的应用场景,如果您感兴趣,可以去我的掘金看。。

数据类型应用场景总结

类型 简介 特性 场景
string(字符串) 二进制安全 可以包含任何数据,比如jpg图片或者序列化对象 —
Hash(字典) 键值对集合,即编程语言中的map类型 适合存储对象,并且可以像数据库中的update一个属性一样只修改某一项属性值 存储、读取、修改用户属性
List(列表) 链表(双向链表) 增删快,提供了操作某一元素的api 最新消息排行;消息队列
set(集合) hash表实现,元素不重复 添加、删除、查找的复杂度都是O(1),提供了求交集、并集、差集的操作 共同好友;利用唯一性,统计访问网站的所有Ip
sorted set(有序集合) 将set中的元素增加一个权重参数score,元素按score有序排列 数据插入集合时,已经进行了天然排序 排行榜;带权重的消息队列

面试官:想不到你平时也下了不少工夫,那redis缓存你一定用过的吧

我:用过的。。

面试官:那你跟我说下你是怎么用的?

我是结合spring boot使用的。一般有两种方式,一种是直接通过RedisTemplate来使用,另一种是使用spring cache集成Redis(也就是注解的方式)。具体的代码我就不说了,在我的掘金中有一个demo(见下)。

Redis缓存

直接通过RedisTemplate来使用

使用spring cache集成Redis pom.xml中加入以下依赖:

阅读全文 »

Java中怎么快速把InputStream转化为String

发表于 2020-01-01 | 分类于 Java

其实我只是偶尔上Stack Overflow,直到看了这个200万次阅读量的提问:HowdoI read/convert anInputStreamintoaStringinJava?

惊呆了!!!

怎么会有这么多人围观。

我第一反应的解决办法是使用 Apachecommons包的工具类 IOUtils,果不其然,第一条回答就是这个。

我的天!居然有2000+的赞!

继续往下看,发现大家的不少的骚操作

使用CharStreams (Guava)

1
2
String result = CharStreams.toString(new InputStreamReader(      
inputStream, Charsets.UTF_8));

使用Scanner

1
2
Scanner s = new Scanner(inputStream).useDelimiter("\\A");
String result = s.hasNext() ? s.next() : "";

使用Stream API

Warning: This solution converts different line breaks (like \r\n) to \n.

1
String result = new BufferedReader(new InputStreamReader(inputStream))  .lines().collect(Collectors.joining("\n"));

使用parallel Stream API

Warning: This solution converts different line breaks (like \r\n) to \n.

1
String result = new BufferedReader(new InputStreamReader(inputStream)).lines()   .parallel().collect(Collectors.joining("\n"));

使用InputStreamReader and StringBuilder

1
2
3
4
5
6
7
8
9
final int bufferSize = 1024;
final char[] buffer = new char[bufferSize];
final StringBuilder out = new StringBuilder();
Reader in = new InputStreamReader(stream, StandardCharsets.UTF_8);
int charsRead;
while((charsRead = in.read(buffer, 0, buffer.length)) > 0) {
out.append(buffer, 0, charsRead);
}
return out.toString();

使用StringWriter and IOUtils.copy

1
2
3
 writer = new StringWriter();
IOUtils.copy(inputStream, writer, "UTF-8");
return writer.toString();

使用ByteArrayOutputStream and inputStream.read

1
2
3
4
5
6
7
8
ByteArrayOutputStream result = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int length;
while ((length = inputStream.read(buffer)) != -1) {
result.write(buffer, 0, length);
}
// StandardCharsets.UTF_8.name() > JDK 7
return result.toString("UTF-8");

使用BufferedReader

Warning: This solution converts different line breaks (like \n\r) to line.separator system property (for example, in Windows to “\r\n”).

1
2
3
4
5
6
7
8
9
String newLine = System.getProperty("line.separator");
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
StringBuilder result = new StringBuilder();
boolean flag = false;
for (String line; (line = reader.readLine()) != null; ) {
result.append(flag? newLine: "").append(line);
flag = true;
}
return result.toString();
阅读全文 »
123…20
Morning Star

Morning Star

100 日志
14 分类
37 标签
GitHub
© 2021 Morning Star
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4