上一篇文章里设计的队列表, 在使用了一段时间后, 发现了以下不足:
handle 字段限制了事件执行者 (subscriber) 的数量
系统中的某个事件或者消息, 往往对应着多个接收者, 直接将处理程序写在 Handle 字段上, 限制了事件(消息)接收者的数量.
改进:
移除队列表中的 handle 字段, 将事件接收者(类)写到配置文件中:
队列表结构变更为:
- id: 事件id, 表的主键
- code: 事件的 code, 同类型事件都是一样的
- event_data: 一个json字符串, 包含处理事件事所需的数据
- status: 事件的状态, 共有4种状态: pending, running, finished, error
- message: 事件处理中所产生的消息
- scheduled_at: 计划处理该事件的时间
- created_at: 该事件生成的时间
- started_at: 处理开始时间
- finished_at: 处理结束时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
<event>
<handle>
<order_invoice>
<event_data_initialization>
<handle>OrderInvoiceInitHandle</handle>
<sort_order>0</sort_order>
</event_data_initialization>
<reward_points_balance_update>
<handle>IncreaseRewardPointsBalanceHandle</handle>
<sort_order>10</sort_order>
</reward_points_balance_update>
<customer_upgrade_group>
<handle>CustomerGroupUpdateHandle</handle>
<sort_order>20</sort_order>
</customer_upgrade_group>
<reward_points_update_notification>
<handle>SendRewardPointsNotificationHandle</handle>
<sort_order>100</sort_order>
</reward_points_update_notification>
</order_invoice>
</handle>
</event>
|
修改 EventDispatcher class:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
class EventDispatcher
{
const XML_PATH_EVENT_HANDLES_PREFIX = 'event/handle/';
protected $logger;
protected $objectManager;
protected $scopeConfig;
public function __construct(
LoggerInterface $logger,
ScopeConfigInterface $scopeConfig,
ObjectManagerInterface $objectManager
) {
$this->logger = $logger;
$this->objectManager = $objectManager;
$this->scopeConfig = $scopeConfig;
}
public function execute()
{
try {
/** @var EventQueue $eventModel */
$eventModel = $this->objectManager->create(EventQueue::class);
$conn = $eventModel->getResource()->getConnection();
$select = $conn->select();
$select->reset()
->from($conn->getTableName('oneday_event_queue'), ['id'])
->where('status = ?', EventQueue::STATUS_PENDING)
->where('scheduled_at < ?', date('Y-m-d H:i:s', time() + 60));
$stmt = $select->query();
/** @var EventQueue $event */
while ($row = $stmt->fetch()) {
try {
$event = clone $eventModel;
$event->load($row['id']);
if ($event->getStatus() !== EventQueue::STATUS_PENDING) {
continue;
}
// 设置 Event 为 running 状态
$event->setStatus(EventQueue::STATUS_RUNNING)
->setStartedAt(date('Y-m-d H:i:s'));
$event->save();
$handles = $this->getHandlesByCode($event->getCode());
$messages = [];
/** @var DataObject $eventData */
$eventData = $this->objectManager->create(DataObject::class);
$eventData->addData(json_decode($event->getEventData(), true));
foreach ($handles as $handle) {
// 开始处理 Event
try {
/** @var EventExecutorInterface $handleInstance */
$handleInstance = $this->objectManager->create($handle);
if ($handleInstance instanceof EventExecutorInterface) {
$handleInstance->execute($eventData);
} else {
$messages[] = 'Event Handle('. $handle. ') have not implements '. EventExecutorInterface::class. 'Skipped this event handle';
}
} catch (\Exception $e) {
$messages[] = 'Event execute exception: '. $e->getMessage();
}
}
// Event 处理完成, 设置为 finished 状态
$event->setFinishedAt(time())
->setStatus(EventQueue::STATUS_FINISHED)
->setMessage(implode("\n", $messages));
$event->save();
} catch (\Exception $e) {
// Event 处理失败, 设置为 error 状态
// todo 改进处理失败方式
$message = '';
if (isset($messages) && count($messages)) {
$message = implode("\n", $messages);
}
$event->setMessage($message. "\n". $e->getMessage())
->setStatus(EventQueue::STATUS_ERROR);
$event->save();
$this->logger->critical(sprintf('processing event#%s failed, %s', $event->getId(), $e->getMessage()));
}
}
} catch (\Exception $e) {
$this->logger->critical('Event process error: '. $e->getMessage());
}
}
/**
* 根据 code 获取配置的 event handle
* @param $code
* @return array
*/
protected function getHandlesByCode($code)
{
$configData = $this->scopeConfig->getValue(self::XML_PATH_EVENT_HANDLES_PREFIX. $code);
$handles = [];
if (is_array($configData)) {
$_handles = [];
foreach ($configData as $item) {
if (isset($item['handle']) && $item['handle']) {
$sortOrder = isset($item['sort_order']) ? (int) $item['sort_order'] : 1;
$handle = $item['handle'];
if (isset($_handles[$sortOrder])) {
$_handles[$sortOrder][] = $handle;
} else {
$_handles[$sortOrder] = [$handle];
}
}
}
ksort($_handles);
foreach ($_handles as $items) {
$handles = array_merge($handles, $items);
}
}
return $handles;
}
}
|
这样修改后, 同一个事件(消息)就可以对应多个事件处理程序(subscriber)了.