| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- import 'dart:async';
- import 'dart:convert';
- import 'dart:io';
- import 'package:dio/dio.dart';
- import 'package:nomo/app/constants/errors.dart';
- import 'package:shelf/shelf_io.dart' as shelf_io;
- import 'package:shelf_web_socket/shelf_web_socket.dart';
- import '../../../utils/crypto.dart';
- import '../../../utils/log/logger.dart';
- import '../../../utils/misc.dart';
- import '../../constants/enums.dart';
- import 'vpn_exception.dart';
- import 'vpn_message.dart';
/// Outcome of a single RPC call to the local VPN helper process.
class _RpcResult {
  /// Whether the call succeeded at the application level.
  final bool success;

  /// Message attached to the outcome (the `msg` field of the reply, or an
  /// `HTTP <status>` description when the transport itself failed).
  final String message;

  /// Raw decoded reply body, or `null` when no body could be used.
  final dynamic data;

  /// Creates an immutable result; `const` so fixed results can be shared.
  const _RpcResult(this.success, this.message, this.data);
}
/// Drives the Windows VPN helper executable and talks to it over two local
/// channels.
///
/// * An HTTP RPC endpoint on `127.0.0.1:[_rpcPort]`, used for commands
///   (`connect`, `disconnect`, ...) and for the `ping` liveness check.
/// * A WebSocket server hosted by this class on `127.0.0.1:[_wsPort]`, on
///   which `login`, `heartbeat` and `stateSync` messages are received.
///
/// Call [initialize] once before use and [dispose] when finished. State
/// changes are published on [onStatusChanged] and [onOnlineChanged].
class VpnWindowsService {
  static const _tag = 'VpnWindowsService';

  /// Port passed to the helper with `-p` and used for all HTTP RPC calls.
  static const _rpcPort = 29764;

  /// Port the local WebSocket server is bound to.
  static const _wsPort = 35461;

  /// Executable launched as the helper process.
  static const _rpcFileName = 'ixrpc.exe';

  /// Seconds without any WebSocket message before the watchdog pings the
  /// helper; also used as the upper bound for that ping.
  static const _rpcHeartbeatTimeout = 5;

  static const _rpcBaseUrl = 'http://127.0.0.1:$_rpcPort';

  // NOTE(review): the signing secret is embedded in the binary; anyone who
  // can read the executable can forge signed requests. Confirm this is
  // acceptable for a loopback-only channel.
  static const String _accessKeySecret =
      '{C5E098D8-5487-4F8A-8B23-7F1F839A3A0B}';

  final _rpcDio = Dio();

  ConnectionState _status = ConnectionState.disconnected;
  final _ecStatus = StreamController<(ConnectionState, dynamic)>.broadcast();

  bool _isOnline = false;
  final _ecOnline = StreamController<bool>.broadcast();

  Process? _process;

  /// WebSocket server created by [_startup]; closed again by [_shutdown].
  HttpServer? _server;

  /// Watchdog timer created by [_startup].
  Timer? _refreshTimer;

  /// Guards [_watchdogTick] against overlapping runs.
  bool _isChecking = false;

  String? _logPath;
  bool _vpnDebug = false;
  bool _isStarted = false;
  int _rpcHostPort = 0;

  /// Time the last WebSocket message (or successful ping) was seen.
  DateTime _lastAliveTime = DateTime.now();

  /// Set when the helper logs in again while a connection was active, so the
  /// next `stateSync` re-issues the last `connect`.
  bool _needRecoverConnection = false;

  /// Parameters of the last successful [start] call, kept for recovery.
  Map<String, dynamic>? _connectionParams;

  /// Whether the helper has logged in on the WebSocket channel.
  bool get isOnline => _isOnline;

  /// The most recently published connection state.
  ConnectionState get status => _status;

  /// Emits whenever [isOnline] changes.
  Stream<bool> get onOnlineChanged => _ecOnline.stream;

  /// Emits `(state, payload)` whenever [status] changes.
  Stream<(ConnectionState, dynamic)> get onStatusChanged => _ecStatus.stream;

  /// Updates [status] and notifies listeners, but only on an actual change.
  void _setStatus(ConnectionState value, dynamic data) {
    if (_status == value) return;
    _status = value;
    // A late WebSocket message may arrive after dispose(); adding to a closed
    // controller would throw.
    if (!_ecStatus.isClosed) {
      _ecStatus.add((_status, data));
    }
  }

  /// Updates [isOnline] and notifies listeners, but only on an actual change.
  void _setIsOnline(bool value) {
    if (_isOnline == value) return;
    _isOnline = value;
    if (!_ecOnline.isClosed) {
      _ecOnline.add(_isOnline);
    }
  }

