Dubbo-go Consumer

本文最后更新于:1 年前

本文为Dubbogo客户端调用过程分析。

此文基本算是转载了志信的源码分析博客

服务端暴露服务和客户端远程调用服务最大的区别是服务端通过 zk 注册服务,发布自己的ivkURL并订阅事件开启监听;而客户应该是通过zk注册组件,拿到需要调用的serviceURL,更新invoker并重写用户的RPCService,从而实现对远程过程调用细节的封装。

其实客户端调用的逻辑就是将用户定义的RPCService对应的函数重写,重写后的函数已经包含实现了远程调用逻辑的 invoker。

接下来,就要通过阅读源码,看看 dubbo-go 是如何做到的。

一. 加载配置:将Service封装入ReferenceConfig

同样调用config.Load()读入配置文件

这次我们只关注consumer的Load()操作,这也是最关键的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (cc *ConsumerConfig) Load() {
for key, ref := range cc.References {
if ref.Generic != "" {
genericService := generic.NewGenericService(key)
SetConsumerService(genericService)
}
rpcService := GetConsumerService(key)
if rpcService == nil {
logger.Warnf("%s does not exist!", key)
continue
}
ref.id = key
// 获取远程 Service URL,实现可供调用的 invoker
ref.Refer(rpcService)
ref.Implement(rpcService)
}

// 等待三秒钟所有 invoker 就绪
...
...
}

ReferImplement是最重要的两个方法,接下来着重介绍。

二. Refer()获取远程 Service URL,实现可供调用的 invoker

上述的 ref.Refer(rpcService) 完成的就是这部分的操作。

  1. 构造注册 url

和 server 端类似,存在注册 url 和服务 url,dubbo 习惯将服务 url 作为注册 url 的 sub。

  1. registryProtocol 获取到 zkRegistry 实例,进一步 Refer
  2. 构造 directory(包含较复杂的异步操作)
  3. 构造带有集群策略的 clusterinvoker
  4. 在 zookeeper 上注册当前 client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Refer retrieves invokers from urls.
