# -*- coding: utf-8 -*-

# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code

import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById, ArrayCacheBySymbolBySide, ArrayCacheByTimestamp
import hashlib
from ccxt.base.types import Any, Balances, Int, Market, Order, OrderBook, Position, Str, Strings, Ticker, Tickers, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import ExchangeError
from ccxt.base.errors import ArgumentsRequired
from ccxt.base.errors import NotSupported


class blofin(ccxt.async_support.blofin):

    def describe(self) -> Any:
        return self.deep_extend(super(blofin, self).describe(), {
            'has': {
                'ws': True,
                'watchTrades': True,
                'watchTradesForSymbols': True,
                'watchOrderBook': True,
                'watchOrderBookForSymbols': True,
                'watchTicker': True,
                'watchTickers': True,
                'watchBidsAsks': True,
                'watchOHLCV': True,
                'watchOHLCVForSymbols': True,
                'watchOrders': True,
                'watchOrdersForSymbols': True,
                'watchPositions': True,
            },
            'urls': {
                'api': {
                    'ws': {
                        'swap': {
                            'public': 'wss://openapi.blofin.com/ws/public',
                            'private': 'wss://openapi.blofin.com/ws/private',
                        },
                    },
                },
                'test': {
                    'ws': {
                        'swap': {
                            'public': 'wss://demo-trading-openapi.blofin.com/ws/public',
                            'private': 'wss://demo-trading-openapi.blofin.com/ws/private',
                        },
                    },
                },
            },
            'options': {
                'defaultType': 'swap',
                'tradesLimit': 1000,
                # orderbook channel can be one from:
                #  - "books": 200 depth levels will be pushed in the initial full snapshot. Incremental data will be pushed every 100 ms for the changes in the order book during that period of time.
                #  - "books5": 5 depth levels snapshot will be pushed every time. Snapshot data will be pushed every 100 ms when there are changes in the 5 depth levels snapshot.
                'watchOrderBook': {
                    'channel': 'books',
                },
                'watchOrderBookForSymbols': {
                    'channel': 'books',
                },
            },
            'streaming': {
                'ping': self.ping,
                'keepAlive': 25000,  # 30 seconds max
            },
        })

    def ping(self, client):
        return 'ping'

    def handle_pong(self, client: Client, message):
        #
        #   'pong'
        #
        client.lastPong = self.milliseconds()

    async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        get the list of most recent trades for a particular symbol

        https://docs.blofin.com/index.html#ws-trades-channel

        :param str symbol: unified symbol of the market to fetch trades for
        :param int [since]: timestamp in ms of the earliest trade to fetch
        :param int [limit]: the maximum amount of trades to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        """
        params['callerMethodName'] = 'watchTrades'
        return await self.watch_trades_for_symbols([symbol], since, limit, params)

    async def watch_trades_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        get the list of most recent trades for a list of symbols

        https://docs.blofin.com/index.html#ws-trades-channel

        :param str[] symbols: unified symbol of the market to fetch trades for
        :param int [since]: timestamp in ms of the earliest trade to fetch
        :param int [limit]: the maximum amount of trades to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        """
        await self.load_markets()
        trades = await self.watch_multiple_wrapper(True, 'trades', 'watchTradesForSymbols', symbols, params)
        if self.newUpdates:
            firstMarket = self.safe_dict(trades, 0)
            firstSymbol = self.safe_string(firstMarket, 'symbol')
            limit = trades.getLimit(firstSymbol, limit)
        return self.filter_by_since_limit(trades, since, limit, 'timestamp', True)

    def handle_trades(self, client: Client, message):
        #
        #     {
        #       arg: {
        #         channel: "trades",
        #         instId: "DOGE-USDT",
        #       },
        #       data : [
        #         <same object in REST example>,
        #         ...
        #       ]
        #     }
        #
        arg = self.safe_dict(message, 'arg')
        channelName = self.safe_string(arg, 'channel')
        data = self.safe_list(message, 'data')
        if data is None:
            return
        for i in range(0, len(data)):
            rawTrade = data[i]
            trade = self.parse_ws_trade(rawTrade)
            symbol = trade['symbol']
            stored = self.safe_value(self.trades, symbol)
            if stored is None:
                limit = self.safe_integer(self.options, 'tradesLimit', 1000)
                stored = ArrayCache(limit)
                self.trades[symbol] = stored
            stored.append(trade)
            messageHash = channelName + ':' + symbol
            client.resolve(stored, messageHash)

    def parse_ws_trade(self, trade, market: Market = None) -> Trade:
        return self.parse_trade(trade, market)

    async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
        """
        watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

        https://docs.blofin.com/index.html#ws-order-book-channel

        :param str symbol: unified symbol of the market to fetch the order book for
        :param int [limit]: the maximum amount of order book entries to return
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        """
        params['callerMethodName'] = 'watchOrderBook'
        return await self.watch_order_book_for_symbols([symbol], limit, params)

    async def watch_order_book_for_symbols(self, symbols: List[str], limit: Int = None, params={}) -> OrderBook:
        """
        watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

        https://docs.blofin.com/index.html#ws-order-book-channel

        :param str[] symbols: unified array of symbols
        :param int [limit]: the maximum amount of order book entries to return
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.depth]: the type of order book to subscribe to, default is 'depth/increase100', also accepts 'depth5' or 'depth20' or depth50
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        """
        await self.load_markets()
        callerMethodName = None
        callerMethodName, params = self.handle_param_string(params, 'callerMethodName', 'watchOrderBookForSymbols')
        channelName = None
        channelName, params = self.handle_option_and_params(params, callerMethodName, 'channel', 'books')
        # due to some problem, temporarily disable other channels
        if channelName != 'books':
            raise NotSupported(self.id + ' ' + callerMethodName + '() at self moment ' + channelName + ' is not supported, coming soon')
        orderbook = await self.watch_multiple_wrapper(True, channelName, callerMethodName, symbols, params)
        return orderbook.limit()

    def handle_order_book(self, client: Client, message):
        #
        #   {
        #     arg: {
        #         channel: "books",
        #         instId: "DOGE-USDT",
        #     },
        #     action: "snapshot",  # can be 'snapshot' or 'update'
        #     data: {
        #         asks: [  [0.08096, 1], [0.08097, 123], ...   ],
        #         bids: [  [0.08095, 4], [0.08094, 237], ...   ],
        #         ts: "1707491587909",
        #         prevSeqId: "0",  # in case of 'update' there will be some value, less then seqId
        #         seqId: "3374250786",
        #     },
        # }
        #
        arg = self.safe_dict(message, 'arg')
        channelName = self.safe_string(arg, 'channel')
        data = self.safe_dict(message, 'data')
        marketId = self.safe_string(arg, 'instId')
        market = self.safe_market(marketId)
        symbol = market['symbol']
        messageHash = channelName + ':' + symbol
        if not (symbol in self.orderbooks):
            self.orderbooks[symbol] = self.order_book()
        orderbook = self.orderbooks[symbol]
        timestamp = self.safe_integer(data, 'ts')
        action = self.safe_string(message, 'action')
        if action == 'snapshot':
            orderBookSnapshot = self.parse_order_book(data, symbol, timestamp)
            orderBookSnapshot['nonce'] = self.safe_integer(data, 'seqId')
            orderbook.reset(orderBookSnapshot)
        else:
            asks = self.safe_list(data, 'asks', [])
            bids = self.safe_list(data, 'bids', [])
            self.handle_deltas_with_keys(orderbook['asks'], asks)
            self.handle_deltas_with_keys(orderbook['bids'], bids)
            orderbook['timestamp'] = timestamp
            orderbook['datetime'] = self.iso8601(timestamp)
        self.orderbooks[symbol] = orderbook
        client.resolve(orderbook, messageHash)

    async def watch_ticker(self, symbol: str, params={}) -> Ticker:
        """
        watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market

        https://docs.blofin.com/index.html#ws-tickers-channel

        :param str symbol: unified symbol of the market to fetch the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        params['callerMethodName'] = 'watchTicker'
        market = self.market(symbol)
        symbol = market['symbol']
        result = await self.watch_tickers([symbol], params)
        return result[symbol]

    async def watch_tickers(self, symbols: Strings = None, params={}) -> Tickers:
        """
        watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for all markets of a specific list

        https://docs.blofin.com/index.html#ws-tickers-channel

        :param str[] symbols: unified symbol of the market to fetch the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        if symbols is None:
            raise NotSupported(self.id + ' watchTickers() requires a list of symbols')
        ticker = await self.watch_multiple_wrapper(True, 'tickers', 'watchTickers', symbols, params)
        if self.newUpdates:
            tickers = {}
            tickers[ticker['symbol']] = ticker
            return tickers
        return self.filter_by_array(self.tickers, 'symbol', symbols)

    def handle_ticker(self, client: Client, message):
        #
        # message
        #
        #     {
        #         arg: {
        #             channel: "tickers",
        #             instId: "DOGE-USDT",
        #         },
        #         data: [
        #             <same object in REST example>
        #         ],
        #     }
        #
        self.handle_bid_ask(client, message)
        arg = self.safe_dict(message, 'arg')
        channelName = self.safe_string(arg, 'channel')
        data = self.safe_list(message, 'data')
        for i in range(0, len(data)):
            ticker = self.parse_ws_ticker(data[i])
            symbol = ticker['symbol']
            messageHash = channelName + ':' + symbol
            self.tickers[symbol] = ticker
            client.resolve(self.tickers[symbol], messageHash)

    def parse_ws_ticker(self, ticker, market: Market = None) -> Ticker:
        return self.parse_ticker(ticker, market)

    async def watch_bids_asks(self, symbols: Strings = None, params={}) -> Tickers:
        """
        watches best bid & ask for symbols

        https://docs.blofin.com/index.html#ws-tickers-channel

        :param str[] symbols: unified symbol of the market to fetch the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        await self.load_markets()
        symbols = self.market_symbols(symbols, None, False)
        firstMarket = self.market(symbols[0])
        channel = 'tickers'
        marketType = None
        marketType, params = self.handle_market_type_and_params('watchBidsAsks', firstMarket, params)
        url = self.implode_hostname(self.urls['api']['ws'][marketType]['public'])
        messageHashes = []
        args = []
        for i in range(0, len(symbols)):
            market = self.market(symbols[i])
            messageHashes.append('bidask:' + market['symbol'])
            args.append({
                'channel': channel,
                'instId': market['id'],
            })
        request = self.get_subscription_request(args)
        ticker = await self.watch_multiple(url, messageHashes, self.deep_extend(request, params), messageHashes)
        if self.newUpdates:
            tickers = {}
            tickers[ticker['symbol']] = ticker
            return tickers
        return self.filter_by_array(self.bidsasks, 'symbol', symbols)

    def handle_bid_ask(self, client: Client, message):
        data = self.safe_list(message, 'data')
        for i in range(0, len(data)):
            ticker = self.parse_ws_bid_ask(data[i])
            symbol = ticker['symbol']
            messageHash = 'bidask:' + symbol
            self.bidsasks[symbol] = ticker
            client.resolve(ticker, messageHash)

    def parse_ws_bid_ask(self, ticker, market=None):
        marketId = self.safe_string(ticker, 'instId')
        market = self.safe_market(marketId, market, '-')
        symbol = self.safe_string(market, 'symbol')
        timestamp = self.safe_integer(ticker, 'ts')
        return self.safe_ticker({
            'symbol': symbol,
            'timestamp': timestamp,
            'datetime': self.iso8601(timestamp),
            'ask': self.safe_string(ticker, 'askPrice'),
            'askVolume': self.safe_string(ticker, 'askSize'),
            'bid': self.safe_string(ticker, 'bidPrice'),
            'bidVolume': self.safe_string(ticker, 'bidSize'),
            'info': ticker,
        }, market)

    async def watch_ohlcv(self, symbol: str, timeframe='1m', since: Int = None, limit: Int = None, params={}) -> List[list]:
        """
        watches historical candlestick data containing the open, high, low, and close price, and the volume of a market
        :param str symbol: unified symbol of the market to fetch OHLCV data for
        :param str timeframe: the length of time each candle represents
        :param int [since]: timestamp in ms of the earliest candle to fetch
        :param int [limit]: the maximum amount of candles to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns int[][]: A list of candles ordered, open, high, low, close, volume
        """
        params['callerMethodName'] = 'watchOHLCV'
        result = await self.watch_ohlcv_for_symbols([[symbol, timeframe]], since, limit, params)
        return result[symbol][timeframe]

    async def watch_ohlcv_for_symbols(self, symbolsAndTimeframes: List[List[str]], since: Int = None, limit: Int = None, params={}):
        """
        watches historical candlestick data containing the open, high, low, and close price, and the volume of a market

        https://docs.blofin.com/index.html#ws-candlesticks-channel

        :param str[][] symbolsAndTimeframes: array of arrays containing unified symbols and timeframes to fetch OHLCV data for, example [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]
        :param int [since]: timestamp in ms of the earliest candle to fetch
        :param int [limit]: the maximum amount of candles to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns int[][]: A list of candles ordered, open, high, low, close, volume
        """
        symbolsLength = len(symbolsAndTimeframes)
        if symbolsLength == 0 or not isinstance(symbolsAndTimeframes[0], list):
            raise ArgumentsRequired(self.id + " watchOHLCVForSymbols() requires a an array of symbols and timeframes, like  [['BTC/USDT', '1m'], ['LTC/USDT', '5m']]")
        await self.load_markets()
        symbol, timeframe, candles = await self.watch_multiple_wrapper(True, 'candle', 'watchOHLCVForSymbols', symbolsAndTimeframes, params)
        if self.newUpdates:
            limit = candles.getLimit(symbol, limit)
        filtered = self.filter_by_since_limit(candles, since, limit, 0, True)
        return self.create_ohlcv_object(symbol, timeframe, filtered)

    def handle_ohlcv(self, client: Client, message):
        #
        # message
        #
        #     {
        #         arg: {
        #             channel: "candle1m",
        #             instId: "DOGE-USDT",
        #         },
        #         data: [
        #             [same object in REST example]
        #         ],
        #     }
        #
        arg = self.safe_dict(message, 'arg')
        channelName = self.safe_string(arg, 'channel')
        data = self.safe_list(message, 'data')
        marketId = self.safe_string(arg, 'instId')
        market = self.safe_market(marketId)
        symbol = market['symbol']
        interval = channelName.replace('candle', '')
        unifiedTimeframe = self.find_timeframe(interval)
        self.ohlcvs[symbol] = self.safe_dict(self.ohlcvs, symbol, {})
        stored = self.safe_value(self.ohlcvs[symbol], unifiedTimeframe)
        if stored is None:
            limit = self.safe_integer(self.options, 'OHLCVLimit', 1000)
            stored = ArrayCacheByTimestamp(limit)
            self.ohlcvs[symbol][unifiedTimeframe] = stored
        for i in range(0, len(data)):
            candle = data[i]
            parsed = self.parse_ohlcv(candle, market)
            stored.append(parsed)
        resolveData = [symbol, unifiedTimeframe, stored]
        messageHash = 'candle' + interval + ':' + symbol
        client.resolve(resolveData, messageHash)

    async def watch_balance(self, params={}) -> Balances:
        """
        query for balance and get the amount of funds available for trading or funds locked in orders

        https://docs.blofin.com/index.html#ws-account-channel

        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a `balance structure <https://docs.ccxt.com/#/?id=balance-structure>`
        """
        await self.load_markets()
        await self.authenticate()
        marketType = None
        marketType, params = self.handle_market_type_and_params('watchBalance', None, params)
        if marketType == 'spot':
            raise NotSupported(self.id + ' watchBalance() is not supported for spot markets yet')
        messageHash = marketType + ':balance'
        sub = {
            'channel': 'account',
        }
        request = self.get_subscription_request([sub])
        url = self.implode_hostname(self.urls['api']['ws'][marketType]['private'])
        return await self.watch(url, messageHash, self.deep_extend(request, params), messageHash)

    def handle_balance(self, client: Client, message):
        #
        #     {
        #         arg: {
        #           channel: "account",
        #         },
        #         data: <same object in REST example>,
        #     }
        #
        marketType = 'swap'  # for now
        if not (marketType in self.balance):
            self.balance[marketType] = {}
        self.balance[marketType] = self.parse_ws_balance(message)
        messageHash = marketType + ':balance'
        client.resolve(self.balance[marketType], messageHash)

    def parse_ws_balance(self, message):
        return self.parse_balance(message)

    async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
        """
        watches information on multiple orders made by the user
        :param str symbol: unified market symbol of the market orders were made in
        :param int [since]: the earliest time in ms to fetch orders for
        :param int [limit]: the maximum number of order structures to retrieve
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of [order structures]{@link https://docs.ccxt.com/#/?id=order-structure
        """
        params['callerMethodName'] = 'watchOrders'
        symbolsArray = [symbol] if (symbol is not None) else []
        return await self.watch_orders_for_symbols(symbolsArray, since, limit, params)

    async def watch_orders_for_symbols(self, symbols: List[str], since: Int = None, limit: Int = None, params={}) -> List[Order]:
        """
        watches information on multiple orders made by the user across multiple symbols

        https://docs.blofin.com/index.html#ws-order-channel

        :param str[] symbols:
        :param int [since]: the earliest time in ms to fetch orders for
        :param int [limit]: the maximum number of order structures to retrieve
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of [order structures]{@link https://docs.ccxt.com/#/?id=order-structure
        """
        await self.authenticate()
        await self.load_markets()
        orders = await self.watch_multiple_wrapper(False, 'orders', 'watchOrdersForSymbols', symbols, params)
        if self.newUpdates:
            first = self.safe_value(orders, 0)
            tradeSymbol = self.safe_string(first, 'symbol')
            limit = orders.getLimit(tradeSymbol, limit)
        return self.filter_by_since_limit(orders, since, limit, 'timestamp', True)

    def handle_orders(self, client: Client, message):
        #
        #     {
        #         action: 'update',
        #         arg: {channel: 'orders'},
        #         data: [
        #           <same object in REST example>
        #         ]
        #     }
        #
        if self.orders is None:
            limit = self.safe_integer(self.options, 'ordersLimit', 1000)
            self.orders = ArrayCacheBySymbolById(limit)
        orders = self.orders
        arg = self.safe_dict(message, 'arg')
        channelName = self.safe_string(arg, 'channel')
        data = self.safe_list(message, 'data')
        for i in range(0, len(data)):
            order = self.parse_ws_order(data[i])
            symbol = order['symbol']
            messageHash = channelName + ':' + symbol
            orders.append(order)
            client.resolve(orders, messageHash)
            client.resolve(orders, channelName)

    def parse_ws_order(self, order, market: Market = None) -> Order:
        return self.parse_order(order, market)

    async def watch_positions(self, symbols: Strings = None, since: Int = None, limit: Int = None, params={}) -> List[Position]:
        """

        https://docs.blofin.com/index.html#ws-positions-channel

        watch all open positions
        :param str[]|None symbols: list of unified market symbols
        :param int [since]: the earliest time in ms to fetch positions for
        :param int [limit]: the maximum number of positions to retrieve
        :param dict params: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `position structure <https://docs.ccxt.com/en/latest/manual.html#position-structure>`
        """
        await self.authenticate()
        await self.load_markets()
        newPositions = await self.watch_multiple_wrapper(False, 'positions', 'watchPositions', symbols, params)
        if self.newUpdates:
            return newPositions
        return self.filter_by_symbols_since_limit(self.positions, symbols, since, limit)

    def handle_positions(self, client: Client, message):
        #
        #     {
        #         arg: {channel: 'positions'},
        #         data: [
        #           <same object in REST example>
        #         ]
        #     }
        #
        if self.positions is None:
            self.positions = ArrayCacheBySymbolBySide()
        cache = self.positions
        arg = self.safe_dict(message, 'arg')
        channelName = self.safe_string(arg, 'channel')
        data = self.safe_list(message, 'data')
        newPositions = []
        for i in range(0, len(data)):
            position = self.parse_ws_position(data[i])
            newPositions.append(position)
            cache.append(position)
            messageHash = channelName + ':' + position['symbol']
            client.resolve(position, messageHash)

    def parse_ws_position(self, position, market: Market = None) -> Position:
        return self.parse_position(position, market)

    async def watch_multiple_wrapper(self, isPublic: bool, channelName: str, callerMethodName: str, symbolsArray: List[Any] = None, params={}):
        # underlier method for all watch-multiple symbols
        await self.load_markets()
        callerMethodName, params = self.handle_param_string(params, 'callerMethodName', callerMethodName)
        # if OHLCV method are being called, then symbols would be symbolsAndTimeframes(multi-dimensional) array
        isOHLCV = (channelName == 'candle')
        symbols = self.get_list_from_object_values(symbolsArray, 0) if isOHLCV else symbolsArray
        symbols = self.market_symbols(symbols, None, True, True)
        firstMarket = None
        firstSymbol = self.safe_string(symbols, 0)
        if firstSymbol is not None:
            firstMarket = self.market(firstSymbol)
        marketType = None
        marketType, params = self.handle_market_type_and_params(callerMethodName, firstMarket, params)
        if marketType != 'swap':
            raise NotSupported(self.id + ' ' + callerMethodName + '() does not support ' + marketType + ' markets yet')
        rawSubscriptions = []
        messageHashes = []
        if symbols is None:
            symbols = []
        symbolsLength = len(symbols)
        if symbolsLength > 0:
            for i in range(0, len(symbols)):
                current = symbols[i]
                market = None
                channel = channelName
                if isOHLCV:
                    market = self.market(current)
                    tfArray = symbolsArray[i]
                    tf = tfArray[1]
                    interval = self.safe_string(self.timeframes, tf, tf)
                    channel += interval
                else:
                    market = self.market(current)
                topic = {
                    'channel': channel,
                    'instId': market['id'],
                }
                rawSubscriptions.append(topic)
                messageHashes.append(channel + ':' + market['symbol'])
        else:
            rawSubscriptions.append({'channel': channelName})
            messageHashes.append(channelName)
        # private channel are difference, they only need plural channel name for multiple symbols
        if self.in_array(channelName, ['orders', 'positions']):
            rawSubscriptions = [{'channel': channelName}]
        request = self.get_subscription_request(rawSubscriptions)
        privateOrPublic = 'public' if isPublic else 'private'
        url = self.implode_hostname(self.urls['api']['ws'][marketType][privateOrPublic])
        return await self.watch_multiple(url, messageHashes, self.deep_extend(request, params), messageHashes)

    def get_subscription_request(self, args):
        return {
            'op': 'subscribe',
            'args': args,
        }

    def handle_message(self, client: Client, message):
        #
        # message examples
        #
        # {
        #   arg: {
        #     channel: "trades",
        #     instId: "DOGE-USDT",
        #   },
        #   event: "subscribe"
        # }
        #
        # incoming data updates' examples can be seen under each handler method
        #
        methods = {
            # public
            'pong': self.handle_pong,
            'trades': self.handle_trades,
            'books': self.handle_order_book,
            'tickers': self.handle_ticker,
            'candle': self.handle_ohlcv,  # candle1m, candle5m, etc
            # private
            'account': self.handle_balance,
            'orders': self.handle_orders,
            'positions': self.handle_positions,
        }
        method = None
        if message == 'pong':
            method = self.safe_value(methods, 'pong')
        else:
            event = self.safe_string(message, 'event')
            if event == 'subscribe':
                return
            elif event == 'login':
                future = self.safe_value(client.futures, 'authenticate_hash')
                future.resolve(True)
                return
            elif event == 'error':
                raise ExchangeError(self.id + ' error: ' + self.json(message))
            arg = self.safe_dict(message, 'arg')
            channelName = self.safe_string(arg, 'channel')
            method = self.safe_value(methods, channelName)
            if not method and channelName.find('candle') >= 0:
                method = methods['candle']
        if method:
            method(client, message)

    async def authenticate(self, params={}):
        self.check_required_credentials()
        milliseconds = self.milliseconds()
        messageHash = 'authenticate_hash'
        timestamp = str(milliseconds)
        nonce = 'n_' + timestamp
        auth = '/users/self/verify' + 'GET' + timestamp + '' + nonce
        signature = self.string_to_base64(self.hmac(self.encode(auth), self.encode(self.secret), hashlib.sha256))
        request = {
            'op': 'login',
            'args': [
                {
                    'apiKey': self.apiKey,
                    'passphrase': self.password,
                    'timestamp': timestamp,
                    'nonce': nonce,
                    'sign': signature,
                },
            ],
        }
        marketType = 'swap'  # for now
        url = self.implode_hostname(self.urls['api']['ws'][marketType]['private'])
        await self.watch(url, messageHash, self.deep_extend(request, params), messageHash)
