Dubbo-go Provider

本文最后更新于:2 年前

有幸得知开源之夏这个项目,并有机会参与到Dubbo-go的社区建设中。这里便记录自己对于Dubbo-go的源码阅读。

当拿到一款框架之后,一种不错的源码阅读方式大致如下:从运行最基础的 helloworld demo 源码开始 —> 再查看配置文件 —> 开启各种依赖服务(比如zk、consul) —> 开启服务端 —> 再到通过 client 调用服务端 —> 打印完整请求日志和回包。调用成功之后,再根据框架的设计模型,从配置文件解析开始,自顶向下递阅读整个框架的调用栈。 ——志信

由于Dubbo是一个服务端暴露接口的实现服务,消费端远程调用服务端提供的服务的一个RPC框架,所以分析源码也将分别从服务端服务是如何暴露的、消费端是如何调用服务的两个方面进行分析

一. 加载配置:将Service封装入ServiceConfig

init

应用配置 Application

配置中心 ConfigCenter

日志配置 Logger

注册中心配置(判断是否为应用级服务发现 && local 模式 )Registry

MetadataReport 配置 创建一个用于上报元数据的实例

路由配置

Provider 配置 如果是 triple 协议会多配置一个健康检查服务 + tripleReflection 服务

Consumer 配置

下线配置

start

优雅下线

Consumer

Provider

运行dubbo-sample中的helloworld示例demo,这是demo的服务端提供了GreeterProvider接口的实现,客户端则去调用该接口服务。

1
2
3
4
5
6
7
8
func main() {
config.SetProviderService(&GreeterProvider{})
// 读入配置文件,根据配置文件的内容,将注册的 service 实现到配置结构里,再调用 Export 暴露给特定的 registry,进而开启特定的 service 进行对应端口的 tcp 监听,成功启动并且暴露服务。
if err := config.Load(); err != nil {
panic(err)
}
select {}
}

Dubbo Frame

上图是Dubbo-java官网给出的框架设计,在Service层之下就是Config层读取相应配置信息,最后逐层向下注册,最终实现服务端的暴露。

1
2
3
4
5
6
7
8
// SetProviderService is called by init() of implement of RPCService
func SetProviderService(service common.RPCService) {
// ref就是传入的类的string name,并建立好对应的map
ref := common.GetReference(service)
proServicesLock.Lock()
defer proServicesLock.Unlock()
proServices[ref] = service
}

config.Load()读入配置文件,并初始化rootConfig,再分别Load Provider、Consumer,暴露MetaDataService与注册服务实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func Load(opts ...LoaderConfOption) error {
// conf
conf := NewLoaderConf(opts...)
if conf.rc == nil {
koan := GetConfigResolver(conf)
koan = conf.MergeConfig(koan)
if err := koan.UnmarshalWithConf(rootConfig.Prefix(),
rootConfig, koanf.UnmarshalConf{Tag: "yaml"}); err != nil {
return err
}
} else {
rootConfig = conf.rc
}
// 这是关键,初始化dubbo-go
if err := rootConfig.Init(); err != nil {
return err
}
return nil
}

rootConfig.Init()中最后会初始化Provider、Consumer。

1
2
3
4
5
6
7
8
9
10
11
12
// provider、consumer must last init
if err := rc.Provider.Init(rc); err != nil {
return err
}
if err := rc.Consumer.Init(rc); err != nil {
return err
}
if err := rc.Shutdown.Init(); err != nil {
return err
}
// todo if we can remove this from Init in the future?
rc.Start()

其中Provider.Init()逻辑如下,会做一些检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 先init再Load
func (c *ProviderConfig) Init(rc *RootConfig) error {
...
...
for key, serviceConfig := range c.Services {
if serviceConfig.Interface == "" {
// // 这个GetProviderService便是利用我们第一步SetProviderService的map[string]service实例通过配置文件的name key拿出实例去做相应配置
service := GetProviderService(key)
// try to use interface name defined by pb
supportPBPackagerNameSerivce, ok := service.(common.TriplePBService)
if !ok {
...
} else {
// use interface name defined by pb
serviceConfig.Interface = supportPBPackagerNameSerivce.XXX_InterfaceName()
}
}
//
if err := serviceConfig.Init(rc); err != nil {
return err
}

serviceConfig.adaptiveService = c.AdaptiveService
}
...
...
}

回到rootConfig.Init()的rc.Start()中,即最关键的部分!

1
2
3
4
5
6
7
8
9
10
func (rc *RootConfig) Start() {
startOnce.Do(func() {
gracefulShutdownInit()
rc.Consumer.Load()
rc.Provider.Load()
// todo if register consumer instance or has exported services
exportMetadataService()
registerServiceInstance()
})
}

