Skip to content

API Reference

kiwoom

Bot

Kiwoom REST API를 이용해 전략을 실행하는 최상위 클래스입니다.

사용자가 API 세부 동작을 알지 못해도 전략 수행에 집중할 수 있도록 합니다.

Source code in kiwoom/bot.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class Bot:
    """
    Top-level class for running a strategy on top of the Kiwoom REST API.

    It hides the low-level API details so that users can concentrate on
    implementing their strategy.
    """

    def __init__(self, host: str, appkey: str, secretkey: str, api: API | None = None):
        """
        Initialize a Bot instance.

        Args:
            host (str): domain of the real server / mock server
            appkey (str): file path / app key
            secretkey (str): file path / secret key
            api (API, optional): a separately implemented API instance to use
                instead of one built from host, appkey and secretkey
        """
        # A falsy `api` (such as None) falls back to a freshly built API.
        self.api = api or API(host, appkey, secretkey)

    async def __aenter__(self) -> Self:
        """
        Return this Bot instance for use in an ``async with`` statement.

        Returns:
            Bot: self
        """
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """
        Close the connection when the ``async with`` block exits.

        Exceptions listed in EXCEPTIONS_TO_SUPPRESS that are raised while
        closing are swallowed.

        Args:
            exc_type (_type_): exception type
            exc_value (_type_): exception value
            traceback (_type_): traceback
        """
        with contextlib.suppress(EXCEPTIONS_TO_SUPPRESS):
            shutdown = asyncio.shield(self.close())
            await shutdown

    def debug(self, debugging: bool = True) -> None:
        """
        Turn debugging mode on or off.

        In debugging mode HTTP requests and responses are printed.

        Args:
            debugging (bool): whether debugging mode is enabled
        """
        client = self.api
        client.debugging = debugging

    def token(self) -> str:
        """
        Return the Kiwoom REST API token once connected.

        Returns:
            str: token
        """
        client = self.api
        return client.token()

    async def connect(self):
        """
        Connect to the Kiwoom REST API HTTP server and WebSocket server.

        After the underlying API has connected, this waits one second
        before returning.
        """
        client = self.api
        await client.connect()
        await asyncio.sleep(1)

    async def close(self):
        """
        Disconnect from the Kiwoom REST API HTTP server and WebSocket server.
        """
        pending_close = self.api.close()
        await asyncio.shield(pending_close)

    async def stock_list(self, market: str, ats: bool = True) -> list[str]:
        """
        Return the stock codes that belong to the given market code.

        Args:
            market (str): {
                'KOSPI': '0', 'KOSDAQ': '10', 'ELW': '3',
                'Mutual fund': '4', 'Subscription warrant': '5', 'REIT': '6',
                'ETF': '8', 'High-yield fund': '9', 'K-OTC': '30',
                'KONEX': '50', 'ETN': '60', 'NXT': 'NXT'}
            ats (bool, optional): whether to return unified codes that reflect
                the alternative trading system (ex. '005930_AL')

        Returns:
            list[str]: list of stock codes
        """

        # 'NXT' is assembled here from the KOSPI and KOSDAQ lists: every code
        # containing "AL" is kept and the result is sorted.
        if market == "NXT":
            merged = []
            for sub_market in ("0", "10"):
                merged.extend(await self.stock_list(sub_market))
            return sorted(code for code in merged if "AL" in code)

        response = await self.api.stock_list(market)
        return proc.stock_list(response, ats)

    async def sector_list(self, market: str) -> list[str]:
        """
        Return the sector codes that belong to the given market code.

        Args:
            market (str): {
                '0': 'KOSPI', '1': 'KOSDAQ',
                '2': 'KOSPI200', '4': 'KOSPI100(150)',
                '7': 'KRX100'}

        Returns:
            list[str]: list of sector codes
        """
        response = await self.api.sector_list(market)
        return proc.sector_list(response)

    async def candle(
        self,
        code: str,
        period: str,
        ctype: str,
        start: str = None,
        end: str = None,
    ) -> DataFrame:
        """
        Return candle chart data for the given code, period and stock/sector type.

        Args:
            code (str): stock code / sector code
            period (str): candle period type {"tick", "min", "day"}
            ctype (str): stock / sector type {"stock", "sector"}
            start (str, optional): start date in YYYYMMDD format
            end (str, optional): end date in YYYYMMDD format

        Returns:
            DataFrame: pandas candle chart data frame
        """
        response = await self.api.candle(code, period, ctype, start, end)
        return proc.candle.process(response, code, period, ctype, start, end)

    async def trade(self, start: str, end: str = "") -> DataFrame:
        """
        Return the trade history between the given start and end dates,
        laid out like the Kiwoom Securities '0343' account trade history screen.
        Because of a query limit, only the most recent two months can be fetched.

        The trade history can be loaded into [Alphanote](http://alphanote.io)
        to visualize entries/exits and inspect performance metrics.

        Args:
            start (str): start date in YYYYMMDD format
            end (str, optional): end date in YYYYMMDD format

        Returns:
            DataFrame: same format as 'Export to Excel' on the Kiwoom '0343' screen
        """
        records = await self.api.trade(start, end)
        return proc.trade.process(records)

    async def run(self):
        """
        Implement and run the strategy logic.

        The base implementation does nothing.
        """
        return None

api instance-attribute

api = api if api else API(host, appkey, secretkey)

debug

debug(debugging=True)

디버깅 모드 활성화 / 비활성화. 디버깅 모드에서는 Http 요청과 응답이 출력됩니다.

Parameters:

Name Type Description Default
debugging bool

디버깅 모드 활성화 여부

True
Source code in kiwoom/bot.py
52
53
54
55
56
57
58
59
60
def debug(self, debugging: bool = True) -> None:
    """
    Turn debugging mode on or off.

    In debugging mode HTTP requests and responses are printed.

    Args:
        debugging (bool): whether debugging mode is enabled
    """
    client = self.api
    client.debugging = debugging

token

token()

연결이 되었다면, 키움 REST API 토큰을 반환합니다.

Returns:

Name Type Description
str str

token

Source code in kiwoom/bot.py
62
63
64
65
66
67
68
69
def token(self) -> str:
    """
    Return the Kiwoom REST API token once connected.

    Returns:
        str: token
    """
    client = self.api
    return client.token()

connect async

connect()

키움 REST API HTTP 서버 및 Websocket 서버에 접속합니다.

Source code in kiwoom/bot.py
71
72
73
74
75
76
async def connect(self):
    """
    Connect to the Kiwoom REST API HTTP server and WebSocket server.

    After the underlying API has connected, this waits one second
    before returning.
    """
    client = self.api
    await client.connect()
    await asyncio.sleep(1)

close async

close()

키움 REST API HTTP 서버 및 Websocket 서버 연결을 해제합니다.

Source code in kiwoom/bot.py
78
79
80
81
82
async def close(self):
    """
    Disconnect from the Kiwoom REST API HTTP server and WebSocket server.
    """
    pending_close = self.api.close()
    await asyncio.shield(pending_close)

stock_list async

stock_list(market, ats=True)

주어진 market 코드에 해당하는 주식 종목코드 목록을 반환합니다.

Parameters:

Name Type Description Default
market str

{ 'KOSPI': '0', 'KOSDAQ': '10', 'ELW': '3', '뮤추얼펀드': '4', '신주인수권': '5', '리츠': '6', 'ETF': '8', '하이일드펀드': '9', 'K-OTC': '30', 'KONEX': '50', 'ETN': '60', 'NXT': 'NXT'}

required
ats bool

대체거래소 반영한 통합코드 여부 (ex. '005930_AL')

True

Returns:

Type Description
list[str]

list[str]: 종목코드 리스트

Source code in kiwoom/bot.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
async def stock_list(self, market: str, ats: bool = True) -> list[str]:
    """
    Return the stock codes that belong to the given market code.

    Args:
        market (str): {
            'KOSPI': '0', 'KOSDAQ': '10', 'ELW': '3',
            'Mutual fund': '4', 'Subscription warrant': '5', 'REIT': '6',
            'ETF': '8', 'High-yield fund': '9', 'K-OTC': '30',
            'KONEX': '50', 'ETN': '60', 'NXT': 'NXT'}
        ats (bool, optional): whether to return unified codes that reflect
            the alternative trading system (ex. '005930_AL')

    Returns:
        list[str]: list of stock codes
    """

    # 'NXT' is assembled here from the KOSPI and KOSDAQ lists: every code
    # containing "AL" is kept and the result is sorted.
    if market == "NXT":
        merged = []
        for sub_market in ("0", "10"):
            merged.extend(await self.stock_list(sub_market))
        return sorted(code for code in merged if "AL" in code)

    response = await self.api.stock_list(market)
    return proc.stock_list(response, ats)

sector_list async

sector_list(market)

주어진 market 코드에 해당하는 업종코드 목록을 반환합니다.

Parameters:

Name Type Description Default
market str

{ '0': 'KOSPI', '1': 'KOSDAQ', '2': 'KOSPI200', '4': 'KOSPI100(150)', '7': 'KRX100'}

required

Returns:

Type Description
list[str]

list[str]: 업종코드 리스트

Source code in kiwoom/bot.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
async def sector_list(self, market: str) -> list[str]:
    """
    Return the sector codes that belong to the given market code.

    Args:
        market (str): {
            '0': 'KOSPI', '1': 'KOSDAQ',
            '2': 'KOSPI200', '4': 'KOSPI100(150)',
            '7': 'KRX100'}

    Returns:
        list[str]: list of sector codes
    """
    response = await self.api.sector_list(market)
    return proc.sector_list(response)

candle async

candle(code, period, ctype, start=None, end=None)

주어진 코드, 기간, 종목/업종 유형에 해당하는 캔들차트 데이터를 반환합니다.

Parameters:

Name Type Description Default
code str

종목코드 / 업종코드

required
period str

캔들 기간유형 {"tick", "min", "day"}

required
ctype str

종목 / 업종유형 {"stock", "sector"}

required
start str

시작일자 in YYYYMMDD format

None
end str

종료일자 in YYYYMMDD format

None

Returns:

Name Type Description
DataFrame DataFrame

Pandas 캔들차트 데이터프레임

Source code in kiwoom/bot.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
async def candle(
    self,
    code: str,
    period: str,
    ctype: str,
    start: str = None,
    end: str = None,
) -> DataFrame:
    """
    Return candle chart data for the given code, period and stock/sector type.

    Args:
        code (str): stock code / sector code
        period (str): candle period type {"tick", "min", "day"}
        ctype (str): stock / sector type {"stock", "sector"}
        start (str, optional): start date in YYYYMMDD format
        end (str, optional): end date in YYYYMMDD format

    Returns:
        DataFrame: pandas candle chart data frame
    """
    response = await self.api.candle(code, period, ctype, start, end)
    return proc.candle.process(response, code, period, ctype, start, end)

trade async

trade(start, end='')

주어진 시작일자와 종료일자에 해당하는 체결내역을 키움증권 '0343' 계좌 체결내역 화면과 동일한 구성으로 반환합니다. 데이터 조회 제한으로 최근 2개월 데이터만 조회할 수 있습니다.

체결내역 데이터는 알파노트를 통해 간편하게 진입/청산 시각화 및 성과 지표들을 확인할 수 있습니다.

Parameters:

Name Type Description Default
start str

시작일자 in YYYYMMDD format

required
end str

종료일자 in YYYYMMDD format

''

Returns:

Name Type Description
DataFrame DataFrame

키움증권 '0343' 화면 'Excel 내보내기' 형식

Source code in kiwoom/bot.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
async def trade(self, start: str, end: str = "") -> DataFrame:
    """
    Return the trade history between the given start and end dates,
    laid out like the Kiwoom Securities '0343' account trade history screen.
    Because of a query limit, only the most recent two months can be fetched.

    The trade history can be loaded into [Alphanote](http://alphanote.io)
    to visualize entries/exits and inspect performance metrics.

    Args:
        start (str): start date in YYYYMMDD format
        end (str, optional): end date in YYYYMMDD format

    Returns:
        DataFrame: same format as 'Export to Excel' on the Kiwoom '0343' screen
    """
    records = await self.api.trade(start, end)
    return proc.trade.process(records)

run async

run()

전략 로직을 구현하고 실행합니다.

Source code in kiwoom/bot.py
173
174
175
176
177
async def run(self):
    """
    Implement and run the strategy logic.

    The base implementation does nothing.
    """
    return None

API

Bases: Client

Kiwoom REST API 서버와 직접 요청과 응답을 주고받는 클래스입니다.

데이터 조회, 주문 요청 등 저수준 통신을 담당하며, 직접 API 스펙을 구현하여 활용합니다.

