文章

使用go-micro的rpc客户端

2018-06-25 | 3 minutes read |标签 golang micro-server go-micro |分类 开发

使用GO-MICRO的RPC客户端

由于我们在项目中使用了gin,造成我们无法按官网的教程方式简单使用注册发现服务,所以在项目里面,我们必须单独使用go-micro的client模块。由于我们还使用gin这让我们需要自己解决两者集成关系

示例代码

package handler

import (
	"github.com/gin-gonic/gin"
	"net/http"
	"xdgj/rpc/proto/news"
	"context"
	"github.com/micro/go-micro/client/rpc"
)

type NewsHandler struct{
	service proto.NewsCRUDService
}

func (me *NewsHandler)Init(){
	me.service=proto.NewNewsCRUDService("go.micro.srv.news",rpc.NewClient())
}

func (me *NewsHandler) GetNews(c *gin.Context) {
	response,err:=me.service.Create(context.TODO(), &proto.CreateNewsRequest{
		News: &proto.News{
			IconUrl:    "https://xiadangongji",
			OriginUrl:  "https://xiadangongji/",
			OriginText: "xiadangongji",
			Title:      "xiadangongji",
		},
	})
	if err!=nil{
		c.JSONP(http.StatusAccepted,err.Error())
	}
	c.JSONP(http.StatusAccepted,response)
}

在使用prorobuf 生成工具生成的rpc文件中,其实已经生成了service服务代码示例代码中的proto.NewNewsCRUDService便是生成函数,经过单步调试,我们了解到起实现的过程,在创建client后,由rpcclient的next函数负责请求服务发现业务,在获得到节点信息后,通过call函数调用到具体note发起rpc调用代码如下

func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
	// return remote address
	if len(opts.Address) > 0 {
		return func() (*registry.Node, error) {
			return &registry.Node{
				Address: opts.Address,
			}, nil
		}, nil
	}

	// get next nodes from the selector
	next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
	if err != nil && err == selector.ErrNotFound {
		return nil, errors.NotFound("go.micro.client", err.Error())
	} else if err != nil {
		return nil, errors.InternalServerError("go.micro.client", err.Error())
	}

	return next, nil
}

func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
	// make a copy of call opts
	callOpts := r.opts.CallOptions
	for _, opt := range opts {
		opt(&callOpts)
	}

	next, err := r.next(request, callOpts)
	if err != nil {
		return err
	}

	// check if we already have a deadline
	d, ok := ctx.Deadline()
	if !ok {
		// no deadline so we create a new one
		ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
	} else {
		// got a deadline so no need to setup context
		// but we need to set the timeout we pass along
		opt := WithRequestTimeout(d.Sub(time.Now()))
		opt(&callOpts)
	}

	// should we noop right here?
	select {
	case <-ctx.Done():
		return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
	default:
	}

	// make copy of call method
	rcall := r.call

	// wrap the call in reverse
	for i := len(callOpts.CallWrappers); i > 0; i-- {
		rcall = callOpts.CallWrappers[i-1](rcall)
	}

	// return errors.New("go.micro.client", "request timeout", 408)
	call := func(i int) error {
		// call backoff first. Someone may want an initial start delay
		t, err := callOpts.Backoff(ctx, request, i)
		if err != nil {
			return errors.InternalServerError("go.micro.client", err.Error())
		}

		// only sleep if greater than 0
		if t.Seconds() > 0 {
			time.Sleep(t)
		}

		// select next node
		node, err := next()
		if err != nil && err == selector.ErrNotFound {
			return errors.NotFound("go.micro.client", err.Error())
		} else if err != nil {
			return errors.InternalServerError("go.micro.client", err.Error())
		}

		// set the address
		address := node.Address
		if node.Port > 0 {
			address = fmt.Sprintf("%s:%d", address, node.Port)
		}

		// make the call
		err = rcall(ctx, address, request, response, callOpts)
		r.opts.Selector.Mark(request.Service(), node, err)
		return err
	}

	ch := make(chan error, callOpts.Retries)
	var gerr error

	for i := 0; i <= callOpts.Retries; i++ {
		go func() {
			ch <- call(i)
		}()

		select {
		case <-ctx.Done():
			return errors.New("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()), 408)
		case err := <-ch:
			// if the call succeeded lets bail early
			if err == nil {
				return nil
			}

			retry, rerr := callOpts.Retry(ctx, request, i, err)
			if rerr != nil {
				return rerr
			}

			if !retry {
				return err
			}

			gerr = err
		}
	}

	return gerr
}

从源代码中我们发现,在发起rpc调用后,函数创建了两个channel,ctx.Done()主要获得是错误信息,ch主要就是消息体本省,并且其函数内部实现的call函数是使用协程运行的,所以其性能还是有保证的