最近运营要给订单成功后添加一个操作, 当一个订单完成后, 该用户历史订单总额达到500$, 自动升级成为钻石会员并发送升级电子邮件. 由于订单成功事件上面已经有很多操作了, 再这样新增事件会导致订单成功页面响应缓慢.
考虑使用事件队列来完成上述需求, 分析优缺点:
优点:
- 提高订单成功页面返回速度
- 削峰. 发送邮件 / 同步订单到 ERP 等操作都比较耗时, 把这些操作分离出去, 在订单比较多的时候, 能进一步提高订单系统的吞吐量
- 解耦. 事件生产和事件消费解耦和. 比如消费事件的 handle 如果出现故障, 可以等 bug 修复后继续处理(相反, 如果是依赖于系统中的事件, 如果处理时出错, 就再也没有办法再次触发事件)
缺点:
综合上面的分析, 我决定创建一个事件队列来实现上面的需求.
我们的系统在使用 Redis, 但是由于系统在清除缓存时, 会将所有 Redis 缓存清空, 所以事件队列无法基于 Redis 完成(遗憾), 所以我只能通过 MySQL + PHP 来实现它.
事件队列中涉及到的三个对象: 队列 / 事件生产者 / 事件执行者
队列:
队列是一张 MySQL 表, 它有如下几个字段:
- id: 事件id, 表的主键
- code: 事件的 code, 同类型事件都是一样的
- handle: 事件的处理类, 该类必须实现 EventHandleInterface 接口
- event_data: 一个json字符串, 包含处理事件事所需的数据
- status: 事件的状态, 共有4种状态: pending, running, finished, error
- message: 事件处理中所产生的消息
- scheduled_at: 计划处理该事件的时间
- created_at: 该事件生成的时间
- started_at: 处理开始时间
- finished_at: 处理结束时间
事件生成者:
事件生成者存在于系统的各个环节, 它的任务是往队列里写入事件
事件执行者:
事件执行者就是队列表的 handle 字段指定的类, 它必须实现 EventExecutorInterface 接口:
1
2
3
4
|
interface EventExecutorInterface
{
public function execute($eventData);
}
|
还有一个分发事件的类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
class EventDispatcher
{
protected $logger;
protected $objectManager;
public function __construct(
LoggerInterface $logger,
ObjectManagerInterface $objectManager
) {
$this->logger = $logger;
$this->objectManager = $objectManager;
}
public function execute()
{
try {
while (true) {
/** @var EventQueueCollection $eventQueues */
$eventQueues = $this->objectManager->create(EventQueueCollection::class);
$eventQueues->addFieldToFilter('status', ['eq' => EventQueue::STATUS_PENDING])
->addFieldToFilter('scheduled_at', ['lt' => date('Y-m-d H:i:s', time() + 60)]);
echo (string)$eventQueues->getSelect();
if (count($eventQueues) === 0) {
echo "No pending event found.\n";
return;
}
/** @var EventQueue $eventQueue */
foreach ($eventQueues as $eventQueue) {
/** @var DataObject $eventData */
try {
// 设置 Event 为 running 状态
$eventQueue->setStatus(EventQueue::STATUS_RUNNING)
->setStartedAt(date('Y-m-d H:i:s'));
$eventQueue->save();
// 开始处理 Event
/** @var EventExecutorInterface $handleInstance */
$handleInstance = $this->objectManager->create($eventQueue->getHandle());
if ($handleInstance instanceof EventExecutorInterface) {
$eventData = $this->objectManager->create(DataObject::class);
$eventData->addData(json_decode($eventQueue->getEventData(), true));
$handleInstance->execute($eventData);
// Event 处理完成, 设置为 finished 状态
$eventQueue->setFinishedAt(time())
->setStatus(EventQueue::STATUS_FINISHED)
->setMessage('success');
$eventQueue->save();
} else {
throw new CronException(__('Event Handle must implements '. EventExecutorInterface::class));
}
} catch (\Exception $e) {
// Event 处理失败, 设置为 error 状态
$eventQueue->setMessage($e->getMessage())
->setStatus(EventQueue::STATUS_ERROR);
$eventQueue->save();
$this->logger->critical(sprintf('processing event#%s failed, %s', $eventQueue->getId(), $e->getMessage()));
}
}
}
} catch (\Exception $e) {
$this->logger->critical('Event process error: '. $e->getMessage());
}
}
}
|
它负责从队列里读取状态为 pending 的事件, 然后一个接一个地实例化 handle 类, 并调用 handle->execute() 方法. 在 调用 execute() 方法前, 它会更新 event 的状态和开始时间, 在 execute() 方法执行结束后, 它会更新 event 的状态和完成时间. 执行 execute() 方法时如果出现错误, 会将错误提示写入 event 的 message 并将status 改为 error.