Source code in kiwoom/api.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
class API(Client):
    """
    Kiwoom REST API 서버와 직접 요청과 응답을 주고받는 클래스입니다.

    데이터 조회, 주문 요청 등 저수준 통신을 담당하며,
    직접 API 스펙을 구현하여 활용합니다.
    """

    def __init__(self, host: str, appkey: str, secretkey: str):
        """
        API 클래스 인스턴스를 초기화합니다.

        Args:
            host (str): 실서버 / 모의서버 도메인
            appkey (str): 파일경로 / 앱키
            secretkey (str): 파일경로 / 시크릿키

        Raises:
            ValueError: 유효하지 않은 도메인
        """
        match host:
            case config.REAL:
                wss_url = Socket.REAL + Socket.ENDPOINT
            case config.MOCK:
                wss_url = Socket.MOCK + Socket.ENDPOINT
            case _:
                raise ValueError(f"Invalid host: {self.host}")

        super().__init__(host, appkey, secretkey)
        self.queue = asyncio.Queue(maxsize=WEBSOCKET_QUEUE_MAX_SIZE)
        self.socket = Socket(url=wss_url, queue=self.queue)

        self._state = State.CLOSED
        self._state_lock = asyncio.Lock()
        self._recv_task: asyncio.Task | None = None
        self._stop_event = asyncio.Event()
        self._stop_event.set()

        self._sem = asyncio.Semaphore(config.http.WEBSOCKET_MAX_CONCURRENCY)
        async_print = wrap_sync_callback(self._sem, lambda msg: print(msg))
        self._callbacks = defaultdict(lambda: async_print)
        self._add_default_callback_on_real_data()

    async def connect(self):
        """
        키움 REST API HTTP 서버와 Websocket 서버에 접속하고 토큰을 발급받습니다.

        Raises:
            RuntimeError: 토큰을 발급받지 못한 경우
            Exception: 예상하지 못한 에러
        """
        async with self._state_lock:
            if self._state in (State.CONNECTED, State.CONNECTING):
                return

            self._state = State.CONNECTING
            try:
                # Cancel existing task
                self._stop_event.set()
                await cancel(self._recv_task)

                # Connect http server
                await super().connect(self._appkey, self._secretkey)
                if not (token := self.token()):
                    raise RuntimeError("Not connected: token is not available.")

                # Connect websocket server
                await self.socket.connect(self._session, token)

                # Run websocket receiving task
                self._stop_event.clear()
                self._recv_task = asyncio.create_task(self._on_receive_websocket(), name="dequeue")
                self._state = State.CONNECTED

            except Exception as err:
                self._state = State.CLOSED
                with contextlib.suppress(Exception):
                    await self.socket.close()
                with contextlib.suppress(Exception):
                    await super().close()
                raise Exception from err

    async def close(self):
        """
        키움 REST API 서버와 연결을 해제하고 리소스를 정리합니다.
        """
        async with self._state_lock:
            if self._state in (State.CLOSED, State.CLOSING):
                return

            self._state = State.CLOSING
            try:
                # Cancel existing task
                self._stop_event.set()
                with contextlib.suppress(EXCEPTIONS_TO_SUPPRESS):
                    await asyncio.shield(cancel(self._recv_task))
                self._recv_task = None

                # Close websocket server
                with contextlib.suppress(EXCEPTIONS_TO_SUPPRESS):
                    await asyncio.shield(self.socket.close())
                # Close http server
                with contextlib.suppress(EXCEPTIONS_TO_SUPPRESS):
                    await asyncio.shield(super().close())

            finally:
                self._state = State.CLOSED

    async def stock_list(self, market: str) -> dict:
        """
        주어진 market 코드에 대해 'ka10099' API 요청을 하고 응답을 반환합니다.

        Args:
            market (str): 조회할 주식 시장코드

        Raises:
            ValueError: 종목코드 목록이 없을 경우

        Returns:
            dict: 종목코드 목록을 포함하는 응답
        """
        endpoint = "/api/dostk/stkinfo"
        api_id = "ka10099"

        res = await self.request(endpoint, api_id, data={"mrkt_tp": market})
        body = res.json()
        if not body["list"] or len(body["list"]) <= 1:
            raise ValueError(f"Stock list is not available for market code, {market}.")
        return body

    async def sector_list(self, market: str) -> dict:
        """
        주어진 market 코드에 대해 'ka10101' API 요청을 하고 응답을 반환합니다.

        Args:
            market (str): 조회할 주식 시장코드

        Raises:
            ValueError: 업종코드 목록이 없을 경우

        Returns:
            dict: 업종코드 목록을 포함하는 응답
        """
        endpoint = "/api/dostk/stkinfo"
        api_id = "ka10101"

        res = await self.request(endpoint, api_id, data={"mrkt_tp": market})
        body = res.json()
        if not body["list"] or len(body["list"]) <= 1:
            raise ValueError(f"Sector list is not available for sector code, {market}.")
        return body

    async def candle(
        self,
        code: str,
        period: str,
        ctype: str,
        start: str = None,
        end: str = None,
    ) -> dict:
        """
        주어진 코드, 기간, 종목/업종 유형에 해당하는 API 요청을 하고 응답을 반환합니다.

        "stock": {"tick": "ka10079", "min": "ka10080", "day": "ka10081"}

        "sector": {"tick": "ka20004", "min": "ka20005", "day": "ka20006"}

        Args:
            code (str): 종목코드 / 업종코드
            period (str): 캔들 기간유형, {"tick", "min", "day"}.
            ctype (str): 종목 / 업종 유형, {"stock", "sector"}.
            start (str, optional): 시작일자 in YYYYMMDD format.
            end (str, optional): 종료일자 in YYYYMMDD format.

        Raises:
            ValueError: 유효하지 않은 'ctype' 또는 'period'

        Returns:
            dict: 캔들 데이터를 포함하는 json 응답
        """

        ctype = ctype.lower()
        endpoint = "/api/dostk/chart"
        api_id = PERIOD_TO_API_ID[ctype][period]
        data = dict(PERIOD_TO_DATA[ctype][period])
        match ctype:
            case "stock":
                data["stk_cd"] = code
            case "sector":
                data["inds_cd"] = code
            case _:
                raise ValueError(f"'ctype' must be one of [stock, sector], not {ctype=}.")
        if period == "day":
            end = end if end else datetime.now().strftime("%Y%m%d")
            data["base_dt"] = end

        ymd: int = len("YYYYMMDD")  # 8 digit compare
        key: str = PERIOD_TO_BODY_KEY[ctype][period]
        time: str = PERIOD_TO_TIME_KEY[period]

        def should_continue(body: dict) -> bool:
            # Validate
            if not valid(body, period, ctype):
                return False
            # Request full data
            if not start:
                return True
            # Condition to continue
            chart = body[key]
            earliest = chart[-1][time][:ymd]
            return start <= earliest

        body = await self.request_until(should_continue, endpoint, api_id, data=data)
        return body

    async def trade(self, start: str, end: str = "") -> list[dict]:
        """
        주어진 시작일자와 종료일자에 해당하는 체결내역을
        키움증권 '0343' 계좌 체결내역 화면과 동일한 구성으로 반환합니다.
        데이터 조회 제한으로 최근 2개월 데이터만 조회할 수 있습니다.

        체결내역 데이터는 [알파노트](http://alphanote.io)를 통해
        간편하게 진입/청산 시각화 및 성과 지표들을 확인할 수 있습니다.

        Args:
            start (str): 시작일자 in YYYYMMDD format
            end (str, optional): 종료일자 in YYYYMMDD format

        Returns:
            list[dict]: 체결내역 데이터를 포함하는 json 응답 리스트
        """
        endpoint = "/api/dostk/acnt"
        api_id = "kt00009"
        data = {
            "ord_dt": "",  # YYYYMMDD (Optional)
            "qry_tp": "1",  # 전체/체결
            "stk_bond_tp": "1",  # 전체/주식/채권
            "mrkt_tp": "0",  # 전체/코스피/코스닥/OTCBB/ECN
            "sell_tp": "0",  # 전체/매도/매수
            "dmst_stex_tp": "%",  # 전체/KRX/NXT/SOR
            # 'stk_cd': '',  # 종목코드 (Optional)
            # 'fr_ord_no': '',  # 시작주문번호 (Optional)
        }

        today = datetime.today()
        start = datetime.strptime(start, "%Y%m%d")
        start = max(start, today - timedelta(days=REQUEST_LIMIT_DAYS))
        end = datetime.strptime(end, "%Y%m%d") if end else datetime.today()
        end = min(end, datetime.today())

        trs = []
        key = "acnt_ord_cntr_prst_array"
        for bday in bdate_range(start, end):
            dic = dict(data)
            dic["ord_dt"] = bday.strftime("%Y%m%d")  # manually set ord_dt
            body = await self.request_until(lambda x: True, endpoint, api_id, data=dic)
            if key in body:
                # Append order date to each record
                for rec in body[key]:
                    rec["ord_dt"] = bday.strftime("%Y-%m-%d")
                trs.extend(body[key])
        return trs

    def add_callback_on_real_data(self, real_type: str, callback: Callable) -> None:
        """
        실시간 데이터 수신 시 호출될 콜백 함수를 추가합니다. (trnm이 'REAL'인 경우)

        * callback 함수는 서버 응답 string 그대로를 인자로 받습니다.
        * real_type을 'PING' 또는 'LOGIN'으로 설정하면 기본 콜백 함수를 덮어씁니다.

        콜백 함수는 비동기 콜백 함수를 추가하는 것을 권장합니다.
        비동기 및 동기 콜백 함수 모두 루프를 블로킹하지 않도록
        백그라운드에서 실행됩니다. 따라서 데이터 처리 완료 순서가 반드시
        데이터 수신 순서에 따라 실행되지 않을 수 있습니다.

        ex) tick 체결 데이터 (type 'OB')가 수신될 때마다 데이터 출력하기

            > fn = lambda raw: print(raw)

            > add_callback_on_real_data(real_type='OB', callback=fn)

        Args:
            real_type (str): 키움 REST API에 정의된 실시간 데이터 타입
            callback (Callable): raw 스트링을 인자로 받는 콜백 함수
        """

        real_type = real_type.upper()
        # Asnyc Callback
        if iscoroutinefunction(callback):
            self._callbacks[real_type] = wrap_async_callback(self._sem, callback)
        # Sync Callback
        else:
            self._callbacks[real_type] = wrap_sync_callback(self._sem, callback)

    def _add_default_callback_on_real_data(self) -> None:
        """
        Add default callback functions on real data receive.
        """

        # Ping
        async def callback_on_ping(msg: dict):
            await self.socket.send(msg)

        self.add_callback_on_real_data(real_type="PING", callback=callback_on_ping)

        # Login
        def callback_on_login(msg: dict):
            if msg.get("return_code") != 0:
                raise RuntimeError(f"Login failed with return_code not zero, {msg}.")
            print(msg)

        self.add_callback_on_real_data(real_type="LOGIN", callback=callback_on_login)

    async def _on_receive_websocket(self) -> None:
        """
        Receive websocket data and dispatch to the callback function.
        Decoder patially checks 'trnm' and 'type' in order to speed up.

        If trnm is "REAL", the argument to callback function is RealData instance.
        Otherwise, the argument to callback function is json dict.

        Raises:
            Exception: Exception raised by the callback function or decoder
        """
        decoder = msgspec.json.Decoder(type=RealType)
        while not self._stop_event.is_set():
            try:
                raw: str = await self.queue.get()
            except asyncio.CancelledError:
                break

            try:
                msg = decoder.decode(raw)  # partially decoded for speed up
                if msg.trnm == "REAL":
                    for data in msg.data:
                        asyncio.create_task(
                            self._callbacks[data.type](
                                RealData(bytes(data.values), data.type, data.name, data.item)
                            )
                        )
                    continue

                dic = orjson.loads(raw)
                asyncio.create_task(self._callbacks[msg.trnm](dic))

            except Exception as err:
                raise Exception("Failed to handling websocket data.") from err

            finally:
                self.queue.task_done()

    async def register_tick(
        self,
        grp_no: str,
        codes: list[str],
        refresh: str = "1",
    ) -> None:
        """
        주어진 그룹번호와 종목 코드에 대해 주식체결 데이터를 등록합니다. (타입 '0B')

        Args:
            grp_no (str): 그룹번호
            codes (list[str]): 종목코드 리스트
            refresh (str, optional): 기존등록유지여부 (기존유지:'1', 신규등록:'0').
        """

        assert len(codes) <= 100, f"Max 100 codes per group, got {len(codes)} codes."
        await self.socket.send(
            {
                "trnm": "REG",
                "grp_no": grp_no,
                "refresh": refresh,
                "data": [
                    {
                        "item": codes,
                        "type": ["0B"],
                    }
                ],
            }
        )

    async def register_hoga(
        self,
        grp_no: str,
        codes: list[str],
        refresh: str = "1",
    ) -> None:
        """
        주어진 그룹번호와 종목 코드에 대해 주식호가잔량 데이터를 등록합니다. (타입 '0D')

        Args:
            grp_no (str): 그룹번호
            codes (list[str]): 종목코드 리스트
            refresh (str, optional): 기존등록유지여부 (기존유지:'1', 신규등록:'0').
        """

        assert len(codes) <= 100, f"Max 100 codes per group, got {len(codes)} codes."
        await self.socket.send(
            {
                "trnm": "REG",
                "grp_no": grp_no,
                "refresh": refresh,
                "data": [
                    {
                        "item": codes,
                        "type": ["0D"],
                    }
                ],
            }
        )

    async def remove_register(self, grp_no: str, codes: list[str], type: str | list[str]) -> None:
        """
        주어진 그룹번호와 실시간 데이터 타입에 대해 등록된 데이터를 제거합니다.

        Args:
            grp_no (str): 그룹번호
            type (str | list[str]): 실시간 데이터 타입 ex) '0B', '0D', 'DD'
        """
        if not grp_no or not type:
            return
        if isinstance(type, str):
            type = [type]
        await self.socket.send(
            {
                "trnm": "REMOVE",
                "grp_no": grp_no,
                "refresh": "",
                "data": [{"item": codes, "type": type}],
            }
        )

queue instance-attribute

queue = Queue(maxsize=WEBSOCKET_QUEUE_MAX_SIZE)

socket instance-attribute

socket = Socket(url=wss_url, queue=queue)

connect async

connect()

키움 REST API HTTP 서버와 Websocket 서버에 접속하고 토큰을 발급받습니다.

Raises:

Type Description
RuntimeError

토큰을 발급받지 못한 경우

Exception

예상하지 못한 에러

