Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Algorithm/Eocron.Algorithms.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AsyncKeyedLock" Version="6.3.4" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="7.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" />
<PackageReference Include="System.Memory" Version="4.5.5" />
Expand All @@ -36,4 +37,4 @@
<Link>README.md</Link>
</Content>
</ItemGroup>
</Project>
</Project>
87 changes: 9 additions & 78 deletions Algorithm/FileCache/Async/FileCacheAsync.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Eocron.Algorithms.Disposing;
using AsyncKeyedLock;
using Eocron.Algorithms.Disposing;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
Expand All @@ -14,79 +15,6 @@ public sealed class FileCacheAsync<TKey> : IFileCacheAsync<TKey>, IDisposable
#region Private helper classes
private delegate Task CancellableAction(CancellationToken token);

private sealed class PerKeySemaphoreSlim : IDisposable
{
private sealed class RefCounted<T>
{
public RefCounted(T value)
{
RefCount = 1;
Value = value;
}

public int RefCount { get; set; }
public T Value { get; private set; }
}

private readonly SemaphoreSlim _lock = new SemaphoreSlim(1);
private readonly Dictionary<object, RefCounted<SemaphoreSlim>> _dict = new Dictionary<object, RefCounted<SemaphoreSlim>>();

private async Task<RefCounted<SemaphoreSlim>> GetOrCreate(object key, CancellationToken token)
{
RefCounted<SemaphoreSlim> item;
await _lock.WaitAsync(token);
try
{
if (_dict.TryGetValue(key, out item))
{
++item.RefCount;
}
else
{
item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
_dict[key] = item;
}
}
finally
{
_lock.Release();
}

return item;
}

public async Task<IDisposable> LockAsync(object key, CancellationToken token)
{
var item = await GetOrCreate(key, token);
await item.Value.WaitAsync(CancellationToken.None);
return new Disposable(() =>
{
_lock.Wait(CancellationToken.None);
try
{
--item.RefCount;
if (item.RefCount == 0)
_dict.Remove(key);
}
finally
{
_lock.Release();
}

item.Value.Release();
});
}

public void Dispose()
{
_lock.Dispose();
foreach (var v in _dict.Values)
{
v.Value.Dispose();
}
}
}

private sealed class CFileCacheEntry : AnyExpirationPolicy
{
public string FilePath { get; set; }
Expand Down Expand Up @@ -168,7 +96,7 @@ public void Dispose()

private readonly int _gcIntervalMs = 5 * 1000;
private readonly int _gcFailRetryIntervalMs = 10 * 1000;
private readonly PerKeySemaphoreSlim _perKeyLock;
private readonly AsyncKeyedLocker<TKey> _perKeyLock;
private readonly IFileSystemAsync _fs;
private readonly string _baseFolder;
private readonly AsyncReaderWriterLock _cacheLock;
Expand Down Expand Up @@ -203,7 +131,11 @@ public FileCacheAsync(string baseFolder, IFileSystemAsync fileSystem = null, boo
{
if (baseFolder == null)
throw new ArgumentNullException(nameof(baseFolder));
_perKeyLock = new PerKeySemaphoreSlim();
_perKeyLock = new AsyncKeyedLocker<TKey>(o =>
{
o.PoolSize = 20;
o.PoolInitialFill = 1;
});
_cacheLock = new AsyncReaderWriterLock();
_fs = fileSystem ?? FileSystemAsync.Instance;
_baseFolder = baseFolder;
Expand Down Expand Up @@ -464,7 +396,7 @@ private async Task<CFileCacheEntry> InternalGetOrAdd(TKey key, Func<TKey, Task<C

//per key mutex required to avoid multiple file upload to cache by same key.
//files are very large objects, so unnecessary actions with them should be avoided if possible.
using (await _perKeyLock.LockAsync(key, token))
using (await _perKeyLock.LockAsync(key, token).ConfigureAwait(false))
{
if (_entries.TryGetValue(key, out cacheEntry))
return cacheEntry.Pulse(policy);
Expand Down Expand Up @@ -749,7 +681,6 @@ public void Dispose()
{
_cts?.Dispose();
_gc?.Wait();
_perKeyLock.Dispose();
//_cacheLock.Dispose();
}
}
Expand Down
85 changes: 8 additions & 77 deletions Algorithm/FileCache/FileCache.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Eocron.Algorithms.Disposing;
using AsyncKeyedLock;
using Eocron.Algorithms.Disposing;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
Expand All @@ -14,79 +15,6 @@ public sealed class FileCache<TKey> : IFileCache<TKey>, IDisposable
#region Private helper classes
private delegate void CancellableAction(CancellationToken token);

private sealed class PerKeySemaphoreSlim : IDisposable
{
private sealed class RefCounted<T>
{
public RefCounted(T value)
{
RefCount = 1;
Value = value;
}

public int RefCount { get; set; }
public T Value { get; private set; }
}

private readonly SemaphoreSlim _lock = new SemaphoreSlim(1);
private readonly Dictionary<object, RefCounted<SemaphoreSlim>> _dict = new Dictionary<object, RefCounted<SemaphoreSlim>>();

private RefCounted<SemaphoreSlim> GetOrCreate(object key, CancellationToken token)
{
RefCounted<SemaphoreSlim> item;
_lock.Wait(token);
try
{
if (_dict.TryGetValue(key, out item))
{
++item.RefCount;
}
else
{
item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
_dict[key] = item;
}
}
finally
{
_lock.Release();
}

return item;
}

public IDisposable Lock(object key, CancellationToken token)
{
var item = GetOrCreate(key, token);
item.Value.Wait(CancellationToken.None);
return new Disposable(() =>
{
_lock.Wait(CancellationToken.None);
try
{
--item.RefCount;
if (item.RefCount == 0)
_dict.Remove(key);
}
finally
{
_lock.Release();
}

item.Value.Release();
});
}

public void Dispose()
{
_lock.Dispose();
foreach (var v in _dict.Values)
{
v.Value.Dispose();
}
}
}

private sealed class CFileCacheEntry : AnyExpirationPolicy
{
public string FilePath { get; set; }
Expand Down Expand Up @@ -168,7 +96,7 @@ public void Dispose()

private readonly int _gcIntervalMs = 5 * 1000;
private readonly int _gcFailRetryIntervalMs = 10 * 1000;
private readonly PerKeySemaphoreSlim _perKeyLock;
private readonly AsyncKeyedLocker<TKey> _perKeyLock;
private readonly IFileSystem _fs;
private readonly string _baseFolder;
private readonly ReaderWriterLockSlim _cacheLock;
Expand Down Expand Up @@ -203,7 +131,11 @@ public FileCache(string baseFolder, IFileSystem fileSystem = null, bool disableG
{
if (baseFolder == null)
throw new ArgumentNullException(nameof(baseFolder));
_perKeyLock = new PerKeySemaphoreSlim();
_perKeyLock = new AsyncKeyedLocker<TKey>(o =>
{
o.PoolSize = 20;
o.PoolInitialFill = 1;
});
_cacheLock = new ReaderWriterLockSlim();
_fs = fileSystem ?? FileSystem.Instance;
_baseFolder = baseFolder;
Expand Down Expand Up @@ -750,7 +682,6 @@ public void Dispose()
{
_cts?.Dispose();
_gc?.Join();
_perKeyLock.Dispose();
//_cacheLock.Dispose();
}
}
Expand Down