  /// Converts an HTTP reply into an [_RpcResult].
  ///
  /// [caller] only prefixes the log line. Throws if the body is not a JSON
  /// object carrying an integer `code` and a string `msg`.
  _RpcResult _parseResponse(Response<dynamic> response, String caller) {
    if (response.statusCode != 200) {
      log(_tag, '$caller HTTP error: ${response.statusCode}');
      return _RpcResult(false, 'HTTP ${response.statusCode}', null);
    }
    // Dio has already decoded the JSON body.
    final json = response.data as Map<String, dynamic>;
    final code = json['code'] as int;
    final msg = json['msg'] as String;
    return _RpcResult(code == 200, msg, json);
  }

  /// Issues `GET /api/<action>`.
  ///
  /// Unlike [_rpcPost] this does not require [isOnline] and lets transport
  /// errors propagate unchanged; [_check] relies on that.
  Future<_RpcResult> _rpcGet(String action) async {
    final url = Uri.parse('$_rpcBaseUrl/api/$action').toString();
    final response = await _rpcDio.get(url);
    return _parseResponse(response, 'rpcGet');
  }

  /// Sends a signed `POST /api/action` carrying [action] and [params].
  ///
  /// Throws [RpcException] when the helper is offline or when the request
  /// fails for any reason.
  Future<_RpcResult> _rpcPost(
    String action,
    Map<String, dynamic> params,
  ) async {
    if (!isOnline) {
      throw RpcException('Vpn rpc service not available');
    }
    final url = Uri.parse('$_rpcBaseUrl/api/action').toString();
    final data = jsonEncode({'action': action, 'data': params});
    try {
      final timestamp = DateTime.now().millisecondsSinceEpoch;
      final signature = await Crypto.signature(
        '$data:$timestamp',
        _accessKeySecret,
      );
      // NOTE(review): the full request body is logged; confirm that `params`
      // never carries credentials.
      log(_tag, '[VpnWindowsService] url:$url');
      log(_tag, '[VpnWindowsService] >>:$data');
      final response = await _rpcDio.post(
        url,
        data: data,
        options: Options(
          headers: {
            'Accept': 'application/json',
            'Timestamp': '$timestamp',
            'Signature': signature,
          },
        ),
      );
      log(_tag, '[VpnWindowsService] <<:${response.data}');
      return _parseResponse(response, 'rpcPost');
    } on DioException catch (e, stack) {
      log(_tag, 'DioException: ${e.message}\nStack: $stack');
      throw RpcException('DioException: ${e.message}');
    } catch (e, stack) {
      log(_tag, 'request error: $e\nStack: $stack');
      throw RpcException('request error: $e');
    }
  }

  /// Pings the helper; completes with an error when it cannot be reached.
  Future<void> _check() => _rpcGet('ping');

  /// Kills any running helper and launches a fresh one.
  ///
  /// The helper receives `-l 127.0.0.1:<hostPort>`, `-p <rpcPort>` when
  /// [rpcPort] is given, and `-d <logPath>` when a log directory is known.
  /// [debug] is accepted but currently unused. Launch failures are logged,
  /// not thrown.
  Future<void> _resetProcess(int hostPort, bool debug, {int? rpcPort}) async {
    try {
      _killProcess();
      final logPath = _logPath;
      final args = <String>[
        '-l',
        '127.0.0.1:$hostPort',
        if (rpcPort != null) ...['-p', '$rpcPort'],
        if (logPath != null) ...['-d', logPath],
      ];
      log(_tag, '_resetProcess: starting $_rpcFileName with args: $args');
      _process = await Process.start(
        _rpcFileName,
        args,
        runInShell: false,
        mode: ProcessStartMode.detached,
      );
      log(_tag, '_resetProcess: process started successfully.');
    } catch (e, stack) {
      log(_tag, '_resetProcess error: $e\nStack: $stack');
    }
  }

  /// Terminates the helper (if any) and marks the service offline.
  void _killProcess() {
    log(_tag, 'VpnWindowsService kill process');
    _process?.kill();
    _process = null;
    _setIsOnline(false);
  }

  /// Shuts the service down and releases the HTTP client and both streams.
  ///
  /// The instance must not be used afterwards.
  Future<void> dispose() async {
    log(_tag, 'VpnWindowsService dispose start.');
    await _shutdown();
    _rpcDio.close();
    // Not awaited: only the controllers need to stop accepting events here.
    unawaited(_ecStatus.close());
    unawaited(_ecOnline.close());
  }

