Golang并发操作RabbitMQ

send.go

通过amqp连接RabbitMQ，再通过协程并发发送消息

package main

import (
	"github.com/streadway/amqp"
	"log"
	"rabbitmqTest/utils"
	"sync"
)



// main connects to RabbitMQ, opens a single channel, and publishes one
// message per entry of a small queue-name -> body map, each from its own
// goroutine. Errors raised inside the goroutines are collected on a buffered
// channel and reported through utils.FailOnError once every goroutine is done.
func main() {
	// TODO: change the connection address to your own host address.
	conn, err := amqp.Dial("amqp://guest:guest@192.168.100.101:5672/")
	utils.FailOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	utils.FailOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Queue name -> message body.
	payloads := map[string]string{
		"test1": "a",
		"test2": "b",
		"test3": "c",
	}

	// send declares the named queue and publishes text to it, using the empty
	// exchange name and the declared queue name as the routing key.
	send := func(queue, text string) error {
		q, err := ch.QueueDeclare(
			queue, // name
			false, // durable
			false, // delete when unused
			false, // exclusive
			false, // no-wait
			nil,   // arguments
		)
		if err != nil {
			return err
		}
		return ch.Publish(
			"",     // exchange
			q.Name, // routing key
			false,  // mandatory
			false,  // immediate
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(text),
			},
		)
	}

	var pending sync.WaitGroup
	// Buffered so that no goroutine blocks while reporting an error.
	failures := make(chan error, 2*len(payloads))
	for queue, text := range payloads {
		pending.Add(1)
		go func(queue, text string) {
			defer pending.Done()
			if err := send(queue, text); err != nil {
				failures <- err
				return
			}
			log.Printf(" [x] Sent %s", text)
		}(queue, text)
	}
	pending.Wait()
	close(failures)

	// Ranging over the closed channel drains whatever was buffered and then
	// stops, so an empty channel simply skips the loop.
	for err := range failures {
		utils.FailOnError(err, "Failed send message")
	}
}

recv.go

package main

import (
	"log"
	"github.com/streadway/amqp"
	"rabbitmqTest/utils"
	"sync"
)



// main connects to RabbitMQ, opens a single channel, and starts one consumer
// goroutine per queue name. Each goroutine declares its queue, registers an
// auto-ack consumer, and logs the body of every delivery it receives. main
// blocks until every consumer goroutine has returned.
func main() {
	// TODO: change the connection address to your own host address.
	conn, err := amqp.Dial("amqp://guest:guest@192.168.100.101:5672/")
	utils.FailOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	utils.FailOnError(err, "Failed to open a channel")
	defer ch.Close()

	nameList := []string{
		"test1",
		"test2",
		"test3",
	}
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

	var wg sync.WaitGroup
	for _, name := range nameList {
		// Register the goroutine before starting it; without Add the Wait
		// below would return immediately and main would not actually wait
		// for the consumers.
		wg.Add(1)
		go func(name string) {
			defer wg.Done()

			q, err := ch.QueueDeclare(
				name,  // name
				false, // durable
				false, // delete when unused
				false, // exclusive
				false, // no-wait
				nil,   // arguments
			)
			// Setup failures abort the whole program right away.
			utils.FailOnError(err, "Failed to declare a queue")

			msgs, err := ch.Consume(
				q.Name, // queue
				"",     // consumer
				true,   // auto-ack
				false,  // exclusive
				false,  // no-local
				false,  // no-wait
				nil,    // args
			)
			utils.FailOnError(err, "Failed to register a consumer")

			// The loop ends when the delivery channel is closed.
			for d := range msgs {
				log.Printf("Received a message: %s", d.Body)
			}
		}(name)
	}

	// Block while consumers are running. This replaces a never-signalled
	// "forever" channel, so the program can return once every delivery
	// channel has been closed instead of hanging.
	wg.Wait()
}

log.go

package utils
import (
	"log"
)
// FailOnError reports a fatal error. When err is non-nil it logs msg followed
// by the error text via log.Fatalf, which terminates the program; when err is
// nil it returns without doing anything.
func FailOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}

上一篇:【爬虫】 爬虫请求json数据,返回乱码问题的解决


下一篇:从零开始的VIO——Allan方差工具