Source code in kiwoom/api.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
async def connect(self):
    """
    Connect to the Kiwoom REST API HTTP and WebSocket servers and obtain a token.

    Does nothing when the instance is already connected or connecting.
    On failure the state is reset to closed, the WebSocket and HTTP
    connections are closed on a best-effort basis, and the original
    error is re-raised.

    Raises:
        RuntimeError: if no token is available after the HTTP connection
        Exception: any other error raised while connecting
    """
    async with self._state_lock:
        if self._state in (State.CONNECTED, State.CONNECTING):
            return

        self._state = State.CONNECTING
        try:
            # Stop and cancel a previously running receive task, if any.
            self._stop_event.set()
            await cancel(self._recv_task)

            # Connect to the HTTP server; a token is required to go on.
            await super().connect(self._appkey, self._secretkey)
            if not (token := self.token()):
                raise RuntimeError("Not connected: token is not available.")

            # Connect to the WebSocket server.
            await self.socket.connect(self._session, token)

            # Start the task that consumes the WebSocket queue.
            self._stop_event.clear()
            self._recv_task = asyncio.create_task(self._on_receive_websocket(), name="dequeue")
            self._state = State.CONNECTED

        except Exception:
            self._state = State.CLOSED
            with contextlib.suppress(Exception):
                await self.socket.close()
            with contextlib.suppress(Exception):
                await super().close()
            # Re-raise the original error unchanged so that callers can
            # catch the documented RuntimeError and see the real message.
            raise

close async

close()

키움 REST API 서버와 연결을 해제하고 리소스를 정리합니다.

Source code in kiwoom/api.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
async def close(self):
    """
    Disconnect from the Kiwoom REST API server and release resources.

    Does nothing when the instance is already closed or closing.
    Exceptions listed in EXCEPTIONS_TO_SUPPRESS raised by the individual
    cleanup steps are swallowed, and the state always ends up closed.
    """
    async with self._state_lock:
        if self._state in (State.CLOSED, State.CLOSING):
            return

        self._state = State.CLOSING
        try:
            # Signal the receive loop to stop, then cancel its task.
            self._stop_event.set()
            with contextlib.suppress(EXCEPTIONS_TO_SUPPRESS):
                await asyncio.shield(cancel(self._recv_task))
            self._recv_task = None

            # Close the WebSocket connection.
            with contextlib.suppress(EXCEPTIONS_TO_SUPPRESS):
                await asyncio.shield(self.socket.close())
            # Close the HTTP connection.
            with contextlib.suppress(EXCEPTIONS_TO_SUPPRESS):
                await asyncio.shield(super().close())

        finally:
            # The state is reset even when a cleanup step raised.
            self._state = State.CLOSED

stock_list async

stock_list(market)

주어진 market 코드에 대해 'ka10099' API 요청을 하고 응답을 반환합니다.

Parameters:

Name Type Description Default
market str

조회할 주식 시장코드

required

Raises:

Type Description
ValueError

종목코드 목록이 없을 경우

Returns:

Name Type Description
dict dict

종목코드 목록을 포함하는 응답

Source code in kiwoom/api.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
async def stock_list(self, market: str) -> dict:
    """
    Send a 'ka10099' API request for the given market code and return the response.

    Args:
        market (str): stock market code to query

    Raises:
        ValueError: if the response has no "list" entry, or that list
            holds at most one item

    Returns:
        dict: response containing the list of stock codes
    """
    endpoint = "/api/dostk/stkinfo"
    api_id = "ka10099"

    res = await self.request(endpoint, api_id, data={"mrkt_tp": market})
    body = res.json()
    # A response without "list" is reported as ValueError, not KeyError.
    items = body.get("list")
    if not items or len(items) <= 1:
        raise ValueError(f"Stock list is not available for market code, {market}.")
    return body

sector_list async

sector_list(market)

주어진 market 코드에 대해 'ka10101' API 요청을 하고 응답을 반환합니다.

Parameters:

Name Type Description Default
market str

조회할 주식 시장코드

required

Raises:

Type Description
ValueError

업종코드 목록이 없을 경우

Returns:

Name Type Description
dict dict

업종코드 목록을 포함하는 응답

Source code in kiwoom/api.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
async def sector_list(self, market: str) -> dict:
    """
    Request the 'ka10101' API for the given market code and return the response body.

    Args:
        market (str): market code of the sectors to look up

    Raises:
        ValueError: if the response carries no usable sector code list

    Returns:
        dict: response body that contains the sector code list
    """
    endpoint = "/api/dostk/stkinfo"
    api_id = "ka10101"

    res = await self.request(endpoint, api_id, data={"mrkt_tp": market})
    body = res.json()
    # Use .get() so that a response without a "list" key is reported as the
    # documented ValueError rather than leaking a KeyError to the caller.
    sectors = body.get("list")
    # A list with at most one entry is treated as "not available".
    if not sectors or len(sectors) <= 1:
        raise ValueError(f"Sector list is not available for sector code, {market}.")
    return body

candle async

candle(code, period, ctype, start=None, end=None)

주어진 코드, 기간, 종목/업종 유형에 해당하는 API 요청을 하고 응답을 반환합니다.

"stock": {"tick": "ka10079", "min": "ka10080", "day": "ka10081"}

"sector": {"tick": "ka20004", "min": "ka20005", "day": "ka20006"}

Parameters:

Name Type Description Default
code str

종목코드 / 업종코드

required
period str

캔들 기간유형, {"tick", "min", "day"}.

required
ctype str

종목 / 업종 유형, {"stock", "sector"}.

required
start str

시작일자 in YYYYMMDD format.

None
end str

종료일자 in YYYYMMDD format.

None

Raises:

Type Description
ValueError

유효하지 않은 'ctype' 또는 'period'

Returns:

Name Type Description
dict dict

캔들 데이터를 포함하는 json 응답

Source code in kiwoom/api.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
async def candle(
    self,
    code: str,
    period: str,
    ctype: str,
    start: str = None,
    end: str = None,
) -> dict:
    """
    주어진 코드, 기간, 종목/업종 유형에 해당하는 API 요청을 하고 응답을 반환합니다.

    "stock": {"tick": "ka10079", "min": "ka10080", "day": "ka10081"}

    "sector": {"tick": "ka20004", "min": "ka20005", "day": "ka20006"}

    Args:
        code (str): 종목코드 / 업종코드
        period (str): 캔들 기간유형, {"tick", "min", "day"}.
        ctype (str): 종목 / 업종 유형, {"stock", "sector"}.
        start (str, optional): 시작일자 in YYYYMMDD format.
        end (str, optional): 종료일자 in YYYYMMDD format.

    Raises:
        ValueError: 유효하지 않은 'ctype' 또는 'period'

    Returns:
        dict: 캔들 데이터를 포함하는 json 응답
    """

    ctype = ctype.lower()
    endpoint = "/api/dostk/chart"
    api_id = PERIOD_TO_API_ID[ctype][period]
    data = dict(PERIOD_TO_DATA[ctype][period])
    match ctype:
        case "stock":
            data["stk_cd"] = code
        case "sector":
            data["inds_cd"] = code
        case _:
            raise ValueError(f"'ctype' must be one of [stock, sector], not {ctype=}.")
    if period == "day":
        end = end if end else datetime.now().strftime("%Y%m%d")
        data["base_dt"] = end

    ymd: int = len("YYYYMMDD")  # 8 digit compare
    key: str = PERIOD_TO_BODY_KEY[ctype][period]
    time: str = PERIOD_TO_TIME_KEY[period]

    def should_continue(body: dict) -> bool:
        # Validate
        if not valid(body, period, ctype):
            return False
        # Request full data
        if not start:
            return True
        # Condition to continue
        chart = body[key]
        earliest = chart[-1][time][:ymd]
        return start <= earliest

    body = await self.request_until(should_continue, endpoint, api_id, data=data)
    return body

trade async

trade(start, end='')

주어진 시작일자와 종료일자에 해당하는 체결내역을 키움증권 '0343' 계좌 체결내역 화면과 동일한 구성으로 반환합니다. 데이터 조회 제한으로 최근 2개월 데이터만 조회할 수 있습니다.

체결내역 데이터는 알파노트를 통해 간편하게 진입/청산 시각화 및 성과 지표들을 확인할 수 있습니다.

Parameters:

Name Type Description Default
start str

시작일자 in YYYYMMDD format

required
end str

종료일자 in YYYYMMDD format

''

Returns:

Type Description
list[dict]

list[dict]: 체결내역 데이터를 포함하는 json 응답 리스트

Source code in kiwoom/api.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
async def trade(self, start: str, end: str = "") -> list[dict]:
    """
    Return the executions between the start and end dates, laid out like the
    Kiwoom '0343' account execution screen.
    Because of the data query limit only about the last two months are available.

    The execution data can be loaded into [Alphanote](http://alphanote.io)
    to visualize entries/exits and to check performance metrics.

    Args:
        start (str): start date in YYYYMMDD format
        end (str, optional): end date in YYYYMMDD format

    Returns:
        list[dict]: list of json records that contain the execution data
    """
    endpoint = "/api/dostk/acnt"
    api_id = "kt00009"
    template = {
        "ord_dt": "",  # YYYYMMDD (Optional)
        "qry_tp": "1",  # all / executed
        "stk_bond_tp": "1",  # all / stock / bond
        "mrkt_tp": "0",  # all / KOSPI / KOSDAQ / OTCBB / ECN
        "sell_tp": "0",  # all / sell / buy
        "dmst_stex_tp": "%",  # all / KRX / NXT / SOR
        # 'stk_cd': '',  # stock code (Optional)
        # 'fr_ord_no': '',  # starting order number (Optional)
    }

    # Clamp the requested window to [today - REQUEST_LIMIT_DAYS, today]
    today = datetime.today()
    first_day = max(
        datetime.strptime(start, "%Y%m%d"),
        today - timedelta(days=REQUEST_LIMIT_DAYS),
    )
    last_day = datetime.strptime(end, "%Y%m%d") if end else datetime.today()
    last_day = min(last_day, datetime.today())

    records = []
    array_key = "acnt_ord_cntr_prst_array"
    # One request chain per business day, with ord_dt set explicitly
    for bday in bdate_range(first_day, last_day):
        payload = {**template, "ord_dt": bday.strftime("%Y%m%d")}
        body = await self.request_until(lambda _: True, endpoint, api_id, data=payload)
        if array_key not in body:
            continue
        # Stamp every record with the order date it was requested for
        stamp = bday.strftime("%Y-%m-%d")
        for rec in body[array_key]:
            rec["ord_dt"] = stamp
        records.extend(body[array_key])
    return records

add_callback_on_real_data

add_callback_on_real_data(real_type, callback)

실시간 데이터 수신 시 호출될 콜백 함수를 추가합니다. (trnm이 'REAL'인 경우)

  • callback 함수는 서버 응답 string 그대로를 인자로 받습니다.
  • real_type을 'PING' 또는 'LOGIN'으로 설정하면 기본 콜백 함수를 덮어씁니다.

콜백 함수는 비동기 콜백 함수를 추가하는 것을 권장합니다. 비동기 및 동기 콜백 함수 모두 루프를 블로킹하지 않도록 백그라운드에서 실행됩니다. 따라서 데이터 처리 완료 순서가 반드시 데이터 수신 순서에 따라 실행되지 않을 수 있습니다.

ex) tick 체결 데이터 (type '0B')가 수신될 때마다 데이터 출력하기

> fn = lambda raw: print(raw)

> add_callback_on_real_data(real_type='0B', callback=fn)

Parameters:

Name Type Description Default
real_type str

키움 REST API에 정의된 실시간 데이터 타입

required
callback Callable

raw 스트링을 인자로 받는 콜백 함수

required
Source code in kiwoom/api.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def add_callback_on_real_data(self, real_type: str, callback: Callable) -> None:
    """
    Register the callback invoked when real-time data arrives (trnm is 'REAL').

    * The callback receives the raw server response string as its argument.
    * Using 'PING' or 'LOGIN' as real_type overwrites the default callback.

    Async callbacks are recommended. Both async and sync callbacks are
    wrapped so that they run in the background without blocking the loop,
    hence processing may complete in a different order than the data arrived.

    ex) print every tick execution message (type '0B')

        > fn = lambda raw: print(raw)

        > add_callback_on_real_data(real_type='0B', callback=fn)

    Args:
        real_type (str): real-time data type defined by the Kiwoom REST API
            (stored upper-cased)
        callback (Callable): callback that takes the raw string
    """

    registry_key = real_type.upper()
    # Coroutine functions and plain callables need different wrappers
    wrap = wrap_async_callback if iscoroutinefunction(callback) else wrap_sync_callback
    self._callbacks[registry_key] = wrap(self._sem, callback)

register_tick async

register_tick(grp_no, codes, refresh='1')

주어진 그룹번호와 종목 코드에 대해 주식체결 데이터를 등록합니다. (타입 '0B')

Parameters:

Name Type Description Default
grp_no str

그룹번호

required
codes list[str]

종목코드 리스트

required
refresh str

기존등록유지여부 (기존유지:'1', 신규등록:'0').

'1'
Source code in kiwoom/api.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
async def register_tick(
    self,
    grp_no: str,
    codes: list[str],
    refresh: str = "1",
) -> None:
    """
    Subscribe the given stock codes to stock execution data (type '0B')
    under the given group number.

    Args:
        grp_no (str): group number
        codes (list[str]): list of stock codes, at most 100 per group
        refresh (str, optional): whether to keep existing registrations
            (keep existing: '1', register anew: '0').
    """

    assert len(codes) <= 100, f"Max 100 codes per group, got {len(codes)} codes."
    subscription = {"item": codes, "type": ["0B"]}
    message = {
        "trnm": "REG",
        "grp_no": grp_no,
        "refresh": refresh,
        "data": [subscription],
    }
    await self.socket.send(message)

register_hoga async

register_hoga(grp_no, codes, refresh='1')

주어진 그룹번호와 종목 코드에 대해 주식호가잔량 데이터를 등록합니다. (타입 '0D')

Parameters:

Name Type Description Default
grp_no str

그룹번호

required
codes list[str]

종목코드 리스트

required
refresh str

기존등록유지여부 (기존유지:'1', 신규등록:'0').

