背景

线上以前运行得好好的系统,突然不稳定了。而且在非生产环境中没有暴露问题,仅在生产环境出现问题。不稳定体现在用户订阅的通知消息,有时候可以收到,而有时候收不到。

分析

非生产环境没有这个问题,因为数据量很少。而生产环境,数据量要大得多,导致有些数据得不到及时的处理。但是生产环境的数据量并不是特别大,几十万条数据而已,但是这点数据已经让系统出现性能问题了。

看了代码才发现,大量使用了带 await 的 for 循环。每个 await 都比较耗时,再加上大量的 for 循环,这种串行处理方式导致系统在百万条数据以下时就不能及时应对了。然后对于向不同的用户发送消息通知这种场景,并不需要等待给前一个用户发完再向第二个用户发消息,所以这些操作完全可以并行。

解决

使用 Promise.all 替换带有 await 的 for 循环,性能大大提升,几十万条数据的规模那是可以轻松应对,系统的峰值吞吐量提升了40倍(由200不到提升到8000左右)。当然,这个峰值只是当前用户订阅规模下的,如果有更大的数据,相信这个峰值还可以再提高。

改造前的表现

image.png

改造后的表现

image.png

这和重构有什么关系?

什么是重构?

昨天异步社区转发了我之前关于重构的分享: https://zhuanlan.zhihu.com/p/557756574。 这里摘抄 Martin Fowler 对于重构的定义:

所谓重构(refactoring)是这样一个过程:在不改变代码外在行为的前提下,对代码做出修改,以改进程序的内部结构。重构是一种经千锤百炼形成的有条不紊的程序整理方法,可以最大限度地减小整理过程中引入错误的概率。本质上说,重构就是在代码写好之后改进它的设计。

提升性能算重构吗?

按照Kent和Martin的定义,对代码的改动,是否算重构,需要看从外界是否能“观察得到”系统行为的改变。比如说如果有个bug,但是没有使用者注意到,你把它改掉了,这也可以叫重构。而这次碰到的问题,其实在数据量不大时,感觉不到,因此仅将串行改为并行,没有改变端到端的行为,因此在这种意义上来说,可以算是重构。

一般来说,做性能调优带来的代码改动,会让代码不易读,违背了让代码变得易读和易改的初衷,不算重构。但是本例使用 Promise.all 替换带有 await 的 for 循环,在这个具体的做法中,并没有带来难以理解的代码,甚至还让代码可读性变得更好了,从这点上讲,是一个重构反而提升了性能的特例。

实例一

这个改动非常典型,最好默认就使用 Promise.all。除非有实在的先后执行的依赖关系。
image.png

image.png

实例二

在将串行改并行的同时,将统计逻辑和业务逻辑拆开,实现关注点分离,可读性更好。
image.png

当然,在改一块儿前,不仅仅使用 Promise.all 替换了带有 await 的 for,还有对难以理解的统计逻辑进行优化,不是那么纯粹,因此实际在改动前,是先写测试的。首先,上面的左边那一坨代码,原本没有测试,《重构》一书强调,在重构前,需要先测试。所以先写好以下测试,确保自己理解了原有代码的逻辑,直到测试通过。 typescript // 使用 ts-mockery 模拟依赖对象的扫描函数,返回一个假的带有10个元素的数组 const postService = Mock.of({ terminatePost: jest.fn(), scanPosts: jest.fn().mockImplementation(async (triggerId, params, scanner) => { const post = new Post() post.status = PostStatus.INITIAL post.context = { test: again } post.adapterType = PostAdapterType.WECHAT const items = new Array(10).fill(0).map((_) => ({ ...post }))

    await scanner(items)
}),

})

... describe(handle single post item, () => { it(calculates total posted count, async () => { const res = await sut.handleSinglePostItem(triggerId, post, []) expect(res).toEqual(1) })

    it(logs error, async () => {
        const warnings: any[] = []
        console.warn = (...args) => warnings.push(args)
        postService.terminatePost = jest.fn().mockRejectedValueOnce({ message: error })

        const res = await sut.handleSinglePostItem(triggerId, post, [])
        expect(res).toEqual(0)
        expect(warnings.length).toEqual(1)
        expect(warnings[0]).toStrictEqual([triggerId: Post失败: error])
    })
})

describe(handle multiple post items, () => {
    postService.terminatePost = jest.fn().mockImplementation(async (post) => {
        console.log(terminating , post)
        return true
    })

    // 对统计逻辑的测试
    it(counts total Records, async () => {
        const stats = {
            totalRecords: 0,
            totalPosted: 0,
        }
        await sut.scanPosts(triggerId, { Segment: 2 }, 0, stats)

        expect(stats.totalRecords).toEqual(10)
        expect(stats.totalPosted).toEqual(10)
    })
})

然后,再重构代码,在重构完以后,以上测试仍然通过,这才是重构成功。

总结

当看到带有 await 的 for 循环时,很可能碰到了一个坏的模式。除非实在有先后顺序的必要,否则非常建议改成 Promise.all 来让操作并发执行,以提高系统的吞吐量。