251 lines
9.6 KiB
C#
251 lines
9.6 KiB
C#
//#define LOCK_LOG_ON // Uncomment to enable lock-related logging
|
|
|
|
|
|
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
using Axion.Collections.Concurrent;
|
|
using NeoSmart.AsyncLock;
|
|
|
|
|
|
|
|
namespace ServerCore;
|
|
|
|
|
|
// HANDOVER: Processes the Tasks handed to it in a guaranteed order,
// and filters them by Transaction ID to prevent overlapping (duplicate) Task execution.
|
|
|
|
|
|
//=============================================================================================
// Class that holds a Func<Task> function so it can be stored in the TaskSerializer.
//
// author : kangms
//
//=============================================================================================
|
|
/// <summary>
/// Read-only holder that pairs a task-returning delegate with a display name and a
/// transaction id. The transaction id is <see cref="string.Empty"/> when none is supplied.
/// </summary>
public class LogicFunction
{
    private readonly string m_trans_id;
    private readonly Func<Task> m_logic_func;
    private readonly string m_logic_func_name;

    /// <summary>
    /// Creates an entry without a transaction id; the id is stored as an empty string.
    /// </summary>
    /// <param name="logicFunc">Delegate to store.</param>
    /// <param name="logicFuncName">Name reported by <see cref="toBasicString"/>.</param>
    public LogicFunction(Func<Task> logicFunc, string logicFuncName)
        : this(string.Empty, logicFunc, logicFuncName)
    {
    }

    /// <summary>
    /// Creates an entry tagged with a transaction id.
    /// </summary>
    /// <param name="transId">Transaction id to store.</param>
    /// <param name="logicFunc">Delegate to store.</param>
    /// <param name="logicFuncName">Name reported by <see cref="toBasicString"/>.</param>
    public LogicFunction(string transId, Func<Task> logicFunc, string logicFuncName)
    {
        m_logic_func_name = logicFuncName;
        m_logic_func = logicFunc;
        m_trans_id = transId;
    }

    /// <summary>Returns the stored transaction id.</summary>
    public string getTransId()
    {
        return m_trans_id;
    }

    /// <summary>Returns the stored delegate.</summary>
    public Func<Task> getLogicFunc()
    {
        return m_logic_func;
    }

    /// <summary>Returns the stored display name.</summary>
    public string getLogicFuncName()
    {
        return m_logic_func_name;
    }

    /// <summary>Formats the transaction id and the name into a short string for log output.</summary>
    public string toBasicString() => $"LogicFunc(TransId:{m_trans_id}, LogicFuncName:{m_logic_func_name})";
}
|
|
|
|
//=============================================================================================
// Class that runs registered Func<Task> functions one after another on a single Task !!!
//
// author : kangms
//
//=============================================================================================
|
|
/// <summary>
/// Queues task-returning delegates and runs them one at a time, in queue order, on a
/// processing task that is started when work arrives and retired when the queue drains.
/// </summary>
/// <remarks>
/// Entries posted with a non-blank transaction id are filtered: while an entry with a given
/// id is still waiting in the queue, another post with the same id is rejected. The id is
/// released when the entry is dequeued, i.e. before its delegate is invoked.
/// </remarks>
public class TaskSerializer
{
    // Non-blank transaction ids of entries that have been posted but not yet dequeued.
    private readonly ConcurrentHashSet<string> m_waiting_trans_ids = new();

    // Pending entries, dequeued by runTask().
    private readonly ConcurrentQueue<LogicFunction> m_queue_logic_funcs = new();

    // Released once per enqueued entry; runTask() waits on it before every dequeue.
    private readonly SemaphoreSlim m_semaphore = new SemaphoreSlim(0);

    // The processing task currently assigned, or null when none is assigned.
    private Task? m_processing_task;

    // Guards the decisions to start (enqueueLogicFunc) and to retire (runTask) m_processing_task.
    private readonly AsyncLock m_lock = new();

    public TaskSerializer()
    {}

