Використання libp2p в op-stack

Автор: Joohhnnn

Програми Libp2P в Optimism

У цьому розділі основна мета полягає в тому, щоб пояснити, як Optimism використовує Libb2P для завершення створення мереж P2P в Op-Node. P2P-мережі в основному використовуються для передачі інформації в різних вузлах, таких як секвенсори після завершення побудови блоку Unsafe і поширення через pub/sub пліткарського центру P2P. Libb2P також має справу з іншими інфраструктурами, такими як мережі, адресація тощо в мережах P2P.

Дізнайтеся про libp2p

libp2p (скорочено від «library peer-to-peer» або «library peer-to-peer») — це одноранговий (P2P) мережевий фреймворк, який допомагає розробляти P2P-додатки. Він містить набір протоколів, специфікацій та бібліотек, які полегшують P2P-зв'язок між учасниками мережі (також відомими як «піри» або «піри») (джерело[2] [3])。 Спочатку будучи частиною проекту IPFS (InterPlanetary File), libp2p перетворився на окремий проект, який став модульним мережевим стеком (джерелом) для розподілених мереж )。

libp2p — це проєкт спільноти IPFS з відкритим вихідним кодом, який вітає внески від широкого кола спільнот, включаючи допомогу в написанні специфікацій, реалізації коду, а також у створенні прикладів та навчальних посібників[4] [5])。 libp2p складається з кількох стандартних блоків, кожен з яких має дуже чіткий, задокументований та протестований інтерфейс, що робить їх компонувати, замінюваними і, отже, оновлюваними )。 Модульний характер libp2p дозволяє розробникам вибирати та використовувати лише компоненти, необхідні для їхніх програм, сприяючи гнучкості та ефективності під час створення веб-додатків P2P.

Пов'язані ресурси

  • Офіційна документація для libp2p[6]
  • Репозиторій libp2p на GitHub[7]
  • Вступ до libp2p на ProtoSchool[8]

Модульна архітектура та природа libp2p з відкритим вихідним кодом забезпечують гарне середовище для розробки потужних, масштабованих та гнучких P2P-додатків, що робить її важливим гравцем у галузі розподілених мереж та розробки мережевих додатків.

Реалізація libp2p

При використанні libp2p вам потрібно буде впровадити та налаштувати деякі основні компоненти для побудови вашої мережі P2P. Ось деякі з основних аспектів реалізації libp2p у програмі:

1. Створення та налаштування вузлів:

  • Створення та налаштування вузла libp2p є найпростішим кроком, який включає встановлення мережевої адреси вузла, ідентичності та інших основних параметрів. Основні коди використання:

libp2p. new()

2. Транспортний протокол:

Виберіть і налаштуйте свої транспортні протоколи (наприклад, TCP, WebSockets і т.д.) для забезпечення зв'язку між вузлами. Основні коди використання:

tcpTransport := tcp. NewTCPTransport()

3. Мультиплексування та керування потоком:

  • Реалізуйте мультиплексування, щоб дозволити обробляти кілька одночасних потоків даних при одному з'єднанні.
  • Впровадити контроль потоку для управління швидкістю передачі даних і швидкістю їх обробки. Основні коди використання:

yamuxTransport := yamux. new()

4. Безпека та шифрування:

  • Налаштуйте безпечний транспортний рівень, щоб забезпечити безпеку та конфіденційність комунікацій.
  • Впровадити механізми шифрування та аутентифікації для захисту даних та автентифікації сторін, що спілкуються. Основні коди використання:

tlsTransport := tls. new()

5. Протоколи та обробка повідомлень:

Визначайте та впроваджуйте спеціальні протоколи для обробки конкретних мережевих операцій та обміну повідомленнями.

  • Обробляти отримані повідомлення та надсилати відповіді за потреби. Основні коди використання:

господар. SetStreamHandler("/my-protocol/1.0.0", myProtocolHandler)

6. Виявлення та маршрутизація:

  • Реалізувати механізм виявлення вузлів для пошуку інших вузлів у мережі.
  • Реалізуйте логіку маршрутизації, щоб визначити, як повідомлення спрямовуються до правильних вузлів мережі. Основні коди використання:

dht := kaddht. NewDHT(ctx, хост, сховище даних. NewMapDatastore())

7. Поведінка мережі та політики:

