我有分寸

Spray 中协议处理 Pipelines 的实现

gnawux codescalaspraycoding

最近在玩 scala,用到了 spray 来处理 Http,看了一下代码觉得很神奇,这里抄一段 spray 1.3 中的协议处理 pipeline 的实现,原始文件在 spray 中的 spray-io/src/main/scala/spray/io/Pipelines.scala

在 spray-io 中,网络协议可以拆分成多级流水线来处理,从网络到应用逐级升高,在前一级中处理低级事务,屏蔽掉一些底层机制,把高层决断交给后一级处理;而从应用到网络逐级降低,在前一级接受处理高层命令,分解成低级命令,交给后一级执行。spray-io框架规定了每层的接口,并帮助实现了拼接,而不同协议就负责实现自己的每级流水线。

比如 Spray-can 的 Http 协议栈处理中,就通过很多级的拼接来实现了一个协议栈:

    ServerFrontend(settings) >>
      RequestChunkAggregation(requestChunkAggregationLimit) ? (requestChunkAggregationLimit > 0) >>
      PipeliningLimiter(pipeliningLimit) ? (pipeliningLimit > 0) >>
      StatsSupport(statsHolder.get) ? statsSupport >>
      RemoteAddressHeaderSupport ? remoteAddressHeader >>
      SSLSessionInfoSupport ? parserSettings.sslSessionInfoHeader >>
      RequestParsing(settings) >>
      ResponseRendering(settings) >>
      ConnectionTimeouts(idleTimeout) ? (reapingCycle.isFinite && idleTimeout.isFinite) >>
      PreventHalfClosedConnections(sslEncryption) >>
      SslTlsSupport(maxEncryptionChunkSize, parserSettings.sslSessionInfoHeader) ? sslEncryption >>
      TickGenerator(reapingCycle) ? (reapingCycle.isFinite && (idleTimeout.isFinite || requestTimeout.isFinite)) >>
      BackPressureHandling(backpressureSettings.get.noAckRate, backpressureSettings.get.readingLowWatermark) ? autoBackPressureEnabled

