import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:dio/dio.dart';
import 'package:nomo/app/constants/errors.dart';
import 'package:shelf/shelf_io.dart' as shelf_io;
import 'package:shelf_web_socket/shelf_web_socket.dart';

import '../../../utils/crypto.dart';
import '../../../utils/log/logger.dart';
import '../../../utils/misc.dart';
import '../../constants/enums.dart';
import 'vpn_exception.dart';
import 'vpn_message.dart';

/// Outcome of a single RPC round trip to the helper process.
class _RpcResult {
  /// Whether the `code` field of the response body was 200.
  final bool success;

  /// The `msg` field of the response body, or `HTTP <status>` on HTTP errors.
  final String message;

  /// The decoded response body, or `null` when the HTTP status was not 200.
  final dynamic data;

  _RpcResult(this.success, this.message, this.data);
}

/// Drives the external `ixrpc.exe` helper process.
///
/// Commands are sent to the helper over HTTP on `127.0.0.1:$_rpcPort`. The
/// helper's `login`, `heartbeat` and `stateSync` messages are received on a
/// local WebSocket server hosted by this class. A one-second watchdog timer
/// restarts the helper when no message arrived for more than
/// [_rpcHeartbeatTimeout] seconds and a `ping` probe fails.
class VpnService {
  static const _tag = 'VpnService';
  static const _rpcPort = 25364;
  static const _rpcFileName = 'ixrpc.exe';

  /// Seconds without any WebSocket message before the watchdog probes.
  static const _rpcHeartbeatTimeout = 5;
  static const _rpcBaseUrl = 'http://127.0.0.1:$_rpcPort';

  /// Address and port of the local WebSocket server the helper connects to.
  static const _wsHost = '127.0.0.1';
  static const _wsPort = 35461;

  // NOTE(review): signing secret is hard-coded in source — confirm this is
  // acceptable for a loopback-only channel.
  static const String _accessKeySecret =
      '{C5E098D8-5487-4F8A-8B23-7F1F839A3A0B}';

  final _rpcDio = Dio();

  ConnectionState _status = ConnectionState.disconnected;
  final _ecStatus = StreamController<(ConnectionState, dynamic)>.broadcast();

  bool _isOnline = false;
  final _ecOnline = StreamController.broadcast();

  Process? _process;
  HttpServer? _server;
  Timer? _refreshTimer;

  // Never assigned in this file; only cancelled during shutdown.
  Timer? _startupTimer;

  String? _logPath;
  bool _vpnDebug = false;
  bool _isStarted = false;
  int _rpcHostPort = 0;
  DateTime _lastAliveTime = DateTime.now();

  /// Set when the helper logs in while a connection was active or pending;
  /// consumed by the next `stateSync` message.
  bool _needRecoverConnection = false;

  /// Parameters of the last successful [start] call, used for recovery.
  Map? _connectionParams;

  /// Stores [value] and notifies status listeners when it actually changed.
  void _setStatus(ConnectionState value, dynamic data) {
    if (_status == value) return;
    _status = value;
    // A late message may arrive after dispose(); adding to a closed
    // controller would throw.
    if (!_ecStatus.isClosed) {
      _ecStatus.add((_status, data));
    }
  }

  /// Stores [value] and notifies online listeners when it actually changed.
  void _setIsOnline(bool value) {
    if (_isOnline == value) return;
    _isOnline = value;
    if (!_ecOnline.isClosed) {
      _ecOnline.add(_isOnline);
    }
  }

  /// Converts an HTTP [response] into an [_RpcResult].
  ///
  /// [caller] is only used as the prefix of the log line on HTTP errors.
  _RpcResult _toResult(Response<dynamic> response, String caller) {
    if (response.statusCode != 200) {
      log(_tag, '$caller HTTP error: ${response.statusCode}');
      return _RpcResult(false, 'HTTP ${response.statusCode}', null);
    }
    // Dio has already decoded the JSON body.
    final json = response.data as Map;
    final code = json['code'] as int;
    final msg = json['msg'] as String;
    return _RpcResult(code == 200, msg, json);
  }

  /// Issues `GET /api/<action>` against the helper.
  ///
  /// Transport and decoding errors propagate to the caller unchanged.
  Future<_RpcResult> _rpcGet(String action) async {
    final url = Uri.parse('$_rpcBaseUrl/api/$action').toString();
    final response = await _rpcDio.get(url);
    return _toResult(response, 'rpcGet');
  }