  /// Asks the helper to connect using [params].
  ///
  /// Never throws: failures are published as [ConnectionState.error] on
  /// [onStatusChanged].
  Future<void> start(Map<String, dynamic> params) async {
    _connectionParams = null;
    // Announce that a connection attempt is in progress.
    _setStatus(ConnectionState.connecting, params);
    try {
      final result = await _rpcPost('connect', params);
      if (!result.success) {
        log(_tag, 'start call error: ${result.message}');
        _setStatus(ConnectionState.error, Errors.ERROR_RPC_RETURN_FALSE);
      } else {
        // Remember the parameters so the connection can be recovered later.
        _connectionParams = params;
        log(_tag, 'start call success.');
      }
    } catch (e, stack) {
      log(_tag, 'start call exception: $e\nStack: $stack');
      // Announce that the connection attempt failed.
      _setStatus(ConnectionState.error, Errors.ERROR_RPC_CALL_FAILED);
    }
  }

  /// Asks the helper to disconnect and cancels any pending recovery.
  ///
  /// Never throws; failures are only logged.
  Future<void> stop() async {
    _needRecoverConnection = false;
    try {
      final result = await _rpcPost('disconnect', {});
      if (result.success) {
        log(_tag, 'disconnect call success.');
      } else {
        log(_tag, 'disconnect call error: ${result.message}');
      }
    } catch (e, stack) {
      log(_tag, 'disconnect call exception: $e\nStack: $stack');
    }
  }

  /// Not implemented on Windows; always completes with an empty string.
  Future<String> getLocationName() async => '';

  /// Returns the `msg` of the `get_remote_ip` reply, or `''` on any failure.
  Future<String> getRemoteAddress() async {
    try {
      final result = await _rpcPost('get_remote_ip', {});
      if (!result.success) {
        log(_tag, 'getRemoteAddress error: ${result.message}');
        return '';
      }
      return result.message;
    } catch (e) {
      log(_tag, 'getRemoteAddress error: $e');
      return '';
    }
  }

  /// Sends [list] to the helper as a comma-separated `ips` value.
  ///
  /// Returns `false` for an empty list or on any failure.
  Future<bool> setBypassAddresses(List<String> list) async {
    if (list.isEmpty) {
      log(_tag, 'setBypassAddresses: empty list');
      return false;
    }
    try {
      final result = await _rpcPost('set_pass_ips', {'ips': list.join(',')});
      if (!result.success) {
        log(_tag, 'setBypassAddresses error: ${result.message}');
        return false;
      }
      return true;
    } catch (e) {
      log(_tag, 'setBypassAddresses error: $e');
      return false;
    }
  }

  /// Records a `login`: the helper is online, and an active connection (if
  /// any) is flagged for recovery on the next `stateSync`.
  void _onLogin() {
    log(_tag, 'login message got');
    _setIsOnline(true);
    if (_status == ConnectionState.connected ||
        _status == ConnectionState.connecting) {
      _needRecoverConnection = true;
    }
  }

  /// Applies a `stateSync` message and, if flagged, re-issues the last
  /// `connect`.
  void _onStateSync(VpnMessage vpnMessage) {
    log(_tag, 'stateSync message got');
    try {
      final data = vpnMessage.data;
      if (data == null) {
        return;
      }
      final int state = data['state'] as int;
      final dynamic param = data['param'];
      // An out-of-range index throws and is logged by the catch below.
      _setStatus(ConnectionState.values[state], param);
      if (_needRecoverConnection) {
        final params = _connectionParams;
        if (params != null) {
          // start() reports its own failures via the status stream.
          unawaited(start(params));
        }
        _needRecoverConnection = false;
      }
    } catch (e, stack) {
      log(_tag, 'stateSync Exception: $e\nStack: $stack');
    }
  }

  /// Handles one raw WebSocket [message]; heartbeat replies go to [sink].
  void _handleMessage(dynamic message, StreamSink<dynamic> sink) {
    // Any traffic at all proves the helper is alive.
    _lastAliveTime = DateTime.now();
    try {
      final vpnMessage = VpnMessage.fromMap(jsonDecode(message));
      switch (vpnMessage.type) {
        case VpnMessageType.login:
          _onLogin();
          break;
        case VpnMessageType.heartbeat:
          log(_tag, 'heartbeat message got');
          // Answer the heartbeat.
          sink.add(VpnMessage.create(VpnMessageType.heartbeat, null).toJson());
          break;
        case VpnMessageType.stateSync:
          _onStateSync(vpnMessage);
          break;
        default:
          break;
      }
    } catch (e, stack) {
      log(_tag, 'webSocketHandler Exception: $e\nStack: $stack');
    }
  }

