1027 lines
		
	
	
		
			31 KiB
		
	
	
	
		
			Dart
		
	
	
	
	
	
			
		
		
	
	
			1027 lines
		
	
	
		
			31 KiB
		
	
	
	
		
			Dart
		
	
	
	
	
	
import "dart:async";
 | 
						|
import "package:dio/dio.dart";
 | 
						|
import "package:drift/drift.dart" show Variable;
 | 
						|
import "package:easy_localization/easy_localization.dart";
 | 
						|
import "package:flutter/material.dart";
 | 
						|
import "package:island/database/drift_db.dart";
 | 
						|
import "package:island/database/message.dart";
 | 
						|
import "package:island/models/chat.dart";
 | 
						|
import "package:island/models/file.dart";
 | 
						|
import "package:island/pods/database.dart";
 | 
						|
import "package:island/pods/lifecycle.dart";
 | 
						|
import "package:island/pods/network.dart";
 | 
						|
import "package:island/services/file_uploader.dart";
 | 
						|
import "package:island/talker.dart";
 | 
						|
import "package:island/widgets/alert.dart";
 | 
						|
import "package:riverpod_annotation/riverpod_annotation.dart";
 | 
						|
import "package:uuid/uuid.dart";
 | 
						|
import "package:island/screens/chat/chat.dart";
 | 
						|
import "package:island/pods/chat/chat_rooms.dart";
 | 
						|
 | 
						|
part 'messages_notifier.g.dart';
 | 
						|
 | 
						|
@riverpod
 | 
						|