  /// Posts a signed `{action, data}` envelope to the helper.
  ///
  /// The request carries a `Timestamp` header and a `Signature` header
  /// computed over `<body>:<timestamp>`.
  ///
  /// Throws [RpcException] when the helper is offline or the request fails.
  Future<_RpcResult> _rpcPost(
    String action,
    Map params,
  ) async {
    if (!isOnline) {
      throw RpcException('Vpn rpc service not available');
    }

    // NOTE(review): the path is the literal `/api/action`; the action name
    // travels in the body. Presumably intentional — confirm against helper.
    final url = Uri.parse('$_rpcBaseUrl/api/action').toString();
    final data = jsonEncode({'action': action, 'data': params});

    try {
      final timestamp = DateTime.now().millisecondsSinceEpoch;
      final signature = await Crypto.signature(
        '$data:$timestamp',
        _accessKeySecret,
      );

      log(_tag, '[VpnService] url:$url');
      log(_tag, '[VpnService] >>:$data');

      final response = await _rpcDio.post(
        url,
        data: data,
        options: Options(
          headers: {
            'Accept': 'application/json',
            'Timestamp': '$timestamp',
            'Signature': signature,
          },
        ),
      );

      log(_tag, '[VpnService] <<:${response.data}');
      return _toResult(response, 'rpcPost');
    } on DioException catch (e, stack) {
      log(_tag, 'DioException: ${e.message}\nStack: $stack');
      throw RpcException('DioException: ${e.message}');
    } catch (e, stack) {
      log(_tag, 'request error: $e\nStack: $stack');
      throw RpcException('request error: $e');
    }
  }

  /// Probes the helper with a `ping` request; throws when unreachable.
  Future<_RpcResult> _check() => _rpcGet('ping');

  /// Kills any running helper and launches a fresh detached one.
  ///
  /// [hostPort] is passed as `-l 127.0.0.1:<hostPort>`, [rpcPort] as `-p`
  /// and the log directory (when known) as `-d`. [debug] is currently
  /// unused. Launch failures are logged and not rethrown.
  Future<void> _resetProcess(int hostPort, bool debug, {int? rpcPort}) async {
    try {
      _killProcess();

      // Process.start requires a List<String>.
      final args = <String>[
        '-l',
        '127.0.0.1:$hostPort',
        if (rpcPort != null) ...['-p', '$rpcPort'],
        if (_logPath case final logPath?) ...['-d', logPath],
      ];

      log(_tag, '_resetProcess: starting $_rpcFileName with args: $args');
      _process = await Process.start(
        _rpcFileName,
        args,
        runInShell: false,
        mode: ProcessStartMode.detached,
      );
      log(_tag, '_resetProcess: process started successfully.');
    } catch (e, stack) {
      log(_tag, '_resetProcess error: $e\nStack: $stack');
    }
  }

  /// Kills the helper process (if any) and marks the service offline.
  void _killProcess() {
    log(_tag, 'VpnService kill process');
    _process?.kill();
    _process = null;
    _setIsOnline(false);
  }

  /// Shuts the service down and closes both event streams.
  void dispose() async {
    log(_tag, 'VpnService dispose start.');
    await _shutdown();
    await _ecStatus.close();
    await _ecOnline.close();
  }

  /// Asks the helper to connect using [params].
  ///
  /// Emits [ConnectionState.connecting] immediately and
  /// [ConnectionState.error] when the call fails or is rejected. Never
  /// throws; [params] is remembered for recovery only on success.
  Future start(Map params) async {
    _connectionParams = null;

    // Tell listeners that a connection attempt is in progress.
    _setStatus(ConnectionState.connecting, params);

    try {
      final result = await _rpcPost('connect', params);
      if (result.success) {
        _connectionParams = params;
        log(_tag, 'start call success.');
      } else {
        log(_tag, 'start call error: ${result.message}');
        _setStatus(ConnectionState.error, Errors.ERROR_RPC_RETURN_FALSE);
      }
    } catch (e, stack) {
      log(_tag, 'start call exception: $e\nStack: $stack');
      // Tell listeners that the connection attempt failed.
      _setStatus(ConnectionState.error, Errors.ERROR_RPC_CALL_FAILED);
    }
  }

  /// Asks the helper to disconnect and cancels any pending recovery.
  ///
  /// Failures are logged and not rethrown.
  Future stop() async {
    _needRecoverConnection = false;
    try {
      final result = await _rpcPost('disconnect', {});
      if (result.success) {
        log(_tag, 'disconnect call success.');
      } else {
        log(_tag, 'disconnect call error: ${result.message}');
      }
    } catch (e, stack) {
      log(_tag, 'disconnect call exception: $e\nStack: $stack');
    }
  }