Визначайте та впроваджуйте поведінку та політики мережі, такі як керування підключеннями, обробка помилок і логіка повторних спроб. Основні коди використання:

connManager := connmgr. NewConnManager(lowWater, highWater, gracePeriod)

8. Державне управління та зберігання:

Керуйте станом вузлів і мереж, включаючи статус з'єднання, список вузлів і зберігання даних. Основні коди використання:

peerstore := pstoremem. NewPeerstore()

9. Тестування та налагодження:

  • Напишіть тести для вашої програми libp2p, щоб переконатися в її правильності та надійності.
  • Використовуйте інструменти налагодження та журнали для діагностики та вирішення проблем із мережею. Основні коди використання:

Журналювання. SetLogLevel("libp2p", "DEBUG")

10. Документація та підтримка спільноти:

  • Зверніться до документації libp2p, щоб дізнатися про її різноманітні компоненти та API.
  • Спілкуйтеся зі спільнотою libp2p для підтримки та вирішення проблем.

}

Вище наведено деякі з основних аспектів, які слід враховувати та реалізовувати під час використання libp2p. Конкретна реалізація кожного проекту може відрізнятися, але ці основні аспекти необхідні для створення та запуску програм libp2p. При реалізації цих функцій ви можете звернутися до офіційної документації libp2p[9] [10]та репозиторії GitHub Приклад коду та навчальні посібники.

Використання libp2p в OP-вузлі

Для того, щоб з'ясувати взаємозв'язок між op-node і libp2p, ми повинні з'ясувати кілька питань

  • Чому libp2p? Чому б не вибрати devp2p (geth використовує devp2p)
  • Які дані або процеси тісно пов'язані з P2P-мережею в OP-вузлі
  • Як ці функції реалізовані на рівні коду

Навіщо операційним вузлам потрібна мережа libp2p

Для початку нам потрібно зрозуміти, чому для Optimism потрібна P2P-мережа ** LibbP2P — це модульний мережевий протокол, який дозволяє розробникам створювати децентралізовані однорангові програми для кількох випадків використання (джерела[11] [12])(джерело [13])。 DevP2P, з іншого боку, в основному використовується в екосистемі Ethereum і адаптований для додатків Ethereum (Джерело )。 Гнучкість і широка застосовність libp2p може зробити її найкращим вибором для розробників.

Op-Node в основному використовує функціональні точки libp2p

  • Використовується секвенсором для передачі отриманого небезпечного блоку іншим вузлам, що не є секвенсором
  • Для інших вузлів у режимі без секвенсора для швидкої синхронізації при виникненні розриву (зворотна ланцюгова синхронізація)
  • Гарне середовище для прийняття цілісної системи репутації для регулювання всього вузла

Реалізація коду

Користувацька ініціалізація хоста

Host можна розуміти як P2P-вузол, при відкритті цього вузла потрібно зробити якусь спеціальну конфігурацію ініціалізації для власного проекту

Тепер розглянемо метод Host у файлі op-node/p2p/host.go.

Ця функція в основному використовується для налаштування вузла libp2p та створення різноманітних конфігурацій. Ось ключові частини функції та простий китайський опис кожної з них:

  1. Перевірте, чи вимкнено P2P
    Якщо P2P вимкнено, функція повертається безпосередньо.
  2. Отримайте ідентифікатор однорангового вузла з відкритого ключа
    Використовуйте відкритий ключ у конфігурації, щоб згенерувати ідентифікатор однорангового вузла.
  3. Ініціалізація Peerstore
    Створіть базовий магазин Peerstore.
  4. Ініціалізуємо розширення Peerstore
    Поверх базового Peerstore створіть розширений Peerstore.
  5. Додайте приватні та публічні ключі до Peerstore
    Зберігайте приватні та публічні ключі однорангового вузла в одноранговому сховищі.
  6. Ініціалізація з'єднання Gater
    Використовується для управління мережевими з'єднаннями.
  7. Ініціалізація диспетчера підключень
    Використовується для управління мережевими підключеннями.
  8. Встановіть адресу передачі та прослуховування
    Встановіть протокол мережевого транспорту та адресу прослуховування хоста.
  9. Створіть вузол libp2p
    Використайте всі попередні параметри, щоб створити новий вузол libp2p.
  10. Ініціалізація статичного вузла
    Якщо у вас налаштований статичний вузол, ініціалізуйте його.
  11. Назад до господаря
    Нарешті, функція повертає створений вузол libp2p.