func (rc *ReferenceConfig) Refer(srv interface{}) {
// If adaptive service is enabled,
// the cluster and load balance should be overridden to "adaptivesvc" and "p2c" respectively.
if rc.rootConfig.Consumer.AdaptiveService {
rc.Cluster = constant.ClusterKeyAdaptiveService
rc.Loadbalance = constant.LoadBalanceKeyP2C
}

// cfgURL is an interface-level invoker url, in the other words, it represents an interface.
cfgURL := common.NewURLWithOptions(
common.WithPath(rc.InterfaceName),
common.WithProtocol(rc.Protocol),
common.WithParams(rc.getURLMap()),
common.WithParamsValue(constant.BeanNameKey, rc.id),
common.WithParamsValue(constant.MetadataTypeKey, rc.metaDataType),
)

SetConsumerServiceByInterfaceName(rc.InterfaceName, srv)
if rc.ForceTag {
cfgURL.AddParam(constant.ForceUseTag, "true")
}
rc.postProcessConfig(cfgURL)

// retrieving urls from config, and appending the urls to rc.urls
if rc.URL != "" { // use user-specific urls
/*
Two types of URL are allowed for rc.URL:
1. direct url: server IP, that is, no need for a registry anymore
2. registry url
They will be handled in different ways:
For example, we have a direct url and a registry url:
1. "tri://localhost:10000" is a direct url
2. "registry://localhost:2181" is a registry url.
Then, rc.URL looks like a string separated by semicolon: "tri://localhost:10000;registry://localhost:2181".
The result of urlStrings is a string array: []string{"tri://localhost:10000", "registry://localhost:2181"}.
*/
urlStrings := gxstrings.RegSplit(rc.URL, "\\s*[;]+\\s*")
for _, urlStr := range urlStrings {
serviceURL, err := common.NewURL(urlStr)
if err != nil {
panic(fmt.Sprintf("url configuration error, please check your configuration, user specified URL %v refer error, error message is %v ", urlStr, err.Error()))
}
if serviceURL.Protocol == constant.RegistryProtocol { // serviceURL in this branch is a registry protocol
serviceURL.SubURL = cfgURL
rc.urls = append(rc.urls, serviceURL)
} else { // serviceURL in this branch is the target endpoint IP address
if serviceURL.Path == "" {
serviceURL.Path = "/" + rc.InterfaceName
}
// replace params of serviceURL with params of cfgUrl
// other stuff, e.g. IP, port, etc., are same as serviceURL
newURL := common.MergeURL(serviceURL, cfgURL)
rc.urls = append(rc.urls, newURL)
}
}
} else { // use registry configs
// 配置读入注册中心的信息
rc.urls = loadRegistries(rc.RegistryIDs, rc.rootConfig.Registries, common.CONSUMER)
// set url to regURLs
for _, regURL := range rc.urls {
regURL.SubURL = cfgURL
}
}

// Get invokers according to rc.urls
var (
invoker protocol.Invoker
regURL *common.URL
)
invokers := make([]protocol.Invoker, len(rc.urls))
for i, u := range rc.urls {
// 获取registryProtocol实例,调用其Refer方法,传入新构建好的regURL
if u.Protocol == constant.ServiceRegistryProtocol {
invoker = extension.GetProtocol("registry").Refer(u)
} else {
invoker = extension.GetProtocol(u.Protocol).Refer(u)
}

if rc.URL != "" {
invoker = protocolwrapper.BuildInvokerChain(invoker, constant.ReferenceFilterKey)
}

invokers[i] = invoker
if u.Protocol == constant.RegistryProtocol {
regURL = u
}
}

// TODO(hxmhlt): decouple from directory, config should not depend on directory module
if len(invokers) == 1 {
rc.invoker = invokers[0]
if rc.URL != "" {
hitClu := constant.ClusterKeyFailover
if u := rc.invoker.GetURL(); u != nil {
hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware)
}
cluster, err := extension.GetCluster(hitClu)
if err != nil {
panic(err)
} else {
rc.invoker = cluster.Join(static.NewDirectory(invokers))
}
}
} else {
var hitClu string
if regURL != nil {
// for multi-subscription scenario, use 'zone-aware' policy by default
hitClu = constant.ClusterKeyZoneAware
} else {
// not a registry url, must be direct invoke.
hitClu = constant.ClusterKeyFailover
if u := invokers[0].GetURL(); u != nil {
hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware)
}
}
cluster, err := extension.GetCluster(hitClu)
if err != nil {
panic(err)
} else {
rc.invoker = cluster.Join(static.NewDirectory(invokers))
}
}

// publish consumer's metadata
publishServiceDefinition(cfgURL)
// create proxy
if rc.Async {
callback := GetCallback(rc.id)
rc.pxy = extension.GetProxyFactory(rc.rootConfig.Consumer.ProxyFactory).GetAsyncProxy(rc.invoker, callback, cfgURL)
} else {
rc.pxy = extension.GetProxyFactory(rc.rootConfig.Consumer.ProxyFactory).GetProxy(rc.invoker, cfgURL)
}
}

接下来,已经拿到的 url 将被传递给 RegistryProtocol,进一步 refer。

invoker = extension.GetProtocol("registry").Refer(u)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Refer provider service from registry center
// 拿到的是配置文件registries的url,他能够生成一个invoker = 指向目的addr,以供客户端直接调用。
func (proto *registryProtocol) Refer(url *common.URL) protocol.Invoker {
registryUrl := url
// 这里拿到的是referenceConfig,serviceUrl里面包含了Reference的所有信息,包含interfaceName、method等等
serviceUrl := registryUrl.SubURL
if registryUrl.Protocol == constant.RegistryProtocol {
// 替换成了具体的值,比如"zookeeper"
registryUrl.Protocol = registryUrl.GetParam(constant.RegistryKey, "")
}

reg := proto.getRegistry(url)

// 到这里,获取到了reg实例 zookeeper的registry
// 1. 根据Register的实例zkRegistry和传入的regURL新建一个directory
// 这一步存在复杂的异步逻辑,从注册中心拿到了目的service的真实addr,获取了invoker并放入directory,
// 这一步将在下面详细给出步骤
// new registry directory for store service url from registry
directory, err := extension.GetDefaultRegistryDirectory(registryUrl, reg)
if err != nil {
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
serviceUrl.String(), err.Error())
return nil
}

