MySQL + PHP 实现事件队列(1)

最近运营要给订单成功后添加一个操作, 当一个订单完成后, 该用户历史订单总额达到500$, 自动升级成为钻石会员并发送升级电子邮件. 由于订单成功事件上面已经有很多操作了, 再这样新增事件会导致订单成功页面响应缓慢.

考虑使用事件队列来完成上述需求, 分析优缺点:
优点:

  1. 提高订单成功页面返回速度
  2. 削峰. 发送邮件 / 同步订单到 ERP 等操作都比较耗时, 把这些操作分离出去, 在订单比较多的时候, 能进一步提高订单系统的吞吐量
  3. 解耦. 事件生产和事件消费解耦和. 比如消费事件的 handle 如果出现故障, 可以等 bug 修复后继续处理(相反, 如果是依赖于系统中的事件, 如果处理时出错, 就再也没有办法再次触发事件)

缺点:

  1. 增加了系统的复杂度
  2. 数据存在一定时间的不一致

综合上面的分析, 我决定创建一个事件队列来实现上面的需求.
我们的系统在使用 Redis, 但是由于系统在清除缓存时, 会将所有 Redis 缓存清空, 所以事件队列无法基于 Redis 完成(遗憾), 所以我只能通过 MySQL + PHP 来实现它.
事件队列中涉及到的三个对象: 队列 / 事件生产者 / 事件执行者

队列:
队列是一张 MySQL 表, 它有如下几个字段:
id: 事件id, 表的主键
code: 事件的 code, 同类型事件都是一样的
handle: 事件的处理类, 该类必须实现 EventHandleInterface 接口
event_data: 一个json字符串, 包含处理事件事所需的数据
status: 事件的状态, 共有4种状态: pending, running, finished, error
message: 事件处理中所产生的消息
scheduled_at: 计划处理该事件的时间
created_at: 该事件生成的时间
started_at: 处理开始时间
finished_at: 处理结束时间

事件生成者:
事件生成者存在于系统的各个环节, 它的任务是往队列里写入事件

事件执行者:
事件执行者就是队列表的 handle 字段指定的类, 它必须实现 EventExecutorInterface 接口:

1
2
3
4
interface EventExecutorInterface
{
    public function execute($eventData);
}

还有一个分发事件的类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class EventDispatcher
{
    protected $logger;
    protected $objectManager;

    public function __construct(
        LoggerInterface $logger,
        ObjectManagerInterface $objectManager
    ) {
        $this->logger = $logger;
        $this->objectManager = $objectManager;
    }


    public function execute()
    {
        try {
            while (true) {
                /** @var EventQueueCollection $eventQueues */
                $eventQueues = $this->objectManager->create(EventQueueCollection::class);
                $eventQueues->addFieldToFilter('status', ['eq' => EventQueue::STATUS_PENDING])
                    ->addFieldToFilter('scheduled_at', ['lt' => date('Y-m-d H:i:s', time() + 60)]);
                echo (string)$eventQueues->getSelect();
                if (count($eventQueues) === 0) {
                    echo "No pending event found.\n";
                    return;
                }
                /** @var EventQueue $eventQueue */
                foreach ($eventQueues as $eventQueue) {
                    /** @var DataObject $eventData */
                    try {
                        // 设置 Event 为 running 状态
                        $eventQueue->setStatus(EventQueue::STATUS_RUNNING)
                            ->setStartedAt(date('Y-m-d H:i:s'));
                        $eventQueue->save();

                        // 开始处理 Event
                        /** @var EventExecutorInterface $handleInstance */
                        $handleInstance = $this->objectManager->create($eventQueue->getHandle());
                        if ($handleInstance instanceof EventExecutorInterface) {
                            $eventData = $this->objectManager->create(DataObject::class);
                            $eventData->addData(json_decode($eventQueue->getEventData(), true));
                            $handleInstance->execute($eventData);

                            // Event 处理完成, 设置为 finished 状态
                            $eventQueue->setFinishedAt(time())
                                ->setStatus(EventQueue::STATUS_FINISHED)
                                ->setMessage('success');
                            $eventQueue->save();
                        } else {
                            throw new CronException(__('Event Handle must implements '. EventExecutorInterface::class));
                        }
                    } catch (\Exception $e) {
                        // Event 处理失败, 设置为 error 状态
                        $eventQueue->setMessage($e->getMessage())
                            ->setStatus(EventQueue::STATUS_ERROR);
                        $eventQueue->save();
                        $this->logger->critical(sprintf('processing event#%s failed, %s', $eventQueue->getId(), $e->getMessage()));
                    }
                }
            }
        } catch (\Exception $e) {
            $this->logger->critical('Event process error: '. $e->getMessage());
        }
    }
}

它负责从队列里读取状态为 pending 的事件, 然后一个接一个地实例化 handle 类, 并调用 handle->execute() 方法. 在 调用 execute() 方法前, 它会更新 event 的状态和开始时间, 在 execute() 方法执行结束后, 它会更新 event 的状态和完成时间. 执行 execute() 方法时如果出现错误, 会将错误提示写入 event 的 message 并将status 改为 error.