* (star) can substitute for exactly one word
# (hash) can substitute for zero or more words
Routing Key를 바탕으로 구현한 pub-sub 통신
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Publisher: sends a single message to the 'topic_logs' topic exchange.
//
// Command line: [routingKey] [message words...]
//   routingKey - first argument; defaults to 'anonymous.info' when absent.
//   message    - remaining arguments joined with spaces; defaults to
//                'Hello World!' when there are none (or they join to '').
//
// NOTE(review): the broker URL embeds credentials in source; consider moving
// them to configuration before using this outside a local demo.
amqp.connect('amqp://myuser:secret@localhost:5672', (error0, connection) => {
  if (error0) {
    throw error0;
  }

  connection.createChannel((error1, channel) => {
    if (error1) {
      throw error1;
    }

    const exchange = 'topic_logs';
    const args = process.argv.slice(2);
    const key = args.length > 0 ? args[0] : 'anonymous.info';
    // `||` is intentional: an empty joined string also falls back to the default.
    const msg = args.slice(1).join(' ') || 'Hello World!';

    // Declare the exchange as a non-durable 'topic' exchange before publishing.
    channel.assertExchange(exchange, 'topic', {
      durable: false,
    });
    channel.publish(exchange, key, Buffer.from(msg));
    // Double-quoted format string so the single quotes around the message
    // need no escaping.
    console.log(" [x] Sent %s:'%s'", key, msg);
  });

  // Close the connection and exit 500 ms after connecting; presumably this
  // delay gives the publish above time to be sent first.
  setTimeout(() => {
    connection.close();
    process.exit(0);
  }, 500);
});
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Consumer: binds a fresh exclusive queue to the 'topic_logs' topic exchange
// once per binding key given on the command line, then prints every message
// delivered to that queue.
//
// Command line: <bindingKey> [<bindingKey> ...]
// Exits with status 1 and a usage line when no binding key is given.
const args = process.argv.slice(2);

if (args.length === 0) {
  console.log('Usage: receive_logs_topic.js <facility>.<severity>');
  process.exit(1);
}

// NOTE(review): the broker URL embeds credentials in source; consider moving
// them to configuration before using this outside a local demo.
amqp.connect('amqp://myuser:secret@localhost:5672', (error0, connection) => {
  if (error0) {
    throw error0;
  }

  connection.createChannel((error1, channel) => {
    if (error1) {
      throw error1;
    }

    const exchange = 'topic_logs';

    // Declare the exchange as a non-durable 'topic' exchange.
    channel.assertExchange(exchange, 'topic', {
      durable: false,
    });

    // Request an exclusive queue with an empty name; the name actually used
    // is read back from `q.queue` in the reply.
    channel.assertQueue('', {
      exclusive: true,
    }, (error2, q) => {
      if (error2) {
        throw error2;
      }

      console.log(' [*] Waiting for logs. To exit press CTRL+C');

      // One binding per key supplied on the command line.
      args.forEach((key) => {
        channel.bindQueue(q.queue, exchange, key);
      });

      channel.consume(q.queue, (msg) => {
        // amqplib invokes the callback with null when the broker cancels the
        // consumer; there is no message to print in that case.
        if (msg === null) {
          console.error(' [!] Consumer cancelled by server');
          return;
        }
        // Double-quoted format string so the single quotes around the
        // message body need no escaping.
        console.log(" [x] %s:'%s'", msg.fields.routingKey,
          msg.content.toString());
      }, {
        // Messages are consumed without acknowledgements.
        noAck: true,
      });
    });
  });
});