using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace NmsProducerClasses
{
    /// <summary>
    /// Small convenience wrapper around one Apache.NMS connection and session.
    /// After construction, call <see cref="InitQueueOrTopic"/> to bind a producer and a
    /// consumer to a queue or topic, then use <see cref="SendMessage"/> and
    /// <see cref="GetMessage"/>. Call <see cref="ShutDown"/> when finished.
    /// </summary>
    /// <remarks>
    /// Errors are written to the console, the class then waits for a line of console
    /// input (so the message can be read in a console session), and the original
    /// exception is rethrown with its stack trace intact.
    /// </remarks>
    public class MyActiveMq
    {
        /// <summary>Client id set on the connection; also used as the durable subscription name.</summary>
        private const string ClientID = "clientid";

        /// <summary>Selector expression applied to consumers created with <c>selector == true</c>.</summary>
        private const string Selector = "filter='demo'";

        /// <summary>Broker URI used when the broker runs on the local machine.</summary>
        private const string LocalBrokerUri = "tcp://localhost:61616/";

        private readonly IConnectionFactory factory;
        private IConnection connection;
        private ISession session;
        private IMessageProducer prod;
        private IMessageConsumer consumer;
        private ITextMessage msg;

        /// <summary>True when the current consumer was created with <see cref="Selector"/>.</summary>
        private bool hasSelector = false;

        /// <summary>Outcome of the most recent connect/init/send operation.</summary>
        private bool sendSuccess = true;

        /// <summary>Outcome of the most recent connect/init/receive operation.</summary>
        private bool receiveSuccess = true;

        /// <summary>
        /// Opens and starts a connection to the broker and creates a session on it.
        /// </summary>
        /// <param name="isLocalMachine">
        /// When true, connects to <c>tcp://localhost:61616/</c> and <paramref name="remoteAddress"/> is ignored.
        /// </param>
        /// <param name="remoteAddress">
        /// Host name or IP address of the broker, used only when <paramref name="isLocalMachine"/> is false.
        /// </param>
        /// <exception cref="System.Exception">
        /// Any exception raised while connecting is logged and rethrown unchanged.
        /// </exception>
        public MyActiveMq(bool isLocalMachine, string remoteAddress)
        {
            try
            {
                if (isLocalMachine)
                {
                    factory = new ConnectionFactory(LocalBrokerUri);
                }
                else
                {
                    // Use the form tcp://192.168.1.111:61616 to reach an ActiveMQ broker on another server.
                    factory = new ConnectionFactory("tcp://" + remoteAddress + ":61616/");
                }

                connection = factory.CreateConnection();
                connection.ClientId = ClientID;
                connection.Start();
                session = connection.CreateSession();
            }
            catch (System.Exception e)
            {
                sendSuccess = false;
                receiveSuccess = false;
                ReportFailure(e);
                AbandonConnection();
                throw;
            }

            Console.WriteLine("Begin connection...");
        }

        /// <summary>
        /// Binds this instance to a queue or a topic by creating a producer, a consumer
        /// and a reusable text message. Any producer/consumer from an earlier call is
        /// closed first.
        /// </summary>
        /// <param name="topic">True to use a topic (with a durable consumer); false to use a queue.</param>
        /// <param name="name">Name of the queue or topic.</param>
        /// <param name="selector">True to create the consumer with the <c>filter='demo'</c> selector.</param>
        /// <returns>True when the producer and consumer were created.</returns>
        /// <exception cref="System.Exception">
        /// Any exception raised while creating the producer or consumer is logged and rethrown unchanged.
        /// </exception>
        public bool InitQueueOrTopic(bool topic, string name, bool selector = false)
        {
            try
            {
                CloseProducerAndConsumer();

                if (topic)
                {
                    Apache.NMS.ActiveMQ.Commands.ActiveMQTopic destination =
                        new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name);

                    prod = session.CreateProducer(destination);
                    consumer = session.CreateDurableConsumer(
                        destination, ClientID, selector ? Selector : null, false);
                }
                else
                {
                    Apache.NMS.ActiveMQ.Commands.ActiveMQQueue destination =
                        new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name);

                    prod = session.CreateProducer(destination);
                    if (selector)
                    {
                        consumer = session.CreateConsumer(destination, Selector);
                    }
                    else
                    {
                        consumer = session.CreateConsumer(destination);
                    }
                }

                hasSelector = selector;
                msg = prod.CreateTextMessage();

                // A successful (re)initialisation clears any failure recorded earlier,
                // e.g. by a SendMessage call made before the first InitQueueOrTopic.
                sendSuccess = true;
                receiveSuccess = true;
                return true;
            }
            catch (System.Exception e)
            {
                sendSuccess = false;
                receiveSuccess = false;
                ReportFailure(e);
                throw;
            }
        }

        /// <summary>
        /// Sends <paramref name="message"/> as a persistent text message to the destination
        /// chosen in <see cref="InitQueueOrTopic"/>.
        /// </summary>
        /// <param name="message">Text body of the message.</param>
        /// <param name="msgId">
        /// Value written to the correlation id, the message id and the <c>MyID</c> property.
        /// </param>
        /// <param name="priority">Priority used for the send.</param>
        /// <returns>
        /// True when the message was handed to the producer; false when
        /// <see cref="InitQueueOrTopic"/> has not been called successfully yet.
        /// </returns>
        /// <exception cref="System.Exception">
        /// Any exception raised by the send is logged and rethrown unchanged.
        /// </exception>
        public bool SendMessage(string message, string msgId = "defult", MsgPriority priority = MsgPriority.Normal)
        {
            if (prod == null || msg == null)
            {
                sendSuccess = false;
                Console.WriteLine("call InitQueueOrTopic() first!!");
                return false;
            }

            Console.WriteLine("Begin send messages...");
            msg.NMSCorrelationID = msgId;
            msg.Properties["MyID"] = msgId;
            msg.NMSMessageId = msgId;
            msg.Text = message;
            Console.WriteLine(message);

            sendSuccess = Dispatch(priority);
            return sendSuccess;
        }

        /// <summary>
        /// Waits up to 10 milliseconds for a text message on the current consumer.
        /// </summary>
        /// <returns>
        /// The text of the received message, or null when <see cref="InitQueueOrTopic"/>
        /// has not been called successfully, nothing arrived within the timeout, or the
        /// received message was not a text message.
        /// </returns>
        /// <exception cref="System.Exception">
        /// Any exception raised by the receive is logged and rethrown unchanged.
        /// </exception>
        public string GetMessage()
        {
            // Guard on the consumer (the object actually used below), not the producer.
            if (consumer == null)
            {
                Console.WriteLine("call InitQueueOrTopic() first!!");
                return null;
            }

            Console.WriteLine("Begin receive messages...");

            ITextMessage received;
            try
            {
                received = consumer.Receive(TimeSpan.FromMilliseconds(10)) as ITextMessage;
                receiveSuccess = true;
            }
            catch (System.Exception e)
            {
                receiveSuccess = false;
                ReportFailure(e);
                throw;
            }

            if (received == null)
            {
                Console.WriteLine("No message received!");
                return null;
            }

            Console.WriteLine("Received message with Correlation ID: " + received.NMSCorrelationID);
            Console.WriteLine("Received message with text: " + received.Text);
            return received.Text;
        }

        /// <summary>
        /// Closes the producer, consumer, session and connection. Safe to call more than
        /// once and on an instance whose initialisation did not complete.
        /// </summary>
        public void ShutDown()
        {
            Console.WriteLine("Close connection and session...");
            try
            {
                CloseProducerAndConsumer();
                if (session != null)
                {
                    session.Close();
                }
            }
            finally
            {
                // Always attempt to close the connection, even if closing the session failed.
                IConnection openConnection = connection;
                session = null;
                connection = null;
                if (openConnection != null)
                {
                    openConnection.Close();
                }
            }
        }

        /// <summary>
        /// Sends the prepared message persistently with the given priority. Used for both
        /// queues and topics so that the selector property is stamped consistently.
        /// </summary>
        private bool Dispatch(MsgPriority priority)
        {
            try
            {
                if (hasSelector)
                {
                    // Must match the Selector constant so this instance's own consumer can see the message.
                    msg.Properties.SetString("filter", "demo");
                }

                prod.Priority = priority;
                prod.DeliveryMode = MsgDeliveryMode.Persistent;
                prod.Send(msg, MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);
                return true;
            }
            catch (System.Exception e)
            {
                sendSuccess = false;
                ReportFailure(e);
                throw;
            }
        }

        /// <summary>Closes and forgets the current producer, consumer and reusable message, if any.</summary>
        private void CloseProducerAndConsumer()
        {
            IMessageConsumer oldConsumer = consumer;
            IMessageProducer oldProducer = prod;
            consumer = null;
            prod = null;
            msg = null;

            try
            {
                if (oldConsumer != null)
                {
                    oldConsumer.Close();
                }
            }
            finally
            {
                if (oldProducer != null)
                {
                    oldProducer.Close();
                }
            }
        }

        /// <summary>
        /// Best-effort cleanup after a failed constructor. Errors are ignored on purpose so
        /// the exception that caused the failure is the one the caller sees.
        /// </summary>
        private void AbandonConnection()
        {
            IConnection openConnection = connection;
            session = null;
            connection = null;
            if (openConnection == null)
            {
                return;
            }

            try
            {
                openConnection.Close();
            }
            catch (System.Exception)
            {
                // Intentionally ignored: the original connection failure is rethrown by the caller.
            }
        }

        /// <summary>Writes the exception message to the console and waits for a line of input.</summary>
        private static void ReportFailure(System.Exception e)
        {
            Console.WriteLine("Exception:{0}", e.Message);
            Console.ReadLine();
        }
    }
}
// Please leave a comment.