Использование libp2p в op-stack

Автор: Joohhnnn

Libp2P приложения в Optimism

В этом разделе основная цель состоит в том, чтобы объяснить, как Optimism использует Libb2P для завершения создания P2P-сетей в Op-Node. P2P-сети в основном используются для передачи информации в различных узлах, таких как секвенсоры после завершения построения блоков Unsafe, и распространения через pub/sub p2p gossiphub. Libb2P также имеет дело с другими инфраструктурами, такими как сеть, адресация и т. д. в сетях P2P.

Узнайте больше о libp2p

libp2p (сокращённо от "library peer-to-peer" или "library peer-to-peer") — это одноранговая (P2P) сетевая инфраструктура, которая помогает разрабатывать P2P-приложения. Он содержит набор протоколов, спецификаций и библиотек, которые упрощают P2P-коммуникацию между участниками сети (также известными как «одноранги» или «одноранговые узлы») (источник[2] [3])。 Первоначально являясь частью проекта IPFS (InterPlanetary File), libp2p превратилась в самостоятельный проект, который стал модульным сетевым стеком (исходным кодом) для распределенных сетей )。

libp2p — это проект с открытым исходным кодом сообщества IPFS, который приветствует вклад широкого круга сообществ, в том числе помощь в написании спецификаций, реализациях кода, а также в создании примеров и учебных пособий[4] [5])。 libp2p состоит из нескольких строительных блоков, каждый из которых имеет очень четкий, документированный и протестированный интерфейс, который делает их компонуемыми, заменяемыми и, следовательно, обновляемыми )。 Модульная природа libp2p позволяет разработчикам выбирать и использовать только те компоненты, которые необходимы для их приложений, что способствует гибкости и эффективности при создании P2P-веб-приложений.

Ресурсы по теме

  • Официальная документация для libp2p[6]
  • Репозиторий libp2p на GitHub[7]
  • Введение в libp2p на ProtoSchool[8]

Модульная архитектура и открытый исходный код libp2p обеспечивают хорошую среду для разработки мощных, масштабируемых и гибких P2P-приложений, что делает ее важным игроком в области распределенных сетей и разработки сетевых приложений.

Реализация libp2p

При использовании libp2p вам нужно будет внедрить и настроить некоторые основные компоненты для построения вашей P2P-сети. Вот некоторые из основных аспектов реализации libp2p в приложении:

1. Создание и настройка узла:

  • Создание и настройка узла libp2p является самым базовым шагом, который включает в себя установку сетевого адреса узла, идентичности и других основных параметров. Коды использования ключей:

libp2p. Новый()

2. Транспортный протокол:

Выберите и настройте свои транспортные протоколы (например, TCP, WebSockets и т. д.) для обеспечения связи между узлами. Коды использования ключей:

tcpTransport := tcp. ДобавленоTCPTransport()

3. Мультиплексирование и управление потоком:

  • Реализуйте мультиплексирование, позволяющее обрабатывать несколько параллельных потоков данных в одном соединении.
  • Реализуйте управление потоком для управления скоростью передачи данных и скоростью обработки. Коды использования ключей:

yamuxTransport := yamux. Новый()

4. Безопасность и шифрование:

  • Настройте безопасный транспортный уровень для обеспечения безопасности и конфиденциальности коммуникаций.
  • Внедрите механизмы шифрования и аутентификации для защиты данных и аутентификации взаимодействующих сторон. Коды использования ключей:

tlsTransport := tls. Новый()

5. Протоколы и обработка сообщений:

Определите и реализуйте пользовательские протоколы для обработки определенных сетевых операций и обмена сообщениями.

  • Обрабатывайте полученные сообщения и отправляйте ответы по мере необходимости. Коды использования ключей:

хозяин. SetStreamHandler("/my-protocol/1.0.0", myProtocolHandler)

6. Обнаружение и маршрутизация:

  • Реализовать механизм обнаружения узлов для поиска других узлов в сети.
  • Реализуйте логику маршрутизации, чтобы определить, как сообщения направляются к нужным узлам в сети. Коды использования ключей:

ДГТ := Каддхт. NewDHT(ctx, хост, хранилище данных. NewMapDatastore())

7. Поведение и политики сети:

Определите и реализуйте сетевое поведение и политики, такие как управление подключениями, обработка ошибок и логика повторных попыток. Коды использования ключей:

connManager := connmgr. NewConnManager(lowWater, highWater, gracePeriod)

8. Управление состоянием и хранение:

Управляйте состоянием узлов и сетей, включая состояние подключений, список узлов и хранилище данных. Коды использования ключей:

peerstore := pstoremem. NewPeerstore()

9. Тестирование и отладка:

  • Напишите тесты для вашего приложения libp2p, чтобы убедиться в его корректности и надежности.
  • Используйте средства отладки и журналы для диагностики и устранения проблем с сетью. Коды использования ключей:

лесозаготовка. SetLogLevel("libp2p", "DEBUG")

10. Документация и поддержка сообщества:

  • Обратитесь к документации libp2p, чтобы узнать о различных компонентах и API.
  • Общайтесь с сообществом libp2p для получения поддержки и решения проблем.

}

Выше приведены некоторые из основных аспектов, которые следует учитывать и реализовывать при использовании libp2p. Конкретная реализация каждого проекта может отличаться, но эти основные аспекты необходимы для создания и запуска libp2p-приложений. При реализации этих функций вы можете обратиться к официальной документации libp2p[9] [10]и репозитории GitHub Примеры кода и учебные пособия.

Использование libp2p в OP-ноде

Для того, чтобы выяснить связь между op-node и libp2p, мы должны ответить на несколько вопросов

  • Почему libp2p? Почему бы не выбрать devp2p (geth использует devp2p)
  • Какие данные или процессы тесно связаны с P2P-сетью в OP-узле
  • Как эти функции реализуются на уровне кода

Зачем op-узлам сеть libp2p

Во-первых, нам нужно понять, почему Optimism требуется P2P-сеть ** LibbP2P — это модульный сетевой протокол, который позволяет разработчикам создавать децентрализованные одноранговые приложения для различных вариантов использования (источники[11] [12])(источник [13])。 DevP2P, с другой стороны, в основном используется в экосистеме Ethereum и заточен под приложения Ethereum (Источник )。 Гибкость и широкая применимость libp2p могут сделать ее лучшим выбором для разработчиков.

Op-Node в основном использует функции libp2p

  • Используется секвенсором для передачи результирующего небезопасного блока другим узлам, не являющимся узлами секвенсора
  • Для других узлов в режиме без секвенсора для быстрой синхронизации при возникновении разрыва (обратная синхронизация цепочки)
  • Хорошая среда для внедрения интегральной системы репутации для регулирования всего узла

Реализация кода

Пользовательская инициализация узла сети

Хост можно понимать как P2P-узел, при открытии этого узла нужно сделать какую-то специальную конфигурацию инициализации для собственного проекта

Теперь давайте посмотрим на метод Host в файле op-node/p2p/host.go.

Эта функция в основном используется для настройки хоста libp2p и выполнения различных конфигураций. Вот ключевые части функции и простое описание каждой из них на китайском языке:

  1. Проверьте, отключен ли P2P
    Если P2P отключен, функция возвращается напрямую.
  2. Получение идентификатора пира из открытого ключа
    Используйте открытый ключ в конфигурации для создания идентификатора однорангового узла.
  3. Инициализация Peerstore
    Создайте базовое хранилище Peerstore.
  4. Инициализируем расширение Peerstore
    Поверх базового Peerstore создайте расширенное Peerstore.
  5. Добавление закрытых и открытых ключей в Peerstore
    Храните закрытый и открытый ключи однорангового узла в одноранговом хранилище.
  6. Инициализация шлюза соединения
    Используется для управления сетевыми соединениями.
  7. Инициализация диспетчера подключений
    Используется для управления сетевыми подключениями.
  8. Установите адрес передачи и прослушивания
    Задайте сетевой транспортный протокол и адрес прослушивания хоста.
  9. Создайте хост libp2p
    Используйте все предыдущие настройки для создания нового хоста libp2p.
  10. Инициализация статического пира
    Если у вас настроен статический одноранговый узел, инициализируйте его.
  11. Вернуться к хозяину
    Наконец, функция возвращает созданный хост libp2p.

Эти ключевые разделы отвечают за инициализацию и настройку хоста libp2p, и каждая часть отвечает за определенный аспект конфигурации хоста.

