请 [注册] 或 [登录]  | 返回主站

量化交易吧 /  量化策略 帖子:3364712 新帖:0

自己动手开发多线程异步 MQL5 WebRequest

随心所致发表于:4 月 17 日 18:55回复(1)

交易算法的实现经常需要分析来自各种外部来源、包括互联网的数据,MQL5 提供了 WebRequest 函数来发送 HTTP 请求到 "外部世界", 然而不幸的是,它有一个明显的缺点。这个函数是同步的,也就是说它会在执行请求的整个阶段阻止 EA 的运行。对于每个 EA, MetaTrader 5 都为它们分配了一个单独的线程,在代码中执行已有的 API(应用程序接口) 函数,以及执行到来的事件处理函数 (例如分时,市场深度变化的事件,计时器,交易操作,图表事件等等)。一次只执行一个代码片段,而所有剩余的“任务”都排队等候,直到当前片段把控制权还给内核。

例如,如果一个EA交易要实时处理新的分时,也要间断检查一个或者多个网站上的经济新闻,就不可能实现这两个需求而使它们不互相影响。一旦 WebRequest 在代码中执行,EA 交易就会在函数调用序列中保持“冻结”状态,而新的分时事件就会跳过。就算可以使用 CopyTicks 函数可能可以读取到跳过的分时,而做出交易决策的时机已经错过了。这里是使用UML顺序图表来阐述的这种情况:

事件处理序列图是在一个线程中的阻塞式代码

图1. 事件处理序列图是在一个线程中的阻塞式代码

在这一点上,最好能创建一个工具用于 HTTP 请求的异步执行,相当于一种 WebRequestAsync,很明显,我们需要为此使用额外的线程。在 MetaTrader 5 中这样做的最简单方法就是运行另外的EA,然后您可以在其中发送另外的 HTTP 请求。另外, 您可以在那里调用 WebRequest 并随后取得结果,当请求在这样的辅助EA交易中处理的时候,我们的主EA交易还是可以用于快速和交互的操作。对于这种情况,UML 顺序图表可能看起来是这样的:

向其它线程分发异步事件处理的顺序图表

图 2. 向其它线程分发异步事件处理的顺序图表


1. 计划

您应该知道,在MetaTrader中,每个EA交易都运行在独立的图表中。因而,创建辅助EA交易也需要它们的图表,人工来做很不方便,所以,把所有的常规操作放到一个特定的管理器中就很合理了 - 一个管理辅助图表和EA池的EA交易,并且它提供了一个入口点用于注册来自客户程序的新请求。在某种程度上,这种架构可以被称为三层架构,与客户端-服务器架构类似,其中EA管理器作为服务器:

multiweb 开发库的架构: 客户 MQL 代码 - 服务器 (助理池管理器) - 助手 EA

图 3 multiweb 开发库的架构: 客户 MQL 代码 <-> 服务器 (助理池管理器) <-> 助手 EA

但是为了简化,管理器和辅助EA可以使用相同的代码样式来实现 (程序)。这样一个 "通用" EA 的两种角色之一 - 管理器或是助手 - 将会由优先级规则来判断。第一个运行的实例会把它自己声明为一个管理器,它可以打开另外的图表并以助手的角色运行一定数量的自身。

那么在客户端、管理器和助手之间到底是怎样传递信息的呢?为了了解这个,让我们分析 WebRequest 函数,

您要知道,MetaTrader 5 含有两种 WebRequest 函数的选项,我们将会考虑使用第二种,它是最通用的。

int WebRequest
( 
  const string      method,           // HTTP 方法 
  const string      url,              // url 地址 
  const string      headers,          // 请求头部  
  int               timeout,          // 超时 
  const char        &data[],          // HTTP 消息体数组 
  char              &result[],        // 服务器回复数据的数组 
  string            &result_headers   // 服务器回复的头部 
);

前面五个参数是输入参数,它们是从调用代码传递到核心的,定义了请求的内容。后面两个参数是输出参数,它们是从核心传递到调用代码,包含了查询的结果。很明显,把这个函数变成两个异步函数实际上要把它分成两个组件:初始化查询和取得结果:

int WebRequestAsync
( 
  const string      method,           // HTTP 方法 
  const string      url,              // url 地址 
  const string      headers,          // 请求头部  
  int               timeout,          // 超时 
  const char        &data[],          // HTTP 消息体数组 
);

int WebRequestAsyncResult
( 
  char              &result[],        // 服务器回复数据的数组 
  string            &result_headers   // 服务器回复的头部 
);

函数的名称和原型是由条件的,实际上,我们需要在不同的MQL程序之间传递这个信息,普通的函数调用对此并不适合。为了使 MQL 程序相互之间能够“联络”,MetaTrader 5 有我们将要使用的 自定义事件交换系统。事件的交换是根据接收者ID 进行的,使用的是 ChartID — 它对每个图表都是唯一的。一张图表上只允许有一个EA交易,但是指标的话使用起来没有限制。这意味着用户应当确保每个图表中与管理器通信的指标不能超过一个。

为了使数据交换可行,您需要把所有的“函数”参数都封装到用户事件参数中。请求参数和结果都包含了比较大量的信息,而无法容纳在事件有限的空间中,例如,就算要把 HTTP 方法和URL放到字符串类型的事件参数中,63个字符的限制在大多数实际情况下都有困难。这就意味着事件交换系统需要有一些共享数据的存放空间,而只在事件参数中发送这个存放空间的链接。幸运的是,MetaTrader 5 以自定义资源的形式提供了这样的存储。实际上,从MQL中动态创建的资源都是图片,但是图片可以作为二进制信息的容器,我们可以在里面记录我们想要的任何内容。

为了简化任务,我们将会使用已有的方案来在用户资源中写入和读取数据 — 来自 MQL5 社区成员 fxsaber 所开发的 Resource.mqh and ResourceData.mqh。

所提供的链接指向的源代码 — TradeTransactions 开发库与当前文章的主题没有关系,但是在讨论 (俄语) 中包含了一个通过使用资源来作数据存储和交换的例子。因为开发库可以修改,另外为了方便读者使用,所有本文使用的文件都在下面的附件中,但是它们的版本只是对应着写这篇文章的时间,可能与以上链接中的当前版本不一样。另外,所说的资源类在它们的工作中还使用了另一个开发库 — TypeToBytes,它的版本也附加在本文之后了。

我们不需要关心这些辅助类的内部结构,主要的事情就是我们可以把做好的 RESOURCEDATA 类当作 “黑盒”,再使用它的构造函数和我们需要的一些方法,晚些时候我们将详细探讨。现在,让我们看看整体的概念。

我们的架构中交互的顺序看起来如下:

  1. 为了进行异步的 web 请求,客户 MQL 程序应当使用我们开发的类来把请求的参数封装到本地资源中,并向管理器使用资源的连接发送一个自定义事件,资源是在客户程序中创建的,直到取得结果之前都不会被删除(当不需要时删除);
  2. 管理器在池中寻找未被占用的助手EA并向其发送资源的连接;这个实例就会暂时标记为被占用而在随后的请求中不能被选择,直到当前请求被处理完毕;
  3. 当辅助EA接收到自定义事件的时候,来自客户的web请求参数就从外部资源中解开,
  4. 辅助EA再调用标准的阻塞式 WebRequest 并等待回答(头部以及/或者web文档);
  5. 辅助EA再把请求的结果打包到本地资源中,并向管理器发送包含这个连接的自定义事件;
  6. 管理器把事件转发到客户并把对应的辅助EA再次标记为空闲;
  7. 客户端收到来自管理器的信息并从外部辅助资源中解开请求的结果;
  8. 然后客户端和辅助EA可以删除它们的本地资源了。

