// Refer retrieves invokers from urls. func(rc *ReferenceConfig)Refer(srv interface{}) { // If adaptive service is enabled, // the cluster and load balance should be overridden to "adaptivesvc" and "p2c" respectively. if rc.rootConfig.Consumer.AdaptiveService { rc.Cluster = constant.ClusterKeyAdaptiveService rc.Loadbalance = constant.LoadBalanceKeyP2C }
// cfgURL is an interface-level invoker url, in the other words, it represents an interface. cfgURL := common.NewURLWithOptions( common.WithPath(rc.InterfaceName), common.WithProtocol(rc.Protocol), common.WithParams(rc.getURLMap()), common.WithParamsValue(constant.BeanNameKey, rc.id), common.WithParamsValue(constant.MetadataTypeKey, rc.metaDataType), )
SetConsumerServiceByInterfaceName(rc.InterfaceName, srv) if rc.ForceTag { cfgURL.AddParam(constant.ForceUseTag, "true") } rc.postProcessConfig(cfgURL)
// retrieving urls from config, and appending the urls to rc.urls if rc.URL != "" { // use user-specific urls /* Two types of URL are allowed for rc.URL: 1. direct url: server IP, that is, no need for a registry anymore 2. registry url They will be handled in different ways: For example, we have a direct url and a registry url: 1. "tri://localhost:10000" is a direct url 2. "registry://localhost:2181" is a registry url. Then, rc.URL looks like a string separated by semicolon: "tri://localhost:10000;registry://localhost:2181". The result of urlStrings is a string array: []string{"tri://localhost:10000", "registry://localhost:2181"}. */ urlStrings := gxstrings.RegSplit(rc.URL, "\\s*[;]+\\s*") for _, urlStr := range urlStrings { serviceURL, err := common.NewURL(urlStr) if err != nil { panic(fmt.Sprintf("url configuration error, please check your configuration, user specified URL %v refer error, error message is %v ", urlStr, err.Error())) } if serviceURL.Protocol == constant.RegistryProtocol { // serviceURL in this branch is a registry protocol serviceURL.SubURL = cfgURL rc.urls = append(rc.urls, serviceURL) } else { // serviceURL in this branch is the target endpoint IP address if serviceURL.Path == "" { serviceURL.Path = "/" + rc.InterfaceName } // replace params of serviceURL with params of cfgUrl // other stuff, e.g. IP, port, etc., are same as serviceURL newURL := common.MergeURL(serviceURL, cfgURL) rc.urls = append(rc.urls, newURL) } } } else { // use registry configs // 配置读入注册中心的信息 rc.urls = loadRegistries(rc.RegistryIDs, rc.rootConfig.Registries, common.CONSUMER) // set url to regURLs for _, regURL := range rc.urls { regURL.SubURL = cfgURL } }
// Get invokers according to rc.urls var ( invoker protocol.Invoker regURL *common.URL ) invokers := make([]protocol.Invoker, len(rc.urls)) for i, u := range rc.urls { // 获取registryProtocol实例,调用其Refer方法,传入新构建好的regURL if u.Protocol == constant.ServiceRegistryProtocol { invoker = extension.GetProtocol("registry").Refer(u) } else { invoker = extension.GetProtocol(u.Protocol).Refer(u) }
if rc.URL != "" { invoker = protocolwrapper.BuildInvokerChain(invoker, constant.ReferenceFilterKey) }
invokers[i] = invoker if u.Protocol == constant.RegistryProtocol { regURL = u } }
// TODO(hxmhlt): decouple from directory, config should not depend on directory module iflen(invokers) == 1 { rc.invoker = invokers[0] if rc.URL != "" { hitClu := constant.ClusterKeyFailover if u := rc.invoker.GetURL(); u != nil { hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware) } cluster, err := extension.GetCluster(hitClu) if err != nil { panic(err) } else { rc.invoker = cluster.Join(static.NewDirectory(invokers)) } } } else { var hitClu string if regURL != nil { // for multi-subscription scenario, use 'zone-aware' policy by default hitClu = constant.ClusterKeyZoneAware } else { // not a registry url, must be direct invoke. hitClu = constant.ClusterKeyFailover if u := invokers[0].GetURL(); u != nil { hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware) } } cluster, err := extension.GetCluster(hitClu) if err != nil { panic(err) } else { rc.invoker = cluster.Join(static.NewDirectory(invokers)) } }
// Refer provider service from registry center // 拿到的是配置文件registries的url,他能够生成一个invoker = 指向目的addr,以供客户端直接调用。 func(proto *registryProtocol)Refer(url *common.URL)protocol.Invoker { registryUrl := url // 这里拿到的是referenceConfig,serviceUrl里面包含了Reference的所有信息,包含interfaceName、method等等 serviceUrl := registryUrl.SubURL if registryUrl.Protocol == constant.RegistryProtocol { // 替换成了具体的值,比如"zookeeper" registryUrl.Protocol = registryUrl.GetParam(constant.RegistryKey, "") }
reg := proto.getRegistry(url)
// 到这里,获取到了reg实例 zookeeper的registry // 1. 根据Register的实例zkRegistry和传入的regURL新建一个directory // 这一步存在复杂的异步逻辑,从注册中心拿到了目的service的真实addr,获取了invoker并放入directory, // 这一步将在下面详细给出步骤 // new registry directory for store service url from registry directory, err := extension.GetDefaultRegistryDirectory(registryUrl, reg) if err != nil { logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", serviceUrl.String(), err.Error()) returnnil }
// 2. DoRegister 在zk上注册当前client service err = reg.Register(serviceUrl) if err != nil { logger.Errorf("consumer service %v register registry %v error, error message is %s", serviceUrl.String(), registryUrl.String(), err.Error()) }
// 3. new cluster invoker,将directory写入集群,获得具有集群策略的invoker // new cluster invoker clusterKey := serviceUrl.GetParam(constant.ClusterKey, constant.DefaultCluster) cluster, err := extension.GetCluster(clusterKey) if err != nil { panic(err) } invoker := cluster.Join(directory) return invoker }
// file: registry/directory/directory.go // NewRegistryDirectory will create a new RegistryDirectory funcNewRegistryDirectory(url *common.URL, registry registry.Registry)(directory.Directory, error) { if url.SubURL == nil { returnnil, perrors.Errorf("url is invalid, suburl can not be nil") } logger.Debugf("new RegistryDirectory for service :%s.", url.Key()) dir := &RegistryDirectory{ Directory: base.NewDirectory(url), cacheInvokers: []protocol.Invoker{}, cacheInvokersMap: &sync.Map{}, serviceType: url.SubURL.Service(), registry: registry, }
dir.consumerURL = dir.getConsumerUrl(url.SubURL)
if routerChain, err := chain.NewRouterChain(); err == nil { dir.Directory.SetRouterChain(routerChain) } else { logger.Warnf("fail to create router chain with url: %s, err is: %v", url.SubURL, err) }
// Notify monitor changes from registry,and update the cacheServices func(dir *RegistryDirectory)Notify(event *registry.ServiceEvent) { if event == nil { return } dir.refreshInvokers(event) }
// refreshInvokers refreshes service's events. func(dir *RegistryDirectory)refreshInvokers(event *registry.ServiceEvent) { if event != nil { logger.Debugf("refresh invokers with %+v", event) } else { logger.Debug("refresh invokers with nil") }
var oldInvoker []protocol.Invoker if event != nil { oldInvoker, _ = dir.cacheInvokerByEvent(event) } dir.setNewInvokers() for _, v := range oldInvoker { if v != nil { v.Destroy() } } }
逻辑图如下:
至此,经过上述操作,已经拿到了 server 端 Invokers,放入了 directory 的 cacheinvokers 数组里面缓存。
// 重要!这里是集群策略的体现,失败后重试! for i := 0; i <= retries; i++ { // Reselect before retry to avoid a change of candidate `invokers`. // NOTE: if `invokers` changed, then `invoked` also lose accuracy. if i > 0 { if err := invoker.CheckWhetherDestroyed(); err != nil { return &protocol.RPCResult{Err: err} }
invokers = invoker.Directory.List(invocation) if err := invoker.CheckInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } } // 这里是负载均衡策略的体现!选择特定ivk进行调用。 ivk = invoker.DoSelect(loadBalance, invocation, invokers, invoked) if ivk == nil { continue } invoked = append(invoked, ivk) // DO INVOKE result = ivk.Invoke(ctx, invocation) if result.Error() != nil { providers = append(providers, ivk.GetURL().Key()) continue } return result } ... ... }
start := 0 end := len(in) invCtx := context.Background() // retrieve the context from the first argument if existed if end > 0 { if in[0].Type().String() == "context.Context" { if !in[0].IsNil() { // the user declared context as method's parameter invCtx = in[0].Interface().(context.Context) } start += 1 } }
if end-start <= 0 { inIArr = []interface{}{} inVArr = []reflect.Value{} } elseif v, ok := in[start].Interface().([]interface{}); ok && end-start == 1 { inIArr = v inVArr = []reflect.Value{in[start]} } else { inIArr = make([]interface{}, end-start) inVArr = make([]reflect.Value, end-start) index := 0 for i := start; i < end; i++ { inIArr[index] = in[i].Interface() inVArr[index] = in[i] index++ } }
for k, value := range p.attachments { inv.SetAttachment(k, value) }
// add user setAttachment. It is compatibility with previous versions. atm := invCtx.Value(constant.AttachmentKey) if m, ok := atm.(map[string]string); ok { for k, value := range m { inv.SetAttachment(k, value) } } elseif m2, ok2 := atm.(map[string]interface{}); ok2 { // it is support to transfer map[string]interface{}. It refers to dubbo-java 2.7. for k, value := range m2 { inv.SetAttachment(k, value) } }
// 触发Invoker 之前已经将cluster_invoker放入proxy,使用Invoke方法,通过getty远程过程调用 result := p.invoke.Invoke(invCtx, inv) err = result.Error() // cause is raw user level error cause := perrors.Cause(err) if err != nil { // if some error happened, it should be log some info in the separate file. if throwabler, ok := cause.(java_exception.Throwabler); ok { logger.Warnf("[CallProxy] invoke service throw exception: %v , stackTraceElements: %v", cause.Error(), throwabler.GetStackTrace()) } else { // entire error is only for printing, do not return, because user would not want to deal with massive framework-level error message logger.Warnf("[CallProxy] received rpc err: %v", err) } } else { logger.Debugf("[CallProxy] received rpc result successfully: %s", result) } iflen(outs) == 1 { return []reflect.Value{reflect.ValueOf(&cause).Elem()} } iflen(outs) == 2 && outs[0].Kind() != reflect.Ptr { return []reflect.Value{reply.Elem(), reflect.ValueOf(&cause).Elem()} } return []reflect.Value{reply, reflect.ValueOf(&cause).Elem()} } }
if err := refectAndMakeObjectFunc(valueOfElem, makeDubboCallProxy); err != nil { logger.Errorf("The type or combination type of RPCService %T must be a pointer of a struct. error is %s", v, err) return } }
funcrefectAndMakeObjectFunc(valueOfElem reflect.Value, makeDubboCallProxy func(methodName string, outs []reflect.Type)func(in []reflect.Value) []reflect.Value) error { typeOf := valueOfElem.Type() // check incoming interface, incoming interface's elem must be a struct. if typeOf.Kind() != reflect.Struct { return errors.New("invalid type kind") } numField := valueOfElem.NumField() for i := 0; i < numField; i++ { t := typeOf.Field(i) methodName := t.Tag.Get("dubbo") if methodName == "" { methodName = t.Name } f := valueOfElem.Field(i) if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { outNum := t.Type.NumOut()
if outNum != 1 && outNum != 2 { logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2", t.Name, t.Type.String(), outNum) continue }
// The latest return type of the method must be error. if returnType := t.Type.Out(outNum - 1); returnType != typError { logger.Warnf("the latest return type %s of method %q is not error", returnType, t.Name) continue }
funcOuts := make([]reflect.Type, outNum) for i := 0; i < outNum; i++ { funcOuts[i] = t.Type.Out(i) }