📢 Gate廣場 #NERO发帖挑战# 秀觀點贏大獎活動火熱開啓!
Gate NERO生態周來襲!發帖秀出NERO項目洞察和活動實用攻略,瓜分30,000NERO!
💰️ 15位優質發帖用戶 * 2,000枚NERO每人
如何參與:
1️⃣ 調研NERO項目
對NERO的基本面、社區治理、發展目標、代幣經濟模型等方面進行研究,分享你對項目的深度研究。
2️⃣ 參與並分享真實體驗
參與NERO生態周相關活動,並曬出你的參與截圖、收益圖或實用教程。可以是收益展示、簡明易懂的新手攻略、小竅門,也可以是行情點位分析,內容詳實優先。
3️⃣ 鼓勵帶新互動
如果你的帖子吸引到他人參與活動,或者有好友評論“已參與/已交易”,將大幅提升你的獲獎概率!
NERO熱門活動(帖文需附以下活動連結):
NERO Chain (NERO) 生態周:Gate 已上線 NERO 現貨交易,爲回饋平台用戶,HODLer Airdrop、Launchpool、CandyDrop、餘幣寶已上線 NERO,邀您體驗。參與攻略見公告:https://www.gate.com/announcements/article/46284
高質量帖子Tips:
教程越詳細、圖片越直觀、互動量越高,獲獎幾率越大!
市場見解獨到、真實參與經歷、有帶新互動者,評選將優先考慮。
帖子需原創,字數不少於250字,且需獲得至少3條有效互動
libp2p在op-stack中的使用
作者:joohhnnn
optimism中的libp2p應用
在本節中,主要用於講解optimism是如何使用libp2p來完成op-node中的p2p網路建立的。 p2p網路主要是用於在不同的node中傳遞資訊,例如sequencer完成unsafe的區塊構建后,通過p2p的gossiphub的pub/sub進行傳播。 libp2p還處理了其他,例如網路,尋址等在p2p網路中的基礎件層。
瞭解libp2p
libp2p(簡稱來自“庫對等”或“library peer-to-peer”)是一個面向對等(P2P)網络的框架,能夠幫助開發P2P應用程式。 它包含了一套協定、規範和庫,使網路參與者(也稱為“對等體”或“peers”)之間的P2P通信變得更為簡便 (source[2] [3])。 libp2p最初是作為IPFS(InterPlanetary File ,星際文件系統)專案的一部分,後來它演變成了一個獨立的專案,成為了分散式網路的模組化網路堆棧 (source )。
libp2p是IPFS社區的一個開源項目,歡迎廣泛社區的貢獻,包括説明編寫規範、編碼實現以及創建示例和教程 (source[4] [5])。 libp2p是由多個構建模組組成的,每個模組都有非常明確、有文檔記錄且經過測試的介面,使得它們可組合、可替換,因此可升級 (source )。 libp2p的模組化特性使得開發人員可以選擇並使用僅對他們的應用程式必要的元件,從而在構建P2P網路應用程式時促進了靈活性和效率。
相關資源
libp2p的模組化架構和開源特性為開發強大、可擴展和靈活的P2P應用程式提供了良好的環境,使其成為分散式網路和網路應用程式開發領域的重要參與者。
libp2p實現方式
在使用libp2p時,你會需要實現和配置一些核心元件以構建你的P2P網路。 以下是libp2p在應用中的一些主要實現方面:
1. 節點創建與配置:
libp2p.新()
2. 傳輸協定:
tcpTransport := tcp.NewTCPTransport()
3. 多路複用和流控制:
yamuxTransport := yamux.新()
4. 安全和加密:
tlsTransport := tls.新()
5. 協定和消息處理:
主機。SetStreamHandler(“/my-protocol/1.0.0”, myProtocolHandler)
6. 發現和路由:
DHT := 卡德特。NewDHT(ctx, host, datastore.NewMapDatastore())
7. 網路行為和策略:
ConnManager := connmgr.NewConnManager(低水位, 高水位, 寬限期)
8. 狀態管理和存儲:
peerstore := pstoremem.NewPeerstore()
9. 測試和調試:
伐木。SetLogLevel(“libp2p”, “DEBUG”)
10. 文檔和社區支援:
}
以上是使用libp2p時需要考慮和實現的一些主要方面。 每個項目的具體實現可能會有所不同,但這些基本方面是構建和運行libp2p應用所必需的。 在實現這些功能時,可以參考libp2p的官方文檔[9] [10]和GitHub倉庫 中的示例代碼和教程。
在OP-node中libp2p的使用
為了弄清楚op-node和libp2p的關係,我們必須弄清楚幾個問題
op-node需要libp2p網络的原因
首先我們要了解為什麼optimism需要p2p網路libp2p是一個模組化的網路協定,允許開發人員構建去中心化的點對點應用,適用於多種用例 (source[11] [12])(來源 [13])。 而devp2p主要用於乙太坊生態系統,專為乙太坊應用定製 (source )。 libp2p的靈活性和廣泛適用性可能使其成為開發人員的首選。
op-node主要使用libp2p的功能點
代碼實現
host自定義初始化
host可以理解為是p2p的節點,當開啟這個節點的時候,需要針對自己的項目進行一些特殊的初始化配置
現在讓我們看一下 op-node/p2p/host.go檔中的Host方法,
該函數主要用於設置 libp2p 主機並進行各種配置。 以下是該函數的關鍵部分以及各部分的簡單中文描述:
如果 P2P 被禁用,函數會直接返回。
使用配置中的公鑰來生成 Peer ID。
創建一個基礎的 Peerstore 存儲。
在基礎 Peerstore 的基礎上,創建一個擴展的 Peerstore。
在 Peerstore 中儲存 Peer 的私鑰和公鑰。
用於控制網路連接。
用於管理網路連接。
設置網路傳輸協定和主機的監聽位址。
使用前面的所有設置來創建一個新的 libp2p 主機。
如果有配置靜態Peer,進行初始化。
最後,函數返回創建好的 libp2p 主機。
這些關鍵部分負責 libp2p 主機的初始化和設置,每個部分都負責主機配置的一個特定方面。
func (conf config) host(log log.記錄器、報告器指標。報告器,指標主機指標)(主機。主機,錯誤){ 如果混淆。禁用 P2P { 返回 nil, nil } 酒吧 := 會議Priv.GetPublic() pid,err := peer。IDFromPublicKey(pub) 如果錯誤 != 無 { 返回零,FMT。錯誤f(「無法從網路金鑰派生公共金鑰: %w」, err) } basePs, err := 已存儲。NewPeerstore(context.背景(), conf.存儲,存儲。DefaultOpts()) 如果錯誤 != 無 { 返回零,FMT。錯誤f(「無法開啟對等存儲: % w」 err ) } peerScoreParams := conf.PeerScoreringParams() 分數保留時間。期間 if peerScoreParams != nil { 使用與八卦相同的保留期(如果有) scoreRetention = peerScoreParams.PeerScorering.RetainScore } else { 如果禁用對等評分,則禁用評分 GC 分數留存率 = 0 } PS,錯誤:=存儲。NewExtendedPeerstore(context.背景(),日誌,時鐘。時鐘,基準,會議。儲存,分數保留) 如果錯誤 != 無 { 返回零,FMT。錯誤f(「無法開啟擴展對等存儲: % w」 err ) } 如果錯誤 := PS。AddPrivKey(pid, conf.普里夫);錯誤 != 無 { 返回零,FMT。錯誤f(「無法使用 priv 鍵: %w」 設定對等存儲,錯誤) } 如果錯誤 := PS。AddPubKey(pid, pub);錯誤 != 無 { 返回零,FMT。錯誤f(「無法使用公共金鑰 %w」 設定對等儲存,錯誤 ) } 連接GTR門控。BlockingConnectionGater connGtr, err = 門控。NewBlockingConnectionGater(conf.商店) 如果錯誤 != 無 { 返回零,FMT。錯誤f(「無法開啟連接門: % w」 錯誤 ) } connGtr = 門控。AddBanExpiry(connGtr, ps, log, clock.時鐘、指標) connGtr = 門控。新增計量(連接,指標) connMngr, err := DefaultConnManager(conf) 如果錯誤 != 無 { 返回零,FMT。錯誤f(「無法開啟連接管理員: %w」 err ) } listenAddr, err := addrFromIPAndPort(conf.聽IP,會議。聽 如果錯誤 != 無 { 返回零,FMT。錯誤f(「無法使偵聽位址: % w」 err ) } tcpTransport := libp2p.運輸( 嘖。NewTCPTransport, 嘖。WithConnectionTimeout(time.分鐘60)) ─ 斷開未使用的連接 TODO:從技術上講,我們也可以在websocket和QUIC傳輸上運行節點 也許將來? 納特·NATManagerC / 如果為零則禁用 如果混淆。NAT { nat = basichost。紐納特馬納格 } 選項 := []libp2p.選項{ libp2p.身份(conf.Priv), 顯式設置使用者代理,以便我們可以與其他 Go libp2p 用戶區分開來。 libp2p.UserAgent(conf.使用者代理), tcp運輸, libp2p.WithDialTimeout(conf.超時撥號), 沒有中繼服務,僅在對等方之間直接連接。 libp2p.DisableRelay(), 主機將在從配置構建后直接啟動並偵聽網路。 libp2p.ListenAddrs(listenAddr), libp2p.ConnectionGater(connGtr), libp2p.ConnectionManager(connMngr), libp2p.資源管理員(nil), - TODO 使用資源管理器介面更好地管理每個對等體的資源。 libp2p.平(真), libp2p.AutoNATServiceRateLimit(10, 5, time.秒*60), } 如果混淆。NoTransportSecurity { } else { opts = append(opts, conf.主機安全... } h, err := libp2p.新(選擇... 如果錯誤 != 無 { } 對於 i,peerAddr := 範圍 conf.靜態對等 { 如果錯誤 != 無 { } 靜態對等方[i] = 位址 } out := &extraHost{ 主機: h, 靜態對等方:靜態對等方, } go out.monitorStaticPeers() }
... n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log) 如果錯誤 != 無 { } ...
使用 blocksTopicV1 函數生成 blocksTopicName,該函數根據配置(cfg)中的 L2ChainID 格式化字串。 格式化的字串遵循特定的結構:/optimism/{L2ChainID}/0/blocks。
函數通過呼叫 ps. Join(blocksTopicName) 嘗試加入區塊八卦主題。 如果出現錯誤,它將返回一個錯誤消息,指示無法加入主題。
生成了一個新的goroutine來使用LogTopicEvents 函數記錄主題事件。
使用 MakeSubscriber 函數創建了一個 subscriber,該函數封裝了一個 BlocksHandler,該處理器處理來自 gossipIn 的 OnUnsafeL2Payload 事件。 生成了一個新的goroutine來運行提供的 subion。
func JoinGossip(p2pCtx context.上下文,自我peer.ID,ps *pubsub。發佈子,日誌日誌。記錄器,cfg *匯總。Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) { blocksTopicName := blocksTopicV1(cfg) // return fmt.Sprintf(“/optimism/%s/0/blocks”, cfg.L2ChainID.String()) 返回零,FMT。錯誤f(「未能註冊阻止八卦主題: %w」 err ) } 區主題, 錯誤 := ps.連線(塊主題名稱) 如果錯誤 != 無 { } 如果錯誤 != 無 { } go LogTopicEvents(p2pCtx, log.New(“topic”, “blocks”), blocksTopicEvents) 如果錯誤 != 無 { } return &publisher{log: log, cfg: cfg, blocksTopic: blocksTopic, runCfg: runCfg}, nil
op-node/rollup/driver/state.go
func (s *Driver) eventLoop() { for(){ ... payload, err := s.sequencer.RunNextSequencerAction(ctx) 如果錯誤 != 無 { 錯誤不夠嚴重,無法更改/停止排序,但應記錄和計量。 if err := s.network.PublishL2Payload(ctx, payload);錯誤 != 無 { } planSequencerAction() - 調度下一個序列器操作以保持排序迴圈 } } ...
當存在缺失區塊,通過p2p快速同步
當節點因為特殊情況,比如宕機后重新連結,可能會產生一些沒有同步上的區塊(gaps),當遇到這種情況時,可以通過p2p網路的反向鏈的方式快速同步。
我們來看一下op-node/rollup/driver/state.go中的checkForGapInUnsafeQueue函數
該代碼段定義了一個名為 checkForGapInUnsafeQueue 的方法,屬於 Driver 結構體。 它的目的是檢查一個名為 「unsafe queue」 的佇列中是否存在數據缺口,並嘗試通過一個名為 altSync 的備用同步方法來檢索缺失的負載。 這裡的關鍵點是,該方法是為了確保數據的連續性,並在檢測到數據缺失時嘗試從其他同步方法中檢索缺失的數據。 以下是函數的主要步驟:
checkForGapInUnsafeQueue 檢查不安全佇列中是否存在間隙,並嘗試從 alt-sync 方法檢索丟失的有效負載。 警告:這隻是一個傳出信號,不保證可以檢索到塊。 結果通過 OnUnsafeL2Payload 接收。 func (s *Driver) checkForGapInUnsafeQueue(ctx context.上下文)錯誤 { start := s.derivation.UnsafeL2Head() end := s.derivation.UnsafeL2SyncTarget() 檢查我們在開始和結束之間是否缺少塊 如果我們這樣做,請請求它們。 如果結束 == (eth.L2BlockRef{}) { s.log.Debug(“請求與開放式範圍同步”、“開始”、開始) 返回 s.altSync.RequestL2Range(ctx, start, eth.L2BlockRef{}) } 否則如果結束。編號>開始。數位+1 { s.log.Debug(“請求缺少不安全的 L2 塊範圍”, “開始”, 開始, “結束”, 結束, “大小”, 結束.數字開始。編號) 返回 s.altSync.RequestL2Range(ctx, start, end) } 返回零 }
RequestL2Range函數向requests通道里傳遞請求區塊的開始和結束信號。
然後通過onRangeRequest方法來對請求向peerRequests通道分發,peerRequests通道會被多個peer開啟的loop所等待,即每一次分發都只有一個peer會去處理這個request。
func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) { ... 對於 i := uint64(0); ;i++ { num := req.end.number - 1 - i 如果 num <= req.start { 返回 } 檢查我們是否已經在隔離中 如果 h, ok := s.quarantineByNum[num] ;好的 { if s.trusted.Contains(h) { / 如果我們信任它,請嘗試推廣它。 s.tryPromote(h) } 不要拿我們已經有候選人的東西。 我們將通過發現衝突或同步足夠多的其他塊來將其從隔離區中逐出 繼續 } 如果 _, ok := s.inFlight[num] ;好的 { 。.log。調試(“請求仍在進行中,未重新計劃同步請求”, “num”, num) 繼續 - 請求仍在進行中 } pr := peerRequest{num: num, complete: new(atomic.布爾)} 。.log。除錯(「調度 P2P 塊請求」, 「num」, num) 時程表編號 選擇 { 案例 s.peerRequests <- pr: s.inFlight[num] = 完成 案例<-CTX。完成(): 。.log。Info(“未安排完整的 P2P 同步範圍”, “當前”, num, “err”, ctx.錯誤()) 返回 預設值:// 對等方可能都已忙於處理請求 。.log。Info(“沒有對等方準備好處理 L2 塊歷史記錄的更多 P2P 請求的塊請求”, “當前”, num) 返回 } } }
接下來我們看看,當peer收到這個request的時候會怎麼處理。
首先我們要知道的是,peer和請求節點之間的連結,或者消息傳遞是通過libp2p的stream來傳遞的。 stream的處理方法由接收peer節點實現,stream的創建由發送節點來開啟。
我們可以在之前的init函數中看到這樣的代碼,這裡MakeStreamHandler返回了一個處理函數,SetStreamHandler將協定id和這個處理函數綁定,因此,每當發送節點創建並使用這個stream的時候,都會觸發返回的處理函數。
n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics) 向 libp2p 主機註冊同步協定 payloadByNumber := MakeStreamHandler(resourcesCtx, log.New(“serve”, “payloads_by_number”), n.syncSrv.HandleSyncRequest) n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
接下來讓我們看看處理函數裡面是如何處理的 函數首先進行全域和個人的速率限制檢查,以控制處理請求的速度。 然後,它讀取並驗證了請求的區塊號,確保它在合理的範圍內。 之後,函數從 L2 層獲取請求的區塊負載,並將其寫入到回應流中。 在寫入回應數據時,它設置了寫入截止時間,以避免在寫入過程中被慢速的peer連接阻塞。 最終,函數返回請求的區塊號和可能的錯誤。
func (srv *ReqRespServer) handleSyncRequest(ctx context.上下文,流網路。Stream) (uint64, error) { peerId := 流。Conn()。遠端對等() 從全域速率限制器獲取令牌, 以確保不同對等方之間沒有太多的併發伺服器工作。 if err := srv.globalRequestsRL.Wait(ctx);錯誤 != 無 { 返回 0,FMT。錯誤f(「等待全域同步速率限制超時: %w」, err) } 查找對等方的速率限制數據,或以其他方式添加 srv.peerStatsLock.Lock() ps, _ := srv.peerRateLimits.Get(peerId) 如果 ps == nil { ps = &peerStat{ 請求:費率。NewLimiter(peerServerBlocksRateLimit, peerServerBlocksBurst), } srv.peerRateLimits.Add(peerId, ps) 附言。Requests.Reserve() } 計算命中數,但讓它延遲下一個請求,而不是立即等待 } else { 僅當它是現有對等方時等待,否則即時速率限制等待調用總是出錯。 如果請求者認為我們花費的時間太長,那麼這是他們的問題,他們可以斷開連接。 只有當讀/寫失敗時,我們才會斷開自己的連接, 如果工作無效(範圍驗證),或者當單個子任務超時時。 如果錯誤 := PS。Requests.Wait(ctx);錯誤 != 無 { 返回 0,FMT。錯誤f(「等待全域同步速率限制超時: %w」, err) } } srv.peerStatsLock.Unlock() 設定讀取截止時間(如果可用) _ = 流。設置讀取截止日期(時間。現在()。Add(serverReadRequestTimeout)) 閱讀請求 要求uint64 如果錯誤 := 二進位。讀取(流,二進位。小端序,和要求);錯誤 != 無 { 返回 0,FMT。錯誤f(「無法讀取請求的塊號: %w」 err ) } 如果錯誤 := 流。關閉讀取();錯誤 != 無 { 返回要求,FMT。錯誤f(「無法關閉 P2P 同步請求呼叫的讀取端: %w」,錯誤) } 檢查請求是否在預期的塊範圍內 if req < srv.cfg.Genesis.L2.Number { 返回要求,FMT。Errorf(“無法在創世 %d 之前為 L2 塊 %d 提供請求: %w”, req, srv.cfg.Genesis.L2.Number, invalidRequestErr) } max, err := srv.cfg.TargetBlockNumber(uint64(time.現在()。Unix())) 如果錯誤 != 無 { 返回要求,FMT。錯誤f(「無法確定驗證請求的最大目標塊數: % w」 無效要求錯誤 ) } 如果 req > max { 返回要求,FMT。Errorf(“無法在最大預期塊 (%v) 之後為 L2 塊 %d 提供請求: %w”, req, max, invalidRequestErr) } payload, err := srv.l2.PayloadByNumber(ctx, req) 如果錯誤 != 無 { 如果錯誤。是(錯誤,乙太坊。找不到) { 返回要求,FMT。錯誤f(「對等方請求未知塊按編號: %w」, err) } else { 返回要求,FMT。錯誤f(「無法檢索有效負載以提供給對等方: %w」, err) } } 我們設置寫入截止時間(如果可用)以安全寫入而不會阻塞限制對等連接 _ = 流。SetWriteDeadline(time.現在()。Add(serverWriteChunkTimeout)) 0 - 結果代碼:成功 = 0 1:5 - 版本: 0 。.tmp[5] 位元組 如果 _,則錯誤 := 流。寫入(tmp[:]);錯誤 != 無 { 返回要求,FMT。錯誤f(「無法寫入回應標頭資料: %w」 err ) } w := 活潑。NewBufferedWriter(stream) 如果 _,則錯誤 := 有效負載。元帥SSZ(w);錯誤 != 無 { 返回要求,FMT。錯誤f(「未能將有效負載寫入同步回應: %w」, err) } 如果錯誤 := w.Close();錯誤 != 無 { 返回要求,FMT。錯誤f(「無法完成將有效負載寫入同步回應: %w」, err) } 返回要求,無 }
至此,反向鏈同步請求和處理的大致流程已經講解完畢
p2p節點中的積分聲譽系統
為了防止某些節點進行惡意的請求與回應來破壞整個網路的安全性,optimism還使用了一套積分系統。
例如在op-node/p2p/app_scores.go 中存在一系列函數對peer的分數進行設置
func (s *peerApplicationScorer) onValidResponse(id peer.ID) { _, err := s.scorebook.SetScore(id, store.IncrementValidResponses{Cap: s.params.ValidResponseCap}) 如果錯誤 != 無 { s.log.Error(“無法更新對等方分數”, “對等方”, id, “err”, err) 返回 } } func (s *peerApplicationScorer) onResponseError(id peer.ID) { _, err := s.scorebook.SetScore(id, store.IncrementErrorResponses{Cap: s.params.ErrorResponseCap}) 如果錯誤 != 無 { s.log.Error(“無法更新對等方分數”, “對等方”, id, “err”, err) 返回 } } func (s *peerApplicationScorer) onRejectedPayload(id peer.ID) { _, err := s.scorebook.SetScore(id, store.IncrementRejectedPayloads{Cap: s.params.RejectedPayloadCap}) 如果錯誤 != 無 { s.log.Error(“無法更新對等方分數”, “對等方”, id, “err”, err) 返回 } }
然後在添加新的節點前會檢查其積分情況
func AddScorering(gater BlockingConnectionGater, score Scores, minScore float64) *ScoreringConnectionGater { return &ScoreringConnectionGater{BlockingConnectionGater: gater, scores: scores, minScore: minScore} } func (g *ScoreringConnectionGater) checkScore(p peer.ID) (allow bool) { score, err := g.scores.GetPeerScore(p) 如果錯誤 != 無 { 返回假 } 返回分數 >= g.minScore } func (g *ScoreringConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) { return g.BlockingConnectionGater.InterceptPeerDial(p) && g.checkScore(p) } func (g *ScoreringConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (允許布爾值) { return g.BlockingConnectionGater.InterceptAddrDial(id, ma) && g.checkScore(id) } func (g *ScoreringConnectionGater) InterceptSecured(dir network.方向,id peer.ID,mas網路。ConnMultiaddrs) (allow bool) { return g.BlockingConnectionGater.InterceptSecured(dir, id, mas) && g.checkScore(id) }
總結
libp2p的高度可配置性使得整個專案的p2p具有高度的可自定義化和模組話,以上是optimsim對libp2p進行個人化實現的主要邏輯,還有其他細節可以在p2p目錄下通過閱讀源碼的方式來詳細學習。