这其中每一行都是一级流水线,>> 就是流水线拼接的方法,它们实现了这样一对流水线:

  /**
   * The HttpServerConnection pipeline setup:
   *
   * |------------------------------------------------------------------------------------------
   * | ServerFrontend: converts HttpMessagePart, Closed and SendCompleted events to
   * |                 MessageHandlerDispatch.DispatchCommand,
   * |                 generates HttpResponsePartRenderingContext
   * |------------------------------------------------------------------------------------------
   *    /\                                    |
   *    | HttpMessagePart                     | HttpResponsePartRenderingContext
   *    | IOServer.Closed                     | IOServer.Tell
   *    | IOServer.SentOK                     |
   *    | TickGenerator.Tick                  |
   *    |                                    \/
   * |------------------------------------------------------------------------------------------
   * | RequestChunkAggregation: listens to HttpMessagePart events, generates HttpRequest events
   * |------------------------------------------------------------------------------------------
   *    /\                                    |
   *    | HttpMessagePart                     | HttpResponsePartRenderingContext
   *    | IOServer.Closed                     | IOServer.Tell
   *    | IOServer.SentOK                     |
   *    | TickGenerator.Tick                  |
   *    |                                    \/
   * |------------------------------------------------------------------------------------------
   * | PipeliningLimiter: throttles incoming requests according to the PipeliningLimit, listens
   * |                    to HttpResponsePartRenderingContext commands and HttpRequestPart events,
   * |                    generates StopReading and ResumeReading commands
   * |------------------------------------------------------------------------------------------
   *    /\                                    |
   *    | HttpMessagePart                     | HttpResponsePartRenderingContext
   *    | IOServer.Closed                     | IOServer.Tell
   *    | IOServer.SentOK                     | IOServer.StopReading
   *    | TickGenerator.Tick                  | IOServer.ResumeReading
   *    |                                    \/
   * |------------------------------------------------------------------------------------------
   * | StatsSupport: listens to most commands and events to collect statistics
   * |------------------------------------------------------------------------------------------
   *    /\                                    |
   *    | HttpMessagePart                     | HttpResponsePartRenderingContext
   *    | IOServer.Closed                     | IOServer.Tell
   *    | IOServer.SentOK                     | IOServer.StopReading
   *    | TickGenerator.Tick                  | IOServer.ResumeReading
   *    |                                    \/
   * |------------------------------------------------------------------------------------------
   * | RemoteAddressHeaderSupport: add `Remote-Address` headers to incoming requests
   * |------------------------------------------------------------------------------------------
   *    /\                                    |
   *    | HttpMessagePart                     | HttpResponsePartRenderingContext
   *    | IOServer.Closed                     | IOServer.Tell
   *    | IOServer.SentOK                     | IOServer.StopReading
   *    | TickGenerator.Tick                  | IOServer.ResumeReading
   *    |                                    \/
   * |------------------------------------------------------------------------------------------
   * | SSLSessionInfoSupport: add `SSL-Session-Info` header to incoming requests
   * |------------------------------------------------------------------------------------------
   *    /\                                    |
   *    | HttpMessagePart                     | HttpResponsePartRenderingContext
   *    | IOServer.Closed                     | IOServer.Tell
   *    | IOServer.SentOK                     | IOServer.StopReading
   *    | TickGenerator.Tick                  | IOServer.ResumeReading
   *    | SslTlsSupport.SSLSessionEstablished |
   *    |                                    \/
   * |------------------------------------------------------------------------------------------
   * | RequestParsing: converts Received events to HttpMessagePart,
   * |                 generates HttpResponsePartRenderingContext (in case of errors)
   * |------------------------------------------------------------------------------------------
   *    /\                                    |
   *    | IOServer.Closed                     | HttpResponsePartRenderingContext
   *    | IOServer.SentOK                     | IOServer.Tell
   *    | IOServer.Received                   | IOServer.StopReading
   *    | TickGenerator.Tick                  | IOServer.ResumeReading
   *    | SslTlsSupport.SSLSessionEstablished |
   *    |                                    \/
   * |------------------------------------------------------------------------------------------
   * | ResponseRendering: converts HttpResponsePartRenderingContext
   * |                    to Send and Close commands
   * |------------------------------------------------------------------------------------------
   *    /\                                    |
   *    | IOServer.Closed                     | IOServer.Send
   *    | IOServer.SentOK                     | IOServer.Close
   *    | IOServer.Received                   | IOServer.Tell
   *    | TickGenerator.Tick                  | IOServer.StopReading
   *    | SslTlsSupport.SSLSessionEstablished | IOServer.ResumeReading
   *    |                                    \/
   * |------------------------------------------------------------------------------------------
   * | ConnectionTimeouts: listens to Received events and Send commands and
   * |                     TickGenerator.Tick, generates Close commands
   * |------------------------------------------------------------------------------------------
   *    /\                                    |
   *    | IOServer.Closed                     | IOServer.Send
   *    | IOServer.SentOK                     | IOServer.Close
   *    | IOServer.Received                   | IOServer.Tell
   *    | TickGenerator.Tick                  | IOServer.StopReading
   *    | SslTlsSupport.SSLSessionEstablished | IOServer.ResumeReading
   *    |                                    \/
   * |------------------------------------------------------------------------------------------
   * | SslTlsSupport: listens to event Send and Close commands and Received events,
   * |                provides transparent encryption/decryption in both directions
   * |------------------------------------------------------------------------------------------
   *    /\                                    |
   *    | IOServer.Closed                     | IOServer.Send
   *    | IOServer.SentOK                     | IOServer.Close
   *    | IOServer.Received                   | IOServer.Tell
   *    | TickGenerator.Tick                  | IOServer.StopReading
   *    |                                     | IOServer.ResumeReading
   *    |                                    \/
   * |------------------------------------------------------------------------------------------
   * | TickGenerator: listens to Closed events,
   * |                dispatches TickGenerator.Tick events to the head of the event PL
   * |------------------------------------------------------------------------------------------
   *    /\                                    |
   *    | IOServer.Closed                     | IOServer.Send
   *    | IOServer.SentOK                     | IOServer.Close
   *    | IOServer.Received                   | IOServer.Tell
   *    | TickGenerator.Tick                  | IOServer.StopReading
   *    |                                     | IOServer.ResumeReading
   *    |                                    \/
   */


流水线本身的实现非常简单,>> 操作符只有17行代码,整个 trait 也只有 26 行