Ці ключові розділи відповідають за ініціалізацію та налаштування вузла libp2p, і кожна частина відповідає за певний аспект конфігурації хоста.

func (conf *Config) Host(журнал журналу. Метрики реєстратора, репортера. Reporter, метрики HostMetrics) (хост. Host, error) {  
    Якщо конф. DisableP2P {  
        повернути нуль, нуль  
    }  
    pub := conf. Priv.GetPublic()  
    pid, err := peer. IDFromPublicKey(pub)  
    if err != nil {  
        Повернення нуль, FMT. Errorf("Не вдалося отримати pubkey з ключа priv мережі: %w", err)  
    }  
    basePs, err := pstoreds. NewPeerstore(контекст. Background(), conf. Зберігати, зберігати. DefaultOpts())  
    if err != nil {  
        Повернення нуль, FMT. Errorf("не вдалося відкрити пірсторію: %w", err)  
    }  
    peerScoreParams := conf. PeerScoringParams()  
     рахунокЧас утримання. Тривалість  
    if peerScoreParams != nil {  
        Використовуйте той самий період зберігання, що й плітки, якщо такі є  
        scoreRetention = peerScoreParams.PeerScoring.RetainScore  
    } else {  
        Вимкнути оцінку GC, якщо вимкнуто оцінювання за результатами аналогів  
        scoreRetention = 0  
    }  
    ps, err := store. NewExtendedPeerstore(контекст. Background(), журнал, годинник. Clock, basePs, conf. Store, scoreRetention)  
    if err != nil {  
        Повернення нуль, FMT. Errorf("не вдалося відкрити розширене пір-сховище: %w", err)  
    }  
    якщо err := ps. AddPrivKey(pid, conf. Рів); err != nil {  
        Повернення нуль, FMT. Errorf("не вдалося налаштувати однорангове сховище з ключем priv: %w", err)  
    }  
    якщо err := ps. AddPubKey(pid, pub); err != nil {  
        Повернення нуль, FMT. Errorf("не вдалося налаштувати однорангове сховище з ключем pub: %w", err)  
    }  
     connGtr gating. BlockingConnectionGater  
    connGtr, err = gating. NewBlockingConnectionGater(conf. магазину)  
    if err != nil {  
        Повернення нуль, FMT. Errorf("не вдалося відкрити gater з'єднання: %w", err)  
    }  
    connGtr = gating. AddBanExpiry(connGtr, ps, log, clock. Годинник, метрики)  
    connGtr = gating. AddMetering(connGtr, metrics)  
    connMngr, err := DefaultConnManager(conf)  
    if err != nil {  
        Повернення нуль, FMT. Errorf("не вдалося відкрити диспетчер з'єднань: %w", err)  
    }  
    listenAddr, err := addrFromIPAndPort(conf. ListenIP, conf. ListenTCPPort)  
    if err != nil {  
        Повернення нуль, FMT. Errorf("не вдалося зробити прослуховування addr: %w", err)  
    }  
    tcpTransport := libp2p. Транспорт(  
        ПТС. NewTCPTransport,  
        ПТС. WithConnectionTimeout(time. Хвилина*60)) // розрив невикористовуваних з'єднань  
    TODO: технічно ми також можемо запускати вузол на транспортах websocket та QUIC Можливо, в майбутньому?  
     Нат лконф. NATManagerC // вимкнено, якщо nil  
    Якщо конф. NAT {  
        nat = базовий вузол. NewNATManager  
    }  
    opts := []libp2p. Option{  
        libp2p. Ідентичність (конф. Priv),  
        Явно встановіть user-agent, щоб ми могли відрізнятися від інших користувачів libp2p Go.  
        libp2p. UserAgent(conf. UserAgent),  
        tcpТранспорт,  
        libp2p. WithDialTimeout(conf. Timeout Dial),  
        Немає ретрансляційних послуг, лише пряме з'єднання між вузлами.  
        libp2p. DisableRelay(),  
        host запуститься і прослухає мережу безпосередньо після збірки з конфігурації.  
        libp2p. ListenAddrs(listenAddr),  
        libp2p. ConnectionGater(connGtr),  
        libp2p. ConnectionManager(connMngr),  
        libp2p. ResourceManager(nil), // TODO використовувати інтерфейс менеджера ресурсів для кращого керування ресурсами для кожного вузла.  
        libp2p. Ping(true),  
        libp2p. AutoNATServiceRateLimit(10, 5, time. Другий*60),  
    }  
    Якщо конф. NoTransportSecurity {  
    } else {  
        opts = append(opts, conf. HostSecurity...)  
    }  
    h, err := libp2p. Нове(опти...)  
    if err != nil {  
    }  
    for i, peerAddr := range conf. StaticPeers {  
        if err != nil {  
        }  
        staticPeers[i]  = addr  
    }  
    out := &extraHost{  
        Ведучий: ч,  
        staticPeers: staticPeers,  
    }  
        вийти.monitorStaticPeers()  
    }  






        ...  
        n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)  
        if err != nil {  
        }  
        ...  
  1. Створення аутентифікатора: Генерація назви теми блоку:
  • Генерувати blocksTopicName за допомогою функції blocksTopicV1, яка форматує рядок на основі L2ChainID в конфігурації (cfg). Відформатовані рядки мають певну структуру: /optimism/{L2ChainID}/0/blocks.

  • функція викликом ps. Join(blocksTopicName) намагається додати тему пліток блоку. Якщо виникає помилка, він повертає повідомлення про помилку, яке вказує на те, що до теми не вдалося приєднатися.

  • Згенеровано нову goroutine для реєстрації тематичних подій за допомогою функції LogTopicEvents.

  • Створено підписника за допомогою функції MakeSubscriber, яка інкапсулює BlocksHandler, який обробляє події OnUnsafeL2Payload з gossipIn. Було згенеровано нову goroutine для запуску наданого subion.

    func JoinGossip(p2pCtx контекст. Контекст, peer.ID, ps *pubsub. PubSub, журнал журналу. Logger, cfg *rollup. Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, помилка) {
    blocksTopicName := blocksTopicV1(cfg) // return fmt. Sprintf("/optimism/%s/0/blocks", cfg. L2ChainID.String())
    Повернення нуль, FMT. Errorf("не вдалося зареєструвати заблокувати тему пліток: %w", err)
    }
    blocksTopic, err := ps. Join(blocksTopicName)
    if err != nil {
    }
    if err != nil {
    }
    go LogTopicEvents(p2pCtx, log. New("тема", "блоки"), blocksTopicEvents)
    if err != nil {
    }
    return &publisher{log: log, cfg: cfg, blocksTopic: blocksTopic, runCfg: runCfg}, nil