先关注服务提供者的Load()函数。 svs即serviceConfig,由框架图也能看出对于服务Service封装的第一层就是Config层。

ServiceConfig包含了标示Service的Name与实现类的实例,然后调用Export()暴露服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (c *ProviderConfig) Load() {
for key, svs := range c.Services {
// rpcService就是用户new的服务实例
rpcService := GetProviderService(key)
if rpcService == nil {
logger.Warnf("Service reference key %s does not exist, please check if this key "+
"matches your provider struct type name, or matches the returned valued of your provider struct's Reference() function."+
"View https://www.yuque.com/u772707/eqpff0/pqfgz3#zxdw0 for details", key)
continue
}
svs.id = key
// svs.rpcService = rpcService 即将实例写入ServiceConfig
svs.Implement(rpcService)
// Export()去暴露服务
if err := svs.Export(); err != nil {
logger.Errorf(fmt.Sprintf("service %s export failed! err: %#v", key, err))
}
}
}

二. 将ServiceConfig 封装入 ProxyInvoker

ServiceConfig.Export()

  1. 首先通过配置生成对应 registryUrl
1
regUrls := loadRegistries(s.RegistryIDs, s.RCRegistriesMap, common.PROVIDER)

regUrl 是用来向中心注册组件发起注册请求的,对于 zookeeper 的话,会传入其 ip 和端口号,以及附加的用户名密码等信息。

然后会为不同的协议分配随机端口,如果指定了多个中心注册协议,则会将服务通过多个中心注册协议的 registryProtocol 暴露出去。

  1. 对于一个注册协议,将用户传入的 rpcService 实例注册在 common.ServiceMap
1
methods, err := common.ServiceMap.Register(s.Interface, proto.Name, s.Group, s.Version, s.rpcService)

通过反射实现服务注册功能

这个 Register 函数将服务实例注册了两次,一次是以 Interface 为 key 写入接口服务组内,一次是以 interface 和 proto 为 key 写入特定的一个唯一的服务。

后续会从 common.Map 里面取出来Service实例。

1
2
3
4
5
6
7
// Service is description of service
type Service struct {
name string
rcvr reflect.Value
rcvrType reflect.Type
methods map[string]*MethodType
}

所谓服务注册就是由用户提供的实例创建Service实例的过程,并会通过哈希表ServiceMap保存以在服务调用时找到相应实例。

  1. 生成ivkURL即服务暴露的URL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ivkURL := common.NewURLWithOptions(
common.WithPath(s.Interface),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(port),
common.WithParams(urlMap),
common.WithParamsValue(constant.BeanNameKey, s.id),
//common.WithParamsValue(constant.SslEnabledKey, strconv.FormatBool(config.GetSslEnabled())),
common.WithMethods(strings.Split(methods, ",")),
common.WithToken(s.Token),
common.WithParamsValue(constant.MetadataTypeKey, s.metadataType),
)
if len(s.Tag) > 0 {
ivkURL.AddParam(constant.Tagkey, s.Tag)
}
  1. 获取默认Proxy工厂,将实例封装入代理 invoker
1
2
3
4
5
6
7
8
proxyFactory := extension.GetProxyFactory(s.ProxyFactoryKey)
// 把ivkURL嵌在regURL下
setRegistrySubURL(ivkURL, regUrl)
// ProxyFactory通过ivkURL、regURL生成invoker
invoker := proxyFactory.GetInvoker(regUrl)
// 一般cacheProtocol为此:s.cacheProtocol = extension.GetProtocol("registry")
// Export() 生成exporter,开启tcp监听
exporter := s.cacheProtocol.Export(invoker)

invoker实现的Invoke方法未来将由invoker.invoke(invocation)去执行服务【invocation是消费者请求最外层的抽象,包含所有请求参数(方法参数 + Attachment)】,invoker封装了许多过程:

1⃣️获取provider暴露的url,并通过url获取协议

2⃣️通过协议与invocation解析的请求接口从common.ServiceMap拿到Service实例

3⃣️通过Service实例内的方法与参数构建真正调用方法的参数

1
2
3
4
method := svc.Method()[methodName]
in := []reflect.Value{svc.Rcvr()}
... // 一堆append
returnValues := method.Method().Func.Call(in)

其实一句话来说就是invoke()封装了真正调用实例的逻辑。

三. registry 协议在 zkRegistry 上暴露上面的 ProxyInvoker

1
exporter := s.cacheProtocol.Export(invoker)

