Python Binance Asyncio Client

February 25, 2021

BinanceClientAsync

binance_client.py

import time
import datetime
# import pytz
# import dateparser
# import requests
import aiohttp
import asyncio
import hmac
import hashlib
# import decimal
# import pprint
# import numbers
# import sys
# import random
from urllib.parse import urljoin, urlencode

class BinanceException(Exception):
    """Error raised when a Binance REST call answers with a non-200 status.

    Attributes:
        status_code: HTTP status code passed to the constructor.
        code: ``code`` field of the JSON error body, or ``None`` when no
            usable body was supplied or the field is absent.
        msg: ``msg`` field of the JSON error body, or ``None``.
    """

    def __init__(self, status_code, data=None):
        """Build the exception from the HTTP status and an optional JSON body.

        Args:
            status_code: HTTP status code of the failed response.
            data: Decoded JSON error body (a mapping that normally carries
                ``code`` and ``msg``), or ``None`` when there is no body.
        """
        self.status_code = status_code
        if data and isinstance(data, dict):
            # A JSON error body is not guaranteed to carry both keys; use
            # .get() so a malformed body cannot mask the HTTP failure with
            # a KeyError raised while constructing the exception.
            self.code = data.get('code')
            self.msg = data.get('msg')
            message = f"{status_code} [{self.code}] {self.msg}"
        else:
            self.code = None
            self.msg = None
            message = f"status_code={status_code}"

        super().__init__(message)


class BinanceClientAsync:
    """Asyncio client for the Binance REST API, built on ``aiohttp``.

    Holds the API credentials and a shared ``aiohttp.ClientSession``, plus
    the helpers request methods rely on: response decoding, request signing
    and retry with exponential backoff.
    """

    BASE_URL = 'https://api.binance.com'

    def __init__(self, api_key, secret_key, recvWindow=6000, TEST=False, retry=3):
        """Store credentials and settings, and create the HTTP session.

        Args:
            api_key: API key; sent in the ``X-MBX-APIKEY`` header.
            secret_key: Secret used by ``_sign`` for the HMAC-SHA256 signature.
            recvWindow: Stored on the instance; not used by the helpers in
                this class.
            TEST: Stored on the instance; not used by the helpers in this
                class.
            retry: Upper bound on attempts made by ``_retry_request`` (at
                least one attempt is always made).
        """
        self.api_key = api_key
        self.secret_key = secret_key
        self.recvWindow = recvWindow
        self.headers = {
            'X-MBX-APIKEY': api_key
        }
        self.TEST = TEST
        self.session = aiohttp.ClientSession()
        self.retry = retry

    async def open(self):
        """Replace the HTTP session with a fresh ``aiohttp.ClientSession``.

        A previous session that is still open is closed first so that its
        connections are not leaked when it is replaced.
        """
        previous = getattr(self, 'session', None)
        if previous is not None and not previous.closed:
            await previous.close()
        self.session = aiohttp.ClientSession()

    async def close(self):
        """Close the current HTTP session."""
        await self.session.close()

    async def _handle_reponse(self, r):
        """Decode a response, raising ``BinanceException`` on non-200 status.

        Args:
            r: Response object exposing ``status``, ``headers`` and an
                awaitable ``json()``.

        Returns:
            The decoded JSON body when the status is 200.

        Raises:
            BinanceException: For any other status. The JSON error body is
                attached only when the response declares a JSON content type.
        """
        if r.status == 200:
            return await r.json()

        # .get(): a response without a Content-Type header must still be
        # reported as a BinanceException, not as a KeyError.
        content_type = r.headers.get('Content-Type', '')
        if content_type.startswith('application/json'):
            raise BinanceException(status_code=r.status, data=await r.json())
        raise BinanceException(status_code=r.status, data=None)

    def _sign(self, params):
        """Add ``timestamp`` and ``signature`` to ``params`` in place.

        The signature is the hex HMAC-SHA256 (keyed with ``secret_key``) of
        the URL-encoded parameters, excluding the signature itself.

        Args:
            params: Mutable mapping of request parameters; modified in place.
        """
        # Drop any earlier signature so that signing the same dict again
        # hashes only the real parameters and stays valid.
        params.pop('signature', None)
        params['timestamp'] = int(time.time() * 1000)
        query_string = urlencode(params)
        params['signature'] = hmac.new(
            self.secret_key.encode('utf-8'),
            query_string.encode('utf-8'),
            hashlib.sha256,
        ).hexdigest()

    async def _retry_request(self, fn):
        """Await ``fn()`` and retry transient failures with backoff.

        Retried failures are ``aiohttp`` ``ClientOSError`` and
        ``BinanceException`` with ``code == -1001`` or ``status_code == 502``.
        Any other ``BinanceException`` propagates immediately.

        Args:
            fn: Zero-argument callable returning an awaitable.

        Returns:
            Whatever ``fn()`` resolves to.

        Raises:
            The last retryable error once ``self.retry`` attempts have failed.
        """
        retry_count = 0
        while True:
            try:
                return await fn()
            except aiohttp.client_exceptions.ClientOSError as e:
                failure = e
            except BinanceException as e:
                # -1001: DISCONNECTED, Internal error; unable to process your
                # request. Please try again.
                # 502: maybe want to handle all 5XX error?
                if e.code != -1001 and e.status_code != 502:
                    raise
                failure = e

            print(failure)
            retry_count += 1
            if retry_count >= self.retry:
                raise failure
            await asyncio.sleep(pow(2, retry_count - 1))  # 1, 2, 4