op-node/rollup/driver/state.go

func (s *Driver) eventLoop() {  
    for(){  
        ...  
            payload, err := s.sequencer.RunNextSequencerAction(ctx)  
            if err != nil {  
                Помилки не є достатньо серйозними, щоб змінювати/зупиняти послідовність, але їх слід реєструвати та вимірювати.  
                if err := s.network.PublishL2Payload(ctx, payload); err != nil {  
            }  
            planSequencerAction() // запланувати наступну дію секвенсера, щоб зберегти цикл секвенування  
            }  
    }  
    ...  

Коли відсутні блоки, швидка синхронізація через P2P

Коли вузли перепов'язуються після простою через особливі обставини, такі як перелінкування після простою, деякі блоки (розриви) можуть не синхронізуватися, і при виникненні цієї ситуації вони можуть швидко синхронізуватися через зворотний ланцюг мережі P2P.

Давайте подивимося на функцію checkForGapInUnsafeQueue в op-node/rollup/driver/state.go

Фрагмент коду визначає метод checkForGapInUnsafeQueue, який належить до структури драйвера. Його мета полягає в тому, щоб перевірити наявність прогалин у даних у черзі, яка називається «небезпечною чергою», і спробувати отримати відсутні корисні навантаження за допомогою альтернативного методу синхронізації під назвою altSync. Ключовим моментом тут є те, що метод полягає в тому, щоб забезпечити безперервність даних і спробувати отримати відсутні дані з інших методів синхронізації при виявленні відсутніх даних. Ось основні кроки функції:

  1. Функція спочатку отримує UnsafeL2Head та UnsafeL2SyncTarget від s.derivation як початкову та кінцеву точки діапазону перевірки.

  2. Функція перевіряє відсутність блоків між початком і кінцем, що робиться шляхом порівняння значень Number кінця і початку.

  3. Якщо буде виявлено прогалину в даних, функція запросить відсутній діапазон даних, викликавши s.altSync.RequestL2Range(ctx, початок, кінець). Якщо end є нульовим посиланням (тобто eth. L2BlockRef{}), функція запросить синхронізацію відкритого діапазону, починаючи з start.

  4. Під час запиту даних функція реєструє журнал налагодження, вказуючи, який діапазон даних вона запитує.

  5. Функція в кінцевому підсумку повертає значення помилки. Якщо помилок немає, він повертає нуль

    checkForGapInUnsafeQueue перевіряє, чи є прогалини в небезпечній черзі, і намагається отримати відсутні набори корисних даних із методу alt-синхронізації.
    ПОПЕРЕДЖЕННЯ: Це лише вихідний сигнал, блокування не гарантовано буде отримано.
    Результати надходять через OnUnsafeL2Payload.
    func (s *Driver) checkForGapInUnsafeQueue(контекст ctx. Context) error {
    start := s.derivation.UnsafeL2Head()
    end := s.derivation.UnsafeL2SyncTarget()
    Перевірте, чи є у нас відсутні блоки між початком і кінцем Запитайте їх, якщо ми це зробимо.
    if end == (eth. L2BlockRef{}) {
    s.log.Debug("запит синхронізації з відкритим діапазоном", "start", start)
    return s.altSync.RequestL2Range(ctx, start, eth. L2BlockRef{})
    } інакше, якщо кінець. Номер > початку. Число+1 {
    s.log.Debug("запит на відсутність небезпечного діапазону блоків L2", "start", "start", "end", end, "size", end. Number-start.Number)
    return s.altSync.RequestL2Range(ctx, початок, кінець)
    }
    повернути нуль
    }

Функція RequestL2Range сигналізує каналу запитів про початок і закінчення блоку запитів.

Потім запит розподіляється на канал peerRequests за допомогою методу onRangeRequest, а на канал peerRequests чекає цикл, відкритий кількома вузлами, тобто лише один вузол оброблятиме запит для кожного дистрибутива.

func (s *SyncClient) onRangeRequest(контекст ctx. Context, req rangeRequest) {  
        ...  
        для i := uint64(0); ; i++ {  
        num := req.end.Число - 1 - i  
        if num <= req.start {  
            Повернутися  
        }  
        Перевірте, чи є у нас вже щось на карантині  
        if h, ok := s.quarantineByNum[num] ; ok {  
            if s.trusted.Contains(h) { // якщо ми йому довіряємо, спробуйте його просунути.  
                s.tryPromote(h)  
            }  
            Не приносьте речі, на які у нас вже є кандидат.  
            Ми виселимо його з карантину, знайшовши конфлікт або синхронізувавши достатню кількість інших блокувань  
            продовжити  
        }  
        if _, ok := s.inFlight[num] ; ok {  
            .log. Debug("запит все ще в польоті, без перенесення запиту синхронізації", "num", num)  
            continue // запит все ще в польоті  
        }  
        pr := peerRequest{num: num, complete: new(atomic. Булеве значення)}  
        .log. Debug("Планування P2P-запиту блоку", "num", num)  
        Номер графіка  
        select {  
        case s.peerRequests <- pr:  
            s.inFlight[num]  = пр.повне  
        Випадок <-CTX. Done():  
            .log. Info("не заплановано повний діапазон синхронізації P2P", "current", num, "err", ctx. err())  
            Повернутися  
        Типове значення: // всі вузли вже можуть бути зайняті обробкою запитів  
            .log. Info("немає вузлів, готових обробляти запити на блокування для більшої кількості P2P-запитів для історії блоків L2", "current", num)  
            Повернутися  
        }  
    }  
}

Давайте подивимося, що станеться, коли вузол отримає цей запит.

Перше, що нам потрібно знати, це те, що зв'язок між вузлом, що надсилає запит, або повідомленням, що передається, передається через потік libp2p. Метод обробки потоку реалізується вузлом-одержувачем, а створення потоку відкривається вузлом-відправником.

Ми можемо побачити цей код у попередній функції ініціалізації, де MakeStreamHandler повертає обробник, а SetStreamHandler прив'язує ідентифікатор протоколу до цього обробника, тому щоразу, коли вузол-відправник створює та використовує цей потік, спрацьовує повернутий обробник.

n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)  
Зареєструйте протокол синхронізації з вузлом libp2p  
payloadByNumber := MakeStreamHandler(resourcesCtx, log. New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)  
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)

