Limit parallelism & add mutex

This commit is contained in:
ConfusedPolarBear 2022-05-17 02:06:34 -05:00
parent 05c5526bca
commit ba8ce041ca

View File

@ -46,8 +46,16 @@ public class FingerprinterTask : IScheduledTask
private readonly ILogger<FingerprinterTask> _logger; private readonly ILogger<FingerprinterTask> _logger;
/// <summary>
/// Lock which guards the fingerprint cache dictionary.
/// </summary>
private readonly object _fingerprintCacheLock = new object(); private readonly object _fingerprintCacheLock = new object();
/// <summary>
/// Lock which guards the shared dictionary of intros.
/// </summary>
private readonly object _introsLock = new object();
/// <summary> /// <summary>
/// Temporary fingerprint cache to speed up reanalysis. /// Temporary fingerprint cache to speed up reanalysis.
/// Fingerprints are removed from this after a season is analyzed. /// Fingerprints are removed from this after a season is analyzed.
@ -95,7 +103,11 @@ public class FingerprinterTask : IScheduledTask
var queue = Plugin.Instance!.AnalysisQueue; var queue = Plugin.Instance!.AnalysisQueue;
var totalProcessed = 0; var totalProcessed = 0;
Parallel.ForEach(queue, (season) => // TODO: make configurable
var options = new ParallelOptions();
options.MaxDegreeOfParallelism = 2;
Parallel.ForEach(queue, options, (season) =>
{ {
var first = season.Value[0]; var first = season.Value[0];
@ -140,6 +152,8 @@ public class FingerprinterTask : IScheduledTask
KeyValuePair<Guid, List<QueuedEpisode>> season, KeyValuePair<Guid, List<QueuedEpisode>> season,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
var seasonIntros = new Dictionary<Guid, Intro>();
var first = season.Value[0]; var first = season.Value[0];
/* Don't analyze specials or seasons with an insufficient number of episodes. /* Don't analyze specials or seasons with an insufficient number of episodes.
@ -206,8 +220,8 @@ public class FingerprinterTask : IScheduledTask
var (lhsIntro, rhsIntro) = FingerprintEpisodes(lhs, rhs); var (lhsIntro, rhsIntro) = FingerprintEpisodes(lhs, rhs);
Plugin.Instance.Intros![lhsIntro.EpisodeId] = lhsIntro; seasonIntros[lhsIntro.EpisodeId] = lhsIntro;
Plugin.Instance.Intros![rhsIntro.EpisodeId] = rhsIntro; seasonIntros[rhsIntro.EpisodeId] = rhsIntro;
if (!lhsIntro.Valid) if (!lhsIntro.Valid)
{ {
@ -223,7 +237,16 @@ public class FingerprinterTask : IScheduledTask
} }
} }
Plugin.Instance!.SaveTimestamps(); // Ensure only one thread at a time can update the shared intro dictionary.
lock (_introsLock)
{
foreach (var intro in seasonIntros)
{
Plugin.Instance!.Intros[intro.Key] = intro.Value;
}
Plugin.Instance!.SaveTimestamps();
}
if (cancellationToken.IsCancellationRequested || !everFoundIntro) if (cancellationToken.IsCancellationRequested || !everFoundIntro)
{ {
@ -603,7 +626,10 @@ public class FingerprinterTask : IScheduledTask
oldDuration, oldDuration,
newDuration); newDuration);
Plugin.Instance!.Intros[episode.EpisodeId] = newRhs; lock (_introsLock)
{
Plugin.Instance!.Intros[episode.EpisodeId] = newRhs;
}
} }
} }