티스토리 뷰
import datetime as dt
import os
import sys
import time

import pandas as pd
import win32com.client
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta


def checkPrivilege():
    """
    Abort the program unless it is running with administrator rights.

    :return None:
    - Prints the outcome of the check; on failure terminates with exit code 1.
    """
    # Imported lazily: ctypes.windll only exists on Windows.
    import ctypes

    if ctypes.windll.shell32.IsUserAnAdmin():
        print("관리자 권한으로 정상적으로 실행되었습니다.")
    else:
        print("관리자 권한으로 실행되지 않았습니다. 관리자권한으로 실행하세요.")
        sys.exit(1)


class Creon:
    """Thin wrapper around the CREON COM objects that dumps chart data to CSV files."""

    def __init__(self):
        """
        :param None:
        :return None:
        - Check administrator rights, create the CREON COM objects and verify
          that the CREON session is connected (exits with code 1 otherwise)
        - Initialize the field keys requested from the chart object and the
          column names used for the resulting DataFrame
        """
        checkPrivilege()

        self.obj_CpCodeMgr = win32com.client.Dispatch('CpUtil.CpCodeMgr')
        self.obj_CpCybos = win32com.client.Dispatch('CpUtil.CpCybos')
        self.obj_StockChart = win32com.client.Dispatch('CpSysDib.StockChart')

        b_connected = self.obj_CpCybos.IsConnect
        if b_connected == 0:
            print("연결 실패 - Creon Plus가 정상적으로 실행되었는지 확인하고",
                  "계속 문제 발생 시 Creon Plus를 다시 실행하세요.")
            sys.exit(1)

        # Field keys handed to StockChart. list_field_name holds one extra
        # leading column (the stock code, filled in locally); the remaining
        # names are paired with the keys by position in creonAPI().
        self.list_field_key = [0, 1, 2, 3, 4, 5, 8, 9, 10, 11, 37]
        self.list_field_name = ['종목코드', '날짜', '시간', '시가', '고가', '저가', '종가',
                                '거래량', '거래대금', '누적체결매도수량', '누적체결매수수량', '대비부호']
        self.list_code = []

    def loadCodeFile(self, file):
        """
        :param file: code list excel file (sheet 'KOSPI_KOSDAQ' is read)
        :return None:
        - Load stock codes from the first column of the sheet and assign them
          to self.list_code
        """
        df = pd.read_excel(file, sheet_name='KOSPI_KOSDAQ')
        code_column = df.columns[0]  # CODE
        self.list_code = df[code_column]

    def crawling(self, param):
        """
        :param param: (dict) crawl settings with the keys
            'unit'  : 'm'(minutes) or 'T'(Ticks)
            'mode'  : 'A'(Auto, last month up to now) or anything else for
                      manual mode, which uses 'start'/'end'
            'start' : (int) start date, YYYYMMDD (manual mode only)
            'end'   : (int) end date, YYYYMMDD (manual mode only)
            'test'  : (bool) when exactly True, crawl only the code 'A000020'
        :return None:
        - For every code in self.list_code, crawl the data and save it as
          '<code>.csv' inside a folder named after the unit and date range
        """
        self.time_start = time.time()

        unit = param['unit']
        mode = param['mode']
        start = param['start']
        end = param['end']
        test = param['test']

        # Test run: replace the code list with a single stock.
        if test is True:
            self.list_code = ['A000020']

        for code in self.list_code:
            # Resolve the date range for this code.
            if mode == 'A':
                dt_start, dt_end = self.setDate(unit, months=1, days=0)
            else:
                dt_start, dt_end = self.rangeCheck(start, end, unit)

            df_code, first, last = self.splitDate(code, dt_start, dt_end, unit)

            dir_name = "%s data [%s ~ %s]" % (self.fullName(unit), first, last)
            # exist_ok avoids the check-then-create race of exists() + mkdir().
            os.makedirs(dir_name, exist_ok=True)

            csv_name = os.path.join(dir_name, "%s.csv" % code)
            df_code.to_csv(csv_name, index=False, encoding='ms949')

        self.time_end = time.time()
        print("\n{:.2f}초 걸렸습니다.".format(self.time_end - self.time_start))
        print("\rCrawling Complete!")

    def rangeCheck(self, start, end, unit):
        """
        :param start: (int) start date
        :param end: (int) end date
        :param unit: can be 'm'(minutes) or 'T'(Ticks)
        :return dt_start, dt_end: (datetime, datetime) dates
        - Verify the range of start, end date and shrink if necessary:
          the start is moved up to the oldest date this script allows
          (1 month back for 'T', 2 years back otherwise) and the end is never
          earlier than the start
        """
        dt_start, dt_end = parse(str(start)), parse(str(end))
        today = dt.datetime.today()

        # Check start date
        if unit == 'T':
            limit = today - relativedelta(months=1)
        else:
            limit = today - relativedelta(years=2)
        if (limit - dt_start).days > 0:
            dt_start = limit

        # Check end date
        if (dt_end - dt_start).days < 0:
            dt_end = dt_start

        return dt_start, dt_end

    @staticmethod
    def _chunkDateRange(dt_start, dt_end, span_days=5):
        """
        :param dt_start: (datetime or date) first day of the range
        :param dt_end: (datetime or date) last day of the range
        :param span_days: (int) distance in days between a chunk's start and end
        :return: (list of (date, date)) consecutive chunks, oldest first
        - Split the range into chunks that tile it by calendar date. The last
          chunk is clamped to dt_end so nothing past the requested end is
          included, and no day is lost when the inputs carry a time of day.
        """
        day = dt_start.date() if isinstance(dt_start, dt.datetime) else dt_start
        last_day = dt_end.date() if isinstance(dt_end, dt.datetime) else dt_end
        span = dt.timedelta(days=span_days)
        one_day = dt.timedelta(days=1)

        chunks = []
        while True:
            chunk_end = min(day + span, last_day)
            chunks.append((day, chunk_end))
            if chunk_end >= last_day:
                break
            day = chunk_end + one_day
        return chunks

    def splitDate(self, code, dt_start, dt_end, unit):
        """
        :param code: stock code
        :param dt_start: (datetime) start date
        :param dt_end: (datetime) end date
        :param unit: can be 'm'(minutes) or 'T'(Ticks)
        :return rst: (pd.DataFrame) crawled info
        :return first: (str) first true date, YYYYMMDD
        :return last: (str) last true date, YYYYMMDD
        - Original author's note: CREON API doesn't work well over a wide
          range, so the date range is split into chunks of about 5 days.
        - Each chunk is requested through creonAPI() (newest chunk first) and
          the results are concatenated in that order.
        """
        first = dt_start.strftime("%Y%m%d")
        last = dt_end.strftime("%Y%m%d")

        list_date = self._chunkDateRange(dt_start, dt_end)
        list_date.reverse()  # From new to old

        # Position of this code in the code list, used only for the progress
        # display; looked up once instead of once per chunk.
        code_pos = list(self.list_code).index(code)
        denominator = len(self.list_code) * len(list_date)

        frames = []
        for step, (chunk_start, chunk_end) in enumerate(list_date):
            arg_start = int(chunk_start.strftime("%Y%m%d"))
            arg_end = int(chunk_end.strftime("%Y%m%d"))
            frames.append(self.creonAPI(code, arg_start, arg_end, unit))

            progress = (code_pos * len(list_date) + step) / denominator * 100
            self.time_cur = time.time()
            print("\r%.2f%% Complete.. \t %d초 경과" % (progress, self.time_cur - self.time_start), end='')

        # DataFrame.append was removed in pandas 2.0; concatenate once instead.
        rst = pd.concat(frames)
        return rst, first, last

    def setDate(self, unit, months=0, days=0):
        """
        :param unit: can be 'm'(minutes) or 'T'(Ticks) (currently unused)
        :param months: crawl info from 'months' ago to now
        :param days: crawl info from 'days' ago to now
        :return dt_start, dt_end: (datetime, datetime) date set
        """
        dt_end = dt.datetime.now()
        # Set date from 'months' or 'days' ago to now
        dt_start = dt_end - relativedelta(months=months, days=days)
        return dt_start, dt_end

    def fullName(self, unit):
        """
        :param unit: can be 'm'(minutes) or 'T'(Ticks)
        :return FullName of unit: "Tick", "Minute", or "Others" for any other value
        """
        if unit == 'T':
            return "Tick"
        elif unit == 'm':
            return "Minute"
        else:
            return "Others"

    def creonAPI(self, code, start, end, unit):
        """
        :param code: stock code
        :param start: (int) start date
        :param end: (int) end date
        :param unit: can be 'm'(minutes) or 'T'(Ticks)
        :return crawled info: (pd.DataFrame) one row per received record, with
            the columns of self.list_field_name
        - Issues one blocking StockChart request; exits with code 1 when the
          reported communication status is non-zero.
        """
        chart = self.obj_StockChart

        chart.SetInputValue(0, code)
        chart.SetInputValue(1, ord('1'))  # 0: by count, 1: by period
        chart.SetInputValue(2, end)  # end date
        chart.SetInputValue(3, start)  # start date
        chart.SetInputValue(5, self.list_field_key)  # fields
        chart.SetInputValue(6, ord(unit))  # mode: 'D', 'W', 'M', 'm', 'T'
        chart.BlockRequest()  # request the data according to the settings above

        status = chart.GetDibStatus()
        if status != 0:
            msg = chart.GetDibMsg1()
            print("통신상태: {} {}".format(status, msg))
            sys.exit(1)

        cnt = chart.GetHeaderValue(3)  # number of records received

        dict_chart = {name: [] for name in self.list_field_name}
        for i in range(cnt):
            # Column 0 is the stock code; every other column comes from the
            # requested field at the same position minus one.
            dict_item = {
                name: code if pos == 0 else chart.GetDataValue(pos - 1, i)
                for pos, name in enumerate(self.list_field_name)
            }

            # 20180701 -> 2018-07-01
            # 153000 -> 15:30:00
            date_str = str(dict_item['날짜'])
            # Pad to at least 4 digits so values below 100 still split into
            # a hour part and a two-digit minute part.
            time_str = str(dict_item['시간']).zfill(4)
            dict_item['날짜'] = "{Y:2}-{M:2}-{D:2}".format(Y=date_str[:4], M=date_str[4:6], D=date_str[6:])
            dict_item['시간'] = "{H:0>2}:{M:2}:00".format(H=time_str[:-2], M=time_str[-2:])

            for name, value in dict_item.items():
                dict_chart[name].append(value)

        return pd.DataFrame(dict_chart, columns=self.list_field_name)


if __name__ == '__main__':
    creon = Creon()

    # User Input parameter
    param = {
        # 'T': Tick / 'm': minute
        # (Limit is 1 month ago in Tick, 2 years ago in minute)
        'unit': 'm',
        # 'M': Manual / 'A': Auto (Default is 1 month ago to now.
        # The range can be modified in creon.crawling())
        'mode': 'M',
        'start': 20180701,  # Only used in 'M'anual mode
        'end': 20180705,  # Only used in 'M'anual mode
        'test': True,  # True: Crawl only 1 stock / False: Crawl every stock
    }

    # 1. Load code list from 'code.xlsx'
    #    code.xlsx columns: stock code / stock name / market / sector
    creon.loadCodeFile('code.xlsx')

    # 2. Crawling
    creon.crawling(param)
'AI > System Trading' 카테고리의 다른 글
Crawling한 data를 DB에 insertion 하기 (0) | 2018.07.10 |
---|
Comments