Давайте подивимося, як це обробляється всередині обробника Функція спочатку виконує глобальну та індивідуальну перевірку ліміту ставки, щоб контролювати, наскільки швидко обробляються запити. Потім він зчитує та перевіряє запитуваний номер блоку, гарантуючи, що він знаходиться в розумному діапазоні. Потім функція отримує корисне навантаження фрагмента запиту з рівня L2 і записує його в потік відповідей. Під час запису даних відповіді він встановлює кінцевий термін запису, щоб уникнути блокування повільними одноранговими з'єднаннями під час процесу запису. Нарешті, функція повертає запитуваний номер блоку та можливі помилки.

func (srv *ReqRespServer) handleSyncRequest(контекст ctx. Контекст, потокова мережа. Stream) (uint64, помилка) {  
    peerId := stream. Conn(). RemotePeer()  
    взяти токен з глобального обмежувача швидкості,  
    щоб переконатися, що між різними вузлами не надто багато одночасної роботи сервера.  
    if err := srv.globalRequestsRL.Wait(ctx); err != nil {  
        return 0, fmt. Errorf("тайм-аут очікування глобального обмеження швидкості синхронізації: %w", err)  
    }  
    знайти дані про обмеження швидкості однорангових вузлів або додати інше  
    srv.peerStatsLock.Lock()  
    ps, _ := srv.peerRateLimits.Get(peerId)  
    if ps == nil {  
        ps = &peerStat{  
            Запити: ставка. NewLimiter(peerServerBlocksRateLimit, peerServerBlocksBurst),  
        }  
        srv.peerRateLimits.Add(peerId, ps)  
        PS. Requests.Reserve() // підрахувати звернення, але змусити його відкласти наступний запит, а не відразу чекати  
    } else {  
        Зачекайте, лише якщо це існуючий вузол, інакше миттєвий виклик очікування з обмеженням швидкості завжди буде помилковим.  
        Якщо запитувач вважає, що ми займаємо занадто багато часу, то це його проблема, і він може відключитися.  
        Ми відключимося тільки тоді, коли не вміємо читати/писати,  
        якщо робота недійсна (перевірка діапазону), або коли тайм-аут окремих підзадач.  
        якщо err := ps. Requests.Wait(ctx); err != nil {  
            return 0, fmt. Errorf("тайм-аут очікування глобального обмеження швидкості синхронізації: %w", err)  
        }  
    }  
    srv.peerStatsLock.Unlock()  
    Установіть кінцевий термін читання, якщо він доступний  
    _ = потік. SetReadDeadline(час. Now(). Add(serverReadRequestTimeout))  
    Ознайомитись із запитом  
     req uint64  
    if err := двійковий. Читання(потік, двійковий. LittleEndian, &req); err != nil {  
        return 0, fmt. Errorf("не вдалося прочитати номер запитуваного блоку: %w", err)  
    }  
    if err := stream. CloseRead(); err != nil {  
        Return req, FMT. Errorf("не вдалося закрити сторону читання виклику запиту синхронізації P2P: %w", err)  
    }  
    Переконайтеся, що запит знаходиться в очікуваному діапазоні блоків  
    if req < srv.cfg.Genesis.L2.Number {  
        Return req, FMT. Errorf("не може обслуговувати запит для блоку L2 %d до генезису %d: %w", req, srv.cfg.Genesis.L2.Number, invalidRequestErr)  
    }  
    max, err := srv.cfg.TargetBlockNumber(uint64(time. Now(). Unix()))  
    if err != nil {  
        Return req, FMT. Errorf("неможливо визначити максимальну кількість цільових блоків для перевірки запиту: %w", invalidRequestErr)  
    }  
    if req > max {  
        Return req, FMT. Errorf("Неможливо обслужити запит для блоку L2 %d після максимального очікуваного блоку (%v): %w", req, max, invalidRequestErr)  
    }  
    payload, err := srv.l2.PayloadByNumber(ctx, req)  
    if err != nil {  
        якщо помилки. Is(err, ethereum. NotFound) {  
            Return req, FMT. Errorf("вузол запитав невідомий блок за номером: %w", err)  
        } else {  
            Return req, FMT. Errorf("не вдалося отримати корисне навантаження для подачі в одноранговий вузол: %w", err)  
        }  
    }  
    Ми встановлюємо кінцевий термін запису, якщо він доступний, щоб безпечно писати без блокування однорангового з'єднання, що дроселюється  
    _ = потік. SetWriteDeadline(час. Now(). Add(serverWriteChunkTimeout))  
    0 - resultCode: success = 0  
    1:5 - версія: 0  
     .tmp [5] байт  
    if _, err := stream. Write(tmp[:]); err != nil {  
        Return req, FMT. Errorf("не вдалося записати дані заголовка відповіді: %w", err)  
    }  
    w := Швидкий. NewBufferedWriter(потік)  
    if _, err := корисне навантаження. Маршал ССЗ(ж); err != nil {  
        Return req, FMT. Errorf("не вдалося записати набір корисних даних для синхронізації відповіді: %w", err)  
    }  
    if err := w.Close(); err != nil {  
        Return req, FMT. Errorf("не вдалося завершити запис набору корисних даних для синхронізації відповіді: %w", err)  
    }  
    return req, нуль  
}

