using System.Threading.Tasks;

namespace MuzikaGromche.Via;

/// <summary>
/// Runs a single background loop that forwards the most recent value passed to
/// <see cref="Notify"/> to an asynchronous callback.
/// Notifications are coalesced: if several values arrive while the callback is still
/// running, only the latest one is delivered on the next pass.
/// </summary>
/// <typeparam name="T">Value type of the data snapshot handed to the callback.</typeparam>
public sealed class AsyncEventProcessor<T> where T : struct
{
    /// <summary>Guards every mutable field below.</summary>
    private readonly object stateLock = new();

    private readonly ProcessEventAsync processEventAsync;

    /// <summary>Most recent value received through <see cref="Notify"/>.</summary>
    private T latestData;

    /// <summary>Value that was passed as <c>newData</c> on the previous callback pass.</summary>
    private T lastProcessedData;

    private bool isShutdownRequested;

    // Completed by Notify to wake the loop; replaced with a fresh instance on every pass.
    // RunContinuationsAsynchronously keeps the loop's continuation off the thread that calls Notify.
    private TaskCompletionSource<bool> signal = new(TaskCreationOptions.RunContinuationsAsynchronously);

    /// <summary>
    /// Callback invoked once per processing pass.
    /// </summary>
    /// <param name="oldData">Value delivered as <paramref name="newData"/> on the previous pass (the initial data on the first pass).</param>
    /// <param name="newData">Latest value known when the pass started.</param>
    /// <param name="shutdown"><c>true</c> when this is the final pass before the loop exits.</param>
    public delegate ValueTask ProcessEventAsync(T oldData, T newData, bool shutdown);

    /// <summary>
    /// Creates the processor and immediately starts its background loop via <see cref="Task.Run(System.Func{Task})"/>.
    /// </summary>
    /// <param name="initialData">Baseline value; reported as <c>oldData</c> on the first pass.</param>
    /// <param name="processEventAsync">Callback to invoke for every pass.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="processEventAsync"/> is <c>null</c>.</exception>
    public AsyncEventProcessor(T initialData, ProcessEventAsync processEventAsync)
    {
        // Fail fast here instead of with an unobserved NullReferenceException on the worker thread.
        this.processEventAsync = processEventAsync
            ?? throw new System.ArgumentNullException(nameof(processEventAsync));

        latestData = initialData;
        lastProcessedData = initialData;

        // Keep the loop task instead of discarding it so that callback failures
        // and shutdown completion can be observed by the owner.
        Completion = Task.Run(ProcessLoopAsync);
    }

    /// <summary>
    /// Task representing the background loop. It completes after the shutdown pass has run,
    /// and faults if the callback throws (the loop does not resume after a fault).
    /// </summary>
    public Task Completion { get; }

    /// <summary>
    /// Signals the processor that new data is available.
    /// If <paramref name="requestShutdown"/> is set, the processor will perform one last pass before exiting.
    /// </summary>
    /// <param name="data">New value to deliver; overwrites any value that has not been processed yet.</param>
    /// <param name="requestShutdown">When <c>true</c>, the next pass is the last one.</param>
    /// <remarks>
    /// Once the loop has exited, further calls only update stored state and are never delivered to the callback.
    /// </remarks>
    public void Notify(T data, bool requestShutdown = false)
    {
        lock (stateLock)
        {
            latestData = data;

            if (requestShutdown)
            {
                isShutdownRequested = true;
            }

            // Wake the loop. A no-op if the current signal is already completed,
            // in which case the pending pass will pick up the value stored above.
            signal.TrySetResult(true);
        }
    }

    /// <summary>
    /// Background loop: waits for a signal, snapshots the state under the lock,
    /// then invokes the callback outside the lock. Exits after the shutdown pass.
    /// </summary>
    private async Task ProcessLoopAsync()
    {
        bool running = true;

        while (running)
        {
            Task nextSignal;
            lock (stateLock)
            {
                nextSignal = signal.Task;
            }

            // Wait for a notification or shutdown signal.
            //
            // VSTHRD003 fix: We are awaiting a task we didn't "start",
            // but by using RunContinuationsAsynchronously in the TCS constructor,
            // we guarantee the 'await' won't hijack the signaler's thread.
            await nextSignal.ConfigureAwait(false);

            T newData;
            T oldData;

            lock (stateLock)
            {
                // Reset the signal for the next round before reading the state, so a Notify
                // that arrives after this block always triggers another pass.
                signal = new(TaskCreationOptions.RunContinuationsAsynchronously);

                if (isShutdownRequested)
                {
                    running = false;
                }

                newData = latestData;
                oldData = lastProcessedData;
                lastProcessedData = newData;
            }

            // Invoked outside the lock so Notify is never blocked by a slow callback.
            await processEventAsync(oldData, newData, !running).ConfigureAwait(false);
        }
    }
}