    /// <summary>
    /// Queues <paramref name="logicFunc"/> unless an entry with the same non-blank
    /// <paramref name="transId"/> is already waiting in the queue.
    /// </summary>
    /// <param name="transId">
    /// Transaction id used for duplicate filtering. A blank id is not filtered, because
    /// runTask() never unregisters blank ids.
    /// </param>
    /// <param name="logicFunc">Delegate to run.</param>
    /// <param name="logicFuncName">Name used in log output.</param>
    /// <returns>
    /// <c>true</c> when the entry was queued; <c>false</c> when the id is already waiting or
    /// when enqueueing threw. <c>true</c> does not mean the delegate has run.
    /// </returns>
    public async Task<bool> postLogicFuncWithTransId(string transId, Func<Task> logicFunc, string logicFuncName = "")
    {
        var logic_func = new LogicFunction(transId, logicFunc, logicFuncName);

        // Only register ids that runTask() will later unregister. Registering a blank id would
        // leave it in the set forever and reject every following blank-id post.
        var has_trans_id = false == transId.isNullOrWhiteSpace();
        if (has_trans_id && false == m_waiting_trans_ids.TryAdd(transId, out _))
        {
            Log.getLogger().warn( $"Falied to Enqueue LogicFunc in TaskSerializer.postLogicFuncWithTransId() !!! : {logic_func.toBasicString()}"
                                + $" - transId:{transId}, QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
            return false;
        }

        try
        {
            await enqueueLogicFunc(logic_func);
        }
        catch (Exception e)
        {
            // NOTE(review): the id is left registered here on purpose. enqueueLogicFunc() adds the
            // entry to the queue before anything else, so if the failure happened afterwards the
            // entry is still queued and its id is released when runTask() dequeues it.
            Log.getLogger().error( $"Failed to enqueueLogicFunc() in TaskSerializer.postLogicFuncWithTransId() !!!, Exception:{e}, {logic_func.toBasicString()}"
                                 + $" - transId:{transId}, QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}" );

            return false;
        }

        return true;
    }

    /// <summary>
    /// Queues <paramref name="logicFunc"/> without any transaction id filtering.
    /// </summary>
    /// <param name="logicFunc">Delegate to run.</param>
    /// <param name="logicFuncName">Name used in log output.</param>
    /// <returns><c>true</c> when the entry was queued; <c>false</c> when enqueueing threw.</returns>
    public async Task<bool> postLogicFunc(Func<Task> logicFunc, string logicFuncName = "")
    {
        var logic_func = new LogicFunction(logicFunc, logicFuncName);

        try
        {
            await enqueueLogicFunc(logic_func);
        }
        catch (Exception e)
        {
            Log.getLogger().error( $"Failed to enqueueLogicFunc() in TaskSerializer.postLogicFunc() !!!, Exception:{e}, {logic_func.toBasicString()}"
                                 + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}" );
            return false;
        }

        return true;
    }

    /// <summary>
    /// Adds the entry to the queue, signals the semaphore, and starts a processing task when
    /// none is assigned or the assigned one has completed.
    /// </summary>
    private async Task enqueueLogicFunc(LogicFunction logicFunc)
    {
        // The entry is published before the lock is taken, so the emptiness check that runTask()
        // performs under the same lock observes it and does not retire the task prematurely.
        m_queue_logic_funcs.Enqueue(logicFunc);
        m_semaphore.Release();

#if TASK_SERIALIZER_LOG_ON
        Log.getLogger().debug( $"Enqueue LogicFunc in TaskSerializer.enqueueLogicFunc() !!! : {logicFunc.toBasicString()}"
                             + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//TASK_SERIALIZER_LOG_ON

#if LOCK_LOG_ON
        Log.getLogger().debug( $"LOCK TRY in TaskSerializer.enqueueLogicFunc() !!! : {logicFunc.toBasicString()}"
                             + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//LOCK_LOG_ON
        using (await m_lock.LockAsync())
        {
#if LOCK_LOG_ON
            Log.getLogger().debug( $"LOCKED in TaskSerializer.enqueueLogicFunc() !!! : {logicFunc.toBasicString()}"
                                 + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}" );
#endif//LOCK_LOG_ON
            if (null == m_processing_task || true == m_processing_task.IsCompleted)
            {
                m_processing_task = Task.Run(() => runTask());

#if TASK_SERIALIZER_LOG_ON
                Log.getLogger().debug( $"Create Task in TaskSerializer.enqueueLogicFunc() !!! : {logicFunc.toBasicString()}"
                                     + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//TASK_SERIALIZER_LOG_ON
            }
#if LOCK_LOG_ON
            Log.getLogger().debug( $"UNLOCK in TaskSerializer.enqueueLogicFunc() !!! : {logicFunc.toBasicString()}"
                                 + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//LOCK_LOG_ON
        }
    }

    /// <summary>
    /// Processing loop: waits for a signal, dequeues one entry, awaits its delegate, and exits
    /// once the queue is found empty under the lock.
    /// </summary>
    private async Task runTask()
    {
        while (true)
        {
            await m_semaphore.WaitAsync().ConfigureAwait(false);

            if (false == m_queue_logic_funcs.TryDequeue(out var next_logic_func))
            {
                Log.getLogger().error("Semaphore released but queue was empty in TaskSerializer.runTask() !!!");
                continue;
            }

            try
            {
                // Release the transaction id before any validation can throw, so a bad entry
                // cannot leave its id registered and block later posts with the same id.
                var trans_id = next_logic_func.getTransId();
                if (false == trans_id.isNullOrWhiteSpace())
                {
                    m_waiting_trans_ids.Remove(trans_id);
                }

                var logic_func = next_logic_func.getLogicFunc();
                NullReferenceCheckHelper.throwIfNull(logic_func, () => $"logic_func is null !!! - transId:{trans_id}");

#if TASK_SERIALIZER_LOG_ON
                Log.getLogger().debug( $"Dequeue LogicFunc in TaskSerializer.runTask() : {next_logic_func.toBasicString()}"
                                     + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//TASK_SERIALIZER_LOG_ON

                await logic_func().ConfigureAwait(false);

#if TASK_SERIALIZER_LOG_ON
                Log.getLogger().debug( $"logic_func().ConfigureAwait(false) in TaskSerializer.runTask() !!!"
                                     + $" - {next_logic_func.toBasicString()}, QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//TASK_SERIALIZER_LOG_ON
            }
            catch (Exception e)
            {
                // A failing entry must not stop the loop, but the failure is reported at error
                // level like the other failure paths in this class.
                Log.getLogger().error( $"Failed to logic_func().ConfigureAwait(false) in TaskSerializer.runTask() !!!, Exception:{e}, {next_logic_func.toBasicString()}"
                                     + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
            }

#if LOCK_LOG_ON
            Log.getLogger().debug( $"LOCK TRY by TaskSerializer.runTask() !!! : {next_logic_func.toBasicString()}"
                                 + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//LOCK_LOG_ON
            using (await m_lock.LockAsync())
            {
#if LOCK_LOG_ON
                Log.getLogger().debug( $"LOCKED by TaskSerializer.runTask() !!! : {next_logic_func.toBasicString()}"
                                     + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}" );
#endif//LOCK_LOG_ON
                if (true == m_queue_logic_funcs.IsEmpty)
                {
                    // Clearing the field under the lock lets the next enqueueLogicFunc() call
                    // see that no task is assigned and start a new one.
                    m_processing_task = null;

                    Log.getLogger().debug($"Release Task in TaskSerializer.runTask() !!! : {next_logic_func.toBasicString()}"
                                        + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");

#if LOCK_LOG_ON
                    Log.getLogger().debug( $"UNLOCK by TaskSerializer.runTask() !!! : {next_logic_func.toBasicString()}"
                                         + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//LOCK_LOG_ON
                    return;
                }
#if LOCK_LOG_ON
                Log.getLogger().debug( $"UNLOCK by TaskSerializer.runTask() !!! : {next_logic_func.toBasicString()}"
                                     + $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//LOCK_LOG_ON
            }
        }
    }

    /// <summary>Returns the number of entries currently waiting in the queue.</summary>
    public Int32 getQueueingLogicFuncCount() => m_queue_logic_funcs.Count;

    /// <summary>Returns the processing task currently assigned, or <c>null</c> when none is assigned.</summary>
    public Task? getProcessingTask() => m_processing_task;
}
|