trait RawPipelineStage[-C <: PipelineContext] { left => 
  type CPL = Pipeline[Command] // alias for brevity 
  type EPL = Pipeline[Event] // alias for brevity 
 
  def apply(context: C, commandPL: CPL, eventPL: EPL): Pipelines 
 
  def >>[R <: C](right: RawPipelineStage[R]): RawPipelineStage[R] = 
    if (right eq EmptyPipelineStage) this 
    else new RawPipelineStage[R] { 
      def apply(ctx: R, cpl: CPL, epl: EPL) = { 
        var cplProxy: CPL = Pipeline.Uninitialized 
        var eplProxy: EPL = Pipeline.Uninitialized 
        val cplProxyPoint: CPL = cplProxy(_) 
        val eplProxyPoint: EPL = eplProxy(_) 
        val leftPL = left(ctx, cplProxyPoint, epl) 
        val rightPL = right(ctx, cpl, eplProxyPoint) 
        cplProxy = rightPL.commandPipeline 
        eplProxy = leftPL.eventPipeline 
        Pipelines( 
          commandPL = (if (leftPL.commandPipeline eq cplProxyPoint) rightPL else leftPL).commandPipeline, 
          eventPL = (if (rightPL.eventPipeline eq eplProxyPoint) leftPL else rightPL).eventPipeline) 
      } 
    } 
 
  def ?(condition: Boolean): RawPipelineStage[C] = macro RawPipelineStage.enabled[C] 
}

这里解释一下这个拼接的实现:

  • RawPipelineStage自己是 left(左级),被拼接的是right(右级),拼接的结果仍然是 RawPipeline,可以拼接下一级
  • RawPipelineStage 有 apply方法,这个方法应该带有参数(Context,CPL, EPL),返回 Pipelines,Pipelines 就是两条流水线,Command Pipeline (CPL) 是下行的命令处理,Event Pipeline (EPL) 是上行的事件处理,每个实际都是一个 Command 或 Event 处理的方法(Command => Unit 或 Event => Unit)

  • 在拼接时,
    • 如果右侧是空的,那么拼接的结果就是自己
    • 如果右侧不是空的,那么拼接的结果是一个RawPipelineStage,大意是:
      • 构造一个新的RawPipelineapply方法,参数为 ctx, cpl, epl
      • 如果左级不改变 cmd pipeline 的话,就调用右级的 cpl,否则用左级的cpl,但把右级的 cpl 当参数传给左级;对 cmd 来说,右级是左级的下一级,左级处理过交给右级
      • 如果右级不改变 event pipeline 的话,就用左级的 epl,否则用右级的 epl,但把左级的 epl当作参数传给右级;对event来说,左级是右级的下一级,右级处理过交给左级

    • 以一个方向为例:
      • 首先初始化 cplProxy 是一个处理 Command 的 Pipeline
      • val cplProxyPoint 是调用 cplProxy 的调用方法
      • leftPL 调用了左级的 apply,生成一个 Pipelines,而在生成的过程中,CPL 参数传入的就是 cplProxyPoint,也就是会调用 cplProxy
      • 定 cpl 为 rightPL 的 command pipeline,也就是说,leftPL 的 cpl 可以使用 rightPL 的 cpl
      • 最后,如果 leftPL 的 cpl 就是 rightPL 的 cpl 的话,也就是说,左级对 command 处理没有任何附加操作,那么就直接使用右级的 command 处理,反之,就使用左级的处理,但是左级可以利用作为参数传进来的右级的command 处理
      • event 处理与此对称

最终调用 RawPipelineState apply 的方法是 spray-io ConnectionHandler.running 方法,送进来的是 baseCommandPipeline baseEventPipeline,分别作为两者的最后一级

  • baseCommandPipeline 送给最右级作为下一级
    • TcpWriteCommandTcpCloseCommand 送给connection
    • (Tcp.SuspendReading | Tcp.ResumeReading | Tcp.ResumeWriting)backpressure相关的送给connection
    • Pipeline.Tell(receiver, msg, sender)消息,为sendermsg送给receiver

  • baseEventPipeline送给最左级作为下一级
    • 处理 ConnectionClosed 消息,等待tcpConnection Actor结束来自杀
    • 丢弃消息,对于 Droppable 的不warning,否则warning

就是这样,完成,非常简洁,有些晦涩。

gnawux
me!#$!@#$@#$wangxu!@#$%^&*()_me