rabbitmqtest.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package main
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/streadway/amqp"
  6. "log"
  7. "time"
  8. )
  9. var conn *amqp.Connection
  10. var channel *amqp.Channel
  11. var count = 0
  12. const (
  13. queueName = "test.topic.hello"
  14. exchange = ""
  15. vHost = "amqp://wph:wph@10.10.10.27:5672/"
  16. //vHost = "amqp://wph:wph@127.0.0.1:5672/"
  17. )
  18. func main() {
  19. var err error
  20. conn, err = amqp.Dial(vHost)
  21. failOnError(err, "failed to connect tp rabbitmq")
  22. defer conn.Close()
  23. channel, err = conn.Channel()
  24. failOnError(err, "failed to open a channel")
  25. defer channel.Close()
  26. _, err = channel.QueueDeclare(
  27. queueName, // name
  28. false, // durable
  29. false, // delete when unused
  30. false, // exclusive
  31. false, // no-wait
  32. nil, // arguments
  33. )
  34. failOnError(err, "Failed to declare a queue")
  35. go func() {
  36. for {
  37. for i := 0; i < 200; i++ {
  38. push()
  39. }
  40. time.Sleep(1 * time.Second)
  41. }
  42. }()
  43. receive()
  44. fmt.Println("end")
  45. }
  // failOnError terminates the program when err is non-nil, logging "msg:err"
  // through log.Fatalf. A nil err is a no-op.
  //
  // log.Fatalf exits the process after printing, so nothing placed after it
  // can run (the former trailing panic was dead code) and deferred calls in
  // the callers are skipped on this path.
  func failOnError(err error, msg string) {
  	if err == nil {
  		return
  	}
  	log.Fatalf("%s:%s", msg, err)
  }
  52. func push() {
  53. msgContent := "hello world!"
  54. channel.Publish(exchange, queueName, false, false, amqp.Publishing{
  55. ContentType: "text/plain",
  56. Body: []byte(msgContent),
  57. })
  58. }
  59. func receive() {
  60. messages, err := channel.Consume(queueName, "", true, false, false, false, nil)
  61. failOnError(err, "")
  62. forever := make(chan bool)
  63. go func() {
  64. for d := range messages {
  65. s := BytesToString(&(d.Body))
  66. count++
  67. fmt.Printf("receve msg is: %s -- %d\n", *s, count)
  68. }
  69. }()
  70. fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
  71. <-forever
  72. }
  // BytesToString copies the bytes that b points to into a new string and
  // returns a pointer to that string. Later changes to the slice do not affect
  // the returned value.
  //
  // A nil b is treated as empty input and yields a pointer to "" instead of
  // panicking on the dereference. The result is never nil.
  func BytesToString(b *[]byte) *string {
  	var text string
  	if b != nil {
  		// bytes.Buffer.String returns a copy of the buffered bytes.
  		text = bytes.NewBuffer(*b).String()
  	}
  	return &text
  }