  /// Always completes with an empty string.
  Future getLocationName() async {
    return '';
  }

  /// Returns the `msg` field of the helper's `get_remote_ip` reply, or an
  /// empty string on any failure.
  Future getRemoteAddress() async {
    try {
      final result = await _rpcPost('get_remote_ip', {});
      if (!result.success) {
        log(_tag, 'getRemoteAddress error: ${result.message}');
        return '';
      }
      return result.message;
    } catch (e) {
      log(_tag, 'getRemoteAddress error: $e');
      return '';
    }
  }

  /// Sends [list], comma-joined, to the helper's `set_pass_ips` action.
  ///
  /// Returns `true` on success and `false` for an empty list or any failure.
  Future setBypassAddresses(List list) async {
    try {
      if (list.isEmpty) {
        log(_tag, 'setBypassAddresses: empty list');
        return false;
      }
      final result = await _rpcPost('set_pass_ips', {'ips': list.join(',')});
      if (!result.success) {
        log(_tag, 'setBypassAddresses error: ${result.message}');
        return false;
      }
      return true;
    } catch (e) {
      log(_tag, 'setBypassAddresses error: $e');
      return false;
    }
  }

  /// Handles one WebSocket [message] from the helper.
  ///
  /// [send] writes a reply on the same socket. Any error raised while
  /// decoding or dispatching is logged and swallowed so the socket listener
  /// keeps running.
  void _handleMessage(dynamic message, void Function(dynamic data) send) {
    // Any traffic at all counts as a sign of life for the watchdog.
    _lastAliveTime = DateTime.now();
    try {
      final vpnMessage = VpnMessage.fromMap(jsonDecode(message));
      switch (vpnMessage.type) {
        case VpnMessageType.login:
          log(_tag, 'login message got');
          // The RPC helper is now online.
          _setIsOnline(true);
          // Remember that a connection has to be restored after a restart.
          if (_status == ConnectionState.connected ||
              _status == ConnectionState.connecting) {
            _needRecoverConnection = true;
          }
          break;
        case VpnMessageType.heartbeat:
          log(_tag, 'heartbeat message got');
          // Answer the heartbeat.
          send(VpnMessage.create(VpnMessageType.heartbeat, null).toJson());
          break;
        case VpnMessageType.stateSync:
          log(_tag, 'stateSync message got');
          try {
            final data = vpnMessage.data;
            if (data == null) {
              return;
            }
            final state = data['state'] as int;
            final dynamic param = data['param'];
            _setStatus(ConnectionState.values[state], param);

            // Restore the previous connection if one was pending.
            if (_needRecoverConnection) {
              _needRecoverConnection = false;
              final params = _connectionParams;
              if (params != null) {
                // start() handles its own errors, so it is safe to detach.
                unawaited(start(params));
              }
            }
          } catch (e, stack) {
            log(_tag, 'stateSync Exception: $e\nStack: $stack');
          }
          break;
        default:
      }
    } catch (e, stack) {
      log(_tag, 'webSocketHandler Exception: $e\nStack: $stack');
    }
  }

  /// One watchdog tick: probes the helper after a silent period and restarts
  /// it when the probe fails.
  Future<void> _watchdogTick(Timer timer) async {
    final idle = DateTime.now().difference(_lastAliveTime).inSeconds;
    if (idle <= _rpcHeartbeatTimeout) return;

    // Try a direct probe before declaring the helper dead.
    try {
      await _check();
      _lastAliveTime = DateTime.now();
      log(_tag, 'RPC process has been woken up, continuing...');
    } catch (e) {
      // Shutdown may have cancelled the timer while the probe was in
      // flight; do not respawn the helper in that case.
      if (!timer.isActive) return;
      log(_tag, 'RPC process has timedout, need reset.');
      _lastAliveTime = DateTime.now();
      await _resetProcess(_rpcHostPort, rpcPort: _rpcPort, _vpnDebug);
    }
  }