在第5步和第6步结果效率可以进一步提高,因为辅助EA可以直接向客户窗口发送结果而绕过管理器。

上面所描述的步骤与处理HTTP请求的主要阶段有关,现在,是时候谈一下如何把分离的部分连接成一个整体架构了,它也是部分依赖于用户事件。

架构的中心连接 — 管理器 — 是人工运行的,您应当只要做一次。和任何其它运行的EA一样,在终端重新启动时它会与图表一起自动恢复,终端中只允许有一个 web 请求管理器。

管理器会创建所要求数量的辅助窗口 (可以在设置中设定) 并在它们中运行它自身的实例,它们可以通过特殊的“协议”发现它们自己是助手状态 (在实现部分有详细介绍)。

任何助手在关闭时都会通过特定事件来通知管理器,这对管理器维护相关可用的助手列表是必需的。类似地,管理器关闭时也可以通知助手,然后,助手就会停止工作并关闭它们的窗口。助手离开管理器是无法工作的,而重新运行管理器会不可避免地重新创建助手 (例如,如果您在设置中修改了助手的数量)。

助手的窗口,和辅助EA自身类似,总是由管理器自动创建的,所以我们的程序应当“清除它们”。不要人工运行助手EA - 输入参数与管理器状态不对应的话会被程序当成错误。

在它载入的时候,客户 MQL 程序应当侦测终端窗口看管理器是否存在,它是把它的图表ID作为参数然后使用消息来探查的。管理器 (如果被发现) 应当把它的窗口ID返回给客户,之后,客户和管理器就能交换消息了。

这些就是主要功能,是时候开始进行实现了。


2. 实现

为了简化开发,创建一个 multiweb.mqh 头文件,我们在其中声明所有的类: 它们中的一些对客户端和 "服务器" 都是通用的, 而其它一些是继承的而分别针对这些角色。

2.1. 基类 (开始)

让我们从保存资源、ID和每个元素变量的类开始。从它派生出的类的实例将可以用在管理器、助手和客户中。在客户和助手中,这样的对象主要是用于保存“通过连接传递的“资源,另外,要注意的是在客户端中要创建几个实例来同时执行多个web请求。所以,分析当前请求的状态 (至少是这样的对象是繁忙还是空闲) 应当在客户中广泛被使用。在管理器中,这些对象是用于实现助手状态的识别和跟踪的。下面就是基类。

class WebWorker
{
  protected:
    long chartID;
    bool busy;
    const RESOURCEDATA<uchar> *resource;
    const string prefix;
    
    const RESOURCEDATA<uchar> *allocate()
    {
      release();
      resource = new RESOURCEDATA<uchar>(prefix + (string)chartID);
      return resource;
    }
    
  public:
    WebWorker(const long id, const string p = "WRP_"): chartID(id), busy(false), resource(NULL), prefix("::" + p)
    {
    }

    ~WebWorker()
    {
      release();
    }
    
    long getChartID() const
    {
      return chartID;
    }
    
    bool isBusy() const
    {
      return busy;
    }
    
    string getFullName() const
    {
      return StringSubstr(MQLInfoString(MQL_PROGRAM_PATH), StringLen(TerminalInfoString(TERMINAL_PATH)) + 5) + prefix + (string)chartID;
    }
    
    virtual void release()
    {
      busy = false;
      if(CheckPointer(resource) == POINTER_DYNAMIC) delete resource;
      resource = NULL;
    }

    static void broadcastEvent(ushort msg, long lparam = 0, double dparam = 0.0, string sparam = NULL)
    {
      long currChart = ChartFirst(); 
      while(currChart != -1)
      {
        if(currChart != ChartID())
        {
          EventChartCustom(currChart, msg, lparam, dparam, sparam); 
        }
        currChart = ChartNext(currChart);
      }
    }
};

变量描述:

  • chartID — MQL 程序所运行的图表的 ID;
  • busy — 当前实例是否正在忙于处理 web请求;
  • resource — 对象的资源 (随机数据存储); RESOURCEDATA 类来自于 ResourceData.mqh;
  • prefix — 对于每个状态的唯一前缀; 前缀是用于资源名称的。在特定的客户中,推荐按照下面的例子来进行独特设置。助手 EA 默认使用的是 "WRR_" (Web Request Result 的简写) 前缀。

'allocate' 方法是在派生类中使用的,它在 ’resource' 变量中创建 RESOURCEDATA<uchar> 类型的资源对象,chart ID 也用于和前缀一起命名资源。资源可以使用 'release' 方法来释放。

应当特别提到 getFullName 方法,因为它返回完整的资源名称,包含了当前 MQL 程序名称和目录的路径,完整名称用于访问第三方程序的资源 (是只读的)。例如,如果 multiweb.mq5 EA 位于 MQL5\Experts 并且在 ID 为 129912254742671346 的图表中载入, 其中资源的完整名称就是 "\Experts\multiweb.ex5::WRR_129912254742671346",我们将把这样的字符串作为资源连接,在自定义事件的 sparam 字符串型参数中使用。

broadcastEvent 静态方法可以向所有窗口发信息,在将来用于寻找管理器。

为了在客户程序中使用请求和相关的资源,我们定义了 ClientWebWorker 类,它派生于 WebWorker (这里的代码省略了,完整版在附件的文件中)。

class ClientWebWorker : public WebWorker
{
  protected:
    string _method;
    string _url;
    
  public:
    ClientWebWorker(const long id, const string p = "WRP_"): WebWorker(id, p)
    {
    }

    string getMethod() const
    {
      return _method;
    }

    string getURL() const
    {
      return _url;
    }
    
    bool request(const string method, const string url, const string headers, const int timeout, const uchar &body[], const long managerChartID)
    {
      _method = method;
      _url = url;

      // allocate()? and what's next?
      ...
    }
    
    static void receiveResult(const string resname, uchar &initiator[], uchar &headers[], uchar &text[])
    {
      Print(ChartID(), ": Reading result ", resname);
      
      ...
    }
};

首先,请注意 'request' 方法也就是实现了上面所述的第一步,也就是把 web 请求发送到管理器。这个方法的声明是跟随假定的 WebRequestAsync 原型的,而 receiveResult 静态方法进行的就是第七步所说的反向过程。对于第一个输入参数 'resname', 它是请求结果中保存的外部资源的完整名称,而 'initiator', 'headers' 和 'text' 字节数组是方法中从资源中解开数据之后填充的。

什么是 'initiator'?答案很简单,因为我们所有的 "调用" 现在都是异步的 (而它们的执行顺序都不保证), 我们应当能够把结果和之前发送的请求匹配起来。所以,助手 EA 要把来源客户资源的完整名称打包,用于初始化请求到回应资源中,与从互联网中取得的数据放在一起。在解包之后,'initiator' 参数中的名称可以用于把结果与对应的请求关联起来。

receiveResult 方法是静态的,因为它没有使用对象的变量 - 所有从调用代码中返回的结果都是通过参数传递的。

这两个方法在从资源中打包和解包数据都是需要的,这将在下一个部分中介绍。


2.2. 把请求和请求结果打包进资源

我们应当记得,在底层水平,资源是通过 RESOURCEDATA 类来处理的,这是一个模板(template)类, 也就是说它可以接收含有数据类型的参数, 我们可以写入资源或者从资源中读取。因为我们的数据也包含字符串,有理由选择最小的 uchar 类型作为存储单位,这样,我们就把 RESOURCEDATA<uchar> 类的对象用作数据容器。当创建资源时,要在它的构造函数中创建一个唯一(对于程序来说)的 'name'。