这里的cacheProtocol使用的是RegistryProtocol调用的方法为registry/protocol/protocol.go::Export()

  1. 获取注册中心url和服务暴露的url
1
2
registryUrl := getRegistryUrl(originInvoker)
providerUrl := getProviderUrl(originInvoker)
  1. 获取注册中心实例
1
2
3
// zookeeper则拿到zkRegistry,Registry接口需要实现Register、Subscribe等方法
reg := proto.getRegistry(registryUrl)
registeredProviderUrl := getUrlToRegistry(providerUrl, registryUrl)
  1. 通过相应协议如Dubbo暴露服务,监听相应端口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// export invoker
exporter := proto.doLocalExport(originInvoker, providerUrl)

func (proto *registryProtocol) doLocalExport(originInvoker protocol.Invoker, providerUrl *common.URL) *exporterChangeableWrapper {
key := getCacheKey(originInvoker)
cachedExporter, loaded := proto.bounds.Load(key)
if !loaded {
// new Exporter
invokerDelegate := newInvokerDelegate(originInvoker, providerUrl)
// 这里的Export调用的是protocol/protocolwrapped/protocol_filter_wrapper.go:Export()
cachedExporter = newExporterChangeableWrapper(originInvoker,
extension.GetProtocol(protocolwrapper.FILTER).Export(invokerDelegate))
proto.bounds.Store(key, cachedExporter)
}
return cachedExporter.(*exporterChangeableWrapper)
}

这里会调用ProtocolFilterWrapper的Export()函数。

在这个Export()函数中,会构建InvokerChain,并调用服务配置的协议即 DubboProtocol 的 Export 方法,将上述InvokerChain真正暴露。

关于调用链还没完全弄明白 Todo…

应该是根据配置的内容,通过链式调用的构造,将 ProxyInvoker 层层包裹在调用链的最底部,最终返回一个调用链 invoker。

1
2
3
4
5
6
7
8
9
10
// Export service for remote invocation
func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Exporter {
if pfw.protocol == nil {
pfw.protocol = extension.GetProtocol(invoker.GetURL().Protocol)
}
// 构造InvokerChain
invoker = BuildInvokerChain(invoker, constant.ServiceFilterKey)
// 这里是DubboProtocol.Export()
return pfw.protocol.Export(invoker)
}

DubboProtocol.Export()主要做两件事:构造触发器、启动服务。

  • 将传入的 Invoker 调用 chain 进一步封装,封装成一个 exporter,再将这个 export 放入 map 保存。注意!这里把 exporter 放入了 SetExporterMap中,在下面服务启动的时候,会以注册事件监听器的形式将这个 exporter 取出!
  • 调用 DubboProtocol 的 openServer 方法,开启一个针对特定端口的监听。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Export export dubbo service.
// 这里Exporter传入的是一个经过filter构造的包含调用链的Invoker与一个哈希表
func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetURL()
serviceKey := url.ServiceKey()
exporter := NewDubboExporter(serviceKey, invoker, dp.ExporterMap())
dp.SetExporterMap(serviceKey, exporter)
logger.Infof("[DUBBO Protocol] Export service: %s", url.String())
// start server
// 开启了一个getty的tcp server,并放在dp.serverMap中
dp.openServer(url)
// 返回exporter,业务代码在exporter.invoker.invoker.invoker...里。
return exporter
}
  1. zkRegistry 调用 Registry 方法,在 zookeeper 上注册 dubboPath
1
2
3
4
5
6
7
8
9
10
11
12
err := reg.Register(registeredProviderUrl)
if err != nil {
logger.Errorf("provider service %v register registry %v error, error message is %s",
providerUrl.Key(), registryUrl.Key(), err.Error())
return nil
}

go func() {
if err := reg.Subscribe(overriderUrl, overrideSubscribeListener); err != nil {
logger.Warnf("reg.subscribe(overriderUrl:%v) = error:%v", overriderUrl, err)
}
}()

zkRegistry类包含registry.BaseRegistry 结构,BaseRegistry 结构定义了注册器基础的功能函数,比如 Registry、Subscribe 等,但在这些默认定义的函数内部,还是会调用 facade 层(zkRegistry 层)的具体实现函数,这一设计模型能在保证已有功能函数不需要重复定义的同时,引入外层函数的实现,类似于结构体继承却又复用了代码。这一设计模式值得学习。

1
2
3
4
5
6
7
8
9
type zkRegistry struct {
registry.BaseRegistry
client *gxzookeeper.ZookeeperClient
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
dataListener *RegistryDataListener
cltLock sync.Mutex
zkPath map[string]int // key = protocol://ip:port/interface
}

