1
0
Fork 0
muzika-gromche/MuzikaGromche/Via/AsyncEventProcessor.cs

84 lines
2.4 KiB
C#

using System.Threading.Tasks;
namespace MuzikaGromche.Via;
public sealed class AsyncEventProcessor<T> where T : struct
{
private readonly object stateLock = new();
private T latestData;
private T lastProcessedData;
private bool isShutdownRequested;
private TaskCompletionSource<bool> signal = new(TaskCreationOptions.RunContinuationsAsynchronously);
public delegate ValueTask ProcessEventAsync(T oldData, T newData, bool shutdown);
private readonly ProcessEventAsync processEventAsync;
public AsyncEventProcessor(T initialData, ProcessEventAsync processEventAsync)
{
latestData = initialData;
lastProcessedData = initialData;
this.processEventAsync = processEventAsync;
_ = Task.Run(ProcessLoopAsync);
}
/// <summary>
/// Signals the processor that new data is available.
/// If <c>requestShutdown</c> is set, the processor will perform one last pass before exiting.
/// </summary>
public void Notify(T data, bool requestShutdown = false)
{
lock (stateLock)
{
latestData = data;
if (requestShutdown)
{
isShutdownRequested = true;
}
// Trigger the task to wake up
signal.TrySetResult(true);
}
}
private async Task ProcessLoopAsync()
{
bool running = true;
while (running)
{
Task<bool> nextSignal;
lock (stateLock)
{
nextSignal = signal.Task;
}
// Wait for a notification or shutdown signal
//
// VSTHRD003 fix: We are awaiting a task we didn't "start",
// but by using RunContinuationsAsynchronously in the TCS constructor,
// we guarantee the 'await' won't hijack the signaler's thread.
await nextSignal.ConfigureAwait(false);
T newData;
T oldData;
// Reset the signal for the next round
lock (stateLock)
{
signal = new(TaskCreationOptions.RunContinuationsAsynchronously);
if (isShutdownRequested)
{
running = false;
}
newData = latestData;
oldData = lastProcessedData;
lastProcessedData = newData;
}
await processEventAsync(oldData, newData, !running);
}
}
}