一点悲情
套接字(Sockets)… 如果没有它们,我们的IT世界还可能存在吗?时光倒转回1982年,再到现在,它们每分每秒都与我们同在,这是网络的基础,是我们所居住的 Matrix 世界的神经末梢。
早上,您打开 MetaTrader 终端,它立即就会创建套接字并连接到服务器。您打开浏览器,也会创建和关闭一些套接字连接,把网络上的信息传送给您,或者发送电子邮件,调整时间信号,以及大量的分布式计算,都是需要使用套接字的。
首先,需要一点理论知识,看一看维基百科(Wiki)或者微软开发者网络(MSDN)。里面相应的文章中描述了所有所需的结构和函数,并提供了设置客户端和服务器的实例。
在本文中,将会把这些知识转化为 MQL。
1. 从 WinAPI 移植的结构和函数
WinAPI 是为 C 语言设计的,这不是什么秘密。而 MQL 语言事实上成了它的血缘兄弟(同时在精神上和工作方式上),让我们为这些 WinAPI 函数创建一个 mqh 文件, 它将用于 MQL 主程序,我们工作的顺序是按照需要进行移植。
对于 TCP 客户端,只需要几个函数:
- 使用WSAStartup()初始化函数库;
- 使用socket()创建一个套接字;
- 使用ioctlsocket()设置非足赛模式, 以免在等待数据时会停住;
- 使用connect()连接服务器;
- 使用recv()侦听或者使用send()来发送数据,直到程序结束或者连接断开;
- 在完成工作后使用closesocket()来关闭套接字,并使用WSACleanup()来终止函数库。
TCP 服务器要求类似的函数,一个例外就是它将要绑定一个特定端口并把套接字设置到侦听模式。所需的步骤是:
- 初始化函数库 - WSAStartup();
- 创建一个套接字 - socket();
- 设置为非阻塞模式 - ioctlsocket();
- 绑定到端口 - bind();
- 设置到侦听模式 - listen()
- 成功创建后进行侦听 accept();
- 创建客户端连接并继续操作它们进行 recv()/send(),直到程序结束或者连接断开;
- 完成工作后,关闭服务器侦听的连接客户端的套接字并使用 WSACleanup()终止函数库。
对于 UDP 套接字的状况,步骤会更少一些 (实际上,其中没有客户端和服务器端的 "握手")。UDP 客户端:
- 初始化函数库 - WSAStartup();
- 创建一个套接字 - socket();
- 使用 ioctlsocket() 设置到非阻塞模式, 以使得在等待数据时不会停住;
- 发送 - sendto() /接收 - recvfrom() 数据;
- 完成工作后使用 closesocket() 关闭套接字, 并使用 WSACleanup() 终止函数库.
在 UDP 服务器中只需要加上一个 bind 函数:
- 初始化函数库 - WSAStartup();
- 创建一个套接字 - socket();
- 设置为非阻塞模式 - ioctlsocket();
- 绑定到一个端口 - bind();
- 接收 - recvfrom() / 发送 - sendto();
- 完成工作后,关闭服务器侦听的连接客户端的套接字并使用 WSACleanup()终止函数库。
您可以看到,方法并不是很复杂,只是在调用每个函数的时候需要填充结构。
a) WSAStartup()
在MSDN中参考完整描述:
WINAPI:
int WSAAPI WSAStartup(_In_ WORD wVersionRequested, _Out_ LPWSADATA lpWSAData);
_In_, _Out_ 是空的定义, 只是为了标识出参数的范围。WSAAPI 描述了传递参数的规则,但是出于我们自己的目的考虑,它也可以设为空。您可以从文档中看到, 在第一个参数中还需要设定一个MAKEWORD宏,以及一个指向 LPWSADATA 结构的指针。创建宏并不难,把它从头文件中复制出来就可以了:
#define MAKEWORD(a, b) ((WORD)(((BYTE)(((DWORD_PTR)(a)) & 0xff)) | ((WORD)((BYTE)(((DWORD_PTR)(b)) & 0xff))) << 8))另外,所有的数据类型也能使用 MQL 类型很容易地定义:
#define BYTE uchar #define WORD ushort #define DWORD int #define DWORD_PTR ulong从 MSDN 中复制 WSADATA 结构,大部分数据类型的名称都保留不便,这样易于阅读,特别是它们在上面已经定义好了。
struct WSAData { WORD wVersion; WORD wHighVersion; char szDescription[WSADESCRIPTION_LEN+1]; char szSystemStatus[WSASYS_STATUS_LEN+1]; ushort iMaxSockets; ushort iMaxUdpDg; char lpVendorInfo[]; }请注意,最后一个参数lpVendorInfo在MQL中定义为一个数组 (在 C 语言中它是一个 char* 的指针)。把数组大小的常数定义也移动到定义中。最终,定义此结构的指针为:
#define LPWSADATA char&
为什么这样?很简单,任何结构都没什么特别,它们都是一个有限的内存块,它可以使用任何方式表示 - 例如,可以是另一个相同大小的结构或者相同大小的数组。在此,将使用数组的表示形式,所以,在所有的函数中char&类型都作为数组的地址,并对应着相应大小的所需的结构。在 MQL 中函数的声明看起来如下:
MQL:
int WSAStartup(WORD wVersionRequested, LPWSADATA lpWSAData[]);
函数调用如何把取得的结果传给WSAData结构代码如下:char wsaData[]; // 未来结构的字节数组 ArrayResize(wsaData, sizeof(WSAData)); // 把它的大小设置为结构的大小 WSAStartup(MAKEWORD(2,2), wsaData); // 调用函数
数据将会传给wsaData字节数组,从中可以使用类型转换来简单地收集信息。
希望这个部分不是太难 — 毕竟这只是第一个函数,还有很多工作要做。但是现在基本的原则很清楚了,所以工作会变得更加简单和有趣的。
b) socket()
WINAPI: SOCKET WSAAPI socket(_In_ int af, _In_ int type, _In_ int protocol);
做同样的工作 - 从 MSDN 复制数据。
因为我们使用IPv4的TCP套接字,立即设置此函数参数使用的常数:
#define SOCKET uint #define INVALID_SOCKET (SOCKET)(~0) #define SOCKET_ERROR (-1) #define NO_ERROR 0 #define AF_INET 2 // 互联网家族: UDP, TCP, 等等. #define SOCK_STREAM 1 #define IPPROTO_TCP 6
c) ioctlsocket()
MQL: int ioctlsocket(SOCKET s, int cmd, int &argp);
它最后的参数从指针转变成一个地址:
d) connect()
WINAPI: int connect(_In_ SOCKET s, _In_ const struct sockaddr *name, _In_ int namelen);
传递sockaddr结构会有点小的困难,但是主要原则已经知道了 – 用字节数组替换结构并把数据传到 WinAPI 函数中。
从 MSDN 中取出结构,不需要做修改:
struct sockaddr { ushort sa_family; // 地址家族. char sa_data[14]; // 一共14个字节的直接地址. };我们已经同意,它的指针回使用数组地址来实现:
#define LPSOCKADDR char&MSDN 中的实例使用了sockaddr_in结构,它的大小相同,但是参数声明是不同的:
struct sockaddr_in { short sin_family; ushort sin_port; struct in_addr sin_addr; char sin_zero[8]; };sin_addr的数据是一个'联合(union)', 一种八个字节整数的表现形式:
struct in_addr { ulong s_addr; };这就是MQL中函数声明的结果:
MQL: int connect(SOCKET s, LPSOCKADDR name[], int namelen);
在现阶段,我们已经完全准备好创建一个客户端套接字了,只差一点工作要做了 - 用于接收和发送数据的函数。
原型如下:
WINAPI: int send(_In_ SOCKET s, _In_ const char* buf, _In_ int len, _In_ int flags); int recv(_In_ SOCKET s, _Out_ char* buf, _In_ int len, _In_ int flags); MQL: int send(SOCKET s, char& buf[], int len, int flags); int recv(SOCKET s, char& buf[], int len, int flags);可以看到,第二个参数已经从 char* 指针转换为 char& [] 数组。
f) 针对 UDP 的 recvfrom() and sendto()
在 MQL 中的原型如下:
WINAPI: int recvfrom(_In_ SOCKET s, _Out_ char* buf, _In_ int len, _In_ int flags, _Out_ struct sockaddr *from, _Inout_opt_ int *fromlen); int sendto(_In_ SOCKET s, _In_ const char* buf, _In_ int len, _In_ int flags, _In_ const struct sockaddr *to, _In_ int tolen); MQL: int recvfrom(SOCKET s,char &buf[],int len,int flags,LPSOCKADDR from[],int &fromlen); int sendto(SOCKET s,const char &buf[],int len,int flags,LPSOCKADDR to[],int tolen);
最终还有,两个重要的工作完毕用于清理和关闭句柄的函数:
g) closesocket() 和 WSACleanup()
MQL: int closesocket(SOCKET s); int WSACleanup();
移植过的 WinAPI 函数结果文件:
#define BYTE uchar #define WORD ushort #define DWORD int #define DWORD_PTR ulong #define SOCKET uint #define MAKEWORD(a, b) ((WORD)(((BYTE)(((DWORD_PTR)(a)) & 0xff)) | ((WORD)((BYTE)(((DWORD_PTR)(b)) & 0xff))) << 8)) #define WSADESCRIPTION_LEN 256 #define WSASYS_STATUS_LEN 128 #define INVALID_SOCKET (SOCKET)(~0) #define SOCKET_ERROR (-1) #define NO_ERROR 0 #define SOMAXCONN 128 #define AF_INET 2 // 互联网家族: UDP, TCP, 等等. #define SOCK_STREAM 1 #define IPPROTO_TCP 6 #define SD_RECEIVE 0x00 #define SD_SEND 0x01 #define SD_BOTH 0x02 #define IOCPARM_MASK 0x7f /* 参数必须 < 128 个字节 */ #define IOC_IN 0x80000000 /* 复制输入参数 */ #define _IOW(x,y,t) (IOC_IN|(((int)sizeof(t)&IOCPARM_MASK)<<16)|((x)<<8)|(y)) #define FIONBIO _IOW('f', 126, int) /* 设置/清除 非阻塞 i/o */ //------------------------------------------------------------------ struct WSAData struct WSAData { WORD wVersion; WORD wHighVersion; char szDescription[WSADESCRIPTION_LEN+1]; char szSystemStatus[WSASYS_STATUS_LEN+1]; ushort iMaxSockets; ushort iMaxUdpDg; char lpVendorInfo[]; }; #define LPWSADATA char& //------------------------------------------------------------------ struct sockaddr_in struct sockaddr_in { ushort sin_family; ushort sin_port; ulong sin_addr; //struct in_addr { ulong s_addr; }; char sin_zero[8]; }; //------------------------------------------------------------------ struct sockaddr struct sockaddr { ushort sa_family; // 地址家族. char sa_data[14]; // 14个字节大小的直接地址. }; #define LPSOCKADDR char& struct ref_sockaddr { char ref[2+14]; }; //------------------------------------------------------------------ import Ws2_32.dll #import "Ws2_32.dll" int WSAStartup(WORD wVersionRequested,LPWSADATA lpWSAData[]); int WSACleanup(); int WSAGetLastError(); ushort htons(ushort hostshort); ulong inet_addr(char& cp[]); string inet_ntop(int Family,ulong &pAddr,char &pStringBuf[],uint StringBufSize); ushort ntohs(ushort netshort); SOCKET socket(int af,int type,int protocol); int ioctlsocket(SOCKET s,int cmd,int &argp); int shutdown(SOCKET s,int how); int closesocket(SOCKET s); // 服务器端函数 int bind(SOCKET s,LPSOCKADDR name[],int namelen); int listen(SOCKET s,int backlog); SOCKET accept(SOCKET s,LPSOCKADDR addr[],int &addrlen); // 客户端函数 int connect(SOCKET s,LPSOCKADDR name[],int namelen); int send(SOCKET s,char &buf[],int len,int flags); int recv(SOCKET s,char &buf[],int len,int flags); #import
2. 创建一个客户端和一个服务器
在考虑一段时间如何实现套接字的工作以进行下一步的实验后,我选择了使用函数来进行工作,而不使用类。首先,这是我们更容易理解,在此只需要线形而没有分支的编程;其次,这使我们可以根据任何需要重构这些函数或者进行面向对象的编程(OOP)开发。实验显示了,开发者们倾向于研究简单的类以了解一切如何工作。
重要!在您所有的实验中,请不要忘记,当服务器代码退出时,绑定的端口不会自动释放,这可能会导致当重复创建套接字并调用'bind'的时候会得到错误的结果 - 地址已经被使用。为了解决这个问题,可以在套接字上使用SO_REUSEADDR的选项,或者简单地重新启动终端。使用监控工具,例如 TCPViewer, 来跟踪在您操作系统中创建的套接字。
它也可以是您了解,客户端可以连接到服务器,因而服务器没有隐藏在NAT(网络地址转换)之后,或者客户端/服务器连接的端口没有被操作系统或者路由器屏蔽。
所以,可以在本地的同一台电脑上进行客户端和服务器的实验。 但是,为了完全操作多个客户端,服务器必须至少运行在一台VPS(虚拟专用服务器)并且有"可见"的外部IP地址,以及一个开放的向外的端口。
实例 1. 把图表布局发送给客户端
从简单的交互开始 – 一次性从服务器到客户端传输一个 tpl 文件。
在这种情况下,在客户端没有必要维护send/recv循环,因为连接之后之需要接收一次数据,然后就断开连接。当数据发送过后,连接会由服务器立即关闭,
也就是说,当一个客户端连接到服务器时,服务器调用一次Send,然后就把套接字关闭。同时,客户端调用一次 Recv,然后同样关闭套接字。当然,在更有趣的例子中,可能会创建一个实时的图表变化的广播,这样就可以及时同步客户端和服务器端的图表。这对交易高手向新手们在线展示他们的图表很有帮助,而目前这都是通过使用视频流进行广播或者使用不同的网络会议软件,或者 Skype 来做到的。所以,这个课题在论坛上讨论得最好。
谁以及什么时候会觉得这个代码实例有用呢?例如,您在每日,每小时或者分钟图表上设置您的指标或者图形对象,同时,您有一个服务器EA运行于一个独立的图表中,它会侦听客户端的连接并且为它们提供所要求的交易品种和时段的当前tpl(模板),
客户会很满意,并将能够从您那里接收到目标和交易信号的通知。而他们只需要在一定时间内运行一个脚本来从服务器上下载tpl并将它应用到图表上。
所以,让我们从服务器端开始,一切工作都在OnTimer事件中进行,它就是EA的线程,每秒钟它都检查服务器模块:侦听客户端 -> 发送数据 -> 关闭连接。它也会检查服务器套接字是否活动,如果连接丢失 - 就再次创建一个服务器套接字。
遗憾的是,保存的 tpl 模板不在文件沙盒中,所以,为了从Profiles\Templates文件夹中取得它,必须再次使用 WinAPI,这一次我们不再描述细节了,可以参见以下完整代码。
//+------------------------------------------------------------------+ //| TplServer | //| programming & development - Alexey Sergeev | //+------------------------------------------------------------------+ #property copyright "© 2006-2016 Alexey Sergeev" #property link "profy.mql@gmail.com" #property version "1.00" #include "SocketLib.mqh" input string Host="0.0.0.0"; input ushort Port=8080; uchar tpl[]; int iCnt=0; string exname=""; SOCKET server=INVALID_SOCKET; //------------------------------------------------------------------ OnInit int OnInit() { EventSetTimer(1); exname=MQLInfoString(MQL_PROGRAM_NAME)+".ex5"; return 0; } //------------------------------------------------------------------ OnDeinit void OnDeinit(const int reason) { EventKillTimer(); CloseClean(); } //------------------------------------------------------------------ OnChartEvent void OnChartEvent(const int id,const long &lparam,const double &dparam,const string &sparam) { if(iCnt==0) // 创建模板文件的限制 - 每秒不能超过一次 { Print("创建 TPL"); uchar buf[]; CreateTpl(buf); uchar smb[]; StringToCharArray(Symbol(),smb); ArrayResize(smb,10); uchar tf[]; StringToCharArray(IntegerToString(Period()),tf); ArrayResize(tf,10); // 创建用于发送的数据 ArrayCopy(tpl,smb, ArraySize(tpl)); // 加上交易品种名称 ArrayCopy(tpl, tf, ArraySize(tpl)); // 加上时段数值 ArrayCopy(tpl,buf, ArraySize(tpl)); // 加上模板本身 } iCnt++; } //------------------------------------------------------------------ OnTimer void OnTimer() { iCnt=0; // 重置模板创建计数器 if(server==INVALID_SOCKET) StartServer(Host,Port); else { // 在循环中取得所有的客户端并把当前图表模板发送给每个客户端 SOCKET client=INVALID_SOCKET; do { client=AcceptClient(); // 接受一个客户端的套接字 if(client==INVALID_SOCKET) return; int slen=ArraySize(tpl); int res=send(client,tpl,slen,0); if(res==SOCKET_ERROR) Print("-发送失败,错误: "+WSAErrorDescript(WSAGetLastError())); else printf("已发送 %d 字节,共 %d 字节",res,slen); if(shutdown(client,SD_BOTH)==SOCKET_ERROR) Print("-关闭失败,错误: "+WSAErrorDescript(WSAGetLastError())); closesocket(client); } while(client!=INVALID_SOCKET); } } //------------------------------------------------------------------ StartServer void StartServer(string addr,ushort port) { // 初始化库 char wsaData[]; ArrayResize(wsaData,sizeof(WSAData)); int res=WSAStartup(MAKEWORD(2,2), wsaData); if(res!=0) { Print("-WSAStartup 失败,错误: "+string(res)); return; } // 创建一个套接字 server=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(server==INVALID_SOCKET) { Print("-创建失败,错误: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // 绑定地址和端口 Print("尝试绑定..."+addr+":"+string(port)); char ch[]; StringToCharArray(addr,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(port); ref_sockaddr ref=(ref_sockaddr)addrin; if(bind(server,ref.ref,sizeof(addrin))==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEISCONN) { Print("-连接失败,错误: "+WSAErrorDescript(err)+". 清除套接字"); CloseClean(); return; } } // 设置为非阻塞模式 int non_block=1; res=ioctlsocket(server,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket 失败,错误: "+string(res)); CloseClean(); return; } // 侦听端口,接受客户端的连接 if(listen(server,SOMAXCONN)==SOCKET_ERROR) { Print("侦听失败,错误: ",WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } Print("启动服务器成功"); } //------------------------------------------------------------------ Accept SOCKET AcceptClient() // 接受一个客户端套接字 { if(server==INVALID_SOCKET) return INVALID_SOCKET; ref_sockaddr ch; int len=sizeof(ref_sockaddr); SOCKET new_sock=accept(server,ch.ref,len); //sockaddr_in aclient=(sockaddr_in)ch; 转换为结构,如有必要可以获得连接的更多信息 if(new_sock==INVALID_SOCKET) { int err=WSAGetLastError(); if(err==WSAEWOULDBLOCK) Comment("\n等待客户端 ("+string(TimeCurrent())+")"); else { Print("接受失败,错误: ",WSAErrorDescript(err)); CloseClean(); return INVALID_SOCKET; } } return new_sock; } //------------------------------------------------------------------ CloseClean void CloseClean() // 关闭套接字 { if(server!=INVALID_SOCKET) { closesocket(server); server=INVALID_SOCKET; } WSACleanup(); Print("停止服务器"); } //------------------------------------------------------------------ #import "kernel32.dll" int CreateFileW(string lpFileName,uint dwDesiredAccess,uint dwShareMode,uint lpSecurityAttributes,uint dwCreationDisposition,uint dwFlagsAndAttributes,int hTemplateFile); bool ReadFile(int h,ushort &lpBuffer[],uint nNumberOfBytesToRead,uint &lpNumberOfBytesRead,int lpOverlapped=0); uint SetFilePointer(int h,int lDistanceToMove,int,uint dwMoveMethod); bool CloseHandle(int h); uint GetFileSize(int h,int); #import #define FILE_BEGIN 0 #define OPEN_EXISTING 3 #define GENERIC_READ 0x80000000 #define FILE_ATTRIBUTE_NORMAL 0x00000080 #define FILE_SHARE_READ_ 0x00000001 //------------------------------------------------------------------ LoadTpl bool CreateTpl(uchar &abuf[]) { string path=TerminalInfoString(TERMINAL_PATH); string name="tcpsend.tpl"; // 创建模板 ChartSaveTemplate(0,name); // 把模板读取到数组中 path+="\\Profiles\\Templates\\"+name; int h=CreateFileW(path, GENERIC_READ, FILE_SHARE_READ_, 0, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0); if(h==INVALID_HANDLE) return false; uint sz=GetFileSize(h,NULL); ushort rbuf[]; ArrayResize(rbuf,sz); ArrayInitialize(rbuf,0); SetFilePointer(h,0,NULL,FILE_BEGIN); // 移动到文件顶端 int r; ReadFile(h,rbuf,sz,r,NULL); CloseHandle(h); // 从模板中删除EA的名称 string a=ShortArrayToString(rbuf); ArrayResize(rbuf,0); StringReplace(a,exname," "); StringToShortArray(a,rbuf); // 把文件复制到字节数组中 (保持 Unicode) sz=ArraySize(rbuf); ArrayResize(abuf,sz*2); for(uint i=0; i<sz;++i) { abuf[2*i]=(uchar)rbuf[i]; abuf[2*i+1]=(uchar)(rbuf[i]>>8); } return true; }
客户端的代码会简单一点,因为已经计划是一次性接收一个文件,就不需要一个一直运行的,并且使用活动的套接字的EA交易了,
客户端是以脚本程序实现的。一切都发生在OnStart事件之中。
//+------------------------------------------------------------------+ //| TplClient | //| programming & development - Alexey Sergeev | //+------------------------------------------------------------------+ #property copyright "© 2006-2016 Alexey Sergeev" #property link "profy.mql@gmail.com" #property version "1.00" #include "..\Experts\SocketLib.mqh" input string Host="127.0.0.1"; input ushort Port=8080; SOCKET client=INVALID_SOCKET; //------------------------------------------------------------------ OnStart void OnStart() { // 初始化库 char wsaData[]; ArrayResize(wsaData,sizeof(WSAData)); int res=WSAStartup(MAKEWORD(2,2), wsaData); if(res!=0) { Print("-WSAStartup 失败,错误: "+string(res)); return; } // 创建一个套接字 client=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(client==INVALID_SOCKET) { Print("-创建套接字失败,错误: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // 连接到服务器 char ch[]; StringToCharArray(Host,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(Port); ref_sockaddr ref=(ref_sockaddr)addrin; res=connect(client,ref.ref,sizeof(addrin)); if(res==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEISCONN) { Print("-连接失败,错误: "+WSAErrorDescript(err)); CloseClean(); return; } } // 设置为非阻塞模式 int non_block=1; res=ioctlsocket(client,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket 失败,错误: "+string(res)); CloseClean(); return; } Print("连接成功"); // 接收数据 uchar rdata[]; char rbuf[512]; int rlen=512; int rall=0; bool bNext=false; while(true) { res=recv(client,rbuf,rlen,0); if(res<0) { int err=WSAGetLastError(); if(err!=WSAEWOULDBLOCK) { Print("-接收失败,错误: "+string(err)+" "+WSAErrorDescript(err)); CloseClean(); return; } } else if(res==0 && rall==0) { Print("-接收连接已关闭"); break; } else if(res>0) { rall+=res; ArrayCopy(rdata,rbuf,ArraySize(rdata),0,res); } if(res>=0 && res<rlen) break; } // 关闭套接字 CloseClean(); printf("接收了 %d 字节",ArraySize(rdata)); // 从文件中取得交易品种和时段 string smb=CharArrayToString(rdata,0,10); string tf=CharArrayToString(rdata,10,10); // 保存模板文件 int h=FileOpen("tcprecv.tpl", FILE_WRITE|FILE_SHARE_WRITE|FILE_BIN); if(h<=0) return; FileWriteArray(h,rdata,20); FileClose(h); // 应用到图表 ChartSetSymbolPeriod(0,smb,(ENUM_TIMEFRAMES)StringToInteger(tf)); ChartApplyTemplate(0,"\\Files\\tcprecv.tpl"); } //------------------------------------------------------------------ CloseClean void CloseClean() // 关闭套接字 { if(client!=INVALID_SOCKET) { if(shutdown(client,SD_BOTH)==SOCKET_ERROR) Print("-关闭失败,错误: "+WSAErrorDescript(WSAGetLastError())); closesocket(client); client=INVALID_SOCKET; } WSACleanup(); Print("连接关闭"); }
演示这些代码的互操作:
细心的读者将会注意到,客户端套接字可以通过调用 MQL 函数中的 WebRequest 来替换的,为此,只要加上一系列HTTP 头部(header)行并且在客户终端的设置中允许访问服务器的URL就可以了,您可以自己来做实验。
重要!有些情况下,终端会出现一些特别的行为: 当调用了WSACleanup函数时, MetaTrader 关闭了它自己的连接。
如果您在实验中遇到了这样的问题,在代码中把 WSAStartup 和 WSACleanup 注释掉。
实例 2,根据交易品种同步交易
在这个例子中,服务器在发送信息后将不会关闭连接,客户端连接也将一直保持。这样,服务器上任何交易数据的改变都会通过客户端套接字立即发送,通过这种方法,接收到数据包的客户端就能立即与服务器同步它的仓位。
前面例子中服务器和客户端的代码可以作为基础,再加上用于操作仓位的函数,
让我们从服务器开始:
//+------------------------------------------------------------------+ //| SignalServer | //| programming & development - Alexey Sergeev | //+------------------------------------------------------------------+ #property copyright "© 2006-2016 Alexey Sergeev" #property link "profy.mql@gmail.com" #property version "1.00" #include "SocketLib.mqh" input string Host="0.0.0.0"; input ushort Port=8081; bool bChangeTrades; uchar data[]; SOCKET server=INVALID_SOCKET; SOCKET conns[]; //------------------------------------------------------------------ OnInit int OnInit() { OnTrade(); EventSetTimer(1); return 0; } //------------------------------------------------------------------ OnDeinit void OnDeinit(const int reason) { EventKillTimer(); CloseClean(); } //------------------------------------------------------------------ OnTrade void OnTrade() { double lot=GetSymbolLot(Symbol()); StringToCharArray("<<"+Symbol()+"|"+DoubleToString(lot,2)+">>",data); // 把字符串转换为字节数组 bChangeTrades=true; } //------------------------------------------------------------------ OnTimer void OnTimer() { if(server==INVALID_SOCKET) StartServer(Host,Port); else { AcceptClients(); // 加上等待的客户端 if(bChangeTrades) { Print("向客户端发送新的仓位信息"); Send(); bChangeTrades=false; } } } //------------------------------------------------------------------ StartServer void StartServer(string addr,ushort port) { // 初始化库 char wsaData[]; ArrayResize(wsaData,sizeof(WSAData)); int res=WSAStartup(MAKEWORD(2,2), wsaData); if(res!=0) { Print("-WSAStartup 失败,错误: "+string(res)); return; } // 创建一个套接字 server=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(server==INVALID_SOCKET) { Print("-创建失败,错误: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // 绑定地址和端口 Print("尝试绑定..."+addr+":"+string(port)); char ch[]; StringToCharArray(addr,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(port); ref_sockaddr ref=(ref_sockaddr)addrin; if(bind(server,ref.ref,sizeof(addrin))==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEISCONN) { Print("-连接失败,错误: "+WSAErrorDescript(err)+". 清除套接字"); CloseClean(); return; } } // 设置为非阻塞模式 int non_block=1; res=ioctlsocket(server,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket 失败,错误: "+string(res)); CloseClean(); return; } // 侦听端口,接受客户端的连接 if(listen(server,SOMAXCONN)==SOCKET_ERROR) { Print("侦听失败,错误: ",WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } Print("启动服务器成功"); } //------------------------------------------------------------------ Accept void AcceptClients() // 接受一个客户端套接字 { if(server==INVALID_SOCKET) return; // 加上所有等待的客户端 SOCKET client=INVALID_SOCKET; do { ref_sockaddr ch; int len=sizeof(ref_sockaddr); client=accept(server,ch.ref,len); if(client==INVALID_SOCKET) { int err=WSAGetLastError(); if(err==WSAEWOULDBLOCK) Comment("\n等待客户端 ("+string(TimeCurrent())+")"); else { Print("接受失败,错误: ",WSAErrorDescript(err)); CloseClean(); } return; } // 设置到非阻塞模式 int non_block=1; int res=ioctlsocket(client, (int)FIONBIO, non_block); if(res!=NO_ERROR) { Print("ioctlsocket 失败,错误: "+string(res)); continue; } // 把客户端套接字加到数组中 int n=ArraySize(conns); ArrayResize(conns,n+1); conns[n]=client; bChangeTrades=true; // 用于指示仓位信息必须要发送的标志 // 显示客户端信息 char ipstr[23]={0}; sockaddr_in aclient=(sockaddr_in)ch; // 转换为结构以取得连接的额外信息 inet_ntop(aclient.sin_family,aclient.sin_addr,ipstr,sizeof(ipstr)); // 取得地址 printf("接收新的客户端 %s : %d",CharArrayToString(ipstr),ntohs(aclient.sin_port)); } while(client!=INVALID_SOCKET); } //------------------------------------------------------------------ SendClient void Send() { int len=ArraySize(data); for(int i=ArraySize(conns)-1; i>=0; --i) // 向客户端发送信息 { if(conns[i]==INVALID_SOCKET) continue; // 跳过关闭的套接字 int res=send(conns[i],data,len,0); // 发送 if(res==SOCKET_ERROR) { Print("-发送失败,错误: "+WSAErrorDescript(WSAGetLastError())+". 关闭套接字"); Close(conns[i]); } } } //------------------------------------------------------------------ CloseClean void CloseClean() // 关闭和清除操作 { printf("关闭服务器和%d个连接",ArraySize(conns)); if(server!=INVALID_SOCKET) { closesocket(server); server=INVALID_SOCKET; } // 关闭服务器 for(int i=ArraySize(conns)-1; i>=0; --i) Close(conns[i]); // 关闭客户端 ArrayResize(conns,0); WSACleanup(); } //------------------------------------------------------------------ Close void Close(SOCKET &asock) // 关闭一个套接字 { if(asock==INVALID_SOCKET) return; if(shutdown(asock,SD_BOTH)==SOCKET_ERROR) Print("-关闭失败,错误: "+WSAErrorDescript(WSAGetLastError())); closesocket(asock); asock=INVALID_SOCKET; } //------------------------------------------------------------------ GetSymbolLot double GetSymbolLot(string smb) { double slot=0; int n=PositionsTotal(); for(int i=0; i<n;++i) { PositionSelectByTicket(PositionGetTicket(i)); if(PositionGetString(POSITION_SYMBOL)!=smb) continue; // 当服务器正在运行时,过滤掉当前交易品种的仓位 double lot=PositionGetDouble(POSITION_VOLUME); // 取得交易量 if(PositionGetInteger(POSITION_TYPE)==POSITION_TYPE_SELL) lot=-lot; // 考虑方向 slot+=lot; // 加到总和中 } return slot; }
每一秒它都会检测服务器模块: 连接客户端并把它加到总的数组中 -> 发送数据,它也会检查服务器套接字本身是否活动,如果连接丢失 - 再创建一个服务器套接字。
EA所运行的交易品种的名称和它仓位的交易量也发送给客户端,
每个交易操作将以消息的形式发送交易品种和交易量:
<<GBPUSD|0.25>>
<<GBPUSD|0.00>>
消息在每个交易事件中发送,也会在有新的客户端连接时发送。
这一次客户端的代码是以EA交易的方式实现的,因为有必要使连接保持在活动状态。客户端会从服务器接收新的数据并把它加到已有数据中,然后它寻找以<<开始和>>结束为标记的消息,分析它并根据服务器上指定交易品种的交易量进行调整。
//+------------------------------------------------------------------+ //| SignalClient | //| programming & development - Alexey Sergeev | //+------------------------------------------------------------------+ #property copyright "© 2006-2016 Alexey Sergeev" #property link "profy.mql@gmail.com" #property version "1.00" #include "SocketLib.mqh" #include <Trade\Trade.mqh> input string Host="127.0.0.1"; input ushort Port=8081; SOCKET client=INVALID_SOCKET; // 客户端套接字 string msg=""; // 接收到的消息队列 //------------------------------------------------------------------ OnInit int OnInit() { if(AccountInfoInteger(ACCOUNT_MARGIN_MODE)==ACCOUNT_MARGIN_MODE_RETAIL_HEDGING) { Alert("客户端仅作用于净值账户"); return INIT_FAILED; } EventSetTimer(1); return INIT_SUCCEEDED; } //------------------------------------------------------------------ OnInit void OnDeinit(const int reason) { EventKillTimer(); CloseClean(); } //------------------------------------------------------------------ OnInit void OnTimer() { if(client==INVALID_SOCKET) StartClient(Host,Port); else { uchar data[]; if(Receive(data)>0) // 接收数据 { msg+=CharArrayToString(data); // 如果收到了一些信息,把它加到总字符串中 printf("从服务器收到了消息: %s",msg); } CheckMessage(); } } //------------------------------------------------------------------ CloseClean void StartClient(string addr,ushort port) { // 初始化库 int res=0; char wsaData[]; ArrayResize(wsaData, sizeof(WSAData)); res=WSAStartup(MAKEWORD(2,2), wsaData); if (res!=0) { Print("-WSAStartup 失败,错误: "+string(res)); return; } // 创建一个套接字 client=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(client==INVALID_SOCKET) { Print("-创建套接字失败,错误: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // 连接到服务器 char ch[]; StringToCharArray(addr,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(port); ref_sockaddr ref=(ref_sockaddr)addrin; res=connect(client,ref.ref,sizeof(addrin)); if(res==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEISCONN) { Print("-连接失败,错误: "+WSAErrorDescript(err)); CloseClean(); return; } } // 设置为非阻塞模式 int non_block=1; res=ioctlsocket(client,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket 失败,错误: "+string(res)); CloseClean(); return; } Print("连接成功"); } //------------------------------------------------------------------ Receive int Receive(uchar &rdata[]) // 接收直到对方关闭连接 { if(client==INVALID_SOCKET) return 0; // 如果套接字没有打开 char rbuf[512]; int rlen=512; int r=0,res=0; do { res=recv(client,rbuf,rlen,0); if(res<0) { int err=WSAGetLastError(); if(err!=WSAEWOULDBLOCK) { Print("-接收失败,错误: "+string(err)+" "+WSAErrorDescript(err)); CloseClean(); return -1; } break; } if(res==0 && r==0) { Print("-接收. 连接关闭"); CloseClean(); return -1; } r+=res; ArrayCopy(rdata,rbuf,ArraySize(rdata),0,res); } while(res>0 && res>=rlen); return r; } //------------------------------------------------------------------ CloseClean void CloseClean() // 关闭套接字 { if(client!=INVALID_SOCKET) { if(shutdown(client,SD_BOTH)==SOCKET_ERROR) Print("-关闭失败,错误: "+WSAErrorDescript(WSAGetLastError())); closesocket(client); client=INVALID_SOCKET; } WSACleanup(); Print("关闭套接字"); } //------------------------------------------------------------------ CheckMessage void CheckMessage() { string pos; while(FindNextPos(pos)) { printf("server position: %s",pos); }; // 从服务器取得最近变化 if(StringLen(pos)<=0) return; // 从消息中接收数据 string res[]; if(StringSplit(pos,'|',res)!=2) { printf("-错误的仓位信息: %s",pos); return; } string smb=res[0]; double lot=NormalizeDouble(StringToDouble(res[1]),2); // 同步交易量 if(!SyncSymbolLot(smb,lot)) msg="<<"+pos+">>"+msg; // 如果有错,返回消息 } //------------------------------------------------------------------ SyncSymbolLot bool SyncSymbolLot(string smb,double nlot) { // 同步服务器和客户端的交易量 CTrade trade; double clot=GetSymbolLot(smb); // 取得交易品种的当前手数 if(clot==nlot) { Print("nothing change"); return true; } // 如果手数相同,就什么都不做 // 首先检查特殊情况,服务器上没有仓位 if(nlot==0 && clot!=0) { Print("完全关闭仓位"); return trade.PositionClose(smb); } // 如果服务器上有仓位,在客户端上修改 double dif=NormalizeDouble(nlot-clot,2); // 根据差别进行买入或者卖出 if(dif>0) { Print("增加买入仓位"); return trade.Buy(dif,smb); } else { Print("增加卖出仓位"); return trade.Sell(-dif,smb); } } //------------------------------------------------------------------ FindNextPos bool FindNextPos(string &pos) { int b=StringFind(msg, "<<"); if(b<0) return false; // 没有消息开头 int e=StringFind(msg, ">>"); if(e<0) return false; // 没有消息结尾 pos=StringSubstr(msg,b+2,e-b-2); // 取得信息块 msg=StringSubstr(msg,e+2); // 从消息中删除 return true; } //------------------------------------------------------------------ GetSymbolLot double GetSymbolLot(string smb) { double slot=0; int n=PositionsTotal(); for(int i=0; i<n;++i) { PositionSelectByTicket(PositionGetTicket(i)); if(PositionGetString(POSITION_SYMBOL)!=smb) continue; // 当服务器正在运行时,过滤掉当前交易品种的仓位 double lot=PositionGetDouble(POSITION_VOLUME); // 取得交易量 if(PositionGetInteger(POSITION_TYPE)==POSITION_TYPE_SELL) lot=-lot; // 考虑方向 slot+=lot; // 加到总和中 } return NormalizeDouble(slot,2); }
服务器和客户端上成对操作的最终演示:
实例 3. 订单时刻收集器。
这个实例演示了 UDP 套接字,在其中,服务器将等待来自客户端的交易品种的数据。
服务器代码很简单,并且不需要在客户端上保存数据和侦听它们的连接。可以使用一个毫秒级计时器来加速对套接字数据的检查:
input string Host="0.0.0.0"; input ushort Port=8082; SOCKET server=INVALID_SOCKET; //------------------------------------------------------------------ OnInit int OnInit() { EventSetMillisecondTimer(300); return 0; } //------------------------------------------------------------------ OnDeinit void OnDeinit(const int reason) { EventKillTimer(); CloseClean(); } //------------------------------------------------------------------ OnTimer void OnTimer() { if(server!=INVALID_SOCKET) { char buf[1024]={0}; ref_sockaddr ref={0}; int len=ArraySize(ref.ref); int res=recvfrom(server,buf,1024,0,ref.ref,len); if (res>=0) // 接收并显示数据 Print("接收到来自客户端的订单: ", CharArrayToString(buf)); else { int err=WSAGetLastError(); if(err!=WSAEWOULDBLOCK) { Print("-接收失败,错误: "+WSAErrorDescript(err)+". 清除套接字"); CloseClean(); return; } } } else // 否则,启动服务器 { // 初始化库 char wsaData[]; ArrayResize(wsaData,sizeof(WSAData)); int res=WSAStartup(MAKEWORD(2,2), wsaData); if(res!=0) { Print("-WSAStartup 失败,错误: "+string(res)); return; } // 创建一个套接字 server=socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP); if(server==INVALID_SOCKET) { Print("-创建失败,错误: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // 绑定到地址和端口 Print("尝试绑定..."+Host+":"+string(Port)); char ch[]; StringToCharArray(Host,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(Port); ref_sockaddr ref=(ref_sockaddr)addrin; if(bind(server,ref.ref,sizeof(addrin))==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEISCONN) { Print("-连接失败,错误: "+WSAErrorDescript(err)+". 清除套接字"); CloseClean(); return; } } // 设置到非阻塞模式 int non_block=1; res=ioctlsocket(server,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket 失败,错误: "+string(res)); CloseClean(); return; } Print("启动服务器成功"); } } //------------------------------------------------------------------ CloseClean void CloseClean() // 关闭和清除操作 { printf("关闭服务器"); if(server!=INVALID_SOCKET) { closesocket(server); server=INVALID_SOCKET; } // 关闭服务器 WSACleanup(); }
客户端的代码也很简单,所有的处理都发生在订单来临的事件中:
input string Host="127.0.0.1"; input ushort Port=8082; SOCKET client=INVALID_SOCKET; // 客户端套接字 ref_sockaddr srvaddr={0}; // 连接到服务器的结构 //------------------------------------------------------------------ OnInit int OnInit() { // 填充连接服务器的结构 char ch[]; StringToCharArray(Host,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(Port); srvaddr=(ref_sockaddr)addrin; OnTick(); // 立即创建套接字 return INIT_SUCCEEDED; } //------------------------------------------------------------------ OnDeinit void OnDeinit(const int reason) { CloseClean(); } //------------------------------------------------------------------ OnTick void OnTick() { if(client!=INVALID_SOCKET) // 如果创建了套接字,就发送 { uchar data[]; StringToCharArray(Symbol()+" "+DoubleToString(SymbolInfoDouble(Symbol(),SYMBOL_BID),Digits()),data); if(sendto(client,data,ArraySize(data),0,srvaddr.ref,ArraySize(srvaddr.ref))==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEWOULDBLOCK) { Print("-Send failed error: "+WSAErrorDescript(err)); CloseClean(); } } else Print("发送 "+Symbol()+" 订单到服务器"); } else // 创建客户端套接字 { int res=0; char wsaData[]; ArrayResize(wsaData,sizeof(WSAData)); res=WSAStartup(MAKEWORD(2,2),wsaData); if(res!=0) { Print("-WSAStartup 失败,错误: "+string(res)); return; } // 创建一个套接字 client=socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP); if(client==INVALID_SOCKET) { Print("-创建套接字失败,错误: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // 设置到非阻塞模式 int non_block=1; res=ioctlsocket(client,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket 失败,错误: "+string(res)); CloseClean(); return; } Print("成功创建套接字"); } } //------------------------------------------------------------------ CloseClean void CloseClean() // 关闭套接字 { if(client!=INVALID_SOCKET) { closesocket(client); client=INVALID_SOCKET; } WSACleanup(); Print("关闭套接字"); }
而这里是最终工作的演示:
3. 加强服务器的更多方法
显然,这些服务器向任意客户端发送信息的实例还不是最优的,例如,您也许希望限制对您信息的访问,所以,至少要包含一些强制要求:
- 客户端认证(登录/密码);
- 保护以防密码被猜测(屏蔽登录或者屏蔽IP地址)。
另外,所有服务器的工作只在一个线程内进行(在一个EA交易的计时器函数中),这对大量的连接或者大量的信息数据是有严重问题的,所以,为了优化服务器,应该加一个EA池(每个EA有其自身的计时器函数),来处理客户端连接的互操作,这将在一定程度上使得服务器变成多线程的。
是否采用MQL来这样做由您自己决定,还有其他一些方法来这样做,它们可能会更加方便。但是,不能否认MQL在直接访问账户的交易和报价这方面很有优势,另外使用MQL代码就不需要使用第三方的DLL。
结论
在 MetaTrader 中套接字还有哪些用途呢?在写下本文之前,还有几个想法可以作为实例:
- 市场形势指标(当连接的客户端发送它们仓位的交易量,而得到的回应是所有客户端的总的交易量);
- 或者,例如, 把指标计算的数值从服务器发送到客户端 (针对订阅者);
- 或者相反 - 客户端将帮助进行大规模的计算(测试器代理网络);
- 可以使服务器只变成一个"代理"来在客户终端之间交换数据。
还有许多其他选项。如果您有其他应用的想法 — 请在本文的留言处分享它们,也许,如果它们很有趣,我们将能够一起实现它们。
祝你们好运,交易成功!