123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- package main
- import (
- "bytes"
- "fmt"
- "github.com/streadway/amqp"
- "log"
- "time"
- )
- var conn *amqp.Connection
- var channel *amqp.Channel
- var count = 0
// Broker settings used by this program.
const (
	// vHost is the AMQP URL handed to amqp.Dial. Despite its name it is a
	// full connection URL, not a virtual-host name.
	// NOTE(review): the credentials are hard-coded in this URL.
	vHost = "amqp://wph:wph@10.10.10.27:5672/"
	//vHost = "amqp://wph:wph@127.0.0.1:5672/"

	// queueName is the queue that is declared, published to and consumed from.
	queueName = "test.topic.hello"

	// exchange is the exchange name passed to Publish. The empty string
	// selects the broker's default exchange, which routes by queue name.
	exchange = ""
)
- func main() {
- var err error
- conn, err = amqp.Dial(vHost)
- failOnError(err, "failed to connect tp rabbitmq")
- defer conn.Close()
- channel, err = conn.Channel()
- failOnError(err, "failed to open a channel")
- defer channel.Close()
- _, err = channel.QueueDeclare(
- queueName, // name
- false, // durable
- false, // delete when unused
- false, // exclusive
- false, // no-wait
- nil, // arguments
- )
- failOnError(err, "Failed to declare a queue")
- go func() {
- for {
- for i := 0; i < 200; i++ {
- push()
- }
- time.Sleep(1 * time.Second)
- }
- }()
- receive()
- fmt.Println("end")
- }
// failOnError logs msg together with err and terminates the process when err
// is non-nil. It does nothing when err is nil.
//
// log.Fatalf ends the process with os.Exit(1), so no statement placed after
// it can run and deferred calls in the caller are skipped.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s:%s", msg, err)
}
- func push() {
- msgContent := "hello world!"
- channel.Publish(exchange, queueName, false, false, amqp.Publishing{
- ContentType: "text/plain",
- Body: []byte(msgContent),
- })
- }
- func receive() {
- messages, err := channel.Consume(queueName, "", true, false, false, false, nil)
- failOnError(err, "")
- forever := make(chan bool)
- go func() {
- for d := range messages {
- s := BytesToString(&(d.Body))
- count++
- fmt.Printf("receve msg is: %s -- %d\n", *s, count)
- }
- }()
- fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
- <-forever
- }
// BytesToString returns a pointer to a new string holding a copy of the bytes
// that b points to.
//
// A nil b yields a pointer to the empty string instead of panicking on the
// dereference. The result is never nil.
func BytesToString(b *[]byte) *string {
	var out string
	if b != nil {
		// Buffer.String copies the bytes, so later writes to *b do not
		// affect the returned string.
		out = bytes.NewBuffer(*b).String()
	}
	return &out
}
|