RESOURCEDATA<uchar>(const string name)

我们可以把这个名称 (加上程序名称作为前缀) 传给自定义事件,这样其它 MQL 程序就能访问同样的资源了。请注意,所有其它程序,除了创建资源的之外,都是有只读的访问权限。

数据写入资源使用的是重载过的赋值操作符:

void operator=(const uchar &array[]) const

其中'array' 是我们需要准备的一种数组。

从资源中读取数据使用的是这个函数:

int Get(uchar &array[]) const

在此,'array' 是一个输出参数,也包含最初数组的内容。

现在让我们转到应用部分,就是使用资源来传递关于 HTTP 请求和它们结果的数据。我们将会在资源和主代码之间创建一个层次类 - ResourceMediator. 这个类用于把 'method', 'url', 'headers', 'timeout' 和 'data' 参数打包到 'array' 类型的数组,然后再在客户端写到资源中。在服务器端,就要从资源中解包参数。类似地,这个类也要把服务器端的 'result' 和 'result_headers' 参数打包到 'array' 字节数组中并写到资源里,然后在客户端可以把它们以数组形式读出并解包。

ResourceMediator 构造函数接收 RESOURCEDATA 资源的指针,它会在方法内部进行处理。另外,ResourceMediator 包含了用于保存有关数据的元信息的结构。实际上,当打包和解包资源时,我们需要某种头部信息包含所有栏位以及数据本身的大小。

例如,如果我们只是简单使用 StringToCharArray 函数把一个 URL 转换为一个字节数组,那么当进行反向操作的时候,使用 CharArrayToString 我们需要设置数组的长度,否则,不仅 URL 字节本身,之后的头部栏位都会被从数组中读取出来。您也许记得,在写入资源之前,我们把所有数据都写到一个单独的数组中了。关于栏位长度的元信息也应当被转换为字节的序列,为此我们使用的是联合(union)。

#define LEADSIZE (sizeof(int)*5) // web-request 中的5个栏位

class ResourceMediator
{
  private:
    const RESOURCEDATA<uchar> *resource; // underlying asset
    
    // 头部的元数据可以是5个整数的‘lengths’或者是一个字节数组`sizes`
    union lead
    {
      struct _l
      {
        int m; // 方法
        int u; // url
        int h; // 头部
        int t; // 超时
        int b; // 主体
      }
      lengths;
      
      uchar sizes[LEADSIZE];
      
      int total()
      {
        return lengths.m + lengths.u + lengths.h + lengths.t + lengths.b;
      }
    }
    metadata;
  
    // 整数和字节数组
    union _s
    {
      int x;
      uchar b[sizeof(int)];
    }
    int2chars;
    
    
  public:
    ResourceMediator(const RESOURCEDATA<uchar> *r): resource(r)
    {
    }
    
    void packRequest(const string method, const string url, const string headers, const int timeout, const uchar &body[])
    {
      // 使用参数数据长度填充元数据
      metadata.lengths.m = StringLen(method) + 1;
      metadata.lengths.u = StringLen(url) + 1;
      metadata.lengths.h = StringLen(headers) + 1;
      metadata.lengths.t = sizeof(int);
      metadata.lengths.b = ArraySize(body);
      
      // 分配结果数组,用于元数据和参数数据
      uchar data[];
      ArrayResize(data, LEADSIZE + metadata.total());
      
      // 把元数据以字节数组方式放在数组的开始
      ArrayCopy(data, metadata.sizes);
      
      // 把数据栏位挨个放到数组中
      int cursor = LEADSIZE;
      uchar temp[];
      StringToCharArray(method, temp);
      ArrayCopy(data, temp, cursor);
      ArrayResize(temp, 0);
      cursor += metadata.lengths.m;
      
      StringToCharArray(url, temp);
      ArrayCopy(data, temp, cursor);
      ArrayResize(temp, 0);
      cursor += metadata.lengths.u;
      
      StringToCharArray(headers, temp);
      ArrayCopy(data, temp, cursor);
      ArrayResize(temp, 0);
      cursor += metadata.lengths.h;
      
      int2chars.x = timeout;
      ArrayCopy(data, int2chars.b, cursor);
      cursor += metadata.lengths.t;
      
      ArrayCopy(data, body, cursor);
      
      // 在资源中存储数组
      resource = data;
    }
    