'1'
Source code in kiwoom/api.py
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
async def register_hoga(
    self,
    grp_no: str,
    codes: list[str],
    refresh: str = "1",
) -> None:
    """
    Subscribe the given stock codes to order book (remaining quote) data
    (type '0D') under the given group number.

    Args:
        grp_no (str): group number
        codes (list[str]): list of stock codes, at most 100 per group
        refresh (str, optional): whether to keep existing registrations
            (keep existing: '1', register anew: '0').
    """

    assert len(codes) <= 100, f"Max 100 codes per group, got {len(codes)} codes."
    subscription = {"item": codes, "type": ["0D"]}
    message = {
        "trnm": "REG",
        "grp_no": grp_no,
        "refresh": refresh,
        "data": [subscription],
    }
    await self.socket.send(message)

remove_register async

remove_register(grp_no, codes, type)

주어진 그룹번호와 실시간 데이터 타입에 대해 등록된 데이터를 제거합니다.

Parameters:

Name Type Description Default
grp_no str

그룹번호

required
codes list[str]

종목코드 리스트

required
type str | list[str]

실시간 데이터 타입 ex) '0B', '0D', 'DD'

required
Source code in kiwoom/api.py
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
async def remove_register(self, grp_no: str, codes: list[str], type: str | list[str]) -> None:
    """
    주어진 그룹번호와 실시간 데이터 타입에 대해 등록된 데이터를 제거합니다.

    Args:
        grp_no (str): 그룹번호
        type (str | list[str]): 실시간 데이터 타입 ex) '0B', '0D', 'DD'
    """
    if not grp_no or not type:
        return
    if isinstance(type, str):
        type = [type]
    await self.socket.send(
        {
            "trnm": "REMOVE",
            "grp_no": grp_no,
            "refresh": "",
            "data": [{"item": codes, "type": type}],
        }
    )

http

Client

Source code in kiwoom/http/client.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
class Client:
    """
    Asynchronous HTTP client for the Kiwoom REST API.

    Holds the aiohttp session, the bearer token and the rate limiter that
    every request goes through.
    """

    def __init__(self, host: str, appkey: str, secretkey: str):
        """
        Initialize Client instance.

        Args:
            host (str): domain
            appkey (str): file path or raw appkey
            secretkey (str): file path or raw secretkey
        """
        self.host: str = host
        self.debugging: bool = False

        # "Bearer <token>" once connected, empty string otherwise
        self._auth: str = ""
        self._appkey: str = appkey
        self._secretkey: str = secretkey

        self._state_http = State.CLOSED
        # Set once a token has been received; ready() waits on it
        self._ready_event = asyncio.Event()
        self._limiter: RateLimiter = RateLimiter()
        self._session: ClientSession | None = None

    async def connect(self, appkey: str, secretkey: str) -> None:
        """
        Connect to Kiwoom REST API server and receive token.

        Does nothing when an open session already exists.

        Args:
            appkey (str): file path or raw appkey
            secretkey (str): file path or raw secretkey

        Raises:
            RuntimeError: the token response does not contain a token.
        """
        # A key given as a file path is replaced by the file's content
        if isfile(appkey):
            with open(appkey, "r") as f:
                self._appkey = f.read().strip()
        if isfile(secretkey):
            with open(secretkey, "r") as f:
                self._secretkey = f.read().strip()

        # Already connected
        if self._session and not self._session.closed:
            return

        # Establish HTTP session
        self._ready_event.clear()
        self._session = ClientSession(
            timeout=aiohttp.ClientTimeout(
                total=HTTP_TOTAL_TIMEOUT,
                sock_connect=HTTP_CONNECT_TIMEOUT,
                sock_read=HTTP_READ_TIMEOUT,
            ),
            connector=aiohttp.TCPConnector(limit=HTTP_TCP_CONNECTORS, enable_cleanup_closed=True),
        )

        # Request token
        endpoint = "/oauth2/token"
        api_id = ""
        headers = self.headers(api_id)
        data = {
            "grant_type": "client_credentials",
            "appkey": self._appkey,
            "secretkey": self._secretkey,
        }
        try:
            async with self._session.post(self.host + endpoint, headers=headers, json=data) as res:
                res.raise_for_status()
                body = await res.json()
                resp = Response(res.url, res.status, res.headers, body)

            if "token" not in body:
                msg = dumps(self, endpoint, api_id, headers, data, resp)
                raise RuntimeError(f"Failed to get token: {msg}")
        except BaseException:
            # Do not keep a token-less session around: the "already connected"
            # check above would turn every later connect() into a no-op.
            session, self._session = self._session, None
            await asyncio.shield(session.close())
            raise

        # Set token
        token = body["token"]
        self._auth = f"Bearer {token}"
        self._session.headers.update(
            {
                "Content-Type": "application/json;charset=UTF-8",
                "authorization": self._auth,
            }
        )
        self._state_http = State.CONNECTED
        self._ready_event.set()

    async def close(self) -> None:
        """
        Close HTTP session and forget the token.
        """
        self._ready_event.clear()
        try:
            if self._session:
                await asyncio.shield(self._session.close())
        finally:
            # Reset even when closing the session fails, so that the client
            # never reports a stale token, session or CONNECTED state.
            self._auth = ""
            self._session = None
            self._state_http = State.CLOSED

    def token(self) -> str:
        """
        Returns token if available, otherwise empty string.

        Raises:
            ValueError: Invalid token (authorization value without 'Bearer ' prefix).

        Returns:
            str: token
        """
        if not self._auth:
            return ""
        prefix = "Bearer "
        # Must be a prefix: a mere substring match would slice at the wrong offset
        if self._auth.startswith(prefix):
            return self._auth[len(prefix) :]
        raise ValueError(f"Invalid token: {self._auth}")

    def headers(
        self, api_id: str, cont_yn: str = "N", next_key: str = "", headers: dict | None = None
    ) -> dict[str, str]:
        """
        Generate headers for the request.

        Args:
            api_id (str): api_id in Kiwoom API
            cont_yn (str, optional): cont_yn in Kiwoom API
            next_key (str, optional): next_key in Kiwoom API
            headers (dict | None, optional): extra headers to merge in;
                'cont-yn', 'next-key' and 'api-id' always take the values given here.
                The passed dict is left untouched.

        Returns:
            dict[str, str]: newly created headers
        """
        base = {
            # 'Content-Type' and 'authorization' are set on the session in connect()
            "cont-yn": cont_yn,
            "next-key": next_key,
            "api-id": api_id,
        }
        if headers is None:
            return base
        # Merge into a new dict: updating the caller's dict in place would leak
        # continuation state ('cont-yn', 'next-key') into its next request.
        return {**headers, **base}

    async def ready(self):
        """
        Wait until request limit is lifted and connection is established.

        Raises:
            RuntimeError: Connection timeout.
        """
        try:
            await asyncio.wait_for(self._ready_event.wait(), HTTP_TOTAL_TIMEOUT)
        except asyncio.TimeoutError as err:
            msg = f"Connection timeout: waited for {HTTP_TOTAL_TIMEOUT} seconds."
            raise RuntimeError(msg) from err
        await self._limiter.acquire()

    @debugger
    async def post(
        self, endpoint: str, api_id: str, headers: dict | None = None, data: dict | None = None
    ) -> aiohttp.ClientResponse:
        """
        Post request to the server, but using client.request function is recommended.
        Request limit and connection status are checked globally and automatically.

        Args:
            endpoint (str): endpoint to Kiwoom REST API server
            api_id (str): api id
            headers (dict | None, optional): headers of the request.
                Generated from api_id when omitted.
            data (dict | None, optional): data to be sent in json format

        Returns:
            aiohttp.ClientResponse: async response from the server,
                but this will be converted to kiwoom.http.response.Response by debugger.
        """

        # Warn not connected
        if self._state_http != State.CONNECTED:
            warnings.warn("Not connected, wait for timeout...", RuntimeWarning, stacklevel=1)

        # Wait connection and request limits
        await self.ready()

        # Post Request
        if headers is None:
            headers = self.headers(api_id)
        return await self._session.post(self.host + endpoint, headers=headers, json=data)

    async def request(
        self, endpoint: str, api_id: str, headers: dict | None = None, data: dict | None = None
    ) -> Response:
        """
        Requests to the server and returns response with error handling.

        When the server reports an expired token (return_code 3), connect()
        is called once and the request is sent again.

        Args:
            endpoint (str): endpoint of the server
            api_id (str): api id
            headers (dict | None, optional): headers of the request. Defaults to None.
            data (dict | None, optional): data of the request. Defaults to None.

        Raises:
            RuntimeError: return_code is missing or not in [0, 20],
                or it is still 3 after the token refresh attempt.

        Returns:
            Response: response wrapped by kiwoom.http.response.Response
        """

        refreshed = False
        while True:
            res: Response = await self.post(endpoint, api_id, headers=headers, data=data)
            body = res.json()
            # .get(): a body without return_code is a failure, not a KeyError
            return_code = body.get("return_code")
            if return_code in (0, 20):
                # 0: Success
                # 20 : No Data
                return res
            if return_code == 3 and not refreshed:
                # 3 : Token Expired - refresh once, then retry the request
                print("Token expired, trying to refresh token...")
                await self.connect(self._appkey, self._secretkey)
                refreshed = True
                continue
            break

        # Request Failure
        if return_code == 3:
            err = f"\nRequest failed with {return_code=}: still reported as expired after a token refresh attempt."
        else:
            err = f"\nRequest failed with {return_code=}, not in {{'0', '3', '20'}}."
        if not self.debugging:
            msg = dumps(self, endpoint, api_id, headers, data, res)
            raise RuntimeError(msg + err)
        raise RuntimeError(err)

    async def request_until(
        self,
        should_continue: Callable,
        endpoint: str,
        api_id: str,
        headers: dict | None = None,
        data: dict | None = None,
    ) -> dict:
        """
        Keep requesting continuation pages while 'cont-yn' in the response
        header is 'Y' and should_continue(body) evaluates to True.

        List values of all pages are concatenated per key; other values are
        taken from the first page.

        Args:
            should_continue (Callable):
                callable that takes body(dict) and
                returns boolean value to request again or not;
                a non-callable value means "always continue"
            endpoint (str):
                endpoint of the server
            api_id (str):
                api id
            headers (dict | None, optional):
                headers of the request. Defaults to None. Not modified.
            data (dict | None, optional):
                data of the request. Defaults to None.

        Returns:
            dict: response body; the first body as-is when should_continue
                rejects it, otherwise the merged body
        """

        def wants_more(page: dict) -> bool:
            return should_continue(page) if callable(should_continue) else True

        # Initial request
        res = await self.request(endpoint, api_id, headers=headers, data=data)
        body = res.json()

        # If condition to chain is not met
        if not wants_more(body):
            return body

        # Collect list values page by page, keep everything else from page one
        merged = dict()
        list_keys = set()
        for key, value in body.items():
            if isinstance(value, list):
                merged[key] = [value]
                list_keys.add(key)
            else:
                merged[key] = value

        # The first page was already accepted above; do not ask again for it
        first_page = True
        while res.headers.get("cont-yn") == "Y" and (first_page or wants_more(body)):
            first_page = False
            next_key = res.headers.get("next-key")
            # Build fresh headers so the caller's dict is never modified
            headers = {
                **(headers or {}),
                **self.headers(api_id, cont_yn="Y", next_key=next_key),
            }

            # Continue request
            res = await self.request(endpoint, api_id, headers=headers, data=data)
            body = res.json()

            # Append list data, including keys that first show up on a later page
            for key, value in body.items():
                if not isinstance(value, list):
                    continue
                if key not in list_keys:
                    merged[key] = []
                    list_keys.add(key)
                merged[key].append(value)

        # Flatten list data as if it was one list
        for key in list_keys:
            merged[key] = list(chain.from_iterable(merged[key]))
        return merged
host instance-attribute
host = host
debugging instance-attribute
debugging = False
connect async
connect(appkey, secretkey)

Connect to Kiwoom REST API server and receive token.

Parameters:

Name Type Description Default
appkey str

file path or raw appkey

required
secretkey str

file path or raw secretkey

required
Source code in kiwoom/http/client.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
async def connect(self, appkey: str, secretkey: str) -> None:
    """
    Connect to Kiwoom REST API server and receive token.

    Does nothing when an open session already exists.

    Args:
        appkey (str): file path or raw appkey
        secretkey (str): file path or raw secretkey

    Raises:
        RuntimeError: the token response does not contain a token.
    """
    # A key given as a file path is replaced by the file's content
    if isfile(appkey):
        with open(appkey, "r") as f:
            self._appkey = f.read().strip()
    if isfile(secretkey):
        with open(secretkey, "r") as f:
            self._secretkey = f.read().strip()

    # Already connected
    if self._session and not self._session.closed:
        return

    # Establish HTTP session
    self._ready_event.clear()
    self._session = ClientSession(
        timeout=aiohttp.ClientTimeout(
            total=HTTP_TOTAL_TIMEOUT,
            sock_connect=HTTP_CONNECT_TIMEOUT,
            sock_read=HTTP_READ_TIMEOUT,
        ),
        connector=aiohttp.TCPConnector(limit=HTTP_TCP_CONNECTORS, enable_cleanup_closed=True),
    )

    # Request token
    endpoint = "/oauth2/token"
    api_id = ""
    headers = self.headers(api_id)
    data = {
        "grant_type": "client_credentials",
        "appkey": self._appkey,
        "secretkey": self._secretkey,
    }
    try:
        async with self._session.post(self.host + endpoint, headers=headers, json=data) as res:
            res.raise_for_status()
            body = await res.json()
            resp = Response(res.url, res.status, res.headers, body)

        if "token" not in body:
            msg = dumps(self, endpoint, api_id, headers, data, resp)
            raise RuntimeError(f"Failed to get token: {msg}")
    except BaseException:
        # Do not keep a token-less session around: the "already connected"
        # check above would turn every later connect() into a no-op.
        session, self._session = self._session, None
        await asyncio.shield(session.close())
        raise

    # Set token
    token = body["token"]
    self._auth = f"Bearer {token}"
    self._session.headers.update(
        {
            "Content-Type": "application/json;charset=UTF-8",
            "authorization": self._auth,
        }
    )
    self._state_http = State.CONNECTED
    self._ready_event.set()
