Files
caliverse_server/ServerCore/Task/TaskSerializer.cs
2025-11-28 16:54:56 +09:00

251 lines
9.6 KiB
C#

//#define LOCK_LOG_ON // Lock 관련 로그 활성화 설정
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Axion.Collections.Concurrent;
using NeoSmart.AsyncLock;
namespace ServerCore;
// HANDOVER: 넘겨받은 Task를 순서가 보장된 절차로 처리해 주고,
// 중첩된 Task 실행 방지를 위해 Transaction ID로 필터링 해준다.
//=============================================================================================
// TaskSerializer에 Func<Task> 함수를 담기위한 클래스 이다.
//
// author : kangms
//
//=============================================================================================
public class LogicFunction
{
private readonly string m_trans_id;
private readonly Func<Task> m_logic_func;
private readonly string m_logic_func_name;
public LogicFunction(Func<Task> logicFunc, string logicFuncName)
{
m_trans_id = string.Empty;
m_logic_func = logicFunc;
m_logic_func_name = logicFuncName;
}
public LogicFunction(string transId, Func<Task> logicFunc, string logicFuncName)
{
m_trans_id = transId;
m_logic_func = logicFunc;
m_logic_func_name = logicFuncName;
}
public string getTransId() => m_trans_id;
public Func<Task> getLogicFunc() => m_logic_func;
public string getLogicFuncName() => m_logic_func_name;
public string toBasicString()
{
return $"LogicFunc(TransId:{m_trans_id}, LogicFuncName:{m_logic_func_name})";
}
}
//=============================================================================================
// Func<Task> 함수를 등록하면 단일 Task 기반으로 실행시켜주는 클래스 이다 !!!
//
// author : kangms
//
//=============================================================================================
public class TaskSerializer
{
private readonly ConcurrentHashSet<string> m_waiting_trans_ids = new();
private readonly ConcurrentQueue<LogicFunction> m_queue_logic_funcs = new();
private readonly SemaphoreSlim m_semaphore = new SemaphoreSlim(0);
private Task? m_processing_task;
private AsyncLock m_lock = new();
public TaskSerializer()
{}
public async Task<bool> postLogicFuncWithTransId(string transId, Func<Task> logicFunc, string logicFuncName = "")
{
var logic_func = new LogicFunction(transId, logicFunc, logicFuncName);
if (false == m_waiting_trans_ids.TryAdd(transId, out _))
{
Log.getLogger().warn( $"Falied to Enqueue LogicFunc in TaskSerializer.postLogicFuncWithTransId() !!! : {logic_func.toBasicString()}"
+ $" - transId:{transId}, QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
return false;
}
try
{
await enqueueLogicFunc(logic_func);
}
catch (Exception e)
{
Log.getLogger().error( $"Failed to enqueueLogicFunc() in TaskSerializer.postLogicFuncWithTransId() !!!, Exception:{e}, {logic_func.toBasicString()}"
+ $" - transId:{transId}, QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}" );
return false;
}
return true;
}
public async Task<bool> postLogicFunc(Func<Task> logicFunc, string logicFuncName = "")
{
var logic_func = new LogicFunction(logicFunc, logicFuncName);
try
{
await enqueueLogicFunc(logic_func);
}
catch (Exception e)
{
Log.getLogger().error( $"Failed to enqueueLogicFunc() in TaskSerializer.postLogicFunc() !!!, Exception:{e}, {logic_func.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}" );
return false;
}
return true;
}
private async Task enqueueLogicFunc(LogicFunction logicFunc)
{
m_queue_logic_funcs.Enqueue(logicFunc);
m_semaphore.Release();
#if TASK_SERIALIZER_LOG_ON
Log.getLogger().debug( $"Enqueue LogicFunc in TaskSerializer.enqueueLogicFunc() !!! : {logicFunc.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//TASK_SERIALIZER_LOG_ON
#if LOCK_LOG_ON
Log.getLogger().debug( $"LOCK TRY in TaskSerializer.enqueueLogicFunc() !!! : {logicFunc.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//LOCK_LOG_ON
using (await m_lock.LockAsync())
{
#if LOCK_LOG_ON
Log.getLogger().debug( $"LOCKED in TaskSerializer.enqueueLogicFunc() !!! : {logicFunc.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}" );
#endif//LOCK_LOG_ON
if (null == m_processing_task || true == m_processing_task.IsCompleted)
{
m_processing_task = Task.Run(() => runTask());
#if TASK_SERIALIZER_LOG_ON
Log.getLogger().debug( $"Create Task in TaskSerializer.enqueueLogicFunc() !!! : {logicFunc.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//TASK_SERIALIZER_LOG_ON
}
#if LOCK_LOG_ON
Log.getLogger().debug( $"UNLOCK in TaskSerializer.enqueueLogicFunc() !!! : {logicFunc.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//LOCK_LOG_ON
}
}
private async Task runTask()
{
while (true)
{
await m_semaphore.WaitAsync().ConfigureAwait(false);
LogicFunction? next_logic_func = null;
if (false == m_queue_logic_funcs.TryDequeue(out next_logic_func))
{
Log.getLogger().error("Semaphore released but queue was empty in TaskSerializer.runTask() !!!");
continue;
}
var trans_id = string.Empty;
Func<Task>? logic_func = null;
var logic_func_name = string.Empty;
try
{
NullReferenceCheckHelper.throwIfNull(next_logic_func, () => $"next_logic_func is null !!!");
trans_id = next_logic_func.getTransId();
logic_func = next_logic_func.getLogicFunc();
NullReferenceCheckHelper.throwIfNull(logic_func, () => $"logic_func is null !!! - transId:{trans_id}");
logic_func_name = next_logic_func.getLogicFuncName();
#if TASK_SERIALIZER_LOG_ON
Log.getLogger().debug( $"Dequeue LogicFunc in TaskSerializer.runTask() : {next_logic_func.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//TASK_SERIALIZER_LOG_ON
if (false == trans_id.isNullOrWhiteSpace())
{
m_waiting_trans_ids.Remove(trans_id);
}
await logic_func().ConfigureAwait(false);
#if TASK_SERIALIZER_LOG_ON
Log.getLogger().debug( $"logic_func().ConfigureAwait(false) in TaskSerializer.runTask() !!!"
+ $" - {next_logic_func.toBasicString()}, QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//TASK_SERIALIZER_LOG_ON
}
catch (Exception e)
{
Log.getLogger().debug( $"Failed to logic_func().ConfigureAwait(false) in TaskSerializer.runTask() !!!, Exception:{e}, {next_logic_func.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
}
#if LOCK_LOG_ON
Log.getLogger().debug( $"LOCK TRY by TaskSerializer.runTask() !!! : {next_logic_func.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//LOCK_LOG_ON
using (await m_lock.LockAsync())
{
#if LOCK_LOG_ON
Log.getLogger().debug( $"LOCKED by TaskSerializer.runTask() !!! : {next_logic_func.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}" );
#endif//LOCK_LOG_ON
if (true == m_queue_logic_funcs.IsEmpty)
{
m_processing_task = null;
NullReferenceCheckHelper.throwIfNull(next_logic_func, () => $"next_logic_func is null !!!");
Log.getLogger().debug($"Release Task in TaskSerializer.runTask() !!! : {next_logic_func.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#if LOCK_LOG_ON
Log.getLogger().debug( $"UNLOCK by TaskSerializer.runTask() !!! : {next_logic_func.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//LOCK_LOG_ON
return;
}
#if LOCK_LOG_ON
Log.getLogger().debug( $"UNLOCK by TaskSerializer.runTask() !!! : {next_logic_func.toBasicString()}"
+ $" - QLFC:{getQueueingLogicFuncCount()}, WTIDC:{m_waiting_trans_ids.Count}");
#endif//LOCK_LOG_ON
}
}
}
public Int32 getQueueingLogicFuncCount() => m_queue_logic_funcs.Count;
public Task? getProcessingTask() => m_processing_task;
}