초기커밋
This commit is contained in:
180
ServerCore/RabbitMQ/RabbitMqConnectorBase.cs
Normal file
180
ServerCore/RabbitMQ/RabbitMqConnectorBase.cs
Normal file
@@ -0,0 +1,180 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
|
||||
|
||||
namespace ServerCore;
|
||||
|
||||
public abstract class RabbitMQConnectorBase
|
||||
{
|
||||
private readonly string m_service_name;
|
||||
private IConnection? m_connection = null;
|
||||
|
||||
private IModel? m_default_channel;
|
||||
private CancellationTokenSource m_cts = new();
|
||||
private TaskSerializer m_task_serializer = new();
|
||||
|
||||
private Int32 m_next_req_id = 1;
|
||||
private readonly Dictionary<int, (TaskCompletionSource<object>, Action<Task<object>>)> m_tcs = new();
|
||||
|
||||
private readonly Dictionary<string, IModel> m_exchange_channels = new();
|
||||
|
||||
|
||||
public RabbitMQConnectorBase( string serviceName
|
||||
, string address, int port
|
||||
, string username, string password
|
||||
, bool useSSL )
|
||||
{
|
||||
m_service_name = serviceName;
|
||||
|
||||
try
|
||||
{
|
||||
var factory = new ConnectionFactory()
|
||||
{
|
||||
HostName = address,
|
||||
Port = port,
|
||||
UserName = username,
|
||||
Password = password,
|
||||
Ssl = { ServerName = address, Enabled = useSSL, Version = System.Security.Authentication.SslProtocols.Tls12 },
|
||||
};
|
||||
|
||||
m_connection = factory.CreateConnection();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
var err_msg = $"Exception !!!, Failed to perform in RabbitMQConnectorBase() !!! : exception:{e}"
|
||||
+ $" - servieName:{serviceName}, address:{address}, port:{port}, userName:{username}, password:{password}, useSsl:{useSSL}";
|
||||
Log.getLogger().error(err_msg);
|
||||
}
|
||||
}
|
||||
|
||||
public int nextReqId()
|
||||
{
|
||||
return Interlocked.Increment(ref m_next_req_id);
|
||||
}
|
||||
|
||||
public virtual bool startConsumer()
|
||||
{
|
||||
if(null == m_connection)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
m_default_channel = m_connection.CreateModel();
|
||||
var declare = m_default_channel.QueueDeclare( queue: m_service_name,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: true,
|
||||
arguments: null );
|
||||
|
||||
var consumer = new EventingBasicConsumer(m_default_channel);
|
||||
|
||||
consumer.Received += onRecvJsonMessageFromConsumer;
|
||||
//consumer.Received += onRecvProtoMessageFromConsumer;
|
||||
|
||||
var consume_result = m_default_channel.BasicConsume( queue: m_service_name,
|
||||
autoAck: true,
|
||||
consumer: consumer );
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public virtual void stop()
|
||||
{
|
||||
m_cts.Cancel();
|
||||
|
||||
foreach(var channel in m_exchange_channels.Values)
|
||||
{
|
||||
channel.Dispose();
|
||||
}
|
||||
|
||||
m_default_channel?.Dispose();
|
||||
m_connection?.Dispose();
|
||||
}
|
||||
|
||||
public IModel? createExchangeChannel( string exchangeName, string exchangeType)
|
||||
{
|
||||
if( true == m_exchange_channels.ContainsKey(exchangeName) )
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
if( null == m_connection )
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var to_add_channel = m_connection?.CreateModel();
|
||||
if(null == to_add_channel)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
to_add_channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType);
|
||||
|
||||
var queueName = to_add_channel.QueueDeclare(autoDelete: true).QueueName;
|
||||
to_add_channel.QueueBind( queue: queueName,
|
||||
exchange: exchangeName,
|
||||
routingKey: "" );
|
||||
|
||||
var consumer = new EventingBasicConsumer(to_add_channel);
|
||||
|
||||
consumer.Received += onRecvJsonMessageFromConsumer;
|
||||
//consumer.Received += onRecvProtoMessageFromConsumer;
|
||||
|
||||
to_add_channel.BasicConsume( queue: queueName,
|
||||
autoAck: true,
|
||||
consumer: consumer );
|
||||
|
||||
m_exchange_channels.Add(exchangeName, to_add_channel);
|
||||
|
||||
return to_add_channel;
|
||||
}
|
||||
|
||||
protected abstract void onRecvProtoMessageFromConsumer(object? sender, BasicDeliverEventArgs ea);
|
||||
|
||||
protected abstract void onRecvJsonMessageFromConsumer(object? sender, BasicDeliverEventArgs ea);
|
||||
|
||||
|
||||
public Task<object>? registerCompletionSource(Int32 reqId, CancellationToken cancelToken, Action<Task<object>> callback)
|
||||
{
|
||||
if (m_tcs.TryGetValue(reqId, out var taskCompletionSource) == true)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var task_cs = new TaskCompletionSource<object>();
|
||||
|
||||
cancelToken.Register(() =>
|
||||
{
|
||||
task_cs.TrySetCanceled();
|
||||
callback(task_cs.Task);
|
||||
});
|
||||
|
||||
m_tcs.Add(reqId, (task_cs, callback));
|
||||
|
||||
return task_cs.Task;
|
||||
}
|
||||
|
||||
public IModel? getExchangeChannel(string exchangeName)
|
||||
{
|
||||
m_exchange_channels.TryGetValue(exchangeName, out var found_model);
|
||||
return found_model;
|
||||
}
|
||||
|
||||
public IModel? getDefaultChannel() => m_default_channel;
|
||||
|
||||
public Dictionary<int, (TaskCompletionSource<object>, Action<Task<object>>)> getTaskCompletionSources() => m_tcs;
|
||||
|
||||
public TaskSerializer getTaskSerializer() => m_task_serializer;
|
||||
|
||||
public IConnection? getConnection() => m_connection;
|
||||
|
||||
public string getServiceName() => m_service_name;
|
||||
}
|
||||
Reference in New Issue
Block a user