close async
close()

Close HTTP session.

Source code in kiwoom/http/client.py
105
106
107
108
109
110
111
112
113
114
async def close(self) -> None:
    """
    Close HTTP session and forget the token.
    """
    self._ready_event.clear()
    try:
        if self._session:
            await asyncio.shield(self._session.close())
    finally:
        # Reset even when closing the session fails, so that the client
        # never reports a stale token, session or CONNECTED state.
        self._auth = ""
        self._session = None
        self._state_http = State.CLOSED
token
token()

Returns token if available, otherwise empty string.

Raises:

Type Description
ValueError

Invalid token.

Returns:

Name Type Description
str str

token

Source code in kiwoom/http/client.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def token(self) -> str:
    """
    Returns token if available, otherwise empty string.

    Raises:
        ValueError: Invalid token (authorization value without 'Bearer ' prefix).

    Returns:
        str: token
    """
    if not self._auth:
        return ""
    prefix = "Bearer "
    # Must be a prefix: a mere substring match would slice at the wrong offset
    if self._auth.startswith(prefix):
        return self._auth[len(prefix) :]
    raise ValueError(f"Invalid token: {self._auth}")
headers
headers(api_id, cont_yn='N', next_key='', headers=None)

Generate headers for the request.

Parameters:

Name Type Description Default
api_id str

api_id in Kiwoom API

required
cont_yn str

cont_yn in Kiwoom API

'N'
next_key str

next_key in Kiwoom API

''
headers dict | None

headers to be updated with

None

Returns:

Type Description
dict[str, str]

dict[str, str]: headers

Source code in kiwoom/http/client.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def headers(
    self, api_id: str, cont_yn: str = "N", next_key: str = "", headers: dict | None = None
) -> dict[str, str]:
    """
    Generate headers for the request.

    Args:
        api_id (str): api_id in Kiwoom API
        cont_yn (str, optional): cont_yn in Kiwoom API
        next_key (str, optional): next_key in Kiwoom API
        headers (dict | None, optional): headers to be updated with

    Returns:
        dict[str, str]: headers
    """
    base = {
        # 'Content-Type': 'application/json;charset=UTF-8',
        # 'authorization': self._auth,
        "cont-yn": cont_yn,
        "next-key": next_key,
        "api-id": api_id,
    }
    if headers is not None:
        headers.update(base)
        return headers
    return base
ready async
ready()

Wait until request limit is lifted and connection is established.

Raises:

Type Description
RuntimeError

Connection timeout.

Source code in kiwoom/http/client.py
159
160
161
162
163
164
165
166
167
168
169
170
171
async def ready(self):
    """
    Block until the connection is established, then take a slot from the
    request rate limiter.

    Raises:
        RuntimeError: the connection was not established within
            HTTP_TOTAL_TIMEOUT seconds.
    """
    try:
        await asyncio.wait_for(self._ready_event.wait(), HTTP_TOTAL_TIMEOUT)
    except asyncio.TimeoutError as err:
        raise RuntimeError(
            f"Connection timeout: waited for {HTTP_TOTAL_TIMEOUT} seconds."
        ) from err
    else:
        # Connected: now honour the request limit
        await self._limiter.acquire()
post async
post(endpoint, api_id, headers=None, data=None)

Post request to the server, but using client.request function is recommended. Request limit and connection status are checked globally and automatically.

Parameters:

Name Type Description Default
endpoint str

endpoint to Kiwoom REST API server

required
api_id str

api id

required
headers dict | None

headers of the request.

None
data dict | None

data to be sent in json format

None

Returns:

Type Description
ClientResponse

aiohttp.ClientResponse: async response from the server, but this will be converted to kiwoom.http.response.Response by debugger.

Source code in kiwoom/http/client.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
@debugger
async def post(
    self, endpoint: str, api_id: str, headers: dict | None = None, data: dict | None = None
) -> aiohttp.ClientResponse:
    """
    Send a POST request to the server; prefer client.request, which adds
    error handling on top of this.
    The connection status and the request limit are awaited before sending.

    Args:
        endpoint (str): endpoint to Kiwoom REST API server
        api_id (str): api id
        headers (dict | None, optional): headers of the request,
            generated from api_id when omitted.
        data (dict | None, optional): data to be sent in json format

    Returns:
        aiohttp.ClientResponse: async response from the server,
            but this will be converted to kiwoom.http.response.Response by debugger.
    """

    # Only a warning: ready() below still waits for the connection
    if self._state_http != State.CONNECTED:
        warnings.warn("Not connected, wait for timeout...", RuntimeWarning, stacklevel=1)

    await self.ready()

    request_headers = self.headers(api_id) if headers is None else headers
    return await self._session.post(self.host + endpoint, headers=request_headers, json=data)
request async
request(endpoint, api_id, headers=None, data=None)

Requests to the server and returns response with error handling.

Parameters:

Name Type Description Default
endpoint str

endpoint of the server

required
api_id str

api id

required
headers dict | None

headers of the request. Defaults to None.

None
data dict | None

data of the request. Defaults to None.

None

Raises:

Type Description
RuntimeError

RuntimeError when return_code is not in [0, 3, 20]

Returns:

Name Type Description
Response Response

response wrapped by kiwoom.http.response.Response

Source code in kiwoom/http/client.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
async def request(
    self, endpoint: str, api_id: str, headers: dict | None = None, data: dict | None = None
) -> Response:
    """
    Requests to the server and returns response with error handling.

    Args:
        endpoint (str): endpoint of the server
        api_id (str): api id
        headers (dict | None, optional): headers of the request. Defaults to None.
        data (dict | None, optional): data of the request. Defaults to None.

    Raises:
        RuntimeError: RuntimeError when return_code is not in [0, 3, 20]

    Returns:
        Response: response wrapped by kiwoom.http.response.Response
    """

    res: Response = await self.post(endpoint, api_id, headers=headers, data=data)
    body = res.json()
    if "return_code" in body:
        match body["return_code"]:
            case 0 | 20:
                # 0: Success
                # 20 : No Data
                return res
            case 3:
                # 3 : Token Expired
                print("Token expired, trying to refresh token...")
                await self.connect(self._appkey, self._secretkey)
                return await self.request(endpoint, api_id, headers=headers, data=data)

    # Request Failure
    return_code = body["return_code"]
    err = f"\nRequest failed with {return_code=}, not in {{'0', '3', '20'}}."
    if not self.debugging:
        msg = dumps(self, endpoint, api_id, headers, data, res)
        raise RuntimeError(msg + err)
    raise RuntimeError(err)
request_until async
request_until(should_continue, endpoint, api_id, headers=None, data=None)

Keep requesting continuation pages while 'cont-yn' in the response header is 'Y' and should_continue(body) evaluates to True.

Parameters:

Name Type Description Default
should_continue Callable

callable that takes body(dict) and returns boolean value to request again or not

required
endpoint str

endpoint of the server

required
api_id str

api id

required
headers dict | None

headers of the request. Defaults to None.

None
data dict | None

data of the request. Defaults to None.

None

Returns:

Name Type Description
dict dict

response body

Source code in kiwoom/http/client.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
async def request_until(
    self,
    should_continue: Callable,
    endpoint: str,
    api_id: str,
    headers: dict | None = None,
    data: dict | None = None,
) -> dict:
    """
    Request until 'cont-yn' in response header is 'Y',
    and should_continue(body) evaluates to True.

    Args:
        should_continue (Callable):
            callable that takes body(dict) and
            returns boolean value to request again or not
        endpoint (str):
            endpoint of the server
        api_id (str):
            api id
        headers (dict | None, optional):
            headers of the request. Defaults to None.
        data (dict | None, optional):
            data of the request. Defaults to None.

    Returns:
        dict: response body
    """

    # Initial request
    res = await self.request(endpoint, api_id, headers=headers, data=data)
    body = res.json()

    # If condition to chain is not met
    if callable(should_continue) and not should_continue(body):
        return body

    # Extract list data only
    bodies = dict()
    for key in body.keys():
        if isinstance(body[key], list):
            bodies[key] = [body[key]]
            continue
        bodies[key] = body[key]

    # Rercursive call
    while res.headers.get("cont-yn") == "Y" and should_continue(body):
        next_key = res.headers.get("next-key")
        headers = self.headers(api_id, cont_yn="Y", next_key=next_key, headers=headers)

        # Continue request
        res = await self.request(endpoint, api_id, headers=headers, data=data)
        body = res.json()

        # Append list data
        for key in body.keys():
            if isinstance(body[key], list):
                bodies[key].append(body[key])

    # Flatten list data as if it was one list
    for key in bodies:
        if isinstance(bodies[key], list):
            bodies[key] = list(chain.from_iterable(bodies[key]))
    return bodies

Response

Response wrapper for aiohttp.ClientResponse

Source code in kiwoom/http/response.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Response:
    """
    Plain container for the parts of an aiohttp.ClientResponse
    that are kept after the async context has been left.
    """

    def __init__(self, url: str, status: int, headers: dict, body: dict):
        """
        Store the given response fields on the instance.

        Args:
            url (str): url of the response
            status (int): status code of the response
            headers (dict): headers of the response
            body (dict): body of the response, already parsed
        """

        self.url, self.status = url, status
        self.headers, self.body = headers, body

    def json(self) -> dict:
        """
        Return the stored body; nothing is parsed here.

        Returns:
            dict: body in json format
        """
        parsed = self.body
        return parsed
url instance-attribute
url = url
status instance-attribute
status = status
headers instance-attribute
headers = headers
body instance-attribute
body = body
json
json()

Returns already parsed body.

Returns:

Name Type Description
dict dict

body in json format

Source code in kiwoom/http/response.py
22
23
24
25
26
27
28
29
def json(self) -> dict:
    """
    Return the stored body; nothing is parsed here.

    Returns:
        dict: body in json format
    """
    parsed = self.body
    return parsed

Socket

Source code in kiwoom/http/socket.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class Socket:
    """
    Websocket connection to the Kiwoom server.

    connect() starts a background task (run) that receives text messages
    and puts them on the queue given at construction.
    """

    REAL = "wss://api.kiwoom.com:10000"
    MOCK = "wss://mockapi.kiwoom.com:10000"  # KRX Only
    ENDPOINT = "/api/dostk/websocket"

    def __init__(self, url: str, queue: asyncio.Queue):
        """
        Initialize Socket class.

        Args:
            url (str): url of Kiwoom websocket server
            queue (asyncio.Queue): queue to put received data
        """
        self.url = url
        self._queue = queue
        self._session: ClientSession | None = None
        self._websocket: ClientWebSocketResponse | None = None

        self._state = State.CLOSED
        self._state_lock = asyncio.Lock()
        self._queue_task: asyncio.Task | None = None
        # Set means "receiving is stopped"; connect() clears it before run() starts
        self._stop_event = asyncio.Event()
        self._stop_event.set()

    async def connect(self, session: ClientSession, token: str):
        """
        Connect to Kiwoom websocket server.

        Does nothing when the state is already CONNECTED or CONNECTING.
        A failure is printed instead of raised and the state goes back to CLOSED.

        Args:
            session (ClientSession): aiohttp ClientSession from API.connect()
            token (str): token for authentication
        """

        async with self._state_lock:
            if self._state in (State.CONNECTED, State.CONNECTING):
                return

            self._state = State.CONNECTING
            try:
                # Close existing websocket & task
                self._stop_event.set()
                if self._websocket and not self._websocket.closed:
                    await self._websocket.close()
                await cancel(self._queue_task)
                self._queue_task = None

                self._session = session
                self._websocket = await session.ws_connect(
                    self.url, autoping=True, heartbeat=WEBSOCKET_HEARTBEAT
                )

                # The receiver task is started before the LOGIN message is sent
                self._stop_event.clear()
                self._queue_task = asyncio.create_task(self.run(), name="enqueue")
                await self.send({"trnm": "LOGIN", "token": token})
                self._state = State.CONNECTED

            except Exception as err:
                print(f"Websocket failed to connect to {self.url}: {err}")
                self._state = State.CLOSED

    async def close(self):
        """
        Close the websocket and the task, and mark the socket as CLOSED
        so that connect() can be used again.
        """
        async with self._state_lock:
            self._stop_event.set()
            task = self._queue_task
            # run() calls close() from inside the queue task itself;
            # a task must not cancel and await itself
            if task is not None and task is not asyncio.current_task():
                task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await task

            if self._websocket and not self._websocket.closed:
                with contextlib.suppress(Exception):
                    await self._websocket.close()

            self._session = None
            self._websocket = None
            self._queue_task = None
            # Without this reset connect() would keep returning early
            self._state = State.CLOSED

    async def send(self, msg: str | dict) -> None:
        """
        Send data to Kiwoom websocket server.

        Args:
            msg (str | dict): msg should be in json format;
                a dict is serialized with orjson before sending
        """
        if isinstance(msg, dict):
            msg = orjson.dumps(msg).decode("utf-8")
        await self._websocket.send_str(msg)

    async def recv(self) -> str:
        """
        Receive data from Kiwoom websocket server and return data.
        If message type is not str, close the websocket and raise RuntimeError.

        Raises:
            RuntimeError: Websocket Connection Error

        Returns:
            str: received json formatted data from websocket
        """
        try:
            return await self._websocket.receive_str()
        except WSMessageTypeError as err:
            # receive_str() has already consumed the offending frame, so it is
            # reported through the error instead of reading another frame
            await self.close()
            raise RuntimeError(f"Websocket received other type than str: {err}") from err

    async def run(self):
        """
        Receive data from websocket and put data to the queue.
        If WEBSOCKET_QUEUE_MAX_SIZE is set and queue gets full,
        then backpressure will be applied to the websocket.
        Run this task in background with asyncio.create_task().
        """
        assert self._websocket is not None
        try:
            while not self._stop_event.is_set():
                await self._queue.put(await self.recv())

        except Exception as e:
            # Cancellation is not an Exception and therefore passes through
            print(f"Failed to receive message: {e}")
            await self.close()
