TypechoJoeTheme

Dcr163的博客

统计

Thinkphp8.0使用Rabbitmq的做队列和延迟队列代码实例

2025-09-28
/
0 评论
/
13 阅读
/
正在检测是否收录...
09/28

前提是已经安装好Rabbitmq的服务和安装好Rabbitmq消息延迟队列插件

新建数据表

使用的orders表结构

CREATE TABLE `orders` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `orderno` varchar(60) NOT NULL DEFAULT '' COMMENT '订单号',
  `price` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '价格',
  `create_time` datetime DEFAULT NULL,
  `cancel_time` datetime DEFAULT NULL,
  `status` tinyint(3) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=75 DEFAULT CHARSET=utf8mb4 COMMENT '测试订单表';

添加控制器

在app模块新增一个控制器/app/controller/Order.php,内容如下:

<?php
declare (strict_types=1);

namespace app\controller;

use app\service\RabbitmqService;
use Exception;
use think\Response;
use think\response\Json;

class Order
{
    /**
     *
     * 模拟客户下单
     * @return Response
     */
    public function index()
    {
        $db = db('orders');
        $getConnection = $db->getConnection();
        $timeOut = 1800;
        $id = $getConnection->transaction(function () use ($db, $timeOut) {
            $time = time();
            $orderData = [
                'orderno' => uniqid('hm'),
                'price' => mt_rand(10, 999),
                'create_time' => date('Y-m-d H:i:s', $time),
                'status' => 0,
            ];
            $id = $db->insertGetId($orderData);
            $rabbitmqService = app()->make(RabbitmqService::class);
            $rabbitmqService->sendDelayedMsg([
                'id' => $id,
            ], $timeOut);
            return $id;
        });
        if ($id) {
            $msg = sprintf('订单插入成功:【%d】;在 %f 分钟内未支付,将自动取消订单~', $id, $timeOut/60);
            return json(['msg' => $msg]);
        }
        return json(['msg' => 'fail']);
    }

    /**
     * 模拟异步发送邮件
     * @return Json
     * @throws Exception
     */
    public function email()
    {
        $mailData = [
            'email' => mt_rand(10000000, 999999999) . '@qq.com',
        ];
        $rabbitmqService = app()->make(RabbitmqService::class);
        $res = $rabbitmqService->sendMsg($mailData);
        return $res ? json(['msg' => $mailData['email']]) : json(['msg' => 'fail']);
    }
}

添加路由

修改位置:/route/app.php,新增2个访问的地址

Route::group('order',function(){
    Route::get('index', Order::class.'::index');
    Route::get('email', Order::class.'::email');
});

添加订单取消和发送邮件的操作脚本

主要是用在php think xx 命令行中执行

新建订单取消命令行脚本

新建文件:/app/command/OrderCancel.php,文件内容如下:

<?php
declare (strict_types=1);

namespace app\command;

use app\service\RabbitmqService;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\facade\Db;
use think\facade\Log;

class OrderCancel extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('orderCancel')
            ->setDescription('订单超时未支付自动取消订单');
    }

    protected function execute(Input $input, Output $output)
    {
        // 指令输出
        $output->writeln('orderCancel');
        $rabbitmqService = app()->make(RabbitmqService::class);

        //声明交换机
        $params = new AMQPTable([
            'x-delayed-type' => 'direct',
        ]);
        $rabbitmqService->channel->exchange_declare(config('rabbitmq.exchange'), 'x-delayed-message', false, true, false, false, false, $params);
        //声明队列
        $rabbitmqService->channel->queue_declare(config('rabbitmq.queue'), false, true, false, false);
        //绑定队列和交换机
        $rabbitmqService->channel->queue_bind(config('rabbitmq.queue'), config('rabbitmq.exchange'), config('rabbitmq.routing_key'));


        $callback = function (AMQPMessage $msg) use ($rabbitmqService, $output) {
            $output->writeln('In callBack');
            $data = json_decode($msg->getBody(), true);
            $output->writeln($msg->getBody());
            try {
                $res = $this->cancelOrder($data['id']);
                if ($res) {
                    $output->writeln('订单【' . $data['id'] . '】处理完毕');
                    $msg->ack();
                    return;
                }
                $msg->nack();
            } catch (\Exception $exception) {
                Log::error($exception->getMessage());
                $msg->nack();
            }
        };
        // 设置 QoS,防止消费者一次性获取过多消息
        $rabbitmqService->channel->basic_qos(null, 1, null);
        $rabbitmqService->channel->basic_consume(config('rabbitmq.queue'), '', false, false, false, false, $callback);
        try {
            $rabbitmqService->channel->consume();
        } catch (\Exception $exception) {
            $rabbitmqService->channel->close();
            $rabbitmqService->connection->close();
            Log::info("监听异常:" . $exception->getMessage());
        }
    }


    /**
     * 取消订单
     * @param int $orderId
     * @return bool
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\DbException
     * @throws \think\db\exception\ModelNotFoundException
     */
    public function cancelOrder(int $orderId)
    {
        try {
            // 尝试执行一个简单的查询来检查连接状态
            Db::query('SELECT 1');
        } catch (\Exception $e) {
            Log::error('数据库连接断开: ' . $e->getMessage());
            // 关闭当前连接
            Db::close();
            // 重新连接
            Db::connect();
        }

        $db = db('orders');
        $order = $db->where('id', $orderId)->find();
        if ($order['status'] != 0) return false;
        $res = $db->where('id', $order['id'])->update([
            'status' => -1,
            'cancel_time' => date('Y-m-d H:i:s'),
        ]);
        return (bool)$res;
    }
}

