2012-06-03
## Publishing Messages
- 连接到Broker
- 获取Channel
- 声明一个Exchange
- 创建一个Message
- 发布这个Message
- 关闭Channel
- 关闭Connection
1ConnectionFactory connectionFactory = new ConnectionFactory();
2// AMQP URI amqp://userName:password@hostName:portNumber/virtualHost"
3connectionFactory.setUri("amqp://test:[email protected]:5672/demo");
4Connection connection = connectionFactory.newConnection();
5Channel channel = connection.createChannel();
6channel.exchangeDeclare("first-exchange", "direct", true);
7byte[] msgBody = "Hello, World!".getBytes("UTF-8");
8channel.basicPublish("first-exchange", "helloRoutingKey", null, msgBody);
9channel.close();
10connection.close();
## Receiving Messages by Subscription
1public static void main(String[] args) throws Exception {
2 ConnectionFactory connectionFactory = new ConnectionFactory();
3 connectionFactory.setUri("amqp://test:[email protected]:5672/demo");
4 Connection connection = connectionFactory.newConnection();
5 final Channel channel = connection.createChannel();
6 channel.exchangeDeclare("first-exchange", "direct", true);
7 channel.queueDeclare("hello-queue", true, false, false, null);
8 channel.queueBind("hello-queue", "first-exchange", "helloRoutingKey");
9 channel.basicConsume("hello-queue", new DefaultConsumer(channel) {
10 @Override
11 public void handleDelivery(String consumerTag, Envelope envelope,
12 BasicProperties properties, byte[] body)
13 throws IOException {
14 String msg = new String(body, "UTF-8");
15 System.out.println(msg);
16 channel.basicAck(envelope.getDeliveryTag(), false);
17 }
18 });
19 Scanner scanner = new Scanner(System.in);
20 scanner.nextLine();
21 scanner.close();
22 channel.close();
23 connection.close();
24}