// 2. DoRegister 在zk上注册当前client service
err = reg.Register(serviceUrl)
if err != nil {
logger.Errorf("consumer service %v register registry %v error, error message is %s",
serviceUrl.String(), registryUrl.String(), err.Error())
}

// 3. new cluster invoker,将directory写入集群,获得具有集群策略的invoker
// new cluster invoker
clusterKey := serviceUrl.GetParam(constant.ClusterKey, constant.DefaultCluster)
cluster, err := extension.GetCluster(clusterKey)
if err != nil {
panic(err)
}
invoker := cluster.Join(directory)
return invoker
}

上述的 extension.GetDefaultRegistryDirectory(&registryUrl, reg) 函数,本质上调用了已经注册好的 NewRegistryDirectory 函数:

首先构造了一个注册 directory,开启协程调用其 subscribe 函数,传入 serviceURL。

这个 directory 目前包含了对应的 zkRegistry,以及传入的 URL,它的 cacheInvokers 部分是空的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// file: registry/directory/directory.go
// NewRegistryDirectory will create a new RegistryDirectory
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (directory.Directory, error) {
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
logger.Debugf("new RegistryDirectory for service :%s.", url.Key())
dir := &RegistryDirectory{
Directory: base.NewDirectory(url),
cacheInvokers: []protocol.Invoker{},
cacheInvokersMap: &sync.Map{},
serviceType: url.SubURL.Service(),
registry: registry,
}

dir.consumerURL = dir.getConsumerUrl(url.SubURL)

if routerChain, err := chain.NewRouterChain(); err == nil {
dir.Directory.SetRouterChain(routerChain)
} else {
logger.Warnf("fail to create router chain with url: %s, err is: %v", url.SubURL, err)
}

dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)

go dir.subscribe(url.SubURL)
return dir, nil
}

进入 dir.subscribe(url.SubURL) 这个异步函数,它调用了 zkRegistry 的 Subscribe 方法,与此同时将自己作为 ConfigListener 传入。

1
2
3
4
5
6
7
8
9
10
11
// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key())
// 增加两个监听
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
// subscribe订阅
if err := dir.registry.Subscribe(url, dir); err != nil {
logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err)
}
}

上述传递自身的这个 Listener 需要实现 Notify 方法,进而在作为参数传入内部之后,可以被异步地调用 Notify,将内部触发的异步事件“传递出来”,再进一步处理加工。

Notify 回调链,以事件的形式向外传递,最终落到 directory 上的时候,已经是成型的 newInvokers 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
if event == nil {
return
}
dir.refreshInvokers(event)
}

// refreshInvokers refreshes service's events.
func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) {
if event != nil {
logger.Debugf("refresh invokers with %+v", event)
} else {
logger.Debug("refresh invokers with nil")
}

var oldInvoker []protocol.Invoker
if event != nil {
oldInvoker, _ = dir.cacheInvokerByEvent(event)
}
dir.setNewInvokers()
for _, v := range oldInvoker {
if v != nil {
v.Destroy()
}
}
}

逻辑图如下:

订阅更新invoker的逻辑

至此,经过上述操作,已经拿到了 server 端 Invokers,放入了 directory 的 cacheinvokers 数组里面缓存。

后续的操作对应本文从 url 到 invoker 的过程的最后一步,由 directory 生成带有特性集群策略的 invoker。

invoker := cluster.Join(directory)

dubbo-go 框架默认选择 failover 策略。

Join 函数的实现就是如下函数:

1
2
3
4
5
func newFailoverClusterInvoker(directory directory.Directory) protocol.Invoker {
return &failoverClusterInvoker{
BaseClusterInvoker: base.NewBaseClusterInvoker(directory),
}
}

