packagemainimport("fmt""github.com/ethereum/go-ethereum/crypto""github.com/ethereum/go-ethereum/log""github.com/ethereum/go-ethereum/p2p""github.com/ethereum/go-ethereum/p2p/discover""github.com/ethereum/go-ethereum/p2p/nat""net""os")constmessageId=0constmessageId1=1typeMessagestringfuncMyProtocol()p2p.Protocol{returnp2p.Protocol{Name:"MyProtocol",Version:1,Length:2,Run:msgHandler,}}funcmain(){nodekey,_:=crypto.GenerateKey()logger:=log.New()logger.SetHandler(log.StderrHandler)srv:=p2p.Server{Config:p2p.Config{MaxPeers:10,PrivateKey:nodekey,Name:"my node name",ListenAddr:":30300",Protocols:[]p2p.Protocol{MyProtocol()},NAT:nat.Any(),Logger:logger,},}iferr:=srv.Start();err!=nil{fmt.Println(err)os.Exit(1)}fmt.Println("started..",srv.NodeInfo())select{}}funcmsgHandler(peer*p2p.Peer,wsp2p.MsgReadWriter)error{fmt.Println("peer",peer.Name(),"connected.")p2p.SendItems(ws,messageId,"foo")for{msg,err:=ws.ReadMsg()iferr!=nil{fmt.Println("peer",peer.Name(),"disconnected")returnerr}// SendItems writes an RLP with the given code and data elements.// For a call such as://// SendItems(w, code, e1, e2, e3)//// the message payload will be an RLP list containing the items://// [e1, e2, e3]// 所以这里收消息应该定义为数组varmyMessage[1]Messageerr=msg.Decode(&myMessage)iferr!=nil{// handle decode errorcontinue}fmt.Println("code:",msg.Code,"receiver at:",msg.ReceivedAt,"msg:",myMessage)switchmyMessage[0]{case"foo":err:=p2p.SendItems(ws,messageId1,"bar")iferr!=nil{returnerr}default:fmt.Println("recv:",myMessage)}}}
// startListening is shown here as an abridged excerpt ("..." marks elided
// code). The visible part opens a TCP listener on srv.ListenAddr, launches
// srv.listenLoop and srv.run(dialer) as goroutines, and returns nil.
func (srv *Server) startListening() error {
	// Launch the TCP listener.
	listener, err := net.Listen("tcp", srv.ListenAddr)
	...
	go srv.listenLoop()
	...
	// Main execution logic.
	go srv.run(dialer)
	return nil
}
当接收到一个新的 TCP 连接时，节点开始检查并初始化 peer。
// setupConn is shown here as an abridged excerpt ("..." marks elided code).
// The visible part runs the encryption handshake and the protocol handshake
// on connection c, then passes c to srv.checkpoint together with
// srv.addpeer. A failed encryption handshake is traced and its error
// returned.
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
	...
	// From this point on Ethereum's own protocol is in effect;
	// doEncHandshake is the handshake method of the RLPx protocol.
	if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
		srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
		return err
	}
	...
	// The two handshake message codes: (handshakeMsg = 0x00) and (discMsg = 0x01).
	phs, err := c.doProtoHandshake(srv.ourHandshake)
	...
	// Handshake finished: push the new connection object (*p2p.conn) onto server.addpeer.
	err = srv.checkpoint(c, srv.addpeer)
	// If the checks completed successfully, runPeer has now been
	// launched by run.
	return nil
}
funcmatchProtocols(protocols[]Protocol,caps[]Cap,rwMsgReadWriter)map[string]*protoRW{// 按协议(name asc,version asc)排序子协议sort.Sort(capsByNameAndVersion(caps))// 自定义协议偏移offset:=baseProtocolLengthresult:=make(map[string]*protoRW)outer:for_,cap:=rangecaps{for_,proto:=rangeprotocols{ifproto.Name==cap.Name&&proto.Version==cap.Version{// If an old protocol version matched, revert itifold:=result[cap.Name];old!=nil{offset-=old.Length}// Assign the new matchresult[cap.Name]=&protoRW{Protocol:proto,offset:offset,in:make(chanMsg),w:rw}offset+=proto.Lengthcontinueouter}}}returnresult}