REAL class-attribute instance-attribute
REAL = 'wss://api.kiwoom.com:10000'
MOCK class-attribute instance-attribute
MOCK = 'wss://mockapi.kiwoom.com:10000'
ENDPOINT class-attribute instance-attribute
ENDPOINT = '/api/dostk/websocket'
url instance-attribute
url = url
connect async
connect(session, token)

Connect to Kiwoom websocket server.

Parameters:

Name Type Description Default
session ClientSession

aiohttp ClientSession from API.connect()

required
token str

token for authentication

required
Source code in kiwoom/http/socket.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
async def connect(self, session: ClientSession, token: str) -> None:
    """
    Connect to Kiwoom websocket server.

    Runs under the state lock. Does nothing when the state is already
    CONNECTED or CONNECTING. Any exception during the attempt is printed
    instead of raised and the state is set back to CLOSED.

    Args:
        session (ClientSession): aiohttp ClientSession from API.connect()
        token (str): token for authentication
    """

    # print("Trying to connect websocket...")
    async with self._state_lock:
        if self._state in (State.CONNECTED, State.CONNECTING):
            return

        self._state = State.CONNECTING
        try:
            # Close existing websocket & task
            self._stop_event.set()
            if self._websocket and not self._websocket.closed:
                await self._websocket.close()
            # NOTE(review): cancel() is defined outside this block; it is called
            # with None on the first connect, so presumably it accepts None - confirm
            await cancel(self._queue_task)
            self._queue_task = None

            self._session = session
            self._websocket = await session.ws_connect(
                self.url, autoping=True, heartbeat=WEBSOCKET_HEARTBEAT
            )

            # The receiver task is started before the LOGIN message is sent
            self._stop_event.clear()
            self._queue_task = asyncio.create_task(self.run(), name="enqueue")
            await self.send({"trnm": "LOGIN", "token": token})
            self._state = State.CONNECTED

        except Exception as err:
            # Websocket and task are not cleaned up here, only the state is reset
            print(f"Websocket failed to connect to {self.url}: {err}")
            self._state = State.CLOSED
close async
close()

Close the websocket and the task.

Source code in kiwoom/http/socket.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
async def close(self):
    """
    Close the websocket and the task, and mark the socket as CLOSED
    so that connect() can be used again.
    """
    async with self._state_lock:
        self._stop_event.set()
        task = self._queue_task
        # run() calls close() from inside the queue task itself;
        # a task must not cancel and await itself
        if task is not None and task is not asyncio.current_task():
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task

        if self._websocket and not self._websocket.closed:
            with contextlib.suppress(Exception):
                await self._websocket.close()

        self._session = None
        self._websocket = None
        self._queue_task = None
        # Without this reset connect() would keep returning early
        self._state = State.CLOSED
send async
send(msg)

Send data to Kiwoom websocket server.

Parameters:

Name Type Description Default
msg str | dict

msg should be in json format

required
Source code in kiwoom/http/socket.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
async def send(self, msg: str | dict) -> None:
    """
    Send data to Kiwoom websocket server.

    Args:
        msg (str | dict): msg should be in json format
    """
    if isinstance(msg, dict):
        # msg = json.dumps(msg)  # slow
        msg = orjson.dumps(msg).decode("utf-8")
    await self._websocket.send_str(msg)
recv async
recv()

Receive data from Kiwoom websocket server and return data. If message type is not str, close the websocket and raise RuntimeError.

Raises:

Type Description
RuntimeError

Websocket Connection Error

Returns:

Name Type Description
str str

received json formatted data from websocket

Source code in kiwoom/http/socket.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
async def recv(self) -> str:
    """
    Receive data from Kiwoom websocket server and return data.
    If message type is not str, close the websocket and raise RuntimeError.

    Raises:
        RuntimeError: Websocket Connection Error

    Returns:
        str: received json formatted data from websocket
    """
    try:
        return await self._websocket.receive_str()
    except WSMessageTypeError as err:
        # receive_str() has already consumed the offending frame, so it is
        # reported through the error instead of reading another frame
        await self.close()
        raise RuntimeError(f"Websocket received other type than str: {err}") from err
run async
run()

Receive data from websocket and put data to the queue. If WEBSOCKET_QUEUE_MAX_SIZE is set and queue gets full, then backpressure will be applied to the websocket. Run this task in background with asyncio.create_task().

Source code in kiwoom/http/socket.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
async def run(self):
    """
    Forward messages from the websocket to the queue until the stop event is set.
    If WEBSOCKET_QUEUE_MAX_SIZE is set and queue gets full,
    then backpressure will be applied to the websocket.
    Meant to run in the background via asyncio.create_task().
    Any Exception ends the loop: it is printed and the socket is closed.
    """
    assert self._websocket is not None
    try:
        while True:
            if self._stop_event.is_set():
                break
            enqueue = self._queue.put
            message = await self.recv()
            await enqueue(message)

    except Exception as exc:
        # Cancellation is not an Exception and therefore passes through
        print(f"Failed to receive message: {exc}")
        await self.close()

client

Client
Source code in kiwoom/http/client.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
class Client:
    def __init__(self, host: str, appkey: str, secretkey: str):
        """
        Initialize Client instance.

        Args:
            host (str): domain
            appkey (str): file path or raw appkey
            secretkey (str): file path or raw secretkey
        """
        self.host: str = host
        self.debugging: bool = False

        self._auth: str = ""
        self._appkey: str = appkey
        self._secretkey: str = secretkey

        self._state_http = State.CLOSED
        self._ready_event = asyncio.Event()
        self._limiter: RateLimiter = RateLimiter()
        self._session: ClientSession = None

    async def connect(self, appkey: str, secretkey: str) -> None:
        """
        Connect to Kiwoom REST API server and receive token.

        Args:
            appkey (str): file path or raw appkey
            secretkey (str): file path or raw secretkey
        """
        if isfile(appkey):
            with open(appkey, "r") as f:
                self._appkey = f.read().strip()
        if isfile(secretkey):
            with open(secretkey, "r") as f:
                self._secretkey = f.read().strip()

        # Already connected
        if self._session and not self._session.closed:
            return

        # Establish HTTP session
        self._ready_event.clear()
        self._session = ClientSession(
            timeout=aiohttp.ClientTimeout(
                total=HTTP_TOTAL_TIMEOUT,
                sock_connect=HTTP_CONNECT_TIMEOUT,
                sock_read=HTTP_READ_TIMEOUT,
            ),
            connector=aiohttp.TCPConnector(limit=HTTP_TCP_CONNECTORS, enable_cleanup_closed=True),
        )

        # Request token
        endpoint = "/oauth2/token"
        api_id = ""
        headers = self.headers(api_id)
        data = {
            "grant_type": "client_credentials",
            "appkey": self._appkey,
            "secretkey": self._secretkey,
        }
        async with self._session.post(self.host + endpoint, headers=headers, json=data) as res:
            res.raise_for_status()
            body = await res.json()
            resp = Response(res.url, res.status, res.headers, body)

        # Set token
        if "token" not in body:
            msg = dumps(self, endpoint, api_id, headers, data, resp)
            raise RuntimeError(f"Failed to get token: {msg}")
        token = body["token"]
        self._auth = f"Bearer {token}"
        self._session.headers.update(
            {
                "Content-Type": "application/json;charset=UTF-8",
                "authorization": self._auth,
            }
        )
        self._state_http = State.CONNECTED
        self._ready_event.set()

    async def close(self) -> None:
        """
        Close HTTP session.
        """
        self._ready_event.clear()
        if self._session:
            await asyncio.shield(self._session.close())

        self._auth = ""
        self._session = None

    def token(self) -> str:
        """
        Returns token if available, otherwise empty string.

        Raises:
            ValueError: Invalid token.

        Returns:
            str: token
        """
        if not self._auth:
            return ""
        if "Bearer " in self._auth:
            return self._auth[len("Bearer ") :]
        raise ValueError(f"Invalid token: {self._auth}")

    def headers(
        self, api_id: str, cont_yn: str = "N", next_key: str = "", headers: dict | None = None
    ) -> dict[str, str]:
        """
        Generate headers for the request.

        Args:
            api_id (str): api_id in Kiwoom API
            cont_yn (str, optional): cont_yn in Kiwoom API
            next_key (str, optional): next_key in Kiwoom API
            headers (dict | None, optional): headers to be updated with

        Returns:
            dict[str, str]: headers
        """
        base = {
            # 'Content-Type': 'application/json;charset=UTF-8',
            # 'authorization': self._auth,
            "cont-yn": cont_yn,
            "next-key": next_key,
            "api-id": api_id,
        }
        if headers is not None:
            headers.update(base)
            return headers
        return base

    async def ready(self):
        """
        Wait until request limit is lifted and connection is established.

        Raises:
            RuntimeError: Connection timeout.
        """
        try:
            await asyncio.wait_for(self._ready_event.wait(), HTTP_TOTAL_TIMEOUT)
        except asyncio.TimeoutError as err:
            msg = f"Connection timeout: waited for {HTTP_TOTAL_TIMEOUT} seconds."
            raise RuntimeError(msg) from err
        await self._limiter.acquire()

    @debugger
    async def post(
        self, endpoint: str, api_id: str, headers: dict | None = None, data: dict | None = None
    ) -> aiohttp.ClientResponse:
        """
        Post request to the server, but using client.request function is recommended.
        Request limit and connection status are checked globally and automatically.

        Args:
            endpoint (str): endpoint to Kiwoom REST API server
            api_id (str): api id
            headers (dict | None, optional): headers of the request.
            data (dict | None, optional): data to be sent in json format

        Returns:
            aiohttp.ClientResponse: async response from the server,
                but this will be converted to kiwoom.http.response.Response by debugger.
        """

        # Warn not connected
        if not self._state_http == State.CONNECTED:
            warnings.warn("Not connected, wait for timeout...", RuntimeWarning, stacklevel=1)

        # Wait connection and request limits
        await self.ready()

        # Post Request
        if headers is None:
            headers = self.headers(api_id)
        return await self._session.post(self.host + endpoint, headers=headers, json=data)

    async def request(
        self, endpoint: str, api_id: str, headers: dict | None = None, data: dict | None = None
    ) -> Response:
        """
        Requests to the server and returns response with error handling.

        Args:
            endpoint (str): endpoint of the server
            api_id (str): api id
            headers (dict | None, optional): headers of the request. Defaults to None.
            data (dict | None, optional): data of the request. Defaults to None.

        Raises:
            RuntimeError: RuntimeError when return_code is not in [0, 3, 20]

        Returns:
            Response: response wrapped by kiwoom.http.response.Response
        """

        res: Response = await self.post(endpoint, api_id, headers=headers, data=data)
        body = res.json()
        if "return_code" in body:
            match body["return_code"]:
                case 0 | 20:
                    # 0: Success
                    # 20 : No Data
                    return res
                case 3:
                    # 3 : Token Expired
                    print("Token expired, trying to refresh token...")
                    await self.connect(self._appkey, self._secretkey)
                    return await self.request(endpoint, api_id, headers=headers, data=data)

        # Request Failure
        return_code = body["return_code"]
        err = f"\nRequest failed with {return_code=}, not in {{'0', '3', '20'}}."
        if not self.debugging:
            msg = dumps(self, endpoint, api_id, headers, data, res)
            raise RuntimeError(msg + err)
        raise RuntimeError(err)

    async def request_until(
        self,
        should_continue: Callable,
        endpoint: str,
        api_id: str,
        headers: dict | None = None,
        data: dict | None = None,
    ) -> dict:
        """
        Request until 'cont-yn' in response header is 'Y',
        and should_continue(body) evaluates to True.

        Args:
            should_continue (Callable):
                callable that takes body(dict) and
                returns boolean value to request again or not
            endpoint (str):
                endpoint of the server
            api_id (str):
                api id
            headers (dict | None, optional):
                headers of the request. Defaults to None.
            data (dict | None, optional):
                data of the request. Defaults to None.

        Returns:
            dict: response body
        """

        # Initial request
        res = await self.request(endpoint, api_id, headers=headers, data=data)
        body = res.json()

        # If condition to chain is not met
        if callable(should_continue) and not should_continue(body):
            return body

        # Extract list data only
        bodies = dict()
        for key in body.keys():
            if isinstance(body[key], list):
                bodies[key] = [body[key]]
                continue
            bodies[key] = body[key]

        # Rercursive call
        while res.headers.get("cont-yn") == "Y" and should_continue(body):
            next_key = res.headers.get("next-key")
            headers = self.headers(api_id, cont_yn="Y", next_key=next_key, headers=headers)

            # Continue request
            res = await self.request(endpoint, api_id, headers=headers, data=data)
            body = res.json()

            # Append list data
            for key in body.keys():
                if isinstance(body[key], list):
                    bodies[key].append(body[key])

        # Flatten list data as if it was one list
        for key in bodies:
            if isinstance(bodies[key], list):
                bodies[key] = list(chain.from_iterable(bodies[key]))
        return bodies
host instance-attribute
host = host
debugging instance-attribute
debugging = False
connect async
connect(appkey, secretkey)

Connect to Kiwoom REST API server and receive token.

Parameters:

Name Type Description Default
appkey str

file path or raw appkey

required
secretkey str

file path or raw secretkey