既然返回了一个 invoker,我们查看一下 failoverClusterInvoker 的 Invoker 方法,看它是如何将集群策略封装到 Invoker 函数内部的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.Result
invoked []protocol.Invoker
providers []string
ivk protocol.Invoker
)
// 调用List方法拿到directory缓存的所有invokers
invokers := invoker.Directory.List(invocation)
if err := invoker.CheckInvokers(invokers, invocation); err != nil {
return &protocol.RPCResult{Err: err}
}
// 获取来自用户方向传入的参数
methodName := invocation.ActualMethodName()
retries := getRetries(invokers, methodName)
loadBalance := base.GetLoadBalance(invokers[0], methodName)

// 重要!这里是集群策略的体现,失败后重试!
for i := 0; i <= retries; i++ {
// Reselect before retry to avoid a change of candidate `invokers`.
// NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 {
if err := invoker.CheckWhetherDestroyed(); err != nil {
return &protocol.RPCResult{Err: err}
}

invokers = invoker.Directory.List(invocation)
if err := invoker.CheckInvokers(invokers, invocation); err != nil {
return &protocol.RPCResult{Err: err}
}
}
// 这里是负载均衡策略的体现!选择特定ivk进行调用。
ivk = invoker.DoSelect(loadBalance, invocation, invokers, invoked)
if ivk == nil {
continue
}
invoked = append(invoked, ivk)
// DO INVOKE
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
providers = append(providers, ivk.GetURL().Key())
continue
}
return result
}
...
...
}

现在我们有了可以打通的 invokers,但还不能直接调用,因为 invoker 的入参是 invocation,而调用函数使用的是具体的参数列表,需要通过一层 proxy 来规范入参和出参。回到func (rc *ReferenceConfig) Refer(...)中可以发现最后会新建一个默认 proxy,放置在 rc.proxy 内,以供后续使用。

proxy 的作用是将用户定义的函数参数列表,转化为抽象的 invocation 传入 Invoker,进行调用。

三. 将调用逻辑以代理函数的形式写入 RPCService

上面完成了 ref.Refer(rpcService),下面回到另一个重要的方法ref.Implement(rpcService)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
func (p *Proxy) Implement(v common.RPCService) {
p.once.Do(func() {
p.implement(p, v)
p.rpc = v
})
}
// Implement 实现的过程,就是proxy根据函数名和返回值,通过调用invoker构造出拥有远程调用逻辑的代理函数
// DefaultProxyImplementFunc the default function for proxy impl
func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
// check parameters, incoming interface must be a elem's pointer.
valueOf := reflect.ValueOf(v)

valueOfElem := valueOf.Elem()

makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {
return func(in []reflect.Value) []reflect.Value {
var (
err error
inv *invocation_impl.RPCInvocation
inIArr []interface{}
inVArr []reflect.Value
reply reflect.Value
replyEmptyFlag bool
)
if methodName == "Echo" {
methodName = "$echo"
}

if len(outs) == 2 { // return (reply, error)
if outs[0].Kind() == reflect.Ptr {
reply = reflect.New(outs[0].Elem())
} else {
reply = reflect.New(outs[0])
}
} else { // only return error
replyEmptyFlag = true
}

start := 0
end := len(in)
invCtx := context.Background()
// retrieve the context from the first argument if existed
if end > 0 {
if in[0].Type().String() == "context.Context" {
if !in[0].IsNil() {
// the user declared context as method's parameter
invCtx = in[0].Interface().(context.Context)
}
start += 1
}
}

if end-start <= 0 {
inIArr = []interface{}{}
inVArr = []reflect.Value{}
} else if v, ok := in[start].Interface().([]interface{}); ok && end-start == 1 {
inIArr = v
inVArr = []reflect.Value{in[start]}
} else {
inIArr = make([]interface{}, end-start)
inVArr = make([]reflect.Value, end-start)
index := 0
for i := start; i < end; i++ {
inIArr[index] = in[i].Interface()
inVArr[index] = in[i]
index++
}
}

inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName),
invocation_impl.WithArguments(inIArr),
invocation_impl.WithCallBack(p.callback), invocation_impl.WithParameterValues(inVArr))
if !replyEmptyFlag {
inv.SetReply(reply.Interface())
}

