最近运营要给订单成功后添加一个操作, 当一个订单完成后, 该用户历史订单总额达到500$, 自动升级成为钻石会员并发送升级电子邮件. 由于订单成功事件上面已经有很多操作了, 再这样新增事件会导致订单成功页面响应缓慢.
考虑使用事件队列来完成上述需求, 分析优缺点:
优点:
- 提高订单成功页面返回速度
- 削峰. 发送邮件 / 同步订单到 ERP 等操作都比较耗时, 把这些操作分离出去, 在订单比较多的时候, 能进一步提高订单系统的吞吐量
- 解耦. 事件生产和事件消费解耦和. 比如消费事件的 handle 如果出现故障, 可以等 bug 修复后继续处理(相反, 如果是依赖于系统中的事件, 如果处理时出错, 就再也没有办法再次触发事件)
缺点:
- 增加了系统的复杂度
- 数据存在一定时间的不一致
综合上面的分析, 我决定创建一个事件队列来实现上面的需求.
我们的系统在使用 Redis, 但是由于系统在清除缓存时, 会将所有 Redis 缓存清空, 所以事件队列无法基于 Redis 完成(遗憾), 所以我只能通过 MySQL + PHP 来实现它.
事件队列中涉及到的三个对象: 队列 / 事件生产者 / 事件执行者
队列:
队列是一张 MySQL 表, 它有如下几个字段:
id: 事件id, 表的主键
code: 事件的 code, 同类型事件都是一样的
handle: 事件的处理类, 该类必须实现 EventHandleInterface 接口
event_data: 一个json字符串, 包含处理事件事所需的数据
status: 事件的状态, 共有4种状态: pending, running, finished, error
message: 事件处理中所产生的消息
scheduled_at: 计划处理该事件的时间
created_at: 该事件生成的时间
started_at: 处理开始时间
finished_at: 处理结束时间
事件生成者:
事件生成者存在于系统的各个环节, 它的任务是往队列里写入事件
事件执行者:
事件执行者就是队列表的 handle 字段指定的类, 它必须实现 EventExecutorInterface 接口:
1 2 3 4 | interface EventExecutorInterface { public function execute($eventData); } |
还有一个分发事件的类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | class EventDispatcher { protected $logger; protected $objectManager; public function __construct( LoggerInterface $logger, ObjectManagerInterface $objectManager ) { $this->logger = $logger; $this->objectManager = $objectManager; } public function execute() { try { while (true) { /** @var EventQueueCollection $eventQueues */ $eventQueues = $this->objectManager->create(EventQueueCollection::class); $eventQueues->addFieldToFilter('status', ['eq' => EventQueue::STATUS_PENDING]) ->addFieldToFilter('scheduled_at', ['lt' => date('Y-m-d H:i:s', time() + 60)]); echo (string)$eventQueues->getSelect(); if (count($eventQueues) === 0) { echo "No pending event found.\n"; return; } /** @var EventQueue $eventQueue */ foreach ($eventQueues as $eventQueue) { /** @var DataObject $eventData */ try { // 设置 Event 为 running 状态 $eventQueue->setStatus(EventQueue::STATUS_RUNNING) ->setStartedAt(date('Y-m-d H:i:s')); $eventQueue->save(); // 开始处理 Event /** @var EventExecutorInterface $handleInstance */ $handleInstance = $this->objectManager->create($eventQueue->getHandle()); if ($handleInstance instanceof EventExecutorInterface) { $eventData = $this->objectManager->create(DataObject::class); $eventData->addData(json_decode($eventQueue->getEventData(), true)); $handleInstance->execute($eventData); // Event 处理完成, 设置为 finished 状态 $eventQueue->setFinishedAt(time()) ->setStatus(EventQueue::STATUS_FINISHED) ->setMessage('success'); $eventQueue->save(); } else { throw new CronException(__('Event Handle must implements '. EventExecutorInterface::class)); } } catch (\Exception $e) { // Event 处理失败, 设置为 error 状态 $eventQueue->setMessage($e->getMessage()) ->setStatus(EventQueue::STATUS_ERROR); $eventQueue->save(); $this->logger->critical(sprintf('processing event#%s failed, %s', $eventQueue->getId(), $e->getMessage())); } } } } catch (\Exception $e) { $this->logger->critical('Event process error: '. $e->getMessage()); } } } |
它负责从队列里读取状态为 pending 的事件, 然后一个接一个地实例化 handle 类, 并调用 handle->execute() 方法. 在 调用 execute() 方法前, 它会更新 event 的状态和开始时间, 在 execute() 方法执行结束后, 它会更新 event 的状态和完成时间. 执行 execute() 方法时如果出现错误, 会将错误提示写入 event 的 message 并将status 改为 error.