MySQL + PHP 实现事件队列(2)

上一篇文章里设计的队列表, 在使用了一段时间后, 发现了以下不足:

handle 字段限制了事件执行者 (subscriber) 的数量

系统中的某个事件或者消息, 往往对应着多个接收者, 直接将处理程序写在 Handle 字段上, 限制了事件(消息)接收者的数量.

改进:
移除队列表中的 handle 字段, 将事件接收者(类)写到配置文件中:

队列表结构变更为:

1
2
3
4
5
6
7
8
9
id: 事件id, 表的主键
code:   事件的 code, 同类型事件都是一样的
event_data: 一个json字符串, 包含处理事件事所需的数据
status: 事件的状态, 共有4种状态: pending, running, finished, error
message: 事件处理中所产生的消息
scheduled_at:   计划处理该事件的时间
created_at: 该事件生成的时间
started_at: 处理开始时间
finished_at:    处理结束时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// config.xml
<event>
    <handle>
        <order_invoice>
            <event_data_initialization>
                <handle>OrderInvoiceInitHandle</handle>
                <sort_order>0</sort_order>
            </event_data_initialization>
            <reward_points_balance_update>
                <handle>IncreaseRewardPointsBalanceHandle</handle>
                <sort_order>10</sort_order>
            </reward_points_balance_update>
            <customer_upgrade_group>
                <handle>CustomerGroupUpdateHandle</handle>
                <sort_order>20</sort_order>
            </customer_upgrade_group>
            <reward_points_update_notification>
                <handle>SendRewardPointsNotificationHandle</handle>
                <sort_order>100</sort_order>
            </reward_points_update_notification>
        </order_invoice>
    </handle>
</event>

修改 EventDispatcher class:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class EventDispatcher
{
    const XML_PATH_EVENT_HANDLES_PREFIX = 'event/handle/';

    protected $logger;
    protected $objectManager;
    protected $scopeConfig;

    public function __construct(
        LoggerInterface $logger,
        ScopeConfigInterface $scopeConfig,
        ObjectManagerInterface $objectManager
    ) {
        $this->logger = $logger;
        $this->objectManager = $objectManager;
        $this->scopeConfig = $scopeConfig;
    }


    public function execute()
    {
        try {
            /** @var EventQueue $eventModel */
            $eventModel = $this->objectManager->create(EventQueue::class);
            $conn = $eventModel->getResource()->getConnection();
            $select = $conn->select();
            $select->reset()
                ->from($conn->getTableName('oneday_event_queue'), ['id'])
                ->where('status = ?', EventQueue::STATUS_PENDING)
                ->where('scheduled_at < ?', date('Y-m-d H:i:s', time() + 60));

            $stmt = $select->query();

            /** @var EventQueue $event */
            while ($row = $stmt->fetch()) {
                try {
                    $event = clone $eventModel;
                    $event->load($row['id']);
                    if ($event->getStatus() !== EventQueue::STATUS_PENDING) {
                        continue;
                    }
                    // 设置 Event 为 running 状态
                    $event->setStatus(EventQueue::STATUS_RUNNING)
                        ->setStartedAt(date('Y-m-d H:i:s'));
                    $event->save();

                    $handles = $this->getHandlesByCode($event->getCode());
                    $messages = [];
                    /** @var DataObject $eventData */
                    $eventData = $this->objectManager->create(DataObject::class);
                    $eventData->addData(json_decode($event->getEventData(), true));
                    foreach ($handles as $handle) {
                        // 开始处理 Event
                        try {
                            /** @var EventExecutorInterface $handleInstance */
                            $handleInstance = $this->objectManager->create($handle);
                            if ($handleInstance instanceof EventExecutorInterface) {
                                $handleInstance->execute($eventData);
                            } else {
                                $messages[] = 'Event Handle('. $handle. ') have not implements '. EventExecutorInterface::class. 'Skipped this event handle';
                            }
                        } catch (\Exception $e) {
                            $messages[] = 'Event execute exception: '. $e->getMessage();
                        }
                    }
                    // Event 处理完成, 设置为 finished 状态
                    $event->setFinishedAt(time())
                        ->setStatus(EventQueue::STATUS_FINISHED)
                        ->setMessage(implode("\n", $messages));
                    $event->save();
                } catch (\Exception $e) {
                    // Event 处理失败, 设置为 error 状态
                    // todo 改进处理失败方式
                    $message = '';
                    if (isset($messages) && count($messages)) {
                        $message = implode("\n", $messages);
                    }
                    $event->setMessage($message. "\n". $e->getMessage())
                        ->setStatus(EventQueue::STATUS_ERROR);
                    $event->save();
                    $this->logger->critical(sprintf('processing event#%s failed, %s', $event->getId(), $e->getMessage()));
                }
            }
        } catch (\Exception $e) {
            $this->logger->critical('Event process error: '. $e->getMessage());
        }
    }

    /**
     * 根据 code 获取配置的 event handle
     * @param $code
     * @return array
     */
    protected function getHandlesByCode($code)
    {
        $configData = $this->scopeConfig->getValue(self::XML_PATH_EVENT_HANDLES_PREFIX. $code);
        $handles = [];
        if (is_array($configData)) {
            $_handles = [];
            foreach ($configData as $item) {
                if (isset($item['handle']) && $item['handle']) {
                    $sortOrder = isset($item['sort_order']) ? (int) $item['sort_order'] : 1;
                    $handle = $item['handle'];
                    if (isset($_handles[$sortOrder])) {
                        $_handles[$sortOrder][] = $handle;
                    } else {
                        $_handles[$sortOrder] = [$handle];
                    }
                }
            }
            ksort($_handles);
            foreach ($_handles as $items) {
                $handles = array_merge($handles, $items);
            }
        }
        return $handles;
    }
}

这样修改后, 同一个事件(消息)就可以对应多个事件处理程序(subscriber)了.