利用币安api进行历史数据获取

2023-8-4 361

在获取币安永续合约的历史数据中,目前存在两种方式。

  • 利用币安的网站进行获取https://data.binance.vision/
  • 利用币安的api进行获取

但是在获取过程中发现两种方式均存在缺陷。方法一利用网站获取,其中包含非常多的历史脏数据,例如:在获取历史5分钟数据的时候,在0时区下的凌晨24点的数据存在缺失;BNX、MINA等币种数据的开始时间和币安交易所api提供的开始时间不符合等。方法二利用api来获取数据,虽然数据的完整性可以得到保证。但是获取所有交易对symbol的时候,api只是返回目前正在交易的symbol,对于那些已经退市的symbol的基本信息没有返回。所以想要获取全部的symbol的名称存在困难。

对于上面的这些问题,本文考虑了一种方式来获取全部的永续合约的历史数据:将两种方式进行组合。其中利用第一种方式来爬取币安到现在所有上市过的历史永续合约名称,再利用第二种方法来获取所有的上市过的永续合约历史数据。 下面的代码不仅可以获取历史的完整数据,同时在此基础上,增加数据追加功能,支持最近数据补全。 数据依赖:

binance_futures_connector==3.3.1
pandas==1.1.5
Requests==2.31.0
tqdm==4.65.0

完整python代码如下:

import datetime
import time
import pandas as pd
import glob
from functools import partial
import re
from binance.um_futures import UMFutures
import warnings
from tqdm import tqdm
from datetime import timedelta
import requests
from multiprocessing import Pool

warnings.filterwarnings('ignore')
pd.options.display.max_rows = None
pd.options.display.expand_frame_repr = False  # 当列太多时不换行
pd.set_option('display.unicode.ambiguous_as_wide', True)  # 设置命令行输出时的列对齐功能
pd.set_option('display.unicode.east_asian_width', True)


# 时间戳转化为时间函数
def convert_timestamp(timeNum):
    timeStamp = float(timeNum / 1000)
    timeArray = time.localtime(timeStamp)
    otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
    return otherStyleTime


# 将时间转化为时间戳函数
def get_timestamp(time_str):
    # 使用 strptime 解析日期时间字符串为日期时间对象
    parsed_time = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
    time_str = parsed_time - timedelta(hours=8)
    # 使用 timestamp 方法将日期时间对象转换为时间戳
    timestamp = int(time_str.timestamp() * 1000)
    return timestamp


def get_timestamp_interval(interval='5m', limit=1500):
    """
    得到每次获取数据的时间戳长度间隔
    :param interval:
    :param limit:
    :return:
    """
    if interval.endswith('s'):
        seconds = int(interval[:-1]) * limit
        return timedelta(seconds=seconds).total_seconds() * 1000

    if interval.endswith('m'):
        minutes = int(interval[:-1]) * limit
        return timedelta(minutes=minutes).total_seconds() * 1000

    if interval.endswith('h'):
        hours = int(interval[:-1]) * limit
        return timedelta(hours=hours).total_seconds() * 1000

    if interval.endswith('d'):
        days = int(interval[:-1]) * limit
        return timedelta(days=days).total_seconds() * 1000

        # 如果未匹配到合适的时间间隔单位,默认为 '5m'
    print(f"无效的时间间隔: {interval}")


def run_function_till_success(function, tryTimes=10, sleepTimes=20):
    '''
    将函数function尝试运行tryTimes次,直到成功返回函数结

本主题为课程学员专享,成为Python数字货币投资课程学员后可免费阅读

成为学员
最新回复 ( 0条评论 )


官方微信
码力十足学量化
Powered by Xiuno BBS 4.0.7

官方微信