func (conf *Config) Host(log log. Логгер, репортерные метрики. Reporter, metrics HostMetrics) (host. Узел сети, ошибка) {  
    Если конф. DisableP2P {  
        Возврат ноль, ноль  
    }  
    pub := conf. Priv.GetPublic()  
    pid, err := peer. IDFromPublicKey(pub)  
    if err != nil {  
        Возврат nil, fmt. Errorf("Не удалось получить открытый ключ из ключа сети: %w", err)  
    }  
    basePs, err := pstoreds. NewPeerstore(context. Background(), conf. Магазин, pstoreds. DefaultOpts())  
    if err != nil {  
        Возврат nil, fmt. Errorf("Не удалось открыть peerstore: %w", err)  
    }  
    peerScoreParams := conf. PeerScoringParams()  
     scoreВремя удержания. Длительность  
    if peerScoreParams != nil {  
        Используйте тот же срок хранения, что и сплетни, если они доступны.  
        scoreRetention = peerScoreParams.PeerScoring.RetainScore  
    } else {  
        Отключите сборку мусора с оценкой, если оценка одноранговых узлов отключена  
        scoreRetention = 0  
    }  
    ps, err := store. NewExtendedPeerstore(context. Background(), log, clock. Clock, basePs, conf. Store, scoreRetention)  
    if err != nil {  
        Возврат nil, fmt. Errorf("Не удалось открыть расширенное одноранговое хранилище: %w", err)  
    }  
    if err := ps. AddPrivKey(pid, conf. Прив.); err != nil {  
        Возврат nil, fmt. Errorf("Не удалось настроить peerstore с ключом priv: %w", err)  
    }  
    if err := ps. AddPubKey(pid, pub); err != nil {  
        Возврат nil, fmt. Errorf("Не удалось настроить peerstore с ключом pub: %w", err)  
    }  
     Стробирование connGtr. BlockingConnectionGater  
    connGtr, err = стробирование. NewBlockingConnectionGater(conf. Магазин)  
    if err != nil {  
        Возврат nil, fmt. Errorf("Не удалось открыть шлюз соединения: %w", err)  
    }  
    connGtr = стробирование. AddBanExpiry(connGtr, ps, log, clock. Часы, метрики)  
    connGtr = стробирование. AddMetering(connGtr, метрики)  
    connMngr, err := DefaultConnManager(conf)  
    if err != nil {  
        Возврат nil, fmt. Errorf("Не удалось открыть диспетчер подключений: %w", err)  
    }  
    listenAddr, err := addrFromIPAndPort(conf. ListenIP, conf. СлушатьTCPPort)  
    if err != nil {  
        Возврат nil, fmt. Errorf("Не удалось заставить слушать addr: %w", err)  
    }  
    tcpTransport := libp2p. Транспорт(  
        протокол tcp. NewTCPTransport,  
        протокол tcp. WithConnectionTimeout(время. Минута*60)) // разрыв неиспользуемых соединений  
    TODO: технически мы также можем запустить ноду на websocket и транспортах QUIC Может быть, в будущем?  
     Нат ЛКОНФ. NATManagerC // отключено, если nil  
    Если конф. NAT {  
        nat = basichost. NewNATManager  
    }  
    opts := []libp2p. Опция{  
        libp2p. Identity(conf. Priv),  
        Явно задайте user-agent, чтобы мы могли отличаться от других пользователей Go libp2p.  
        libp2p. UserAgent(conf. UserAgent),  
        tcpTransport,  
        libp2p. WithDialTimeout(conf. TimeoutDial),  
        Никаких ретрансляционных служб, только прямые соединения между одноранговыми узлами.  
        libp2p. DisableRelay(),  
        Хост запустится и прослушивает сеть сразу после сборки из конфига.  
        libp2p. ListenAddrs(listenAddr),  
        libp2p. ConnectionGater(connGtr),  
        libp2p. ConnectionManager(connMngr),  
        libp2p. ResourceManager(nil), // TODO использует интерфейс менеджера ресурсов для более эффективного управления ресурсами для каждого однорангового узла.  
        libp2p. ping(true),  
        libp2p. AutoNATServiceRateLimit(10, 5, time. Второй*60),  
    }  
    Если конф. NoTransportSecurity {  
    } else {  
        opts = append(opts, conf. HostSecurity...)  
    }  
    h, err := libp2p. Новинка(опт...)  
    if err != nil {  
    }  
    для i, peerAddr := range conf. StaticPeers {  
        if err != nil {  
        }  
        staticPeers (статическиеОдноранговые узлы)[i]  = addr  
    }  
    out := &extraHost{  
        Ведущий: ч,  
        staticPeers: статические одноранговые узлы,  
    }  
        go out.monitorStaticPeers()  
    }  






        ...  
        n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)  
        if err != nil {  
        }  
        ...  
  1. Создание аутентификатора: Генерация названия темы блока:
  • Генерируем blocksTopicName с помощью функции blocksTopicV1, которая форматирует строку на основе L2ChainID в конфигурации (cfg). Отформатированные строки имеют определенную структуру: /optimism/{L2ChainID}/0/blocks.

