// Command main is a small RabbitMQ demo: it declares a queue, publishes
// batches of messages to it from a background goroutine, and prints every
// message it consumes from that same queue.
package main

import (
	"bytes"
	"fmt"
	"log"
	"time"

	"github.com/streadway/amqp"
)

// conn and channel are the process-wide AMQP connection and channel. They are
// assigned once in main and then used by push and receive.
var (
	conn    *amqp.Connection
	channel *amqp.Channel
)

// count is the number of deliveries printed so far. Only receive modifies it.
var count = 0

const (
	// queueName is the queue that is declared, published to and consumed from.
	queueName = "test.topic.hello"

	// exchange is the exchange name passed to Publish. With an empty name the
	// message is routed by queue name (the broker's default exchange).
	exchange = ""

	// vHost is the AMQP URI handed to amqp.Dial. Note that it embeds
	// credentials in the source.
	vHost = "amqp://wph:wph@10.10.10.27:5672/"
	//vHost = "amqp://wph:wph@127.0.0.1:5672/"
)

// Publisher pacing: publishBatch messages are sent, then the publisher sleeps
// for publishPause before sending the next batch.
const (
	publishBatch = 200
	publishPause = 1 * time.Second
)

// main connects to the broker, opens a channel, declares the queue, starts
// the background publisher and then consumes until the delivery channel is
// closed.
func main() {
	var err error

	conn, err = amqp.Dial(vHost)
	failOnError(err, "failed to connect tp rabbitmq")
	defer conn.Close()

	channel, err = conn.Channel()
	failOnError(err, "failed to open a channel")
	defer channel.Close()

	_, err = channel.QueueDeclare(
		queueName, // name
		false,     // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// The publisher has no stop signal; it ends when the process exits after
	// receive returns.
	go publishForever()

	receive()
	fmt.Println("end")
}

// publishForever sends publishBatch messages, pauses for publishPause, and
// repeats without end.
func publishForever() {
	for {
		for i := 0; i < publishBatch; i++ {
			push()
		}
		time.Sleep(publishPause)
	}
}

// failOnError terminates the program when err is non-nil, logging msg
// followed by the error. It does nothing when err is nil.
//
// log.Fatalf exits the process via os.Exit(1), so deferred calls in the
// caller (such as closing the connection) do not run on this path, and no
// statement placed after it would ever execute.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s:%s", msg, err)
}

// push publishes one plain-text "hello world!" message to queueName through
// the shared channel. A publish failure is logged rather than dropped
// silently; the program keeps running.
func push() {
	const msgContent = "hello world!"

	err := channel.Publish(exchange, queueName, false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(msgContent),
	})
	if err != nil {
		log.Printf("failed to publish a message: %s", err)
	}
}

// receive registers a consumer on queueName and prints every delivery along
// with a running count. It blocks while deliveries keep arriving and returns
// once the delivery channel is closed, so the caller can shut down cleanly
// instead of hanging forever.
func receive() {
	// Arguments after the queue name: consumer tag "" (let the library pick
	// one), autoAck=true, exclusive=false, noLocal=false, noWait=false.
	messages, err := channel.Consume(queueName, "", true, false, false, false, nil)
	failOnError(err, "failed to register a consumer")

	fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")

	for d := range messages {
		count++
		// %s formats a []byte as its string contents, so no conversion helper
		// is needed here.
		fmt.Printf("receve msg is: %s -- %d\n", d.Body, count)
	}
}

// BytesToString returns a pointer to a new string holding a copy of the bytes
// that b points to. A nil b yields a pointer to the empty string instead of a
// nil-pointer panic.
func BytesToString(b *[]byte) *string {
	var r string
	if b != nil {
		r = bytes.NewBuffer(*b).String()
	}
	return &r
}