Spray 中协议处理 Pipelines 的实现
最近在玩 scala,用到了 spray 来处理 Http,看了一下代码觉得很神奇,这里抄一段 spray 1.3 中的协议处理 pipeline 的实现,原始文件在 spray 中的 spray-io/src/main/scala/spray/io/Pipelines.scala
在 spray-io 中,网络协议可以拆分成多级流水线来处理,从网络到应用逐级升高,在前一级中处理低级事务,屏蔽掉一些底层机制,把高层决断交给后一级处理;而从应用到网络逐级降低,在前一级接受处理高层命令,分解成低级命令,交给后一级执行。spray-io框架规定了每层的接口,并帮助实现了拼接,而不同协议就负责实现自己的每级流水线。
比如 Spray-can 的 Http 协议栈处理中,就通过很多级的拼接来实现了一个协议栈:
ServerFrontend(settings) >> RequestChunkAggregation(requestChunkAggregationLimit) ? (requestChunkAggregationLimit > 0) >> PipeliningLimiter(pipeliningLimit) ? (pipeliningLimit > 0) >> StatsSupport(statsHolder.get) ? statsSupport >> RemoteAddressHeaderSupport ? remoteAddressHeader >> SSLSessionInfoSupport ? parserSettings.sslSessionInfoHeader >> RequestParsing(settings) >> ResponseRendering(settings) >> ConnectionTimeouts(idleTimeout) ? (reapingCycle.isFinite && idleTimeout.isFinite) >> PreventHalfClosedConnections(sslEncryption) >> SslTlsSupport(maxEncryptionChunkSize, parserSettings.sslSessionInfoHeader) ? sslEncryption >> TickGenerator(reapingCycle) ? (reapingCycle.isFinite && (idleTimeout.isFinite || requestTimeout.isFinite)) >> BackPressureHandling(backpressureSettings.get.noAckRate, backpressureSettings.get.readingLowWatermark) ? autoBackPressureEnabled
这其中每一行都是一级流水线,>> 就是流水线拼接的方法,它们实现了这样一对流水线:
/** * The HttpServerConnection pipeline setup: * * |------------------------------------------------------------------------------------------ * | ServerFrontend: converts HttpMessagePart, Closed and SendCompleted events to * | MessageHandlerDispatch.DispatchCommand, * | generates HttpResponsePartRenderingContext * |------------------------------------------------------------------------------------------ * /\ | * | HttpMessagePart | HttpResponsePartRenderingContext * | IOServer.Closed | IOServer.Tell * | IOServer.SentOK | * | TickGenerator.Tick | * | \/ * |------------------------------------------------------------------------------------------ * | RequestChunkAggregation: listens to HttpMessagePart events, generates HttpRequest events * |------------------------------------------------------------------------------------------ * /\ | * | HttpMessagePart | HttpResponsePartRenderingContext * | IOServer.Closed | IOServer.Tell * | IOServer.SentOK | * | TickGenerator.Tick | * | \/ * |------------------------------------------------------------------------------------------ * | PipeliningLimiter: throttles incoming requests according to the PipeliningLimit, listens * | to HttpResponsePartRenderingContext commands and HttpRequestPart events, * | generates StopReading and ResumeReading commands * |------------------------------------------------------------------------------------------ * /\ | * | HttpMessagePart | HttpResponsePartRenderingContext * | IOServer.Closed | IOServer.Tell * | IOServer.SentOK | IOServer.StopReading * | TickGenerator.Tick | IOServer.ResumeReading * | \/ * |------------------------------------------------------------------------------------------ * | StatsSupport: listens to most commands and events to collect statistics * |------------------------------------------------------------------------------------------ * /\ | * | HttpMessagePart | HttpResponsePartRenderingContext * | IOServer.Closed | IOServer.Tell * | IOServer.SentOK | IOServer.StopReading * | TickGenerator.Tick | IOServer.ResumeReading * | \/ * |------------------------------------------------------------------------------------------ * | RemoteAddressHeaderSupport: add `Remote-Address` headers to incoming requests * |------------------------------------------------------------------------------------------ * /\ | * | HttpMessagePart | HttpResponsePartRenderingContext * | IOServer.Closed | IOServer.Tell * | IOServer.SentOK | IOServer.StopReading * | TickGenerator.Tick | IOServer.ResumeReading * | \/ * |------------------------------------------------------------------------------------------ * | SSLSessionInfoSupport: add `SSL-Session-Info` header to incoming requests * |------------------------------------------------------------------------------------------ * /\ | * | HttpMessagePart | HttpResponsePartRenderingContext * | IOServer.Closed | IOServer.Tell * | IOServer.SentOK | IOServer.StopReading * | TickGenerator.Tick | IOServer.ResumeReading * | SslTlsSupport.SSLSessionEstablished | * | \/ * |------------------------------------------------------------------------------------------ * | RequestParsing: converts Received events to HttpMessagePart, * | generates HttpResponsePartRenderingContext (in case of errors) * |------------------------------------------------------------------------------------------ * /\ | * | IOServer.Closed | HttpResponsePartRenderingContext * | IOServer.SentOK | IOServer.Tell * | IOServer.Received | IOServer.StopReading * | TickGenerator.Tick | IOServer.ResumeReading * | SslTlsSupport.SSLSessionEstablished | * | \/ * |------------------------------------------------------------------------------------------ * | ResponseRendering: converts HttpResponsePartRenderingContext * | to Send and Close commands * |------------------------------------------------------------------------------------------ * /\ | * | IOServer.Closed | IOServer.Send * | IOServer.SentOK | IOServer.Close * | IOServer.Received | IOServer.Tell * | TickGenerator.Tick | IOServer.StopReading * | SslTlsSupport.SSLSessionEstablished | IOServer.ResumeReading * | \/ * |------------------------------------------------------------------------------------------ * | ConnectionTimeouts: listens to Received events and Send commands and * | TickGenerator.Tick, generates Close commands * |------------------------------------------------------------------------------------------ * /\ | * | IOServer.Closed | IOServer.Send * | IOServer.SentOK | IOServer.Close * | IOServer.Received | IOServer.Tell * | TickGenerator.Tick | IOServer.StopReading * | SslTlsSupport.SSLSessionEstablished | IOServer.ResumeReading * | \/ * |------------------------------------------------------------------------------------------ * | SslTlsSupport: listens to event Send and Close commands and Received events, * | provides transparent encryption/decryption in both directions * |------------------------------------------------------------------------------------------ * /\ | * | IOServer.Closed | IOServer.Send * | IOServer.SentOK | IOServer.Close * | IOServer.Received | IOServer.Tell * | TickGenerator.Tick | IOServer.StopReading * | | IOServer.ResumeReading * | \/ * |------------------------------------------------------------------------------------------ * | TickGenerator: listens to Closed events, * | dispatches TickGenerator.Tick events to the head of the event PL * |------------------------------------------------------------------------------------------ * /\ | * | IOServer.Closed | IOServer.Send * | IOServer.SentOK | IOServer.Close * | IOServer.Received | IOServer.Tell * | TickGenerator.Tick | IOServer.StopReading * | | IOServer.ResumeReading * | \/ */
流水线本身的实现非常简单,>> 操作符只有17行代码,整个 trait 也只有 26 行
trait RawPipelineStage[-C <: PipelineContext] { left => type CPL = Pipeline[Command] // alias for brevity type EPL = Pipeline[Event] // alias for brevity def apply(context: C, commandPL: CPL, eventPL: EPL): Pipelines def >>[R <: C](right: RawPipelineStage[R]): RawPipelineStage[R] = if (right eq EmptyPipelineStage) this else new RawPipelineStage[R] { def apply(ctx: R, cpl: CPL, epl: EPL) = { var cplProxy: CPL = Pipeline.Uninitialized var eplProxy: EPL = Pipeline.Uninitialized val cplProxyPoint: CPL = cplProxy(_) val eplProxyPoint: EPL = eplProxy(_) val leftPL = left(ctx, cplProxyPoint, epl) val rightPL = right(ctx, cpl, eplProxyPoint) cplProxy = rightPL.commandPipeline eplProxy = leftPL.eventPipeline Pipelines( commandPL = (if (leftPL.commandPipeline eq cplProxyPoint) rightPL else leftPL).commandPipeline, eventPL = (if (rightPL.eventPipeline eq eplProxyPoint) leftPL else rightPL).eventPipeline) } } def ?(condition: Boolean): RawPipelineStage[C] = macro RawPipelineStage.enabled[C] }
这里解释一下这个拼接的实现:
- RawPipelineStage自己是 left(左级),被拼接的是right(右级),拼接的结果仍然是 RawPipeline,可以拼接下一级
-
RawPipelineStage 有 apply方法,这个方法应该带有参数(Context,CPL, EPL),返回 Pipelines,Pipelines 就是两条流水线,Command Pipeline (CPL) 是下行的命令处理,Event Pipeline (EPL) 是上行的事件处理,每个实际都是一个 Command 或 Event 处理的方法(Command => Unit 或 Event => Unit)
- 在拼接时,
- 如果右侧是空的,那么拼接的结果就是自己
- 如果右侧不是空的,那么拼接的结果是一个RawPipelineStage,大意是:
- 构造一个新的RawPipeline的apply方法,参数为 ctx, cpl, epl
- 如果左级不改变 cmd pipeline 的话,就调用右级的 cpl,否则用左级的cpl,但把右级的 cpl 当参数传给左级;对 cmd 来说,右级是左级的下一级,左级处理过交给右级
- 如果右级不改变 event pipeline 的话,就用左级的 epl,否则用右级的 epl,但把左级的 epl当作参数传给右级;对event来说,左级是右级的下一级,右级处理过交给左级
- 以一个方向为例:
- 首先初始化 cplProxy 是一个处理 Command 的 Pipeline
- val cplProxyPoint 是调用 cplProxy 的调用方法
- leftPL 调用了左级的 apply,生成一个 Pipelines,而在生成的过程中,CPL 参数传入的就是 cplProxyPoint,也就是会调用 cplProxy
- 定 cpl 为 rightPL 的 command pipeline,也就是说,leftPL 的 cpl 可以使用 rightPL 的 cpl
- 最后,如果 leftPL 的 cpl 就是 rightPL 的 cpl 的话,也就是说,左级对 command 处理没有任何附加操作,那么就直接使用右级的 command 处理,反之,就使用左级的处理,但是左级可以利用作为参数传进来的右级的command 处理
- event 处理与此对称
最终调用 RawPipelineState 的 apply 的方法是 spray-io 的ConnectionHandler.running 方法,送进来的是 baseCommandPipeline 和 baseEventPipeline,分别作为两者的最后一级
- baseCommandPipeline 送给最右级作为下一级
- TcpWriteCommand和TcpCloseCommand 送给connection
- (Tcp.SuspendReading | Tcp.ResumeReading | Tcp.ResumeWriting)backpressure相关的送给connection
- Pipeline.Tell(receiver, msg, sender)消息,为sender把msg送给receiver
- baseEventPipeline送给最左级作为下一级
- 处理 ConnectionClosed 消息,等待tcpConnection Actor结束来自杀
- 丢弃消息,对于 Droppable 的不warning,否则warning
就是这样,完成,非常简洁,有些晦涩。