Files
caliverse_server/ServerCore/RabbitMQ/RabbitMqConnectorBase.cs
2025-11-28 16:54:56 +09:00

185 lines
4.9 KiB
C#

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace ServerCore;
// HANDOVER: RabbitMQ wrapper class that creates channels and provides the connection structure for receiving and handling the events of those channels.
/// <summary>
/// Base class that wraps a single RabbitMQ connection. It opens a default channel that
/// consumes the queue named after the service, can open additional per-exchange channels,
/// and keeps a table of pending request completion sources keyed by request id.
/// Derived classes supply the message handlers.
/// </summary>
public abstract class RabbitMQConnectorBase
{
    private readonly string m_service_name;

    // Stays null when the connection attempt in the constructor failed.
    private IConnection? m_connection = null;

    private IModel? m_default_channel;

    private readonly CancellationTokenSource m_cts = new();

    private readonly TaskSerializer m_task_serializer = new();

    private Int32 m_next_req_id = 1;

    // Pending requests: request id -> (completion source, callback supplied at registration).
    // NOTE(review): this is a plain Dictionary and is handed out via getTaskCompletionSources();
    // callers that touch it from several threads must synchronize themselves - confirm.
    private readonly Dictionary<int, (TaskCompletionSource<object>, Action<Task<object>>)> m_tcs = new();

    // Channels created by createExchangeChannel(), keyed by exchange name.
    private readonly Dictionary<string, IModel> m_exchange_channels = new();

    /// <summary>
    /// Tries to open the RabbitMQ connection. A failure is logged and swallowed; the
    /// connection then stays null and <see cref="startConsumer"/> returns false.
    /// </summary>
    /// <param name="serviceName">Service name; also used as the name of the default queue.</param>
    /// <param name="address">Broker host name (also used as the TLS server name).</param>
    /// <param name="port">Broker port.</param>
    /// <param name="username">Login user name.</param>
    /// <param name="password">Login password.</param>
    /// <param name="useSSL">Whether TLS (1.2) is enabled for the connection.</param>
    public RabbitMQConnectorBase( string serviceName
                                , string address, int port
                                , string username, string password
                                , bool useSSL )
    {
        m_service_name = serviceName;

        try
        {
            var factory = new ConnectionFactory()
            {
                HostName = address,
                Port = port,
                UserName = username,
                Password = password,
                Ssl = { ServerName = address, Enabled = useSSL, Version = System.Security.Authentication.SslProtocols.Tls12 },
            };

            m_connection = factory.CreateConnection();
        }
        catch (Exception e)
        {
            // The password is intentionally left out of the message: credentials must not end up in log files.
            var err_msg = $"Exception !!!, Failed to perform in RabbitMQConnectorBase() !!! : exception:{e}"
                        + $" - servieName:{serviceName}, address:{address}, port:{port}, userName:{username}, useSsl:{useSSL}";
            Log.getLogger().error(err_msg);
        }
    }

    /// <summary>
    /// Atomically increments the request id counter and returns the new value.
    /// The counter starts at 1, so the first id handed out is 2.
    /// </summary>
    public int nextReqId()
    {
        return Interlocked.Increment(ref m_next_req_id);
    }

    /// <summary>
    /// Creates the default channel, declares the queue named after the service and starts
    /// consuming it (auto-ack) with the JSON message handler.
    /// </summary>
    /// <returns>false when there is no connection; otherwise true.</returns>
    public virtual bool startConsumer()
    {
        if (null == m_connection)
        {
            return false;
        }

        // Stored in the field before any further broker call so that stop() can still
        // dispose the channel if one of the calls below throws.
        var channel = m_connection.CreateModel();
        m_default_channel = channel;

        channel.QueueDeclare( queue: m_service_name,
                              durable: true,
                              exclusive: false,
                              autoDelete: true,
                              arguments: null );

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += onRecvJsonMessageFromConsumer;
        //consumer.Received += onRecvProtoMessageFromConsumer;

        channel.BasicConsume( queue: m_service_name,
                              autoAck: true,
                              consumer: consumer );

        return true;
    }

    /// <summary>
    /// Cancels the internal cancellation source and disposes every exchange channel,
    /// the default channel and the connection.
    /// </summary>
    public virtual void stop()
    {
        m_cts.Cancel();

        foreach (var channel in m_exchange_channels.Values)
        {
            channel.Dispose();
        }

        m_default_channel?.Dispose();
        m_connection?.Dispose();
    }

