185 lines
4.9 KiB
C#
185 lines
4.9 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
using RabbitMQ.Client;
|
|
using RabbitMQ.Client.Events;
|
|
|
|
|
|
namespace ServerCore;
|
|
|
|
|
|
// HANDOVER: RabbitMQ Wrapper 클래스 이고, 채널 생성 및 해당 채널의 이벤트를 수신 받고 처리해주는 연결 구조를 제공 한다.
|
|
|
|
|
|
public abstract class RabbitMQConnectorBase
|
|
{
|
|
private readonly string m_service_name;
|
|
private IConnection? m_connection = null;
|
|
|
|
private IModel? m_default_channel;
|
|
private CancellationTokenSource m_cts = new();
|
|
private TaskSerializer m_task_serializer = new();
|
|
|
|
private Int32 m_next_req_id = 1;
|
|
private readonly Dictionary<int, (TaskCompletionSource<object>, Action<Task<object>>)> m_tcs = new();
|
|
|
|
private readonly Dictionary<string, IModel> m_exchange_channels = new();
|
|
|
|
|
|
public RabbitMQConnectorBase( string serviceName
|
|
, string address, int port
|
|
, string username, string password
|
|
, bool useSSL )
|
|
{
|
|
m_service_name = serviceName;
|
|
|
|
try
|
|
{
|
|
var factory = new ConnectionFactory()
|
|
{
|
|
HostName = address,
|
|
Port = port,
|
|
UserName = username,
|
|
Password = password,
|
|
Ssl = { ServerName = address, Enabled = useSSL, Version = System.Security.Authentication.SslProtocols.Tls12 },
|
|
};
|
|
|
|
m_connection = factory.CreateConnection();
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
var err_msg = $"Exception !!!, Failed to perform in RabbitMQConnectorBase() !!! : exception:{e}"
|
|
+ $" - servieName:{serviceName}, address:{address}, port:{port}, userName:{username}, password:{password}, useSsl:{useSSL}";
|
|
Log.getLogger().error(err_msg);
|
|
}
|
|
}
|
|
|
|
public int nextReqId()
|
|
{
|
|
return Interlocked.Increment(ref m_next_req_id);
|
|
}
|
|
|
|
public virtual bool startConsumer()
|
|
{
|
|
if(null == m_connection)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
m_default_channel = m_connection.CreateModel();
|
|
var declare = m_default_channel.QueueDeclare( queue: m_service_name,
|
|
durable: true,
|
|
exclusive: false,
|
|
autoDelete: true,
|
|
arguments: null );
|
|
|
|
var consumer = new EventingBasicConsumer(m_default_channel);
|
|
|
|
consumer.Received += onRecvJsonMessageFromConsumer;
|
|
//consumer.Received += onRecvProtoMessageFromConsumer;
|
|
|
|
var consume_result = m_default_channel.BasicConsume( queue: m_service_name,
|
|
autoAck: true,
|
|
consumer: consumer );
|
|
|
|
return true;
|
|
}
|
|
|
|
public virtual void stop()
|
|
{
|
|
m_cts.Cancel();
|
|
|
|
foreach(var channel in m_exchange_channels.Values)
|
|
{
|
|
channel.Dispose();
|
|
}
|
|
|
|
m_default_channel?.Dispose();
|
|
m_connection?.Dispose();
|
|
}
|
|
|
|
public IModel? createExchangeChannel( string exchangeName, string exchangeType)
|
|
{
|
|
if( true == m_exchange_channels.ContainsKey(exchangeName) )
|
|
{
|
|
return null;
|
|
}
|
|
|
|
if( null == m_connection )
|
|
{
|
|
return null;
|
|
}
|
|
|
|
var to_add_channel = m_connection?.CreateModel();
|
|
if(null == to_add_channel)
|
|
{
|
|
return null;
|
|
}
|
|
|
|
to_add_channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType);
|
|
|
|
var queueName = to_add_channel.QueueDeclare(autoDelete: true).QueueName;
|
|
to_add_channel.QueueBind( queue: queueName,
|
|
exchange: exchangeName,
|
|
routingKey: "" );
|
|
|
|
var consumer = new EventingBasicConsumer(to_add_channel);
|
|
|
|
consumer.Received += onRecvJsonMessageFromConsumer;
|
|
//consumer.Received += onRecvProtoMessageFromConsumer;
|
|
|
|
to_add_channel.BasicConsume( queue: queueName,
|
|
autoAck: true,
|
|
consumer: consumer );
|
|
|
|
m_exchange_channels.Add(exchangeName, to_add_channel);
|
|
|
|
return to_add_channel;
|
|
}
|
|
|
|
protected abstract void onRecvProtoMessageFromConsumer(object? sender, BasicDeliverEventArgs ea);
|
|
|
|
protected abstract void onRecvJsonMessageFromConsumer(object? sender, BasicDeliverEventArgs ea);
|
|
|
|
|
|
public Task<object>? registerCompletionSource(Int32 reqId, CancellationToken cancelToken, Action<Task<object>> callback)
|
|
{
|
|
if (m_tcs.TryGetValue(reqId, out var taskCompletionSource) == true)
|
|
{
|
|
return null;
|
|
}
|
|
|
|
var task_cs = new TaskCompletionSource<object>();
|
|
|
|
cancelToken.Register(() =>
|
|
{
|
|
task_cs.TrySetCanceled();
|
|
callback(task_cs.Task);
|
|
});
|
|
|
|
m_tcs.Add(reqId, (task_cs, callback));
|
|
|
|
return task_cs.Task;
|
|
}
|
|
|
|
public IModel? getExchangeChannel(string exchangeName)
|
|
{
|
|
m_exchange_channels.TryGetValue(exchangeName, out var found_model);
|
|
return found_model;
|
|
}
|
|
|
|
public IModel? getDefaultChannel() => m_default_channel;
|
|
|
|
public Dictionary<int, (TaskCompletionSource<object>, Action<Task<object>>)> getTaskCompletionSources() => m_tcs;
|
|
|
|
public TaskSerializer getTaskSerializer() => m_task_serializer;
|
|
|
|
public IConnection? getConnection() => m_connection;
|
|
|
|
public string getServiceName() => m_service_name;
|
|
}
|