На цьому етапі пояснено загальний процес запиту та обробки зворотної ланцюгової синхронізації

Система репутації балів у p2p-вузлах

Для того, щоб певні вузли не робили шкідливих запитів і відповідей, які підривають безпеку всієї мережі, Optimism також використовує систему балів.

Наприклад, в op-node/p2p/app_scores.go є ряд функцій для встановлення оцінки однорангового вузла

func (s *peerApplicationScorer) onValidResponse(id peer.ID) {  
    _, err := s.scorebook.SetScore(id, store. IncrementValidResponses{Cap: s.params.ValidResponseCap})  
    if err != nil {  
        s.log.Error("Неможливо оновити оцінку аналогів", "peer", id, "err", err)  
        Повернутися  
    }  
}  
func (s *peerApplicationScorer) onResponseError(id peer.ID) {  
    _, err := s.scorebook.SetScore(id, store. IncrementErrorResponses{Cap: s.params.ErrorResponseCap})  
    if err != nil {  
        s.log.Error("Неможливо оновити оцінку аналогів", "peer", id, "err", err)  
        Повернутися  
    }  
}  
func (s *peerApplicationScorer) onRejectedPayload(id peer.ID) {  
    _, err := s.scorebook.SetScore(id, store. IncrementRejectedPayloads{Cap: s.params.RejectedPayloadCap})  
    if err != nil {  
        s.log.Error("Неможливо оновити оцінку аналогів", "peer", id, "err", err)  
        Повернутися  
    }  
}