    ...

首先,packRequest 方法把所有栏位的大小写到 'metadata' 结构中,然后这个结构的内容被以字节数组的形式复制到‘data'数组的开始部分。'data' 数组随后会被放到资源中。'data' 数组的大小是根据所有栏位的总长度以及元数据结构的大小保留的。字符串类型的参数使用 StringToCharArray 转为数组并复制到结果数组中,还要有对应的偏移,偏移是在’cursor'变量中保存的。'timeout' 参数使用int2chars联合而被转换为字符数组。'body' 参数就按原样复制到数组中,因为它已经是所需类型的数组了。最后,把通用数组中的内容依次移动到资源中 (您也许记得, 在 RESOURCEDATA 类中的 ‘=' 操作符被重载过了):

      resource = data;

在 unpackRequest 方法中对资源中参数的读取所进行的是反向的操作。

    void unpackRequest(string &method, string &url, string &headers, int &timeout, uchar &body[])
    {
      uchar array[];
      // 使用资源中的数据填充数组  
      int n = resource.Get(array);
      Print(ChartID(), ": Got ", n, " bytes in request");
      
      // 从数组中读取元数据
      ArrayCopy(metadata.sizes, array, 0, 0, LEADSIZE);
      int cursor = LEADSIZE;

      // 一个个读取所有的数据栏位      
      method = CharArrayToString(array, cursor, metadata.lengths.m);
      cursor += metadata.lengths.m;
      url = CharArrayToString(array, cursor, metadata.lengths.u);
      cursor += metadata.lengths.u;
      headers = CharArrayToString(array, cursor, metadata.lengths.h);
      cursor += metadata.lengths.h;
      
      ArrayCopy(int2chars.b, array, 0, cursor, metadata.lengths.t);
      timeout = int2chars.x;
      cursor += metadata.lengths.t;
      
      if(metadata.lengths.b > 0)
      {
        ArrayCopy(body, array, 0, cursor, metadata.lengths.b);
      }
    }
    
    ...

这里的主要工作是通过连续调用 resource.Get(array) 来进行的,然后,会按照步骤从‘array’ 中读取元数据字节,以及随后的栏位,

请求的执行结果是使用相同方法封装和解包的,分别使用的是 packResponse 和 unpackResponse 方法 (下面的附件中有完整代码).

    void packResponse(const string source, const uchar &result[], const string &result_headers);
    void unpackResponse(uchar &initiator[], uchar &headers[], uchar &text[]);

现在我们可以回到 ClientWebWorker 源代码中并完成 'request' 和 'receiveResult' 方法了。

class ClientWebWorker : public WebWorker
{
    ...

    bool request(const string method, const string url, const string headers, const int timeout, const uchar &body[], const long managerChartID)
    {
      _method = method;
      _url = url;

      ResourceMediator mediator(allocate());
      mediator.packRequest(method, url, headers, timeout, body);
    
      busy = EventChartCustom(managerChartID, 0 /* TODO: 特定的信息 */, chartID, 0.0, getFullName());
      return busy;
    }
    
    static void receiveResult(const string resname, uchar &initiator[], uchar &headers[], uchar &text[])
    {
      Print(ChartID(), ": Reading result ", resname);
      const RESOURCEDATA<uchar> resource(resname);
      ResourceMediator mediator(&resource);
      mediator.unpackResponse(initiator, headers, text);
    }
};

它们非常简单,因为常规工作都是由 ResourceMediator 类来完成的。

剩下的问题就是由谁以及什么时候调用 WebWorker 方法, 以及我们怎样得到一些工具参数的值,例如在 ‘request’ 方法中的 managerChartID。尽管我稍微超前了一点,我推荐把所有 WebWorker 类对象的管理分配到更高一层的类中,它可以支持实际对象的列表,并且在程序之间“代表”对象交换信息,包括管理器搜索信息。但是在我们转到这一新水平之前,需要为“服务器”部分完成类似的准备工作。


2.3. 基类 (继续)

让我们声明从 WebWorker 派生的自定义类来在“服务器”(管理器)端处理异步请求,就和客户端的 ClientWebWorker 类似。

class ServerWebWorker : public WebWorker
{
  public:
    ServerWebWorker(const long id, const string p = "WRP_"): WebWorker(id, p)
    {
    }
    
    bool transfer(const string resname, const long clientChartID)
    {
      // 回应 `clientChartID` 的客户端,任务名为 `resname` 
      // 并且把任务分配给 ID 为 `chartID` 的工作单元 
      busy = EventChartCustom(clientChartID, TO_MSG(MSG_ACCEPTED), chartID, 0.0, resname)
          && EventChartCustom(chartID, TO_MSG(MSG_WEB), clientChartID, 0.0, resname);
      return busy;
    }
    
    void receive(const string source, const uchar &result[], const string &result_headers)
    {
      ResourceMediator mediator(allocate());
      mediator.packResponse(source, result, result_headers);
    }
};

'transfer' 方法根据整个交互过程中的第二步,把对请求的处理分发到某个辅助EA的实例中。resname 参数是从客户端取得的资源名称,而 clientChartID 是客户窗口的 ID。我们从自定义事件中取得所有这些参数,自定义事件本身,包括 MSG_WEB, 在下面描述。

'receive' 方法在 WebWorker 当前对象中创建一个本地资源 ('allocate' 调用) 并且把名称写到原来请求初始的资源,另外还使用 ResourceMediator 类的 ‘mediator' 对象把从互联网上取得的数据 (result) 和 HTTP 头 (result_headers) 也写到资源中。这就是整体步骤中的第五步部分。

这样,我们就为客户端和“服务器”端都定义了 WebWorker 类,在这两种情况中,这些对象很大程度上都很近似。例如,一个客户可以一次下载几个文档,而在管理器端,一开始最好分发足够数量的助手,因为可能同时会有多个客户发来请求。让我们定义 WebWorkersPool 基类来用于处理对象数组。让我们把它作为模板,因为在客户端和“服务器”端保存对象的类型是不同的 (分别对应 ClientWebWorker 和 ServerWebWorker)。

template<typename T>
class WebWorkersPool
{
  protected:
    T *workers[];
    
  public:
    WebWorkersPool() {}
    
    WebWorkersPool(const uint size)
    {
      // 分配工作单元;在客户端它们用于在资源中保存请求参数
      ArrayResize(workers, size);
      for(int i = 0; i < ArraySize(workers); i++)
      {
        workers[i] = NULL;
      }
    }
    
    ~WebWorkersPool()
    {
      for(int i = 0; i < ArraySize(workers); i++)
      {
        if(CheckPointer(workers[i]) == POINTER_DYNAMIC) delete workers[i];
      }
    }
    
    int size() const
    {
      return ArraySize(workers);
    }
    
    void operator<<(T *worker)
    {
      const int n = ArraySize(workers);
      ArrayResize(workers, n + 1);
      workers[n] = worker;
    }
    
    T *findWorker(const string resname) const
    {
      for(int i = 0; i < ArraySize(workers); i++)
      {
        if(workers[i] != NULL)
        {
          if(workers[i].getFullName() == resname)
          {
            return workers[i];
          }
        }
      }
      return NULL;
    }
    
    T *getIdleWorker() const
    {
      for(int i = 0; i < ArraySize(workers); i++)
      {
        if(workers[i] != NULL)
        {
          if(ChartPeriod(workers[i].getChartID()) > 0) // check if exist
          {
            if(!workers[i].isBusy())
            {
              return workers[i];
            }
          }
        }
      }
      return NULL;
    }
    
    T *findWorker(const long id) const
    {
      for(int i = 0; i < ArraySize(workers); i++)
      {
        if(workers[i] != NULL)
        {
          if(workers[i].getChartID() == id)
          {
            return workers[i];
          }
        }
      }
      return NULL;
    }
    
    bool revoke(const long id)
    {
      for(int i = 0; i < ArraySize(workers); i++)
      {
        if(workers[i] != NULL)
        {
          if(workers[i].getChartID() == id)
          {
            if(CheckPointer(workers[i]) == POINTER_DYNAMIC) delete workers[i];
            workers[i] = NULL;
            return true;
          }
        }
      }
      return false;
    }
    
    int available() const
    {
      int count = 0;
      for(int i = 0; i < ArraySize(workers); i++)
      {
        if(workers[i] != NULL)
        {
          count++;
        }
      }
      return count;
    }
    
    T *operator[](int i) const
    {
      return workers[i];
    }
    
};

方法背后的思路很简单,构造函数和析构函数分配和释放指定大小处理器的数组,findWorker 和 getIdleWorker 方法组用于在数组中根据各种标准进行搜索, 'operator<<' 操作符可以动态增加对象,而 'revoke' 方法可以动态删除它们。

客户端处理器的池有一些特殊 (特别是关于事件处理的部分). 所以,我们扩展了基类,使用了派生的 ClientWebWorkersPool 类。

template<typename T>
class ClientWebWorkersPool: public WebWorkersPool<T>
{
  protected:
    long   managerChartID;
    short  managerPoolSize;
    string name;
    
  public:
    ClientWebWorkersPool(const uint size, const string prefix): WebWorkersPool(size)
    {
      name = prefix;
      // 尝试寻找 WebRequest 管理器图表
      WebWorker::broadcastEvent(TO_MSG(MSG_DISCOVER), ChartID());
    }
    
    bool WebRequestAsync(const string method, const string url, const string headers, int timeout, const char &data[])
    {
      T *worker = getIdleWorker();
      if(worker != NULL)
      {
        return worker.request(method, url, headers, timeout, data, managerChartID);
      }
      return false;
    }
    
    void onChartEvent(const int id, const long &lparam, const double &dparam, const string &sparam)
    {
      if(MSG(id) == MSG_DONE) // 异步请求完成,有结果或者出错
      {
        Print(ChartID(), ": Result code ", (long)dparam);
    
        if(sparam != NULL)
        {
          // 从资源中根据 sparam 中的名称读取数据
          uchar initiator[], headers[], text[];
          ClientWebWorker::receiveResult(sparam, initiator, headers, text);
          string resname = CharArrayToString(initiator);
          
          T *worker = findWorker(resname);
          if(worker != NULL)
          {
            worker.onResult((long)dparam, headers, text);
            worker.release();
          }
        }
      }
      
      ...
      
      else
      if(MSG(id) == MSG_HELLO) // MSG_DISCOVER 广播结果找到了管理器
      {
        if(managerChartID == 0 && lparam != 0)
        {
          if(ChartPeriod(lparam) > 0)
          {
            managerChartID = lparam;
            managerPoolSize = (short)dparam;
            for(int i = 0; i < ArraySize(workers); i++)
            {
              workers[i] = new T(ChartID(), name + (string)(i + 1) + "_");
            }
          }
        }
      }
    }
    
    bool isManagerBound() const
    {
      return managerChartID != 0;
    }
};

变量描述:

  • managerChartID — 找到的工作管理器窗口的 ID ;
  • managerPoolSize — 处理对象数组的初始大小;
  • name — 所有池中对象资源的通用前缀。


2.4. 交换消息

在 ClientWebWorkersPool 构造函数中, 我们看到了对 WebWorker::broadcastEvent(TO_MSG(MSG_DISCOVER), ChartID()) 的调用,它发送 MSG_DISCOVER 事件到所有的窗口,在事件参数中传入当前窗口的 ID。MSG_DISCOVER 是一个保留值: 它应当定义在相同头文件的开始,同时还有其它程序要交换的消息类型。

#define MSG_DEINIT   1 // 销毁 (管理器 <-> 工作单元)
#define MSG_WEB      2 // 开始请求 (客户 -> 管理器 -> 工作单元)
#define MSG_DONE     3 // 请求结束 (工作单元 -> 客户, 工作单元 -> 管理器)
#define MSG_ERROR    4 // 请求失败 (管理器 -> 客户, 工作单元 -> 客户)
#define MSG_DISCOVER 5 // 寻找管理器 (客户 -> 管理器)
#define MSG_ACCEPTED 6 // 请求正在进行 (管理器 -> 客户)
#define MSG_HELLO    7 // 找到了管理器 (管理器 -> 客户)

注释中标记了消息发送的方向。

TO_MSG 宏定义是设计用于把列出的ID转换为用户随机选择基础值的实际事件代码,我们将通过 MessageBroadcast 输入参数得到它。

sinput uint MessageBroadcast = 1;
 
#define TO_MSG(X) ((ushort)(MessageBroadcast + X))

这种方法可以把所有事件都通过修改基础值而转换到任何空闲范围。请注意,自定义事件可以在终端中由其它程序使用,所以,避免冲突是很重要的。

MessageBroadcast 输入参数将出现在我们所有使用 multiweb.mqh 文件的 MQL 程序中,也就是客户和管理器中。在管理器和客户中应当指定相同的 MessageBroadcast 值。

让我们回到 ClientWebWorkersPool 类,onChartEvent 方法占有特殊的位置,它将从标准的 OnChartEvent 事件处理函数中调用。事件的类型通过‘id'参数来传递。因为我们从系统中根据选定的基础数值接收代码,我们应当使用“镜像”的 MSG 宏来把它转回到 MSG_*** 范围:

#define MSG(x) (x - MessageBroadcast - CHARTEVENT_CUSTOM)

这里 CHARTEVENT_CUSTOM 就是终端中所有自定义事件的开始。

我们可以看到,在 ClientWebWorkersPool 中的 onChartEvent 方法处理了以上描述的一些消息,例如,管理器应当对 MSG_HELLO 做出回应而返回 MSG_DISCOVER 消息。在这种情况下,管理器窗口ID是在 lparam 参数中传递的,而可用的助手数量是在 dparam 参数中传递的。当管理器被侦测到,池使用所需类型的真实对象来填充空白的’workers'数组。当前窗口的ID,以及每个对象中的独有资源名称就传递给对象的构造函数,后者包含了通用的‘name'前缀和在数组中的序列号。

在 managerChartID 栏位收到了有意义的数值后,就可以发送请求到管理器了。'request' 方法就是在 ClientWebWorker 类中为此保留的,而它的用法展示在 WebRequestAsync 方法中。首先,WebRequestAsync 使用 getIdleWorkder 来找到一个空闲的处理器对象,然后再调用 worker.request(method, url, headers, timeout, data, managerChartID)。在 'request' 方法内部,我们有一个关于选择特定消息代码来发送事件的注释。现在,在探讨了事件子系统之后,我们可以构建最终版本的 ClientWebWorker::request 方法了:

class ClientWebWorker : public WebWorker
{
    ...

    bool request(const string method, const string url, const string headers, const int timeout, const uchar &body[], const long managerChartID)
    {
      _method = method;
      _url = url;

      ResourceMediator mediator(allocate());
      mediator.packRequest(method, url, headers, timeout, body);
    
      busy = EventChartCustom(managerChartID, TO_MSG(MSG_WEB), chartID, 0.0, getFullName());
      return busy;
    }
    
    ...
};

MSG_WEB 是关于执行web请求的消息,在接收到它之后,管理器应当找到一个空闲的助手EA,并向它传递客户资源名称 (sparam) 以及请求的参数,还有 chartID (lparam),即客户窗口 ID。

助手执行请求,并使用 MSG_DONE 事件把结果返回给客户 (如果成功) 或者使用 MSG_ERROR 返回错误代码(如果出了问题)。结果 (或者错误) 代码是传到 dparam 的, 而结果本身是打包到位于助手EA的资源中,而名称会传递给 sparam。在 MSG_DONE 分支,我们看到数据是如何从资源中读取的,调用的是之前探讨过的 ClientWebWorker::receiveResult(sparam, initiator, headers, text) 函数。然后,执行搜索客户处理器对象 (findWorker) ,根据的是请求的资源名称,再在侦测到的对象中执行一系列方法:

    T *worker = findWorker(resname);
    if(worker != NULL)
    {
      worker.onResult((long)dparam, headers, text);
      worker.release();
    }

我们已经知道了 'release' 方法 — 它可以释放已经不再需要的资源。什么是 onResult?如果我们看完整的源代码, 我们将看到 ClientWebWorker 类中含有两个没有实现的虚拟函数: onResult and onError. 这使得类称为抽象类。客户代码应当在从 ClientWebWorker 类派生时提供实现。方法的名称提示了,如果成功接收到结果,就调用 onResult,而如果出错就调用 onError。这可以使异步请求的工作类和使用它们的客户程序代码之间提供反馈,换句话说,客户程序不需要知道关于消息在核心内部使用的任何事情:所有客户代码的交互都已经在 MQL5 OOP 内建工具中提供了。

让我们看一下客户端的源代码 (multiwebclient.mq5)。


2.5. 客户 EA

测试的EA会通过 multiweb API 根据用户输入的数据发送几个请求,为此,我们需要包含头文件并增加输入参数:

sinput string Method = "GET";
sinput string URL = "https://google.com/,https://ya.ru,https://www.startpage.com/";
sinput string Headers = "User-Agent: n/a";
sinput int Timeout = 5000;

#include <multiweb.mqh>

最终,所有参数都是用于配置进行 HTTP 请求的,在 URL 列表中,我们可以列出几个逗号分隔的地址,以评估并行执行请求的速度。URL 参数在 OnInit 中使用 StringSplit 函数分成几段,就像这样:

int urlsnum;
string urls[];
  
void OnInit()
{
  // 取得用于测试请求的URL
  urlsnum = StringSplit(URL, ',', urls);
  ...
}

另外,我们需要在OnInit 中创建一个请求处理对象的池 (ClientWebWorkersPool) ,但是为了做到这一点,我们需要从 ClientWebWorker 类中派生我们的类。

class MyClientWebWorker : public ClientWebWorker
{
  public:
    MyClientWebWorker(const long id, const string p = "WRP_"): ClientWebWorker(id, p)
    {
    }
    
    virtual void onResult(const long code, const uchar &headers[], const uchar &text[]) override
    {
      Print(getMethod(), " ", getURL(), "\nReceived ", ArraySize(headers), " bytes in header, ", ArraySize(text), " bytes in document");
      // 不注释掉这个会导致可能有过多记录
      // Print(CharArrayToString(headers));
      // Print(CharArrayToString(text));
    }

    virtual void onError(const long code) override
    {
      Print("WebRequest error code ", code);
    }
};

它的唯一目标是记录状态和取得的数据,现在我们可以在OnInit 中为这样的对象创建一个池。

ClientWebWorkersPool<MyClientWebWorker> *pool = NULL;

void OnInit()
{
  ...
  pool = new ClientWebWorkersPool<MyClientWebWorker>(urlsnum, _Symbol + "_" + EnumToString(_Period) + "_");
  Comment("Click the chart to start downloads");
}

您可以看到,池是使用了 MyClientWebWorker 类参数化的,这就可以从开发库代码中创建我们的对象了。数组的大小选择等于输入地址的数量。这对演示目的是合理的:更小的数量表示处理队列正在处理而会破坏并行执行的思路,而更大的数量则会浪费资源。在真实项目中,池的大小不一定要等于任务的数量,但是这需要另外的数学验证。

资源的前缀设为所操作交易品种和图表时段的组合。

初始化的最后部分是搜索管理器窗口,您也许记得,搜索是在池自身中进行的 (ClientWebWorkersPool 类),客户代码只要确保能找到管理器。为此,让我们设置合理的时间,等待关于搜索管理器的消息,就应该能够保证得到回应。让我们把它设为5秒,为这个时间创建一个计时器:

void OnInit()
{
  ...
  // 等待管理器有5秒最大的讨论时间
  EventSetTimer(5);
}

检查在计时器处理函数中是否有管理器出现。如果连接没有建立好,就显示一个提醒。

void OnTimer()
{
  // 如果管理器在5秒中内都没有回应,看起来就丢失了。
  EventKillTimer();
  if(!pool.isManagerBound())
  {
    Alert("WebRequest Pool Manager (multiweb) is not running");
  }
}

不要忘记在 OnDeinit 处理函数中删除池对象。

void OnDeinit(const int reason)
{
  delete pool;
  Comment("");
}

为了让池来处理所有的服务消息而不需要我们的介入,首先,搜索管理器,使用的是标准的 OnChartEvent 图表事件处理函数:

void OnChartEvent(const int id, const long &lparam, const double &dparam, const string &sparam) 
{
  if(id == CHARTEVENT_CLICK) // 通过简单的用户操作初始化测试请求
  {
    ...
  }
  else
  {
    // 这个处理函数管理幕后所有重要的消息
    pool.onChartEvent(id, lparam, dparam, sparam);
  }
}

所有的事件,除了 CHARTEVENT_CLICK 之外, 都发送到池中,根据分析所使用事件的代码再执行相应的操作 ( onChartEvent 代码片段上面已经提供了).

CHARTEVENT_CLICK 事件是交互的,直接用于运行下载,最简单的情况下,它可能看起来如下:

void OnChartEvent(const int id, const long &lparam, const double &dparam, const string &sparam) 
{
  if(id == CHARTEVENT_CLICK) // 通过简单的用户操作初始化测试请求
  {
    if(pool.isManagerBound())
    {
      uchar Body[];

      for(int i = 0; i < urlsnum; i++)
      {
        pool.WebRequestAsync(Method, urls[i], Headers, Timeout, Body);
      }
    }
    ...

实例的完整代码有些长,因为它还包含着计算执行时间并把它与对相同地址集合串行调用标准 WebRequest 来作比较。


2.6. 管理器 EA 和助手 EA

我们终于到了”服务器“部分,因为基本机制已经在头文件中实现,管理器和助手的代码就没有想象得那样复杂了,

您也许记得,我们只有一个EA,它既以管理器工作,也可以作为助手 (multiweb.mq5 文件)。作为客户,我们包含头文件并声明输入参数:

sinput uint WebRequestPoolSize = 3;
sinput ulong ManagerChartID = 0;

#include <multiweb.mqh>

WebRequestPoolSize 是管理器应当创建辅助窗口的数量,再在其中运行助手,

ManagerChartID 是管理器窗口 ID,这个参数只有在助手中可以使用,是在助手从代码中自动运行的时候要填充到管理器中的。当运行管理器时人工填写 ManagerChartID 会被认为出错。

算法是基于两个全局变量构建的:

bool manager;
WebWorkersPool<ServerWebWorker> pool;

'manager' 本地标志指示了当前EA实例的角色,'pool' 变量是用于到来任务的处理器对象的数组。WebWorkersPool 是通过上面所描述的 ServerWebWorker 类来分类的。数组没有进一步初始化,因为它是根据角色填充的。

第一个运行的实例 (在 OnInit 中定义) 得到的是管理器角色。

const string GVTEMP = "WRP_GV_TEMP";

int OnInit()
{
  manager = false;
  
  if(!GlobalVariableCheck(GVTEMP))
  {
    // 当开始启动 multiweb 的第一个实例是,它被当成管理器
    // 全局变量是管理器存在的标志
    if(!GlobalVariableTemp(GVTEMP))
    {
      FAILED(GlobalVariableTemp);
      return INIT_FAILED;
    }
    
    manager = true;
    GlobalVariableSet(GVTEMP, 1);
    Print("WebRequest Pool Manager started in ", ChartID());
  }
  else
  {
    // 所有随后的 multiweb 实例都是工作单元/助手
    Print("WebRequest Worker started in ", ChartID(), "; manager in ", ManagerChartID);
  }
  
  // 使用计时器来延迟工作单元的初始化
  EventSetTimer(1);
  return INIT_SUCCEEDED;
}

EA 检查终端中是否有特定的全局变量,如果没有,EA 会把自身赋为管理器,并创建这样的一个全局变量。如果变量已经存在,那么这就是有管理器,所以这个实例就变成一个助手。请注意,全局变量是临时的,也就是说在终端重新启动的时候它不会被保存,但是如果管理器保留在任何图表上,它会再次创建这个变量。

计时器设为1秒,因为辅助图表的初始化会花好几秒,在 OnInit 中做这些不是最好的方案。在计时器事件处理函数中填充池:

void OnTimer()
{
  EventKillTimer();
  if(manager)
  {
    if(!instantiateWorkers())
    {
      Alert("Workers not initialized");
    }
    else
    {
      Comment("WebRequest Pool Manager ", ChartID(), "\nWorkers available: ", pool.available());
    }
  }
  else // 工作单元
  {
    // 这是用于资源的宿主,保存回应的头部和数据
    pool << new ServerWebWorker(ChartID(), "WRR_");
  }
}

如果是助手角色,就简单把另一个 ServerWebWorker 处理器对象加到数组中管理器的情况更加复杂,要在独立的 instantiateWorkers 函数中处理,让我们看看它。

bool instantiateWorkers()
{
  MqlParam Params[4];
  
  const string path = MQLInfoString(MQL_PROGRAM_PATH);
  const string experts = "\\MQL5\\";
  const int pos = StringFind(path, experts);
  
  // 再次启动自身 (以助手EA的角色)
  Params[0].string_value = StringSubstr(path, pos + StringLen(experts));
  
  Params[1].type = TYPE_UINT;
  Params[1].integer_value = 1; //  新的助手EA实例中有一个工作单元,用于返回结果到管理器或客户

  Params[2].type = TYPE_LONG;
  Params[2].integer_value = ChartID(); // 这个图表是管理器

  Params[3].type = TYPE_UINT;
  Params[3].integer_value = MessageBroadcast; // 使用相同的自定义事件基础编号
  
  for(uint i = 0; i < WebRequestPoolSize; ++i)
  {
    long chart = ChartOpen(_Symbol, _Period);
    if(chart == 0)
    {
      FAILED(ChartOpen);
      return false;
    }
    if(!EXPERT::Run(chart, Params))
    {
      FAILED(EXPERT::Run);
      return false;
    }
    pool << new ServerWebWorker(chart);
  }
  return true;
}

这个函数使用了第三方的 Expert 开发库,是由我们的老朋友 - MQL5 社区成员 fxsaber 开发的, 所以在源代码的开头加入了对应的头文件。

#include <fxsaber\Expert.mqh>

Expert 开发库允许您动态生成 tpl 模板和指定的 EA 参数,并且把它们应用到图表上,就能够载入 EA,在我们的实例中,所有助手EA的参数都是相同的,所以它们的列表在创建指定数量的窗口之前就生成了,

参数 0 指定了执行EA文件的路径,也就是它自己。参数 1 是 WebRequestPoolSize,它在每个助手中都等于1. 我已经提过,处理器对象在助手中只是用于保存 HTTP 请求结果的资源的,每个助手都通过阻塞式的 WebRequest 处理请求,也就是最多使用一个处理器对象。参数 2 — ManagerChartID 管理器窗口 ID. 参数 3 — 消息代码的基础数值 (MessageBroadcast 参数是来自 multiweb.mqh).

另外,在循环中使用了 ChartOpen 来创建了空白的图表,并且使用 EXPERT::Run (chart, Params) 来在其中运行 EA。ServerWebWorker(chart) 处理器对象在每个新窗口中创建,并加入池中。在管理器中,处理器对象就像助手窗口 ID 的连接和它们的状态,因为 HTTP 请求不是在管理器自身中进行的,而不会为它们创建资源。

来临的任务是根据用于在 OnChartEvent 中的事件来处理的。

void OnChartEvent(const int id, const long &lparam, const double &dparam, const string &sparam) 
{
  if(MSG(id) == MSG_DISCOVER) // 在新的客户图表中初始化一个工作单元EA并绑定到管理器
  {
    if(manager && (lparam != 0))
    {
      // 只有管理器使用它的图表ID回应,lparam 是客户图表 ID
      EventChartCustom(lparam, TO_MSG(MSG_HELLO), ChartID(), pool.available(), NULL);
    }
  }
  else
  if(MSG(id) == MSG_WEB) // 已经有了请求 web 下载的客户端
  {
    if(lparam != 0)
    {
      if(manager)
      {
        // 管理器把工作分发到空闲工作单元
        // lparam 是客户图表ID,而 sparam 是客户资源
        if(!transfer(lparam, sparam))
        {
          EventChartCustom(lparam, TO_MSG(MSG_ERROR), ERROR_NO_IDLE_WORKER, 0.0, sparam);
        }
      }
      else
      {
        // 工作单元实际处理 web 请求
        startWebRequest(lparam, sparam);
      }
    }
  }
  else
  if(MSG(id) == MSG_DONE) // 一个根据在 lparam 中的图表 ID 识别到的工作单元完成了工作
  {
    WebWorker *worker = pool.findWorker(lparam);
    if(worker != NULL)
    {
      // 我们这里是在管理器中,并且池中只保存工作单元而没有资源,
      // 所以这个 release 只是用于清除繁忙状态
      worker.release();
    }
  }
}

首先,为了回应来自客户端带有 lparam ID 的 MSG_DISCOVER 消息,管理器要返回含有它窗口 ID 的 MSG_HELLO 消息,

根据接收到的 MSG_WEB, lparam 应当包含发送请求的客户的窗口 ID,而 sparam 应当包含打包了请求参数的资源的名称。作为管理器工作,代码会把包含这些参数的任务传递给一个空闲助手,调用的是 'transfer' 函数 (下面会描述) 并把选中对象的状态设为 "busy(繁忙)"。如果没有空闲的助手,就向客户发送 MSG_ERROR 事件,代码为 ERROR_NO_IDLE_WORKER。助手在 startWebRequest 函数中执行 HTTP 请求。

当上传了所需的文档时,会从助手到管理器发送 MSG_DONE 消息,管理器从中根据 lparam 中的助手ID来寻找对应的对象,并通过调用 ‘release' 方法修改它的“busy"状态。已经说过,助手会把运行的结果直接发送给客户。

完整的代码中也包含了 MSG_DEINIT 事件,它与 OnDeinit 的处理相关。思路是,助手在管理器删除时被通知,然后自己退出并关闭它们的窗口,而再通知管理器助手已经被删除,并从管理器池中删除它。我相信,您可以自己了解其中的机制。

'transfer' 函数搜索空闲对象,并调用它的 'transfer' 方法 (上面讨论过)。

bool transfer(const long returnChartID, const string resname)
{
  ServerWebWorker *worker = pool.getIdleWorker();
  if(worker == NULL)
  {
    return false;
  }
  return worker.transfer(resname, returnChartID);
}

startWebRequest 函数描述如下:

void startWebRequest(const long returnChartID, const string resname)
{
  const RESOURCEDATA<uchar> resource(resname);
  ResourceMediator mediator(&resource);

  string method, url, headers;
  int timeout;
  uchar body[];

  mediator.unpackRequest(method, url, headers, timeout, body);

  char result[];
  string result_headers;
  
  int code = WebRequest(method, url, headers, timeout, body, result, result_headers);
  if(code != -1)
  {
    // 使用结果创建资源,通过自定义事件传回客户端
    ((ServerWebWorker *)pool[0]).receive(resname, result, result_headers);
    // 首先,向客户发送 MSG_DONE,包括结果资源。
    EventChartCustom(returnChartID, TO_MSG(MSG_DONE), ChartID(), (double)code, pool[0].getFullName());
    // 第二, 发送 MSG_DONE 到管理器,把对应工作单元设为空闲状态
    EventChartCustom(ManagerChartID, TO_MSG(MSG_DONE), ChartID(), (double)code, NULL);
  }
  else
  {
    // 错误代码在 dparam 中
    EventChartCustom(returnChartID, TO_MSG(MSG_ERROR), ERROR_MQL_WEB_REQUEST, (double)GetLastError(), resname);
    EventChartCustom(ManagerChartID, TO_MSG(MSG_DONE), ChartID(), (double)GetLastError(), NULL);
  }
}

通过使用 ResourceMediator, 这个函数解包取得请求的参数并调用标准的 MQL WebRequest 函数,如果函数执行没有出现 MQL 错误,就把结果发送给客户。为此,使用 ’receive' 方法把它们打包到本地资源中,而它的名称与 MSG_DONE 消息在 EventChartCustom 函数的 sparam 参数中传递。请注意,HTTP 错误 (例如,无效页面 404 或者 web 服务器错误 501) 也会在这里出现 — 客户可以在 dparam 参数中收到 HTTP 代码,在资源中收到回应的 HTTP 头部,这可以让我们进一步进行分析。

如果 WebRequest 调用以 MQL 错误结束, 客户会收到 MSG_ERROR 消息和 ERROR_MQL_WEB_REQUEST 代码, 而 GetLastError 的结果放在 dparam 中。因为这种情况下本地资源没有填充,来源资源的名称会直接放在 sparam 参数中,这样处理对象的某个实例还是能在客户端识别。

用于异步和并行调用 WebRequest 的 multiweb 开发库类图

图 4. 用于异步和并行调用 WebRequest 的 multiweb 开发库类图


3. 测试

可以按照如下方式测试所实现的软件。

首先,打开终端设置,并在专家页面中允许访问的URL列表中指定所有可以访问的服务器。

然后,运行 multiweb EA 并且在输入参数中设置三个助理。这样,可以打开三个新的窗口,含有角色不同的相同的 multiweb EA。EA 角色显示在窗口左上角的注释中。

现在,让我们在另一个图表中运行 multiwebclient 客户端 EA,并在图表上点击一次,使用默认的设置,它会启动三个并行的 web request 并把诊断信息写到记录中,包括取得数据的大小和运行时间。如果 TestSyncRequests 特定参数保持为 'true', 就会通过管理器使用标准 WebRequest 在并行 web request 之外再顺序进行同样的请求,这样做是为了比较两种选项的执行速度。按照原则,并行处理会比串行处理快几倍 - 从 sqrt(N) 到 N, 其中 N 是可用助手的数量。

示例记录显示如下:

01:16:50.587    multiweb (EURUSD,H1)    OnInit 129912254742671339
01:16:50.587    multiweb (EURUSD,H1)    WebRequest Pool Manager started in 129912254742671339
01:16:52.345    multiweb (EURUSD,H1)    OnInit 129912254742671345
01:16:52.345    multiweb (EURUSD,H1)    WebRequest Worker started in 129912254742671345; manager in 129912254742671339
01:16:52.757    multiweb (EURUSD,H1)    OnInit 129912254742671346
01:16:52.757    multiweb (EURUSD,H1)    WebRequest Worker started in 129912254742671346; manager in 129912254742671339
01:16:53.247    multiweb (EURUSD,H1)    OnInit 129912254742671347
01:16:53.247    multiweb (EURUSD,H1)    WebRequest Worker started in 129912254742671347; manager in 129912254742671339
01:17:16.029    multiweb (EURUSD,H1)    Pool manager transfers \Experts\multiwebclient.ex5::GBPJPY_PERIOD_M5_1_129560567193673862
01:17:16.029    multiweb (EURUSD,H1)    129912254742671345: Reading request \Experts\multiwebclient.ex5::GBPJPY_PERIOD_M5_1_129560567193673862
01:17:16.029    multiweb (EURUSD,H1)    129912254742671345: Got 64 bytes in request
01:17:16.029    multiweb (EURUSD,H1)    129912254742671345: GET https://google.com/ User-Agent: n/a 5000 
01:17:16.030    multiweb (EURUSD,H1)    Pool manager transfers \Experts\multiwebclient.ex5::GBPJPY_PERIOD_M5_2_129560567193673862
01:17:16.030    multiweb (EURUSD,H1)    129912254742671346: Reading request \Experts\multiwebclient.ex5::GBPJPY_PERIOD_M5_2_129560567193673862
01:17:16.030    multiwebclient (GBPJPY,M5)      Accepted: \Experts\multiwebclient.ex5::GBPJPY_PERIOD_M5_1_129560567193673862 after 0 retries
01:17:16.031    multiweb (EURUSD,H1)    129912254742671346: Got 60 bytes in request
01:17:16.031    multiweb (EURUSD,H1)    129912254742671346: GET https://ya.ru User-Agent: n/a 5000 
01:17:16.031    multiweb (EURUSD,H1)    Pool manager transfers \Experts\multiwebclient.ex5::GBPJPY_PERIOD_M5_3_129560567193673862
01:17:16.031    multiwebclient (GBPJPY,M5)      Accepted: \Experts\multiwebclient.ex5::GBPJPY_PERIOD_M5_2_129560567193673862 after 0 retries
01:17:16.031    multiwebclient (GBPJPY,M5)      Accepted: \Experts\multiwebclient.ex5::GBPJPY_PERIOD_M5_3_129560567193673862 after 0 retries
01:17:16.031    multiweb (EURUSD,H1)    129912254742671347: Reading request \Experts\multiwebclient.ex5::GBPJPY_PERIOD_M5_3_129560567193673862
01:17:16.032    multiweb (EURUSD,H1)    129912254742671347: Got 72 bytes in request
01:17:16.032    multiweb (EURUSD,H1)    129912254742671347: GET https://www.startpage.com/ User-Agent: n/a 5000 
01:17:16.296    multiwebclient (GBPJPY,M5)      129560567193673862: Result code 200
01:17:16.296    multiweb (EURUSD,H1)    Result code from 129912254742671346: 200, now idle
01:17:16.297    multiweb (EURUSD,H1)    129912254742671346: Done in 265ms
01:17:16.297    multiwebclient (GBPJPY,M5)      129560567193673862: Reading result \Experts\multiweb.ex5::WRR_129912254742671346
01:17:16.300    multiwebclient (GBPJPY,M5)      129560567193673862: Got 16568 bytes in response
01:17:16.300    multiwebclient (GBPJPY,M5)      GET https://ya.ru
01:17:16.300    multiwebclient (GBPJPY,M5)      Received 3704 bytes in header, 12775 bytes in document
01:17:16.715    multiwebclient (GBPJPY,M5)      129560567193673862: Result code 200
01:17:16.715    multiwebclient (GBPJPY,M5)      129560567193673862: Reading result \Experts\multiweb.ex5::WRR_129912254742671347
01:17:16.715    multiweb (EURUSD,H1)    129912254742671347: Done in 686ms
01:17:16.715    multiweb (EURUSD,H1)    Result code from 129912254742671347: 200, now idle
01:17:16.725    multiwebclient (GBPJPY,M5)      129560567193673862: Got 45236 bytes in response
01:17:16.725    multiwebclient (GBPJPY,M5)      GET https://www.startpage.com/
01:17:16.725    multiwebclient (GBPJPY,M5)      Received 822 bytes in header, 44325 bytes in document
01:17:16.900    multiwebclient (GBPJPY,M5)      129560567193673862: Result code 200
01:17:16.900    multiweb (EURUSD,H1)    Result code from 129912254742671345: 200, now idle
01:17:16.900    multiweb (EURUSD,H1)    129912254742671345: Done in 873ms
01:17:16.900    multiwebclient (GBPJPY,M5)      129560567193673862: Reading result \Experts\multiweb.ex5::WRR_129912254742671345
01:17:16.903    multiwebclient (GBPJPY,M5)      129560567193673862: Got 13628 bytes in response
01:17:16.903    multiwebclient (GBPJPY,M5)      GET https://google.com/
01:17:16.903    multiwebclient (GBPJPY,M5)      Received 790 bytes in header, 12747 bytes in document
01:17:16.903    multiwebclient (GBPJPY,M5)      > > > Async WebRequest workers [3] finished 3 tasks in 873ms

请注意,所有请求的总执行时间等于最慢的请求所执行的时间,

如果我们在管理器中把助手数量设为1,请求就被串行处理。


结论

在这篇文章中,我们探讨了一些类和完成的EA交易,它们是用于在非阻塞模式下执行 HTTP 请求的。这使我们可以从互联网中以几个并行线程的方式取得数据,从而提高了EA的效率,除了处理HTTP请求,也可以实时进行分析计算。另外,这个开发库还可以用于禁止使用标准 WebRequest 的指标中。为了实现整个架构,我们使用了很广范围的 MQL 特性:传递用户事件,创建资源,动态开启窗口并在其中运行EA等等。

在写这篇文章时,创建辅助窗口用于载入助手EA交易只是实现并行 HTTP 请求的一个仅有选项,但是 MetaQuotes 有计划开发特别的后台 MQL 程序。MQL5/Services 文件下已经为这样的服务做了保留。当这种技术在终端中出现时,这个开发库可能就可以通过使用服务来替换辅助窗口来进行改进了。

附件中的文件:

  • MQL5/Include/multiweb.mqh — 开发库
  • MQL5/Experts/multiweb.mq5 — 管理器 EA 和助手 EA 
  • MQL5/Experts/multiwebclient.mq5 — 演示的客户端 EA
  • MQL5/Include/fxsaber/Resource.mqh — 用于操作资源的辅助类
  • MQL5/Include/fxsaber/ResourceData.mqh — 用于操作资源的辅助类
  • MQL5/Include/fxsaber/Expert.mqh — 用于运行EA的辅助类
  • MQL5/Include/TypeToBytes.mqh — 数据转换开发库

全部回复

0/140

量化课程

    移动端课程