Skip to content

Commit 736dfd7

Browse files
committed
use single reqrep socket remove unused fuctions add symbol list
1 parent fe0876a commit 736dfd7

File tree

2 files changed

+37
-412
lines changed

2 files changed

+37
-412
lines changed

ejtraderMT/api/mql.py

Lines changed: 37 additions & 195 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,9 @@ class Functions:
3535
def __init__(self, host=None, debug=None):
3636
self.HOST = host or "localhost"
3737
self.SYS_PORT = 15555 # REP/REQ port
38-
self.DATA_PORT = 15556 # PUSH/PULL port
39-
self.LIVE_PORT = 15557 # PUSH/PULL port
40-
self.EVENTS_PORT = 15558 # PUSH/PULL port
41-
self.INDICATOR_DATA_PORT = 15559 # REP/REQ port
42-
self.CHART_DATA_PORT = 15560 # PUSH
43-
self.debug = debug or True
4438

4539
# ZeroMQ timeout in seconds
46-
sys_timeout = 1000 * 1000
47-
data_timeout = 1000 * 1000
40+
sys_timeout = 1000
4841

4942
# initialise ZMQ context
5043
context = zmq.Context()
@@ -56,44 +49,17 @@ def __init__(self, host=None, debug=None):
5649
self.sys_socket.RCVTIMEO = sys_timeout
5750
self.sys_socket.connect("tcp://{}:{}".format(self.HOST, self.SYS_PORT))
5851

59-
self.data_socket = context.socket(zmq.PULL)
60-
# set port timeout
61-
self.data_socket.RCVTIMEO = data_timeout
62-
self.data_socket.connect("tcp://{}:{}".format(self.HOST, self.DATA_PORT))
63-
64-
self.indicator_data_socket = context.socket(zmq.PULL)
65-
# set port timeout
66-
self.indicator_data_socket.RCVTIMEO = data_timeout
67-
self.indicator_data_socket.connect(
68-
"tcp://{}:{}".format(self.HOST, self.INDICATOR_DATA_PORT)
69-
)
70-
self.chart_data_socket = context.socket(zmq.PUSH)
71-
# set port timeout
72-
# TODO check if port is listening and error handling
73-
self.chart_data_socket.connect(
74-
"tcp://{}:{}".format(self.HOST, self.CHART_DATA_PORT)
75-
)
76-
7752
except zmq.ZMQError:
7853
raise zmq.ZMQBindError("Binding ports ERROR")
7954
except KeyboardInterrupt:
8055
self.sys_socket.close()
8156
self.sys_socket.term()
82-
self.data_socket.close()
83-
self.data_socket.term()
84-
self.indicator_data_socket.close()
85-
self.indicator_data_socket.term()
86-
self.chart_data_socket.close()
87-
self.chart_data_socket.term()
8857
pass
8958

9059
def _send_request(self, data: dict) -> None:
9160
"""Send request to server via ZeroMQ System socket"""
9261
try:
9362
self.sys_socket.send_json(data)
94-
msg = self.sys_socket.recv_string()
95-
# terminal received the request
96-
assert msg == "OK", "Something wrong on server side"
9763
except AssertionError as err:
9864
raise zmq.NotDone(err)
9965
except zmq.ZMQError:
@@ -102,50 +68,11 @@ def _send_request(self, data: dict) -> None:
10268
def _pull_reply(self):
10369
"""Get reply from server via Data socket with timeout"""
10470
try:
105-
msg = self.data_socket.recv_json()
71+
msg = self.sys_socket.recv_json()
10672
except zmq.ZMQError:
10773
raise zmq.NotDone("Data socket timeout ERROR")
10874
return msg
10975

110-
def _indicator_pull_reply(self):
111-
"""Get reply from server via Data socket with timeout"""
112-
try:
113-
msg = self.indicator_data_socket.recv_json()
114-
except zmq.ZMQError:
115-
raise zmq.NotDone("Indicator Data socket timeout ERROR")
116-
if self.debug:
117-
logging.info("ZMQ INDICATOR DATA REPLY: ", msg)
118-
return msg
119-
120-
def live_socket(self, context=None):
121-
"""Connect to socket in a ZMQ context"""
122-
try:
123-
context = context or zmq.Context.instance()
124-
socket = context.socket(zmq.PULL)
125-
socket.connect("tcp://{}:{}".format(self.HOST, self.LIVE_PORT))
126-
except zmq.ZMQError:
127-
raise zmq.ZMQBindError("Live port connection ERROR")
128-
return socket
129-
130-
def streaming_socket(self, context=None):
131-
"""Connect to socket in a ZMQ context"""
132-
try:
133-
context = context or zmq.Context.instance()
134-
socket = context.socket(zmq.PULL)
135-
socket.connect("tcp://{}:{}".format(self.HOST, self.EVENTS_PORT))
136-
except zmq.ZMQError:
137-
raise zmq.ZMQBindError("Data port connection ERROR")
138-
return socket
139-
140-
def _push_chart_data(self, data: dict) -> None:
141-
"""Send message for chart control to server via ZeroMQ chart data socket"""
142-
try:
143-
if self.debug:
144-
logging.info("ZMQ PUSH CHART DATA: ", data, " -> ", data)
145-
self.chart_data_socket.send_json(data)
146-
except zmq.ZMQError:
147-
raise zmq.NotDone("Sending request ERROR")
148-
14976
def Command(self, **kwargs) -> dict:
15077
"""Construct a request dictionary from default and send it to server"""
15178

@@ -185,58 +112,6 @@ def Command(self, **kwargs) -> dict:
185112
# return server reply
186113
return self._pull_reply()
187114

188-
def indicator_construct_and_send(self, **kwargs) -> dict:
189-
"""Construct a request dictionary from default and send it to server"""
190-
191-
# default dictionary
192-
request = {
193-
"action": None,
194-
"actionType": None,
195-
"id": None,
196-
"symbol": None,
197-
"chartTF": None,
198-
"fromDate": None,
199-
"toDate": None,
200-
"name": None,
201-
"params": None,
202-
"linecount": None,
203-
}
204-
205-
# update dict values if exist
206-
for key, value in kwargs.items():
207-
if key in request:
208-
request[key] = value
209-
else:
210-
raise KeyError("Unknown key in **kwargs ERROR")
211-
212-
# send dict to server
213-
self._send_request(request)
214-
215-
# return server reply
216-
return self._indicator_pull_reply()
217-
218-
def chart_data_construct_and_send(self, **kwargs) -> dict:
219-
"""Construct a request dictionary from default and send it to server"""
220-
221-
# default dictionary
222-
message = {
223-
"action": None,
224-
"actionType": None,
225-
"chartId": None,
226-
"indicatorChartId": None,
227-
"data": None,
228-
}
229-
230-
# update dict values if exist
231-
for key, value in kwargs.items():
232-
if key in message:
233-
message[key] = value
234-
else:
235-
raise KeyError("Unknown key in **kwargs ERROR")
236-
237-
# send dict to server
238-
self._push_chart_data(message)
239-
240115

