一个 ThinkPHP 框架的 MySQL Binlog 监听扩展 Think-Binlog

本文详细介绍了专为ThinkPHP框架设计的MySQL Binlog监听扩展Think-Binlog。该工具支持实时捕获数据库的INSERT、UPDATE、DELETE事件,并集成队列转发、自定义事件订阅与守护进程模式,是构建数据同步、缓存更新、审计日志等高级功能的理想解决方案。文章包含完整的环境配置、安装步骤、使用示例及故障排除指南。

这是一个基于 krowinski/php-mysql-replication 开发的 ThinkPHP MySQL Binlog 监听扩展,支持后台运行、队列转发和事件订阅。

功能特性

🚀 实时监听 – 监听MySQL binlog事件(INSERT、UPDATE、DELETE)

🔄 队列转发 – 支持将事件转发到think-queue队列系统

📡 事件订阅 – 灵活的事件订阅机制

🛡️ 后台运行 – 支持守护进程模式运行

📊 状态监控 – 提供进程状态查看和管理

🎯 精确过滤 – 支持按数据库、表、事件类型过滤

📝 详细日志 – 完整的日志记录和错误处理

环境要求

  • PHP >= 8.0
  • ThinkPHP >= 8.0
  • MySQL >= 5.5 (支持binlog)
  • 扩展: pcntl, posix (守护进程模式需要)

安装

composer require yangweijie/think-binlog

MySQL配置

在MySQL配置文件中启用binlog:

[mysqld]
server-id        = 1
log_bin          = /var/log/mysql/mysql-bin.log
expire_logs_days = 10
max_binlog_size  = 100M
binlog-format    = row  # 重要:必须设置为row格式

创建复制用户:

GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlog_user'@'%';
GRANT SELECT ON `your_database`.* TO 'binlog_user'@'%';
FLUSH PRIVILEGES;

配置

发布配置文件:

php think service:discover

编辑 config/binlog.php

<?php
return [
    // MySQL连接配置
    'mysql' => [
        'host' => '127.0.0.1',
        'port' => 3306,
        'user' => 'binlog_user',
        'password' => 'password',
        'charset' => 'utf8mb4',
        'slave_id' => 666,
    ],

    // Binlog配置
    'binlog' => [
        'databases_only' => [], // 监听的数据库
        'tables_only' => [],    // 监听的表
        'events_only' => ['write', 'update', 'delete'],
    ],

    // 队列配置
    'queue' => [
        'enabled' => true,
        'connection' => 'default',
        'queue_name' => 'binlog',
    ],

    // 事件订阅器
    'subscribers' => [
        // 'App\\Listener\\UserBinlogListener',
    ],
];

使用方法

命令行工具

# 前台运行(调试模式)
php think binlog listen

# 启动守护进程
php think binlog start --daemon

# 停止守护进程
php think binlog stop

# 重启守护进程
php think binlog restart

# 查看状态
php think binlog status

编程方式

use yangweijie\ThinkBinlog\BinlogListener;

// 创建监听器
$listener = new BinlogListener();

// 启动监听
$listener->start();

事件订阅

创建事件订阅器:

<?php

namespace app\listener;

use yangweijie\ThinkBinlog\Contract\BinlogSubscriberInterface;
use yangweijie\ThinkBinlog\Event\BinlogEvent;

class UserBinlogListener implements BinlogSubscriberInterface
{
    publicfunction handle(BinlogEvent $event): void
    {
        if ($event->getTable() === 'users') {
            // 处理用户表的变更
            $this->handleUserChange($event);
        }
    }

    publicfunction getDatabases(): array
    {
        return ['your_database'];
    }

    publicfunction getTables(): array
    {
        return ['users', 'orders'];
    }

    publicfunction getEventTypes(): array
    {
        return ['insert', 'update', 'delete'];
    }

    privatefunction handleUserChange(BinlogEvent $event): void
    {
        switch ($event->getType()) {
            case'insert':
                // 处理用户注册
                break;
            case'update':
                // 处理用户信息更新
                break;
            case'delete':
                // 处理用户删除
                break;
        }
    }
}

在配置文件中注册订阅器:

'subscribers' => [
    'app\\listener\\UserBinlogListener',
],

队列处理

监听队列事件:

// 在事件监听器中
Event::listen('binlog.insert', function ($database, $table, $data) {
    // 处理插入事件
});

Event::listen('binlog.update', function ($database, $table, $data) {
    // 处理更新事件
});

Event::listen('binlog.delete', function ($database, $table, $data) {
    // 处理删除事件
});

// 监听特定表的事件
Event::listen('binlog.your_database.users', function ($eventType, $data) {
    // 处理users表的所有事件
});

事件数据结构

[
    'event_info' => [
        'type' => 'insert',           // 事件类型
        'database' => 'test_db',      // 数据库名
        'table' => 'users',           // 表名
        'datetime' => '2023-01-01 12:00:00',
        'timestamp' => 1672574400,
        'log_position' => 1234,
        'event_size' => 56,
    ],
    'data' => [
        'rows' => [                   // 变更的行数据
            [
                'id' => 1,
                'name' => 'John',
                'email' => 'john@example.com',
            ]
        ],
        'columns' => [                // 表结构信息
            // ...
        ],
    ],
]

守护进程管理

系统服务配置

创建systemd服务文件 /etc/systemd/system/think-binlog.service

[Unit]
Description=ThinkPHP Binlog Listener
After=mysql.service

[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/path/to/your/project
ExecStart=/usr/bin/php think binlog start --daemon
ExecStop=/usr/bin/php think binlog stop
ExecReload=/usr/bin/php think binlog restart
Restart=always
RestartSec=3

[Install]
WantedBy=multi-user.target

启用服务:

sudo systemctl enable think-binlog
sudo systemctl start think-binlog
sudo systemctl status think-binlog

故障排除

常见问题

1. 连接失败

  • 检查MySQL用户权限
  • 确认binlog已启用
  • 验证网络连接

2. 内存使用过高

  • 调整 daemon.memory_limit 配置
  • 启用自动重启机制

3. 事件丢失

  • 检查binlog位置设置
  • 确认事件过滤配置

调试模式

# 启用调试日志
php think binlog listen

# 查看日志
tail -f runtime/log/binlog.log

性能优化

  • 使用队列异步处理事件
  • 合理设置事件过滤条件
  • 定期清理日志文件
  • 监控内存使用情况

【版权提示】信息来自于互联网,不代表Intoep官方立场,内容仅供网友参考学习。如发现本站内容存在版权问题,烦请提供版权疑问、联系方式等发邮件至 1359125269@qq.com ,我们将及时沟通与处理。如若转载请联系原出处
(0)
PHP AI插件整合文心/千问/DeepSeek等十大模型,支持Laravel与ThinkPHP框架
上一篇 2026-02-05 09:28
ThinkPHP8集成RabbitMQ的完整案例实现
下一篇 2025-10-15 11:49

相关推荐

发表回复

登录后才能评论
扫码了解
扫码了解
反馈建议
分享本页
返回顶部