required
Source code in kiwoom/http/client.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
async def connect(self, appkey: str, secretkey: str) -> None:
    """
    Connect to Kiwoom REST API server and receive token.

    When an open HTTP session already exists only the keys are reloaded
    and no new token is requested. When the token request fails the new
    session is closed again, so a later connect() starts from scratch.

    Args:
        appkey (str): file path or raw appkey
        secretkey (str): file path or raw secretkey

    Raises:
        RuntimeError: the token response has no "token" field.
            Errors of the HTTP request itself (for example from
            res.raise_for_status()) propagate unchanged.
    """
    # A key argument that names an existing file is replaced by the file content
    if isfile(appkey):
        with open(appkey, "r") as f:
            self._appkey = f.read().strip()
    if isfile(secretkey):
        with open(secretkey, "r") as f:
            self._secretkey = f.read().strip()

    # Already connected
    if self._session and not self._session.closed:
        return

    # Establish HTTP session
    self._ready_event.clear()
    self._session = ClientSession(
        timeout=aiohttp.ClientTimeout(
            total=HTTP_TOTAL_TIMEOUT,
            sock_connect=HTTP_CONNECT_TIMEOUT,
            sock_read=HTTP_READ_TIMEOUT,
        ),
        connector=aiohttp.TCPConnector(limit=HTTP_TCP_CONNECTORS, enable_cleanup_closed=True),
    )

    # Request token
    endpoint = "/oauth2/token"
    api_id = ""
    headers = self.headers(api_id)
    data = {
        "grant_type": "client_credentials",
        "appkey": self._appkey,
        "secretkey": self._secretkey,
    }
    try:
        async with self._session.post(self.host + endpoint, headers=headers, json=data) as res:
            res.raise_for_status()
            body = await res.json()
            resp = Response(res.url, res.status, res.headers, body)

        if "token" not in body:
            msg = dumps(self, endpoint, api_id, headers, data, resp)
            raise RuntimeError(f"Failed to get token: {msg}")
    except BaseException:
        # Drop the token-less session; otherwise the "Already connected"
        # check above would turn every later connect() into a no-op
        session, self._session = self._session, None
        if session is not None:
            await session.close()
        raise

    # Set token
    token = body["token"]
    self._auth = f"Bearer {token}"
    self._session.headers.update(
        {
            "Content-Type": "application/json;charset=UTF-8",
            "authorization": self._auth,
        }
    )
    self._state_http = State.CONNECTED
    self._ready_event.set()
close async
close()

Close HTTP session.

Source code in kiwoom/http/client.py
105
106
107
108
109
110
111
112
113
114
async def close(self) -> None:
    """
    Shut down the HTTP session and forget the token.
    """
    self._ready_event.clear()
    session = self._session
    if session:
        # The session close itself is shielded from cancellation of this coroutine
        await asyncio.shield(session.close())

    self._auth, self._session = "", None
token
token()

Returns token if available, otherwise empty string.

Raises:

Type Description
ValueError

Invalid token.

Returns:

Name Type Description
str str

token

Source code in kiwoom/http/client.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def token(self) -> str:
    """
    Returns token if available, otherwise empty string.

    Raises:
        ValueError: stored authorization value does not start with "Bearer ".

    Returns:
        str: token
    """
    prefix = "Bearer "
    if not self._auth:
        return ""
    # Prefix test, because the slice below removes exactly the prefix
    if self._auth.startswith(prefix):
        return self._auth[len(prefix) :]
    raise ValueError(f"Invalid token: {self._auth}")
headers
headers(api_id, cont_yn='N', next_key='', headers=None)

Generate headers for the request.

Parameters:

Name Type Description Default
api_id str

api_id in Kiwoom API

required
cont_yn str

cont_yn in Kiwoom API

'N'
next_key str

next_key in Kiwoom API

''
headers dict | None

headers to be updated with

None

Returns:

Type Description
dict[str, str]

dict[str, str]: headers

Source code in kiwoom/http/client.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def headers(
    self, api_id: str, cont_yn: str = "N", next_key: str = "", headers: dict | None = None
) -> dict[str, str]:
    """
    Generate headers for the request.

    Args:
        api_id (str): api_id in Kiwoom API
        cont_yn (str, optional): cont_yn in Kiwoom API
        next_key (str, optional): next_key in Kiwoom API
        headers (dict | None, optional): headers to be updated with

    Returns:
        dict[str, str]: headers
    """
    base = {
        # 'Content-Type': 'application/json;charset=UTF-8',
        # 'authorization': self._auth,
        "cont-yn": cont_yn,
        "next-key": next_key,
        "api-id": api_id,
    }
    if headers is not None:
        headers.update(base)
        return headers
    return base
ready async
ready()

Wait until request limit is lifted and connection is established.

Raises:

Type Description
RuntimeError

Connection timeout.

Source code in kiwoom/http/client.py
159
160
161
162
163
164
165
166
167
168
169
170
171
async def ready(self):
    """
    Block until the connection is established, then wait for the request limiter.

    Raises:
        RuntimeError: the ready event was not set within HTTP_TOTAL_TIMEOUT seconds.
    """
    connected = self._ready_event.wait()
    try:
        await asyncio.wait_for(connected, HTTP_TOTAL_TIMEOUT)
    except asyncio.TimeoutError as err:
        raise RuntimeError(
            f"Connection timeout: waited for {HTTP_TOTAL_TIMEOUT} seconds."
        ) from err
    await self._limiter.acquire()
post async
post(endpoint, api_id, headers=None, data=None)

Post request to the server, but using client.request function is recommended. Request limit and connection status are checked globally and automatically.

Parameters:

Name Type Description Default
endpoint str

endpoint to Kiwoom REST API server

required
api_id str

api id

required
headers dict | None

headers of the request.

None
data dict | None

data to be sent in json format

None

Returns:

Type Description
ClientResponse

aiohttp.ClientResponse: async response from the server, but this will be converted to kiwoom.http.response.Response by debugger.

Source code in kiwoom/http/client.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
@debugger
async def post(
    self, endpoint: str, api_id: str, headers: dict | None = None, data: dict | None = None
) -> aiohttp.ClientResponse:
    """
    Send one POST request; client.request is the recommended entry point.
    Connection status and request limit are awaited before the request is sent.

    Args:
        endpoint (str): endpoint to Kiwoom REST API server
        api_id (str): api id
        headers (dict | None, optional): headers of the request;
            generated from api_id when None
        data (dict | None, optional): data to be sent in json format

    Returns:
        aiohttp.ClientResponse: async response from the server,
            but this will be converted to kiwoom.http.response.Response by debugger.
    """

    # Only a warning: ready() below still waits for the connection
    connected = self._state_http == State.CONNECTED
    if not connected:
        warnings.warn("Not connected, wait for timeout...", RuntimeWarning, stacklevel=1)

    # Wait connection and request limits
    await self.ready()

    # Post Request
    request_headers = self.headers(api_id) if headers is None else headers
    return await self._session.post(
        self.host + endpoint, headers=request_headers, json=data
    )
request async
request(endpoint, api_id, headers=None, data=None)

Requests to the server and returns response with error handling.

Parameters:

Name Type Description Default
endpoint str

endpoint of the server

required
api_id str

api id

required
headers dict | None

headers of the request. Defaults to None.

None
data dict | None

data of the request. Defaults to None.

None

Raises:

Type Description
RuntimeError

RuntimeError when return_code is not in [0, 3, 20]

Returns:

Name Type Description
Response Response

response wrapped by kiwoom.http.response.Response

Source code in kiwoom/http/client.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
async def request(
    self, endpoint: str, api_id: str, headers: dict | None = None, data: dict | None = None
) -> Response:
    """
    Requests to the server and returns response with error handling.

    Args:
        endpoint (str): endpoint of the server
        api_id (str): api id
        headers (dict | None, optional): headers of the request. Defaults to None.
        data (dict | None, optional): data of the request. Defaults to None.

    Raises:
        RuntimeError: RuntimeError when return_code is not in [0, 3, 20]

    Returns:
        Response: response wrapped by kiwoom.http.response.Response
    """

    res: Response = await self.post(endpoint, api_id, headers=headers, data=data)
    body = res.json()
    if "return_code" in body:
        match body["return_code"]:
            case 0 | 20:
                # 0: Success
                # 20 : No Data
                return res
            case 3:
                # 3 : Token Expired
                print("Token expired, trying to refresh token...")
                await self.connect(self._appkey, self._secretkey)
                return await self.request(endpoint, api_id, headers=headers, data=data)

    # Request Failure
    return_code = body["return_code"]
    err = f"\nRequest failed with {return_code=}, not in {{'0', '3', '20'}}."
    if not self.debugging:
        msg = dumps(self, endpoint, api_id, headers, data, res)
        raise RuntimeError(msg + err)
    raise RuntimeError(err)
request_until async
request_until(should_continue, endpoint, api_id, headers=None, data=None)

Keep requesting while 'cont-yn' in the response header is 'Y' and should_continue(body) evaluates to True.

Parameters:

Name Type Description Default
should_continue Callable

callable that takes body(dict) and returns boolean value to request again or not

required
endpoint str

endpoint of the server

required
api_id str

api id

required
headers dict | None

headers of the request. Defaults to None.

None
data dict | None

data of the request. Defaults to None.

None

Returns:

Name Type Description
dict dict

response body

Source code in kiwoom/http/client.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
async def request_until(
    self,
    should_continue: Callable,
    endpoint: str,
    api_id: str,
    headers: dict | None = None,
    data: dict | None = None,
) -> dict:
    """
    Request until 'cont-yn' in response header is 'Y',
    and should_continue(body) evaluates to True.

    Args:
        should_continue (Callable):
            callable that takes body(dict) and
            returns boolean value to request again or not
        endpoint (str):
            endpoint of the server
        api_id (str):
            api id
        headers (dict | None, optional):
            headers of the request. Defaults to None.
        data (dict | None, optional):
            data of the request. Defaults to None.

    Returns:
        dict: response body
    """

    # Initial request
    res = await self.request(endpoint, api_id, headers=headers, data=data)
    body = res.json()

    # If condition to chain is not met
    if callable(should_continue) and not should_continue(body):
        return body

    # Extract list data only
    bodies = dict()
    for key in body.keys():
        if isinstance(body[key], list):
            bodies[key] = [body[key]]
            continue
        bodies[key] = body[key]

    # Rercursive call
    while res.headers.get("cont-yn") == "Y" and should_continue(body):
        next_key = res.headers.get("next-key")
        headers = self.headers(api_id, cont_yn="Y", next_key=next_key, headers=headers)

        # Continue request
        res = await self.request(endpoint, api_id, headers=headers, data=data)
        body = res.json()

        # Append list data
        for key in body.keys():
            if isinstance(body[key], list):
                bodies[key].append(body[key])

    # Flatten list data as if it was one list
    for key in bodies:
        if isinstance(bodies[key], list):
            bodies[key] = list(chain.from_iterable(bodies[key]))
    return bodies

debug

dumps
dumps(api, endpoint, api_id, headers, data, res)

Dump request and response to string for debugging.

Parameters:

Name Type Description Default
api Client

Client instance

required
endpoint str

endpoint

required
api_id str

api id

required
headers dict

headers

required
data dict

data

required
res Response

wrapped response

required

Returns:

Name Type Description
str str

formatted request and response dump

Source code in kiwoom/http/debug.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def dumps(api, endpoint: str, api_id, headers: dict, data: dict, res: Response) -> str:
    """
    Render a request and its response as one printable string.

    Args:
        api (Client): Client instance, provides host and default headers
        endpoint (str): endpoint appended to api.host
        api_id (str): api id, used for api.headers(api_id) when headers is None
        headers (dict): request headers, or None to show the default ones
        data (dict): request data
        res (Response): wrapped response

    Returns:
        str: a "== Request ==" section (url, headers, data) followed by a
            "== Response ==" section (status code, selected headers, body)
    """

    def pretty(obj) -> str:
        # One place for the json formatting shared by every section
        return json.dumps(obj, indent=4, ensure_ascii=False)

    # Request
    sent_headers = pretty(headers if headers is not None else api.headers(api_id))
    request_lines = [
        "\n== Request ==",
        f"URL : {api.host + endpoint}",
        f"Headers : {sent_headers}",
        f"Data : {pretty(data)}",
    ]

    # Response: only these three response headers are shown
    shown = ("next-key", "cont-yn", "api-id")
    received_headers = pretty({name: res.headers.get(name) for name in shown})
    response_lines = [
        "== Response ==",
        f"Code : {res.status}",
        f"Headers : {received_headers}",
        f"Response : {pretty(res.json())}",
    ]
    return "\n".join(request_lines + response_lines) + "\n"
debugger
debugger(fn)

Debugger decorator for the Client.post method. Even when debugging is disabled, the request and response are printed if an error occurs.

Parameters:

Name Type Description Default
fn function

function to be decorated

required

Raises:

Type Description
err

propagate HTTPException from original function

Returns:

Name Type Description
Response Callable

wrapped response

Source code in kiwoom/http/debug.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def debugger(fn) -> Callable:
    """
    Debugger decorator for Client.post method.

    The decorated coroutine's aiohttp response is read inside its context and
    converted to a Response. The request and response are printed when
    api.debugging is set, and also when raise_for_status() fails while
    debugging is disabled.

    Args:
        fn (function): coroutine function to be decorated, called as
            fn(api, endpoint, api_id, headers, data)

    Raises:
        Exception: whatever res.raise_for_status() raises is re-raised
            unchanged by the returned wrapper

    Returns:
        Callable: wrapper coroutine function that returns the wrapped Response
    """

    @functools.wraps(fn)
    async def wrapper(
        api,
        endpoint: str,
        api_id: str,
        headers: dict | None = None,
        data: dict | None = None,
    ) -> Response:
        # headers / data default to None like the decorated post(), so that
        # post(endpoint, api_id) keeps working through the wrapper.
        res: ClientResponse = await fn(api, endpoint, api_id, headers, data)
        async with res:
            # Async to sync Response
            resp = Response(
                url=res.url, status=res.status, headers=res.headers, body=await res.json()
            )

            # Debugging
            if api.debugging:
                print(dumps(api, endpoint, api_id, headers, data, resp))

            try:
                res.raise_for_status()
            except Exception:
                # Always debug when error occurs, whatever error type the
                # response raises; bare raise keeps the original traceback.
                if not api.debugging:
                    print(dumps(api, endpoint, api_id, headers, data, resp))
                raise
        return resp

    return wrapper