+, вызвав ps. Join(blocksTopicName) пытается добавить тему сплетен блока. При возникновении ошибки возвращается сообщение об ошибке, указывающее на то, что не удалось присоединить раздел.

  • Сгенерирована новая горутина для логирования событий темы с помощью функции LogTopicEvents.

  • Создан подписчик с помощью функции MakeSubscriber, которая инкапсулирует BlocksHandler, обрабатывающий события OnUnsafeL2Payload из gossipIn. Была сгенерирована новая горутина для запуска предоставленного субиона.

    func JoinGossip(контекст p2pCtx. Context, self peer.ID, ps *pubsub. PubSub, журнал журнала. Logger, cfg *rollup. Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
    blocksTopicName := blocksTopicV1(cfg) // возвращаем fmt. Sprintf("/optimism/%s/0/blocks", cfg. L2ChainID.String())
    Возврат nil, fmt. Errorf("не удалось зарегистрировать блоки gossip topic: %w", err)
    }
    blocksTopic, err := ps. Join(blocksTopicName)
    if err != nil {
    }
    if err != nil {
    }
    go LogTopicEvents(p2pCtx, log. New("topic", "blocks"), blocksTopicEvents)
    if err != nil {
    }
    return &publisher{log: log, cfg: cfg, blocksTopic: blocksTopic, runCfg: runCfg}, nil

op-node/rollup/driver/state.go

