티스토리 뷰

code.xlsx



import os
import win32com.client
import pandas as pd
import datetime as dt
from dateutil.relativedelta import relativedelta
from dateutil.parser import parse
import time


def checkPrivilege():
    """
    :return None:
    - Print whether the process runs with administrator rights
      (as reported by shell32.IsUserAnAdmin) and terminate with exit status 1 if it does not.
    - Windows only: relies on ``ctypes.windll``.
    :raises SystemExit: when the process is not elevated
    """
    import ctypes

    if ctypes.windll.shell32.IsUserAnAdmin():
        print("관리자 권한으로 정상적으로 실행되었습니다.")
        return

    print("관리자 권한으로 실행되지 않았습니다. 관리자권한으로 실행하세요.")
    # Raise SystemExit directly: the bare ``exit`` name is an interactive helper
    # installed by the ``site`` module and may be missing (python -S, frozen apps).
    raise SystemExit(1)

class Creon:
    """
    Crawl minute/tick chart data for a list of stock codes through the CREON Plus COM API.

    Typical use: construct, call ``loadCodeFile`` to fill ``self.list_code``,
    then call ``crawling`` with a parameter dict.
    """

    def __init__(self):
        """
        :param None:
        :return None:
        - Check administrator privilege, create the CREON COM objects and verify connectivity
        - Initialize the chart fields to request and the matching column names
        :raises SystemExit: when CREON Plus reports that it is not connected
        """
        checkPrivilege()
        self.obj_CpCodeMgr = win32com.client.Dispatch('CpUtil.CpCodeMgr')
        self.obj_CpCybos = win32com.client.Dispatch('CpUtil.CpCybos')
        self.obj_StockChart = win32com.client.Dispatch('CpSysDib.StockChart')
        if self.obj_CpCybos.IsConnect == 0:
            print("연결 실패 - Creon Plus가 정상적으로 실행되었는지 확인하고",
                  "계속 문제 발생 시 Creon Plus를 다시 실행하세요.")
            # SystemExit is what exit()/sys.exit() raise; raising it directly does not
            # depend on the interactive-only ``exit`` helper installed by ``site``.
            raise SystemExit(1)

        # Field ids handed to StockChart input 5. list_field_name[1:] names them in the
        # same order; list_field_name[0] (stock code) is filled in locally by creonAPI.
        self.list_field_key = [0, 1, 2, 3, 4, 5, 8, 9, 10, 11, 37]
        self.list_field_name = ['종목코드', '날짜', '시간', '시가', '고가', '저가', '종가',
                                '거래량', '거래대금', '누적체결매도수량', '누적체결매수수량', '대비부호']
        self.list_code = []

    def loadCodeFile(self, file):
        """
        :param file: code list excel file (must contain a sheet named 'KOSPI_KOSDAQ')
        :return None:
        - Load stock codes from the first column of the sheet and assign them to self.list_code
        """
        df = pd.read_excel(file, sheet_name='KOSPI_KOSDAQ')
        idx_code = df.columns[0]  # CODE
        self.list_code = df[idx_code]

    def crawling(self, param):
        """
        :param param: (dict) crawl settings with the keys
            'unit'  - 'm'(minutes) or 'T'(Ticks)
            'mode'  - 'A'(Auto: one month ago to now) or anything else (Manual: use start/end)
            'start' - (int) start date, YYYYMMDD, manual mode only
            'end'   - (int) end date, YYYYMMDD, manual mode only
            'test'  - (bool) True replaces self.list_code with the single code 'A000020'
        :return None:
        - For every code in self.list_code, crawl the data and save it as
          "<unit name> data [<first> ~ <last>]/<code>.csv" (ms949 encoded)
        """
        self.time_start = time.time()

        unit = param['unit']
        mode = param['mode']
        start = param['start']
        end = param['end']
        test = param['test']

        # Test
        if test is True:
            self.list_code = ['A000020']

        # The date range does not depend on the code. Resolving it once keeps every
        # csv of this run in the same folder even if the run crosses midnight.
        if mode == 'A':
            dt_start, dt_end = self.setDate(unit, months=1, days=0)
        else:
            dt_start, dt_end = self.rangeCheck(start, end, unit)

        for code in self.list_code:
            df_code, first, last = self.splitDate(code, dt_start, dt_end, unit)

            dir_name = "%s data [%s ~ %s]" % (self.fullName(unit), first, last)
            # exist_ok avoids the check-then-create race of exists() + mkdir().
            os.makedirs(dir_name, exist_ok=True)

            csv_name = os.path.join(dir_name, "%s.csv" % code)
            df_code.to_csv(csv_name, index=False, encoding='ms949')

        self.time_end = time.time()
        print("\n{:.2f}초 걸렸습니다.".format(self.time_end - self.time_start))
        print("\rCrawling Complete!")

    def rangeCheck(self, start, end, unit):
        """
        :param start: (int) start date
        :param end: (int) end date
        :param unit: can be 'm'(minutes) or 'T'(Ticks)
        :return dt_start, dt_end: (datetime, datetime) dates
        - Verify the range of start, end date and shrink if necessary:
          the start is moved up to one month ago for 'T' and to two years ago otherwise,
          and an end that lies before the start is replaced by the start.
        """
        dt_start, dt_end = parse(str(start)), parse(str(end))
        today = dt.datetime.today()

        # Check start date
        if unit == 'T':
            limit = today - relativedelta(months=1)
        else:
            limit = today - relativedelta(years=2)

        if (limit - dt_start).days > 0:
            dt_start = limit

        # Check end date
        if (dt_end - dt_start).days < 0:
            dt_end = dt_start

        return dt_start, dt_end

    @staticmethod
    def _toDate(value):
        """Return the calendar date of a datetime; plain dates are returned unchanged."""
        if isinstance(value, dt.datetime):
            return value.date()
        return value

    def splitDate(self, code, dt_start, dt_end, unit):
        """
        :param code: stock code
        :param dt_start: (datetime) start date
        :param dt_end: (datetime) end date
        :param unit: can be 'm'(minutes) or 'T'(Ticks)
        :return rst: (pd.DataFrame) crawled info, newest window first
        :return first: (str) dt_start formatted as YYYYMMDD
        :return last: (str) dt_end formatted as YYYYMMDD
        - CREON API doesn't work well over a wide range, so the date range is split into
          consecutive windows (start day + 5 days each) that never extend past dt_end.
        - Each window is requested through creonAPI and the results are concatenated.
        """
        first = dt_start.strftime("%Y%m%d")
        last = dt_end.strftime("%Y%m%d")

        # The API only takes YYYYMMDD values, so work on calendar dates. Comparing full
        # datetimes could drop the final day when the start carries a time of day.
        day_start, day_end = self._toDate(dt_start), self._toDate(dt_end)

        list_date = []
        if (day_end - day_start).days < 5:
            list_date.append((day_start, day_end))
        else:
            s = day_start
            while s <= day_end:
                # Clamp the window so the request never goes beyond the requested end.
                e = min(s + dt.timedelta(days=5), day_end)
                list_date.append((s, e))
                s = e + dt.timedelta(days=1)
            list_date.reverse()  # From new to old

        # Progress bookkeeping, computed once instead of once per window.
        codes = list(self.list_code)
        code_pos = codes.index(code) if code in codes else 0
        denominator = max(len(codes) * len(list_date), 1)
        if not hasattr(self, 'time_start'):
            # splitDate was called without crawling(); start the clock here.
            self.time_start = time.time()

        frames = []
        for step, (win_start, win_end) in enumerate(list_date):
            arg_start = int(win_start.strftime("%Y%m%d"))
            arg_end = int(win_end.strftime("%Y%m%d"))
            frames.append(self.creonAPI(code, arg_start, arg_end, unit))

            progress = (code_pos * len(list_date) + step) / denominator * 100
            self.time_cur = time.time()
            print("\r%.2f%% Complete.. \t %d초 경과" % (progress, self.time_cur - self.time_start), end='')

        # DataFrame.append was removed in pandas 2.0; concatenate the collected frames once.
        rst = pd.concat(frames) if frames else pd.DataFrame()

        return rst, first, last

    def setDate(self, unit, months=0, days=0):
        """
        :param unit: can be 'm'(minutes) or 'T'(Ticks); currently not used by this method
        :param months: crawl info from 'months' ago to now
        :param days: crawl info from 'days' ago to now
        :return dt_start, dt_end: (datetime, datetime) date set, dt_end being the current time
        """
        dt_end = dt.datetime.now()

        # Set date from 'months' or 'days' ago to now
        dt_start = dt_end - relativedelta(months=months, days=days)

        return dt_start, dt_end

    def fullName(self, unit):
        """
        :param unit: can be 'm'(minutes) or 'T'(Ticks)
        :return FullName of unit: "Tick", "Minute", or "Others" for any other value
        """
        if unit == 'T':
            return "Tick"
        elif unit == 'm':
            return "Minute"
        else:
            return "Others"

    def creonAPI(self, code, start, end, unit):
        """
        :param code: stock code
        :param start: (int) start date
        :param end: (int) end date
        :param unit: can be 'm'(minutes) or 'T'(Ticks)
        :return crawled info: (pd.DataFrame) one row per received record,
                              columns ordered as self.list_field_name
        :raises SystemExit: when the request status reported by the API is not 0
        """
        chart = self.obj_StockChart
        chart.SetInputValue(0, code)
        chart.SetInputValue(1, ord('1'))             # 0: by count, 1: by period
        chart.SetInputValue(2, end)                  # end date
        chart.SetInputValue(3, start)                # start date
        chart.SetInputValue(5, self.list_field_key)  # fields
        chart.SetInputValue(6, ord(unit))            # mode: 'D', 'W', 'M', 'm', 'T'

        chart.BlockRequest()  # request the data with the settings above

        status = chart.GetDibStatus()
        if status != 0:
            msg = chart.GetDibMsg1()
            print("통신상태: {} {}".format(status, msg))
            raise SystemExit(1)

        dict_chart = {name: [] for name in self.list_field_name}
        # The first column is the stock code itself; the remaining names map, in order,
        # to the data columns returned for self.list_field_key.
        data_names = self.list_field_name[1:]

        cnt = chart.GetHeaderValue(3)  # number of received records
        for i in range(cnt):
            dict_item = {name: chart.GetDataValue(pos, i) for pos, name in enumerate(data_names)}
            dict_item['종목코드'] = code

            # 20180701 -> 2018-07-01
            # 153000 -> 15:30:00
            date_txt = str(dict_item['날짜'])
            # Zero-pad to hhmm so values below 100 still split into hour and minute.
            time_txt = str(dict_item['시간']).zfill(4)

            dict_item['날짜'] = "{}-{}-{}".format(date_txt[:4], date_txt[4:6], date_txt[6:])
            dict_item['시간'] = "{}:{}:00".format(time_txt[:-2], time_txt[-2:])

            for k, v in dict_item.items():
                dict_chart[k].append(v)

        return pd.DataFrame(dict_chart, columns=self.list_field_name)


if __name__ == '__main__':
    # Crawl settings supplied by the user.
    #   unit  - 'T': Tick / 'm': minute (Tick reaches back 1 month, minute 2 years)
    #   mode  - 'M': Manual / 'A': Auto (Auto covers 1 month ago to now;
    #           that range can be modified in creon.crawling())
    #   start - only used in Manual mode
    #   end   - only used in Manual mode
    #   test  - True: crawl only 1 stock / False: crawl every stock
    param = dict(
        unit='m',
        mode='M',
        start=20180701,
        end=20180705,
        test=True,
    )

    creon = Creon()

    # Step 1: read the code list from 'code.xlsx'
    # (columns: stock code / stock name / market / sector).
    creon.loadCodeFile('code.xlsx')

    # Step 2: download the chart data and write the csv files.
    creon.crawling(param)

'AI > System Trading' 카테고리의 다른 글

Crawling한 data를 DB에 insertion 하기  (0) 2018.07.10
Comments
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday