介绍
MetaTrader 5 在过去的几年里已经非常成熟,为交易者提供了广泛的功能。一个突出的特性是,尽管使用了专有编程语言,它仍然能够与各种系统和平台集成。这种能力是非常重要的,因为它为交易者提供了很多选择,当涉及到探索潜在的盈利交易策略。
这种整合的关键在于它能够利用更高效、更易于实施的现代网络协议。正是在这种情况下,我们将研究在不使用动态链接库的情况下为 MetaTrader 5 应用程序实现 WebSocket 客户端。
首先,简要介绍 WebSocket 网络协议。
WebSocket协议,这是一种通信方法,允许服务器和客户端之间的信息双向流动,而无需发出多个基于超文本传输协议的请求。浏览器和大多数 web 接口应用程序使用 WebSocket 协议来提供各种服务,如即时消息、动态 web 内容和在线多人游戏。
为什么需要 WebSocket
在 WebSocket 协议出现之前,开发人员必须采用低效且昂贵的技术来实现服务器和客户机之间的异步通信。
其中包括 :
上面描述的所有方法都允许在客户端和服务器之间进行不同级别的双向数据交换,但是相对于 WebSockets,由于以下三个主要原因而受到影响:
WebSocket 是一种基于 TCP 的协议,可以进一步扩展以支持其他应用程序或行业定义的子协议。因为它是基于 TCP 的,所以它可以在标准 HTTP 端口 80、443 上工作,并且具有类似的通用资源定位器(URL)模式。WebSocket 服务器地址的前缀是 ws 或 wss,而不是http,但是遵循与 http web 地址相同的 url 地址结构。例如:
ws(s)://websocketexampleurl.com:80/hello.php
为了理解如何在 MQL5 中实现 WebSocket 客户机,必须熟悉通用计算机网络的基础知识。WebSocket 协议类似于超文本传输协议,在客户端对服务器的请求中使用头。与超文本传输协议一样,建立 WebSocket 定义的连接也需要使用头。与 WebSockets的主要区别在于,这样的请求仅用于建立或初始化 WebSocket。客户端发出看起来像普通的超文本传输协议请求,然后使用的协议将从使用超文本传输协议切换到 WebSocket 协议。
这个过程称为 WebSocket 握手。只有当发送到服务器的初始超文本传输协议请求包含一个或多个特定头时,才会执行协议切换。然后服务器必须相应地响应,确认建立 WebSocket 连接的愿望。有关特殊头的性质和服务器如何响应的信息都记录在 RFC 6455 中。
一旦建立了 WebSocket,就不再需要像请求那样使用超文本传输协议,这就是协议在操作上的不同之处。使用 WebSocket 协议交换数据时采用不同的格式。与超文本传输协议请求相比,这种格式更加精简,使用的原始位少得多。所使用的格式被称为帧协议,其中主机之间在一个事务中交换的数据称为帧(frame)。
每个帧是以符合 RFC 6455 中规定的成帧协议的特定方式排列的位序列。每个 WebSocket 帧包含定义操作码、有效负载大小和实际有效负载本身的位。该协议还定义了这些位在帧中的排列和最终打包方式。操作码只是一个保留的数值,用于对帧进行分类。
对于 WebSocket,基本操作码定义如下:
0 — 延续帧:此值表示不完整的载荷数据,因此应期望更多帧。此功能启用帧分段,它允许将数据分割成不同帧中的数据块。
1 — 文本帧:此值表示载荷数据本质上是文本的。
2 — 二进制帧:使用此值,载荷为二进制形式。
8 — 关闭帧:此值表示当任一端点要关闭已建立的 WebSocket 连接时发送的一种特殊类型的帧,它是一种称为控制帧的帧类型。控制帧已经具有特殊的含义,因此它们可能并不总是包含任何载荷数据,即载荷是可选的。
9 — ping 帧:另一个控制帧,用于确定端点是否仍然连接
10 — pong 帧:每当端点接收到 ping 帧时,pong 帧被用作响应。在这种情况下,接收者必须尽快用适当的 pong 帧作出响应。通常,只要回显 ping 帧中包含的任何有效负载就足够了。
这些是所有 WebSocket 都应该支持的唯一基本操作码。该协议允许基于 WebSocket 的 API 或 WebSocket 子策略在这些保留值上展开。
关于帧的最后一个重要方面是掩码操作。RFC6455 要求从客户机发送到服务器的所有帧都要有掩码操作。掩码操作是 WebSocket 协议的基本安全措施。它需要使用预定义的算法,用一个随机生成的4字节值(称为密钥)来混合内容。你可以把它看作是一种数据混淆。算法记录在RFC 6455文档中。从客户端发送的每一帧都必须使用新生成的随机密钥,即使对于碎片帧也是如此。
这一部分简要介绍了 WebSocket 协议的重要特性。有关更多深入信息,请参阅 RFC6455 文档中的所有详细信息。有了这些知识,我认为理解代码实现会容易得多。
首先,代码将分为三个类。
CSocket - 封装 MQL5 API 的网络函数。
CFrame — 帧类表示 WebSocket 帧,主要用于解码从服务器接收的帧。
CWebSocketClient — 表示 WebSocket 客户端本身
//+------------------------------------------------------------------+ //| structs | //+------------------------------------------------------------------+ struct CERT { string cert_subject; string cert_issuer; string cert_serial; string cert_thumbprint; datetime cert_expiry; }; //+------------------------------------------------------------------+ //| Class CSocket. | //| Purpose: Base class of socket operations. | //| | //+------------------------------------------------------------------+ class CSocket { private: static int m_usedsockets; // tracks number of sockets in use in single program bool m_log; // logging state bool m_usetls; // tls state uint m_tx_timeout; // send system socket timeout in milliseconds uint m_rx_timeout; // receive system socket timeout in milliseconds int m_socket; // socket handle string m_address; // server address uint m_port; // port CERT m_cert; // Server certificate info public: CSocket(); ~CSocket(); //--- methods to get private properties int SocketID(void) const { return(m_socket); } string Address(void) const { return(m_address); } uint Port(void) const { return(m_port); } bool IsSecure(void) const { return(m_usetls); } uint RxTimeout(void) const { return(m_rx_timeout); } uint TxTimeout(void) const { return(m_tx_timeout); } bool ServerCertificate(CERT& certificate); //--- methods to set private properties bool SetTimeouts(uint tx_timeout, uint rx_timeout); //--- general methods for working sockets void Log(const string custom_message,const int line,const string func); static uint SocketsInUse(void) { return(m_usedsockets); } bool Open(const string server,uint port,uint timeout,bool use_tls=false,bool enablelog=false); bool Close(void); uint Readable(void); bool Writable(void); bool IsConnected(void); int Read(uchar& out[],uint out_len,uint ms_timeout,bool read_available); int Send(uchar& in[],uint in_len); }; int CSocket::m_usedsockets=0; //+------------------------------------------------------------------+ //| Constructor | //+------------------------------------------------------------------+ CSocket::CSocket():m_socket(INVALID_HANDLE), m_address(""), m_port(0), m_usetls(false), m_log(false), m_rx_timeout(150), m_tx_timeout(150) { ZeroMemory(m_cert); } //+------------------------------------------------------------------+ //| Destructor | //+------------------------------------------------------------------+ CSocket::~CSocket() { //--- check handle if(m_socket!=INVALID_HANDLE) Close(); } //+------------------------------------------------------------------+ //| set system socket timeouts | //+------------------------------------------------------------------+ bool CSocket::SetTimeouts(uint tx_timeout,uint rx_timeout) { if(m_socket==INVALID_HANDLE) { Log("Invalid socket",__LINE__,__FUNCTION__); return(false); } if(SocketTimeouts(m_socket,tx_timeout,rx_timeout)) { m_tx_timeout=tx_timeout; m_rx_timeout=rx_timeout; Log("Socket Timeouts set",__LINE__,__FUNCTION__); return(true); } return(false); } //+------------------------------------------------------------------+ //| certificate | //+------------------------------------------------------------------+ bool CSocket::ServerCertificate(CERT& certificate) { if(m_socket==INVALID_HANDLE) { Log("Invalid socket",__LINE__,__FUNCTION__); return(false); } if(SocketTlsCertificate(m_socket,m_cert.cert_subject,m_cert.cert_issuer,m_cert.cert_serial,m_cert.cert_thumbprint,m_cert.cert_expiry)) { certificate=m_cert; Log("Server certificate retrieved",__LINE__,__FUNCTION__); return(true); } return(false); } //+------------------------------------------------------------------+ //|connect() | //+------------------------------------------------------------------+ bool CSocket::Open(const string server,uint port,uint timeout,bool use_tls=false,bool enablelog=false) { if(m_socket!=INVALID_HANDLE) Close(); if(m_usedsockets>=128) { Log("Too many sockets open",__LINE__,__FUNCTION__); return(false); } m_usetls=use_tls; m_log=enablelog; m_socket=SocketCreate(); if(m_socket==INVALID_HANDLE) { Log("Invalid socket",__LINE__,__FUNCTION__); return(false); } ++m_usedsockets; m_address=server; if(port==0) { if(m_usetls) m_port=443; else m_port=80; } else m_port=port; //--- if(!m_usetls && m_port==443) m_usetls=true; //--- Log("Connecting to "+m_address,__LINE__,__FUNCTION__); //--- if(m_usetls) { if(m_port!=443) { if(SocketConnect(m_socket,server,port,timeout)) return(SocketTlsHandshake(m_socket,server)); } else { return(SocketConnect(m_socket,server,port,timeout)); } } return(SocketConnect(m_socket,server,port,timeout)); } //+------------------------------------------------------------------+ //|close() | //+------------------------------------------------------------------+ bool CSocket::Close(void) { //--- if(m_socket==INVALID_HANDLE) { Log("Socket Disconnected",__LINE__,__FUNCTION__); return(true); } //--- if(SocketClose(m_socket)) { m_socket=INVALID_HANDLE; --m_usedsockets; Log("Socket Disconnected from "+m_address,__LINE__,__FUNCTION__); m_address=""; ZeroMemory(m_cert); return(true); } //--- Log("",__LINE__,__FUNCTION__); return(false); } //+------------------------------------------------------------------+ //|readable() | //+------------------------------------------------------------------+ uint CSocket::Readable(void) { if(m_socket==INVALID_HANDLE) { Log("Invalid socket",__LINE__,__FUNCTION__); return(0); } //--- Log("Is Socket Readable ",__LINE__,__FUNCTION__); //--- return(SocketIsReadable(m_socket)); } //+------------------------------------------------------------------+ //|writable() | //+------------------------------------------------------------------+ bool CSocket::Writable(void) { if(m_socket==INVALID_HANDLE) { Log("Invalid socket",__LINE__,__FUNCTION__); return(false); } //--- Log("Is Socket Writable ",__LINE__,__FUNCTION__); //--- return(SocketIsWritable(m_socket)); } //+------------------------------------------------------------------+ //|isconnected() | //+------------------------------------------------------------------+ bool CSocket::IsConnected(void) { if(m_socket==INVALID_HANDLE) { Log("Invalid socket",__LINE__,__FUNCTION__); return(false); } //--- Log("Is Socket Connected ",__LINE__,__FUNCTION__); //--- return(SocketIsConnected(m_socket)); } //+------------------------------------------------------------------+ //|read() | //+------------------------------------------------------------------+ int CSocket::Read(uchar& out[],uint out_len,uint ms_timeout,bool read_available=false) { if(m_socket==INVALID_HANDLE) { Log("Invalid socket",__LINE__,__FUNCTION__); return(-1); } //--- Log("Reading from "+m_address,__LINE__,__FUNCTION__); if(m_usetls) { if(read_available) return(SocketTlsReadAvailable(m_socket,out,out_len)); else return(SocketTlsRead(m_socket,out,out_len)); } else return(SocketRead(m_socket,out,out_len,ms_timeout)); return(-1); } //+------------------------------------------------------------------+ //|send() | //+------------------------------------------------------------------+ int CSocket::Send(uchar& in[],uint in_len) { if(m_socket==INVALID_HANDLE) { Log("Invalid socket",__LINE__,__FUNCTION__); return(-1); } //--- Log("Sending to "+m_address,__LINE__,__FUNCTION__); //--- if(m_usetls) return(SocketTlsSend(m_socket,in,in_len)); else return(SocketSend(m_socket,in,in_len)); //--- return(-1); } //+------------------------------------------------------------------+ //|log() | //+------------------------------------------------------------------+ void CSocket::Log(const string custom_message,const int line,const string func) { if(m_log) { //--- int eid=GetLastError(); //--- if(eid!=0) { PrintFormat("[MQL error ID: %d][%s][Line: %d][Function: %s]",eid,custom_message,line,func); ResetLastError(); return; } if(custom_message!="") PrintFormat("[%s][Line: %d][Function: %s]",custom_message,line,func); } //--- } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
socket 类定义了封装服务器证书信息的结构 struct CERT。
获取私有属性的方法:
SocketID — 返回成功创建的套接字的套接字句柄。
Address — 以字符串形式返回套接字连接到的远程地址
Port — 返回活动套接字连接到的远程端口
IsSecure — 根据套接字是否启用了TLS安全性,返回true或false。
RxTimeout — 返回从套接字读取的设置超时(毫秒)。
TxTimeout — 返回设置写入套接字的超时(毫秒)
ServerCertificate — 返回套接字连接到的服务器的服务器证书信息。
SocketsInUse — 返回单个程序中当前正在使用的套接字总数。
设置私有属性的方法。
SetTimeouts — 设置读取和写入套接字的超时(毫秒)。
套接字操作的一般方法
Log — 用于记录套接字活动的方法。使用Open方法初始化套接字时,必须设置终端日志记录的输出消息。
Open — 用于建立到远程服务器的连接从而创建新套接字的方法。方法
Close - 断开与远程服务器的连接并解除套接字初始化的方法。
Readable — 返回套接字上可读取的字节数
Writable — 查询套接字是否可用于任何发送操作。
IsConnected — 检查套接字连接是否仍处于活动状态。
Read — 从套接字读取数据
Send — 在活动套接字上执行发送操作的方法。
//+------------------------------------------------------------------+ //| enums | //+------------------------------------------------------------------+ enum ENUM_FRAME_TYPE // type of websocket frames (ie, message types) { CONTINUATION_FRAME=0x0, TEXT_FRAME=0x1, BINARY_FRAME= 0x2, CLOSE_FRAME = 8, PING_FRAME = 9, PONG_FRAME = 0xa, }; //+------------------------------------------------------------------+ //| class frame | //| represents a websocket message frame | //+------------------------------------------------------------------+ //+------------------------------------------------------------------+ //| | //+------------------------------------------------------------------+ class CFrame { private: uchar m_array[]; uchar m_isfinal; ENUM_FRAME_TYPE m_msgtype; int Resize(int size) {return(ArrayResize(m_array,size,size));} public: CFrame():m_isfinal(0),m_msgtype(0) { } ~CFrame() { } int Size(void) {return(ArraySize(m_array));} bool Add(const uchar value); bool Fill(const uchar &array[],const int src_start,const int count); void Reset(void); uchar operator[](int index); string ToString(void); ENUM_FRAME_TYPE MessageType(void) { return(m_msgtype);} bool IsFinal(void) { return(m_isfinal==1);} void SetMessageType(ENUM_FRAME_TYPE mtype) { m_msgtype=mtype;} void SetFinal(void) { m_isfinal=1;} }; //+------------------------------------------------------------------+ //| Receiving an element by index | //+------------------------------------------------------------------+ //+------------------------------------------------------------------+ //| | //+------------------------------------------------------------------+ uchar CFrame::operator[](int index) { static uchar invalid_value; //--- int max=ArraySize(m_array)-1; if(index<0 || index>=ArraySize(m_array)) { PrintFormat("%s index %d is not in range (0-%d)!",__FUNCTION__,index,max); return(invalid_value); } //--- return(m_array[index]); } //+------------------------------------------------------------------+ //| Adding element | //+------------------------------------------------------------------+ //+------------------------------------------------------------------+ //| | //+------------------------------------------------------------------+ bool CFrame::Fill(const uchar &array[],const int src_start,const int count) { int p_size=Size(); //--- int size=Resize(p_size+count); //--- if(size>0) return(ArrayCopy(m_array,array,p_size,src_start,count)==count); else return(false); //--- } //+------------------------------------------------------------------+ //| Assigning for the array | //+------------------------------------------------------------------+ //+------------------------------------------------------------------+ //| | //+------------------------------------------------------------------+ bool CFrame::Add(const uchar value) { int size=Resize(Size()+1); //--- if(size>0) m_array[size-1]=value; else return(false); //--- return(true); //--- } //+------------------------------------------------------------------+ //| Reset | //+------------------------------------------------------------------+ //+------------------------------------------------------------------+ //| | //+------------------------------------------------------------------+ void CFrame::Reset(void) { if(Size()) ArrayFree(m_array); //--- m_isfinal=0; m_msgtype=0; } //+------------------------------------------------------------------+ //|converting array to string | //+------------------------------------------------------------------+ string CFrame::ToString(void) { if(Size()) if(m_msgtype==CLOSE_FRAME) return(CharArrayToString(m_array,2,WHOLE_ARRAY,CP_UTF8)); else return(CharArrayToString(m_array,0,WHOLE_ARRAY,CP_UTF8)); else return(NULL); }
CFrame 类定义了枚举 ENUM_FRAME_TYPE,它描述 WebSocket 协议记录的不同帧类型。
CFrame类的实例表示从服务器接收的单个帧。这意味着完整的消息可以由一组帧组成。该类使能够查询每个帧的各种特征,包括组成一个帧的各个字节值。
Size方法返回帧的字节大小。因为类使用类型为 unsigned character 的数组作为帧的容器,此方法仅返回基础数组的大小。
MessageType 方法将帧的类型返回为 ENUM_FRAME_TYPE 类型
IsFinal方法是检查帧是不是最后一帧,这意味着接收到的任何数据都应该被假定为完整的,这样就可以区分一条支离破碎的、因此不完整的消息和一条完整的消息。
operator[] - 下标运算符重载允许以数组格式单独检索帧中的任何元素。
CFrame 类将在 WebSocket 客户机中用于从 CSocket 对象读取数据。用于填充数据帧的方法是 Add 和 Fill。允许通过单个元素或使用适当的数组填充帧。
实用方法 Reset 可用于刷新帧并重置其属性,而ToString方法是将帧内容转换为熟悉的字符串值的方便工具。
该类具有使用 #define 实现的常量。HEADER 前缀符号与创建开始握手所需的 http 头字段相关联。GUID 是服务器端的 WebSocket 协议在生成部分响应头时使用的全局唯一标识符。我们的类使用它来确认并演示握手过程的正确性,但本质上没有必要,客户端只需要检查|Sec-WebSocket-Accept|头部字段是否存在就可以确认握手成功。
#include <Socket.mqh> #include <Frame.mqh> //+------------------------------------------------------------------+ //| defines | //+------------------------------------------------------------------+ #define SH1 "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" #define HEADER_EOL "\r\n" #define HEADER_GET "GET /" #define HEADER_HOST "Host: " #define HEADER_UPGRADE "Upgrade: websocket"+HEADER_EOL #define HEADER_CONNECTION "Connection: Upgrade"+HEADER_EOL #define HEADER_KEY "Sec-WebSocket-Key: " #define HEADER_WS_VERSION "Sec-WebSocket-Version: 13"+HEADER_EOL+HEADER_EOL #define HEADER_HTTP " HTTP/1.1"
枚举类型 ENUM_STATUS_CLOSE_CODE 列出了可以与关闭帧一起发送或接收的关闭代码。enum ENUM_WEBSOCKET_CLIENT_STATE 表示 WebSocket 可以采用的不同状态。
Closed 是在为客户机分配任何套接字之前,或在客户机断开连接并关闭基础套接字之后的初始状态(initial state)。
当在发送开始握手(报头)之前建立初始连接时,客户端被称为处于连接状态(connecting)。一旦开始握手被发送并且接收到允许使用 WebSocket协议的响应,那么客户端就被连接(connected)。
当客户机自客户机初始化以来第一次接收到关闭帧,或者客户机发送第一个关闭帧来通知服务器它正在删除连接时,关闭(closing)状态就会出现。在关闭状态下,客户端只能向服务器发送关闭帧,任何发送任何其他类型帧的尝试都将失败。请记住,在关闭状态下,服务器可能没有响应,因为它没有义务在发送或接收到关闭通知后继续服务。
//+------------------------------------------------------------------+ //| | //+------------------------------------------------------------------+ enum ENUM_CLOSE_CODE // possible reasons for disconnecting sent with a close frame { NORMAL_CLOSE = 1000, // normal closure initiated by choice GOING_AWAY_CLOSE, // close code for client navigating away from end point, used in browsers PROTOCOL_ERROR_CLOSE, // close caused by some violation of a protocol, usually application defined FRAME_TYPE_ERROR_CLOSE, // close caused by an endpoint receiving frame type that is not supportted or allowed UNDEFINED_CLOSE_1, // close code is not defined by websocket protocol UNUSED_CLOSE_1, // unused UNUSED_CLOSE_2, // values ENCODING_TYPE_ERROR_CLOSE, // close caused data in message is of wrong encoding type, usually referring to strings APP_POLICY_ERROR_CLOSE, // close caused by violation of user policy MESSAGE_SIZE_ERROR_CLOSE, // close caused by endpoint receiving message that is too large EXTENSION_ERROR_CLOSE, // close caused by non compliance to or no support for specified extension of websocket protocol SERVER_SIDE_ERROR_CLOSE, // close caused by some error that occurred on the server UNUSED_CLOSE_3 = 1015, // unused }; //+------------------------------------------------------------------+ //| | //+------------------------------------------------------------------+ enum ENUM_WEBSOCKET_STATE { CLOSED=0, CLOSING, CONNECTING, CONNECTED };
ClientState 方法检索定义任何给定 WebSocket 客户端的连接状态的属性。
//+------------------------------------------------------------------+ //| ClientState() | //+------------------------------------------------------------------+ ENUM_WEBSOCKET_STATE CWebSocketClient::ClientState(void) { if(m_socket.IsConnected()) return(m_wsclient); //--- if(m_wsclient!=CLOSED) { m_socket.Close(); m_wsclient=CLOSED; } //--- return(m_wsclient); }
SetMaxSendSize () 用于配置 WebSocket 客户端的帧片段的特性。此方法设置从客户端发送到服务器的单个帧的最大大小(以字节为单位)。使客户端灵活地与任何强制执行帧大小限制的 API 一起使用。
void SetMaxSendSize(int maxsend) {if(maxsend>=0) m_maxsendsize=maxsend; else m_maxsendsize=0; }
Connect 方法用于建立 WebSocket 连接。secure参数是一个布尔值,用于配置是否具有TLS的 WebSocket。该方法首先调用 CSocket 类的 Open 方法来建立初始TCP连接。一旦成功,WebSocket 的状态将变为 connecting,之后 upgrade 辅助方法将发挥作用。它的职责是创建切换到 WebSocket 协议所需的 Http 报头。最后在函数退出时检查 WebSocket 的状态。
//+------------------------------------------------------------------+ //| Connect(): Used to establish connection to websocket server | //+------------------------------------------------------------------+ bool CWebSocketClient::Connect(const string url,const uint port,const uint timeout,bool use_tls=false,bool enablelog=false) { reset(); //--- m_timeout=timeout; //--- if(!m_socket.Open(url,port,m_timeout,use_tls,enablelog)) { m_socket.Log("Connect error",__LINE__,__FUNCTION__); return(false); } else m_wsclient=CONNECTING; //--- if(!upgrade()) return(false); //--- m_socket.Log("ws client state "+EnumToString(m_wsclient),__LINE__,__FUNCTION__); //--- if(m_wsclient!=CONNECTED) { m_wsclient=CLOSED; m_socket.Close(); reset(); } //--- return(m_wsclient==CONNECTED); }
要关闭或删除连接,使用 ClientClose 方法。它有两个默认参数,close 代码和一个消息体,该消息体将作为 close 帧发送到服务器。如果消息正文大于122个字符的限制,它将被截断。根据 WebSocket 规范,如果任一端点(服务器或客户端)第一次接收到关闭帧,则接收方应该响应,发送方应该期望响应作为关闭请求的确认。从 ClientClose 代码可以看出,一旦发送了close帧,底层TCP套接字就关闭了,而不必等待响应,即使关闭是由客户机发起的。在客户机生命周期的这个关头等待响应看起来像是浪费资源,所以它没有实现。
//+------------------------------------------------------------------+ //| Close() inform server client is disconnecting | //+------------------------------------------------------------------+ bool CWebSocketClient::Close(ENUM_CLOSE_CODE close_code=NORMAL_CLOSE,const string close_reason="") { ClientState(); //--- if(m_wsclient==0) { m_socket.Log("Client Disconnected",__LINE__,__FUNCTION__); //--- return(true); } //--- if(ArraySize(m_txbuf)<=0) { if(close_reason!="") { int len=StringToCharArray(close_reason,m_txbuf,2,120,CP_UTF8)-1; if(len<=0) return(false); else ArrayRemove(m_txbuf,len,1); } else { if(ArrayResize(m_txbuf,2)<=0) { m_socket.Log("array resize error",__LINE__,__FUNCTION__); return(false); } } m_txbuf[0]=(uchar)(close_code>>8) & 0xff; m_txbuf[1]=(uchar)(close_code>>0) & 0xff; //--- } //--- m_msgsize=ArraySize(m_txbuf); m_sent=false; //--- send(CLOSE_FRAME); //--- m_socket.Close(); //--- reset(); //--- return(true); //--- }
向服务器发送任意数据时,可以选择两种方法。SendString 接受一个字符串,SendData 接受一个数组作为输入。
SendPing 和 SendPong 是发送 ping 和 pong 的特殊方法。两者都允许应用122字符限制的可选消息体。
所有的公有 send 方法都将它们各自的输入打包到 m_txbuff 数组中。私有send方法设置帧的类型,并使用 fillTxBuffer()根据 m_maxsendsize 属性的值启用消息分段。fillTxBuffer() 准备单个帧,将其打包到数组 m_send 中,一旦 m_send 准备好发送,它就被发送到服务器。所有这一切都是在一个循环中完成的,直到 m_txbuffer 的所有内容都已发送。
//+------------------------------------------------------------------+ //| Send() sends text data to websocket server | //+------------------------------------------------------------------+ int CWebSocketClient::SendString(const string message) { ClientState(); //--- if(m_wsclient==CLOSED || m_wsclient==CLOSING) { m_socket.Log("invalid ws client handle",__LINE__,__FUNCTION__); return(0); } //--- if(message=="") { m_socket.Log("no message specified",__LINE__,__FUNCTION__); return(0); } //--- int len=StringToCharArray(message,m_txbuf,0,WHOLE_ARRAY,CP_UTF8)-1; if(len<=0) { m_socket.Log("string char array error",__LINE__,__FUNCTION__); return(0); } else ArrayRemove(m_txbuf,len,1); //--- m_msgsize=ArraySize(m_txbuf); m_sent=false; //--- return(send(TEXT_FRAME)); } //+------------------------------------------------------------------+ //| Send() sends user supplied array buffer | //+------------------------------------------------------------------+ int CWebSocketClient::SendData(uchar &message_buffer[]) { ClientState(); //--- if(m_wsclient==CLOSED || m_wsclient==CLOSING) { m_socket.Log("invalid ws client handle",__LINE__,__FUNCTION__); return(0); } //--- if(ArraySize(message_buffer)==0) { m_socket.Log("array is empty",__LINE__,__FUNCTION__); return(0); } //--- if(ArrayResize(m_txbuf,ArraySize(message_buffer))<0) { m_socket.Log("array resize error",__LINE__,__FUNCTION__); return(0); } else ArrayCopy(m_txbuf,message_buffer); //--- m_msgsize=ArraySize(m_txbuf); m_sent=false; //--- return(send(BINARY_FRAME)); } //+------------------------------------------------------------------+ //| SendPong() sends pong response upon receiving ping | //+------------------------------------------------------------------+ int CWebSocketClient::SendPong(const string msg="") { ClientState(); //--- if(m_wsclient==CLOSED || m_wsclient==CLOSING) { m_socket.Log("invalid ws client handle",__LINE__,__FUNCTION__); return(0); } //--- if(ArraySize(m_txbuf)<=0) { if(msg!="") { int len=StringToCharArray(msg,m_txbuf,0,122,CP_UTF8)-1; if(len<=0) { m_socket.Log("string to char array error",__LINE__,__FUNCTION__); return(0); } else ArrayRemove(m_txbuf,len,1); } } //--- m_msgsize=ArraySize(m_txbuf); m_sent=false; //--- return(send(PONG_FRAME)); } //+------------------------------------------------------------------+ //| SendPing() ping the server | //+------------------------------------------------------------------+ int CWebSocketClient::SendPing(const string msg="") { ClientState(); //--- if(m_wsclient==CLOSED || m_wsclient==CLOSING) { m_socket.Log("invalid ws client handle",__LINE__,__FUNCTION__); return(0); } //--- if(ArraySize(m_txbuf)<=0) { if(msg!="") { int len=StringToCharArray(msg,m_txbuf,0,122,CP_UTF8)-1; if(len<=0) { m_socket.Log("string to char array error",__LINE__,__FUNCTION__); return(0); } else ArrayRemove(m_txbuf,len,1); } } //--- m_msgsize=ArraySize(m_txbuf); m_sent=false; //--- return(send(PING_FRAME)); }
//+------------------------------------------------------------------+ //|prepareSendBuffer()prepares array buffer for socket dispatch | //+------------------------------------------------------------------+ bool CWebSocketClient::fillTxBuffer(ENUM_FRAME_TYPE ftype) { uchar header[]; static int it; static int start; uchar masking_key[4]={0}; int maxsend=(m_maxsendsize<7)?m_msgsize:((m_maxsendsize<126)?m_maxsendsize-6:((m_maxsendsize<65536)?m_maxsendsize-8:m_maxsendsize-14)); //--- for(int i=0; i<4; i++) { masking_key[i]=(uchar)(255*MathRand()/32767); } //--- m_socket.Log("[send]max size - "+IntegerToString(maxsend),__LINE__,__FUNCTION__); m_socket.Log("[send]should be max size - "+IntegerToString(m_maxsendsize),__LINE__,__FUNCTION__); int message_size=(((start+maxsend)-1)<=(m_msgsize-1))?maxsend:m_msgsize%maxsend; bool isfinal=((((start+maxsend)-1)==(m_msgsize-1)) || (message_size<maxsend) ||(message_size<=0))?true:false; bool isfirst=(start==0)?true:false; //--- m_socket.Log("[send]message size - "+IntegerToString(message_size),__LINE__,__FUNCTION__); if(isfirst) m_socket.Log("[send]first frame",__LINE__,__FUNCTION__); if(isfinal) m_socket.Log("[send]final frame",__LINE__,__FUNCTION__); //--- if(ArrayResize(header,2+(message_size>=126 ? 2 : 0)+(message_size>=65536 ? 6 : 0)+(4))<0) { m_socket.Log("array resize error",__LINE__,__FUNCTION__); return(false); } //header[0] = (isfinal)? (0x80 | 0x1) :( ); switch(ftype) { case CLOSE_FRAME: header[0]=uchar(0x80|CLOSE_FRAME); m_socket.Log("[building]close frame",__LINE__,__FUNCTION__); break; case PING_FRAME: header[0]=uchar(0x80|PING_FRAME); m_socket.Log("[building]ping frame",__LINE__,__FUNCTION__); break; case PONG_FRAME: header[0]=uchar(0x80|PONG_FRAME); m_socket.Log("[building]pong frame",__LINE__,__FUNCTION__); break; default: header[0]=(isfinal)? 0x80:0x0; m_socket.Log("[building]"+EnumToString(ftype),__LINE__,__FUNCTION__); if(isfirst) header[0]|=uchar(ftype); break; } //--- if(message_size<126) { header[1] = (uchar)(message_size & 0xff) | 0x80; header[2] = masking_key[0]; header[3] = masking_key[1]; header[4] = masking_key[2]; header[5] = masking_key[3]; } else if(message_size<65536) { header[1] = 126 | 0x80; header[2] = (uchar)(message_size >> 8) & 0xff; header[3] = (uchar)(message_size >> 0) & 0xff; header[4] = masking_key[0]; header[5] = masking_key[1]; header[6] = masking_key[2]; header[7] = masking_key[3]; } else { header[1] = 127 | 0x80; header[2] = (uchar)(message_size >> 56) & 0xff; header[3] = (uchar)(message_size >> 48) & 0xff; header[4] = (uchar)(message_size >> 40) & 0xff; header[5] = (uchar)(message_size >> 32) & 0xff; header[6] = (uchar)(message_size >> 24) & 0xff; header[7] = (uchar)(message_size >> 16) & 0xff; header[8] = (uchar)(message_size >> 8) & 0xff; header[9] = (uchar)(message_size >> 0) & 0xff; header[10] = masking_key[0]; header[11] = masking_key[1]; header[12] = masking_key[2]; header[13] = masking_key[3]; } //--- if(ArrayResize(m_send,ArraySize(header),message_size)<0) { m_socket.Log("array resize error",__LINE__,__FUNCTION__); return(false); } //--- ArrayCopy(m_send,header,0,0); //--- if(message_size) { if(ArrayResize(m_send,ArraySize(header)+message_size)<0) { m_socket.Log("array resize error",__LINE__,__FUNCTION__); return(false); } //--- ArrayCopy(m_send,m_txbuf,ArraySize(header),start,message_size); //--- int bufsize=ArraySize(m_send); //--- int message_offset=bufsize-message_size; //--- for(int i=0; i<message_size; i++) { m_send[message_offset+i]^=masking_key[i&0x3]; } } //--- if(isfinal) { it=0; start=0; m_sent=true; ArrayFree(m_txbuf); } else { it++; start=it*maxsend; } //--- return(true); }
//+------------------------------------------------------------------+ //|int sendMessage() helper | //+------------------------------------------------------------------+ int CWebSocketClient::send(ENUM_FRAME_TYPE frame_type) { //--- bool done=false; int bytes_sent=0,sum_sent=0; while(!m_sent) { done=fillTxBuffer(frame_type); if(done && m_socket.Writable()) { bytes_sent=m_socket.Send(m_send,(uint)ArraySize(m_send)); //--- if(bytes_sent<0) break; else { sum_sent+=bytes_sent; ArrayFree(m_send); } //--- } else break; } //--- if(ArraySize(m_send)>0) ArrayFree(m_send); //--- m_socket.Log("",__LINE__,__FUNCTION__); //--- return(sum_sent); }
无论何时调用 Readable() 共有方法,发送到客户机的任何数据都会被 fillRxBuffer()私有方法缓冲到 m_rxbuff 数组中。它通过对 Read()方法调用返回表示可检索数据可用性的 m_rxbuff 数组的大小。
//+------------------------------------------------------------------+ //| receiver()fills rxbuf with raw message | //+------------------------------------------------------------------+ int CWebSocketClient::fillRxBuffer(void) { uint leng=0; int rsp_len=-1; //--- uint timeout_check=GetTickCount()+m_timeout; //--- do { leng=m_socket.Readable(); if(leng) rsp_len+=m_socket.Read(m_rxbuf,leng,m_timeout); leng=0; } while(GetTickCount()<timeout_check); //--- m_socket.Log("receive size "+IntegerToString(rsp_len),__LINE__,__FUNCTION__); //--- int m_rxsize=ArraySize(m_rxbuf); //--- if(m_rxsize<3) return(0); //--- switch((uint)m_rxbuf[1]) { case 126: if(m_rxsize<4) { m_rxsize=0; } break; case 127: if(m_rxsize<10) { m_rxsize=0; } break; default: break; } //--- return(m_rxsize); }
int Readable(void) { return(fillRxBuffer());}
Read()方法接受CFrame类型的数组作为输入,所有帧都将写入该数组。该方法使用私有函数 parse() 对字节数据进行解码,这样就可以正确地组织字节数据以提高可读性。parse() 方法将载荷与头部字节分离,头字节对刚刚接收到的帧的描述性信息进行编码。
//+------------------------------------------------------------------+ //| parse() cleans up raw data buffer discarding unnecessary elements| //+------------------------------------------------------------------+ bool CWebSocketClient::parse(CFrame &out[]) { uint i,data_len=0,frames=0; uint s=0; m_total_len=0; //--- int shift=0; for(i=0; i<(uint)ArraySize(m_rxbuf); i+=(data_len+shift)) { ++frames; m_socket.Log("value of frame is "+IntegerToString(frames)+" Value of i is "+IntegerToString(i),__LINE__,__FUNCTION__); switch((uint)m_rxbuf[i+1]) { case 126: data_len=((uint)m_rxbuf[i+2]<<8)+((uint)m_rxbuf[i+3]); shift=4; break; case 127: data_len=((uint)m_rxbuf[i+2]<<56)+((uint)m_rxbuf[i+3]<<48)+((uint)m_rxbuf[i+4]<<40)+ ((uint)m_rxbuf[i+5]<<32)+((uint)m_rxbuf[i+6]<<24)+((uint)m_rxbuf[i+7]<<16)+ ((uint)m_rxbuf[i+8]<<8)+((uint)m_rxbuf[i+9]); shift=10; break; default: data_len=(uint)m_rxbuf[i+1]; shift=2; break; } m_total_len+=data_len; if(data_len>0) { if(ArraySize(out)<(int)frames) { if(ArrayResize(out,frames,1)<=0) { m_socket.Log("array resize error",__LINE__,__FUNCTION__); return(false); } } //--- if(!out[frames-1].Fill(m_rxbuf,i+shift,data_len)) { m_socket.Log("Error adding new frame",__LINE__,__FUNCTION__); return(false); } //--- switch((uchar)m_rxbuf[i]) { case 0x1: if(out[frames-1].MessageType()==0) out[frames-1].SetMessageType(TEXT_FRAME); break; case 0x2: if(out[frames-1].MessageType()==0) out[frames-1].SetMessageType(BINARY_FRAME); break; case 0x80: case 0x81: if(out[frames-1].MessageType()==0) out[frames-1].SetMessageType(TEXT_FRAME); case 0x82: if(out[frames-1].MessageType()==0) out[frames-1].SetMessageType(BINARY_FRAME); m_socket.Log("received last frame",__LINE__,__FUNCTION__); out[frames-1].SetFinal(); break; case 0x88: m_socket.Log("received close frame",__LINE__,__FUNCTION__); out[frames-1].SetMessageType(CLOSE_FRAME); if(m_wsclient==CONNECTED) { ArrayCopy(m_txbuf,m_rxbuf,0,i+shift,data_len); m_wsclient=CLOSING; } break; case 0x89: m_socket.Log("received ping frame",__LINE__,__FUNCTION__); out[frames-1].SetMessageType(PING_FRAME); if(m_wsclient==CONNECTED) ArrayCopy(m_txbuf,m_rxbuf,0,i+shift,data_len); break; case 0x8a: m_socket.Log("received pong frame",__LINE__,__FUNCTION__); out[frames-1].SetMessageType(PONG_FRAME); break; default: break; } } } //--- return(true); }
uint CWebSocketClient::Read(CFrame &out[]) { ClientState(); //--- if(m_wsclient==0) { m_socket.Log("invalid ws client handle",__LINE__,__FUNCTION__); return(0); } //--- int rx_size=ArraySize(m_rxbuf); //--- if(rx_size<=0) { m_socket.Log("receive buffer is empty, Make sure to call Readable first",__LINE__,__FUNCTION__); return(0); } //---clean up rxbuf if(!parse(out)) { ArrayFree(m_rxbuf); return(0); } //--- ArrayFree(m_rxbuf); //--- return(m_total_len); }
定义了 websocket 类之后,让我们看看如何在 mt5 程序中使用它。在开始开发实现此类的任何应用程序之前,必须首先在终端设置中的允许端点列表中输入要访问的远程服务器的地址。
请记住要 include WebSocketClient.mqh,然后执行以下步骤:
CWebSocketClient wsc;
如果要为与连接相关的所有发送操作指定最大发送大小,现在正是这样做的最佳时机。在实例初始化时,m_maxsendsize 为 0,表示没有任何帧大小限制。
wsc.SetMaxSendSize(129); // max size in bytes set
if(wsc.Connect(Address,Port,Timeout,usetls,true)) { //// }
如果连接成功,您可以开始发送或检查任何接收到的消息。您可以使用用于先前准备的数组的 SendData 方法发送数据。
sent=wsc.SendString("string message");
// or
// prepare and fill arbitrary array[] with data and send
sent=wsc.SendData(array);
或者如果您只想发送一个字符串消息,请使用 SendString 方法。
sent=wsc.SendPing("optional message");
还可以向服务器发送 ping,ping 还可以附带一些消息。在ping服务器之后等待响应时,pong responce 帧应该回显 ping 发送的内容。如果客户端接收到来自服务器的 ping,它也必须执行相同的操作。
if(wsc.Readable()>0) { //read message.... //declare frame object to receive message // and pass it to read method. CFrame msg_frames[]; received=wsc.Read(msg_frames); Print(msg_frames[0].ToString()); if(msg_frames[0].IsFinal()) { Print("\n Final frame received"); }
对于接收,使用 readable 方法检查从套接字读取的任何数据。如果方法指示可读套接字,则使用帧类型的对象数组调用客户机 read 方法。然后,websocket 将通过的所有消息片段写入对象数组。在这里,您可以使用 frame type 方法来查询 frame 数组的内容。如前所述,如果接收到的帧之一是 ping 帧,建议尽快用 pong 响应。为了满足这个要求,websocket 客户端将在收到任何ping时创建一个 pong responce 框架,用户所要做的就是调用 send ping 方法而不带任何参数。
如果接收到的某个帧是关闭帧,则 websocket 客户端的状态将更改为关闭状态,这意味着服务器发送了关闭请求,并准备断开与此客户端的连接。当处于关闭状态时,发送操作会受到限制。客户端只能发送强制关闭响应帧,同样,正如接收到 ping 帧一样,接收关闭帧会看到 websocket 客户端创建一个准备发送的关闭帧。
wsc.Close(NORMAL_CLOSE,"good bye"); // can also be called with out any arguments. // wsc.Close();
当使用 websocket 客户机调用连接的 close 方法时,通常只要调用该方法而不指定任何参数就足够了,除非您希望通知服务器有关的内容。在这种情况下,您可以使用一个关闭代码的原因以及一个简短的分手信息。此消息将强制限制为122个字符,超出此限制的任何内容都将被丢弃。
出于测试目的,本文附带的 zip 文件包括一个提供 echo 服务的 websocket 服务器。服务器是使用 libwebsocket 库构建的,源代码可以在 github 上下载。要构建它,只需要 Visual Studio,因为 github 存储库中提供了所有其他依赖项。
要运行echo服务器,只需双击应用程序(exe文件),服务器应该就开始工作了。请注意,已安装的防火墙可能会阻止服务器,因此还需授予它必要的权限。同样重要的是要知道,服务器应用程序目录中包含的附带的 .dll 文件是必需的,如果没有这些文件,服务器将无法运行。
让我们快速测试 WebSocketClient 类。下面是一个示例程序。
//+------------------------------------------------------------------+ //| Websocketclient_test.mq5 | //| Copyright 2019, MetaQuotes Software Corp. | //| https://www.mql5.com | //+------------------------------------------------------------------+ #property copyright "Copyright 2019, MetaQuotes Software Corp." #property link "https://www.mql5.com" #property version "1.00" #property strict #include<WebSocketClient.mqh> input string Address="127.0.0.1"; input int Port =7681; input bool ExtTLS =false; input int MaxSize=256; input int Timeout=5000; //+------------------------------------------------------------------+ //| | //+------------------------------------------------------------------+ string _msg="For the mql5-program to operate, it must be compiled (Compile button or F7 key). Compilation should" "pass without errors (some warnings are possible; they should be analyzed). At this process, an" "executable file with the same name and with EX5 extension must be created in the corresponding" "directory, terminal_dir\\MQL5\\Experts, terminal_dir\\MQL5\\indicators or terminal_dir\\MQL5\\scripts." "This file can be run." "Operating features of MQL5 programs are described in the following sections:" "- Program running – order of calling predefined event-handlers." "- Testing trading strategies – operating features of MQL5 programs in the Strategy Tester." "- Client terminal events – description of events, which can be processed in programs." "- Call of imported functions – description order, allowed parameters, search details and call agreement" "for imported functions." "· Runtime errors – getting information about runtime and critical errors." "Expert Advisors, custom indicators and scripts are attached to one of opened charts by Drag'n'Drop" "method from the Navigator window." "For an expert Advisor to stop operating, it should be removed from a chart. To do it select 'Expert'" "'list' in chart context menu, then select an Expert Advisor from list and click 'Remove' button." "Operation of Expert Advisors is also affected by the state of the 'AutoTrading' button." "In order to stop a custom indicator, it should be removed from a chart." "Custom indicators and Expert Advisors work until they are explicitly removed from a chart;" "information about attached Expert Advisors and Indicators is saved between client terminal sessions." "Scripts are executed once and are deleted automatically upon operation completion or change of the" "current chart state, or upon client terminal shutdown. After the restart of the client terminal scripts" "are not started, because the information about them is not saved." "Maximum one Expert Advisor, one script and unlimited number of indicators can operate in one chart." "Services do not require to be bound to a chart to work and are designed to perform auxiliary functions." "For example, in a service, you can create a custom symbol, open its chart, receive data for it in an" "endless loop using the network functions and constantly update it." "Each script, each service and each Expert Advisor runs in its own separate thread. All indicators" "calculated on one symbol, even if they are attached to different charts, work in the same thread." "Thus, all indicators on one symbol share the resources of one thread." "All other actions associated with a symbol, like processing of ticks and history synchronization, are" "also consistently performed in the same thread with indicators. This means that if an infinite action is" "performed in an indicator, all other events associated with its symbol will never be performed." "When running an Expert Advisor, make sure that it has an actual trading environment and can access" "the history of the required symbol and period, and synchronize data between the terminal and the" "server. For all these procedures, the terminal provides a start delay of no more than 5 seconds, after" "which the Expert Advisor will be started with available data. Therefore, in case there is no connection" "to the server, this may lead to a delay in the start of an Expert Advisor."; //--- CWebSocketClient wsc; //--- int sent=-1; uint received=-1; //--- // string subject,issuer,serial,thumbprint; //--- // datetime expiration; //--- //+------------------------------------------------------------------+ //| Expert initialization function | //+------------------------------------------------------------------+ int OnInit() { //--- create timer EventSetTimer(2); //--- wsc.SetMaxSendSize(MaxSize); //--- if(wsc.Connect(Address,Port,Timeout,ExtTLS,true)) { sent=wsc.SendString(_msg); //-- Print("sent data is "+IntegerToString(sent)); //--- } //--- return(INIT_SUCCEEDED); } //+------------------------------------------------------------------+ //| Expert deinitialization function | //+------------------------------------------------------------------+ void OnDeinit(const int reason) { //--- destroy timer EventKillTimer(); Print("Deinit call"); wsc.Close(); } //+------------------------------------------------------------------+ //| Expert tick function | //+------------------------------------------------------------------+ void OnTick() { //--- } //+------------------------------------------------------------------+ //| | //+------------------------------------------------------------------+ void OnTimer() { if(wsc.Readable()>0) { CFrame msg_frames[]; received=wsc.Read(msg_frames); if(received>0) { int ll=ArraySize(msg_frames); Print("number of received frames is "+IntegerToString(ll)); for(int i=0; i<ll; i++) { Print(msg_frames[i].ToString()); } if(msg_frames[ll-1].IsFinal()) { Print("\n Final frame received"); wsc.Close(NORMAL_CLOSE,"good bye"); ExpertRemove(); } } } else { Print("\n Nothing readable in socket"); if(wsc.ClientState()!=CONNECTED) { Print("\n Client disconnected"); ExpertRemove(); } } } //+------------------------------------------------------------------+
此专家顾问连接到本地运行的echo websocket服务器,并立即尝试发送相当大的消息。EA输入允许启用、禁用 tls 和调整发送大小,以查看消息分段机制是如何工作的。在代码中,我将最大消息大小设置为256,因此,每个帧的大小都不超过这个值。
在 OnTimer 函数中,EA 检查来自服务器接收到的消息被输出到 mt5 终端,然后 websocket 连接被断开。在下一个 OnTimer 事件中,如果连接关闭,EA将从图表中删除自己。
以及websocket服务器的输出。
这是连接到服务器时程序运行的视频。
本文首先简要概述了 WebSocket协议,然后详细描述了如何仅使用 MQL5 编程语言在 MetaTrader 5 中实现 WebSocket 客户机。接下来,我们构建了一个服务器,然后用它来测试 MT5 客户机。我希望你会发现这里描述的工具很有用。所有的源代码都可以在下面下载。
所附档案的内容:
文件夹 | 内容 | 描述 |
---|---|---|
MT5zip\server | echo_websocket_server.exe, websockets.dll,ssleay32.dll,libeay32.dll | 服务器应用程序及其所需的依赖库 |
MT5zip\Mql5\include | Frame.mqh, Socket.mqh, WebsocketClient.mqh | 分别包括包含 CFrame 类、CSocket 类和 CWebSocket 类代码的文件 |
MT5zip\Mql5\Experts | Websocketclient_test.mq5 | MetaTrader 专家顾问演示 CWebSocket 类的使用 |
本社区仅针对特定人员开放
查看需注册登录并通过风险意识测评
5秒后跳转登录页面...
移动端课程