MySQL + PHP 实现事件队列(2)

上一篇文章里设计的队列表, 在使用了一段时间后, 发现了以下不足:

handle 字段限制了事件执行者 (subscriber) 的数量

系统中的某个事件或者消息, 往往对应着多个接收者, 直接将处理程序写在 Handle 字段上, 限制了事件(消息)接收者的数量.

改进: 移除队列表中的 handle 字段, 将事件接收者(类)写到配置文件中:

队列表结构变更为:

  • id: 事件id, 表的主键
  • code: 事件的 code, 同类型事件都是一样的
  • event_data: 一个json字符串, 包含处理事件事所需的数据
  • status: 事件的状态, 共有4种状态: pending, running, finished, error
  • message: 事件处理中所产生的消息
  • scheduled_at: 计划处理该事件的时间
  • created_at: 该事件生成的时间
  • started_at: 处理开始时间
  • finished_at: 处理结束时间
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
<event>
    <handle>
        <order_invoice>
            <event_data_initialization>
                <handle>OrderInvoiceInitHandle</handle>
                <sort_order>0</sort_order>
            </event_data_initialization>
            <reward_points_balance_update>
                <handle>IncreaseRewardPointsBalanceHandle</handle>
                <sort_order>10</sort_order>
            </reward_points_balance_update>
            <customer_upgrade_group>
                <handle>CustomerGroupUpdateHandle</handle>
                <sort_order>20</sort_order>
            </customer_upgrade_group>
            <reward_points_update_notification>
                <handle>SendRewardPointsNotificationHandle</handle>
                <sort_order>100</sort_order>
            </reward_points_update_notification>
        </order_invoice>
    </handle>
</event>

修改 EventDispatcher class:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class EventDispatcher
{
    const XML_PATH_EVENT_HANDLES_PREFIX = 'event/handle/';

    protected $logger;
    protected $objectManager;
    protected $scopeConfig;

    public function __construct(
        LoggerInterface $logger,
        ScopeConfigInterface $scopeConfig,
        ObjectManagerInterface $objectManager
    ) {
        $this->logger = $logger;
        $this->objectManager = $objectManager;
        $this->scopeConfig = $scopeConfig;
    }


    public function execute()
    {
        try {
            /** @var EventQueue $eventModel */
            $eventModel = $this->objectManager->create(EventQueue::class);
            $conn = $eventModel->getResource()->getConnection();
            $select = $conn->select();
            $select->reset()
                ->from($conn->getTableName('oneday_event_queue'), ['id'])
                ->where('status = ?', EventQueue::STATUS_PENDING)
                ->where('scheduled_at < ?', date('Y-m-d H:i:s', time() + 60));

            $stmt = $select->query();

            /** @var EventQueue $event */
            while ($row = $stmt->fetch()) {
                try {
                    $event = clone $eventModel;
                    $event->load($row['id']);
                    if ($event->getStatus() !== EventQueue::STATUS_PENDING) {
                        continue;
                    }
                    // 设置 Event 为 running 状态
                    $event->setStatus(EventQueue::STATUS_RUNNING)
                        ->setStartedAt(date('Y-m-d H:i:s'));
                    $event->save();

                    $handles = $this->getHandlesByCode($event->getCode());
                    $messages = [];
                    /** @var DataObject $eventData */
                    $eventData = $this->objectManager->create(DataObject::class);
                    $eventData->addData(json_decode($event->getEventData(), true));
                    foreach ($handles as $handle) {
                        // 开始处理 Event
                        try {
                            /** @var EventExecutorInterface $handleInstance */
                            $handleInstance = $this->objectManager->create($handle);
                            if ($handleInstance instanceof EventExecutorInterface) {
                                $handleInstance->execute($eventData);
                            } else {
                                $messages[] = 'Event Handle('. $handle. ') have not implements '. EventExecutorInterface::class. 'Skipped this event handle';
                            }
                        } catch (\Exception $e) {
                            $messages[] = 'Event execute exception: '. $e->getMessage();
                        }
                    }
                    // Event 处理完成, 设置为 finished 状态
                    $event->setFinishedAt(time())
                        ->setStatus(EventQueue::STATUS_FINISHED)
                        ->setMessage(implode("\n", $messages));
                    $event->save();
                } catch (\Exception $e) {
                    // Event 处理失败, 设置为 error 状态
                    // todo 改进处理失败方式
                    $message = '';
                    if (isset($messages) && count($messages)) {
                        $message = implode("\n", $messages);
                    }
                    $event->setMessage($message. "\n". $e->getMessage())
                        ->setStatus(EventQueue::STATUS_ERROR);
                    $event->save();
                    $this->logger->critical(sprintf('processing event#%s failed, %s', $event->getId(), $e->getMessage()));
                }
            }
        } catch (\Exception $e) {
            $this->logger->critical('Event process error: '. $e->getMessage());
        }
    }

    /**
     * 根据 code 获取配置的 event handle
     * @param $code
     * @return array
     */
    protected function getHandlesByCode($code)
    {
        $configData = $this->scopeConfig->getValue(self::XML_PATH_EVENT_HANDLES_PREFIX. $code);
        $handles = [];
        if (is_array($configData)) {
            $_handles = [];
            foreach ($configData as $item) {
                if (isset($item['handle']) && $item['handle']) {
                    $sortOrder = isset($item['sort_order']) ? (int) $item['sort_order'] : 1;
                    $handle = $item['handle'];
                    if (isset($_handles[$sortOrder])) {
                        $_handles[$sortOrder][] = $handle;
                    } else {
                        $_handles[$sortOrder] = [$handle];
                    }
                }
            }
            ksort($_handles);
            foreach ($_handles as $items) {
                $handles = array_merge($handles, $items);
            }
        }
        return $handles;
    }
}

这样修改后, 同一个事件(消息)就可以对应多个事件处理程序(subscriber)了.

Built with Hugo
主题 StackJimmy 设计