    /// <summary>
    /// Creates a channel for the given exchange: declares the exchange, binds a
    /// server-named auto-delete queue to it with an empty routing key and starts
    /// consuming that queue (auto-ack) with the JSON message handler.
    /// </summary>
    /// <param name="exchangeName">Name of the exchange to declare and bind to.</param>
    /// <param name="exchangeType">Exchange type passed to ExchangeDeclare.</param>
    /// <returns>
    /// The new channel, or null when a channel for this exchange is already registered
    /// or there is no connection.
    /// </returns>
    public IModel? createExchangeChannel( string exchangeName, string exchangeType)
    {
        if (m_exchange_channels.ContainsKey(exchangeName))
        {
            return null;
        }

        if (null == m_connection)
        {
            return null;
        }

        var to_add_channel = m_connection.CreateModel();

        try
        {
            to_add_channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType);

            var queueName = to_add_channel.QueueDeclare(autoDelete: true).QueueName;

            to_add_channel.QueueBind( queue: queueName,
                                      exchange: exchangeName,
                                      routingKey: "" );

            var consumer = new EventingBasicConsumer(to_add_channel);
            consumer.Received += onRecvJsonMessageFromConsumer;
            //consumer.Received += onRecvProtoMessageFromConsumer;

            to_add_channel.BasicConsume( queue: queueName,
                                         autoAck: true,
                                         consumer: consumer );

            m_exchange_channels.Add(exchangeName, to_add_channel);
        }
        catch
        {
            // The channel is not tracked yet, so nobody else would ever dispose it:
            // release it here and let the caller see the original exception.
            to_add_channel.Dispose();
            throw;
        }

        return to_add_channel;
    }

    /// <summary>Handler for protobuf messages; currently not subscribed by this class.</summary>
    protected abstract void onRecvProtoMessageFromConsumer(object? sender, BasicDeliverEventArgs ea);

    /// <summary>Handler subscribed to the Received event of every consumer created by this class.</summary>
    protected abstract void onRecvJsonMessageFromConsumer(object? sender, BasicDeliverEventArgs ea);

    /// <summary>
    /// Registers a completion source for a request id and returns its task.
    /// When <paramref name="cancelToken"/> is cancelled, the task is moved to the cancelled
    /// state (if it has not completed yet) and <paramref name="callback"/> is invoked with it.
    /// </summary>
    /// <param name="reqId">Request id used as the key of the pending entry.</param>
    /// <param name="cancelToken">Token whose cancellation cancels the pending task.</param>
    /// <param name="callback">Stored with the entry and invoked on cancellation.</param>
    /// <returns>The pending task, or null when <paramref name="reqId"/> is already registered.</returns>
    public Task<object>? registerCompletionSource(Int32 reqId, CancellationToken cancelToken, Action<Task<object>> callback)
    {
        if (m_tcs.ContainsKey(reqId))
        {
            return null;
        }

        // Continuations run asynchronously so that whoever completes the source
        // does not execute awaiter continuations inline on its own thread.
        var task_cs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Add the entry BEFORE registering: Register() runs the delegate synchronously when
        // the token is already cancelled, and the callback must then see the entry.
        m_tcs.Add(reqId, (task_cs, callback));

        // NOTE(review): the callback is invoked even when the task had already completed
        // before the cancellation - confirm callers tolerate a second invocation.
        cancelToken.Register(() =>
        {
            task_cs.TrySetCanceled();
            callback(task_cs.Task);
        });

        return task_cs.Task;
    }

    /// <summary>Returns the channel registered for the exchange, or null when there is none.</summary>
    public IModel? getExchangeChannel(string exchangeName)
    {
        m_exchange_channels.TryGetValue(exchangeName, out var found_model);
        return found_model;
    }

    /// <summary>Returns the default channel; null until startConsumer() has created it.</summary>
    public IModel? getDefaultChannel() => m_default_channel;

    /// <summary>Returns the live table of pending completion sources (not a copy).</summary>
    public Dictionary<int, (TaskCompletionSource<object>, Action<Task<object>>)> getTaskCompletionSources() => m_tcs;

    /// <summary>Returns the task serializer owned by this connector.</summary>
    public TaskSerializer getTaskSerializer() => m_task_serializer;

    /// <summary>Returns the connection; null when the constructor failed to connect.</summary>
    public IConnection? getConnection() => m_connection;

    /// <summary>Returns the service name given at construction.</summary>
    public string getServiceName() => m_service_name;
}