import 'dart:async'; import 'package:flutter_laravel_echo_client/src/api/socket_channel.dart'; import 'package:flutter_laravel_echo_client/src/channels/base_channel.dart'; import 'package:flutter_laravel_echo_client/src/channels/presence_channel.dart'; import 'package:flutter_laravel_echo_client/src/channels/private_channel.dart'; import 'package:flutter_laravel_echo_client/src/channels/public_channel.dart'; import 'package:flutter_laravel_echo_client/src/config/socket_config.dart'; import 'package:flutter_laravel_echo_client/src/core/auth.dart'; import 'package:flutter_laravel_echo_client/src/core/channel_registry.dart'; import 'package:socket_io_client/socket_io_client.dart' as io; enum _ChannelType { public, private, presence } class LaravelEchoSocketClient { LaravelEchoSocketClient(SocketConfig config) : _config = config.copyWith() { _auth = _config.authHeadersProvider ?? bearerProvider(_config.token); } SocketConfig _config; late AuthHeadersProvider _auth; io.Socket? _socket; final _onConnected = StreamController.broadcast(); final _onDisconnected = StreamController.broadcast(); final _onError = StreamController.broadcast(); final _onReconnectAttempt = StreamController.broadcast(); final _channels = ChannelRegistry(); bool get isConnected => _socket?.connected ?? false; Stream get onConnected => _onConnected.stream; Stream get onDisconnected => _onDisconnected.stream; Stream get onError => _onError.stream; Stream get onReconnectAttempt => _onReconnectAttempt.stream; Future connect() async { if (_socket?.connected ?? false) return; final opts = io.OptionBuilder() .setTransports(_config.transports) .setPath(_config.path) .setQuery(_config.query ?? {}) .setReconnectionDelay(_config.reconnectionDelay.inMilliseconds) .setReconnectionDelayMax(_config.reconnectionDelayMax.inMilliseconds) .setTimeout(_config.ackTimeout.inMilliseconds) .build(); _socket = io.io(_config.host.toString(), opts); _socket! ..onConnect((_) { _onConnected.add(null); for (final ch in _channels.values) { ch ..bind(_socket!) ..resubscribe(); } }) ..onDisconnect((_) => _onDisconnected.add(null)) ..onConnectError((e) => _onError.add(e)) ..onError((e) => _onError.add(e)) ..onReconnectAttempt((n) => _onReconnectAttempt.add(n is int ? n : 0)); } Future disconnect({bool force = false}) async { for (final channel in _channels.values) { await channel.unsubscribe(); } _channels.clear(); final socket = _socket; _socket = null; if (socket != null) { socket ..off('connect') ..off('disconnect') ..off('connect_error') ..off('error') ..off('reconnect_attempt') ..disconnect(); if (force) socket.dispose(); } } void updateAuthToken(String token) { _config = _config.copyWith(token: token); _auth = _config.authHeadersProvider ?? bearerProvider(token); final s = _socket; if (s != null) { s.io ..disconnect() ..connect(); } } SocketChannel channel(String name) => _getOrCreate(name, _ChannelType.public); SocketPrivateChannel private(String name) => _getOrCreate(name, _ChannelType.private) as SocketPrivateChannel; SocketPresenceChannel presence(String name) => _getOrCreate(name, _ChannelType.presence) as SocketPresenceChannel; void leave(String name) { for (final t in _ChannelType.values) { leaveChannel(_fullName(name, t)); } } void leaveChannel(String fullName) { final ch = _channels.remove(fullName); ch?.unsubscribe(); } BaseChannel _getOrCreate(String name, _ChannelType type) { final full = _fullName(name, type); return _channels.putIfAbsent(full, () { final ch = switch (type) { _ChannelType.public => PublicChannel(full, _socket, _auth), _ChannelType.private => PrivateChannel(full, _socket, _auth), _ChannelType.presence => PresenceChannel(full, _socket, _auth), }..subscribe(); return ch; }); } String _fullName(String name, _ChannelType type) => switch (type) { _ChannelType.public => name, _ChannelType.private => 'private-$name', _ChannelType.presence => 'presence-$name', }; }