# sim_con_mq **Repository Path**: sumerily/sim_con_mq ## Basic Information - **Project Name**: sim_con_mq - **Description**: 简单方便mq依赖包。。。。。 - **Primary Language**: Go - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-12-03 - **Last Updated**: 2025-12-03 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # sim_con_mq 一个简单易用的 RabbitMQ Go 客户端封装库,提供了简洁的 API 来实现消息的发布和订阅功能。 ## ✨ 特性 - 🚀 简洁的 API 设计,快速上手 - 🔧 灵活的配置选项(选项模式 + 构建器模式) - 🔄 支持手动/自动消息确认 - 📦 内置消息重试机制 - 🎯 支持多种交换机类型(direct, fanout, topic, headers) - 🔌 连接管理和自动重连 - 📝 完善的日志输出 ## 📦 安装 ```bash go get github.com/your-username/sim_con_mq ``` 或者在 `go.mod` 中添加: ``` require sim_con_mq v1.0.0 ``` ## 🚀 快速开始 ### 环境要求 - Go 1.23+ - RabbitMQ 服务器(配置非默认端口,如 15673) ### 基本使用 #### 1. 发布消息(生产者) ```go package main import ( "context" "log" mq "sim_con_mq" "sim_con_mq/config" "sim_con_mq/options" ) func main() { // 创建客户端 client := mq.NewClient( options.WithHost("localhost"), options.WithPort(15673), // 使用非默认端口 options.WithCredentials("guest", "guest"), options.WithExchangeName("my_exchange"), options.WithExchangeType(config.ExchangeDirect), options.WithRoutingKey("my.routing.key"), ) // 连接 if err := client.Connect(); err != nil { log.Fatalf("连接失败: %v", err) } defer client.Close() // 发布消息 ctx := context.Background() err := client.Publish(ctx, []byte(`{"message": "Hello, MQ!"}`)) if err != nil { log.Fatalf("发布失败: %v", err) } log.Println("消息发布成功") } ``` #### 2. 订阅消息(消费者) ```go package main import ( "context" "log" "os" "os/signal" "syscall" mq "sim_con_mq" "sim_con_mq/config" "sim_con_mq/consumer" "sim_con_mq/options" ) func main() { // 创建客户端 client := mq.NewClient( options.WithHost("localhost"), options.WithPort(15673), options.WithCredentials("guest", "guest"), options.WithExchangeName("my_exchange"), options.WithExchangeType(config.ExchangeDirect), options.WithQueueName("my_queue"), options.WithConsumerTag("my.routing.key"), options.WithAutoAck(false), // 手动确认 options.WithPrefetchCount(1), // 预取数量 options.WithRetryCount(3), // 重试次数 ) // 连接 if err := client.Connect(); err != nil { log.Fatalf("连接失败: %v", err) } defer client.Close() // 订阅消息 err := client.Subscribe(func(ctx context.Context, msg *consumer.Message) error { log.Printf("收到消息: %s", string(msg.Body)) // 返回 nil 表示处理成功,消息将被确认 // 返回 error 将触发重试 return nil }) if err != nil { log.Fatalf("订阅失败: %v", err) } // 等待退出信号 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan } ``` ### 使用构建器模式 ```go client := mq.NewClientBuilder(). WithHost("localhost"). WithPort(15673). WithCredentials("guest", "guest"). WithExchange("my_exchange", config.ExchangeDirect). WithQueue("my_queue"). WithRoutingKey("my.routing.key"). WithAutoAck(false). Build() ``` ## 📖 配置选项 ### 连接配置 | 选项 | 方法 | 默认值 | 说明 | |------|------|--------|------| | Host | `WithHost(host)` | `localhost` | MQ服务器地址 | | Port | `WithPort(port)` | `15673` | 端口号(非默认) | | Username | `WithCredentials(user, pwd)` | `guest` | 用户名 | | Password | `WithCredentials(user, pwd)` | `guest` | 密码 | | Vhost | `WithVhost(vhost)` | `/` | 虚拟主机 | ### 交换机配置 | 选项 | 方法 | 默认值 | 说明 | |------|------|--------|------| | Name | `WithExchangeName(name)` | - | 交换机名称 | | Type | `WithExchangeType(type)` | `direct` | 交换机类型 | | Durable | `WithExchangeDurable(bool)` | `true` | 是否持久化 | | AutoDelete | `WithExchangeAutoDelete(bool)` | `false` | 是否自动删除 | ### 交换机类型 ```go config.ExchangeDirect // 直连交换机 config.ExchangeFanout // 扇出交换机(广播) config.ExchangeTopic // 主题交换机 config.ExchangeHeaders // 头部交换机 ``` ### 队列配置 | 选项 | 方法 | 默认值 | 说明 | |------|------|--------|------| | Name | `WithQueueName(name)` | - | 队列名称 | | Durable | `WithQueueDurable(bool)` | `true` | 是否持久化 | | AutoDelete | `WithQueueAutoDelete(bool)` | `false` | 是否自动删除 | | Exclusive | `WithQueueExclusive(bool)` | `false` | 是否排他 | ### 消费者配置 | 选项 | 方法 | 默认值 | 说明 | |------|------|--------|------| | AutoAck | `WithAutoAck(bool)` | `false` | 是否自动确认 | | ConsumerTag | `WithConsumerTag(tag)` | - | 消费者标签 | | PrefetchCount | `WithPrefetchCount(count)` | `1` | 预取消息数量 | | RetryCount | `WithRetryCount(count)` | `3` | 失败重试次数 | | RetryDelay | `WithRetryDelay(duration)` | `5s` | 重试间隔 | ### 生产者配置 | 选项 | 方法 | 默认值 | 说明 | |------|------|--------|------| | RoutingKey | `WithRoutingKey(key)` | - | 路由键 | | Mandatory | `WithMandatory(bool)` | `false` | 是否强制 | ## 🔧 消息确认 ### 手动确认模式(推荐) ```go // 设置手动确认 options.WithAutoAck(false) // 在处理函数中 func handler(ctx context.Context, msg *consumer.Message) error { // 处理消息... // 返回 nil 自动确认 return nil // 或手动确认 // msg.Ack() // 或拒绝并重新入队 // msg.Nack(true) // 或拒绝不重新入队 // msg.Reject(false) } ``` ### 自动确认模式 ```go options.WithAutoAck(true) ``` ## 📁 项目结构 ``` sim_con_mq/ ├── client.go # 主客户端入口 ├── config/ │ └── config.go # 配置结构定义 ├── options/ │ └── options.go # 选项模式配置 ├── connection/ │ └── connection.go # 连接管理 ├── producer/ │ └── producer.go # 生产者实现 ├── consumer/ │ └── consumer.go # 消费者实现 ├── examples/ │ ├── producer/ │ │ └── main.go # 生产者示例 │ └── consumer/ │ └── main.go # 消费者示例 ├── go.mod └── README.md ``` ## 🎨 设计模式 本库采用多种设计模式保证可扩展性和解耦: 1. **选项模式(Options Pattern)** - 灵活的参数配置 2. **建造者模式(Builder Pattern)** - 链式调用构建客户端 3. **工厂模式(Factory Pattern)** - 创建生产者和消费者 4. **策略模式(Strategy Pattern)** - 不同的消息确认策略 ## ⚠️ 注意事项 1. **端口配置**:本库默认使用 `15673` 端口,请确保 RabbitMQ 配置了该端口 2. **连接管理**:使用完毕后请调用 `client.Close()` 关闭连接 3. **错误处理**:消息处理函数返回 error 会触发重试机制 4. **优雅退出**:建议使用信号处理实现优雅退出 ## 🔗 RabbitMQ 端口配置 在 RabbitMQ 配置文件中添加非默认端口监听: ```ini # rabbitmq.conf listeners.tcp.1 = 0.0.0.0:15673 ``` 或者使用 Docker: ```bash docker run -d --name rabbitmq \ -p 15673:5672 \ -p 15672:15672 \ rabbitmq:management ``` ## 📄 License MIT License