  /// Starts the WebSocket server, the helper process and the watchdog.
  ///
  /// Does nothing when already started. Errors, including a failure to bind
  /// the server port, are logged and not rethrown.
  Future<void> _startup({bool debug = false}) async {
    log(_tag, 'startup called. debug: $debug');
    _vpnDebug = debug;

    if (_isStarted) {
      log(_tag, 'startup abort! _isStarted: $_isStarted');
      return;
    }

    try {
      // Handle VPN messages coming from the helper.
      final handler = webSocketHandler((webSocket, _) {
        webSocket.stream.listen(
          (message) =>
              _handleMessage(message, (data) => webSocket.sink.add(data)),
        );
      });

      // Awaited so that bind errors are caught by this try block.
      final server = await shelf_io.serve(handler, _wsHost, _wsPort);
      _server = server;
      _isStarted = true;
      log(_tag, 'startup finished. _isStarted: $_isStarted');
      log(_tag, 'Serving at ws://${server.address.host}:${server.port}');

      // Port of the WebSocket server side.
      _rpcHostPort = server.port;

      // Start the silence window now, not at construction time, so the
      // first watchdog tick does not restart the helper right away.
      _lastAliveTime = DateTime.now();

      // Launch the VPN client process.
      await _resetProcess(_rpcHostPort, rpcPort: _rpcPort, _vpnDebug);

      // Start the watchdog timer.
      _refreshTimer = Timer.periodic(
        const Duration(seconds: 1),
        _watchdogTick,
      );
    } catch (e, stack) {
      log(_tag, 'startup error: $e\nStack: $stack');
    }
  }

  /// Disconnects, stops the timers, kills the helper and closes the
  /// WebSocket server. Does nothing when the service was never started.
  Future<void> _shutdown() async {
    log(_tag, 'shutdown called.');

    if (!_isStarted) {
      log(_tag, 'shutdown abort! _isStarted: $_isStarted');
      return;
    }

    try {
      // Drop the active connection.
      await stop();
    } catch (e, stack) {
      log(_tag, 'shutdown disconnect error: $e\nStack: $stack');
    }

    try {
      // Stop the watchdog timers.
      _startupTimer?.cancel();
    } catch (e, stack) {
      log(_tag, 'shutdown _startupTimer cancel error: $e\nStack: $stack');
    }

    try {
      _refreshTimer?.cancel();
      _refreshTimer = null;
    } catch (e, stack) {
      log(_tag, 'shutdown _refreshTimer cancel error: $e\nStack: $stack');
    }

    try {
      // Terminate the VPN client process.
      _killProcess();
    } catch (e, stack) {
      log(_tag, 'shutdown _killProcess error: $e\nStack: $stack');
    }

    try {
      // Release the fixed port so a later startup can bind it again.
      await _server?.close(force: true);
    } catch (e, stack) {
      log(_tag, 'shutdown server close error: $e\nStack: $stack');
    }
    _server = null;

    _isStarted = false;
    _needRecoverConnection = false;
    _setStatus(ConnectionState.disconnected, null);
    log(_tag, 'shutdown finished. _isStarted: $_isStarted');
  }

  /// Starts the service and waits until the helper reports online.
  ///
  /// Polls every 500 ms. [timedout] is the allowed wait in whole seconds,
  /// measured from just before startup. [debug] is forwarded to startup.
  ///
  /// Throws [WaitOnlineTimedOutException], after shutting the service down,
  /// when the helper did not come online in time.
  Future initialize(int timedout, bool debug) async {
    log(_tag, 'vpn service initialize...');

    _logPath = (await logFileDirectory()).path;

    final startTime = DateTime.now();

    // Start the service.
    await _startup(debug: debug);

    // Wait for the ready signal of the VPN client.
    var isTimedout = false;
    if (!isOnline) {
      final ready = Completer<bool>();
      Timer.periodic(const Duration(milliseconds: 500), (timer) {
        // Client is ready. Return at once so that a timeout elapsing in the
        // same tick cannot complete the completer a second time.
        if (isOnline) {
          log(_tag, 'vpn service start successed.');
          timer.cancel();
          ready.complete(true);
          return;
        }

        // Waited too long.
        final elapsed = DateTime.now().difference(startTime).inSeconds;
        if (elapsed > timedout) {
          log(_tag, 'vpn service start timedout.');
          timer.cancel();
          ready.complete(false);
        }
      });

      isTimedout = !(await ready.future);
      log(_tag, 'vpn service initialize finished.');
    }

    if (isTimedout) {
      await _shutdown();
      throw WaitOnlineTimedOutException();
    }
  }

  /// Whether the helper has logged in and has not been killed since.
  bool get isOnline => _isOnline;

  /// The last connection state recorded by this service.
  ConnectionState get status => _status;

  /// Emits the new value each time [isOnline] changes.
  Stream get onOnlineChanged => _ecOnline.stream;

  /// Emits `(state, data)` each time [status] changes.
  Stream<(ConnectionState, dynamic)> get onStatusChanged => _ecStatus.stream;
}