Sample Function

class BinanceClientAsync:
    async def get_time(self):
        """GET ``/api/v1/time`` and return the decoded JSON body.

        Returns:
            The decoded JSON body when the status is 200.

        Raises:
            BinanceException: For any other status. The JSON error body is
                attached only when the response declares a JSON content type.
        """
        PATH = '/api/v1/time'

        url = urljoin(self.BASE_URL, PATH)

        async with self.session.get(url) as r:
            if r.status == 200:
                return await r.json()

            # The body has not been read on this path, so decode it here
            # (when it is JSON) before building the exception.
            data = None
            if r.headers.get('Content-Type', '').startswith('application/json'):
                data = await r.json()
            raise BinanceException(status_code=r.status, data=data)

Use _handle_reponse to reduce code.

class BinanceClientAsync:
    async def get_exchange_info(self):
        """GET ``/api/v1/exchangeInfo`` and return the handled response.

        The request is sent without query parameters and without the API-key
        headers; decoding and error handling are done by ``_handle_reponse``.
        """
        endpoint = urljoin(self.BASE_URL, '/api/v1/exchangeInfo')
        async with self.session.get(endpoint, params=None) as response:
            return await self._handle_reponse(response)

    async def get_price(self, symbol=None):
        """GET ``/api/v3/ticker/price`` and return the handled response.

        Args:
            symbol: Optional symbol; when falsy, no ``symbol`` query
                parameter is sent.
        """
        query = {'symbol': symbol} if symbol else {}
        endpoint = urljoin(self.BASE_URL, '/api/v3/ticker/price')
        async with self.session.get(endpoint, headers=self.headers, params=query) as response:
            return await self._handle_reponse(response)

Retry support for network or server errors

class BinanceClientAsync:
    async def get_order_book(self, symbol, limit=100):
        """GET ``/api/v3/depth`` through ``_retry_request``.

        Args:
            symbol: Sent as the ``symbol`` query parameter.
            limit: Sent as the ``limit`` query parameter.

        Returns:
            Whatever ``_retry_request`` returns for the wrapped request.
        """
        endpoint = urljoin(self.BASE_URL, '/api/v3/depth')
        query = dict(symbol=symbol, limit=limit)

        async def fetch_depth():
            # One attempt; the retry helper calls this again on failure.
            async with self.session.get(endpoint, headers=self.headers, params=query) as response:
                return await self._handle_reponse(response)

        return await self._retry_request(fetch_depth)

Create Order

class BinanceClientAsync:
    async def order_limit_maker(self, symbol, side, quantity, price):
        """Submit a ``LIMIT_MAKER`` order via ``create_order``.

        No ``timeInForce`` is sent for this order type.
        """
        order = dict(symbol=symbol, side=side, quantity=quantity, price=price)
        return await self.create_order(order_type='LIMIT_MAKER', time_in_force=None, **order)

    async def order_stop_loss_limit(self, symbol, side, quantity, stop_price, price):
        """Submit a ``STOP_LOSS_LIMIT`` order with ``timeInForce`` set to ``GTC``."""
        order = dict(symbol=symbol, side=side, quantity=quantity, price=price, stop_price=stop_price)
        return await self.create_order(order_type='STOP_LOSS_LIMIT', time_in_force='GTC', **order)

    async def order_market(self, symbol, side, quantity):
        """Submit a ``MARKET`` order; neither price nor ``timeInForce`` is sent."""
        order = dict(symbol=symbol, side=side, quantity=quantity)
        return await self.create_order(order_type='MARKET', price=None, time_in_force=None, **order)

    async def create_order(self, symbol, side, quantity, order_type, price, time_in_force, stop_price=None):
        """Sign and POST an order to ``/api/v3/order``.

        Args:
            symbol: Sent as ``symbol``.
            side: Sent as ``side``.
            quantity: Sent as ``quantity``.
            order_type: Sent as ``type``.
            price: Sent as ``price`` unless ``None``.
            time_in_force: Sent as ``timeInForce`` unless ``None``.
            stop_price: Sent as ``stopPrice`` unless ``None``.

        Returns:
            The result of ``_handle_reponse`` for the POST response.
        """
        # NOTE: self.TEST is not consulted here; orders always go to
        # '/api/v3/order' (not '/api/v3/order/test').
        payload = dict(symbol=symbol, side=side, quantity=quantity, type=order_type)

        # Optional fields are appended in this fixed order when provided.
        optional_fields = (
            ('price', price),
            ('timeInForce', time_in_force),
            ('stopPrice', stop_price),
        )
        payload.update((key, value) for key, value in optional_fields if value is not None)

        self._sign(payload)

        endpoint = urljoin(self.BASE_URL, '/api/v3/order')
        async with self.session.post(endpoint, headers=self.headers, params=payload) as response:
            return await self._handle_reponse(response)

