using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace ServerCore; public abstract class RabbitMQConnectorBase { private readonly string m_service_name; private IConnection? m_connection = null; private IModel? m_default_channel; private CancellationTokenSource m_cts = new(); private TaskSerializer m_task_serializer = new(); private Int32 m_next_req_id = 1; private readonly Dictionary, Action>)> m_tcs = new(); private readonly Dictionary m_exchange_channels = new(); public RabbitMQConnectorBase( string serviceName , string address, int port , string username, string password , bool useSSL ) { m_service_name = serviceName; try { var factory = new ConnectionFactory() { HostName = address, Port = port, UserName = username, Password = password, Ssl = { ServerName = address, Enabled = useSSL, Version = System.Security.Authentication.SslProtocols.Tls12 }, }; m_connection = factory.CreateConnection(); } catch (Exception e) { var err_msg = $"Exception !!!, Failed to perform in RabbitMQConnectorBase() !!! : exception:{e}" + $" - servieName:{serviceName}, address:{address}, port:{port}, userName:{username}, password:{password}, useSsl:{useSSL}"; Log.getLogger().error(err_msg); } } public int nextReqId() { return Interlocked.Increment(ref m_next_req_id); } public virtual bool startConsumer() { if(null == m_connection) { return false; } m_default_channel = m_connection.CreateModel(); var declare = m_default_channel.QueueDeclare( queue: m_service_name, durable: true, exclusive: false, autoDelete: true, arguments: null ); var consumer = new EventingBasicConsumer(m_default_channel); consumer.Received += onRecvJsonMessageFromConsumer; //consumer.Received += onRecvProtoMessageFromConsumer; var consume_result = m_default_channel.BasicConsume( queue: m_service_name, autoAck: true, consumer: consumer ); return true; } public virtual void stop() { m_cts.Cancel(); foreach(var channel in m_exchange_channels.Values) { channel.Dispose(); } m_default_channel?.Dispose(); m_connection?.Dispose(); } public IModel? createExchangeChannel( string exchangeName, string exchangeType) { if( true == m_exchange_channels.ContainsKey(exchangeName) ) { return null; } if( null == m_connection ) { return null; } var to_add_channel = m_connection?.CreateModel(); if(null == to_add_channel) { return null; } to_add_channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType); var queueName = to_add_channel.QueueDeclare(autoDelete: true).QueueName; to_add_channel.QueueBind( queue: queueName, exchange: exchangeName, routingKey: "" ); var consumer = new EventingBasicConsumer(to_add_channel); consumer.Received += onRecvJsonMessageFromConsumer; //consumer.Received += onRecvProtoMessageFromConsumer; to_add_channel.BasicConsume( queue: queueName, autoAck: true, consumer: consumer ); m_exchange_channels.Add(exchangeName, to_add_channel); return to_add_channel; } protected abstract void onRecvProtoMessageFromConsumer(object? sender, BasicDeliverEventArgs ea); protected abstract void onRecvJsonMessageFromConsumer(object? sender, BasicDeliverEventArgs ea); public Task? registerCompletionSource(Int32 reqId, CancellationToken cancelToken, Action> callback) { if (m_tcs.TryGetValue(reqId, out var taskCompletionSource) == true) { return null; } var task_cs = new TaskCompletionSource(); cancelToken.Register(() => { task_cs.TrySetCanceled(); callback(task_cs.Task); }); m_tcs.Add(reqId, (task_cs, callback)); return task_cs.Task; } public IModel? getExchangeChannel(string exchangeName) { m_exchange_channels.TryGetValue(exchangeName, out var found_model); return found_model; } public IModel? getDefaultChannel() => m_default_channel; public Dictionary, Action>)> getTaskCompletionSources() => m_tcs; public TaskSerializer getTaskSerializer() => m_task_serializer; public IConnection? getConnection() => m_connection; public string getServiceName() => m_service_name; }