241116
class Metatrader:
242117
def __init__(
@@ -290,7 +165,6 @@ def symbols(self):
290165
except Exception as e:
291166
logging.info(f"Error while processing. Error message: {str(e)}")
292167

293-
self.__api.Command(action="RESET")
294168
if df is not None and isinstance(df, dict):
295169
try:
296170
if df["data"]:
@@ -349,63 +223,48 @@ def _calendar(self, data):
349223
toDate = start_date
350224
toDate += delta2
351225
toDate = toDate.strftime("%d/%m/%Y")
352-
attempts = 0
353-
success = False
354226

355-
while not success and attempts < 5:
356-
try:
357-
df = self.__api.Command(
358-
action="CALENDAR",
359-
actionType="DATA",
360-
symbol=symbol,
361-
fromDate=self.__date_to_timestamp(fromDate),
362-
toDate=self.__date_to_timestamp(toDate),
363-
)
364-
success = True
227+
try:
228+
df = self.__api.Command(
229+
action="CALENDAR",
230+
actionType="DATA",
231+
symbol=symbol,
232+
fromDate=self.__date_to_timestamp(fromDate),
233+
toDate=self.__date_to_timestamp(toDate),
234+
)
235+
except Exception as e:
236+
logging.info(
237+
f"Error while processing {symbol}. Error message: {str(e)}"
238+
)
239+
pass
365240

366-
except Exception as e:
367-
logging.info(
368-
f"Error while processing {symbol}. Error message: {str(e)}"
369-
)
370-
attempts += 1
371-
if attempts == 5 and not success:
372-
# print(f"Check if {symbol} is avalible from {fromDate}")
241+
try:
242+
df = pd.DataFrame(df["data"])
243+
df.columns = [
244+
"date",
245+
"currency",
246+
"impact",
247+
"event",
248+
"country",
249+
"actual",
250+
"forecast",
251+
"previous",
252+
]
253+
df["date"] = pd.to_datetime(df["date"], errors="coerce")
254+
df = df.dropna(subset=["date"])
255+
df = df.set_index("date")
256+
df.index = pd.to_datetime(df.index)
257+
except Exception as e:
258+
logging.info(
259+
f"Error while processing {symbol} Dataframe. Error message: {str(e)}"
260+
)
373261
pass
374-
self.__api.Command(action="RESET")
375-
if df is not None and isinstance(df, dict):
376-
try:
377-
if df["data"]:
378-
df = pd.DataFrame(df["data"])
379-
df.columns = [
380-
"date",
381-
"currency",
382-
"impact",
383-
"event",
384-
"country",
385-
"actual",
386-
"forecast",
387-
"previous",
388-
]
389-
df["date"] = pd.to_datetime(df["date"], errors="coerce")
390-
df = df.dropna(subset=["date"])
391-
df = df.set_index("date")
392-
df.index = pd.to_datetime(df.index)
393-
except Exception as e:
394-
logging.info(
395-
f"Error while processing {symbol} Dataframe. Error message: {str(e)}"
396-
)
397-
pass
398262

399263
appended_data.append(df)
400264
start_date += delta
401265
pbar.close()
402-
try:
403-
df = pd.concat(appended_data)
404-
except Exception as e:
405-
logging.info(
406-
f"Error while processing {symbol} Dataframe. Error message: {str(e)}"
407-
)
408-
pass
266+
267+
df = pd.concat(appended_data)
409268

410269
if self.__database:
411270
start(self.__save_to_db, data=[df], repeat=1, max_threads=20)
@@ -608,7 +467,6 @@ def _event(self):
608467
pass
609468

610469
def price(self, symbol, chartTF):
611-
self.__api.Command(action="RESET")
612470
self._allsymbol_ = symbol
613471
self._allchartTF = chartTF
614472
for active in symbol:
@@ -618,7 +476,6 @@ def price(self, symbol, chartTF):
618476
return self.__priceQ.get()
619477

620478
def event(self, symbol, chartTF):
621-
self.__api.Command(action="RESET")
622479
self._allsymbol_ = symbol
623480
self._allchartTF = chartTF
624481
for active in symbol:
@@ -816,14 +673,6 @@ def __historyThread_save(self, data):
816673
logging.info(f"Check if {active} is avalible from {fromDate}")
817674
pass
818675

819-
try:
820-
self.__api.Command(action="RESET")
821-
except Exception as e:
822-
logging.info(
823-
f"Error while processing RESET. Error message: {str(e)}"
824-
)
825-
pass
826-
827676
if data is not None and isinstance(data, dict):
828677
try:
829678
if data["data"]:
@@ -875,13 +724,6 @@ def __historyThread_save(self, data):
875724
logging.info(f"Check if {active} is avalible from {fromDate}")
876725
pass
877726

878-
try:
879-
self.__api.Command(action="RESET")
880-
except Exception as e:
881-
logging.info(
882-
f"Error while processing commad Reset {active}. Error message: {str(e)}"
883-
)
884-
pass
885727
if data is not None and isinstance(data, dict):
886728
try:
887729
if data["data"]:

tes.ipynb

Lines changed: 0 additions & 217 deletions
This file was deleted.

0 commit comments

Comments
 (0)