func (s *Driver) eventLoop() {  
    for(){  
        ...  
            полезная нагрузка, err := s.sequencer.RunNextSequencerAction(ctx)  
            if err != nil {  
                Ошибки не являются достаточно серьезными для изменения/остановки последовательности, но должны регистрироваться и измеряться.  
                if err := s.network.PublishL2Payload(ctx, payload); err != nil {  
            }  
            planSequencerAction() // запланировать следующее действие секвенсора, чтобы сохранить цикл секвенирования  
            }  
    }  
    ...  

При наличии отсутствующих блоков, быстрая синхронизация через P2P

При повторной привязке узлов после простоя из-за особых обстоятельств, таких как перелинковка после простоя, некоторые блоки (разрывы) могут не синхронизироваться, и при возникновении такой ситуации они могут быстро синхронизироваться через обратную цепочку P2P-сети.

Давайте посмотрим на функцию checkForGapInUnsafeQueue в op-node/rollup/driver/state.go

Фрагмент кода определяет метод checkForGapInUnsafeQueue, принадлежащий структуре Driver. Его цель — проверить наличие пробелов в данных в очереди, называемой «небезопасной очередью», и попытаться получить отсутствующие полезные данные с помощью альтернативного метода синхронизации, называемого altSync. Ключевым моментом здесь является то, что метод обеспечивает непрерывность данных и пытается извлечь отсутствующие данные из других методов синхронизации при обнаружении отсутствующих данных. Вот основные шаги функции:

  1. Сначала функция получает UnsafeL2Head и UnsafeL2SyncTarget из s.derivation в качестве начальной и конечной точек контрольного диапазона.

  2. Функция проверяет наличие отсутствующих блоков между start и end, что делается путем сравнения значений Number end и start.

  3. При обнаружении пробела в данных функция запросит недостающий диапазон данных, вызвав s.altSync.RequestL2Range(ctx, start, end). Если end является нулевой ссылкой (т.е. eth. L2BlockRef{}), функция запросит синхронизацию открытого диапазона, начиная со start.

  4. При запросе данных функция регистрирует журнал отладки, указывая, какой диапазон данных она запрашивает.

  5. В конечном итоге функция возвращает значение ошибки. Если ошибок нет, возвращается nil

    checkForGapInUnsafeQueue проверяет, есть ли разрыв в небезопасной очереди, и пытается извлечь отсутствующие полезные данные из метода alt-sync.
    ВНИМАНИЕ: Это только исходящий сигнал, извлечение блоков не гарантируется.
    Результаты получаются через OnUnsafeL2Payload.
    func (s *Driver) checkForGapInUnsafeQueue(контекст ctx. Context) error {
    start := s.derivation.UnsafeL2Head()
    end := s.derivation.UnsafeL2SyncTarget()
    Проверьте, нет ли у нас пропущенных блоков между началом и концом, запросите их, если они есть.
    Если end == (eth. L2BlockRef{}) {
    s.log.Debug("запрос синхронизации с открытым диапазоном", "start", start)
    return s.altSync.RequestL2Range(ctx, start, eth. L2BlockRef{})
    } else if end. Номер > старт. Число+1 {
    s.log.Debug("запрашивает отсутствующий небезопасный диапазон блоков L2", "start", start, "end", end, "size", end. Число-начало.Число)
    return s.altSync.RequestL2Range(ctx, start, end)
    }
    Возврат nil
    }

Функция RequestL2Range сигнализирует о начале и конце блока запроса в канал запросов.

Затем запрос распределяется по каналу peerRequests через метод onRangeRequest, а канал peerRequests ожидает цикл, открытый несколькими пирами, т.е. только один пир будет обрабатывать запрос для каждого распределения.

func (s *SyncClient) onRangeRequest(контекст ctx. Context, req rangeRequest) {  
        ...  
        для i := uint64(0); ; i++ {  
        num := req.end.Number - 1 - i  
        if num <= req.start {  
            возвращать  
        }  
        Проверьте, есть ли у нас что-то уже на карантине  
        если h, ok := s.quarantineByNum[num] ; ok {  
            if s.trusted.Contains(h) { // если мы доверяем ему, то пытаемся его продвигать.  
                s.tryPromote(h)  
            }  
            Не приносите то, на что у нас уже есть кандидат.  
            Мы выселим его из карантина, найдя конфликт или синхронизировав достаточное количество других блоков  
            продолжать  
        }  
        if _, ok := s.inFlight[num] ; ok {  
            .log. Debug("запрос всё ещё в разработке, не перепланирующий запрос на синхронизацию", "num", num)  
            continue // запрос все еще в полете  
        }  
        pr := peerRequest{num: num, complete: new(atomic. Буль)}  
        .log. Debug("Планирование запроса блока P2P", "num", num)  
        Номер расписания  
        select {  
        case s.peerRequests <- pr:  
            s.inFlight[num]  = пр.полный  
        Корпус <-CTX. Готово():  
            .log. Info("не запланировал полный диапазон синхронизации P2P", "current", num, "err", ctx. Err())  
            возвращать  
        default: // все одноранговые узлы могут быть заняты обработкой запросов  
            .log. Info("нет пиров, готовых обрабатывать запросы на блокировку для большего количества запросов P2P для истории блоков L2", "current", num)  
            возвращать  
        }  
    }  
}

Давайте посмотрим, что произойдет, когда одноранговый узел получит этот запрос.

Первое, что нам нужно знать, это то, что связь между одноранговым узлом и запрашивающим узлом, или передача сообщения, проходит через поток libp2p. Метод обработки потока реализуется принимающим одноранговым узлом, а создание потока открывается отправляющим узлом.

Мы можем видеть этот код в предыдущей функции init, где MakeStreamHandler возвращает обработчик, а SetStreamHandler привязывает идентификатор протокола к этому обработчику, поэтому всякий раз, когда отправляющий узел создает и использует этот поток, срабатывает возвращаемый обработчик.

n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, метрики)  
Регистрация протокола синхронизации на хосте libp2p  
payloadByNumber := MakeStreamHandler(resourcesCtx, log. New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)  
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)