Query/Cancel Order

class BinanceClientAsync:
    async def query_order(self, symbol, order_id):
        """Sign and GET ``/api/v3/order`` through ``_retry_request``.

        Args:
            symbol: Sent as ``symbol``.
            order_id: Sent as ``orderId``.

        Returns:
            Whatever ``_retry_request`` returns for the wrapped request.
        """
        query = dict(symbol=symbol, orderId=order_id)
        # Signed once; every retry re-sends the same signed parameters.
        self._sign(query)
        endpoint = urljoin(self.BASE_URL, '/api/v3/order')

        # status: NEW, PARTIALLY_FILLED, FILLED, CANCELED, PENDING_CANCEL, REJECTED, EXPIRED
        async def fetch_order():
            async with self.session.get(endpoint, headers=self.headers, params=query) as response:
                return await self._handle_reponse(response)

        return await self._retry_request(fetch_order)

    async def cancel_order(self, symbol, order_id):
        """Sign and DELETE ``/api/v3/order`` (single attempt, no retry).

        Args:
            symbol: Sent as ``symbol``.
            order_id: Sent as ``orderId``.

        Returns:
            The result of ``_handle_reponse`` for the DELETE response.
        """
        query = dict(symbol=symbol, orderId=order_id)
        self._sign(query)
        endpoint = urljoin(self.BASE_URL, '/api/v3/order')

        async with self.session.delete(endpoint, headers=self.headers, params=query) as response:
            return await self._handle_reponse(response)

Others