至此,将服务端调用 url 注册到了 zookeeper 上,而客户端如果想获取到这个 url,只需要传入特定的 dubboPath,向 zk 请求即可。 客户端可以通过注册中心获取到服务的访问方式url了,并且服务端的特定服务已经启动,已开启特定协议端口的监听。

四. 注册触发动作

上述只是启动了服务,但还没有看到触发事件的细节,在openServer()方法中会调用如下Start()方法。这里可以看到tcpServer.RunEventLoop(s.newSession)会创建的新会话。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Start dubbo server.
func (s *Server) Start() {
var (
addr string
tcpServer getty.Server
)

addr = s.addr
serverOpts := []getty.ServerOption{getty.WithLocalAddress(addr)}
if s.conf.SSLEnabled {
serverOpts = append(serverOpts, getty.WithServerSslEnabled(s.conf.SSLEnabled),
getty.WithServerTlsConfigBuilder(config.GetServerTlsConfigBuilder()))
}

serverOpts = append(serverOpts, getty.WithServerTaskPool(gxsync.NewTaskPoolSimple(s.conf.GrPoolSize)))

tcpServer = getty.NewTCPServer(serverOpts...)
// newSession
tcpServer.RunEventLoop(s.newSession)
logger.Debugf("s bind addr{%s} ok!", s.addr)
s.tcpServer = tcpServer
}

新会话中很重要的一个配置是 EventListener,传入的是 DubboServer 的默认的rpcHandler。session.SetEventListener(s.rpcHandler)注册的rpcHandler 有一个实现好的 OnMessage 函数,根据 getty 的 API,当 client 调用该端口时,会触发 OnMessage。这一函数实现了在 getty session 接收到 rpc 调用后的一系列处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// OnMessage get request from getty client, update the session reqNum and reply response to client
func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
...
...

// 解析传入的pkg
decodeResult, drOK := pkg.(*remoting.DecodeResult)
req := decodeResult.Result.(*remoting.Request)

// 通过req的Data直接转换为Invocation
invoc, ok := req.Data.(*invocation.RPCInvocation)

// 设置Attachment
attachments := invoc.Attachments()
attachments[constant.LocalAddr] = session.LocalAddr()
attachments[constant.RemoteAddr] = session.RemoteAddr()

result := h.server.requestHandler(invoc)
...
...
resp.Result = result
reply(session, resp)
}

requestHandler在DubboProtocol中被设置为handler:=func(invocation *invocation.RPCInvocation) protocol.RPCResult {return doHandleRequest(invocation)}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
// 由之前构造的ExporterMap通过ServiceKey拿到exporter
exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
result := protocol.RPCResult{}
if exporter == nil {
err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
logger.Errorf(err.Error())
result.Err = err
// reply(session, p, hessian.PackageResponse)
return result
}
// 通过exporter拿到invoker
invoker := exporter.(protocol.Exporter).GetInvoker()
if invoker != nil {
// FIXME
ctx := rebuildCtx(rpcInvocation)

// 通过Invoke方法调用服务!
invokeResult := invoker.Invoke(ctx, rpcInvocation)
if err := invokeResult.Error(); err != nil {
result.Err = invokeResult.Error()
// p.Header.ResponseStatus = hessian.Response_OK
// p.Body = hessian.NewResponse(nil, err, result.Attachments())
} else {
result.Rest = invokeResult.Result()
// p.Header.ResponseStatus = hessian.Response_OK
// p.Body = hessian.NewResponse(res, nil, result.Attachments())
}
result.Attrs = invokeResult.Attachments()
} else {
result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
}
return result
}

整个被调过程一气呵成。实现了从 getty.Session 的调用事件,到经过层层封装的 invoker 的调用。

至此,一次RPC调用得以正确返回。

小结

个人认为想要深入理解RPC框架,可以先参考极客兔兔的7天写一个RPC框架,在那个教程里,其实对于Service的抽象和Dubbo是基本一致的,不过Dubbo在Service层之下由Config、Proxy、Registry等层会进一步封装,更加复杂,但通过这样的层层封装将复杂性不暴露给上一层。

简单来说调用过程就是

请求—>invocation—>服务key—>exporterMap.get(key)—>exporter—>invoker—>invoker.invoke(invocation)–>执行服务

  • 关于 Invoker 的层层封装

能把一次调用的所有参数抽象成一次 invocation;

能把一个协议抽象成针对 invoke 的封装;

能把针对一次 invoke 所做出的特定改变封装到 invoke 函数内部,可以降低模块之间的耦合性。层层封装逻辑更加清晰。

参考

Dubbo-go 源码笔记(一)Server 端开启服务过程


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!