有幸得知开源之夏这个项目,并有机会参与到Dubbo-go的社区建设中。这里便记录自己对于Dubbo-go的源码阅读。
当拿到一款框架之后,一种不错的源码阅读方式大致如下:从运行最基础的 helloworld demo 源码开始 —> 再查看配置文件 —> 开启各种依赖服务(比如zk、consul) —> 开启服务端 —> 再到通过 client 调用服务端 —> 打印完整请求日志和回包。调用成功之后,再根据框架的设计模型,从配置文件解析开始,自顶向下递阅读整个框架的调用栈。 ——志信
由于Dubbo是一个服务端暴露接口的实现服务,消费端远程调用服务端提供的服务的一个RPC框架,所以分析源码也将分别从服务端服务是如何暴露的、消费端是如何调用服务的两个方面进行分析
一. 加载配置:将Service封装入ServiceConfig
init
应用配置 Application
配置中心 ConfigCenter
日志配置 Logger
注册中心配置(判断是否为应用级服务发现 && local 模式 )Registry
MetadataReport 配置 创建一个用于上报元数据的实例
路由配置
Provider 配置 如果是 triple 协议会多配置一个健康检查服务 + tripleReflection 服务
Consumer 配置
下线配置
start
优雅下线
Consumer
Provider
运行dubbo-sample中的helloworld示例demo,这是demo的服务端提供了GreeterProvider接口的实现,客户端则去调用该接口服务。
func main () { config.SetProviderService(&GreeterProvider{}) if err := config.Load(); err != nil { panic (err) } select {} }
上图是Dubbo-java官网给出的框架设计,在Service层之下就是Config层读取相应配置信息,最后逐层向下注册,最终实现服务端的暴露。
1 2 3 4 5 6 7 8 func SetProviderService (service common.RPCService) { ref := common.GetReference(service) proServicesLock.Lock() defer proServicesLock.Unlock() proServices[ref] = service }
config.Load()
读入配置文件,并初始化rootConfig,再分别Load Provider、Consumer,暴露MetaDataService与注册服务实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func Load (opts ...LoaderConfOption) error { conf := NewLoaderConf(opts...) if conf.rc == nil { koan := GetConfigResolver(conf) koan = conf.MergeConfig(koan) if err := koan.UnmarshalWithConf(rootConfig.Prefix(), rootConfig, koanf.UnmarshalConf{Tag: "yaml" }); err != nil { return err } } else { rootConfig = conf.rc } if err := rootConfig.Init(); err != nil { return err } return nil }
rootConfig.Init()中最后会初始化Provider、Consumer。
1 2 3 4 5 6 7 8 9 10 11 12 if err := rc.Provider.Init(rc); err != nil { return err }if err := rc.Consumer.Init(rc); err != nil { return err }if err := rc.Shutdown.Init(); err != nil { return err } rc.Start()
其中Provider.Init()逻辑如下,会做一些检查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (c *ProviderConfig) Init (rc *RootConfig) error { ... ... for key, serviceConfig := range c.Services { if serviceConfig.Interface == "" { service := GetProviderService(key) supportPBPackagerNameSerivce, ok := service.(common.TriplePBService) if !ok { ... } else { serviceConfig.Interface = supportPBPackagerNameSerivce.XXX_InterfaceName() } } if err := serviceConfig.Init(rc); err != nil { return err } serviceConfig.adaptiveService = c.AdaptiveService } ... ... }
回到rootConfig.Init()的rc.Start()
中,即最关键的部分!
1 2 3 4 5 6 7 8 9 10 func (rc *RootConfig) Start () { startOnce.Do(func () { gracefulShutdownInit() rc.Consumer.Load() rc.Provider.Load() exportMetadataService() registerServiceInstance() }) }
先关注服务提供者的Load()函数。 svs即serviceConfig,由框架图也能看出对于服务Service封装的第一层就是Config层。
ServiceConfig包含了标示Service的Name与实现类的实例,然后调用Export()暴露服务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (c *ProviderConfig) Load () { for key, svs := range c.Services { rpcService := GetProviderService(key) if rpcService == nil { logger.Warnf("Service reference key %s does not exist, please check if this key " + "matches your provider struct type name, or matches the returned valued of your provider struct's Reference() function." + "View https://www.yuque.com/u772707/eqpff0/pqfgz3#zxdw0 for details" , key) continue } svs.id = key svs.Implement(rpcService) if err := svs.Export(); err != nil { logger.Errorf(fmt.Sprintf("service %s export failed! err: %#v" , key, err)) } } }
二. 将ServiceConfig 封装入 ProxyInvoker
ServiceConfig.Export()
首先通过配置生成对应 registryUrl
1 regUrls := loadRegistries(s.RegistryIDs, s.RCRegistriesMap, common.PROVIDER)
regUrl 是用来向中心注册组件发起注册请求的,对于 zookeeper 的话,会传入其 ip 和端口号,以及附加的用户名密码等信息。
然后会为不同的协议分配随机端口,如果指定了多个中心注册协议,则会将服务通过多个中心注册协议的 registryProtocol 暴露出去。
对于一个注册协议,将用户传入的 rpcService 实例注册在 common.ServiceMap
1 methods, err := common.ServiceMap.Register(s.Interface, proto.Name, s.Group, s.Version, s.rpcService)
通过反射实现服务注册功能
这个 Register 函数将服务实例注册了两次,一次是以 Interface 为 key 写入接口服务组内,一次是以 interface 和 proto 为 key 写入特定的一个唯一的服务。
后续会从 common.Map 里面取出来Service实例。
1 2 3 4 5 6 7 type Service struct { name string rcvr reflect.Value rcvrType reflect.Type methods map [string ]*MethodType }
所谓服务注册就是由用户提供的实例创建Service实例的过程,并会通过哈希表ServiceMap保存以在服务调用时找到相应实例。
生成ivkURL即服务暴露的URL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ivkURL := common.NewURLWithOptions( common.WithPath(s.Interface), common.WithProtocol(proto.Name), common.WithIp(proto.Ip), common.WithPort(port), common.WithParams(urlMap), common.WithParamsValue(constant.BeanNameKey, s.id), common.WithMethods(strings.Split(methods, "," )), common.WithToken(s.Token), common.WithParamsValue(constant.MetadataTypeKey, s.metadataType), )if len (s.Tag) > 0 { ivkURL.AddParam(constant.Tagkey, s.Tag) }
获取默认Proxy工厂,将实例封装入代理 invoker
1 2 3 4 5 6 7 8 proxyFactory := extension.GetProxyFactory(s.ProxyFactoryKey) setRegistrySubURL(ivkURL, regUrl) invoker := proxyFactory.GetInvoker(regUrl) exporter := s.cacheProtocol.Export(invoker)
invoker实现的Invoke方法未来将由invoker.invoke(invocation)去执行服务【invocation是消费者请求最外层的抽象,包含所有请求参数(方法参数 + Attachment)】,invoker封装了许多过程:
1⃣️获取provider暴露的url,并通过url获取协议
2⃣️通过协议与invocation解析的请求接口从common.ServiceMap拿到Service实例
3⃣️通过Service实例内的方法与参数构建真正调用方法的参数
1 2 3 4 method := svc.Method()[methodName] in := []reflect.Value{svc.Rcvr()} ... returnValues := method.Method().Func.Call(in)
其实一句话来说就是invoke()封装了真正调用实例的逻辑。
三. registry 协议在 zkRegistry 上暴露上面的 ProxyInvoker 1 exporter := s.cacheProtocol.Export(invoker)
这里的cacheProtocol使用的是RegistryProtocol调用的方法为registry/protocol/protocol.go::Export()
获取注册中心url和服务暴露的url
1 2 registryUrl := getRegistryUrl(originInvoker) providerUrl := getProviderUrl(originInvoker)
获取注册中心实例
1 2 3 reg := proto.getRegistry(registryUrl) registeredProviderUrl := getUrlToRegistry(providerUrl, registryUrl)
通过相应协议如Dubbo暴露服务,监听相应端口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 exporter := proto.doLocalExport(originInvoker, providerUrl)func (proto *registryProtocol) doLocalExport (originInvoker protocol.Invoker, providerUrl *common.URL) *exporterChangeableWrapper { key := getCacheKey(originInvoker) cachedExporter, loaded := proto.bounds.Load(key) if !loaded { invokerDelegate := newInvokerDelegate(originInvoker, providerUrl) cachedExporter = newExporterChangeableWrapper(originInvoker, extension.GetProtocol(protocolwrapper.FILTER).Export(invokerDelegate)) proto.bounds.Store(key, cachedExporter) } return cachedExporter.(*exporterChangeableWrapper) }
这里会调用ProtocolFilterWrapper的Export()函数。
在这个Export()函数中,会构建InvokerChain,并调用服务配置的协议即 DubboProtocol 的 Export 方法,将上述InvokerChain真正暴露。
关于调用链还没完全弄明白 Todo…
应该是根据配置的内容,通过链式调用的构造,将 ProxyInvoker 层层包裹在调用链的最底部,最终返回一个调用链 invoker。
1 2 3 4 5 6 7 8 9 10 func (pfw *ProtocolFilterWrapper) Export (invoker protocol.Invoker) protocol .Exporter { if pfw.protocol == nil { pfw.protocol = extension.GetProtocol(invoker.GetURL().Protocol) } invoker = BuildInvokerChain(invoker, constant.ServiceFilterKey) return pfw.protocol.Export(invoker) }
DubboProtocol.Export()主要做两件事:构造触发器、启动服务。
将传入的 Invoker 调用 chain 进一步封装,封装成一个 exporter,再将这个 export 放入 map 保存。注意!这里把 exporter 放入了 SetExporterMap中,在下面服务启动的时候,会以注册事件监听器的形式将这个 exporter 取出!
调用 DubboProtocol 的 openServer 方法,开启一个针对特定端口的监听。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (dp *DubboProtocol) Export (invoker protocol.Invoker) protocol .Exporter { url := invoker.GetURL() serviceKey := url.ServiceKey() exporter := NewDubboExporter(serviceKey, invoker, dp.ExporterMap()) dp.SetExporterMap(serviceKey, exporter) logger.Infof("[DUBBO Protocol] Export service: %s" , url.String()) dp.openServer(url) return exporter }
zkRegistry 调用 Registry 方法,在 zookeeper 上注册 dubboPath
1 2 3 4 5 6 7 8 9 10 11 12 err := reg.Register(registeredProviderUrl)if err != nil { logger.Errorf("provider service %v register registry %v error, error message is %s" , providerUrl.Key(), registryUrl.Key(), err.Error()) return nil }go func () { if err := reg.Subscribe(overriderUrl, overrideSubscribeListener); err != nil { logger.Warnf("reg.subscribe(overriderUrl:%v) = error:%v" , overriderUrl, err) } }()
zkRegistry类包含registry.BaseRegistry 结构,BaseRegistry 结构定义了注册器基础的功能函数,比如 Registry、Subscribe 等,但在这些默认定义的函数内部,还是会调用 facade 层(zkRegistry 层)的具体实现函数,这一设计模型能在保证已有功能函数不需要重复定义的同时,引入外层函数的实现,类似于结构体继承却又复用了代码。这一设计模式值得学习。
1 2 3 4 5 6 7 8 9 type zkRegistry struct { registry.BaseRegistry client *gxzookeeper.ZookeeperClient listenerLock sync.Mutex listener *zookeeper.ZkEventListener dataListener *RegistryDataListener cltLock sync.Mutex zkPath map [string ]int }
至此,将服务端调用 url 注册到了 zookeeper 上,而客户端如果想获取到这个 url,只需要传入特定的 dubboPath,向 zk 请求即可。 客户端可以通过注册中心获取到服务的访问方式url了,并且服务端的特定服务已经启动,已开启特定协议端口的监听。
四. 注册触发动作 上述只是启动了服务,但还没有看到触发事件的细节,在openServer()方法中会调用如下Start()方法。这里可以看到tcpServer.RunEventLoop(s.newSession)
会创建的新会话。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (s *Server) Start () { var ( addr string tcpServer getty.Server ) addr = s.addr serverOpts := []getty.ServerOption{getty.WithLocalAddress(addr)} if s.conf.SSLEnabled { serverOpts = append (serverOpts, getty.WithServerSslEnabled(s.conf.SSLEnabled), getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder())) } serverOpts = append (serverOpts, getty.WithServerTaskPool(gxsync.NewTaskPoolSimple(s.conf.GrPoolSize))) tcpServer = getty.NewTCPServer(serverOpts...) tcpServer.RunEventLoop(s.newSession) logger.Debugf("s bind addr{%s} ok!" , s.addr) s.tcpServer = tcpServer }
新会话中很重要的一个配置是 EventListener,传入的是 DubboServer 的默认的rpcHandler。session.SetEventListener(s.rpcHandler)
注册的rpcHandler 有一个实现好的 OnMessage 函数,根据 getty 的 API,当 client 调用该端口时,会触发 OnMessage。这一函数实现了在 getty session 接收到 rpc 调用后的一系列处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (h *RpcServerHandler) OnMessage (session getty.Session, pkg interface {}) { ... ... decodeResult, drOK := pkg.(*remoting.DecodeResult) req := decodeResult.Result.(*remoting.Request) invoc, ok := req.Data.(*invocation.RPCInvocation) attachments := invoc.Attachments() attachments[constant.LocalAddr] = session.LocalAddr() attachments[constant.RemoteAddr] = session.RemoteAddr() result := h.server.requestHandler(invoc) ... ... resp.Result = result reply(session, resp) }
requestHandler在DubboProtocol中被设置为handler:=func(invocation *invocation.RPCInvocation) protocol.RPCResult {return doHandleRequest(invocation)}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func doHandleRequest (rpcInvocation *invocation.RPCInvocation) protocol .RPCResult { exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey()) result := protocol.RPCResult{} if exporter == nil { err := fmt.Errorf("don't have this exporter, key: %s" , rpcInvocation.ServiceKey()) logger.Errorf(err.Error()) result.Err = err return result } invoker := exporter.(protocol.Exporter).GetInvoker() if invoker != nil { ctx := rebuildCtx(rpcInvocation) invokeResult := invoker.Invoke(ctx, rpcInvocation) if err := invokeResult.Error(); err != nil { result.Err = invokeResult.Error() } else { result.Rest = invokeResult.Result() } result.Attrs = invokeResult.Attachments() } else { result.Err = fmt.Errorf("don't have the invoker, key: %s" , rpcInvocation.ServiceKey()) } return result }
整个被调过程一气呵成。实现了从 getty.Session 的调用事件,到经过层层封装的 invoker 的调用。
至此,一次RPC调用得以正确返回。
小结 个人认为想要深入理解RPC框架,可以先参考极客兔兔的7天写一个RPC框架,在那个教程里,其实对于Service的抽象和Dubbo是基本一致的,不过Dubbo在Service层之下由Config、Proxy、Registry等层会进一步封装,更加复杂,但通过这样的层层封装将复杂性不暴露给上一层。
简单来说调用过程就是
请求—>invocation—>服务key—>exporterMap.get(key)—>exporter—>invoker—>invoker.invoke(invocation)–>执行服务
能把一次调用的所有参数抽象成一次 invocation;
能把一个协议抽象成针对 invoke 的封装;
能把针对一次 invoke 所做出的特定改变封装到 invoke 函数内部,可以降低模块之间的耦合性。层层封装逻辑更加清晰。
参考 Dubbo-go 源码笔记(一)Server 端开启服务过程