class BinanceClientAsync:
    async def get_price(self, symbol=None):
        """GET ``/api/v3/ticker/price`` and return the handled response.

        Args:
            symbol: Optional symbol; when falsy, no ``symbol`` query
                parameter is sent.
        """
        PATH = '/api/v3/ticker/price'
        params = {}
        if symbol:
            params['symbol'] = symbol

        url = urljoin(self.BASE_URL, PATH)
        async with self.session.get(url, headers=self.headers, params=params) as r:
            return await self._handle_reponse(r)

    async def get_order_book(self, symbol, limit=100):
        """GET ``/api/v3/depth`` through ``_retry_request``.

        Args:
            symbol: Sent as ``symbol``.
            limit: Sent as ``limit``.
        """
        PATH = '/api/v3/depth'
        params = {
            'symbol': symbol,
            'limit': limit
        }

        url = urljoin(self.BASE_URL, PATH)

        async def request():
            async with self.session.get(url, headers=self.headers, params=params) as r:
                return await self._handle_reponse(r)

        return await self._retry_request(request)

    async def get_kline_data(self, symbol, interval, limit=500, start_time=None, end_time=None):
        """GET ``/api/v1/klines`` through ``_retry_request``.

        Args:
            symbol: Sent as ``symbol``.
            interval: Sent as ``interval`` (e.g. 1m, 3m, 5m, 15m, 30m).
            limit: Sent as ``limit``.
            start_time: Sent as ``startTime`` unless ``None``.
            end_time: Sent as ``endTime`` unless ``None``.
        """
        PATH = '/api/v1/klines'

        params = {
            'symbol': symbol,
            'interval': interval,
            'limit': limit
        }

        if start_time is not None:
            params['startTime'] = start_time
        if end_time is not None:
            params['endTime'] = end_time

        url = urljoin(self.BASE_URL, PATH)

        async def request():
            async with self.session.get(url, headers=self.headers, params=params) as r:
                return await self._handle_reponse(r)

        return await self._retry_request(request)

    async def get_trades(self, symbol, from_id=None, limit=500, start_time=None):
        """Sign and GET ``/api/v3/myTrades`` (single attempt, no retry).

        Args:
            symbol: Sent as ``symbol``.
            from_id: Sent as ``fromId`` unless ``None``.
            limit: Sent as ``limit`` when truthy; ``0`` or ``None`` omits it.
            start_time: Sent as ``startTime`` unless ``None``.

        Returns:
            The result of ``_handle_reponse`` for the response.
        """
        PATH = '/api/v3/myTrades'

        params = {
            'symbol': symbol
        }

        # Compare against None rather than truthiness: 0 is a legitimate
        # value for fromId/startTime and must not be silently dropped.
        if from_id is not None:
            params['fromId'] = from_id
        if start_time is not None:
            params['startTime'] = start_time
        if limit:
            params['limit'] = limit

        self._sign(params)

        url = urljoin(self.BASE_URL, PATH)

        async with self.session.get(url, headers=self.headers, params=params) as r:
            return await self._handle_reponse(r)

    async def create_oco_order(self, symbol, side, quantity, price, stop_price, stop_limit_price, stop_limit_time_in_force):
        """Sign and POST an OCO order to ``/api/v3/order/oco``.

        Args:
            symbol: Sent as ``symbol``.
            side: Sent as ``side``.
            quantity: Sent as ``quantity``.
            price: Sent as ``price``.
            stop_price: Sent as ``stopPrice``.
            stop_limit_price: Sent as ``stopLimitPrice``.
            stop_limit_time_in_force: Sent as ``stopLimitTimeInForce``.

        Returns:
            The result of ``_handle_reponse`` for the POST response.
        """
        # NOTE: self.TEST is not consulted here.
        PATH = '/api/v3/order/oco'

        params = {
            'symbol': symbol,
            'side': side,
            'quantity': quantity,
            'price': price,
            'stopPrice': stop_price,
            'stopLimitPrice': stop_limit_price,
            'stopLimitTimeInForce': stop_limit_time_in_force
        }

        self._sign(params)

        url = urljoin(self.BASE_URL, PATH)

        async with self.session.post(url, headers=self.headers, params=params) as r:
            return await self._handle_reponse(r)

    async def query_oco_order(self, order_list_id):
        """Sign and GET ``/api/v3/orderList`` through ``_retry_request``.

        Args:
            order_list_id: Sent as ``orderListId``.
        """
        PATH = '/api/v3/orderList'

        params = {
            'orderListId': order_list_id
        }

        # Signed once; every retry re-sends the same signed parameters.
        self._sign(params)

        url = urljoin(self.BASE_URL, PATH)

        async def request():
            async with self.session.get(url, headers=self.headers, params=params) as r:
                return await self._handle_reponse(r)

        return await self._retry_request(request)

Sample Usage

import asyncio
import binance_client

async def main():
    """Place a small LIMIT_MAKER buy order, poll it, and cancel it if still open.

    The order is priced 0.5% below the current ticker price for roughly
    15 BUSD worth of BTC. Its status is polled every 5 seconds; the order is
    cancelled after 10 polls without reaching a terminal status. The client
    session is always closed before returning.
    """
    binance = binance_client.BinanceClientAsync(api_key=BINANCE_API_KEY, secret_key=BINANCE_SECRET_KEY)

    symbol = 'BTCBUSD'
    # Binance documents the order side enum in upper case (BUY / SELL).
    side = 'BUY'

    try:
        prices = await binance.get_price(symbol=symbol)
        price = float(prices['price'])

        # adjust price lower to prevent immediate match
        price *= 0.995

        # buy BUSD 15 worth of BTC
        quantity = 15 / price

        price = f"{price:.2f}"
        quantity = f"{quantity:.6f}"
        print('price', price, 'quantity', quantity)

        try:
            data = await binance.order_limit_maker(symbol=symbol, side=side, quantity=quantity, price=price)
            order_id = data['orderId']
            print('order_id', order_id)

            loop_count = 0
            while True:
                await asyncio.sleep(5)
                data = await binance.query_order(symbol=symbol, order_id=order_id)
                status = data['status']
                print('status', status)
                if status in ['FILLED', 'CANCELED', 'PENDING_CANCEL', 'REJECTED', 'EXPIRED']:
                    print('success')
                    break

                # wait 10*5 = 50s before cancel
                loop_count += 1
                if loop_count >= 10:
                    data = await binance.cancel_order(symbol=symbol, order_id=order_id)
                    print('cancel')
                    break

        except binance_client.BinanceException as e:
            # if e.code == -2010 and e.msg == 'Order would immediately match and take.':
            # if e.code == -2013 and e.msg == 'Order does not exist.'
            print(e)
    finally:
        # Release the client's HTTP session on every exit path so the
        # program does not end with an unclosed session.
        await binance.close()


# Guard the entry point so importing this module does not place an order.
if __name__ == '__main__':
    asyncio.run(main())

References:

This work is licensed under a
Creative Commons Attribution-NonCommercial 4.0 International License.