  /// One watchdog pass: after [_rpcHeartbeatTimeout] seconds of silence, ping
  /// the helper and restart it when the ping fails or takes too long.
  Future<void> _watchdogTick() async {
    // The timer fires every second; never let two passes overlap.
    if (_isChecking) return;
    final idle = DateTime.now().difference(_lastAliveTime).inSeconds;
    if (idle <= _rpcHeartbeatTimeout) return;
    _isChecking = true;
    try {
      await _check().timeout(const Duration(seconds: _rpcHeartbeatTimeout));
      _lastAliveTime = DateTime.now();
      log(_tag, 'RPC process has been woken up, continuing...');
    } catch (e) {
      log(_tag, 'RPC process has timedout, need reset.');
      _lastAliveTime = DateTime.now();
      // Do not resurrect the helper if a shutdown happened while pinging.
      if (_isStarted) {
        await _resetProcess(_rpcHostPort, _vpnDebug, rpcPort: _rpcPort);
      }
    } finally {
      _isChecking = false;
    }
  }

  /// Starts the WebSocket server, launches the helper and arms the watchdog.
  ///
  /// Does nothing when already started. Failures are logged, not thrown.
  Future<void> _startup({bool debug = false}) async {
    log(_tag, 'startup called. debug: $debug');
    _vpnDebug = debug;
    if (_isStarted) {
      log(_tag, 'startup abort! _isStarted: $_isStarted');
      return;
    }
    try {
      final handler = webSocketHandler((webSocket, _) {
        webSocket.stream.listen((message) {
          _handleMessage(message, webSocket.sink);
        });
      });
      // Awaited so that a bind failure is caught below instead of escaping
      // as an unhandled asynchronous error.
      final server = await shelf_io.serve(handler, '127.0.0.1', _wsPort);
      _server = server;
      _isStarted = true;
      log(_tag, 'startup finished. _isStarted: $_isStarted');
      log(_tag, 'Serving at ws://${server.address.host}:${server.port}');
      _rpcHostPort = server.port;
      // Launch the helper process.
      await _resetProcess(_rpcHostPort, _vpnDebug, rpcPort: _rpcPort);
      // Give the fresh helper a full timeout window before the first ping.
      _lastAliveTime = DateTime.now();
      _refreshTimer?.cancel();
      _refreshTimer = Timer.periodic(
        const Duration(seconds: 1),
        (_) => unawaited(_watchdogTick()),
      );
    } catch (e, stack) {
      log(_tag, 'startup error: $e\nStack: $stack');
    }
  }

  /// Disconnects, stops the watchdog, kills the helper and closes the
  /// WebSocket server. Does nothing when not started.
  Future<void> _shutdown() async {
    log(_tag, 'shutdown called.');
    if (!_isStarted) {
      log(_tag, 'shutdown abort! _isStarted: $_isStarted');
      return;
    }
    try {
      await stop();
    } catch (e, stack) {
      log(_tag, 'shutdown disconnect error: $e\nStack: $stack');
    }
    _refreshTimer?.cancel();
    _refreshTimer = null;
    try {
      _killProcess();
    } catch (e, stack) {
      log(_tag, 'shutdown _killProcess error: $e\nStack: $stack');
    }
    try {
      // Release the port so that a later _startup can bind it again.
      await _server?.close(force: true);
    } catch (e, stack) {
      log(_tag, 'shutdown server close error: $e\nStack: $stack');
    }
    _server = null;
    _isStarted = false;
    _needRecoverConnection = false;
    _setStatus(ConnectionState.disconnected, null);
    log(_tag, 'shutdown finished. _isStarted: $_isStarted');
  }

  /// Starts the service and waits until the helper reports online.
  ///
  /// Polls every 500 ms. Once more than [timedout] whole seconds have passed
  /// since the call began, the service is shut down again and
  /// [WaitOnlineTimedOutException] is thrown. [debug] is forwarded to the
  /// startup routine.
  Future<void> initialize(int timedout, bool debug) async {
    log(_tag, 'vpn service initialize...');
    _logPath = (await logFileDirectory()).path;
    final startTime = DateTime.now();
    var isTimedout = false;
    await _startup(debug: debug);
    if (!isOnline) {
      while (true) {
        await Future<void>.delayed(const Duration(milliseconds: 500));
        // Check readiness first, so coming online on the deadline tick wins.
        if (isOnline) {
          log(_tag, 'vpn service start successed.');
          break;
        }
        final dt = DateTime.now().difference(startTime).inSeconds;
        if (dt > timedout) {
          log(_tag, 'vpn service start timedout.');
          isTimedout = true;
          break;
        }
      }
      log(_tag, 'vpn service initialize finished.');
    }
    if (isTimedout) {
      await _shutdown();
      throw WaitOnlineTimedOutException();
    }
  }
}
|