Shadow
Thinkphp8.0使用Rabbitmq的做队列和延迟队列代码实例
前提是已经安装好Rabbitmq的服务和安装好Rabbitmq消息延迟队列插件
新建数据表
使用的orders
表结构
CREATE TABLE `orders` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`orderno` varchar(60) NOT NULL DEFAULT '' COMMENT '订单号',
`price` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '价格',
`create_time` datetime DEFAULT NULL,
`cancel_time` datetime DEFAULT NULL,
`status` tinyint(3) DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=75 DEFAULT CHARSET=utf8mb4 COMMENT '测试订单表';
添加控制器
在app模块新增一个控制器/app/controller/Order.php
,内容如下:
<?php
declare (strict_types=1);
namespace app\controller;
use app\service\RabbitmqService;
use Exception;
use think\Response;
use think\response\Json;
class Order
{
/**
*
* 模拟客户下单
* @return Response
*/
public function index()
{
$db = db('orders');
$getConnection = $db->getConnection();
$timeOut = 1800;
$id = $getConnection->transaction(function () use ($db, $timeOut) {
$time = time();
$orderData = [
'orderno' => uniqid('hm'),
'price' => mt_rand(10, 999),
'create_time' => date('Y-m-d H:i:s', $time),
'status' => 0,
];
$id = $db->insertGetId($orderData);
$rabbitmqService = app()->make(RabbitmqService::class);
$rabbitmqService->sendDelayedMsg([
'id' => $id,
], $timeOut);
return $id;
});
if ($id) {
$msg = sprintf('订单插入成功:【%d】;在 %f 分钟内未支付,将自动取消订单~', $id, $timeOut/60);
return json(['msg' => $msg]);
}
return json(['msg' => 'fail']);
}
/**
* 模拟异步发送邮件
* @return Json
* @throws Exception
*/
public function email()
{
$mailData = [
'email' => mt_rand(10000000, 999999999) . '@qq.com',
];
$rabbitmqService = app()->make(RabbitmqService::class);
$res = $rabbitmqService->sendMsg($mailData);
return $res ? json(['msg' => $mailData['email']]) : json(['msg' => 'fail']);
}
}
添加路由
修改位置:/route/app.php
,新增2个访问的地址
Route::group('order',function(){
Route::get('index', Order::class.'::index');
Route::get('email', Order::class.'::email');
});
添加订单取消和发送邮件的操作脚本
主要是用在php think xx
命令行中执行
新建订单取消命令行脚本
新建文件:/app/command/OrderCancel.php
,文件内容如下:
<?php
declare (strict_types=1);
namespace app\command;
use app\service\RabbitmqService;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\facade\Db;
use think\facade\Log;
class OrderCancel extends Command
{
protected function configure()
{
// 指令配置
$this->setName('orderCancel')
->setDescription('订单超时未支付自动取消订单');
}
protected function execute(Input $input, Output $output)
{
// 指令输出
$output->writeln('orderCancel');
$rabbitmqService = app()->make(RabbitmqService::class);
//声明交换机
$params = new AMQPTable([
'x-delayed-type' => 'direct',
]);
$rabbitmqService->channel->exchange_declare(config('rabbitmq.exchange'), 'x-delayed-message', false, true, false, false, false, $params);
//声明队列
$rabbitmqService->channel->queue_declare(config('rabbitmq.queue'), false, true, false, false);
//绑定队列和交换机
$rabbitmqService->channel->queue_bind(config('rabbitmq.queue'), config('rabbitmq.exchange'), config('rabbitmq.routing_key'));
$callback = function (AMQPMessage $msg) use ($rabbitmqService, $output) {
$output->writeln('In callBack');
$data = json_decode($msg->getBody(), true);
$output->writeln($msg->getBody());
try {
$res = $this->cancelOrder($data['id']);
if ($res) {
$output->writeln('订单【' . $data['id'] . '】处理完毕');
$msg->ack();
return;
}
$msg->nack();
} catch (\Exception $exception) {
Log::error($exception->getMessage());
$msg->nack();
}
};
// 设置 QoS,防止消费者一次性获取过多消息
$rabbitmqService->channel->basic_qos(null, 1, null);
$rabbitmqService->channel->basic_consume(config('rabbitmq.queue'), '', false, false, false, false, $callback);
try {
$rabbitmqService->channel->consume();
} catch (\Exception $exception) {
$rabbitmqService->channel->close();
$rabbitmqService->connection->close();
Log::info("监听异常:" . $exception->getMessage());
}
}
/**
* 取消订单
* @param int $orderId
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function cancelOrder(int $orderId)
{
try {
// 尝试执行一个简单的查询来检查连接状态
Db::query('SELECT 1');
} catch (\Exception $e) {
Log::error('数据库连接断开: ' . $e->getMessage());
// 关闭当前连接
Db::close();
// 重新连接
Db::connect();
}
$db = db('orders');
$order = $db->where('id', $orderId)->find();
if ($order['status'] != 0) return false;
$res = $db->where('id', $order['id'])->update([
'status' => -1,
'cancel_time' => date('Y-m-d H:i:s'),
]);
return (bool)$res;
}
}
新建发送邮件命令行脚本
新建文件:/app/command/SendEmail.php
,文件内容如下:
<?php
declare (strict_types=1);
namespace app\command;
use app\service\RabbitmqService;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Exception;
use think\facade\Db;
use think\facade\Log;
class SendEmail extends Command
{
protected function configure()
{
// 指令配置
$this->setName('sendEmail')
->setDescription('发送邮件');
}
protected function execute(Input $input, Output $output)
{
// 指令输出
$output->writeln('sendEmail');
$rabbitmqService = app()->make(RabbitmqService::class);
$rabbitmqService->channel->exchange_declare('send_email.exchange', 'direct', false, true, false);
$rabbitmqService->channel->queue_declare('send_mail.queue', false, true, false, false);
$rabbitmqService->channel->queue_bind('send_mail.queue', 'send_email.exchange');
$callback = function (AMQPMessage $msg) use ($rabbitmqService, $output) {
try {
$data = json_decode($msg->getBody(), true);
$output->writeln($msg->getBody());
Log::info($msg->getBody());
$output->writeln('邮件【' . $data['email'] . '】处理完毕');
$msg->ack();
} catch (\Exception $exception) {
$output->writeln($exception->getMessage());
$msg->nack();
}
};
// 设置 QoS,防止消费者一次性获取过多消息
$rabbitmqService->channel->basic_qos(null, 1, null);
$rabbitmqService->channel->basic_consume('send_mail.queue', '', false, false, false, false, $callback);
try {
$rabbitmqService->channel->consume();
} catch (\Exception $exception) {
$rabbitmqService->channel->close();
$rabbitmqService->connection->close();
Log::info("监听异常:" . $exception->getMessage());
}
}
}
注册到think命令
新建好命令行执行脚本后,还需要注册到think中,否则使用php think xx
会找不到命令
修改文件:/config/console.php
,文件内容如下:
<?php
return [
// 指令定义
'commands' => [
'orderCancel' => \app\command\OrderCancel::class, //注册 可以用 php think orderCancel 执行
'sendEmail' => \app\command\SendEmail::class, //注册 可以用 php think sendEmail 执行
],
];
这时候,可以去到项目根目录,执行php think
,就能查看到自己定义的命令了
监听未支付订单取消
直接去到项目的根目录执行 php think orderCancel
,不要关闭窗口
监听邮件发送
新建一个窗口,去到项目的根目录执行php think sendEmail
不要关闭窗口
测试
直接修改文件/app/controller/Order.php
对应的方法,修改成自己想要的结果,在浏览器访问:http://xxx/order/index
模拟订单取消http://xxx/order/email
模拟邮件发送
这时候再去控制台查看监听进程,正常情况会输出对应的打印信息了。
Dcr163的博客
https://www.dcr163.cn/761.html(转载时请注明本文出处及文章链接)