Давайте посмотрим, как это обрабатывается внутри обработчика Сначала функция выполняет глобальные и индивидуальные проверки ограничения скорости, чтобы контролировать скорость обработки запросов. Затем он считывает и проверяет запрошенный номер блока, гарантируя, что он находится в разумном диапазоне. Затем функция получает полезную нагрузку фрагмента запроса с уровня L2 и записывает ее в поток ответа. При записи данных ответа он устанавливает крайний срок записи, чтобы избежать блокировки медленными одноранговыми соединениями во время процесса записи. Наконец, функция возвращает запрошенный номер блока и возможные ошибки.

func (srv *ReqRespServer) handleSyncRequest(контекст ctx. Контекст, потоковая сеть. Stream) (uint64, error) {  
    peerId := поток. Конн(). RemotePeer()  
    взять токен из глобального лимитера скорости,  
    , чтобы убедиться, что между разными одноранговыми узлами не слишком много параллельной работы сервера.  
    if err := srv.globalRequestsRL.Wait(ctx); err != nil {  
        Возврат 0, FMT. Errorf("Истекло время ожидания ограничения глобальной скорости синхронизации: %w", err)  
    }  
    найти данные ограничения скорости однорангового узла или добавить иным образом  
    srv.peerStatsLock.Lock()  
    ps, _ := srv.peerRateLimits.Get(peerId)  
    if ps == nil {  
        ps = &peerStat{  
            Запросы: rate. NewLimiter(peerServerBlocksRateLimit, peerServerBlocksBurst),  
        }  
        srv.peerRateLimits.Add(peerId, ps)  
        пс. Requests.Reserve() // подсчитывает попадание, но делает так, чтобы он задерживал следующий запрос, а не сразу ждал  
    } else {  
        Ожидайте только в том случае, если это существующий одноранговый узел, в противном случае вызов Wait с мгновенным ограничением скорости всегда будет ошибочным.  
        Если заказчик считает, что мы занимаем слишком много времени, то это его проблема, и он может отключиться.  
        Мы отключимся только тогда, когда не сможем прочитать/записать,  
        Если работа недействительна (проверка диапазона) или когда истекло время ожидания отдельных подзадач.  
        if err := ps. Requests.Wait(ctx); err != nil {  
            Возврат 0, FMT. Errorf("Истекло время ожидания ограничения глобальной скорости синхронизации: %w", err)  
        }  
    }  
    srv.peerStatsLock.Unlock()  
    Установите крайний срок чтения, если он доступен  
    _ = поток. SetReadDeadline(время. Сейчас(). Добавить(serverReadRequestTimeout))  
    Ознакомиться с запросом  
     req uint64  
    if err := двоичный. Чтение(поток, двоичный. LittleEndian, &req); err != nil {  
        Возврат 0, FMT. Errorf("Не удалось прочитать запрошенный номер блока: %w", err)  
    }  
    if err := поток. CloseRead(); err != nil {  
        Возврат REQ, FMT. Errorf("Не удалось закрыть сторону чтения вызова запроса на синхронизацию P2P: %w", err)  
    }  
    Убедитесь, что запрос находится в пределах ожидаемого диапазона блокировок  
    if req < srv.cfg.Genesis.L2.Number {  
        Возврат REQ, FMT. Errorf("Невозможно обслужить запрос для блока L2 %d перед генезисом %d: %w", req, srv.cfg.Genesis.L2.Number, invalidRequestErr)  
    }  
    max, err := srv.cfg.TargetBlockNumber(uint64(time. Сейчас(). Unix()))  
    if err != nil {  
        Возврат REQ, FMT. Errorf("невозможно определить максимальный номер целевого блока для проверки запроса: %w", invalidRequestErr)  
    }  
    if req > max {  
        Возврат REQ, FMT. Errorf("Невозможно обслужить запрос для блока L2 %d после максимального ожидаемого блока (%v): %w", req, max, invalidRequestErr)  
    }  
    полезная нагрузка, err := srv.l2.PayloadByNumber(ctx, req)  
    if err != nil {  
        если ошибки. Is(err, ethereum. NotFound) {  
            Возврат REQ, FMT. Errorf("Одноранговый запрошенный неизвестный блок по номеру: %w", err)  
        } else {  
            Возврат REQ, FMT. Errorf("не удалось получить полезные данные для обслуживания однорангового узла: %w", err)  
        }  
    }  
    Мы устанавливаем крайний срок записи, если он доступен, чтобы безопасно записывать без блокировки на регулировающем одноранговом соединении  
    _ = поток. SetWriteDeadline(время. Сейчас(). Добавить(serverWriteChunkTimeout))  
    0 - resultCode: success = 0  
    1:5 - версия: 0  
     .tmp [5] байт  
    if _, err := поток. Запись(tmp[:]); err != nil {  
        Возврат REQ, FMT. Errorf("Не удалось записать данные заголовка ответа: %w", err)  
    }  
    w := быстрый. NewBufferedWriter(поток)  
    if _, err := полезная нагрузка. МаршалССЗ(ж); err != nil {  
        Возврат REQ, FMT. Errorf("Не удалось записать полезную нагрузку в ответ синхронизации: %w", err)  
    }  
    if err := w.Close(); err != nil {  
        Возврат REQ, FMT. Errorf("Не удалось завершить запись полезных данных для синхронизации ответа: %w", err)  
    }  
    возвратное требование, nil  
}