新建发送邮件命令行脚本

新建文件:/app/command/SendEmail.php,文件内容如下:

<?php
declare (strict_types=1);

namespace app\command;

use app\service\RabbitmqService;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Exception;
use think\facade\Db;
use think\facade\Log;

class SendEmail extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('sendEmail')
            ->setDescription('发送邮件');
    }

    protected function execute(Input $input, Output $output)
    {
        // 指令输出
        $output->writeln('sendEmail');
        $rabbitmqService = app()->make(RabbitmqService::class);

        $rabbitmqService->channel->exchange_declare('send_email.exchange', 'direct', false, true, false);
        $rabbitmqService->channel->queue_declare('send_mail.queue', false, true, false, false);
        $rabbitmqService->channel->queue_bind('send_mail.queue', 'send_email.exchange');

        $callback = function (AMQPMessage $msg) use ($rabbitmqService, $output) {
            try {
                $data = json_decode($msg->getBody(), true);
                $output->writeln($msg->getBody());
                Log::info($msg->getBody());
                $output->writeln('邮件【' . $data['email'] . '】处理完毕');
                $msg->ack();
            } catch (\Exception $exception) {
                $output->writeln($exception->getMessage());
                $msg->nack();
            }
        };
        // 设置 QoS,防止消费者一次性获取过多消息
        $rabbitmqService->channel->basic_qos(null, 1, null);
        $rabbitmqService->channel->basic_consume('send_mail.queue', '', false, false, false, false, $callback);
        try {
            $rabbitmqService->channel->consume();
        } catch (\Exception $exception) {
            $rabbitmqService->channel->close();
            $rabbitmqService->connection->close();
            Log::info("监听异常:" . $exception->getMessage());
        }
    }
}

注册到think命令

新建好命令行执行脚本后,还需要注册到think中,否则使用php think xx会找不到命令

修改文件:/config/console.php,文件内容如下:

<?php
return [
    // 指令定义
    'commands' => [
        'orderCancel' => \app\command\OrderCancel::class,  //注册 可以用 php think orderCancel 执行
        'sendEmail' => \app\command\SendEmail::class, //注册 可以用 php think sendEmail 执行
    ],
];

这时候,可以去到项目根目录,执行php think,就能查看到自己定义的命令了

监听未支付订单取消

直接去到项目的根目录执行 php think orderCancel,不要关闭窗口

监听邮件发送

新建一个窗口,去到项目的根目录执行php think sendEmail不要关闭窗口

测试

直接修改文件/app/controller/Order.php对应的方法,修改成自己想要的结果,在浏览器访问:
http://xxx/order/index模拟订单取消
http://xxx/order/email模拟邮件发送

这时候再去控制台查看监听进程,正常情况会输出对应的打印信息了。

rabbitmq延迟队列thinkphp8
朗读
赞(0)
版权属于:

Dcr163的博客

本文链接:

https://www.dcr163.cn/761.html(转载时请注明本文出处及文章链接)

评论 (0)

人生倒计时

今日已经过去小时
这周已经过去
本月已经过去
今年已经过去个月

最新回复

  1. Emmajop
    2025-09-28
  2. 噢噢噢
    2025-09-24
  3. Labeling Machine
    2025-09-22
  4. http://goldenfiber.ru/forum/?PAGE_NAME=profile_view&UID=55151&backurl=%2Fforum%2F%3FPAGE_NAME%3Dprofile_view%26UID%3D32514
    2025-07-10
  5. https://www.triptipedia.com/user/phmapaladnet
    2025-05-08

标签云