Потім перевіряється статус інтеграції нового вузла перед його додаванням

func AddScoring(gater BlockingConnectionGater, scores Scores, minScore float64) *ScoringConnectionGater {  
    return &ScoringConnectionGater{BlockingConnectionGater: gater, scores: scores, minScore: minScore}  
}  
func (g *ScoringConnectionGater) checkScore(p peer.ID) (allow bool) {  
    score, err := g.scores.GetPeerScore(p)  
    if err != nil {  
        повертає false  
    }  
    return score >= g.minScore  
}  
func (g *ScoringConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {  
    return g.BlockingConnectionGater.InterceptPeerDial(p) && g.checkScore(p)  
}  
func (g *ScoringConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr. Multiaddr) (allow bool) {  
    return g.BlockingConnectionGater.InterceptAddrDial(id, ma) && g.checkScore(id)  
}  
func (g *ScoringConnectionGater) InterceptSecured(мережа каталог. Напрямок, id peer.ID, мас мережа. ConnMultiaddrs) (allow bool) {  
    return g.BlockingConnectionGater.InterceptSecured(dir, id, mas) && g.checkScore(id)  
}

Підсумки

Висока конфігурованість libp2p робить весь проект p2p дуже настроюваним і модульним, вище наведена основна логіка персоналізованої реалізації libp2p optimsim, а інші деталі можна детально дізнатися, прочитавши вихідний код в директорії p2p.

Переглянути оригінал
Ця сторінка може містити контент третіх осіб, який надається виключно в інформаційних цілях (не в якості запевнень/гарантій) і не повинен розглядатися як схвалення його поглядів компанією Gate, а також як фінансова або професійна консультація. Див. Застереження для отримання детальної інформації.
  • Нагородити
  • Прокоментувати
  • Поділіться
Прокоментувати
0/400
Немає коментарів
  • Закріпити