На этом этапе был объяснен общий процесс запроса и обработки запроса обратной синхронизации цепочки

Система репутации очков в p2p узлах

Для того, чтобы определенные узлы не делали вредоносных запросов и ответов, которые подрывают безопасность всей сети, Optimism также использует систему баллов.

Например, в op-node/p2p/app_scores.go есть ряд функций для установки оценки однорангового узла

func (s *peerApplicationScorer) onValidResponse(id peer.ID) {  
    _, err := s.scorebook.SetScore(id, store. IncrementValidResponses{Cap: s.params.ValidResponseCap})  
    if err != nil {  
        s.log.Error("Невозможно обновить оценку однорангового узла", "peer", id, "err", err)  
        возвращать  
    }  
}  
func (s *peerApplicationScorer) onResponseError(id peer.ID) {  
    _, err := s.scorebook.SetScore(id, store. IncrementErrorResponses{Cap: s.params.ErrorResponseCap})  
    if err != nil {  
        s.log.Error("Невозможно обновить оценку однорангового узла", "peer", id, "err", err)  
        возвращать  
    }  
}  
func (s *peerApplicationScorer) onRejectedPayload(id peer.ID) {  
    _, err := s.scorebook.SetScore(id, store. IncrementRejectedPayloads{Cap: s.params.RejectedPayloadCap})  
    if err != nil {  
        s.log.Error("Невозможно обновить оценку однорангового узла", "peer", id, "err", err)  
        возвращать  
    }  
}

Затем перед добавлением нового узла проверяется статус интеграции

func AddScoring(gater BlockingConnectionGater, scores Scores, minScore float64) *ScoringConnectionGater {  
    return &ScoringConnectionGater{BlockingConnectionGater: gater, scores: баллы, minScore: minScore}  
}  
func (g *ScoringConnectionGater) checkScore(p peer.ID) (allow bool) {  
    score, err := g.scores.GetPeerScore(p)  
    if err != nil {  
        return false  
    }  
    return score >= g.minScore  
}  
func (g *ScoringConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {  
    return g.BlockingConnectionGater.InterceptPeerDial(p) && g.checkScore(p)  
}  
func (g *ScoringConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr. Multiaddr) (allow bool) {  
    return g.BlockingConnectionGater.InterceptAddrDial(id, ma) && g.checkScore(id)  
}  
func (g *ScoringConnectionGater) InterceptSecured(dir network. Направление, id peer.ID, mas сеть. ConnMultiaddrs) (allow bool) {  
    return g.BlockingConnectionGater.InterceptSecured(dir, id, mas) && g.checkScore(id)  
}

Резюме

Высокая конфигурируемость libp2p делает весь проект p2p очень настраиваемым и модульным, выше приведена основная логика персонализированной реализации libp2p от optimsim, а другие детали можно подробно изучить, прочитав исходный код в каталоге p2p.

Посмотреть Оригинал
На этой странице может содержаться сторонний контент, который предоставляется исключительно в информационных целях (не в качестве заявлений/гарантий) и не должен рассматриваться как поддержка взглядов компании Gate или как финансовый или профессиональный совет. Подробности смотрите в разделе «Отказ от ответственности» .
  • Награда
  • комментарий
  • Поделиться
комментарий
0/400
Нет комментариев
  • Закрепить