for k, value := range p.attachments {
inv.SetAttachment(k, value)
}

// add user setAttachment. It is compatibility with previous versions.
atm := invCtx.Value(constant.AttachmentKey)
if m, ok := atm.(map[string]string); ok {
for k, value := range m {
inv.SetAttachment(k, value)
}
} else if m2, ok2 := atm.(map[string]interface{}); ok2 {
// it is support to transfer map[string]interface{}. It refers to dubbo-java 2.7.
for k, value := range m2 {
inv.SetAttachment(k, value)
}
}

// 触发Invoker 之前已经将cluster_invoker放入proxy,使用Invoke方法,通过getty远程过程调用
result := p.invoke.Invoke(invCtx, inv)
err = result.Error()
// cause is raw user level error
cause := perrors.Cause(err)
if err != nil {
// if some error happened, it should be log some info in the separate file.
if throwabler, ok := cause.(java_exception.Throwabler); ok {
logger.Warnf("[CallProxy] invoke service throw exception: %v , stackTraceElements: %v", cause.Error(), throwabler.GetStackTrace())
} else {
// entire error is only for printing, do not return, because user would not want to deal with massive framework-level error message
logger.Warnf("[CallProxy] received rpc err: %v", err)
}
} else {
logger.Debugf("[CallProxy] received rpc result successfully: %s", result)
}
if len(outs) == 1 {
return []reflect.Value{reflect.ValueOf(&cause).Elem()}
}
if len(outs) == 2 && outs[0].Kind() != reflect.Ptr {
return []reflect.Value{reply.Elem(), reflect.ValueOf(&cause).Elem()}
}
return []reflect.Value{reply, reflect.ValueOf(&cause).Elem()}
}
}

if err := refectAndMakeObjectFunc(valueOfElem, makeDubboCallProxy); err != nil {
logger.Errorf("The type or combination type of RPCService %T must be a pointer of a struct. error is %s", v, err)
return
}
}

Implemt主要完成在代理函数中实现由参数列表生成 Invocation 的逻辑与在代理函数实现调用 Invoker 的逻辑

然后在最后refectAndMakeObjectFunc函数中将代理函数替换为原始RPCService对应函数(f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts))))。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func refectAndMakeObjectFunc(valueOfElem reflect.Value, makeDubboCallProxy func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value) error {
typeOf := valueOfElem.Type()
// check incoming interface, incoming interface's elem must be a struct.
if typeOf.Kind() != reflect.Struct {
return errors.New("invalid type kind")
}
numField := valueOfElem.NumField()
for i := 0; i < numField; i++ {
t := typeOf.Field(i)
methodName := t.Tag.Get("dubbo")
if methodName == "" {
methodName = t.Name
}
f := valueOfElem.Field(i)
if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() {
outNum := t.Type.NumOut()

if outNum != 1 && outNum != 2 {
logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2",
t.Name, t.Type.String(), outNum)
continue
}

// The latest return type of the method must be error.
if returnType := t.Type.Out(outNum - 1); returnType != typError {
logger.Warnf("the latest return type %s of method %q is not error", returnType, t.Name)
continue
}

funcOuts := make([]reflect.Type, outNum)
for i := 0; i < outNum; i++ {
funcOuts[i] = t.Type.Out(i)
}

// 调用make函数,传入函数名和返回值,获得能调用远程的proxy,用这个proxy替换掉原来的函数位置
// do method proxy here:
f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))
logger.Debugf("set method [%s]", methodName)
} else if f.IsValid() && f.CanSet() {
// for struct combination
valueOfSub := reflect.New(t.Type)
valueOfElemInterface := valueOfSub.Elem()
if valueOfElemInterface.Type().Kind() == reflect.Struct {
if err := refectAndMakeObjectFunc(valueOfElemInterface, makeDubboCallProxy); err != nil {
return err
}
f.Set(valueOfElemInterface)
}
}
}
return nil
}

最后直接调用用户定义的 RPCService 的函数,此处实际调用的是经过重写入的函数代理,所以就能实现远程调用了。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!