class MessagesNotifier extends _$MessagesNotifier {
 | 
						|
  late final Dio _apiClient;
 | 
						|
  late final AppDatabase _database;
 | 
						|
  late final SnChatRoom _room;
 | 
						|
  late final SnChatMember _identity;
 | 
						|
 | 
						|
  final Map<String, LocalChatMessage> _pendingMessages = {};
 | 
						|
  final Map<String, Map<int, double>> _fileUploadProgress = {};
 | 
						|
  int? _totalCount;
 | 
						|
  String? _searchQuery;
 | 
						|
  bool? _withLinks;
 | 
						|
  bool? _withAttachments;
 | 
						|
 | 
						|
  late final String _roomId;
 | 
						|
  static const int _pageSize = 20;
 | 
						|
  bool _hasMore = true;
 | 
						|
  bool _isSyncing = false;
 | 
						|
  bool _isJumping = false;
 | 
						|
  bool _isUpdatingState = false;
 | 
						|
  DateTime? _lastPauseTime;
 | 
						|
 | 
						|
  @override
 | 
						|
  FutureOr<List<LocalChatMessage>> build(String roomId) async {
 | 
						|
    _roomId = roomId;
 | 
						|
    _apiClient = ref.watch(apiClientProvider);
 | 
						|
    _database = ref.watch(databaseProvider);
 | 
						|
    final room = await ref.watch(chatroomProvider(roomId).future);
 | 
						|
    final identity = await ref.watch(chatroomIdentityProvider(roomId).future);
 | 
						|
 | 
						|
    if (room == null) {
 | 
						|
      throw Exception('Room not found');
 | 
						|
    }
 | 
						|
    _room = room;
 | 
						|
 | 
						|
    // Allow building even if identity is null for public rooms
 | 
						|
    if (identity != null) {
 | 
						|
      _identity = identity;
 | 
						|
    }
 | 
						|
 | 
						|
    talker.log('MessagesNotifier built for room $roomId');
 | 
						|
 | 
						|
    // Only setup sync and lifecycle listeners if user is a member
 | 
						|
    if (identity != null) {
 | 
						|
      ref.listen(appLifecycleStateProvider, (_, next) {
 | 
						|
        next.whenData((state) {
 | 
						|
          if (state == AppLifecycleState.paused) {
 | 
						|
            _lastPauseTime = DateTime.now();
 | 
						|
            talker.log('App paused, recording time');
 | 
						|
          } else if (state == AppLifecycleState.resumed) {
 | 
						|
            if (_lastPauseTime != null) {
 | 
						|
              final diff = DateTime.now().difference(_lastPauseTime!);
 | 
						|
              if (diff > const Duration(minutes: 1)) {
 | 
						|
                talker.log('App resumed after >1 min, syncing messages');
 | 
						|
                syncMessages();
 | 
						|
              } else {
 | 
						|
                talker.log('App resumed within 1 min, skipping sync');
 | 
						|
              }
 | 
						|
            }
 | 
						|
          }
 | 
						|
        });
 | 
						|
      });
 | 
						|
    }
 | 
						|
 | 
						|
    loadInitial();
 | 
						|
    return [];
 | 
						|
  }
 | 
						|
 | 
						|
  List<LocalChatMessage> _sortMessages(List<LocalChatMessage> messages) {
 | 
						|
    messages.sort((a, b) => b.createdAt.compareTo(a.createdAt));
 | 
						|
    return messages;
 | 
						|
  }
 | 
						|
 | 
						|
  Future<void> _updateStateSafely(List<LocalChatMessage> messages) async {
 | 
						|
    if (_isUpdatingState) {
 | 
						|
      talker.log('State update already in progress, skipping');
 | 
						|
      return;
 | 
						|
    }
 | 
						|
    _isUpdatingState = true;
 | 
						|
    try {
 | 
						|
      // Ensure messages are properly sorted and deduplicated
 | 
						|
      final sortedMessages = _sortMessages(messages);
 | 
						|
      final uniqueMessages = <LocalChatMessage>[];
 | 
						|
      final seenIds = <String>{};
 | 
						|
      for (final message in sortedMessages) {
 | 
						|
        if (seenIds.add(message.id)) {
 | 
						|
          uniqueMessages.add(message);
 | 
						|
        }
 | 
						|
      }
 | 
						|
      state = AsyncValue.data(uniqueMessages);
 | 
						|
    } finally {
 | 
						|
      _isUpdatingState = false;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Future<List<LocalChatMessage>> _getCachedMessages({
 | 
						|
    int offset = 0,
 | 
						|
    int take = 20,
 | 
						|
    String? searchQuery,
 | 
						|
    bool? withLinks,
 | 
						|
    bool? withAttachments,
 | 
						|
  }) async {
 | 
						|
    talker.log('Getting cached messages from offset $offset, take $take');
 | 
						|
    final List<LocalChatMessage> dbMessages;
 | 
						|
 | 
						|
    if (searchQuery != null && searchQuery.isNotEmpty) {
 | 
						|
      dbMessages = await _database.searchMessages(
 | 
						|
        _roomId,
 | 
						|
        searchQuery,
 | 
						|
        withAttachments: withAttachments,
 | 
						|
      );
 | 
						|
    } else {
 | 
						|
      final chatMessagesFromDb = await _database.getMessagesForRoom(
 | 
						|
        _roomId,
 | 
						|
        offset: offset,
 | 
						|
        limit: take,
 | 
						|
      );
 | 
						|
      dbMessages =
 | 
						|
          chatMessagesFromDb.map(_database.companionToMessage).toList();
 | 
						|
    }
 | 
						|
 | 
						|
    List<LocalChatMessage> filteredMessages = dbMessages;
 | 
						|
 | 
						|
    if (withLinks == true) {
 | 
						|
      filteredMessages =
 | 
						|
          filteredMessages.where((msg) => _hasLink(msg)).toList();
 | 
						|
    }
 | 
						|
 | 
						|
    if (withAttachments == true) {
 | 
						|
      filteredMessages =
 | 
						|
          filteredMessages
 | 
						|
              .where((msg) => msg.toRemoteMessage().attachments.isNotEmpty)
 | 
						|
              .toList();
 | 
						|
    }
 | 
						|
 | 
						|
    final dbLocalMessages = filteredMessages;
 | 
						|
 | 
						|
    // Always ensure unique messages to prevent duplicate keys
 | 
						|
    final uniqueMessages = <LocalChatMessage>[];
 | 
						|
    final seenIds = <String>{};
 | 
						|
    for (final message in dbLocalMessages) {
 | 
						|
      if (seenIds.add(message.id)) {
 | 
						|
        uniqueMessages.add(message);
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    if (offset == 0) {
 | 
						|
      final pendingForRoom =
 | 
						|
          _pendingMessages.values
 | 
						|
              .where((msg) => msg.roomId == _roomId)
 | 
						|
              .toList();
 | 
						|
 | 
						|
      final allMessages = [...pendingForRoom, ...uniqueMessages];
 | 
						|
      _sortMessages(allMessages); // Use the helper function
 | 
						|
 | 
						|
      final finalUniqueMessages = <LocalChatMessage>[];
 | 
						|
      final finalSeenIds = <String>{};
 | 
						|
      for (final message in allMessages) {
 | 
						|
        if (finalSeenIds.add(message.id)) {
 | 
						|
          finalUniqueMessages.add(message);
 | 
						|
        }
 | 
						|
      }
 | 
						|
      return finalUniqueMessages;
 | 
						|
    }
 | 
						|
 | 
						|
    return uniqueMessages;
 | 
						|
  }
 | 
						|
 | 
						|
  /// Get all messages without search filters for jump operations
 | 
						|
  Future<List<LocalChatMessage>> _getAllMessagesForJump({
 | 
						|
    int offset = 0,
 | 
						|
    int take = 20,
 | 
						|
  }) async {
 | 
						|
    talker.log('Getting all messages for jump from offset $offset, take $take');
 | 
						|
    final chatMessagesFromDb = await _database.getMessagesForRoom(
 | 
						|
      _roomId,
 | 
						|
      offset: offset,
 | 
						|
      limit: take,
 | 
						|
    );
 | 
						|
    final dbMessages =
 | 
						|
        chatMessagesFromDb.map(_database.companionToMessage).toList();
 | 
						|
 | 
						|
    // Always ensure unique messages to prevent duplicate keys
 | 
						|
    final uniqueMessages = <LocalChatMessage>[];
 | 
						|
    final seenIds = <String>{};
 | 
						|
    for (final message in dbMessages) {
 | 
						|
      if (seenIds.add(message.id)) {
 | 
						|
        uniqueMessages.add(message);
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    if (offset == 0) {
 | 
						|
      final pendingForRoom =
 | 
						|
          _pendingMessages.values
 | 
						|
              .where((msg) => msg.roomId == _roomId)
 | 
						|
              .toList();
 | 
						|
 | 
						|
      final allMessages = [...pendingForRoom, ...uniqueMessages];
 | 
						|
      _sortMessages(allMessages);
 | 
						|
 | 
						|
      final finalUniqueMessages = <LocalChatMessage>[];
 | 
						|
      final finalSeenIds = <String>{};
 | 
						|
      for (final message in allMessages) {
 | 
						|
        if (finalSeenIds.add(message.id)) {
 | 
						|
          finalUniqueMessages.add(message);
 | 
						|
        }
 | 
						|
      }
 | 
						|
      return finalUniqueMessages;
 | 
						|
    }
 | 
						|
 | 
						|
    return uniqueMessages;
 | 
						|
  }
 | 
						|
 | 
						|
  Future<List<LocalChatMessage>> _fetchAndCacheMessages({
 | 
						|
    int offset = 0,
 | 
						|
    int take = 20,
 | 
						|
  }) async {
 | 
						|
    talker.log('Fetching messages from API, offset $offset, take $take');
 | 
						|
    if (_totalCount == null) {
 | 
						|
      final response = await _apiClient.get(
 | 
						|
        '/sphere/chat/$_roomId/messages',
 | 
						|
        queryParameters: {'offset': 0, 'take': 1},
 | 
						|
      );
 | 
						|
      _totalCount = int.parse(response.headers['x-total']?.firstOrNull ?? '0');
 | 
						|
    }
 | 
						|
 | 
						|
    if (offset >= _totalCount!) {
 | 
						|
      return [];
 | 
						|
    }
 | 
						|
 | 
						|
    final response = await _apiClient.get(
 | 
						|
      '/sphere/chat/$_roomId/messages',
 | 
						|
      queryParameters: {'offset': offset, 'take': take},
 | 
						|
    );
 | 
						|
 | 
						|
    final List<dynamic> data = response.data;
 | 
						|
    _totalCount = int.parse(response.headers['x-total']?.firstOrNull ?? '0');
 | 
						|
 | 
						|
    final messages =
 | 
						|
        data.map((json) {
 | 
						|
          final remoteMessage = SnChatMessage.fromJson(json);
 | 
						|
          return LocalChatMessage.fromRemoteMessage(
 | 
						|
            remoteMessage,
 | 
						|
            MessageStatus.sent,
 | 
						|
          );
 | 
						|
        }).toList();
 | 
						|
 | 
						|
    for (final message in messages) {
 | 
						|
      await _database.saveMessage(_database.messageToCompanion(message));
 | 
						|
      if (message.nonce != null) {
 | 
						|
        _pendingMessages.removeWhere(
 | 
						|
          (_, pendingMsg) => pendingMsg.nonce == message.nonce,
 | 
						|
        );
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    return messages;
 | 
						|
  }
 | 
						|
 | 
						|
  Future<void> syncMessages() async {
 | 
						|
    if (_isSyncing) {
 | 
						|
      talker.log('Sync already in progress, skipping.');
 | 
						|
      return;
 | 
						|
    }
 | 
						|
    _isSyncing = true;
 | 
						|
 | 
						|
    talker.log('Starting message sync');
 | 
						|
    Future.microtask(() => ref.read(isSyncingProvider.notifier).state = true);
 | 
						|
    try {
 | 
						|
      final dbMessages = await _database.getMessagesForRoom(
 | 
						|
        _room.id,
 | 
						|
        offset: 0,
 | 
						|
        limit: 1,
 | 
						|
      );
 | 
						|
      final lastMessage =
 | 
						|
          dbMessages.isEmpty
 | 
						|
              ? null
 | 
						|
              : _database.companionToMessage(dbMessages.first);
 | 
						|
 | 
						|
      if (lastMessage == null) {
 | 
						|
        talker.log('No local messages, fetching from network');
 | 
						|
        final newMessages = await _fetchAndCacheMessages(
 | 
						|
          offset: 0,
 | 
						|
          take: _pageSize,
 | 
						|
        );
 | 
						|
        state = AsyncValue.data(newMessages);
 | 
						|
        return;
 | 
						|
      }
 | 
						|
 | 
						|
      final resp = await _apiClient.post(
 | 
						|
        '/sphere/chat/${_room.id}/sync',
 | 
						|
        data: {
 | 
						|
          'last_sync_timestamp':
 | 
						|
              lastMessage.toRemoteMessage().updatedAt.millisecondsSinceEpoch,
 | 
						|
        },
 | 
						|
      );
 | 
						|
 | 
						|
      final response = MessageSyncResponse.fromJson(resp.data);
 | 
						|
      talker.log('Sync response: ${response.messages.length} changes');
 | 
						|
      for (final message in response.messages) {
 | 
						|
        await receiveMessage(message);
 | 
						|
      }
 | 
						|
    } catch (err, stackTrace) {
 | 
						|
      talker.log(
 | 
						|
        'Error syncing messages',
 | 
						|
        exception: err,
 | 
						|
        stackTrace: stackTrace,
 | 
						|
      );
 | 
						|
      showErrorAlert(err);
 | 
						|
    } finally {
 | 
						|
      talker.log('Finished message sync');
 | 
						|
      Future.microtask(
 | 
						|
        () => ref.read(isSyncingProvider.notifier).state = false,
 | 
						|
      );
 | 
						|
      _isSyncing = false;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Future<List<LocalChatMessage>> listMessages({
 | 
						|
    int offset = 0,
 | 
						|
    int take = 20,
 | 
						|
    bool synced = false,
 | 
						|
  }) async {
 | 
						|
    try {
 | 
						|
      if (offset == 0 &&
 | 
						|
          !synced &&
 | 
						|
          (_searchQuery == null || _searchQuery!.isEmpty)) {
 | 
						|
        _fetchAndCacheMessages(offset: 0, take: take).catchError((_) {
 | 
						|
          return <LocalChatMessage>[];
 | 
						|
        });
 | 
						|
      }
 | 
						|
 | 
						|
      final localMessages = await _getCachedMessages(
 | 
						|
        offset: offset,
 | 
						|
        take: take,
 | 
						|
        searchQuery: _searchQuery,
 | 
						|
        withLinks: _withLinks,
 | 
						|
        withAttachments: _withAttachments,
 | 
						|
      );
 | 
						|
 | 
						|
      if (localMessages.isNotEmpty) {
 | 
						|
        return localMessages;
 | 
						|
      }
 | 
						|
 | 
						|
      if (_searchQuery == null || _searchQuery!.isEmpty) {
 | 
						|
        return await _fetchAndCacheMessages(offset: offset, take: take);
 | 
						|
      } else {
 | 
						|
        return []; // If searching, and no local messages, don't fetch from network
 | 
						|
      }
 | 
						|
    } catch (e) {
 | 
						|
      final localMessages = await _getCachedMessages(
 | 
						|
        offset: offset,
 | 
						|
        take: take,
 | 
						|
        searchQuery: _searchQuery,
 | 
						|
        withLinks: _withLinks,
 | 
						|
        withAttachments: _withAttachments,
 | 
						|
      );
 | 
						|
 | 
						|
      if (localMessages.isNotEmpty) {
 | 
						|
        return localMessages;
 | 
						|
      }
 | 
						|
      rethrow;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Future<void> loadInitial() async {
 | 
						|
    talker.log('Loading initial messages');
 | 
						|
    if (_searchQuery == null || _searchQuery!.isEmpty) {
 | 
						|
      syncMessages();
 | 
						|
    }
 | 
						|
 | 
						|
    final messages = await _getCachedMessages(
 | 
						|
      offset: 0,
 | 
						|
      take: _pageSize,
 | 
						|
      searchQuery: _searchQuery,
 | 
						|
      withLinks: _withLinks,
 | 
						|
      withAttachments: _withAttachments,
 | 
						|
    );
 | 
						|
 | 
						|
    _hasMore = messages.length == _pageSize;
 | 
						|
 | 
						|
    state = AsyncValue.data(messages);
 | 
						|
  }
 | 
						|
 | 
						|
  Future<void> loadMore() async {
 | 
						|
    if (!_hasMore || state is AsyncLoading) return;
 | 
						|
    talker.log('Loading more messages');
 | 
						|
 | 
						|
    try {
 | 
						|
      final currentMessages = state.value ?? [];
 | 
						|
      final offset = currentMessages.length;
 | 
						|
 | 
						|
      final newMessages = await listMessages(offset: offset, take: _pageSize);
 | 
						|
 | 
						|
      if (newMessages.isEmpty || newMessages.length < _pageSize) {
 | 
						|
        _hasMore = false;
 | 
						|
      }
 | 
						|
 | 
						|
      state = AsyncValue.data(
 | 
						|
        _sortMessages([...currentMessages, ...newMessages]),
 | 
						|
      );
 | 
						|
    } catch (err, stackTrace) {
 | 
						|
      talker.log(
 | 
						|
        'Error loading more messages',
 | 
						|
 | 
						|
        exception: err,
 | 
						|
        stackTrace: stackTrace,
 | 
						|
      );
 | 
						|
      showErrorAlert(err);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Future<void> sendMessage(
 | 
						|
    String content,
 | 
						|
    List<UniversalFile> attachments, {
 | 
						|
    SnChatMessage? editingTo,
 | 
						|
    SnChatMessage? forwardingTo,
 | 
						|
    SnChatMessage? replyingTo,
 | 
						|
    Function(String, Map<int, double>)? onProgress,
 | 
						|
  }) async {
 | 
						|
    final nonce = const Uuid().v4();
 | 
						|
    talker.log('Sending message with nonce $nonce');
 | 
						|
 | 
						|
    final mockMessage = SnChatMessage(
 | 
						|
      id: 'pending_$nonce',
 | 
						|
      chatRoomId: _roomId,
 | 
						|
      senderId: _identity.id,
 | 
						|
      content: content,
 | 
						|
      createdAt: DateTime.now(),
 | 
						|
      updatedAt: DateTime.now(),
 | 
						|
      nonce: nonce,
 | 
						|
      sender: _identity,
 | 
						|
    );
 | 
						|
 | 
						|
    final localMessage = LocalChatMessage.fromRemoteMessage(
 | 
						|
      mockMessage,
 | 
						|
      MessageStatus.pending,
 | 
						|
    );
 | 
						|
 | 
						|
    _pendingMessages[localMessage.id] = localMessage;
 | 
						|
    _fileUploadProgress[localMessage.id] = {};
 | 
						|
    await _database.saveMessage(_database.messageToCompanion(localMessage));
 | 
						|
 | 
						|
    final currentMessages = state.value ?? [];
 | 
						|
    state = AsyncValue.data([localMessage, ...currentMessages]);
 | 
						|
 | 
						|
    try {
 | 
						|
      var cloudAttachments = List.empty(growable: true);
 | 
						|
      for (var idx = 0; idx < attachments.length; idx++) {
 | 
						|
        final cloudFile =
 | 
						|
            await FileUploader.createCloudFile(
 | 
						|
              fileData: attachments[idx],
 | 
						|
              client: ref.read(apiClientProvider),
 | 
						|
              onProgress: (progress, _) {
 | 
						|
                _fileUploadProgress[localMessage.id]?[idx] = progress;
 | 
						|
                onProgress?.call(
 | 
						|
                  localMessage.id,
 | 
						|
                  _fileUploadProgress[localMessage.id] ?? {},
 | 
						|
                );
 | 
						|
              },
 | 
						|
            ).future;
 | 
						|
        if (cloudFile == null) {
 | 
						|
          throw ArgumentError('Failed to upload the file...');
 | 
						|
        }
 | 
						|
        cloudAttachments.add(cloudFile);
 | 
						|
      }
 | 
						|
 | 
						|
      final response = await _apiClient.request(
 | 
						|
        editingTo == null
 | 
						|
            ? '/sphere/chat/$_roomId/messages'
 | 
						|
            : '/sphere/chat/$_roomId/messages/${editingTo.id}',
 | 
						|
        data: {
 | 
						|
          'content': content,
 | 
						|
          'attachments_id': cloudAttachments.map((e) => e.id).toList(),
 | 
						|
          'replied_message_id': replyingTo?.id,
 | 
						|
          'forwarded_message_id': forwardingTo?.id,
 | 
						|
          'meta': {},
 | 
						|
          'nonce': nonce,
 | 
						|
        },
 | 
						|
        options: Options(method: editingTo == null ? 'POST' : 'PATCH'),
 | 
						|
      );
 | 
						|
 | 
						|
      final remoteMessage = SnChatMessage.fromJson(response.data);
 | 
						|
      final updatedMessage = LocalChatMessage.fromRemoteMessage(
 | 
						|
        remoteMessage,
 | 
						|
        MessageStatus.sent,
 | 
						|
      );
 | 
						|
 | 
						|
      _pendingMessages.remove(localMessage.id);
 | 
						|
      await _database.deleteMessage(localMessage.id);
 | 
						|
      await _database.saveMessage(_database.messageToCompanion(updatedMessage));
 | 
						|
 | 
						|
      final currentMessages = state.value ?? [];
 | 
						|
      if (editingTo != null) {
 | 
						|
        final newMessages =
 | 
						|
            currentMessages
 | 
						|
                .where((m) => m.id != localMessage.id) // remove pending message
 | 
						|
                .map(
 | 
						|
                  (m) => m.id == editingTo.id ? updatedMessage : m,
 | 
						|
                ) // update original message
 | 
						|
                .toList();
 | 
						|
        state = AsyncValue.data(newMessages);
 | 
						|
      } else {
 | 
						|
        final newMessages =
 | 
						|
            currentMessages.map((m) {
 | 
						|
              if (m.id == localMessage.id) {
 | 
						|
                return updatedMessage;
 | 
						|
              }
 | 
						|
              return m;
 | 
						|
            }).toList();
 | 
						|
        state = AsyncValue.data(newMessages);
 | 
						|
      }
 | 
						|
      talker.log('Message with nonce $nonce sent successfully');
 | 
						|
    } catch (e, stackTrace) {
 | 
						|
      talker.log(
 | 
						|
        'Failed to send message with nonce $nonce',
 | 
						|
 | 
						|
        exception: e,
 | 
						|
        stackTrace: stackTrace,
 | 
						|
      );
 | 
						|
      localMessage.status = MessageStatus.failed;
 | 
						|
      _pendingMessages[localMessage.id] = localMessage;
 | 
						|
      await _database.updateMessageStatus(
 | 
						|
        localMessage.id,
 | 
						|
        MessageStatus.failed,
 | 
						|
      );
 | 
						|
      final newMessages =
 | 
						|
          (state.value ?? []).map((m) {
 | 
						|
            if (m.id == localMessage.id) {
 | 
						|
              return m..status = MessageStatus.failed;
 | 
						|
            }
 | 
						|
            return m;
 | 
						|
          }).toList();
 | 
						|
      state = AsyncValue.data(newMessages);
 | 
						|
      showErrorAlert(e);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Future<void> retryMessage(String pendingMessageId) async {
 | 
						|
    talker.log('Retrying message $pendingMessageId');
 | 
						|
    final message = await fetchMessageById(pendingMessageId);
 | 
						|
    if (message == null) {
 | 
						|
      throw Exception('Message not found');
 | 
						|
    }
 | 
						|
 | 
						|
    message.status = MessageStatus.pending;
 | 
						|
    _pendingMessages[pendingMessageId] = message;
 | 
						|
    await _database.updateMessageStatus(
 | 
						|
      pendingMessageId,
 | 
						|
      MessageStatus.pending,
 | 
						|
    );
 | 
						|
 | 
						|
    try {
 | 
						|
      var remoteMessage = message.toRemoteMessage();
 | 
						|
      final response = await _apiClient.post(
 | 
						|
        '/sphere/chat/${message.roomId}/messages',
 | 
						|
        data: {
 | 
						|
          'content': remoteMessage.content,
 | 
						|
          'attachments_id': remoteMessage.attachments,
 | 
						|
          'meta': remoteMessage.meta,
 | 
						|
          'nonce': message.nonce,
 | 
						|
        },
 | 
						|
      );
 | 
						|
 | 
						|
      remoteMessage = SnChatMessage.fromJson(response.data);
 | 
						|
      final updatedMessage = LocalChatMessage.fromRemoteMessage(
 | 
						|
        remoteMessage,
 | 
						|
        MessageStatus.sent,
 | 
						|
      );
 | 
						|
 | 
						|
      _pendingMessages.remove(pendingMessageId);
 | 
						|
      await _database.deleteMessage(pendingMessageId);
 | 
						|
      await _database.saveMessage(_database.messageToCompanion(updatedMessage));
 | 
						|
 | 
						|
      final newMessages =
 | 
						|
          (state.value ?? []).map((m) {
 | 
						|
            if (m.id == pendingMessageId) {
 | 
						|
              return updatedMessage;
 | 
						|
            }
 | 
						|
            return m;
 | 
						|
          }).toList();
 | 
						|
      state = AsyncValue.data(newMessages);
 | 
						|
    } catch (e, stackTrace) {
 | 
						|
      talker.log(
 | 
						|
        'Failed to retry message $pendingMessageId',
 | 
						|
 | 
						|
        exception: e,
 | 
						|
        stackTrace: stackTrace,
 | 
						|
      );
 | 
						|
      message.status = MessageStatus.failed;
 | 
						|
      _pendingMessages[pendingMessageId] = message;
 | 
						|
      await _database.updateMessageStatus(
 | 
						|
        pendingMessageId,
 | 
						|
        MessageStatus.failed,
 | 
						|
      );
 | 
						|
      final newMessages =
 | 
						|
          (state.value ?? []).map((m) {
 | 
						|
            if (m.id == pendingMessageId) {
 | 
						|
              return m..status = MessageStatus.failed;
 | 
						|
            }
 | 
						|
            return m;
 | 
						|
          }).toList();
 | 
						|
      state = AsyncValue.data(_sortMessages(newMessages));
 | 
						|
      showErrorAlert(e);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Future<void> receiveMessage(SnChatMessage remoteMessage) async {
 | 
						|
    if (remoteMessage.chatRoomId != _roomId) return;
 | 
						|
 | 
						|
    // Block message receiving during jumps to prevent list resets
 | 
						|
    if (_isJumping) {
 | 
						|
      talker.log('Blocking message receive during jump operation');
 | 
						|
      return;
 | 
						|
    }
 | 
						|
 | 
						|
    talker.log('Received new message ${remoteMessage.id}');
 | 
						|
 | 
						|
    final localMessage = LocalChatMessage.fromRemoteMessage(
 | 
						|
      remoteMessage,
 | 
						|
      MessageStatus.sent,
 | 
						|
    );
 | 
						|
 | 
						|
    if (remoteMessage.nonce != null) {
 | 
						|
      _pendingMessages.removeWhere(
 | 
						|
        (_, pendingMsg) => pendingMsg.nonce == remoteMessage.nonce,
 | 
						|
      );
 | 
						|
    }
 | 
						|
 | 
						|
    await _database.saveMessage(_database.messageToCompanion(localMessage));
 | 
						|
 | 
						|
    final currentMessages = state.value ?? [];
 | 
						|
    final existingIndex = currentMessages.indexWhere(
 | 
						|
      (m) =>
 | 
						|
          m.id == localMessage.id ||
 | 
						|
          (localMessage.nonce != null && m.nonce == localMessage.nonce),
 | 
						|
    );
 | 
						|
 | 
						|
    if (existingIndex >= 0) {
 | 
						|
      final newList = [...currentMessages];
 | 
						|
      newList[existingIndex] = localMessage;
 | 
						|
      state = AsyncValue.data(_sortMessages(newList));
 | 
						|
    } else {
 | 
						|
      state = AsyncValue.data(
 | 
						|
        _sortMessages([localMessage, ...currentMessages]),
 | 
						|
      );
 | 
						|
    }
 | 
						|
 | 
						|
    switch (remoteMessage.type) {
 | 
						|
      case "messages.delete":
 | 
						|
        await receiveMessageDeletion(
 | 
						|
          remoteMessage.meta['message_id'] ?? remoteMessage.id,
 | 
						|
        );
 | 
						|
      case "messages.update":
 | 
						|
      case "messages.update.links":
 | 
						|
        await receiveMessageUpdate(remoteMessage);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Future<void> receiveMessageUpdate(SnChatMessage remoteMessage) async {
 | 
						|
    if (remoteMessage.chatRoomId != _roomId) return;
 | 
						|
 | 
						|
    // Block message updates during jumps to prevent list resets
 | 
						|
    if (_isJumping) {
 | 
						|
      talker.log('Blocking message update during jump operation');
 | 
						|
      return;
 | 
						|
    }
 | 
						|
 | 
						|
    talker.log('Received message update ${remoteMessage.id}');
 | 
						|
 | 
						|
    final targetId = remoteMessage.meta['message_id'] ?? remoteMessage.id;
 | 
						|
    final updatedMessage = LocalChatMessage.fromRemoteMessage(
 | 
						|
      remoteMessage.copyWith(
 | 
						|
        id: targetId,
 | 
						|
        meta: Map.of(remoteMessage.meta)..remove('message_id'),
 | 
						|
        type: 'text',
 | 
						|
        editedAt: remoteMessage.createdAt,
 | 
						|
      ),
 | 
						|
      MessageStatus.sent,
 | 
						|
    );
 | 
						|
    await _database.updateMessage(_database.messageToCompanion(updatedMessage));
 | 
						|
 | 
						|
    final currentMessages = state.value ?? [];
 | 
						|
    final index = currentMessages.indexWhere((m) => m.id == updatedMessage.id);
 | 
						|
 | 
						|
    if (index >= 0) {
 | 
						|
      final newList = [...currentMessages];
 | 
						|
      newList[index] = updatedMessage;
 | 
						|
      state = AsyncValue.data(_sortMessages(newList));
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Future<void> receiveMessageDeletion(String messageId) async {
 | 
						|
    // Block message deletions during jumps to prevent list resets
 | 
						|
    if (_isJumping) {
 | 
						|
      talker.log('Blocking message deletion during jump operation');
 | 
						|
      return;
 | 
						|
    }
 | 
						|
 | 
						|
    talker.log('Received message deletion $messageId');
 | 
						|
    _pendingMessages.remove(messageId);
 | 
						|
 | 
						|
    final currentMessages = state.value ?? [];
 | 
						|
    final messageIndex = currentMessages.indexWhere((m) => m.id == messageId);
 | 
						|
 | 
						|
    LocalChatMessage? messageToUpdate;
 | 
						|
    if (messageIndex != -1) {
 | 
						|
      messageToUpdate = currentMessages[messageIndex];
 | 
						|
    } else {
 | 
						|
      messageToUpdate = await fetchMessageById(messageId);
 | 
						|
    }
 | 
						|
 | 
						|
    if (messageToUpdate == null) return;
 | 
						|
 | 
						|
    final remote = messageToUpdate.toRemoteMessage();
 | 
						|
    final updatedRemote = remote.copyWith(
 | 
						|
      content: 'This message was deleted',
 | 
						|
      deletedAt: DateTime.now(),
 | 
						|
      attachments: [],
 | 
						|
    );
 | 
						|
 | 
						|
    final deletedMessage = LocalChatMessage.fromRemoteMessage(
 | 
						|
      updatedRemote,
 | 
						|
      messageToUpdate.status,
 | 
						|
    );
 | 
						|
 | 
						|
    await _database.saveMessage(_database.messageToCompanion(deletedMessage));
 | 
						|
 | 
						|
    if (messageIndex != -1) {
 | 
						|
      final newList = [...currentMessages];
 | 
						|
      newList[messageIndex] = deletedMessage;
 | 
						|
      state = AsyncValue.data(newList);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Future<void> deleteMessage(String messageId) async {
 | 
						|
    talker.log('Deleting message $messageId');
 | 
						|
 | 
						|
    // Fetch message to check its status before attempting server delete
 | 
						|
    final message = await fetchMessageById(messageId);
 | 
						|
    if (message == null) {
 | 
						|
      talker.log('Message $messageId not found for deletion');
 | 
						|
      return;
 | 
						|
    }
 | 
						|
 | 
						|
    // Skip server delete for failed messages (never successfully sent)
 | 
						|
    if (message.status == MessageStatus.failed) {
 | 
						|
      talker.log('Skipping server delete for failed message $messageId');
 | 
						|
      // For failed messages, remove them completely from the active list
 | 
						|
      _pendingMessages.remove(messageId);
 | 
						|
      await _database.deleteMessage(messageId);
 | 
						|
 | 
						|
      final currentMessages = state.value ?? [];
 | 
						|
      final newMessages =
 | 
						|
          currentMessages.where((m) => m.id != messageId).toList();
 | 
						|
      state = AsyncValue.data(newMessages);
 | 
						|
      return;
 | 
						|
    }
 | 
						|
 | 
						|
    try {
 | 
						|
      await _apiClient.delete('/sphere/chat/$_roomId/messages/$messageId');
 | 
						|
      await receiveMessageDeletion(messageId);
 | 
						|
    } catch (err, stackTrace) {
 | 
						|
      talker.log(
 | 
						|
        'Error deleting message $messageId',
 | 
						|
        exception: err,
 | 
						|
        stackTrace: stackTrace,
 | 
						|
      );
 | 
						|
      showErrorAlert(err);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  /// Get search results without updating shared state
 | 
						|
  Future<List<LocalChatMessage>> getSearchResults(
 | 
						|
    String query, {
 | 
						|
    bool? withLinks,
 | 
						|
    bool? withAttachments,
 | 
						|
  }) async {
 | 
						|
    final trimmedQuery = query.trim();
 | 
						|
    final hasFilters = [withLinks, withAttachments].any((e) => e == true);
 | 
						|
 | 
						|
    if (trimmedQuery.isEmpty && !hasFilters) {
 | 
						|
      return [];
 | 
						|
    }
 | 
						|
 | 
						|
    talker.log(
 | 
						|
      'Getting search results for query: $trimmedQuery, filters: links=$withLinks, attachments=$withAttachments',
 | 
						|
    );
 | 
						|
 | 
						|
    try {
 | 
						|
      // When filtering without query, get more messages to ensure we find all matches
 | 
						|
      final take = (trimmedQuery.isEmpty && hasFilters) ? 1000 : 50;
 | 
						|
      final messages = await _getCachedMessages(
 | 
						|
        offset: 0,
 | 
						|
        take: take,
 | 
						|
        searchQuery: trimmedQuery.isNotEmpty ? trimmedQuery : null,
 | 
						|
        withLinks: withLinks,
 | 
						|
        withAttachments: withAttachments,
 | 
						|
      ); // Limit initial search results
 | 
						|
      return messages;
 | 
						|
    } catch (e, stackTrace) {
 | 
						|
      talker.log(
 | 
						|
        'Error getting search results',
 | 
						|
        exception: e,
 | 
						|
        stackTrace: stackTrace,
 | 
						|
      );
 | 
						|
      rethrow;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Future<void> searchMessages(
 | 
						|
    String query, {
 | 
						|
    bool? withLinks,
 | 
						|
    bool? withAttachments,
 | 
						|
  }) async {
 | 
						|
    _searchQuery = query.trim();
 | 
						|
    _withLinks = withLinks;
 | 
						|
    _withAttachments = withAttachments;
 | 
						|
 | 
						|
    if (_searchQuery!.isEmpty) {
 | 
						|
      state = AsyncValue.data([]);
 | 
						|
      return;
 | 
						|
    }
 | 
						|
 | 
						|
    talker.log('Searching messages with query: $_searchQuery');
 | 
						|
    state = const AsyncValue.loading();
 | 
						|
 | 
						|
    try {
 | 
						|
      final messages = await _getCachedMessages(
 | 
						|
        offset: 0,
 | 
						|
        take: 50,
 | 
						|
        searchQuery: _searchQuery,
 | 
						|
        withLinks: _withLinks,
 | 
						|
        withAttachments: _withAttachments,
 | 
						|
      ); // Limit initial search results
 | 
						|
      state = AsyncValue.data(messages);
 | 
						|
    } catch (e, stackTrace) {
 | 
						|
      talker.log(
 | 
						|
        'Error searching messages',
 | 
						|
        exception: e,
 | 
						|
        stackTrace: stackTrace,
 | 
						|
      );
 | 
						|
      state = AsyncValue.error(e, stackTrace);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  void clearSearch() {
 | 
						|
    _searchQuery = null;
 | 
						|
    _withLinks = null;
 | 
						|
    _withAttachments = null;
 | 
						|
    loadInitial();
 | 
						|
  }
 | 
						|
 | 
						|
  Future<LocalChatMessage?> fetchMessageById(String messageId) async {
 | 
						|
    talker.log('Fetching message by id $messageId');
 | 
						|
    try {
 | 
						|
      final localMessage =
 | 
						|
          await (_database.select(_database.chatMessages)
 | 
						|
            ..where((tbl) => tbl.id.equals(messageId))).getSingleOrNull();
 | 
						|
      if (localMessage != null) {
 | 
						|
        return _database.companionToMessage(localMessage);
 | 
						|
      }
 | 
						|
 | 
						|
      final response = await _apiClient.get(
 | 
						|
        '/sphere/chat/$_roomId/messages/$messageId',
 | 
						|
      );
 | 
						|
      final remoteMessage = SnChatMessage.fromJson(response.data);
 | 
						|
      final message = LocalChatMessage.fromRemoteMessage(
 | 
						|
        remoteMessage,
 | 
						|
        MessageStatus.sent,
 | 
						|
      );
 | 
						|
 | 
						|
      await _database.saveMessage(_database.messageToCompanion(message));
 | 
						|
      return message;
 | 
						|
    } catch (e) {
 | 
						|
      if (e is DioException) return null;
 | 
						|
      rethrow;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Future<int> jumpToMessage(String messageId) async {
 | 
						|
    talker.log('Starting jump to message $messageId');
 | 
						|
    if (_isJumping) {
 | 
						|
      talker.log('Jump already in progress, skipping');
 | 
						|
      return -1;
 | 
						|
    }
 | 
						|
    _isJumping = true;
 | 
						|
 | 
						|
    // Clear flashing messages when starting a new jump
 | 
						|
    ref.read(flashingMessagesProvider.notifier).state = {};
 | 
						|
 | 
						|
    try {
 | 
						|
      talker.log('Fetching message $messageId');
 | 
						|
      final message = await fetchMessageById(messageId);
 | 
						|
      if (message == null) {
 | 
						|
        talker.log('Message $messageId not found');
 | 
						|
        showSnackBar('messageNotFound'.tr());
 | 
						|
        return -1;
 | 
						|
      }
 | 
						|
 | 
						|
      // Check if message is already in current state to avoid duplicate loading
 | 
						|
      final currentMessages = state.value ?? [];
 | 
						|
      final existingIndex = currentMessages.indexWhere(
 | 
						|
        (m) => m.id == messageId,
 | 
						|
      );
 | 
						|
      if (existingIndex >= 0) {
 | 
						|
        talker.log(
 | 
						|
          'Message $messageId already in current state at index $existingIndex, jumping directly',
 | 
						|
        );
 | 
						|
        return existingIndex;
 | 
						|
      }
 | 
						|
 | 
						|
      talker.log(
 | 
						|
        'Message $messageId not in current state, calculating position and loading messages around it',
 | 
						|
      );
 | 
						|
 | 
						|
      // Count messages newer than the target message to calculate optimal offset
 | 
						|
      // Use full message list (not filtered by search) for accurate position calculation
 | 
						|
      final query = _database.customSelect(
 | 
						|
        'SELECT COUNT(*) as count FROM chat_messages WHERE room_id = ? AND created_at > ?',
 | 
						|
        variables: [
 | 
						|
          Variable.withString(_roomId),
 | 
						|
          Variable.withDateTime(message.createdAt),
 | 
						|
        ],
 | 
						|
        readsFrom: {_database.chatMessages},
 | 
						|
      );
 | 
						|
      final result = await query.getSingle();
 | 
						|
      final newerCount = result.read<int>('count');
 | 
						|
 | 
						|
      // Calculate offset to position target message in the middle of the loaded chunk
 | 
						|
      const chunkSize = 100; // Load 100 messages around the target
 | 
						|
      final offset =
 | 
						|
          (newerCount - chunkSize ~/ 2).clamp(0, double.infinity).toInt();
 | 
						|
      talker.log(
 | 
						|
        'Calculated offset $offset for target message (newer: $newerCount, chunk: $chunkSize)',
 | 
						|
      );
 | 
						|
      // Use full message list (not filtered by search) for jump operations
 | 
						|
      final loadedMessages = await _getAllMessagesForJump(
 | 
						|
        offset: offset,
 | 
						|
        take: chunkSize,
 | 
						|
      );
 | 
						|
 | 
						|
      // Check if loaded messages are already in current state
 | 
						|
      final currentIds = currentMessages.map((m) => m.id).toSet();
 | 
						|
      final newMessages =
 | 
						|
          loadedMessages.where((m) => !currentIds.contains(m.id)).toList();
 | 
						|
      talker.log(
 | 
						|
        'Loaded ${loadedMessages.length} messages, ${newMessages.length} are new',
 | 
						|
      );
 | 
						|
 | 
						|
      if (newMessages.isNotEmpty) {
 | 
						|
        // Merge with current messages more safely
 | 
						|
        final allMessages = [...currentMessages, ...newMessages];
 | 
						|
        final uniqueMessages = <LocalChatMessage>[];
 | 
						|
        final seenIds = <String>{};
 | 
						|
        for (final message in allMessages) {
 | 
						|
          if (seenIds.add(message.id)) {
 | 
						|
            uniqueMessages.add(message);
 | 
						|
          }
 | 
						|
        }
 | 
						|
        await _updateStateSafely(uniqueMessages);
 | 
						|
        talker.log(
 | 
						|
          'Updated state with ${uniqueMessages.length} total messages',
 | 
						|
        );
 | 
						|
      }
 | 
						|
 | 
						|
      // Wait a bit for the UI to rebuild with new messages
 | 
						|
      await Future.delayed(const Duration(milliseconds: 100));
 | 
						|
 | 
						|
      final finalIndex = (state.value ?? []).indexWhere(
 | 
						|
        (m) => m.id == messageId,
 | 
						|
      );
 | 
						|
      talker.log('Final index for message $messageId is $finalIndex');
 | 
						|
 | 
						|
      // Verify the message is actually in the list before returning
 | 
						|
      if (finalIndex == -1) {
 | 
						|
        talker.log(
 | 
						|
          'Message $messageId still not found after loading, trying direct fetch',
 | 
						|
        );
 | 
						|
        // Try to fetch and add the specific message if it's still not found
 | 
						|
        final directMessage = await fetchMessageById(messageId);
 | 
						|
        if (directMessage != null) {
 | 
						|
          final currentList = state.value ?? [];
 | 
						|
          final updatedList = [...currentList, directMessage];
 | 
						|
          await _updateStateSafely(updatedList);
 | 
						|
          final newIndex = updatedList.indexWhere((m) => m.id == messageId);
 | 
						|
          talker.log('Added message directly, new index: $newIndex');
 | 
						|
          return newIndex;
 | 
						|
        }
 | 
						|
      }
 | 
						|
 | 
						|
      return finalIndex;
 | 
						|
    } finally {
 | 
						|
      _isJumping = false;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  bool _hasLink(LocalChatMessage message) {
 | 
						|
    final content = message.toRemoteMessage().content;
 | 
						|
    if (content == null) return false;
 | 
						|
    final urlRegex = RegExp(r'https?://[^\s/$.?#].[^\s]*');
 | 
						|
    return urlRegex.hasMatch(content);
 | 
						|
  }
 | 
						|
}
 |