♻️ Refactor cache system with redis

🐛 Add lock to check in prevent multiple at the same time
This commit is contained in:
2025-05-24 17:29:24 +08:00
parent d4da5d7afc
commit 460ce62452
16 changed files with 569 additions and 161 deletions

View File

@ -0,0 +1,356 @@
using System.Text.Json;
using StackExchange.Redis;
namespace DysonNetwork.Sphere.Storage;
/// <summary>
/// Represents a distributed lock that can be used to synchronize access across multiple processes
/// </summary>
public interface IDistributedLock : IAsyncDisposable
{
/// <summary>
/// The resource identifier this lock is protecting
/// </summary>
string Resource { get; }
/// <summary>
/// Unique identifier for this lock instance
/// </summary>
string LockId { get; }
/// <summary>
/// Extends the lock's expiration time
/// </summary>
Task<bool> ExtendAsync(TimeSpan timeSpan);
/// <summary>
/// Releases the lock immediately
/// </summary>
Task ReleaseAsync();
}
public interface ICacheService
{
/// <summary>
/// Sets a value in the cache with an optional expiration time
/// </summary>
Task<bool> SetAsync<T>(string key, T value, TimeSpan? expiry = null);
/// <summary>
/// Gets a value from the cache
/// </summary>
Task<T?> GetAsync<T>(string key);
/// <summary>
/// Removes a specific key from the cache
/// </summary>
Task<bool> RemoveAsync(string key);
/// <summary>
/// Adds a key to a group for group-based operations
/// </summary>
Task AddToGroupAsync(string key, string group);
/// <summary>
/// Removes all keys associated with a specific group
/// </summary>
Task RemoveGroupAsync(string group);
/// <summary>
/// Gets all keys belonging to a specific group
/// </summary>
Task<IEnumerable<string>> GetGroupKeysAsync(string group);
/// <summary>
/// Helper method to set a value in cache and associate it with multiple groups in one operation
/// </summary>
/// <typeparam name="T">The type of value being cached</typeparam>
/// <param name="key">Cache key</param>
/// <param name="value">The value to cache</param>
/// <param name="groups">Optional collection of group names to associate the key with</param>
/// <param name="expiry">Optional expiration time for the cached item</param>
/// <returns>True if the set operation was successful</returns>
Task<bool> SetWithGroupsAsync<T>(string key, T value, IEnumerable<string>? groups = null, TimeSpan? expiry = null);
/// <summary>
/// Acquires a distributed lock on the specified resource
/// </summary>
/// <param name="resource">The resource identifier to lock</param>
/// <param name="expiry">How long the lock should be held before automatically expiring</param>
/// <param name="waitTime">How long to wait for the lock before giving up</param>
/// <param name="retryInterval">How often to retry acquiring the lock during the wait time</param>
/// <returns>A distributed lock instance if acquired, null otherwise</returns>
Task<IDistributedLock?> AcquireLockAsync(string resource, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null);
/// <summary>
/// Executes an action with a distributed lock, ensuring the lock is properly released afterwards
/// </summary>
/// <param name="resource">The resource identifier to lock</param>
/// <param name="action">The action to execute while holding the lock</param>
/// <param name="expiry">How long the lock should be held before automatically expiring</param>
/// <param name="waitTime">How long to wait for the lock before giving up</param>
/// <param name="retryInterval">How often to retry acquiring the lock during the wait time</param>
/// <returns>True if the lock was acquired and the action was executed, false otherwise</returns>
Task<bool> ExecuteWithLockAsync(string resource, Func<Task> action, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null);
/// <summary>
/// Executes a function with a distributed lock, ensuring the lock is properly released afterwards
/// </summary>
/// <typeparam name="T">The return type of the function</typeparam>
/// <param name="resource">The resource identifier to lock</param>
/// <param name="func">The function to execute while holding the lock</param>
/// <param name="expiry">How long the lock should be held before automatically expiring</param>
/// <param name="waitTime">How long to wait for the lock before giving up</param>
/// <param name="retryInterval">How often to retry acquiring the lock during the wait time</param>
/// <returns>The result of the function if the lock was acquired, default(T) otherwise</returns>
Task<(bool Acquired, T? Result)> ExecuteWithLockAsync<T>(string resource, Func<Task<T>> func, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null);
}
public class RedisDistributedLock : IDistributedLock
{
private readonly IDatabase _database;
private bool _disposed;
private const string LockKeyPrefix = "Lock_";
public string Resource { get; }
public string LockId { get; }
internal RedisDistributedLock(IDatabase database, string resource, string lockId)
{
_database = database;
Resource = resource;
LockId = lockId;
}
public async Task<bool> ExtendAsync(TimeSpan timeSpan)
{
if (_disposed)
throw new ObjectDisposedException(nameof(RedisDistributedLock));
var script = @"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('pexpire', KEYS[1], ARGV[2])
else
return 0
end
";
var result = await _database.ScriptEvaluateAsync(
script,
new RedisKey[] { $"{LockKeyPrefix}{Resource}" },
new RedisValue[] { LockId, (long)timeSpan.TotalMilliseconds }
);
return (long)result! == 1;
}
public async Task ReleaseAsync()
{
if (_disposed)
return;
var script = @"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
";
await _database.ScriptEvaluateAsync(
script,
new RedisKey[] { $"{LockKeyPrefix}{Resource}" },
new RedisValue[] { LockId }
);
_disposed = true;
}
public async ValueTask DisposeAsync()
{
await ReleaseAsync();
GC.SuppressFinalize(this);
}
}
public class CacheServiceRedis : ICacheService
{
private readonly IDatabase _database;
private readonly JsonSerializerOptions _serializerOptions;
// Using prefixes for different types of keys
private const string GroupKeyPrefix = "CacheGroup_";
private const string LockKeyPrefix = "Lock_";
public CacheServiceRedis(IConnectionMultiplexer redis)
{
var rds = redis ?? throw new ArgumentNullException(nameof(redis));
_database = rds.GetDatabase();
_serializerOptions = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true
};
}
public async Task<bool> SetAsync<T>(string key, T value, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
var serializedValue = JsonSerializer.Serialize(value, _serializerOptions);
return await _database.StringSetAsync(key, serializedValue, expiry);
}
public async Task<T?> GetAsync<T>(string key)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
var value = await _database.StringGetAsync(key);
if (value.IsNullOrEmpty)
return default;
return JsonSerializer.Deserialize<T>(value!, _serializerOptions);
}
public async Task<bool> RemoveAsync(string key)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
// Before removing the key, find all groups it belongs to and remove it from them
var script = @"
local groups = redis.call('KEYS', ARGV[1])
for _, group in ipairs(groups) do
redis.call('SREM', group, ARGV[2])
end
return redis.call('DEL', ARGV[2])
";
var result = await _database.ScriptEvaluateAsync(
script,
values: new RedisValue[] { $"{GroupKeyPrefix}*", key }
);
return (long)result! > 0;
}
public async Task AddToGroupAsync(string key, string group)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException(@"Key cannot be null or empty.", nameof(key));
if (string.IsNullOrEmpty(group))
throw new ArgumentException(@"Group cannot be null or empty.", nameof(group));
var groupKey = $"{GroupKeyPrefix}{group}";
await _database.SetAddAsync(groupKey, key);
}
public async Task RemoveGroupAsync(string group)
{
if (string.IsNullOrEmpty(group))
throw new ArgumentException(@"Group cannot be null or empty.", nameof(group));
var groupKey = $"{GroupKeyPrefix}{group}";
// Get all keys in the group
var keys = await _database.SetMembersAsync(groupKey);
if (keys.Length > 0)
{
// Delete all the keys
var keysTasks = keys.Select(key => _database.KeyDeleteAsync(key.ToString()));
await Task.WhenAll(keysTasks);
}
// Delete the group itself
await _database.KeyDeleteAsync(groupKey);
}
public async Task<IEnumerable<string>> GetGroupKeysAsync(string group)
{
if (string.IsNullOrEmpty(group))
throw new ArgumentException(@"Group cannot be null or empty.", nameof(group));
var groupKey = $"{GroupKeyPrefix}{group}";
var members = await _database.SetMembersAsync(groupKey);
return members.Select(m => m.ToString());
}
public async Task<bool> SetWithGroupsAsync<T>(string key, T value, IEnumerable<string>? groups = null, TimeSpan? expiry = null)
{
// First set the value in the cache
var setResult = await SetAsync(key, value, expiry);
// If successful and there are groups to associate, add the key to each group
if (setResult && groups != null)
{
var groupsArray = groups.Where(g => !string.IsNullOrEmpty(g)).ToArray();
if (groupsArray.Length > 0)
{
var tasks = groupsArray.Select(group => AddToGroupAsync(key, group));
await Task.WhenAll(tasks);
}
}
return setResult;
}
public async Task<IDistributedLock?> AcquireLockAsync(string resource, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null)
{
if (string.IsNullOrEmpty(resource))
throw new ArgumentException("Resource cannot be null or empty", nameof(resource));
var lockKey = $"{LockKeyPrefix}{resource}";
var lockId = Guid.NewGuid().ToString("N");
var waitTimeSpan = waitTime ?? TimeSpan.Zero;
var retryIntervalSpan = retryInterval ?? TimeSpan.FromMilliseconds(100);
var startTime = DateTime.UtcNow;
var acquired = false;
// Try to acquire the lock, retry until waitTime is exceeded
while (!acquired && (DateTime.UtcNow - startTime) < waitTimeSpan)
{
acquired = await _database.StringSetAsync(lockKey, lockId, expiry, When.NotExists);
if (!acquired)
{
await Task.Delay(retryIntervalSpan);
}
}
if (!acquired)
{
return null; // Could not acquire the lock within the wait time
}
return new RedisDistributedLock(_database, resource, lockId);
}
public async Task<bool> ExecuteWithLockAsync(string resource, Func<Task> action, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null)
{
await using var lockObj = await AcquireLockAsync(resource, expiry, waitTime, retryInterval);
if (lockObj == null)
return false; // Could not acquire the lock
await action();
return true;
}
public async Task<(bool Acquired, T? Result)> ExecuteWithLockAsync<T>(string resource, Func<Task<T>> func, TimeSpan expiry, TimeSpan? waitTime = null, TimeSpan? retryInterval = null)
{
await using var lockObj = await AcquireLockAsync(resource, expiry, waitTime, retryInterval);
if (lockObj == null)
return (false, default); // Could not acquire the lock
var result = await func();
return (true, result);
}
}

