Publishing Messages

  1. 连接到Broker
  2. 获取Channel
  3. 声明一个Exchange
  4. 创建一个Message
  5. 发布这个Message
  6. 关闭Channel
  7. 关闭Connection
 1ConnectionFactory connectionFactory = new ConnectionFactory();
 2// AMQP URI amqp://userName:password@hostName:portNumber/virtualHost"
 3connectionFactory.setUri("amqp://test:[email protected]:5672/demo");
 4Connection connection = connectionFactory.newConnection();
 5Channel channel = connection.createChannel();
 6channel.exchangeDeclare("first-exchange", "direct", true);
 7byte[] msgBody = "Hello, World!".getBytes("UTF-8");
 8channel.basicPublish("first-exchange", "helloRoutingKey", null, msgBody);
 9channel.close();
10connection.close();

Receiving Messages by Subscription

 1public static void main(String[] args) throws Exception {
 2    ConnectionFactory connectionFactory = new ConnectionFactory();
 3    connectionFactory.setUri("amqp://test:[email protected]:5672/demo");
 4    Connection connection = connectionFactory.newConnection();
 5    final Channel channel = connection.createChannel();
 6    channel.exchangeDeclare("first-exchange", "direct", true);
 7    channel.queueDeclare("hello-queue", true, false, false, null);
 8    channel.queueBind("hello-queue", "first-exchange", "helloRoutingKey");
 9    channel.basicConsume("hello-queue", new DefaultConsumer(channel) {
10        @Override
11        public void handleDelivery(String consumerTag, Envelope envelope, 
12        	BasicProperties properties, byte[] body)
13            throws IOException {
14            String msg = new String(body, "UTF-8");
15            System.out.println(msg);
16            channel.basicAck(envelope.getDeliveryTag(), false);
17        }
18    });
19    Scanner scanner = new Scanner(System.in);
20    scanner.nextLine();
21    scanner.close();
22    channel.close();
23    connection.close();
24}