# -*- coding: utf-8 -*-

# PLEASE DO NOT EDIT THIS FILE, IT IS GENERATED AND WILL BE OVERWRITTEN:
# https://github.com/ccxt/ccxt/blob/master/CONTRIBUTING.md#how-to-contribute-code

import ccxt.async_support
from ccxt.async_support.base.ws.cache import ArrayCache, ArrayCacheBySymbolById
from ccxt.base.types import Any, Balances, Int, Order, OrderBook, Str, Ticker, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import AuthenticationError
from ccxt.base.errors import BadRequest
from ccxt.base.errors import ChecksumError


class poloniexfutures(ccxt.async_support.poloniexfutures):

    def describe(self) -> Any:
        """
        Describe the exchange, layering the websocket settings on top of the REST description.

        :returns dict: the parent description deep-extended with the ws capability flags, the ws endpoint, the channel options and the streaming settings
        """
        parentDescription = super(poloniexfutures, self).describe()
        # unified websocket capability flags
        capabilities: dict = {
            'ws': True,
            'cancelAllOrdersWs': False,
            'cancelOrdersWs': False,
            'cancelOrderWs': False,
            'createOrderWs': False,
            'editOrderWs': False,
            'fetchBalanceWs': False,
            'fetchOpenOrdersWs': False,
            'fetchOrderWs': False,
            'fetchTradesWs': False,
            'watchOHLCV': False,
            'watchOrderBook': True,
            'watchTicker': True,
            'watchTickers': False,
            'watchTrades': True,
            'watchTradesForSymbols': False,
            'watchBalance': True,
            'watchOrders': True,
            'watchMyTrades': False,
            'watchPosition': None,
            'watchPositions': False,
        }
        endpoints: dict = {
            'api': {
                'ws': 'wss://futures-apiws.poloniex.com/endpoint',
            },
        }
        # per-method channel names plus the bookkeeping used by stream()
        channelOptions: dict = {
            'tradesLimit': 1000,
            'ordersLimit': 1000,
            'watchTicker': {
                'method': '/contractMarket/ticker',  # can also be /contractMarket/snapshot
            },
            'watchOrders': {
                'method': '/contractMarket/tradeOrders',  # can also be /contractMarket/advancedOrders
            },
            'watchOrderBook': {
                'method': '/contractMarket/level2',  # can also be '/contractMarket/level3v2'
                'snapshotDelay': 5,
                'snapshotMaxRetries': 3,
                'checksum': True,
            },
            'streamLimit': 5,  # called tunnels by poloniexfutures docs
            'streamBySubscriptionsHash': {},
            'streamIndex': -1,
        }
        streamingSettings: dict = {
            'keepAlive': 30000,
            'maxPingPongMisses': 2.0,
        }
        overrides: dict = {
            'has': capabilities,
            'urls': endpoints,
            'options': channelOptions,
            'streaming': streamingSettings,
        }
        return self.deep_extend(parentDescription, overrides)

    async def negotiate(self, privateChannel, params={}):
        """
        Return the websocket url for the public or the private channel, negotiating it at most once per channel.

        :param bool privateChannel: True to negotiate the private url, False for the public one
        :param dict [params]: extra parameters forwarded to negotiate_helper
        :returns: whatever the stored negotiation awaitable resolves to
        """
        channelKey = 'private' if privateChannel else 'public'
        pendingUrls = self.safe_value(self.options, 'urls', {})
        if channelKey not in pendingUrls:
            # store the awaitable rather than the url itself, so that concurrent
            # callers share one negotiation instead of fetching different urls
            # and overwriting each other
            pendingUrls[channelKey] = self.spawn(self.negotiate_helper, privateChannel, params)
            self.options['urls'] = pendingUrls
        return await pendingUrls[channelKey]

    async def negotiate_helper(self, privateChannel, params={}):
        """
        Request a connection token and build the websocket url from the first instance server.

        On failure the awaitable stored under self.options['urls'] is rejected
        with the error and removed, and None is returned.

        :param bool privateChannel: True to call the private bullet endpoint, False for the public one
        :param dict [params]: extra parameters sent with the bullet request
        :returns str: the endpoint url with the token, privateChannel and connectId query parameters
        """
        connectId = 'private' if privateChannel else 'public'
        try:
            #
            #     {
            #         "code": "200000",
            #         "data": {
            #             "instanceServers": [
            #                 {
            #                     "pingInterval":  50000,
            #                     "endpoint": "wss://push-private.kucoin.com/endpoint",
            #                     "protocol": "websocket",
            #                     "encrypt": True,
            #                     "pingTimeout": 10000
            #                 }
            #             ],
            #             "token": "2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJ1UQy47YbpY4zVdzilNP-Bj3iXzrjjGlWtiYB9J6i9GjsxUuhPw3BlrzazF6ghq4Lzf7scStOz3KkxjwpsOBCH4=.WNQmhZQeUKIkh97KYgU0Lg=="
            #         }
            #     }
            #
            bulletEndpoint = self.privatePostBulletPrivate if privateChannel else self.publicPostBulletPublic
            response = await bulletEndpoint(params)
            payload = self.safe_value(response, 'data', {})
            servers = self.safe_value(payload, 'instanceServers', [])
            server = self.safe_value(servers, 0)
            pingInterval = self.safe_integer(server, 'pingInterval')
            endpoint = self.safe_string(server, 'endpoint')
            query: dict = {
                'token': self.safe_string(payload, 'token'),
                'privateChannel': privateChannel,
                'connectId': connectId,
            }
            wsUrl = endpoint + '?' + self.urlencode(query)
            # the server-advertised ping interval becomes the client's keep-alive
            self.client(wsUrl).keepAlive = pingInterval
            return wsUrl
        except Exception as e:
            # propagate the failure to everybody awaiting the stored awaitable,
            # then forget it so the next negotiate() call starts from scratch
            stored = self.safe_value(self.options['urls'], connectId)
            stored.reject(e)
            del self.options['urls'][connectId]
        return None

    def request_id(self):
        """
        Increment the request counter kept in self.options['requestId'] and return the new value.

        :returns int: the previous counter(0 when unset) plus one
        """
        previous = self.safe_integer(self.options, 'requestId', 0)
        current = self.sum(previous, 1)
        self.options['requestId'] = current
        return current

    async def subscribe(self, name: str, isPrivate: bool, symbol: Str = None, subscription=None, params={}):
        """
        @ignore
        Connects to a websocket channel
        :param str name: name of the channel, also used as the subscription hash
        :param bool isPrivate: True for the authenticated url, False for the public url
        :param str symbol: is required for all public channels, not required for private channels(except position)
        :param dict subscription: subscription parameters, stored together with the request id
        :param dict [params]: extra parameters specific to the poloniex api
        :returns dict: data from the websocket stream
        """
        url = await self.negotiate(isPrivate)
        if symbol is not None:
            # market-specific channels are addressed as "<channel>:<marketId>"
            market = self.market(symbol)
            name += ':' + market['id']
        messageHash = name
        tunnelId = await self.stream(url, messageHash)
        requestId = self.request_id()
        subscribe: dict = {
            'id': requestId,
            'type': 'subscribe',
            'topic': name,                 # Subscribed topic. Some topics support subscribe to the data of multiple trading pairs through ",".
            'privateChannel': isPrivate,   # Adopt the private channel or not. Set by default.
            'response': True,              # Whether the server needs to return the receipt information of self subscription or not. Set by default.
            'tunnelId': tunnelId,
        }
        subscriptionRequest: dict = {
            'id': requestId,
        }
        if subscription is None:
            subscription = subscriptionRequest
        else:
            subscription = self.extend(subscriptionRequest, subscription)
        request = self.extend(subscribe, params)
        # register the merged subscription(request id plus the caller's entries);
        # passing only subscriptionRequest here would drop the caller's
        # 'symbol', 'limit' and 'method' entries
        return await self.watch(url, messageHash, request, name, subscription)

    def on_close(self, client, error):
        """
        Forget every tunnel assignment, then defer to the parent close handler.

        :param client: the websocket client that was closed
        :param error: the error passed along by the caller
        """
        self.options.update({'streamBySubscriptionsHash': {}})
        super().on_close(client, error)

    async def stream(self, url, subscriptionHash):
        """
        Return the tunnel id assigned to a subscription hash, opening a tunnel when none is assigned yet.

        :param str url: the websocket url the openTunnel request is sent to
        :param str subscriptionHash: the subscription the tunnel is looked up or created for
        :returns str: a tunnel id of the form 'stream-<index>'
        """
        assignments = self.safe_value(self.options, 'streamBySubscriptionsHash', {})
        tunnel = self.safe_string(assignments, subscriptionHash)
        if tunnel is not None:
            return tunnel
        # round-robin over the tunnels: the running counter keeps growing,
        # the tunnel id uses the counter modulo streamLimit
        counter = self.safe_integer(self.options, 'streamIndex', -1) + 1
        tunnelCount = self.safe_value(self.options, 'streamLimit')
        slot = counter % tunnelCount
        self.options['streamIndex'] = counter
        tunnel = 'stream-' + self.number_to_string(slot)
        self.options['streamBySubscriptionsHash'][subscriptionHash] = tunnel
        tunnelHash = 'tunnel:' + tunnel
        # NOTE(review): tunnel ids repeat once the counter wraps past streamLimit,
        # so the same openTunnel hash is awaited again - confirm that this resolves
        openTunnel: dict = {
            'id': tunnelHash,
            'type': 'openTunnel',
            'newTunnelId': tunnel,
            'response': True,
        }
        tunnelSubscription: dict = {
            'id': tunnelHash,
            'method': self.handle_new_stream,
        }
        await self.watch(url, tunnelHash, openTunnel, tunnelHash, tunnelSubscription)
        return tunnel

    def handle_order_book_subscription(self, client: Client, message, subscription):
        """
        Replace the stored order book of the subscription's symbol with an empty one.

        :param Client client: unused
        :param dict message: unused
        :param dict subscription: provides the 'symbol' and the 'limit' of the new order book
        """
        bookSymbol = self.safe_string(subscription, 'symbol')
        depth = self.safe_integer(subscription, 'limit')
        emptyBook = self.order_book({}, depth)
        self.orderbooks[bookSymbol] = emptyBook

    def handle_subscription_status(self, client: Client, message):
        """
        Dispatch an ack to the 'method' callback of the subscription carrying the same id, if any.

        :param Client client: the client whose subscriptions are searched
        :param dict message: the ack received from the exchange
        :returns dict: the unchanged message
        """
        #
        #     {
        #         "id": "1578090438322",
        #         "type": "ack"
        #     }
        #
        ackId = self.safe_string(message, 'id')
        byId = self.index_by(client.subscriptions, 'id')
        matched = self.safe_value(byId, ackId, {})
        callback = self.safe_value(matched, 'method')
        if callback is None:
            return message
        callback(client, message, matched)
        return message

    def handle_new_stream(self, client: Client, message, subscription):
        """
        Resolve the pending openTunnel request identified by the id of the ack.

        :param Client client: the client holding the pending future
        :param dict message: the ack, its 'id' is used as the message hash
        :param dict subscription: unused
        """
        #
        #    {
        #        "id": "1545910840805",
        #        "type": "ack"
        #    }
        #
        client.resolve(message, self.safe_string(message, 'id'))

    async def watch_ticker(self, symbol: str, params={}) -> Ticker:
        """
        watches a price ticker, a statistical calculation with the information calculated over the past 24 hours for a specific market

        https://api-docs.poloniex.com/futures/websocket/public#get-real-time-symbol-ticker

        :param str symbol: unified symbol of the market to fetch the ticker for
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.method]: the channel to subscribe to, defaults to the watchTicker 'method' option(/contractMarket/ticker), can also be /contractMarket/snapshot
        :returns dict: a `ticker structure <https://docs.ccxt.com/#/?id=ticker-structure>`
        """
        await self.load_markets()
        symbol = self.symbol(symbol)
        # honour params.method and options['watchTicker']['method'] instead of
        # hard-coding the channel, the default is the channel used previously
        name, params = self.handle_option_and_params(params, 'watchTicker', 'method', '/contractMarket/ticker')
        return await self.subscribe(name, False, symbol, None, params)

    async def watch_trades(self, symbol: str, since: Int = None, limit: Int = None, params={}) -> List[Trade]:
        """
        get the list of most recent trades for a particular symbol

        https://api-docs.poloniex.com/futures/websocket/public#full-matching-engine-datalevel-3

        :param str symbol: unified symbol of the market to fetch trades for
        :param int [since]: timestamp in ms of the earliest trade to fetch
        :param int [limit]: the maximum amount of trades to fetch
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict[]: a list of `trade structures <https://docs.ccxt.com/#/?id=public-trades>`
        """
        await self.load_markets()
        tradeOptions = self.safe_value(self.options, 'watchTrades')
        # can also be /contractMarket/snapshot
        channel = self.safe_string(tradeOptions, 'method', '/contractMarket/execution')
        channel, params = self.handle_option_and_params(params, 'method', 'name', channel)
        symbol = self.symbol(symbol)
        cache = await self.subscribe(channel, False, symbol, None, params)
        if self.newUpdates:
            # let the cache decide the limit when only new updates are wanted
            limit = cache.getLimit(symbol, limit)
        return self.filter_by_since_limit(cache, since, limit, 'timestamp', True)

    async def watch_order_book(self, symbol: str, limit: Int = None, params={}) -> OrderBook:
        """
        watches information on open orders with bid(buy) and ask(sell) prices, volumes and other data

        https://api-docs.poloniex.com/futures/websocket/public#level-2-market-data

        :param str symbol: unified symbol of the market to fetch the order book for
        :param int [limit]: None, 5 or 50 when using /contractMarket/level2, 5 and 50 select the level2Depth5 and level2Depth50 channels
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.method]: the method to use. Defaults to /contractMarket/level2 can also be /contractMarket/level3v2 to receive the raw stream of orders
        :returns dict: A dictionary of `order book structures <https://docs.ccxt.com/#/?id=order-book-structure>` indexed by market symbols
        :raises BadRequest: if limit is neither None, 5 nor 50 while using /contractMarket/level2
        """
        await self.load_markets()
        # store the unified symbol in the subscription, the same way
        # watch_ticker and watch_trades normalize it
        symbol = self.symbol(symbol)
        # params.method, then options['watchOrderBook']['method'], then the default
        name, params = self.handle_option_and_params(params, 'watchOrderBook', 'method', '/contractMarket/level2')
        # backward compatibility: params['name'] used to be the only override
        name, params = self.handle_option_and_params(params, 'method', 'name', name)
        if name == '/contractMarket/level2' and limit is not None:
            if limit != 5 and limit != 50:
                raise BadRequest(self.id + ' watchOrderBook limit argument must be none, 5 or 50 if using method /contractMarket/level2')
            # /contractMarket/level2Depth5 or /contractMarket/level2Depth50
            name += 'Depth' + self.number_to_string(limit)
        subscription: dict = {
            'symbol': symbol,
            'limit': limit,
            'method': self.handle_order_book_subscription,
        }
        orderbook = await self.subscribe(name, False, symbol, subscription, params)
        return orderbook.limit()

    async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
        """
        watches information on multiple orders made by the user

        https://api-docs.poloniex.com/futures/websocket/user-messages#private-messages

        :param str symbol: filter by unified market symbol of the market orders were made in
        :param int [since]: the earliest time in ms to fetch orders for
        :param int [limit]: the maximum number of order structures to retrieve
        :param dict [params]: extra parameters specific to the exchange API endpoint
        :param str [params.method]: the method to use will default to /contractMarket/tradeOrders. Set to /contractMarket/advancedOrders to watch stop orders
        :returns dict[]: a list of `order structures <https://docs.ccxt.com/#/?id=order-structure>`
        """
        await self.load_markets()
        # params.method, then options['watchOrders']['method'], then the default
        name, requestParams = self.handle_option_and_params(params, 'watchOrders', 'method', '/contractMarket/tradeOrders')
        # keep waiting until an update survives the symbol/since/limit filter,
        # a loop instead of recursion so that a long run of filtered-out
        # updates cannot nest coroutines without bound
        while True:
            orders = await self.subscribe(name, True, None, None, requestParams)
            if self.newUpdates:
                limit = orders.getLimit(symbol, limit)
            filtered = self.filter_by_symbol_since_limit(orders, symbol, since, limit)
            if len(filtered) > 0:
                return filtered

    async def watch_balance(self, params={}) -> Balances:
        """
        watch balance and get the amount of funds available for trading or funds locked in orders

        https://api-docs.poloniex.com/futures/websocket/user-messages#account-balance-events

        :param dict [params]: extra parameters specific to the exchange API endpoint
        :returns dict: a `balance structure <https://docs.ccxt.com/#/?id=balance-structure>`
        """
        await self.load_markets()
        # private channel, not bound to a symbol
        balance = await self.subscribe('/contractAccount/wallet', True, None, None, params)
        return balance

    def handle_trade(self, client: Client, message):
        """
        Parse a public trade, append it to the per-symbol trade cache and resolve the execution subscribers.

        :param Client client: the client whose pending futures are resolved
        :param dict message: the raw message received from the exchange
        :returns dict: the unchanged message
        """
        #
        #    {
        #        "data": {
        #            "makerUserId": "1410336",
        #            "symbol": "BTCUSDTPERP",
        #            "sequence": 267913,
        #            "side": "buy",
        #            "size": 2,
        #            "price": 28409.5,
        #            "takerOrderId": "6426f9f15782c8000776995f",
        #            "makerOrderId": "6426f9f141406b0008df976e",
        #            "takerUserId": "1410880",
        #            "tradeId": "6426f9f1de029f0001e334dd",
        #            "ts": 1680275953739092500,
        #        },
        #        "subject": "match",
        #        "topic": "/contractMarket/execution:BTCUSDTPERP",
        #        "type": "message",
        #    }
        #
        payload = self.safe_value(message, 'data', {})
        marketId = self.safe_string(payload, 'symbol')
        if marketId is None:
            # nothing to attribute the trade to
            return message
        parsed = self.parse_ws_trade(payload)
        symbol = parsed['symbol']
        cache = self.safe_value(self.trades, symbol)
        if cache is None:
            # first trade of this symbol: create its bounded cache
            cache = ArrayCache(self.safe_integer(self.options, 'tradesLimit', 1000))
            self.trades[symbol] = cache
        cache.append(parsed)
        client.resolve(cache, '/contractMarket/execution:' + marketId)
        return message

    def parse_ws_trade(self, trade, market=None):
        """
        Convert a raw public trade into a unified trade structure.

        :param dict trade: the 'data' part of a trade message
        :param dict [market]: the market used when the trade's market id cannot be resolved
        :returns dict: the result of safe_trade
        """
        #
        # handleTrade
        #
        #    {
        #        "makerUserId": "1410880",
        #        "symbol": "BTCUSDTPERP",
        #        "sequence": 731390,
        #        "side": "sell",
        #        "size": 2,
        #        "price": 29372.4,
        #        "takerOrderId": "644ef0fdd64748000759218a",
        #        "makerOrderId": "644ef0fd25f4a50007f12fc5",
        #        "takerUserId": "1410880",
        #        "tradeId": "644ef0fdde029f0001eec346",
        #        "ts": 1682895101923194000
        #    }
        #
        market = self.safe_market(self.safe_string(trade, 'symbol'), market)
        # 'ts' is scaled by 1e-6(nanoseconds to milliseconds, judging by the sample above)
        tradeTime = self.safe_integer_product(trade, 'ts', 0.000001)
        unified: dict = {
            'info': trade,
            'id': self.safe_string(trade, 'tradeId'),
            'symbol': self.safe_string(market, 'symbol'),
            'timestamp': tradeTime,
            'datetime': self.iso8601(tradeTime),
            'order': self.safe_string_2(trade, 'takerOrderId', 'makerOrderId'),
            'type': None,
            'side': self.safe_string(trade, 'side'),
            'takerOrMaker': None,
            'price': self.safe_string(trade, 'price'),
            'amount': self.safe_string_2(trade, 'matchSize', 'size'),
            'cost': None,
            'fee': None,
        }
        return self.safe_trade(unified, market)

    def parse_ws_order_trade(self, trade, market=None):
        """
        Convert the trade part of an order event into a unified trade structure.

        :param dict trade: the raw order event
        :param dict [market]: the market forwarded to safe_symbol and safe_trade
        :returns dict: the result of safe_trade
        """
        #
        #    {
        #        "symbol": "BTC_USDT",
        #        "type": "LIMIT",
        #        "quantity": "1",
        #        "orderId": "32471407854219264",
        #        "tradeFee": "0",
        #        "clientOrderId": "",
        #        "accountType": "SPOT",
        #        "feeCurrency": "",
        #        "eventType": "place",
        #        "source": "API",
        #        "side": "BUY",
        #        "filledQuantity": "0",
        #        "filledAmount": "0",
        #        "matchRole": "MAKER",
        #        "state": "NEW",
        #        "tradeTime": 0,
        #        "tradeAmount": "0",
        #        "orderAmount": "0",
        #        "createTime": 1648708186922,
        #        "price": "47112.1",
        #        "tradeQty": "0",
        #        "tradePrice": "0",
        #        "tradeId": "0",
        #        "ts": 1648708187469
        #    }
        #
        tradeTime = self.safe_integer(trade, 'tradeTime')
        marketId = self.safe_string(trade, 'symbol')
        fee: dict = {
            'rate': None,
            'cost': self.safe_string(trade, 'tradeFee'),
            'currency': self.safe_string(trade, 'feeCurrency'),
        }
        unified: dict = {
            'info': trade,
            'id': self.safe_string(trade, 'tradeId'),
            'symbol': self.safe_symbol(marketId, market),
            'timestamp': tradeTime,
            'datetime': self.iso8601(tradeTime),
            'order': self.safe_string(trade, 'orderId'),
            'type': self.safe_string_lower(trade, 'type'),
            'side': self.safe_string(trade, 'side'),
            'takerOrMaker': self.safe_string_lower(trade, 'matchRole'),
            'price': self.safe_string(trade, 'price'),
            'amount': self.safe_string(trade, 'tradeAmount'),  # ? tradeQty?
            'cost': None,
            'fee': fee,
        }
        return self.safe_trade(unified, market)

    def handle_order(self, client: Client, message):
        """
        Parse an order update, store it in the orders cache and resolve the subscribers of the message's topic.

        :param Client client: the client whose pending futures are resolved
        :param dict message: the raw message received from the exchange
        :returns dict: the unchanged message
        """
        #
        #    {
        #        "data": {
        #          "symbol": "ADAUSDTPERP",
        #          "orderType": "limit",
        #          "side": "buy",
        #          "canceledSize": "1",
        #          "orderId": "642b4d4c0494cd0007c76813",
        #          "type": "canceled",
        #          "orderTime": "1680559436101909048",
        #          "size": "1",
        #          "filledSize": "0",
        #          "marginType": 1,
        #          "price": "0.25",
        #          "remainSize": "0",
        #          "clientOid": "112cbbf1-95a3-4917-957c-d3a87d81f853",
        #          "status": "done",
        #          "ts": 1680559677560686600
        #        },
        #        "subject": "orderChange",
        #        "topic": "/contractMarket/tradeOrders",
        #        "channelType": "private",
        #        "type": "message",
        #        "userId": "1139790"
        #    }
        # stop order
        #    {
        #        "data": {
        #            "orderType": "stop",
        #            "symbol": "BTCUSDTPERP",
        #            "side": "buy",
        #            "stopPriceType": "TP",
        #            "orderId": "64514fe1850d2100074378f6",
        #            "type": "open",
        #            "createdAt": 1683050465847,
        #            "stopPrice": "29000",
        #            "size": 2,
        #            "stop": "up",
        #            "marginType": 0,
        #            "orderPrice": "28552.9",
        #            "ts": 1683050465847597300
        #        },
        #        "subject": "stopOrder",
        #        "topic": "/contractMarket/advancedOrders",
        #        "channelType": "private",
        #        "id": "64514fe1850d2100074378fa",
        #        "type": "message",
        #        "userId": "1160396"
        #    }
        #
        data = self.safe_value(message, 'data', {})
        if self.orders is None:
            limit = self.safe_integer(self.options, 'ordersLimit', 1000)
            self.orders = ArrayCacheBySymbolById(limit)
        orders = self.orders
        # resolve the hash the subscriber is waiting on: watch_orders uses the
        # channel name as the message hash, so stop orders arriving on
        # /contractMarket/advancedOrders must not be resolved under
        # /contractMarket/tradeOrders(kept as the fallback when topic is missing)
        messageHash = self.safe_string(message, 'topic', '/contractMarket/tradeOrders')
        parsed = self.parse_ws_order(data)
        orders.append(parsed)
        client.resolve(orders, messageHash)
        return message

    def parse_order_status(self, status: str, type: str):
        """
        @ignore
        Map the exchange's order status and message type onto a unified order status, the message type takes precedence
        :param str status: "match", "open", "done"
        :param str type: "open", "match", "filled", "canceled", "update"
        :returns str: the unified status, or the raw status when neither value is recognised
        """
        byMessageType: dict = {
            'canceled': 'canceled',
            'cancel': 'canceled',
            'filled': 'closed',
        }
        fromType = self.safe_string(byMessageType, type)
        if fromType is not None:
            return fromType
        # the message type says nothing conclusive, fall back to the status
        byStatus: dict = {
            'open': 'open',
            'match': 'open',
            'done': 'closed',
        }
        return self.safe_string(byStatus, status, status)

    def parse_ws_order(self, order, market=None):
        """
        Convert a raw order update(regular or stop order) into a unified order structure.

        :param dict order: the 'data' part of an order message
        :param dict [market]: the market forwarded to safe_symbol
        :returns dict: the result of safe_order
        """
        #
        #    {
        #        "symbol": "ADAUSDTPERP",
        #        "orderType": "limit",
        #        "side": "buy",
        #        "canceledSize": "1",
        #        "orderId": "642b4d4c0494cd0007c76813",
        #        "type": "canceled",
        #        "orderTime": "1680559436101909048",
        #        "size": "1",
        #        "filledSize": "0",
        #        "marginType": 1,
        #        "price": "0.25",
        #        "remainSize": "0",
        #        "clientOid": "112cbbf1-95a3-4917-957c-d3a87d81f853",
        #        "status": "done",
        #        "ts": 1680559677560686600
        #    }
        # stop
        #    {
        #        "orderType": "stop",
        #        "symbol": "BTCUSDTPERP",
        #        "side": "buy",
        #        "stopPriceType": "TP",
        #        "orderId": "64514fe1850d2100074378f6",
        #        "type": "open",
        #        "createdAt": 1683050465847,
        #        "stopPrice": "29000",
        #        "size": 2,
        #        "stop": "up",
        #        "marginType": 0,
        #        "orderPrice": "28552.9",
        #        "ts": 1683050465847597300
        #    }
        #
        orderId = self.safe_string(order, 'orderId')
        clientOid = self.safe_string(order, 'clientOid')
        marketId = self.safe_string(order, 'symbol')
        # 'orderTime' is preferred over 'ts', both are scaled by 1e-6
        orderTime = self.safe_integer_product_2(order, 'orderTime', 'ts', 0.000001)
        rawStatus = self.safe_string(order, 'status')
        rawType = self.safe_string(order, 'type')
        unified: dict = {
            'info': order,
            'symbol': self.safe_symbol(marketId, market),
            'id': orderId,
            'clientOrderId': clientOid,
            'timestamp': orderTime,
            'datetime': self.iso8601(orderTime),
            'lastTradeTimestamp': None,
            'type': self.safe_string(order, 'orderType'),
            'timeInForce': None,
            'postOnly': None,
            'side': self.safe_string(order, 'side'),
            'price': self.safe_string_2(order, 'price', 'orderPrice'),
            'stopPrice': self.safe_string(order, 'stopPrice'),
            'triggerPrice': None,
            'amount': self.safe_string(order, 'size'),
            'cost': None,
            'average': None,
            'filled': self.safe_string(order, 'filledSize'),
            'remaining': self.safe_string(order, 'remainSize'),
            'status': self.parse_order_status(rawStatus, rawType),
            'fee': None,
            'trades': None,
        }
        return self.safe_order(unified)

    def handle_ticker(self, client: Client, message):
        """
        Parse a ticker or 24h snapshot message, store the ticker and resolve the subscribers of the topic.

        :param Client client: the client whose pending futures are resolved
        :param dict message: the raw message received from the exchange
        :returns dict: the unchanged message
        """
        #
        #    {
        #        "subject": "ticker",
        #        "topic": "/contractMarket/ticker:BTCUSDTPERP",
        #        "data": {
        #            "symbol": "BTCUSDTPERP",                   # Market of the symbol
        #            "sequence": 45,                            # Sequence number which is used to judge the continuity of the pushed messages
        #            "side": "sell",                            # Transaction side of the last traded taker order
        #            "price": 3600.00,                          # Filled price
        #            "size": 16,                                # Filled quantity
        #            "tradeId": "5c9dcf4170744d6f5a3d32fb",     # Order ID
        #            "bestBidSize": 795,                        # Best bid size
        #            "bestBidPrice": 3200.00,                   # Best bid
        #            "bestAskPrice": 3600.00,                   # Best ask
        #            "bestAskSize": 284,                        # Best ask size
        #            "ts": 1553846081210004941                  # Filled time - nanosecond
        #        },
        #        "type": "message",
        #    }
        #
        #    {
        #        "topic": "/contractMarket/snapshot:BTCUSDTPERP",
        #        "subject": "snapshot.24h",
        #        "data": {
        #            "volume": 30449670,            #24h Volume
        #            "turnover": 845169919063,      #24h Turnover
        #            "lastPrice": 3551,           #Last price
        #            "priceChgPct": 0.0043,         #24h Change
        #            "ts": 1547697294838004923      #Snapshot time(nanosecond)
        #        }
        #    }
        #
        payload = self.safe_value(message, 'data', {})
        topic = self.safe_string(message, 'topic')
        symbol = self.get_symbol_from_topic(topic)
        if symbol is None:
            # the topic does not lead to a symbol, nothing to store
            return message
        parsed = self.parse_ticker(payload)
        self.tickers[symbol] = parsed
        # the topic doubles as the message hash
        client.resolve(parsed, topic)
        return message

    def handle_l3_order_book(self, client: Client, message):
        """
        Forward a raw level 3 message to the subscribers of its topic, 'received' messages are dropped.

        :param Client client: the client whose pending futures are resolved
        :param dict message: the raw message received from the exchange
        """
        #
        #    {
        #        "data": {
        #            "symbol": "BTCUSDTPERP",
        #            "sequence": 1679593048010,
        #            "orderId": "6426fec8586b9500089d64d8",
        #            "clientOid": "14e6ee8e-8757-462c-84db-ed12c2b62f55",
        #            "ts": 1680277192127513900
        #        },
        #        "subject": "received",
        #        "topic": "/contractMarket/level3v2:BTCUSDTPERP",
        #        "type": "message"
        #    }
        #
        #    {
        #        "data": {
        #            "symbol": "BTCUSDTPERP",
        #            "sequence": 1679593047982,
        #            "side": "sell",
        #            "orderTime": "1680277191900131371",
        #            "size": "1",
        #            "orderId": "6426fec7d32b6e000790268b",
        #            "price": "28376.4",
        #            "ts": 1680277191939042300
        #        },
        #        "subject": "open",
        #        "topic": "/contractMarket/level3v2:BTCUSDTPERP",
        #        "type": "message"
        #    }
        #
        #    {
        #        "data": {
        #            "symbol": "BTCUSDTPERP",
        #            "reason": "canceled",   # or "filled"
        #            "sequence": 1679593047983,
        #            "orderId": "6426fec74026fa0008e7046f",
        #            "ts": 1680277191949842000
        #        },
        #        "subject": "done",
        #        "topic": "/contractMarket/level3v2:BTCUSDTPERP",
        #        "type": "message"
        #    }
        #
        topic = self.safe_string(message, 'topic')
        if self.safe_string(message, 'subject') != 'received':
            # At the time of writing there is no implementation to easily convert each order into the orderbook, so raw messages are returned
            client.resolve(message, topic)

    def handle_level_2(self, client: Client, message):
        """
        Route a level 2 message: topics containing 'Depth' carry snapshots, every other topic carries a change.

        :param Client client: forwarded to the selected handler
        :param dict message: the raw message received from the exchange
        """
        #    {
        #        "subject": "level2",
        #        "topic": "/contractMarket/level2:BTCUSDTPERP",
        #        "type": "message",
        #        "data": {
        #            "sequence": 18,                   # Sequence number which is used to judge the continuity of pushed messages
        #            "change": "5000.0,sell,83"        # Price, side, quantity
        #            "timestamp": 1551770400000
        #        }
        #    }
        topic = self.safe_string(message, 'topic')
        if topic.find('Depth') < 0:
            self.handle_l2_order_book(client, message)
        else:
            self.hande_l2_snapshot(client, message)

    def handle_l2_order_book(self, client: Client, message):
        """
        Apply a level 2 change to the stored order book, or cache it while the book has no nonce yet.

        :param Client client: the client whose pending futures are resolved or rejected
        :param dict message: the raw message received from the exchange
        """
        #
        #    {
        #        "id": 1545910660740,
        #        "type": "subscribe",
        #        "topic": "/contractMarket/level2:BTCUSDTPERP",
        #        "response": True
        #    }
        #
        #    {
        #        "subject": "level2",
        #        "topic": "/contractMarket/level2:BTCUSDTPERP",
        #        "type": "message",
        #        "data": {
        #            "sequence": 18,                   # Sequence number which is used to judge the continuity of pushed messages
        #            "change": "5000.0,sell,83"        # Price, side, quantity
        #            "timestamp": 1551770400000
        #        }
        #    }
        #
        change = self.safe_value(message, 'data', {})
        topic = self.safe_string(message, 'topic', '')
        symbol = self.get_symbol_from_topic(topic)
        book = self.safe_value(self.orderbooks, symbol)
        if book is None:
            book = self.order_book({})
            self.orderbooks[symbol] = book
            book['symbol'] = symbol
        if self.safe_integer(book, 'nonce') is None:
            # the book has no nonce yet: queue the change, and start loading the
            # order book once exactly snapshotDelay changes are already queued
            queued = len(book.cache)
            snapshotDelay = self.handle_option('watchOrderBook', 'snapshotDelay', 5)
            if queued == snapshotDelay:
                self.spawn(self.load_order_book, client, topic, symbol, 0, {})
            book.cache.append(change)
            return
        try:
            self.handle_delta(book, change)
            client.resolve(book, topic)
        except Exception as e:
            # drop the book that failed to update and hand the error to the subscribers
            del self.orderbooks[symbol]
            client.reject(e, topic)

    def hande_l2_snapshot(self, client: Client, message):
        """
        replace the stored order book of the topic's symbol with the full snapshot carried by the message and resolve it on the topic
        :param Client client: websocket client used to resolve the topic
        :param dict message: raw push message with 'topic' and a 'data' object holding 'asks', 'bids' and 'timestamp'
        """
        #
        #    {
        #        "type": "message",
        #        "topic": "/contractMarket/level2Depth5:BTCUSDTPERP",
        #        "subject": "level2",
        #        "data": {
        #            "asks": [
        #                ["9993", "3"],
        #                ["9992", "3"],
        #                ["9991", "47"],
        #                ["9990", "32"],
        #                ["9989", "8"]
        #            ],
        #            "bids": [
        #                ["9988", "56"],
        #                ["9987", "15"],
        #                ["9986", "100"],
        #                ["9985", "10"],
        #                ["9984", "10"]
        #            ],
        #            "timestamp": 1682993050531,
        #        }
        #    }
        #
        payload = self.safe_value(message, 'data', {})
        topic = self.safe_string(message, 'topic', '')
        symbol = self.get_symbol_from_topic(topic)
        parsed = self.parse_order_book(payload, symbol, self.safe_integer(payload, 'timestamp'), 'bids', 'asks')
        # every snapshot builds a brand new book, the previous one for the symbol is discarded
        book = self.order_book(parsed)
        self.orderbooks[symbol] = book
        client.resolve(book, topic)

    def get_symbol_from_topic(self, topic: str):
        """
        extract the market id that follows the ':' in a topic and convert it with safe_symbol
        :param str topic: a topic such as '/contractMarket/level2:BTCUSDTPERP'
        :returns: whatever safe_symbol returns for the part after the first ':'
        """
        parts = topic.split(':')
        return self.safe_symbol(self.safe_string(parts, 1))

    def get_cache_index(self, orderbook, cache):
        """
        find the position in the buffered deltas from which they can be replayed on top of a snapshot
        :param dict orderbook: the snapshot, only its 'nonce' is read
        :param list cache: buffered deltas in arrival order, each carrying a 'sequence'
        :returns int: -1 when the first buffered delta is already more than one step ahead of the snapshot nonce(there is a gap), otherwise the index of the delta whose sequence equals nonce + 1, or len(cache) when no buffered delta follows the snapshot
        """
        if not cache:
            # nothing was buffered, so there is nothing to replay; without self guard the
            # comparison below would be evaluated against None and raise a TypeError
            return 0
        firstDelta = self.safe_value(cache, 0)
        nonce = self.safe_integer(orderbook, 'nonce')
        firstDeltaSequence = self.safe_integer(firstDelta, 'sequence')
        if firstDeltaSequence > nonce + 1:
            return -1
        for index, delta in enumerate(cache):
            if nonce == self.safe_integer(delta, 'sequence') - 1:
                return index
        return len(cache)

    def handle_delta(self, orderbook, delta):
        """
        apply one level2 delta to an order book that already has a nonce
        a delta whose sequence is lower than the book nonce is ignored
        the updates are read from the 'changes' list and, when that list is absent, from the single 'change' string
        :param orderbook: the book to update, its 'bids'/'asks' sides receive store(price, size) and its 'timestamp', 'datetime' and 'nonce' are overwritten
        :param dict delta: the 'data' object of a level2 message
        :raises ChecksumError: when the book nonce differs from the delta's 'lastSequence' and the watchOrderBook 'checksum' option(default True) is enabled
        """
        #
        #    {
        #      sequence: 123677914,
        #      lastSequence: 123677913,
        #      change: '80.36,buy,4924',
        #      changes: ['80.19,buy,0',"80.15,buy,10794"],
        #      timestamp: 1715643483528
        #    },
        #
        sequence = self.safe_integer(delta, 'sequence')
        lastSequence = self.safe_integer(delta, 'lastSequence')
        nonce = self.safe_integer(orderbook, 'nonce')
        if nonce > sequence:
            return
        if nonce != lastSequence:
            checksum = self.handle_option('watchOrderBook', 'checksum', True)
            if checksum:
                raise ChecksumError(self.id + ' ' + self.orderbook_checksum_message(''))
        changes = self.safe_list(delta, 'changes')
        if changes is None:
            # some payloads only carry the single 'change' string; iterating a missing
            # list used to fail with a TypeError, so fall back to it(or to no changes at all)
            singleChange = self.safe_string(delta, 'change')
            changes = [] if singleChange is None else [singleChange]
        for change in changes:
            # each change is 'price,side,size'
            splitChange = change.split(',')
            price = self.safe_number(splitChange, 0)
            side = self.safe_string(splitChange, 1)
            size = self.safe_number(splitChange, 2)
            orderBookSide = orderbook['bids'] if (side == 'buy') else orderbook['asks']
            orderBookSide.store(price, size)
        timestamp = self.safe_integer(delta, 'timestamp')
        orderbook['timestamp'] = timestamp
        orderbook['datetime'] = self.iso8601(timestamp)
        orderbook['nonce'] = sequence

    def handle_balance(self, client: Client, message):
        """
        store the parsed wallet update under its currency code in self.balance and resolve it on '/contractAccount/wallet'
        :param Client client: websocket client used to resolve the wallet topic
        :param dict message: raw push message whose 'data' holds the wallet change
        :returns dict: the message, unchanged
        """
        #
        #    {
        #        "data": {
        #          "currency": "USDT",
        #          "availableBalance": "4.0000000000",
        #          "timestamp": "1680557568670"
        #        },
        #        "subject": "availableBalance.change",
        #        "topic": "/contractAccount/wallet",
        #        "channelType": "private",
        #        "id": "642b4600cae86800074b5ab7",
        #        "type": "message",
        #        "userId": "1139790"
        #    }
        #
        #    {
        #        "data": {
        #          "currency": "USDT",
        #          "orderMargin": "0.0000000000",
        #          "timestamp": "1680558743307"
        #        },
        #        "subject": "orderMargin.change",
        #        "topic": "/contractAccount/wallet",
        #        "channelType": "private",
        #        "id": "642b4a97b58e360007c3a237",
        #        "type": "message",
        #        "userId": "1139790"
        #    }
        #
        payload = self.safe_value(message, 'data', [])
        code = self.currency(self.safe_string(payload, 'currency'))['code']
        parsed = self.parse_ws_balance(payload)
        # the slot for the currency code holds the complete structure returned by parse_ws_balance
        self.balance[code] = parsed
        client.resolve(self.balance[code], '/contractAccount/wallet')
        return message

    def parse_ws_balance(self, response):
        """
        build a balance structure from the 'data' object of a wallet push
        only 'availableBalance' is read(into 'free'), a payload without it leaves 'free' as returned by safe_string
        :param dict response: the 'data' object of a wallet message
        :returns: the result of safe_balance applied to the assembled structure
        """
        #
        #    {
        #        "currency": "USDT",
        #        "availableBalance": "4.0000000000",
        #        "timestamp": "1680557568670"
        #    }
        #
        #    {
        #        "currency": "USDT",
        #        "orderMargin": "0.0000000000",
        #        "timestamp": "1680558743307"
        #    }
        #
        ts = self.safe_integer(response, 'timestamp')
        balance: dict = {'info': response, 'timestamp': ts, 'datetime': self.iso8601(ts)}
        code = self.safe_currency_code(self.safe_string(response, 'currency'))
        entry = self.account()
        entry['free'] = self.safe_string(response, 'availableBalance')
        balance[code] = entry
        return self.safe_balance(balance)

    def handle_system_status(self, client: Client, message):
        """
        handler for the 'welcome' frame, nothing is stored or resolved
        :param Client client: websocket client the frame arrived on(unused)
        :param dict message: the raw frame
        :returns dict: the message, unchanged
        """
        #
        #     {
        #         "id": "1578090234088",  # connectId
        #         "type": "welcome",
        #     }
        #
        return message

    def handle_subject(self, client: Client, message):
        """
        dispatch a push message to the handler registered for its 'subject', unknown subjects are ignored
        :param Client client: websocket client, forwarded to the handler
        :param dict message: raw push message
        """
        l3Handler = self.handle_l3_order_book
        tickerHandler = self.handle_ticker
        orderHandler = self.handle_order
        balanceHandler = self.handle_balance
        handlers: dict = {
            'auth': self.handle_authenticate,
            'received': l3Handler,
            'open': l3Handler,
            'update': l3Handler,
            'done': l3Handler,
            'level2': self.handle_level_2,
            'ticker': tickerHandler,
            'snapshot.24h': tickerHandler,
            'match': self.handle_trade,
            'orderChange': orderHandler,
            'stopOrder': orderHandler,
            'availableBalance.change': balanceHandler,
            'orderMargin.change': balanceHandler,
        }
        handler = self.safe_value(handlers, self.safe_string(message, 'subject'))
        if handler is None:
            return
        handler(client, message)

    def ping(self, client: Client):
        """
        build the ping frame, its 'id' is the next request id converted to a string
        :param Client client: websocket client the ping is meant for(unused)
        :returns dict: the frame with 'id' and 'type'
        """
        requestId = self.request_id()
        return {'id': str(requestId), 'type': 'ping'}

    def handle_pong(self, client: Client, message):
        """
        record the current time in milliseconds as the client's lastPong
        :param Client client: websocket client whose lastPong is updated
        :param dict message: the raw pong frame
        :returns dict: the message, unchanged
        """
        now = self.milliseconds()
        client.lastPong = now
        return message

    def handle_error_message(self, client: Client, message):
        """
        turn an 'error' frame into an exception and reject with it on the client
        the raw frame used to be passed to client.reject as is, a dict is not an exception and cannot be
        raised to the awaiting caller, so it is wrapped the same way handle_authenticate wraps a failed login
        :param Client client: websocket client the frame arrived on
        :param dict message: the raw error frame
        """
        #
        #    {
        #        "code": 404,
        #        "data": "tunnel stream-0 is not exist",
        #        "id": "3",
        #        "type": "error"
        #    }
        #
        # imported locally so the block does not depend on the file-level import list
        from ccxt.base.errors import ExchangeError
        error = ExchangeError(self.id + ' ' + self.json(message))
        # no message hash is given, as before(presumably self rejects everything pending on the client, confirm against Client.reject)
        client.reject(error)

    def handle_message(self, client: Client, message):
        """
        entry point for incoming frames, dispatches on the frame 'type' and ignores types without a handler
        :param Client client: websocket client, forwarded to the handler
        :param dict message: the raw frame
        """
        handlers: dict = {
            'welcome': self.handle_system_status,
            'ack': self.handle_subscription_status,
            'message': self.handle_subject,
            'pong': self.handle_pong,
            'error': self.handle_error_message,
        }
        frameType = self.safe_string(message, 'type')
        handler = self.safe_value(handlers, frameType)
        if handler is None:
            return
        handler(client, message)

    def handle_authenticate(self, client, message):
        """
        resolve or reject the 'authenticated' future depending on the 'success' flag inside the message's 'data'
        on failure the 'authenticated' entry is also removed from client.subscriptions when present
        :param client: websocket client holding the 'authenticated' future
        :param dict message: the raw auth response
        :returns dict: the message, unchanged
        """
        #
        #    {
        #        "success": True,
        #        "ret_msg": '',
        #        "op": "auth",
        #        "conn_id": "ce3dpomvha7dha97tvp0-2xh"
        #    }
        #
        # NOTE(review): the sample above shows 'success' at the top level while the code reads it from message['data'] - confirm the real payload
        payload = self.safe_value(message, 'data')
        messageHash = 'authenticated'
        if not self.safe_value(payload, 'success'):
            failure = AuthenticationError(self.id + ' ' + self.json(message))
            client.reject(failure, messageHash)
            if messageHash in client.subscriptions:
                del client.subscriptions[messageHash]
            return message
        client.resolve(message, messageHash)
        return message