response

Response

Response wrapper for aiohttp.ClientResponse

Source code in kiwoom/http/response.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Response:
    """
    Plain, synchronous holder for the parts of an aiohttp.ClientResponse
    that are used after the async response context has been left.
    """

    def __init__(self, url: str, status: int, headers: dict, body: dict):
        """
        Store the already extracted pieces of a response.

        Args:
            url (str): url of the response
            status (int): status code of the response
            headers (dict): headers of the response
            body (dict): parsed body of the response
        """

        self.url, self.status = url, status
        self.headers, self.body = headers, body

    def json(self) -> dict:
        """
        Return the stored body; nothing is parsed here.

        Returns:
            dict: body in json format
        """
        return self.body
url instance-attribute
url = url
status instance-attribute
status = status
headers instance-attribute
headers = headers
body instance-attribute
body = body
json
json()

Returns already parsed body.

Returns:

Name Type Description
dict dict

body in json format

Source code in kiwoom/http/response.py
22
23
24
25
26
27
28
29
def json(self) -> dict:
    """
    Return the body stored on this response; nothing is parsed here.

    Returns:
        dict: body in json format
    """
    return self.body

socket

Socket
Source code in kiwoom/http/socket.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class Socket:
    """
    Websocket connection to the Kiwoom server.

    After connect(), a background task keeps receiving text messages
    and puts them into the queue given at construction.
    """

    REAL = "wss://api.kiwoom.com:10000"
    MOCK = "wss://mockapi.kiwoom.com:10000"  # KRX Only
    ENDPOINT = "/api/dostk/websocket"

    def __init__(self, url: str, queue: asyncio.Queue):
        """
        Initialize Socket class.

        Args:
            url (str): url of Kiwoom websocket server
            queue (asyncio.Queue): queue to put received data
        """
        self.url = url
        self._queue = queue
        self._session: ClientSession | None = None
        self._websocket: ClientWebSocketResponse | None = None

        self._state = State.CLOSED
        self._state_lock = asyncio.Lock()
        self._queue_task: asyncio.Task | None = None
        # The stop event starts set: nothing is running until connect()
        self._stop_event = asyncio.Event()
        self._stop_event.set()

    async def connect(self, session: ClientSession, token: str):
        """
        Connect to Kiwoom websocket server.

        Does nothing when the state is already CONNECTED or CONNECTING.
        A failure is printed, not raised, and leaves the state CLOSED.

        Args:
            session (ClientSession): aiohttp ClientSession from API.connect()
            token (str): token for authentication
        """

        # print("Trying to connect websocket...")
        async with self._state_lock:
            if self._state in (State.CONNECTED, State.CONNECTING):
                return

            self._state = State.CONNECTING
            try:
                # Close existing websocket & task
                self._stop_event.set()
                if self._websocket and not self._websocket.closed:
                    await self._websocket.close()
                await cancel(self._queue_task)
                self._queue_task = None

                self._session = session
                self._websocket = await session.ws_connect(
                    self.url, autoping=True, heartbeat=WEBSOCKET_HEARTBEAT
                )

                # Start the receiving task before logging in
                self._stop_event.clear()
                self._queue_task = asyncio.create_task(self.run(), name="enqueue")
                await self.send({"trnm": "LOGIN", "token": token})
                self._state = State.CONNECTED

            except Exception as err:
                print(f"Websocket failed to connect to {self.url}: {err}")
                self._state = State.CLOSED

    async def close(self):
        """
        Close the websocket and the task, and mark the socket as closed.
        """
        async with self._state_lock:
            self._stop_event.set()
            if self._queue_task:
                self._queue_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await self._queue_task

            if self._websocket and not self._websocket.closed:
                with contextlib.suppress(Exception):
                    await self._websocket.close()

            self._session = None
            self._websocket = None
            self._queue_task = None
            # Without this reset connect() would still see CONNECTED and
            # return early, so the socket could never be reconnected.
            self._state = State.CLOSED

    async def send(self, msg: str | dict) -> None:
        """
        Send data to Kiwoom websocket server.

        Args:
            msg (str | dict): msg should be in json format;
                a dict is serialized to a json string first
        """
        if isinstance(msg, dict):
            # msg = json.dumps(msg)  # slow
            msg = orjson.dumps(msg).decode("utf-8")
        await self._websocket.send_str(msg)

    async def recv(self) -> str:
        """
        Receive data from Kiwoom websocket server and return data.
        If message type is not str, close the websocket and raise RuntimeError.

        Raises:
            RuntimeError: Websocket Connection Error

        Returns:
            str: received json formatted data from websocket
        """
        try:
            return await self._websocket.receive_str()
        except WSMessageTypeError as err:
            # receive_str() has already consumed the offending message and
            # err describes it. Receiving again would wait for the next
            # message, and the received message object cannot be modified.
            await self.close()
            raise RuntimeError(f"Websocket received other type than str: {err}") from err

    async def run(self):
        """
        Receive data from websocket and put data to the queue.
        If WEBSOCKET_QUEUE_MAX_SIZE is set and queue gets full,
        then backpressure will be applied to the websocket.
        Run this task in background with asyncio.create_task().
        """
        assert self._websocket is not None
        try:
            while not self._stop_event.is_set():
                await self._queue.put(await self.recv())

        except Exception as e:
            # Any receive failure ends the loop and closes the socket
            print(f"Failed to receive message: {e}")
            await self.close()
REAL class-attribute instance-attribute
REAL = 'wss://api.kiwoom.com:10000'
MOCK class-attribute instance-attribute
MOCK = 'wss://mockapi.kiwoom.com:10000'
ENDPOINT class-attribute instance-attribute
ENDPOINT = '/api/dostk/websocket'
url instance-attribute
url = url
connect async
connect(session, token)

Connect to Kiwoom websocket server.

Parameters:

Name Type Description Default
session ClientSession

aiohttp ClientSession from API.connect()

required
token str

token for authentication

required
Source code in kiwoom/http/socket.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
async def connect(self, session: ClientSession, token: str):
    """
    Open the websocket on the given session and log in with the token.

    Returns without doing anything when the state is already CONNECTED
    or CONNECTING. A failure is printed, not raised, and leaves the
    state CLOSED.

    Args:
        session (ClientSession): aiohttp ClientSession from API.connect()
        token (str): token for authentication
    """

    # print("Trying to connect websocket...")
    async with self._state_lock:
        already_active = self._state in (State.CONNECTED, State.CONNECTING)
        if already_active:
            return

        self._state = State.CONNECTING
        try:
            # Tear down whatever is left from a previous connection
            self._stop_event.set()
            previous = self._websocket
            if previous and not previous.closed:
                await previous.close()
            await cancel(self._queue_task)
            self._queue_task = None

            # Open the new websocket
            self._session = session
            self._websocket = await session.ws_connect(
                self.url, autoping=True, heartbeat=WEBSOCKET_HEARTBEAT
            )

            # Start receiving, then authenticate
            self._stop_event.clear()
            self._queue_task = asyncio.create_task(self.run(), name="enqueue")
            login = {"trnm": "LOGIN", "token": token}
            await self.send(login)

        except Exception as err:
            print(f"Websocket failed to connect to {self.url}: {err}")
            self._state = State.CLOSED

        else:
            self._state = State.CONNECTED
close async
close()

Close the websocket and the task.

Source code in kiwoom/http/socket.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
async def close(self):
    """
    Close the websocket and the task, and mark the socket as closed.
    """
    async with self._state_lock:
        # Ask the receive loop to stop, then cancel and await its task
        self._stop_event.set()
        if self._queue_task:
            self._queue_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._queue_task

        # Closing is best effort; errors while closing are ignored
        if self._websocket and not self._websocket.closed:
            with contextlib.suppress(Exception):
                await self._websocket.close()

        self._session = None
        self._websocket = None
        self._queue_task = None
        # Without this reset connect() would still see CONNECTED and
        # return early, so the socket could never be reconnected.
        self._state = State.CLOSED
send async
send(msg)

Send data to Kiwoom websocket server.

Parameters:

Name Type Description Default
msg str | dict

msg should be in json format

required
Source code in kiwoom/http/socket.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
async def send(self, msg: str | dict) -> None:
    """
    Send data to Kiwoom websocket server.

    Args:
        msg (str | dict): msg should be in json format
    """
    if isinstance(msg, dict):
        # msg = json.dumps(msg)  # slow
        msg = orjson.dumps(msg).decode("utf-8")
    await self._websocket.send_str(msg)
recv async
recv()

Receive data from Kiwoom websocket server and return data. If message type is not str, close the websocket and raise RuntimeError.

Raises:

Type Description
RuntimeError

Websocket Connection Error

Returns:

Name Type Description
str str

received json formatted data from websocket

Source code in kiwoom/http/socket.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
async def recv(self) -> str:
    """
    Receive data from Kiwoom websocket server and return data.
    If message type is not str, close the websocket and raise RuntimeError.

    Raises:
        RuntimeError: Websocket Connection Error

    Returns:
        str: received json formatted data from websocket
    """
    try:
        return await self._websocket.receive_str()
    except WSMessageTypeError as err:
        # receive_str() has already consumed the offending message and err
        # describes it. Receiving again would wait for the next message,
        # and the received message object cannot be modified.
        await self.close()
        raise RuntimeError(f"Websocket received other type than str: {err}") from err
run async
run()

Receive data from websocket and put data to the queue. If WEBSOCKET_QUEUE_MAX_SIZE is set and queue gets full, then backpressure will be applied to the websocket. Run this task in background with asyncio.create_task().

Source code in kiwoom/http/socket.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
async def run(self):
    """
    Forward received messages to the queue until the stop event is set.

    If WEBSOCKET_QUEUE_MAX_SIZE is set and queue gets full,
    then backpressure will be applied to the websocket.
    Run this task in background with asyncio.create_task().
    A failure while receiving is printed and the socket is closed.
    """
    assert self._websocket is not None
    try:
        while True:
            if self._stop_event.is_set():
                break
            received = await self.recv()
            await self._queue.put(received)

    except Exception as e:
        print(f"Failed to receive message: {e}")
        await self.close()

utils

RateLimiter
Source code in kiwoom/http/utils.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class RateLimiter:
    """
    Spaces callers of acquire() at least 1 / rps seconds apart.
    """

    def __init__(self, rps: int = REQ_LIMIT_PER_SECOND):
        """
        Globally limits requests per second.

        Args:
            rps (float): requests per second
        """
        self._lock = asyncio.Lock()
        # Loop time of the next free slot; 0.0 means free immediately
        self._next = 0.0
        self._period = 1.0 / rps

    async def acquire(self):
        """
        Reserve the next free slot and sleep until it is reached.
        """
        async with self._lock:
            now = asyncio.get_running_loop().time()
            # A slot lying in the past is moved up to the current time
            slot = max(self._next, now)
            self._next = slot + self._period

        # Sleep outside the lock so other callers can reserve meanwhile
        delay = slot - now
        if delay > 0:
            await asyncio.sleep(delay)
acquire async
acquire()
Source code in kiwoom/http/utils.py
23
24
25
26
27
28
29
30
31
32
async def acquire(self):
    """
    Reserve the next free slot and sleep until it is reached.
    """
    async with self._lock:
        now = asyncio.get_running_loop().time()
        # A slot lying in the past is moved up to the current time
        slot = max(self._next, now)
        self._next = slot + self._period

    # Sleep outside the lock so other callers can reserve meanwhile
    delay = slot - now
    if delay > 0:
        await asyncio.sleep(delay)
wrap_async_callback
wrap_async_callback(semaphore, callback)

Wrap async callback to run in async context.

Parameters:

Name Type Description Default
semaphore Semaphore

semaphore to limit the number of callbacks

required
callback Callable

callback to be wrapped

required

Returns:

Name Type Description
Callable Callable

wrapped callback

Source code in kiwoom/http/utils.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def wrap_async_callback(semaphore: asyncio.Semaphore, callback: Callable) -> Callable:
    """
    Build a coroutine function that awaits callback(msg) while holding the semaphore.

    Args:
        semaphore (asyncio.Semaphore): semaphore to limit the number of callbacks
        callback (Callable): async callback to be wrapped

    Returns:
        Callable: wrapped callback
    """

    async def wrapper(msg: RealData | dict):
        await semaphore.acquire()
        try:
            await callback(msg)
        finally:
            # Released even when the callback raises
            semaphore.release()

    return wrapper
wrap_sync_callback
wrap_sync_callback(semaphore, callback)

Wrap sync callback to run in async context.

Parameters:

Name Type Description Default
semaphore Semaphore

semaphore to limit the number of callbacks

required
callback Callable

callback to be wrapped

required

Returns:

Name Type Description
Callable Callable

wrapped callback

Source code in kiwoom/http/utils.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def wrap_sync_callback(semaphore: asyncio.Semaphore, callback: Callable) -> Callable:
    """
    Build a coroutine function that runs the sync callback in an executor
    while holding the semaphore.

    Args:
        semaphore (asyncio.Semaphore): semaphore to limit the number of callbacks
        callback (Callable): sync callback to be wrapped

    Returns:
        Callable: wrapped callback
    """

    async def wrapper(msg: RealData | dict):
        async with semaphore:
            loop = asyncio.get_running_loop()
            # None selects the loop's default executor
            pending = loop.run_in_executor(None, callback, msg)
            await pending

    return wrapper
cancel async
cancel(task)

Cancel a task if it exists.

Parameters:

Name Type Description Default
task Task | None

task to be cancelled

required
Source code in kiwoom/http/utils.py
73
74
75
76
77
78
79
80
81
82
83
async def cancel(task: asyncio.Task | None) -> None:
    """
    Cancel a task if it exists.

    Args:
        task (asyncio.Task | None): task to be cancelled
    """
    if task:
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task