View File

@ -17,12 +17,12 @@ public class FileService(
TusDiskStore store,
ILogger<FileService> logger,
IServiceScopeFactory scopeFactory,
IMemoryCache cache
ICacheService cache
)
{
private const string CacheKeyPrefix = "cloudfile_";
private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(15);
/// <summary>
/// The api for getting file meta with cache,
/// the best use case is for accessing the file data.
@ -34,18 +34,19 @@ public class FileService(
public async Task<CloudFile?> GetFileAsync(string fileId)
{
var cacheKey = $"{CacheKeyPrefix}{fileId}";
if (cache.TryGetValue(cacheKey, out CloudFile? cachedFile))
var cachedFile = await cache.GetAsync<CloudFile>(cacheKey);
if (cachedFile is not null)
return cachedFile;
var file = await db.Files.FirstOrDefaultAsync(f => f.Id == fileId);
if (file != null)
cache.Set(cacheKey, file, CacheDuration);
await cache.SetAsync(cacheKey, file, CacheDuration);
return file;
}
private static readonly string TempFilePrefix = "dyn-cloudfile";
// The analysis file method no longer will remove the GPS EXIF data
@ -83,7 +84,7 @@ public class FileService(
file.FileMeta = existingFile.FileMeta;
file.HasCompression = existingFile.HasCompression;
file.SensitiveMarks = existingFile.SensitiveMarks;
db.Files.Add(file);
await db.SaveChangesAsync();
return file;
@ -399,8 +400,7 @@ public class FileService(
)
);
}
public async Task SetExpiresRangeAsync(ICollection<CloudFile> files, Duration? duration)
{
@ -408,55 +408,55 @@ public class FileService(
await db.Files.Where(o => ids.Contains(o.Id))
.ExecuteUpdateAsync(setter => setter.SetProperty(
b => b.ExpiredAt,
duration.HasValue
duration.HasValue
? b => SystemClock.Instance.GetCurrentInstant() + duration.Value
: _ => null
)
);
}
public async Task<(ICollection<CloudFile> current, ICollection<CloudFile> added, ICollection<CloudFile> removed)> DiffAndMarkFilesAsync(
ICollection<string>? newFileIds,
ICollection<CloudFile>? previousFiles = null
)
public async Task<(ICollection<CloudFile> current, ICollection<CloudFile> added, ICollection<CloudFile> removed)>
DiffAndMarkFilesAsync(
ICollection<string>? newFileIds,
ICollection<CloudFile>? previousFiles = null
)
{
if (newFileIds == null) return ([], [], previousFiles ?? []);
var records = await db.Files.Where(f => newFileIds.Contains(f.Id)).ToListAsync();
var previous = previousFiles?.ToDictionary(f => f.Id) ?? new Dictionary<string, CloudFile>();
var current = records.ToDictionary(f => f.Id);
var added = current.Keys.Except(previous.Keys).Select(id => current[id]).ToList();
var removed = previous.Keys.Except(current.Keys).Select(id => previous[id]).ToList();
if (added.Count > 0) await MarkUsageRangeAsync(added, 1);
if (removed.Count > 0) await MarkUsageRangeAsync(removed, -1);
return (newFileIds.Select(id => current[id]).ToList(), added, removed);
}
public async Task<(ICollection<CloudFile> current, ICollection<CloudFile> added, ICollection<CloudFile> removed)> DiffAndSetExpiresAsync(
ICollection<string>? newFileIds,
Duration? duration,
ICollection<CloudFile>? previousFiles = null
)
public async Task<(ICollection<CloudFile> current, ICollection<CloudFile> added, ICollection<CloudFile> removed)>
DiffAndSetExpiresAsync(
ICollection<string>? newFileIds,
Duration? duration,
ICollection<CloudFile>? previousFiles = null
)
{
if (newFileIds == null) return ([], [], previousFiles ?? []);
var records = await db.Files.Where(f => newFileIds.Contains(f.Id)).ToListAsync();
var previous = previousFiles?.ToDictionary(f => f.Id) ?? new Dictionary<string, CloudFile>();
var current = records.ToDictionary(f => f.Id);
var added = current.Keys.Except(previous.Keys).Select(id => current[id]).ToList();
var removed = previous.Keys.Except(current.Keys).Select(id => previous[id]).ToList();
if (added.Count > 0) await SetExpiresRangeAsync(added, duration);
if (removed.Count > 0) await SetExpiresRangeAsync(removed, null);
return (newFileIds.Select(id => current[id]).ToList(), added, removed);
}
}
public class CloudFileUnusedRecyclingJob(AppDatabase db, FileService fs, ILogger<CloudFileUnusedRecyclingJob> logger)