• 产品与解决方案
  • 行业解决方案
  • 服务
  • 支持
  • 合作伙伴
  • 新华三人才研学中心
  • 关于我们

H3C交换机 Telemetry二次开发指南(Comware V7)-6W100

手册下载

H3C交换机 Telemetry二次开发指南(Comware V7)-6W100-整本手册.pdf  (813.76 KB)

  • 发布时间:2024/7/9 19:53:03
  • 浏览量:
  • 下载量:

H3C交换机 Telemetry二次开发指南(Comware V7

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Copyright © 2022 新华三技术有限公司 版权所有,保留一切权利。

非经本公司书面许可,任何单位和个人不得擅自摘抄、复制本文档内容的部分或全部,并不得以任何形式传播。

除新华三技术有限公司的商标外,本手册中出现的其它公司的商标、产品标识及商品名称,由各自权利人拥有。

本文中的内容为通用性技术信息,某些信息可能不适用于您所购买的产品


 

本文档主用来指导用户使用Telemetry技术配置和管理设备。

前言部分包含如下内容:

·     读者对象

·     本书约定

·     资料意见反馈

读者对象

本手册主要适用于如下工程师:

·     熟悉gRPCGPB编码的开发人员

·     熟悉对应编程语言(C++JAVAPythonGO)的开发人员

·     负责网络配置和维护,且具有一定XMLNETCONF技术基础的网络管理员

本书约定

1. 命令行格式约定

   

   

粗体

命令行关键字(命令中保持不变、必须照输的部分)采用加粗字体表示。

斜体

命令行参数(命令中必须由实际值进行替代的部分)采用斜体表示。

[ ]

表示用“[ ]”括起来的部分在命令配置时是可选的。

{ x | y | ... }

表示从多个选项中仅选取一个。

[ x | y | ... ]

表示从多个选项中选取一个或者不选。

{ x | y | ... } *

表示从多个选项中至少选取一个。

[ x | y | ... ] *

表示从多个选项中选取一个、多个或者不选。

&<1-n>

表示符号&前面的参数可以重复输入1n次。

#

由“#”号开始的行表示为注释行。

 

2. 各类标志

本书还采用各种醒目标志来表示在操作过程中应该特别注意的地方,这些标志的意义如下:

警告

该标志后的注释需给予格外关注,不当的操作可能会对人身造成伤害。

注意

提醒操作中应注意的事项,不当的操作可能会导致数据丢失或者设备损坏

提示

为确保设备配置成功或者正常工作而需要特别关注的操作或信息。

说明

对操作内容的描述进行必要的补充和说明。

窍门

配置、操作、或使用设备的技巧、小窍门。

 

3. 图标约定

本书使用的图标及其含义如下:

该图标及其相关描述文字代表一般网络设备,如路由器、交换机、防火墙等。

该图标及其相关描述文字代表一般意义下的路由器,以及其他运行了路由协议的设备。

该图标及其相关描述文字代表二、三层以太网交换机,以及运行了二层协议的设备。

该图标及其相关描述文字代表无线控制器、无线控制器业务板和有线无线一体化交换机的无线控制引擎设备。

该图标及其相关描述文字代表无线接入点设备。

该图标及其相关描述文字代表无线终结单元

该图标及其相关描述文字代表无线终结者

该图标及其相关描述文字代表无线Mesh设备。

该图标代表发散的无线射频信号。

该图标代表点到点的无线射频信号。

该图标及其相关描述文字代表防火墙、UTM、多业务安全网关、负载均衡等安全设备。

该图标及其相关描述文字代表防火墙插卡、负载均衡插卡、NetStream插卡、SSL VPN插卡、IPS插卡、ACG插卡等安全插卡。

 

4. 示例约定

由于设备型号不同、配置不同、版本升级等原因,可能造成本手册中的内容与用户使用的设备显示信息不一致。实际使用中请以设备显示的内容为准。

本手册中出现的端口编号仅作示例,并不代表设备上实际具有此编号的端口,实际使用中请以设备上存在的端口编号为准。

资料意见反馈

如果您在使用过程中发现产品资料的任何问题,可以通过以下方式反馈:

E-mailinfo@h3c.com

感谢您的反馈,让我们做得更好!

 


 

1 概述··· 1-1

1.1 Telemetry简介·· 1-1

1.2 Telemetry网络模型·· 1-1

2 基于gRPCTelemetry技术介绍··· 2-1

2.1 gRPC协议·· 2-1

2.1.1 gRPC协议栈分层·· 2-1

2.1.2 gRPC网络架构·· 2-2

2.1.3 gRPC的对接模式·· 2-2

2.1.4 gRPC的服务模式·· 2-3

2.2 采样数据·· 2-4

2.3 采样路径·· 2-4

2.3.1 采样路径格式·· 2-4

2.3.2 过滤条件·· 2-5

2.4 编码格式·· 2-5

2.4.1 GPB编码介绍·· 2-6

2.4.2 JSON编码介绍·· 2-7

2.4.3 JSON_IETF编码介绍·· 2-8

2.5 Proto文件·· 2-9

2.5.1 Proto文件介绍·· 2-9

2.5.2 公共Proto文件·· 2-9

2.5.3 业务Proto文件·· 2-13

2.5.4 不同对接模式支持的Proto文件·· 2-13

3 Telemetry配置限制和指导··· 3-1

4 配置设备侧的Telemetry订阅··· 4-1

4.1 配置Dial-in模式的Telemetry订阅·· 4-1

4.1.1 功能简介·· 4-1

4.1.2 配置限制和指导·· 4-1

4.1.3 配置准备·· 4-1

4.1.4 配置步骤·· 4-1

4.1.5 验证配置·· 4-3

4.1.6 配置举例·· 4-3

4.2 配置Dial-out模式的Telemetry订阅·· 4-3

4.2.1 功能简介·· 4-3

4.2.2 配置限制和指导·· 4-4

4.2.3 配置准备·· 4-4

4.2.4 配置步骤·· 4-4

4.2.5 验证配置·· 4-7

4.2.6 配置举例·· 4-7

5 Telemetry对接软件二次开发举例(Dial-in模式)··· 5-1

5.1 开发前准备·· 5-1

5.1.1 开发人员要求·· 5-1

5.1.2 开发环境准备·· 5-1

5.2 组网需求·· 5-2

5.3 配置设备侧的Telemetry订阅·· 5-2

5.4 gRPC Dial-in模式二次开发举例(C++·· 5-2

5.4.1 生成代码·· 5-2

5.4.2 开发代码·· 5-3

5.5 gRPC Dial-in模式二次开发举例(GO·· 5-10

5.5.1 生成代码·· 5-10

5.5.2 开发代码·· 5-10

5.6 gRPC Dial-in模式二次开发举例(Python·· 5-17

5.6.1 生成代码·· 5-17

5.6.2 开发代码·· 5-18

5.7 gRPC Dial-in模式二次开发举例(JAVA·· 5-22

5.7.1 生成代码·· 5-22

5.7.2 开发代码·· 5-23

6 Telemetry对接软件二次开发举例(Dial-out模式)··· 6-1

6.1 开发前准备·· 6-1

6.1.1 开发人员要求·· 6-1

6.1.2 开发环境准备·· 6-1

6.2 组网需求·· 6-2

6.3 配置设备侧的Telemetry订阅·· 6-2

6.4 gRPC Dial-out模式二次开发举例(C++·· 6-2

6.4.1 生成代码·· 6-2

6.4.2 开发代码·· 6-3

6.5 gRPC Dial-out模式二次开发举例(GO·· 6-8

6.5.1 生成代码·· 6-8

6.5.2 开发代码·· 6-9

6.6 gRPC Dial-out模式二次开发举例(Python·· 6-13

6.6.1 生成代码·· 6-13

6.6.2 开发代码·· 6-14

6.7 gRPC Dial-out模式二次开发举例(JAVA·· 6-16

6.7.1 生成代码·· 6-16

6.7.2 开发代码·· 6-17

7 常见问题··· 7-1

 


1 概述

1.1  Telemetry简介

Telemetry通常指基于数据模型的Telemetry (Model-Driven Telemetry)它提供了一种高速的、大规模远程数据采集机制,可用于整网设备的实时性能监控。

随着网络的普及和新技术的涌现,网络规模日益增大,部署的复杂度逐步提升,用户对业务的质量要求也不断提高。为了满足用户需求,网络运维务必更加精细化、智能化。当今网络的运维面临着如下挑战:

·     超大规模:管理的设备数目众多,监控的信息数量非常庞大。

·     快速定位:在复杂的网络中,能够快速地定位故障,达到秒级、甚至亚秒级的故障定位速度。

·     精细监控:监控的数据类型更加丰富,监控粒度更细,以便完整、准确地反应网络状况,据此预估可能发生的故障,并为网络优化提供有力的数据依据。实际网络运维中,不仅需要监控接口的流量统计信息、每条流的丢包情况、CPU和内存占用情况,还需要监控每条流的时延抖动、每个报文在传输路径上的时延、每台设备的缓冲区占用情况等。

传统的网络监控手段(SNMPCLI、日志)已经无法满足网络需求:

·     SNMPCLI方式主要采用“拉模式”获取数据,即发送请求来获取设备上的数据,限制了可以监控的网络设备数量,且无法快速获取数据。

·     SNMP Trap和日志方式虽然采用“推模式”获取数据,即设备主动将数据上报给监控设备,但仅上报事件和告警,监控的数据内容极其有限,无法准确地反映网络状况。

因此,面对大规模、高性能的网络监控需求,用户需要一种新的网络监控方式。Telemetry技术可以满足用户的要求,实现一种智能的运维系统:

·     相对于传统的“拉模式”,网络设备采用“推模式”,周期性地主动向采集器上送丰富的监控数据,提供主动高效的数据采集功能,为网络问题的快速定位、网络质量优化调整提供了重要的大数据基础。

·     利用Telemetry技术,采集器可以收集到大量的设备数据,然后将数据交给分析器进行大数据分析,分析器再将分析结果上报给控制器,由控制器调整设备的配置,便能够满足更加实时且高精度的智能运维需求。

1.2  Telemetry网络模型

1-1所示,Telemetry网络模型中包括如下组成部分:

·     网络设备:接受监控的设备。通过在网络设备上配置订阅数据源,网络设备将依据订阅要求对指定的监控数据进行采样,并将采样数据通过gRPCGoogle Remote Procedure CallGoogle远程过程调用)方式定时上送给采集器。

·     采集器:用于接收和保存网络设备上报的监控数据。

·     分析器:用于分析采集器接收到的监控数据,并对数据进行处理,例如以图形化界面的形式展现给用户。

·     控制器:通过NETCONF等方式向设备下发配置,实现对网络设备的管理。控制器可以根据分析器提供的分析结果,为网络设备下发配置,对网络设备的转发行为进行调整,也可以控制网络设备对哪些数据进行采样和上报。

图1-1 Telemetry网络模型

 


2 基于gRPCTelemetry技术介绍

H3CTelemetry技术通过gRPC协议将监控数据从设备推送给采集器。网络设备和网管系统建立gRPC连接后,设备将自动读取各种统计信息(CPU、内存、接口等),根据采集器的订阅要求将监控数据通过gRPC协议上报给采集器。

2.1  gRPC协议

gRPCGoogle Remote Procedure CallGoogle远程过程调用)是Google发布的基于HTTP 2.0协议承载的高性能开源软件框架,提供了支持多种编程语言、对网络设备进行配置和管理的方法。通信双方可以基于该软件框架进行二次开发。

2.1.1  gRPC协议栈分层

gRPC协议栈分层如2-1所示。

图2-1 RPC协议栈分层

 

自上而下,gRPC协议栈各层含义如2-1所示。

表2-1 gRPC协议栈分层

分层

说明

内容层

用于承载编码后的业务数据。业务数据的编码格式包括:

·     GPBGoogle Protocol Buffer):高效的二进制编码格式,通过Proto文件描述编码使用的数据结构。在设备和采集器之间传输数据时,该编码格式的数据比其他格式(如JSON)的数据具有更高的信息负载能力。业务数据使用GPB格式编码时,需要配合对应的业务模块Proto文件才能解码。

·     JSONJavaScript Object Notation):轻量级的数据交换格式,采用独立于编程语言的文本格式来存储和表示数据,易于阅读和编写。业务数据使用JSON格式编码时,通过公共Proto文件即可解码,无需对应的业务模块Proto文件。

设备和采集器通信时,双方的Proto文件必须保持一致才能解码

gRPC

定义了RPCRemote Procedure Call,远程过程调用)的协议交互格式。公共RPC方法定义在公共Proto文件中,例如grpc_dialout.proto

HTTP 2.0

当传输协议为TCP时,gRPC承载在HTTP 2.0协议上。HTTP 2.0协议具有首部数据压缩、单TCP连接支持多路请求、流量控制等性能增强特性

TLSTransport Layer Security,传输层安全)层

该层是可选的,设备和采集器可以基于TLS协议进行通道加密和双向证书认证,实现安全通信

传输层

底层通信协议,支持以下传输层协议:

·     TCP:提供面向连接的、可靠的数据链路

 

2.1.2  gRPC网络架构

2-2所示,gRPC网络采用客户端/服务器模型,使用HTTP 2.0协议传输报文。

图2-2 gRPC网络架构

 

gRPC网络的工作机制如下:

(1)     gRPC服务器通过监听指定服务端口等待gRPC客户端的连接请求。

(2)     用户通过执行gRPC客户端程序登录到gRPC服务器。

(3)     gRPC客户端调用Proto文件提供的gRPC方法发送请求消息。关于Proto文件的详细介绍请参见“2.5  Proto文件”。

(4)     gRPC服务器回复应答消息。

说明

H3C设备支持作为gRPC服务器或者gRPC客户端。

 

2.1.3  gRPC的对接模式

gRPC网络中,根据设备和采集器的角色不同,支持Dial-inDial-out两种gRPC对接模式。

1. Dial-in模式

Dial-in模式下,设备作为gRPC服务器,采集器作为gRPC客户端,由采集器主动向设备发起gRPC连接并订阅需要采集的数据信息。该模式适用于小规模网络以及采集器需要向设备下发配置的场景。

Dial-in模式支持以下操作:

·     普通Subscribe操作:用于向设备订阅事件触发类数据。所需的公共RPC方法在grpc_server.proto文件中定义,同时还需要各业务模块的Proto文件定义对应业务的RPC方法。

·     gNMI Subscribe操作:基于gNMI协议,用于向设备订阅数据推送服务,包括事件触发类数据和周期采样类数据。所需的公共RPC方法在grpc_server.proto文件中定义,同时还需要gnmi.protognmi_ext.proto文件。

说明

gNMIgRPC Network Management InterfacegRPC网络管理接口)是基于gRPC框架开发的一种操作协议,定义了一系列用于设备状态获取和配置操作的RPC方法。gNMI支持通用数据模型,不需要为内容层额外提供业务模块的Proto文件。

 

2. Dial-out模式

Dial-out模式下,设备作为gRPC客户端,采集器作为gRPC服务器。设备主动和采集器建立gRPC连接,将设备上配置的订阅数据推送给采集器。该模式适用于网络设备较多的情况下向采集器提供设备数据信息。

根据对编码格式支持能力的不同,Dial-out模式下的Telemetry数据模型如下:

·     二层Telemetry数据模型,为缺省类型。该模型下,数据仅经过两个层次的处理。根据是否支持gNMI协议,又分为两种代码开发模式:

¡     普通模式

-     RPC层:定义在公共proto文件grpc_dialout.proto中,提供消息格式等公共RPC方法。grpc_dialout.proto文件中的jsonData字段承载JSON数据

-     内容层:只承载JSON编码格式的业务数据。编码后的业务数据传输到采集器后,用户只需要对grpc_dialout.proto文件进行解码,不需要使用相应的业务proto文件。

¡     gNMI模式

-     RPC层:定义在gnmi.protognmi_ext.proto文件中,提供消息格式等公共RPC方法。gnmi.proto文件中的Notification字段承载JSON数据

-     内容层:只承载JSON编码格式的业务数据。编码后的业务数据传输到采集器后,用户只需要对gnmi.proto文件进行解码,不需要使用相应的业务proto文件。

2.1.4  gRPC的服务模式

gRPC服务根据RPC方法中的参数和返回值类型来确定不同的服务模式,具体的RPC方法在各Proto文件中定义,格式为:

service 业务名称{

    rpc 方法名称(参数名称)  returns(返回值) {}

    }

例如:

service gRPCService{

    rpc gRPCMethod(para)  returns(response) {}

    rpc gRPCMethod(para)  returns(stream response) {}

    rpc gRPCMethod(stream para)  returns(response) {}

    rpc gRPCMethod(stream para)  returns(stream response) {}

    }

表2-2 gRPC服务模式

服务模式

说明及示例

简单模式

普通RPC调用,即客户端发送一个RPC请求,服务端立即返回一个RPC响应

例如:rpc Login (LoginRequest)  returns (LoginReply)

服务端流模式

客户端发送一个RPC请求,服务端持续地返回多个RPC响应

例如:rpc Subscribe(SubsPara)  returns(stream SubsRply)

客户端流模式

客户端持续地向服务器发送RPC请求,服务端返回一个RPC响应

例如:rpc Dialout(stream DialoutMsg)  returns (DialoutResponse)

双向流模式

客户端持续地向服务器发送RPC请求,服务端持续响应RPC响应

例如:rpc Subscribe(stream SubscribeRequest)  returns (stream SubscribeResponse)

 

2.2  采样数据

Telemetry支持的采样数据可以从以下三个方面理解:

·     Telemetry支持采样的原始数据包括设备的接口流量统计、CPU或者内存数据等信息。用户可在《Telemetry性能指标集》中查询所支持的采样数据。

·     Telemetry基于YANG模型组织采数据。

·     Telemetry支持对特定的采样路径采集指定的数据信息。

说明

·     YANG是一种数据建模语言,用于设计可以作为各种传输协议操作的配置数据模型、状态数据模型、远程调用模型和通知机制等。

·     设置采样路径时,可以通过设置采样条件,获取所需要的采样信息,具体的设置方法请参见“2.3  采样路径”。

·     各采样路径可以在设定的采样周期里完成对应的采集工作,对于部分数据量庞大的采样路径,若一个采样周期内完成不了采集工作,会持续到下个周期继续完成。当设备的CPU被限速的时候,采样周期可能会相应地延长。

 

2.3  采样路径

用户通过配置采样路径来获取自己需要的采样数据。设备上的数据已经通过YANG模型描述说明,基于YANG模型和它的子树路径组成了采样路径。

2.3.1  采样路径格式

1. 基本格式

基本格式为:YANG模型节点名称/子节点名称,例如:Device/CPUs

该采样路径中,“Device”表示YANG模型中的节点名称,“/”为各个节点的连接符,“CPUs”表示“Device”下的子节点名称。

2. 组织节点格式

组织节点格式的采样路径基于指定组织定义的YANG模型构建。

例如:openconfig-lldp:lldp/state

该采样路径中,之前的“openconfig-lldp”为YANG模型名称,其中的openconfig表示定义该模型的组织名称,目前支持的组织有IETFOpenConfigGSTA之后的内容为基本格式的采用路径。

2.3.2  过滤条件

在采样路径上支持配置附加的过滤条件,实现更为精细化地数据采集。

过滤条件中指定的节点必须是YANG模型中的Leaf节点。

1. 周期采样的过滤条件

周期采样是指设备以固定的时间间隔进行数据采样。周期性采样的采样路径上支持设置以下几种过滤条件。

(1)     谓语过滤

通过[column=“value”]格式设置条件表达式,表示采样数据的指定属性必须满足指定取值。

例如:ifmgr/statistics[ifindex="GigabitEthernet1/0/*"],或者ifmgr/statistics[ifindex=“12”]

该过滤条件表示,接口统计相关采样数据必须满足接口索引以“GigabitEthernet1/0/”开头或为“12”。

需要注意的是:

·     采样路径上附加谓语过滤条件时,不支持再配置其它方式的过滤条件;反之亦然。

·     一次采样过程中,同一采样路径最多支持64个谓语过滤条件,只要满足其中一个过滤条件设备就会推送采集数据。

·     谓语过滤条件中,仅索引节点的取值末尾可以为通配符“*”,且“*”仅能出现一次。

说明

目前,仅采样路径ifmgr/statistics支持谓语过滤条件。

 

(2)     列选择过滤

通过selection-nodes关键字设置条件表达式,表示仅将指定节点下的部分子节点信息上报至采集器。该过滤条件中最多可支持同时指定24个子节点,各子节点之间以空格分隔。

例如:ifmgr/statistics selection-nodes inrate outrate

该过滤条件表示,仅将statistics节点下的inrateoutrate节点的数据上报至采集器。

2.4  编码格式

gRPC当前支持如下三种编码格式承载数据:

·     JSONJavaScript Object Notation编码格式

·     GPBGoogle Protocol Buffer编码格式

·     JSON_IETFJSON Data Interchange Format)编码格式

2.4.1  GPB编码介绍

GPB编码格式是一种用于通信协议及数据存储的序列化结构数据格式,具有与语言无关、平台无关、高扩展性的优点。GPB编码与XMLJSON编码类似,不同之处在于它是一种二进制编码,性能更高。目前,GPB包括v2v3两个版本,设备支持的GPB版本是v3

2-3为经过GPB编码解析前后的代码对比示例。

表2-3 GPB编码格式

GPB编码解析前

GPB编码解析后

1 : 1

3: EOK

4: 0

1: H3C

2: m16287

3: H3C CR16006-F

15: 2

16: Device/Base

17: 188

18: 1617276638160

19: 1617276638208

20: 1617276638208

21: 5000

22: OK

23: 1

1{

  1: 1617276638207

  11: 3 {

  1: 147699

  2: "m16287"

  3 "1.3.6.1.4.1.25506.1.1101"

  4: 22

  5: 1

  10: "H3C Comware Platform Software, Software Version 7.1.075, ESS 8305\r\nH3C CR16006-F\r\nCopyright (c) 2004-2021 New H3C Technologies Co., Ltd. All rights reserved."

  11: "2021-04-01T11:30:38"

  12 {

    1: "Z"

  }

  13 {

    1: 3

    2: 1

  }

}

}

 

ReqId : 1

errors: EOK

totalSize: 0

producer_name: H3C

Node_Id_str: m16287

ProductName: H3C CR16006-F

Sub_Id_str: 2

Sensor_path: Device/Base

Collection_Id: 188

Collection_start_time: 1617276638160

msg_timestamp: 1617276638208

Collection_end_time: 1617276638208

Current_period: 5000

except_desc: OK

Encoding: Encoding_GPB

row {

  timestamp: 1617276638207

  content:

     Base {

          Uptime: 147699

          HostName: "m16287"

          HostOid: "1.3.6.1.4.1.25506.1.1101"

          MaxSlotNum: 22

           MaxCPUIDNum: 1

          HostDescription: "H3C Comware Platform Software, Software Version 7.1.075, ESS 8305\r\nH3C CR16006-F\r\nCopyright (c) 2004-2021 New H3C Technologies Co., Ltd. All rights reserved."

          LocalTime: "2021-04-01T11:30:38"

           TimeZone {

                   Zone: "Z"

            }

          ClockProtocol {

                    Protocol: 3

                    MDCID: 1

             }

          }

}

 

 

2.4.2  JSON编码介绍

JSON编码格式是一种轻量级的数据交换格式,它基于ECMAScript的一个子集,采用完全独立于编程语言的文本格式来存储和表示数据。JSON的层次结构十分简洁和清晰,不仅易于开发人员阅读、编写代码,也易于机器解析、生成代码,是一种理想的数据交换语言。

2-4为经过JSON编码解析前后的代码对比示例。

表2-4 JSON编解码格式

JSON编码解析前

JSON编码解析后

{

1:“H3C”

2:“H3C”

3:“H3C device_test”

4:“not-config”

5:“sample”

2:“Syslog/LogBuffer”

3:"notification": {

"Syslog": {

"LogBuffer": {

"BufferSize": 512,

"BufferSizeLimit": 1024,

"DroppedLogsCount": 0,

"LogsCount": 100,

"LogsCountPerSeverity": {

"Alert": 0,

"Critical": 1,

"Debug": 0,

"Emergency": 0,

"Error": 3,

"Informational": 80,

"Notice": 15,

"Warning": 1

},

"OverwrittenLogsCount": 0,

"State": "enable"

}

},

"Timestamp": "1527206160022"

}

}

{

"producerName": "H3C",

"deviceName": "H3C",

"deviceModel": "H3C device_test",

“deviceIpAddr” : "not-config",

“eventType” : "sample",

"sensorPath": "Syslog/LogBuffer",

"jsonData": {

"notification": {

"Syslog": {

"LogBuffer": {

"BufferSize": 512,

"BufferSizeLimit": 1024,

"DroppedLogsCount": 0,

"LogsCount": 100,

"LogsCountPerSeverity": {

"Alert": 0,

"Critical": 1,

"Debug": 0,

"Emergency": 0,

"Error": 3,

"Informational": 80,

"Notice": 15,

"Warning": 1

},

"OverwrittenLogsCount": 0,

"State": "enable"

}

},

"Timestamp": "1527206160022"

}

}

}

 

2.4.3  JSON_IETF编码介绍

JSON_IETF是基于YANG 1.1数据建模语言定义的JSON编码格式。JSON_IETF的基本编码格式同JSON,它的YANG数据节点(叶子、容器、叶子列表、列表、anydata节点和anyxml节点)作为JSON对象来编码。对于JSON_IETF编码,JSON对象成员的名称格式为:命名空间前缀:成员标识符,其中:

·     命名空间前缀为该数据节点的模块名称。

·     成员标识符为对应的YANG数据节点的标识符。

以上两个名称组成元素之间使用冒号(“:“)分隔,例如"H3C-device-data:Uptime",其中H3C-device-data为命名空间前缀,UptimeYANG数据节点标识符。

对于子模块中定义的数据节点,它的JSON对象成员名称必须使用所属主模块的命名空间前缀。

2-5为经过JSON_IETF编码解析前后的代码对比示例。

表2-5 JSON_IETF编解码格式

JSON_IETF编码解析前

JSON_IETF编码解析后

{

1:“H3C”

2:“H3C”

3:“H3C device_test”

4:“not-config”

5:“sample”

2:“Syslog/LogBuffer”

3:"notification": {

"Syslog": {

"LogBuffer": {

"BufferSize": 512,

"BufferSizeLimit": 1024,

"DroppedLogsCount": 0,

"LogsCount": 100,

"LogsCountPerSeverity": {

"Alert": 0,

"Critical": 1,

"Debug": 0,

"Emergency": 0,

"Error": 3,

"Informational": 80,

"Notice": 15,

"Warning": 1

},

"OverwrittenLogsCount": 0,

"State": "enable"

}

},

"Timestamp": "1527206160022"

}

}

{

"producerName": "H3C",

"deviceName": "H3C",

"deviceModel": "H3C device_test",

“deviceIpAddr” : "not-config",

“eventType” : "sample",

"sensorPath": "Syslog/LogBuffer",

"jsonData": {

"notification": {

"Syslog": {

"LogBuffer": {

"H3C-syslog-data:BufferSize": 512,

"H3C-syslog-data:BufferSizeLimit": 1024,

"H3C-syslog-data:DroppedLogsCount": 0,

"H3C-syslog-data:LogsCount": 100,

"H3C-syslog-data:LogsCountPerSeverity": {

"H3C-syslog-data:Alert": 0,

"H3C-syslog-data:Critical": 1,

"H3C-syslog-data:Debug": 0,

"H3C-syslog-data:Emergency": 0,

"H3C-syslog-data:Error": 3,

"H3C-syslog-data:Informational": 80,

"H3C-syslog-data:Notice": 15,

"H3C-syslog-data:Warning": 1

},

"OverwrittenLogsCount": 0,

"State": "enable"

}

},

"Timestamp": "1527206160022"

}

}

}

 

2.5  Proto文件

2.5.1  Proto文件介绍

Proto文件使用Protocol Buffers语言编写。Protocol Buffers简称Protobuf,是Google公司开发的一种跨语言和平台的序列化数据结构的方式,是一个灵活的、高效的用于序列化数据的协议。Proto文件用于定义Protobuf协议的编码规则。

采集器可以通过Protoc工具软件,根据“.proto”文件自动生成对应语言的开发代码(目前工具支持将Proto文件转换成多种语言的代码,例如C++JAVAGOPython),用户基于自动生成的代码进行二次开发,可以实现多种编程语言程序与设备的对接。

Proto文件包含两种类型:公共Proto文件、业务Proto文件。

2.5.2  公共Proto文件

Telemetry提供6类公共Proto文件:

·     grpc_dialout.proto文件

·     grpc_dialout_v3.proto文件

·     telemetry.proto文件

·     grpc_service.proto文件

·     gnmi.proto文件和gnmi_ext.proto文件

·     dialout.proto文件

1. grpc_dialout.proto文件

grpc_dialout.proto文件在设备作为客户端向采集器推送数据时使用,定义了RPC方法和承载的数据消息描述,其内容与含义如下文所示。(此处所列举的公共Proto文件内容仅为示例,请以设备的实际情况为准)

syntax = "proto2";//proto版本为v2版本

package grpc_dialout;//包名为grpc_dialout

message DeviceInfo{//推送的设备信息

    required string producerName = 1;//厂商名

    required string deviceName = 2;//设备名称

    required string deviceModel = 3;//设备的实体型号

    optional string deviceIpAddr = 4;//设备源ip

    optional string eventType = 5;//采样类型

}

message ChunkInfo{//推送的消息格式描述

    required int64 totalSize = 1;//报文大小

    required uint64 totalFragments = 2;//报文总共分片的片数

    required uint64 nodeId = 3;//报文的Id

}

message DialoutMsg{//推送的消息格式描述

    required DeviceInfo deviceMsg = 1;//设备实体信息

    required string sensorPath = 2;//采样路径

    required string jsonData = 3;//JSON数据

    optional ChunkInfo chunkMsg = 4;//报文分片信息

}

 

message DialoutResponse{//采集器(gRPC服务器)返回信息,预留(暂不处理返回值,可填充任意值)

    required string response = 1;//设备响应信息

}

service GRPCDialout {//服务名称为GRPCDialout

    rpc Dialout(stream DialoutMsg) returns (DialoutResponse);//方法为Dialout,单向流模式,提供数据推送的方法。入参是DialoutMsg数据流

}

2. grpc_dialout_v3.proto文件

grpc_dialout_v3.proto文件在设备作为客户端向采集器推送数据时使用,定义了RPC方法和承载的数据消息描述,主要用来定义三层Telemetry数据模型的编码规则,同时支持GPBJSON两种编码格式。该文件的内容与含义如下文所示。(此处所列举的公共Proto文件内容仅为示例,请以设备的实际情况为准)

syntax = "proto3";//proto版本为v3版本

package grpc_dialout_v3;//包名称为grpc_dialout_v3

message DialoutV3Args{//三层编码描述消息

    int64 ReqId = 1;//请求ID

    bytes data = 2;//承载的数据

    string errors = 3;//产生错误时的描述信息

    int32 totalSize = 4;// 分片时信息的总大小,未分片时为0

}

service gRPCDialoutV3{//服务名称为gRPCDialoutV3

    rpc DialoutV3(stream DialoutV3Args) returns (stream DialoutV3Args) {};

//方法为DialoutV3,双向流模式,提供数据推送的方法,入参是DialoutV3Args数据流。

}

3. telemetry.proto文件

telemetry.proto文件在设备作为客户端向采集器推送数据时使用,主要用来定义三层Telemetry数据模型的采样数据消息描述,包括采样路径、采样时间戳等重要信息。该文件的内容与含义如下文所示。(此处所列举的公共Proto文件内容仅为示例,请以设备的实际情况为准)

syntax = "proto3";//Proto版本为v3版本

package telemetry; //包名称为telemetry

message Telemetry {//Telemetry消息描述

    string producer_name = 1;//厂商名

    string node_id_str = 2;//设备名称

    string product_name = 3;//产品名称

    string subscription_id_str = 15;//订阅名

    string sensor_path = 16;//采样路径

    uint64 collection_id = 17;//标识采样轮次

    uint64 collection_start_time = 18;//采样开始时间

    uint64 msg_timestamp = 19;//生成本消息时间戳

    uint64 collection_end_time = 20; //采样结束时间

    uint32 current_period = 21;//采样精度

    string except_desc = 22;//异常描述信息

    enum Encoding {//编码方式

        Encoding_JSON = 0;//JSON数据编码格式

        Encoding_GPB = 1;//GPB数据编码格式

        };

    Encoding encoding = 23;//数据编码格式

    string data_str = 24;//数据编码非GPB时有效,否则为空

    TelemetryGPBTable data_gpb = 25; //承载的数据由TelemetryGPBTable定义

}

message TelemetryGPBTable {//消息描述

    repeated TelemetryRowGPB row = 1; //数组定义,标识数据是TelemetryRowGPB结构的重复

}

message TelemetryRowGPB {//消息描述

    uint64 timestamp = 1; //采样当前实例的时间戳

    bytes keys = 10; //保留字段

    bytes content = 11; //承载的采样实例数据

}

4. grpc_service.proto文件

grpc_service.proto文件在设备作为服务端对外推送数据时使用,定义了RPC方法和承载的数据消息描述。该文件内容与含义如下文所示。(此处所列举的公共Proto文件内容仅为示例,请以设备的实际情况为准)

syntax = "proto2";//Proto版本为v2版本

package grpc_service;//包名为grpc_service

message GetJsonReply { //Get方法应答结果

    required string result = 1;

}

message SubscribeReply { //订阅结果

    required string result = 1;

}

message ConfigReply { //配置结果

    required string result = 1;

}

message ReportEvent { //订阅事件结果定义

    required string token_id = 1; //登录token_id

    required string stream_name = 2; //订阅的事件流名称

    required string event_name = 3; //订阅的事件名

    required string json_text = 4; //订阅结果json字符串

}

 

message GetReportRequest{ //获取事件订阅结果请求

    required string token_id = 1; //登录成功后的token_id

}

message LoginRequest {//登录请求参数定义

    required string user_name = 1; //登录请求用户名

    required string password = 2; //登录请求密码

}

message LoginReply {//登录请求应答定义

    required string token_id = 1; //登录成功后返回的token_id

}

message LogoutRequest {//退出登录请求参数定义

    required string token_id = 1;  //token_id

}

message LogoutReply { //退出登录返回结果定义

    required string result = 1; //退出登录结果

}

message SubscribeRequest {//定义事件流

    required string stream_name = 1; //事件流名称

}

message CliConfigArgs {//向设备下发配置命令,并指定命令行参数

    required int64 ReqId = 1; //配置命令请求ID

    required string cli = 2; //配置命令

}

message CliConfigReply { //设备返回配置命令行执行的结果

    required int64 ResReqId = 1; //返回配置命令请求ID,与CliConfigArgs相对应

    required string output = 2; //返回配置命令执行输出

    required string errors = 3; //标记配置命令执行结果

}

message DisplayCmdArgs {//向设备下发display命令,并指定命令行参数

    required int64 ReqId = 1; //display命令请求ID

    required string cli = 2; //display命令

}

message DisplayCmdReply {//设备返回display命令行执行的结果

    required int64 ResReqId =1;//display命令请求ID,与DisplayCmdArgs相对应

    required string output = 2; //返回display命令执行输出

    required string errors = 3; //标记display命令执行结果

}

service GrpcService {//定义gRPC dialin模式的RPC方法

    rpc Login (LoginRequest) returns (LoginReply) {}//登录方法

    rpc Logout (LogoutRequest) returns (LogoutReply) {}//退出登录方法

    rpc SubscribeByStreamName (SubscribeRequest) returns (SubscribeReply) {}//订阅事件流

    rpc GetEventReport (GetReportRequest) returns (stream ReportEvent) {}//获取事件结果

    rpc CliConfig (CliConfigArgs)  returns (stream CliConfigReply) {}//gRPC 支持通过命令行下发配置命令,并返回执行结果

    rpc DisplayCmdTextOutput(DisplayCmdArgs)  returns(stream DisplayCmdReply) {}//gRPC 支持通过命令行下发display命令,并返回查询结果

}

5. gnmi.proto文件和gnmi_ext.proto文件

gnmi.protognmi_ext.proto文件定义了gNMI类操作的公共RPC方法。它们由Google提供,是开源文件,此处不做详细描述,具体内容和详细介绍请下载相关文件后查看。gnmi.proto文件和gnmi_ext.proto文件的官方下载地址如下:

·     https://github.com/openconfig/gnmi/tree/master/proto/gnmi/gnmi.proto

·     https://github.com/openconfig/gnmi/tree/master/proto/gnmi_ext/gnmi_ext.proto

6. dial_out.proto文件

dial_out.proto文件定义了gNMI类的RPC方法,该文件来源于SONIC,官方下载地址为:https://github.com/Azure/sonic-telemetry/blob/master/proto/dial_out.proto

2.5.3  业务Proto文件

设备提供多个业务Proto文件,它们用于定义Dial-in模式下数据订阅所需的RPC方法和消息描述,以及定义Dial-out模式下具体业务的GPB编码所需的消息描述,采集器需要根据实际监控的业务选择对应的Proto文件进行编码和二次开发。

请联系H3C技术支持人员获取业务Proto文件。

2.5.4  不同对接模式支持的Proto文件

表2-6 不同对接模式支持的Proto文件

对接模式

Proto文件

Dial-in模式

普通Subscribe操作:

·     grpc_service.proto文件

·     业务Proto文件

gNMI操作:

·     grpc_service.proto文件

·     gnmi.proto文件和gnmi_ext.proto文件

Dial-out模式

二层数据模型下:

·     普通模式

¡     grpc_dialout.proto文件

·     gNMI模式

¡     dialout.proto文件

¡     gnmi.proto文件和gnmi_ext.proto文件

三层数据模型下:

·     grpc_dialout_v3.proto文件

·     telemetry.proto文件

·     业务Proto文件

 


3 Telemetry配置限制和指导

所有gRPC相关功能,都需要在成功执行grpc enable命令后,才能配置。

如果执行undo grpc enable命令关闭了gRPC功能,则所有gRPC相关配置都会被删除。

缺省情况下,设备和采集器建立的gRPC连接是非加密的。配置设备和采集器建立gRPC连接时引用PKI域后,设备和采集器会基于TLSTransport Layer Security,传输层安全)协议进行通道加密和双向证书认证,从而提高gRPC通信的安全性。需要注意的是:

·     指定的PKI域必须存在,并且PKI域中包含完整的证书和密钥。

·     指定PKI域后,gRPC功能将重启,与采集器的连接将短暂断开。采集器需要重新发送连接请求才能继续访问设备。


4 配置设备侧的Telemetry订阅

本章节主要描述设备上如何通过命令行配置Telemetry订阅。采集器上的gRPC对接软件需要另开发,具体开发过程请参考“5 Telemetry对接软件二次开发”。

4.1  配置Dial-in模式的Telemetry订阅

4.1.1  功能简介

Dial-in模式是指设备作为gRPC服务器,采集器作为gRPC客户端发起到设备的gRPC连接,由设备进行数据采集及上送。通过在设备上配置Dial-in模式,使采集器可以获取和订阅设备上的数据和事件。

4.1.2  配置限制和指导

如果设备和采集器的gRPC连接断开,设备会自动取消订阅,不再采集推送数据。gRPC连接断开后,设备不支持自动配置恢复,需要采集器重新发起gRPC连接请求。

4.1.3  配置准备

在配置Dail-in模式之前,需要完成以下任务:

·     确保gRPC服务器与gRPC客户端之间路由可达。

·     如果需要配置gRPC连接的加密功能,则必须完成所引用的PKI域的配置,并且该PKI域中需要包含完整的证书和密钥。关于PKI的配置,请参见相关产品 “安全配置指导”中的“PKI”。

4.1.4  配置步骤

1. 配置gRPC服务

(1)     进入系统视图。

system-view

(2)     开启gRPC功能。

grpc enable

缺省情况下,gRPC功能处于关闭状态。

(3)     (可选)配置gRPC服务的端口号。

grpc port port-number

缺省情况下,gRPC服务的端口号为50051

修改端口号后,gRPC功能将重启,正在访问的客户端将被断开,客户端需要重新发送连接请求才能继续访问。

(4)     (可选)配置gRPC会话超时时间。

grpc idle-timeout minutes

缺省情况下,gRPC会话超时时间为5分钟。

2. (可选)配置设备与采集器之间的安全通信

(1)     进入系统视图。

system-view

(2)     配置设备和采集器建立gRPC连接时引用的PKI域。

grpc pki domain domain-name

缺省情况下,设备和采集器建立gRPC连接时不会引用PKI域。

3. 配置gRPC用户

(1)     进入系统视图。

system-view

(2)     添加设备管理类本地用户。

local-user user-name [ class manage ]

设备上需要为gRPC客户端创建本地用户,gRPC客户端才能与设备建立gRPC会话。

(3)     设置本地用户的密码。

password [ { hash | simple } password ]

缺省情况下,不存在本地用户密码,即本地用户认证时无需输入密码,只要用户名有效且其他属性验证通过即可认证成功。

(4)     配置本地用户的授权用户角色为network-admin

authorization-attribute user-role network-admin

缺省情况下,本地用户的授权用户角色为network-operator

(5)     配置本地用户可以使用的服务类型为HTTPS服务。

service-type https

缺省情况下,未配置用户的服务类型。

有关local-userpasswordauthorization-attributeservice-type命令的详细介绍,请参见“安全命令参考”中的“AAA”。

4. (可选)开启gRPC Dial-in模式的日志功能

(1)     进入系统视图。

system-view

(2)     开启gRPC Dial-in模式的日志功能。请至少选择其中一项进行配置。

¡     开启gRPC Dial-in模式的RPC类操作日志功能。

grpc log dial-in rpc { all | { cli | get }* }

缺省情况下,gRPC Dial-in模式的RPC类操作日志功能处于关闭状态。

¡     开启gRPC Dial-in模式的gNMI类操作日志功能。

grpc log dial-in gnmi { all | { capabilities | get | set | subscribe }* }

缺省情况下,gRPC Dial-in模式的gNMI Set操作日志功能处于开启状态,其他gNMI类操作日志功能处于关闭状态。

为了管理员定位gRPC问题的需要,可以开启gRPC日志功能,以便记录设备对gRPC报文的处理信息。如果gRPC操作频繁,设备会输出大量gRPC日志,影响设备性能,建议仅开启需关注的gRPC操作的日志功能。

4.1.5  验证配置

在任意视图下执行display grpc命令可以显示配置后gRPC Dial-in模式的相关信息,通过查看显示信息验证配置的效果。

4.1.6  配置举例

1. 组网需求

4-1所示,设备作为gRPC服务器与采集器相连,采集器为gRPC客户端。

通过在设备上配置gRPC Dial-in模式,使gRPC客户端可以订阅设备上的相关事件。

图4-1 gRPC Dial-in模式配置组网图

 

2. 配置DevicegRPC服务器)

# 开启gRPC功能。

<Device> system-view

[Device] grpc enable

# 创建本地用户test,配置该用户的密码为123456TESTplat&!,授权用户角色为network-admin,可以使用的服务类型为HTTPS服务。

[Device] local-user test

[Device-luser-manage-test] password simple 123456TESTplat&!

[Device-luser-manage-test] authorization-attribute user-role network-admin

[Device-luser-manage-test] service-type https

[Device-luser-manage-test] quit

4.2  配置Dial-out模式的Telemetry订阅

4.2.1  功能简介

Dial-out模式是指设备作为gRPC客户端,采集器作为gRPC服务端,由设备主动发起到采集器的gRPC连接,进行数据采集及上送。通过在设备上配置Dial-out模式的Telemetry订阅,使设备能够主动向采集器推送用户订阅的数据和事件。

Dial-out模式下的数据订阅以及上报通信参数由如下两个关键配置实现:

·     配置传感器:传感器用来指定设备上采样的数据源,即采样路径。一个传感器组是一到多个采样路径的集合。传感器组有两种类型:普通传感器组和gNMI模式的传感器组。

采样路径包括以下类型:

¡     周期采样:传感器组以固定的时间间隔来进行数据采样。关于周期采样类型的采样路径,请参见对应模块的《Telemetry性能指标集》手册。

¡     事件触发采样:传感器组的数据采样没有固定周期,仅由事件触发。关于事件触发采样类型的采样路径,请参见对应模块的《Telemetry性能指标集》手册。

¡     条件触发采样:传感器组根据一定频率检测采样路径,如果满足推送条件,则采集数据并上送给采集器。关于条件触发采样类型的采样路径以及相关的检测频率、推送条件,请联系H3C技术支持人员获取具体信息。

条件触发采样类型的采样路径仅在gNMI模式的传感器组中配置才能生效。

说明

同一传感器组内只能支持一种类型的采样路径。

 

·     配置采集器:采集器用于接收网络设备推送的采样数据。设备上需要建立目标组并在目标组中配置正确的采集器地址信息,才能和采集器通信。

完成传感器组和目标组的配置后,需要创建“订阅”并将二者关联,设备才能和目标组中的采集器建立gRPC连接,从而将订阅报文发送给采集器。

订阅分为普通订阅和gNMI模式的订阅,其中普通订阅仅支持关联普通传感器组,gNMI模式的订阅仅支持关联gNMI模式的传感器组。

4.2.2  配置限制和指导

如果采集器和设备的gRPC连接断开,设备会进行重新连接,再次上送数据,但是重连期间的数据会丢失。

在系统主备倒换或保存gRPC业务配置并重启后,gRPC会重新加载相关配置,gRPC业务会继续进行,但是重启或者倒换期间的采样数据会丢失。

4.2.3  配置准备

在配置Dail-out模式之前,需要完成以下任务:

·     确保gRPC服务器与gRPC客户端之间路由可达。

·     如果需要配置gRPC连接的加密功能,则必须完成所引用的PKI域的配置,并且该PKI域中需要包含完整的证书和密钥。关于PKI的配置,请参见相关产品“安全配置指导”中的“PKI”。

4.2.4  配置步骤

1. 开启gRPC功能

(1)     进入系统视图。

system-view

(2)     开启gRPC功能。

grpc enable

缺省情况下,gRPC功能处于关闭状态。

(3)     (可选)配置gRPC使用的Telemetry数据模型。

grpc data-model { 2-layer | 3-layer }

缺省情况下,设备使用二层Telemetry数据模型上送数据。

设备使用二层Telemetry数据模型上送数据时,不支持对上送数据使用GPB编码格式。

2. (可选)配置设备与采集器之间的安全通信

(1)     进入系统视图。

system-view

配置设备和采集器建立gRPC连接时引用的PKI域。

(2)     grpc pki domain domain-name

缺省情况下,设备和采集器建立gRPC连接时不会引用PKI域。

3. 配置普通传感器

(1)     进入系统视图。

system-view

(2)     进入Telemetry视图。

telemetry

(3)     创建普通传感器组,并进入传感器组视图。

sensor-group group-name

(4)     配置采样路径。请选择其中一项进行配置。

¡     周期采样类型的采样路径

sensor path path [ selection-nodes node-list | depth depth ]

多次执行本命令可配置多个采样路径。多次执行本命令且指定的采样路径相同时,最后一次执行的命令生效。

说明

·     selection-nodes表示只把指定节点的数据推送给采集器。

·     depth为采样深度,缺省值为11表示设备只上报当前采样路径的所有列数据;2表示设备除了上报当前采样路径的列数据外,还会上报当前采样路径下所有子表的列数据;3表示设备除了上报当前采样路径的列数据外,还会上报当前采样路径下所有子表以及这些子表的所有子表的列数据。

·     condition node node operator operator value value用于设置样路径的推送条件,只有当指定节点满足指定条件时才会将采样路径的数据推送给采集器。

·     配置本命令时,若在path中携谓语过滤条件表达式则不支持再指定selection-nodescondition参数;反之亦然

·     event-condition表示采样路径的数据只有满足指定条件时才会推送给采集器。

 

4. 配置gNMI模式传感器

(1)     进入系统视图。

system-view

(2)     进入Telemetry视图。

telemetry

(3)     创建gNMI模式传感器组,并进入传感器组视图。

sensor-group group-name gnmi

通过本命令进入已经创建的gNMI模式的传感器组视图时,不需要携带gnmi关键字。

(4)     配置采样路径。

sensor path path

多次执行本命令可配置多个采样路径。

5. 配置采集器

(1)     进入系统视图。

system-view

(2)     进入Telemetry视图。

telemetry

(3)     创建目标组,并进入目标组视图。

destination-group group-name

建议系统中创建的目标组数量不超过5个,否则会影响系统性能。

(4)     配置采集器的地址和相关参数。

IPv4网络)

ipv4-address ipv4-address [ port port-number ] [ vpn-instance vpn-instance-name ]

IPv6网络)

ipv6-address ipv6-address [ port port-number ] [ vpn-instance vpn-instance-name ]

采集器的IPv6地址不能指定为IPv6链路本地地址。

多次执行本命令可配置多个采集器。配置本命令时,只要任意一个参数不同,就算不同的采集器。

6. 配置订阅

(1)     进入系统视图。

system-view

(2)     进入Telemetry视图。

telemetry

(3)     创建订阅,并进入订阅视图。请选择其中一项进行配置。

¡     普通订阅

subscription subscription-name

¡     gNMI模式订阅

subscription subscription-name gnmi

通过本命令进入已经创建的gNMI模式的订阅视图时,不需要携带gnmi关键字。

(4)     (可选)配置订阅的采样数据推送模式为条件触发推送模式。

push-mode condition-triggered

仅当订阅的传感器组中的采样路径类型为条件触发采样时,需要配置本命令。

本命令仅支持在gNMI模式的订阅下配置。

缺省情况下,订阅仅支持推送来自周期采样和事件触发采样类型的采样路径的数据。

(5)     (可选)配置设备发送的订阅报文的DSCP优先级。

dscp dscp-value

缺省情况下,设备发送的订阅报文的DSCP优先级为0

DSCP优先级的取值越大,报文的优先级越高。

(6)     (可选)配置设备发送订阅报文的源地址。

source-address { ipv4-address | interface interface-type interface-number | ipv6 ipv6-address } [ port port-number ]

缺省情况下,设备使用路由出接口的主IP地址作为发送订阅报文的源IP地址。

当设备发送订阅报文的源地址发生变化时,设备将会重新连接gRPC服务器。

 

(7)     配置关联传感器组。

sensor-group group-name [ sample-interval [ msec ] interval | suppress-time suppress-time ]

仅当订阅的传感器组中的采样路径类型为周期采样时,才需要配置本命令中的sample-interval参数。

仅当订阅的传感器组中的采样路径类型为条件触发采样时,才需要配置本命令中的suppress-time参数。

(8)     配置关联目标组。

destination-group group-name

同一目标组不支持同时在普通订阅和gNMI模式订阅中使用。

7. 开启gRPC Dial-out模式的日志功能

(1)     进入系统视图。

system-view

(2)     开启gRPC Dial-out模式的日志功能。

grpc log dial-out { all | { event | sample }* }

缺省情况下,gRPC Dial-out模式的日志功能处于关闭状态。

为了管理员定位gRPC问题的需要,可以开启gRPC日志功能,以便记录设备对gRPC报文的处理信息。gNMI模式的订阅不支持gRPC Dial-out模式的日志功能。

4.2.5  验证配置

Probe视图下执行display system internal telemetry命令可以显示配置后gRPC Dial-out模式的相关信息,通过查看显示信息验证配置的效果。

4.2.6  配置举例

1. 组网需求

设备作为gRPC客户端与采集器相连,采集器为gRPC服务器,接收数据的端口号为50051

通过在设备上配置gRPC Dial-out模式,使设备以10秒的周期向采集器推送接口模块的设备能力信息。

图4-2 gRPC Dial-out模式配置组网图

 

2. 配置DevicegRPC客户端)

在开始下面的配置之前,假设设备与采集器的IP地址都已配置完毕,并且它们之间路由可达。

# 开启gRPC功能。

<Device> system-view

[Device] grpc enable

# 创建传感器组test,并添加采样路径为ifmgr/devicecapabilities

[Device] telemetry

[Device-telemetry] sensor-group test

[Device-telemetry-sensor-group-test] sensor path ifmgr/devicecapabilities

[Device-telemetry-sensor-group-test] quit

# 创建目标组collector1,并配置采集器的IP地址为2.2.2.2、端口号为50051

[Device-telemetry] destination-group collector1

[Device-telemetry-destination-group-collector1] ipv4-address 2.2.2.2 port 50051

[Device-telemetry-destination-group-collector1] quit

# 创建订阅A,配置关联传感器组为test,数据采样和推送周期为10秒,关联目标组为collector1

[Device-telemetry] subscription A

[Device-telemetry-subscription-A] sensor-group test sample-interval 10

[Device-telemetry-subscription-A] destination-group collector1

[Device-telemetry-subscription-A] quit


5 Telemetry对接软件二次开发举例(Dial-in模式)

说明

本指南中展示的代码仅供参考,由于没有实际的代码框架,在实际的对接过程中不能直接使用。除举例中生成的代码之外,其它代码还需要开发人员自行开发。

 

对于Dial-in模式,主要是实现gRPC客户端的代码,使采集器能够获取设备上的采集数据并进行解析。客户端代码主要包括以下三个部分:

·     进行登录操作,获取token_id

·     为要发起的RPC方法准备参数,用Proto文件生成的服务类发起RPC调用并解析返回结果。

·     退出登录。

本章节用于介绍Dial-in模式的设备与对接客户端软件的开发过程,可实现如下操作:

·     普通Subscribe操作

·     gNMI Subscribe操作

5.1  开发前准备

5.1.1  开发人员要求

·     熟悉gRPC的开发(可通过https://doc.oschina.net/grpc学习)。

·     熟悉GPB编码的开发(可通过https://developers.google.com/protocol-buffers学习)。

·     熟悉对应语言(C++JAVAPythonGO)的开发。

5.1.2  开发环境准备

1. 获取Proto文件

联系H3C技术支持人员获取相关Proto文件。

2. 获取处理Proto文件的工具软件protoc

下载地址:https://github.com/google/protobuf/releases

3. 获取对应开发语言的protobuf插件

下载地址:https://github.com/google/protobuf/releases

开发者需要准备好对应语言的开发环境,例如获取C++插件protobuf-cpp。本指南会给出当前主流语言的开发举例(C++GOPythonJAVA)。

4. 存放Proto文件和工具软件

建议将获取到的Proto文件和protoc工具存在开发代码的工程目录下,具体以实际使用的开发环境为准。

5.2  组网需求

5-1所示,设备作为gRPC服务器与采集器相连,采集器为gRPC客户端。gRPC客户端登录设备的用户名为admin、密码为123456

通过配置gRPC Dial-in模式,使gRPC客户端可以订阅设备上的相关事件。

图5-1 gRPC Dial-in模式配置组网图

 

5.3  配置设备侧的Telemetry订阅

执行以下配置之前,请确保gRPC服务器与gRPC客户端之间路由可达。

# 开启gRPC功能。

<Device> system-view

[Device] grpc enable

# 创建本地用户admin,配置该用户的密码为123456,授权用户角色为network-admin,可以使用的服务类型为HTTPS服务。

[Device] local-user admin

[Device-luser-manage-admin] password simple 123456

[Device-luser-manage-admin] authorization-attribute user-role network-admin

[Device-luser-manage-admin] service-type https

[Device-luser-manage-admin] quit

5.4  gRPC Dial-in模式二次开发举例(C++

5.4.1  生成代码

开发代码之前,需要使用protoc工具将收集到的Proto文件转换成C++代码,并将生成的代码加入到开发的工程中。

本例中,开发普通Subscribe操作的代码使用以下Proto文件:

·     grpc_service.proto

·     业务Proto文件,本例中为BufferMonitor.proto

本例中,开发gNMI Subscribe操作的代码使用以下Proto文件:

·     grpc_service.proto

·     gnmi.protognmi_ext.proto

使用protoc工具生成上述Proto文件的C++代码,示例如下:

$protoc --plugin=./grpc_cpp_plugin  --grpc_out=. --cpp_out=. *.proto

5.4.2  开发代码

1. 普通Subscribe操作

以调用GrpcServiceBufferMonitorService服务类为例,编码步骤如下:

(1)     编写一个GrpcServiceTest类。

GrpcServiceTest类中使用由grpc_service.proto生成的GrpcService::Stub类,通过grpc_service.proto自动生成的LoginLogout方法分别完成登录和退出操作,代码示例如下:

class GrpcServiceTest

{

public:

    /* 构造函数 */

    GrpcServiceTest(std::shared_ptr<Channel> channel): GrpcServiceStub(GrpcService::NewStub(channel))  {}

 

    /* 成员函数 */

    int Login(const std::string& username, const std::string& password);

    void Logout();

    void listen();

Status listen(const std::string& command);

 

    /* 成员变量 */

    std::string token;

 

private:

    std::unique_ptr<GrpcService::Stub> GrpcServiceStub;  //使用grpc_service.proto生成的GrpcService::Stub

};

(2)     实现自定义的Login方法。

通过用户名、密码调用GrpcService::Stub类的Login方法完成登录,代码示例如下:

int GrpcServiceTest::Login(const std::string& username, const std::string& password)

{

    LoginRequest request;   //设置用户名密码

    request.set_user_name(username);

    request.set_password(password);

 

    LoginReply reply;

    ClientContext context;

 

  //调用登录方法

  Status status = GrpcServiceStub->Login(&context, request, &reply);

  if (status.ok())

    {

        std::cout << "login ok!" << std::endl;

        std::cout <<"token id is :" << reply.token_id() << std::endl;

        token = reply.token_id();  //登录成功,获取到token

        return 0;

    }

    else{

        std::cout << status.error_code() << ": " << status.error_message()

                << ". Login failed!" << std::endl;

        return -1;

    }

}

(3)     发起对设备的RPC方法请求。

以订阅接口丢包事件为例,代码如下:

rpc SubscribePortQueDropEvent(PortQueDropEvent) returns (grpc_service.SubscribeReply) {}

(4)     编写一个BufMon_GrpcClient类来封装发起的RPC方法。

使用BufferMonitor.proto自动生成的BufferMonitorService::Stub类完成RPC方法的调用,代码示例如下:

class BufMon_GrpcClient

{

public:

    BufMon_GrpcClient(std::shared_ptr<Channel> channel): mStub(BufferMonitorService::NewStub(channel))

    {}

 

    std::string BufMon_Sub_AllEvent(std::string token);

    std::string BufMon_Sub_BoardEvent(std::string token);

    std::string BufMon_Sub_PortOverrunEvent(std::string token);

    std::string BufMon_Sub_PortDropEvent(std::string token);

 

    /* get 表项 */

    std::string BufMon_Sub_GetStatistics(std::string token);

    std::string BufMon_Sub_GetGlobalCfg(std::string token);

    std::string BufMon_Sub_GetBoardCfg(std::string token);

    std::string BufMon_Sub_GetNodeQueCfg(std::string token);

    std::string BufMon_Sub_GetPortQueCfg(std::string token);

 

private:

    std::unique_ptr<BufferMonitorService::Stub> mStub; //使用BufferMonitor.proto自动生成的类

};

(5)     实现自定义的std::string BufMon_Sub_PortDropEvent(std::string token)方法完成接口丢包事件订阅。

实现丢包事件订阅方法的代码示例如下:

std::string BufMon_GrpcClient::BufMon_Sub_PortDropEvent(std::string token)

{

    std::cout << "-------BufMon_Sub_PortDropEvent-------- " << std::endl;

 

    PortQueDropEvent stNodeEvent;

    PortQueDropEvent_PortQueDrop* pstParam = stNodeEvent.add_portquedrop();

 

    UINT uiIfIndex = 0;

    UINT uiQueIdx = 0;

    UINT uiAlarmType = 0;

 

    std::cout<<"Please input interface queue info : ifIndex queIdx alarmtype " << std::endl;

    cout<<"alarmtype : 1 for ingress; 2 for egress; 3 for port headroom"<<endl;

 

    std::cin>>uiIfIndex>>uiQueIdx>>uiAlarmType; //设置订阅参数,接口索引等

    pstParam->set_ifindex(uiIfIndex);

    pstParam->set_queindex(uiQueIdx);

    pstParam->set_alarmtype(uiAlarmType);

 

    ClientContext context;

 

    /* token need add to context */ //设置登录成功后返回的token_id

    std::string key = "token_id";

    std::string value = token;

    context.AddMetadata(key, value);

 

    SubscribeReply reply;

    Status status = mStub->SubscribePortQueDropEvent(&context,stNodeEvent,&reply); //调用RPC方法

 

    return reply.result();

}

(6)     循环等待设备上报事件。

在之前的GrpcServiceTest类中实现此方法,代码示例如下:

void GrpcServiceTest::listen()

{

    GetReportRequest reportRequest;

    ClientContext context;

    ReportEvent reportedEvent;

 

    /* add token to request */

    reportRequest.set_token_id(token);

 

    std::unique_ptr< ClientReader< ReportEvent>> reader(GrpcServiceStub->GetEventReport(&context, reportRequest)); //通过grpc_service.proto自动生成的类的GetEventReport来获取事件信息

 

    std::string streamName;  

    std::string eventName;

    std::string jsonText;

    std::string token;

 

    JsonFormatTool jsonTool;

 

    std::cout << "Listen to server for Event" << std::endl;

    while(reader->Read(&reportedEvent) ) //读取收到的上报事件

    {

        streamName = reportedEvent.stream_name();

        eventName = reportedEvent.event_name();

        jsonText = reportedEvent.json_text();

        token = reportedEvent.token_id();

 

        std::cout << "/***********EVENT COME**************/" << std::endl;

        std::cout << "TOKEN: " << token << std::endl;

        std::cout << "StreamName: "<< streamName << std::endl;

        std::cout << "EventName: " << eventName << std::endl;

        std::cout << "JsonText without format: " << std::endl << jsonText << std::endl;

        std::cout << std::endl;

        std::cout << "JsonText Formated: " << jsonTool.formatJson(jsonText) << std::endl;

        std::cout << std::endl;

    }

 

    Status status = reader->Finish();

    std::cout << "Status Message:" << status.error_message() << "ERROR code :" << status.error_code();

}

(7)     实现向设备发起接口丢包事件的订阅请求。

代码示例如下:

void bufmon_test_PortDropStream()

{

    auto channel = grpc::CreateChannel(g_server_address, grpc::InsecureChannelCredentials());

 

    /* 1. login */

    GrpcServiceTest reporter(channel);

        if(0 != reporter.Login(g_username, g_password))

    {

                return;

        }

 

        /* 2. subscribe */

        BufMon_GrpcClient cSubscriber(channel);

 

        std::string replyForSysLog;

 

    /* subscribe bufmon port overrun event */

        replyForSysLog = cSubscriber.BufMon_Sub_PortDropEvent(reporter.token);

        std::cout<<"BufMon board event: "<<replyForSysLog<<std::endl;

 

        /* 3. listen to the server and get event */

    reporter.listen();

 

    std::cout<<"End of main."<<std::endl;

    return;

 

}

(8)     调用Logout方法退出登录。

调用Logout方法的代码示例如下:

void GrpcServiceTest:: Logout ()

{

    LogoutRequest request;

    request.set_token_id(token);

    LogoutReply reply;

    ClientContext context;

    Status status = mStub->Logout(&context, request, &reply);

std::cout << "Logout! :" << reply.result() << std::endl;

}

2. gNMI Subscribe操作

gNMI Subscribe操作的编码步骤如下:

(1)     编写一个GrpcServiceTest类。

步骤与“5.4.2  1. ”中编写GrpcServiceTest类相同。

(2)     实现自定义的Login方法。

步骤与“5.4.2  1. ”中编写Login方法相同。

(3)     发起对设备的RPC方法请求。

代码示例如下:

rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse);

(4)     编写gnmi_client类来封装发起的RPC方法。

代码示例如下:

class gnmi_client

{

public:

      explicit gnmi_client(const std::string &address,const std::string &tokenId);

 

      bool TestCapabilities();

      bool TestGet();

      bool TestSet();

      bool TestSubscribePool();

      bool TestSubscribeOnce();

      bool TestSubscribeStream();

      bool TestSubscribeStreamWithAlias();

 

 

private:

      void PrintCapabilityResponse(const gnmi::CapabilityResponse &response);

      void PrintGetResponse(const gnmi::GetResponse &response);

      void PrintSubscribeResponse(const gnmi::SubscribeResponse &response);

      void PrintSubscribeRequest(const gnmi::SubscribeRequest &request);

      void PrintGetRequest(const gnmi::GetRequest &request);

      void PrintSetRequest(const gnmi::SetRequest &request);

 

      void FillGetRequest(gnmi::GetRequest &request);

      void FillSetRequest(gnmi::SetRequest &request);

      void FillSubscribeRequestByOnce(gnmi::SubscribeRequest &request);

      void FillSubscribeRequestByPool(gnmi::SubscribeRequest &request);

      void FillSubscribeRequestByStream(gnmi::SubscribeRequest &request);

      void FillSubscribePool(gnmi::SubscribeRequest &request);

      void FillSubscribeAlias(gnmi::SubscribeRequest &request);

 

private:

      std::unique_ptr<gnmi::gNMI::Stub> mStubGnmiService;

      std::string mTokenID;  

};

(5)     实现自定义的订阅方法。

实现事件订阅方法的代码示例如下:

void gnmi_client::FillSubscribeRequestByOnce(gnmi::SubscribeRequest &request)

{

  auto subscribeList = request.mutable_subscribe();

 

  auto prefix = subscribeList->mutable_prefix();

  auto pathelem01 = prefix->add_elem();

  pathelem01->set_name("LLDP");

 

  auto subscribe = subscribeList->add_subscription();

 

  auto path = subscribe->mutable_path();

  auto pathelem02 = path->add_elem();

  pathelem02->set_name("NeighborEvent");

 

  auto pathelem03 = path->add_elem();

  pathelem03->set_name("Neighbor");

  (*pathelem03->mutable_key())["IfName"] = "xxx";

 

  subscribeList->set_mode(::gnmi::SubscriptionList_Mode_ONCE);

  subscribeList->set_encoding(::gnmi::JSON);

}

 

void gnmi_client::FillSubscribeRequestByPool(gnmi::SubscribeRequest &request)

{

  auto subscribeList =  request.mutable_subscribe();

 

  auto prefix = subscribeList->mutable_prefix();

  auto pathelem01 = prefix->add_elem();

  pathelem01->set_name("Device");

 

  auto subscribe = subscribeList->add_subscription();

 

  auto path = subscribe->mutable_path();

  auto pathelem02 = path->add_elem();

  pathelem02->set_name("CPUs");

  auto pathelem03 = path->add_elem();

  pathelem03->set_name("CPU");

  auto pathelem04 = path->add_elem();

  pathelem04->set_name("CPUUsage");

 

  subscribeList->set_mode(::gnmi::SubscriptionList_Mode_POLL);

  subscribeList->set_encoding(::gnmi::JSON);

}

 

void gnmi_client::FillSubscribeRequestByStream(gnmi::SubscribeRequest &request)

{

  auto subscribeList =  request.mutable_subscribe();

 

  auto prefix = subscribeList->mutable_prefix();

  auto pathelem01 = prefix->add_elem();

  pathelem01->set_name("Diagnostic");

 

  auto subscribe = subscribeList->add_subscription();

 

  auto path = subscribe->mutable_path();

  auto pathelem02 = path->add_elem();

  pathelem02->set_name("CPUEvent");

  auto pathelem03 = path->add_elem();

  pathelem03->set_name("CPU");

   (*pathelem03->mutable_key())["Chassis#condition"] = "equal:1";

  subscribe->set_mode(::gnmi::ON_CHANGE);

  subscribe->set_sample_interval(1000);

  subscribe->set_suppress_redundant(false);

  subscribe->set_heartbeat_interval(1000);

 

  subscribeList->set_mode(::gnmi::SubscriptionList_Mode_STREAM);

  subscribeList->set_encoding(::gnmi::JSON);

}

 

void gnmi_client::FillSubscribeAlias(gnmi::SubscribeRequest &request)

{

  auto aliases = request.mutable_aliases();

  auto alias = aliases->add_alias();

 

  auto path = alias->mutable_path();

  auto pathelem01 = path->add_elem();

  pathelem01->set_name("Device");

  auto pathelem02 = path->add_elem();

  pathelem02->set_name("CPUs");

  auto pathelem03 = path->add_elem();

  pathelem03->set_name("CPU");

  auto pathelem04 = path->add_elem();

  pathelem04->set_name("CPUUsage");

 

  alias->set_alias("#cpu_usage");

}

(6)     调用Logout方法退出登录。

调用Logout方法的代码示例如下:

void GrpcServiceTest:: Logout ()

{

    LogoutRequest request;

    request.set_token_id(token);

    LogoutReply reply;

    ClientContext context;

    Status status = mStub->Logout(&context, request, &reply);

std::cout << "Logout! :" << reply.result() << std::endl;

}

5.5  gRPC Dial-in模式二次开发举例(GO

5.5.1  生成代码

开发代码之前,需要用protoc工具软件将收集到的Proto文件转换成GO代码,并将生成的代码加入到开发的工程中。

本例中,开发Subscribe操作的代码使用以下Proto文件:

·     grpc_service.proto

·     业务Proto文件,本例中为Syslog.proto

本例中,开发gNMI Subscribe操作的代码使用以下Proto文件:

·     grpc_service.proto

·     gnmi.protognmi_ext.proto

使用protoc工具生成GO代码的示例如下:

[root@ grpc]# cd protobuf

[root@ protobuf]# protoc --go_out=plugins=grpc:. grpc_service.proto

[root@ protobuf]# protoc --go_out=plugins=grpc:. Syslog.proto

[root@ protobuf]# protoc --go_out=plugins=grpc:. gnmi.proto

[root@ protobuf]# protoc --go_out=plugins=grpc:. gnmi_ext.proto

5.5.2  开发代码

1. 普通Subscribe操作

以调用GrpcServiceSyslogService服务类为例,编码步骤如下:

(1)     编写grpcConnect方法,实现登录设备和退出登录设备。

代码示例如下:

import(

  "flag"

  "io"

  "log"

  "fmt"

  "time"

  "github.com/gnmiTest/comwaresdk/sdk"

  "github.com/gnmiTest/comwaresdk/cmwproto/gnmi"

  "github.com/gnmiTest/comwaresdk/cmwproto/device"

  syslog "github.com/gnmiTest/comwaresdk/cmwproto/syslog"

  h3c "github.com/gnmiTest/comwaresdk/cmwproto/grpc_service"

)

var(

  address         string

  port            uint

  username        string

  password        string

  grpcSession*     sdk.GrpcSession

  isSyslog           bool

)

 

type grpcCon struct {

  grpcSession     *sdk.GrpcSession

  sysClient       syslog.SyslogServiceClient

  h3cClient       h3c.GrpcServiceClient

}

 

func grpcConnect() (*grpcCon, error) {

  grpcSession, err := sdk.NewClient(address, port, username , password)

  if err != nil {

      log.Println("Failed to open session.")

      return nil, err

  }

  sysClient := syslog.NewSyslogServiceClient(grpcSession.Conn)

 

  gt := grpcCon{

      grpcSession:    grpcSession,

      sysClient:      sysClient,

      h3cClient:      grpcSession.Client,

  }

  return &gt, err

}

(2)     发起对设备的RPC方法请求。

以订阅Syslog/LOGEvent事件为例,代码如下:

    rpc SubscribeLOGEvent(LOGEvent) returns (grpc_service.SubscribeReply) {}

(3)     编写方法syslogSubscribeEvent实现事件订阅和数据接收。

代码示例如下:

func (gt *grpcCon) syslogSubscribeEvent() error {

  streamName := "Syslog"

  mSubscribeRequest := syslog.EventStream{StreamName: &streamName}

  ctxWithToken, cancel := sdk.CtxWithToken(gt.grpcSession.Token, time.Second*100)

  defer cancel()

  sysReply, err := gt.sysClient.SubscribeEventStream(ctxWithToken, &mSubscribeRequest)

  if err != nil {

      log.Println("syslog subscribe error: ", err)

      return err

  }

  // print subscribe result

  log.Printf("SyslogResult: \n %v", sysReply.GetResult())

 

  // receive subscribe result

  mGetReportquest := h3c.GetReportRequest{TokenId: &gt.grpcSession.Token}

  stream, err := gt.h3cClient.GetEventReport(ctxWithToken, &mGetReportquest)

      for {

      mSubscribeResponse, err := stream.Recv()

 

      if err == io.EOF {

           break

      }

      if err != nil {

           log.Println("Recv error: ", err)

           return err

      }else{

           fmt.Println("success:", mSubscribeResponse.GetJsonText())

      }

  }

 

  return nil

}

(4)     编写main函数,向设备发起RPC请求。

代码示例如下:

func test() error {

  con, err := grpcConnect()

      defer con.grpcSession.Close()

  if err != nil {

      return err

  }

  if isSyslog{

      err := con.syslogSubscribeEvent()

      if err != nil {

          return err

      }

  }

//参数解析

func init() {

  flag.StringVar(&address, "a", "192.168.2.1", "Address to comware")

  flag.UintVar(&port, "gp", 50051, "Grpc port of comware")

  flag.StringVar(&username, "u", "admin", "Username to comware")

  flag.StringVar(&password, "up", "123456", "Password to comware")

  flag.BoolVar(&isSyslog, "syslog", false, "dial in syslog event examlpe")

}

//main函数

func main() {

  flag.Parse()

  test()

}

2. gNMI Subscribe操作

gNMI Subscribe操作的编码步骤如下:

(1)     编写grpcConnect方法,实现登录设备和退出登录设备。

代码示例如下:

type grpcCon struct {

  gnmiClient      gnmi.GNMIClient

  grpcSession     *sdk.GrpcSession

}

 

func grpcConnect() (*grpcCon, error) {

  grpcSession, err := sdk.NewClient(address, port, username , password)

  if err != nil {

      log.Println("Failed to open session.")

      return nil, err

  }

 

  gnmiClient := gnmi.NewGNMIClient(grpcSession.Conn)

  gt := grpcCon{

      gnmiClient:     gnmiClient,

      grpcSession:    grpcSession,

  }

  return &gt, err

}

(2)     发起对设备的RPC方法请求。

代码示例如下:

rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse);

(3)     实现自定义的订阅方法。

代码示例如下:

//once模式订阅

func (gt *grpcCon) gnmiSubscribeOnceTest() error {

 

  mSubscribeRequest := gnmi.SubscribeRequest{

      Request: &gnmi.SubscribeRequest_Subscribe{

          Subscribe: &gnmi.SubscriptionList{

              Prefix: &gnmi.Path{

                  Elem: []*gnmi.PathElem{

                      {Name: "Device"},

                  },

              },

              Subscription: []*gnmi.Subscription{

                  {Path: &gnmi.Path{

                      Elem: []*gnmi.PathElem{{Name: "Base"}},                

                  },},

              },

              Mode: gnmi.SubscriptionList_ONCE,

              Encoding: gnmi.Encoding_JSON,

          },

      },

  }

 

  ctxWithToken, cancel := sdk.CtxWithToken(gt.grpcSession.Token, time.Second*10)

  defer cancel()

  stream, err := gt.gnmiClient.Subscribe(ctxWithToken)

  if err != nil {

      log.Println("Subscribe error: ", err)

      return err

  }

  stream.Send(&mSubscribeRequest)

  for {

      mSubscribeResponse, err := stream.Recv()

          if err == io.EOF {

          break

      }

      if err != nil {

          log.Println("Recv error: ", err)

          return err

      }else{

          fmt.Println("success:", mSubscribeResponse)

      }

  }

  return nil

}

//stream模式订阅

func (gt *grpcCon) gnmiSubscribeStreamTest() error {

  mSubscribeRequest := gnmi.SubscribeRequest{

      Request: &gnmi.SubscribeRequest_Subscribe{

          Subscribe: &gnmi.SubscriptionList{

               Prefix: &gnmi.Path{

                  Elem: []*gnmi.PathElem{

                      // {Name: "Ifmgr"}, {Name: "Interfaces"},

                      {Name: "Device"}, 

                  },

               },

               Subscription: []*gnmi.Subscription{

                  {Path: &gnmi.Path{

                       // Elem: []*gnmi.PathElem{{Name: "Interface", Key: map[string]string{"IfIndex": "2",}},{Name: "ConfigDuplex"}},

                       Elem: []*gnmi.PathElem{{Name: "Base"}},                   

                   },

                   Mode: gnmi.SubscriptionMode_TARGET_DEFINED,

                   SampleInterval: uint64(1000000000),

                   SuppressRedundant: false,

                   HeartbeatInterval: uint64(1000),},

              },

              Mode: gnmi.SubscriptionList_STREAM,

              Encoding: gnmi.Encoding_JSON,

          },

       },

  }

 

  ctxWithToken, cancel := sdk.CtxWithToken(gt.grpcSession.Token, time.Second*10)

  defer cancel()

  stream, err := gt.gnmiClient.Subscribe(ctxWithToken)

  if err != nil {

      log.Println("Subscribe error: ", err)

      return err

  }

  stream.Send(&mSubscribeRequest)

  for {

      mSubscribeResponse, err := stream.Recv()

 

      if err == io.EOF {

          break

      }

      if err != nil {

          log.Println("Recv error: ", err)

          return err

      }else{

          fmt.Println("success:", mSubscribeResponse)

      }

  }

  return nil

}

//poll模式订阅

func (gt *grpcCon) gnmiSubscribePollTest() error {

  mSubscribeRequest := gnmi.SubscribeRequest{

      Request: &gnmi.SubscribeRequest_Subscribe{

          Subscribe: &gnmi.SubscriptionList{

              Prefix: &gnmi.Path{

                  Elem: []*gnmi.PathElem{

                      {Name: "Device"},

                  },

              },

              Subscription: []*gnmi.Subscription{

                  {Path: &gnmi.Path{

                      Elem: []*gnmi.PathElem{{Name: "Base"}},                

                  },},

              },

              Mode: gnmi.SubscriptionList_POLL,

              Encoding: gnmi.Encoding_JSON,

          },

      },

  }

 

  mPoll := gnmi.SubscribeRequest{

      Request: &gnmi.SubscribeRequest_Poll{

          Poll: &gnmi.Poll{

          },

      },

  }

 

  ctxWithToken, cancel := sdk.CtxWithToken(gt.grpcSession.Token, time.Second*10)

  defer cancel()

  stream, err := gt.gnmiClient.Subscribe(ctxWithToken)

  if err != nil {

      log.Println("Subscribe error: ", err)

      return err

  }

  stream.Send(&mSubscribeRequest)

  for {

      time.Sleep(time.Second)

      stream.Send(&mPoll)

      mSubscribeResponse, err := stream.Recv()

 

      if err == io.EOF {

          break

      }

      if err != nil {

          log.Println("Recv error: ", err)

          return err

      }else{

          fmt.Println("success:", mSubscribeResponse)

      }

  }

  return nil

}

(4)     编写main函数,向设备发起RPC请求。

代码示例如下:

func test() error {

  con, err := grpcConnect()

  defer con.grpcSession.Close()

  if err != nil {

      return err

  }

 

  if isSub{

      err := con.gnmiSubscribeOnceTest()

      if err != nil {

          return err

      }

  }

  return nil

}

//参数解析

func init() {

  flag.StringVar(&address, "a", "192.168.2.1", "Address to comware")

  flag.UintVar(&port, "gp", 50051, "Grpc port of comware")

  flag.StringVar(&username, "u", "admin", "Username to comware")

  flag.StringVar(&password, "up", "123456", "Password to comware")

  flag.BoolVar(&isSub, "sub", false, "gnmi subscribe examlpe")

}

//main函数

func main() {

  flag.Parse()

  test()

}

5.6  gRPC Dial-in模式二次开发举例(Python

5.6.1  生成代码

开发代码之前,需要使用工具软件protoc将收集到的Proto文件转换成Python代码,并将生成的代码加入到开发的工程中。

本例中,开发Subscribe操作的代码使用以下Proto文件:

·     grpc_service.proto

·     业务Proto文件,本例中为Syslog.proto

本例中,开发gNMI Subscribe操作的代码使用以下Proto文件:

·     grpc_service.proto

·     gnmi.protognmi_ext.proto

使用protoc工具生成Python代码的示例如下:

[root@ grpc]# cd protobuf

[root@ protobuf]# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. grpc_service.proto

[root@ protobuf]# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. Syslog.proto

[root@ protobuf]# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. gnmi.proto

[root@ protobuf]# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. gnmi_ext.proto

5.6.2  开发代码

1. 普通Subcribe操作

以调用GrpcServiceSyslogService服务类为例,编码步骤如下:

(1)     发起对设备的RPC方法请求。

以订阅日志触发事件为例,代码如下:

rpc SubscribeLOGEvent(LOGEvent) returns (grpc_service.SubscribeReply) {}

(2)     编写一个Client类,实现登录设备和退出登录设备。

代码示例如下:

import grpc

import json

from collections import OrderedDict

import grpc_service_pb2, grpc_service_pb2_grpc

import Syslog_pb2, Syslog_pb2_grpc

//Client客户端编程类

class Client:

    def __init__(self, username, password, channel):

        self.username = username

        self.password = password

        self.channel = channel

        self.__stub = grpc_service_pb2_grpc.GrpcServiceStub(channel)

        self.tokenid = ""

 

    def __enter__(self):

        return

 

    def __exit__(self, exc_type, exc_value, traceback):

        if self.tokenid:

            self.Logout()

 

    def __str__(self):

        return "{username=%s, password=%s, tokenid=%s}" % (self.username, self.password, self.tokenid)

 

    def metadata(self):

        return (("token_id", self.tokenid), )

    //Login函数,调用grpc_service_pb2.LoginRequest函数,完成登录设备

    def Login(self):

        if self.tokenid:

            return self

        request = grpc_service_pb2.LoginRequest(user_name=self.username, password=self.password)

        reply = self.__stub.Login(request)

        self.tokenid = reply.token_id

        return self

    //Logout函数,调用grpc_service_pb2.LogoutRequest函数,完成登出设备

    def Logout(self):

        if not self.tokenid:

            return

        request = grpc_service_pb2.LogoutRequest(token_id=self.tokenid)

        try:

            self.__stub.Logout(request)

        except Exception as e:

            logging.warning("Logout:" + e)

        self.tokenid = ""

        return

    //订阅事件流

    def SubscribeByStreamName(self, stream):

        request = grpc_service_pb2.SubscribeRequest(stream_name=stream)

        reply = self.__stub.SubscribeByStreamName(request, metadata=self.metadata())

        return reply.result

    //获取事件上报结果

    def GetEventReport(self):

        request = grpc_service_pb2.GetReportRequest(token_id=self.tokenid)

        yield from self.__stub.GetEventReport(request)

    //订阅Syslog logevent事件

    def sub(self, path):

        if path ==  "SubscribeLOGEvent":

           request = Syslog_pb2.LOGEvent()

           RpcMethod = Syslog_pb2_grpc.SyslogServiceStub(self.channel)

           reply = RpcMethod.SubscribeLOGEvent(request, metadata=self.metadata())

           return reply.result

(3)     编写main函数,下发订阅并且接收订阅。

代码示例如下:

//格式化Json字符串

def format_json(jsonstr):

    obj = json.loads(jsonstr, object_hook=OrderedDict)

    return json.dumps(obj, ensure_ascii=False, indent=4)

//客户端订阅事件并接受事件

def test():

    channel = grpc.insecure_channel("192.168.2.1:50051")

    client = Client("admin", "123456", channel)

    with client.Login():

        print(client)

        print(client.sub("SubscribeLOGEvent"))

        for e in client.GetEventReport():

            print(e)

            print(format_json(e.json_text))

//main函数

if __name__ == "__main__":

test()

2. gNMI Subscribe操作

gNMI Subscribe操作的编码步骤如下:

(1)     继承原有的gRPC Client类(请参见“5.6.2  1. (2)中的Client类),并定义gNMI RPC方法

代码示例如下:

from dialin import Client

import gnmi_pb2, gnmi_pb2_grpc

import grpc

import time

 

class GnmiClient(Client):

    @staticmethod

    def get_mode(s:str):

        mapping = {

            "stream": gnmi_pb2.SubscriptionList.Mode.STREAM,

            "once": gnmi_pb2.SubscriptionList.Mode.ONCE,

            "poll": gnmi_pb2.SubscriptionList.Mode.POLL,

            "target_defined": gnmi_pb2.TARGET_DEFINED,

            "on_change": gnmi_pb2.ON_CHANGE,

            "sample": gnmi_pb2.SAMPLE,

        }

        return mapping[s.lower()]

 

    def __init__(self, username, password, channel):

        super().__init__(username, password, channel)

        self.__stub = gnmi_pb2_grpc.gNMIStub(channel)

        return

 

    def make_poll_req(self):

        return gnmi_pb2.SubscribeRequest(poll=gnmi_pb2.Poll())

 

    def Subscribe(self, req):

        return self.__stub.Subscribe(req, metadata=self.metadata())

   

    def make_sub_obj(self,interval):

        path_obj = gnmi_pb2.Path()

        ele = path_obj.elem.add()

        ele.name = "Device"

        ele1 = path_obj.elem.add()

        ele1.name = "Base"

        mode = self.get_mode("sample")

        sample_interval = interval

        return gnmi_pb2.Subscription(path=path_obj, mode=mode, sample_interval=sample_interval)

   

    def make_sub_eventobj(self,interval,mode= "on_change"):

        path_obj = gnmi_pb2.Path()

        ele = path_obj.elem.add()

        ele.name = "Ifmgr"

        ele1 = path_obj.elem.add()

        ele1.name = "InterfaceEvent"

        mode = self.get_mode("on_change")

        sample_interval = interval

        return gnmi_pb2.Subscription(path=path_obj, mode=mode, sample_interval=sample_interval)

 

    def make_sub_req(self, sample_interval , mode, type ="sample",qos=0, updates_only=False):

        kwargs = {}

        kwargs["prefix"] = gnmi_pb2.Path()

        kwargs["qos"] = gnmi_pb2.QOSMarking(marking=qos)

        kwargs["mode"] = self.get_mode(mode)

        kwargs["encoding"] = gnmi_pb2.JSON

        kwargs["subscription"] = []

        if type == "sample":

            kwargs["subscription"].append(self.make_sub_obj(sample_interval))

        else:

            kwargs["subscription"].append(self.make_sub_eventobj(sample_interval)) 

        kwargs["updates_only"] = updates_only

        subscribeList = gnmi_pb2.SubscriptionList(**kwargs)

        return gnmi_pb2.SubscribeRequest(subscribe=subscribeList)

(2)     发起对设备的RPC方法请求。

代码示例如下:

rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse);

(3)     自定义订阅的实现方法。

代码示例如下:

def test_sub(client:GnmiClient):

    def poll_generator(n):

        req = client.make_sub_req(2000000000, mode="poll")

        yield req

        for _ in range(n):

            time.sleep(2)

            yield client.make_poll_req()

        return

   

    def sample_generator(t, sample_interval=2000000000, mode="stream"):

        req = client.make_sub_req(sample_interval,mode)

        yield req

        time.sleep(t)

        return

   

    def event_generator(t):

        req = client.make_sub_req(type ="event", sample_interval=10000000, mode="stream")

        yield req

        time.sleep(t)

        return

(4)     编写main函数,向设备下发订阅并且接收设备推送的采样数据。

代码示例如下:

if __name__ == "__main__":

    channel = grpc.insecure_channel("192.168.2.1:50051")

    client = GnmiClient("admin", "123456", channel)

    with client.Login():

        test_sub(client)

5.7  gRPC Dial-in模式二次开发举例(JAVA

5.7.1  生成代码

开发代码之前,需要开发人员先安装好maven环境,并在创建maven工程后修改pom.xml,实现下载protoc工具的步骤如下:

<plugins>

            <plugin>

                <groupId>org.xolstice.maven.plugins</groupId>

                <artifactId>protobuf-maven-plugin</artifactId>

                <version>0.6.1</version>

                <configuration>

                    <protocArtifact>com.google.protobuf:protoc:3.6.1:exe:${os.detected.classifier}</protocArtifact>

                    <pluginId>grpc-java</pluginId>

                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.14.0:exe:${os.detected.classifier}</pluginArtifact>

                </configuration>

                <executions>

                    <execution>

                        <goals>

                            <goal>compile</goal>

                            <goal>compile-custom</goal>

                        </goals>

                    </execution>

                </executions>

            </plugin>

然后,开发人员在maven环境中运行mvn install即可将收集到的Proto文件生成对应的JAVA代码。

本例中,开发Subscribe操作的代码使用以下Proto文件:

·     grpc_service.proto

·     业务Proto文件,本例中为Syslog.proto

本例中,开发gNMI Subscribe操作的代码使用以下Proto文件:

·     grpc_service.proto

·     gnmi.protognmi_ext.proto

5.7.2  开发代码

1. 普通Subcribe操作

以调用GrpcServiceSyslogService服务类为例,编码步骤如下:

(1)     编写一个DialinClient类,完成登录设备和退出登录设备。

代码示例如下:

public class DialinClient {

    private static final Logger logger = Logger.getLogger(DialinClient.class.getName());

    private final GrpcServiceGrpc.GrpcServiceBlockingStub blockingStub;

    private String tokenid;

 

    public DialinClient(Channel channel) {

        blockingStub = GrpcServiceGrpc.newBlockingStub(channel);

        tokenid = "";

    }

//设备登录

    public void login(String username, String password) throws Exception {

        logger.info("try to login as " + username + "...");

        LoginRequest request = LoginRequest.newBuilder().setUserName(username).setPassword(password).build();

        LoginReply loginReply;

        try {

            loginReply = blockingStub.login(request);

            tokenid = loginReply.getTokenId();

        } catch (StatusRuntimeException e) {

            logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());

            throw e;

        }

        System.out.println("Login : " + getTokenid());

    }

 

    public  String getTokenid() {

        return tokenid;

    }

//设备获取订阅上报结果

    public Iterator  getEventReport(String tokenid) {

        GrpcServiceOuterClass.GetReportRequest request = GrpcServiceOuterClass.GetReportRequest.newBuilder().setTokenId(tokenid).build();

        Iterator< GrpcServiceOuterClass.ReportEvent> eventIterator = null;

        try {

            eventIterator = blockingStub.getEventReport(request);

        } catch (Exception ignored) {

 

        }

        return  eventIterator;

    }

//退出设备登录

    public void logout() {

        LogoutRequest request = LogoutRequest.newBuilder().setTokenId(tokenid).build();

        LogoutReply logoutReply = null;

        try {

            logoutReply = blockingStub.logout(request);

            tokenid = "";

        } catch (Exception ignored) {

 

        }

        if(logoutReply != null) {

            System.out.println("Logout result: " + logoutReply.getResult());

        }

        return;

    }

}

(2)     发起对设备的RPC方法请求。

以订阅日志触发事件为例,代码如下:

rpc SubscribeLOGEvent(LOGEvent) returns (grpc_service.SubscribeReply) {}

(3)     编写一个SyslogClient类来封装发起的RPC方法。

代码示例如下:

public class SyslogClient {

    private static final Logger logger = Logger.getLogger(SyslogClient.class.getName());

    private SyslogServiceGrpc.SyslogServiceBlockingStub blockingStub;

    private String tokenid;

 

    public SyslogClient(Channel channel, String tokenid) {

        blockingStub = SyslogServiceGrpc.newBlockingStub(channel);

        this.tokenid = tokenid;

    }

 

    public String getTokenid() {

        return tokenid;

    }

 

    public void setTokenid(String tokenid) {

        this.tokenid = tokenid;

    }

 

    public String subLogEvent() throws Exception {

        Syslog.LOGEvent requset = Syslog.LOGEvent.newBuilder().addLog(Syslog.LOGEvent.LOG.newBuilder()).build();

        Metadata header = new Metadata();

        Metadata.Key<String> key = Metadata.Key.of("token_id", Metadata.ASCII_STRING_MARSHALLER);

        header.put(key, getTokenid());

        SyslogServiceGrpc.SyslogServiceBlockingStub blockingStub_tmp = MetadataUtils.attachHeaders(blockingStub, header);

        GrpcServiceOuterClass.SubscribeReply subscribeReply;

        try {

            subscribeReply = blockingStub_tmp.subscribeLOGEvent(requset);

        }catch (StatusRuntimeException e) {

            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());

            throw e;

        }

        return subscribeReply.getResult();

    }

 

}

(4)     编写main函数实现事件的订阅。

代码示例如下:

public class Main {

 

    private static String ipPort = "192.168.2.1:50051";

    private static String usrname = "admin";

    private static String password = "123456";

//用户输入IP地址,用户名,密码

    public static ArrayList UserInput( ){

        System.out.println("Input ipaddress:port :");

        Scanner scanner = new Scanner(System.in);

        String str = "";

        ArrayList<String> stringArrayList = new ArrayList<String>();

        for (int i = 0;i < 3; i++){

            str = scanner.nextLine();

            stringArrayList.add(str);

            if(i == 0) {

                System.out.println("Input UserName :");

            }else if (i == 1) {

                System.out.println("Input PassWord :");

            }

        }

        scanner.close();

        return stringArrayList;

    }

//调用SyslogClient类,实现对syslog/logevent订阅

    public static void testSub (String tokenid) {

        long begin=0, end=0;

        int i;

        String s;

        ManagedChannel channel = null;

        try {

            channel = ManagedChannelBuilder.forTarget(ipPort).usePlaintext().build();

            SyslogClient syslogClient = new SyslogClient(channel, tokenid);

            begin = System.currentTimeMillis();

            s = syslogClient.subLogEvent();

            System.out.println(s);

            end = System.currentTimeMillis();

            System.out.printf("cost %dms\n", end-begin);

        } catch (Exception e) {

 

        }

 

    }

//调用DialinClient类,登录/订阅/登出设备

    public static void testDialin() throws  Exception {

        DialinClient client = null;

        ArrayList<String> arrayList = UserInput();

        ipPort = arrayList.get(0);

        usrname = arrayList.get(1);

        password = arrayList.get(2);

        ManagedChannel channel = ManagedChannelBuilder.forTarget(ipPort).usePlaintext().build();

        String tokenID = "";

        try {

            client = new DialinClient(channel);

            client.login(usrname, password);

            tokenID = client.getTokenid();

            testSub(tokenID);

            Iterator<GrpcServiceOuterClass.ReportEvent> eventIterator = client.getEventReport(tokenID);

            while (eventIterator.hasNext()) {

                System.out.println(eventIterator.next());

            }

        } catch (Exception e) {

            e.printStackTrace();

        }

        finally {

            System.out.println("created ");

            if (client != null) {

                client.logout();

            }

        }

}

//main函数

    public static void main (String[] agrs) throws Exception {

            testDialin();

    }

 

}

2. gNMI Subscribe操作

gNMI Subscribe操作的编写步骤如下:

(1)     编写一个DialinClient类,完成登录设备和退出设备登录。

步骤与“5.7.2  1. ”中编写DialinClient类相同。

(2)     发起对设备的RPC方法请求。

代码示例如下:

rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse);

(3)     编写gNMIClient类,并在该类中实现自定义订阅的方法。

代码示例如下:

public class gNMIClient {

    private static final Logger logger = Logger.getLogger(gNMIClient.class.getName());

    private gNMIGrpc.gNMIStub gNMIStub;

    private final String tokenId;

 

    public gNMIClient(Channel channel, String tokenId) {

        gNMIStub = gNMIGrpc.newStub(channel);

        this.tokenId = tokenId;

    }

    //gNMI订阅请求消息拼装

    public Gnmi.SubscribeRequest gnmiSubRequest(Gnmi.Path prefix, Gnmi.Path paths,

                                                Gnmi.SubscriptionList.Mode mode, Gnmi.SubscriptionMode subscriptionMode,

                                                int sample_interval) {

        Gnmi.Subscription.Builder subBuilder = Gnmi.Subscription.newBuilder();

        subBuilder.setPath(paths);

        if (subscriptionMode != null) {

            subBuilder.setMode(subscriptionMode);

        }

        if (sample_interval != 0)

        {

            subBuilder.setSampleInterval(sample_interval);

        }

        Gnmi.Subscription subscription = subBuilder.build();

        Gnmi.SubscriptionList.Builder subList = Gnmi.SubscriptionList.newBuilder();

        if (prefix != null) {

            subList.setPrefix(prefix);

        }

        subList.addSubscription(subscription);

        if (mode != null) {

            subList.setMode(mode);

        }

        subList.setEncoding(Gnmi.Encoding.forNumber(0));

        Gnmi.SubscriptionList subListReq = subList.build();

        Gnmi.SubscribeRequest.Builder requestBuilder = Gnmi.SubscribeRequest.newBuilder();

        requestBuilder.setSubscribe(subListReq);

        Gnmi.SubscribeRequest request = requestBuilder.build();

        return request;

    }

    //poll模式请求订阅消息

    public Gnmi.SubscribeRequest gnmiSubRequestByPoll (Gnmi.Path prefix, Gnmi.Path paths,Gnmi.SubscriptionList.Mode mode) {

        return gnmiSubRequest(prefix, paths,mode, null, 0);

    }

    //once模式请求订阅消息

    public  Gnmi.SubscribeRequest gnmiSubRequestByOnce(Gnmi.Path prefix, Gnmi.Path paths,Gnmi.SubscriptionList.Mode mode) {

        return gnmiSubRequest(prefix, paths,mode, null, 0);

    }

    //stream模式请求订阅消息

    public  Gnmi.SubscribeRequest gnmiSubRequestByStream(Gnmi.Path prefix, Gnmi.Path paths,Gnmi.SubscriptionList.Mode mode) {

        return gnmiSubRequest(prefix, paths,mode, Gnmi.SubscriptionMode.forNumber(2), 2000000000);

    }

   //once模式订阅

    public void testOnce(StreamObserver<Gnmi.SubscribeRequest> requestStreamObserver, Gnmi.Path prefix, Gnmi.Path paths) {

        Gnmi.SubscribeRequest request = gnmiSubRequestByOnce(prefix, paths, Gnmi.SubscriptionList.Mode.forNumber(1));

        requestStreamObserver.onNext(request);

    }

    //poll模式订阅

    public void testPoll(StreamObserver<Gnmi.SubscribeRequest> requestStreamObserver, Gnmi.Path prefix, Gnmi.Path paths) {

        Gnmi.SubscribeRequest request = gnmiSubRequestByPoll(prefix, paths, Gnmi.SubscriptionList.Mode.forNumber(2));

        requestStreamObserver.onNext(request);

        for (int i = 0; i < 5; i++) {

            Gnmi.SubscribeRequest poll = Gnmi.SubscribeRequest.newBuilder().setPoll(Gnmi.Poll.newBuilder()).build();

            requestStreamObserver.onNext(poll);

            try {

                Thread.sleep(5000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

 

    }

   //stream模式订阅

    public void testStream(StreamObserver<Gnmi.SubscribeRequest> requestStreamObserver, Gnmi.Path prefix, Gnmi.Path paths) {

        Gnmi.SubscribeRequest request = gnmiSubRequestByStream(prefix, paths, Gnmi.SubscriptionList.Mode.forNumber(0));

        requestStreamObserver.onNext(request);

    }

    //下发订阅请求,并接收上报结果

    public void testSubscribe(String name) {

        Metadata header = new Metadata();

        Metadata.Key<String> key = Metadata.Key.of("token_id", Metadata.ASCII_STRING_MARSHALLER);

        header.put(key,tokenId);

        gNMIGrpc.gNMIStub stub = MetadataUtils.attachHeaders(gNMIStub, header);

        Gnmi.Path.Builder pathBuilder = Gnmi.Path.newBuilder();

        String[] names = name.split("/");

        for (String na:names) {

            pathBuilder.addElem(Gnmi.PathElem.newBuilder().setName(na));

        }

        Gnmi.Path path = pathBuilder.build();

 

        StreamObserver<Gnmi.SubscribeResponse> observer = new StreamObserver<Gnmi.SubscribeResponse>() {

            @Override

            public void onNext(Gnmi.SubscribeResponse subscribeResponse) {

                if (subscribeResponse != null) {

                    System.out.println(subscribeResponse.getUpdate());

                }

            }

 

            @Override

            public void onError(Throwable throwable) {

                logger.log(Level.WARNING, "Subscribe is failed");

                throwable.printStackTrace();

                System.out.println(throwable);

            }

 

            @Override

            public void onCompleted() {

                System.out.println("onCompleted");

            }

        };

 

        StreamObserver<Gnmi.SubscribeRequest> requestStreamObserver = stub.subscribe(observer);

        //三种不同模式订阅

        //testOnce(requestStreamObserver,null,path);

        //testPoll(requestStreamObserver,null,path);

        //testStream(requestStreamObserver,null,path);

 

        try {

            Thread.sleep(50000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        requestStreamObserver.onCompleted();

    }

}

(4)     编写main函数实现事件的订阅。

代码示例如下:

public class Main {

    private static String ipPort = "192.168.2.1:50051";

    private static String usrname = "admin";

    private static String password = "123456";

 

    public static ArrayList UserInput( ){

        System.out.println("Input ipaddress:port :");

        Scanner scanner = new Scanner(System.in);

        String str = "";

        ArrayList<String> stringArrayList = new ArrayList<String>();

        for (int i = 0;i < 3; i++){

            str = scanner.nextLine();

            stringArrayList.add(str);

            if(i == 0) {

                System.out.println("Input UserName :");

            }else if (i == 1) {

                System.out.println("Input PassWord :");

            }

        }

        scanner.close();

        return stringArrayList;

    }

    //下发订阅,此处以Device/Base为例

    public static void testGnmi() throws  Exception{

        DialinClient dialinClient = null;

        ArrayList<String> arrayList = UserInput();

        ipPort = arrayList.get(0);

        usrname = arrayList.get(1);

        password = arrayList.get(2);

        ManagedChannel channel = ManagedChannelBuilder.forTarget(ipPort).usePlaintext().build();

 

        try {

            dialinClient = new DialinClient(channel);

            dialinClient.login(usrname, password);

            gNMIClient client = new gNMIClient(channel, dialinClient.getTokenid());

            client.testSubscribe("Device/Base");

        } finally {

            dialinClient.logout();

        }

 

    }

 

    public static void main (String[] agrs) throws Exception {

            testGnmi();

    }

 

}


6 Telemetry对接软件二次开发举例(Dial-out模式)

说明

本指南中展示的代码仅供参考,由于没有实际的代码框架,在实际的对接过程中不能直接使用。除举例中生成的代码之外,其它代码还需要开发人员自行开发。

 

对于Dial-out模式,主要是实现gRPC服务端的代码,使采集器能够接收设备推送的采集数据并进行解析。客户端代码主要包括以下两个部分:

·     继承自动生成的GRPCDialout::Service类,重载自动生成的RPC服务Dialout,并完成解析,获得相应字段内容。

·     RPC服务注册到指定的监听端口上。

本章节用于介绍如下三种Dial-out模式下,设备与对接服务端软件的开发过程:

·     二层普通Dial-out模式

·     三层Dial-out模式

·     二层gNMI Dial-out模式

6.1  开发前准备

6.1.1  开发人员要求

·     用户熟悉gRPC的开发(可通过https://doc.oschina.net/grpc学习

·     用户熟悉GPB编码的开发(可通过https://developers.google.com/protocol-buffers学习

·     用户熟悉对应语言(C++JAVAPythonGO)的开发。

6.1.2  开发环境准备

1. 获取Proto文件

联系H3C技术支持人员获取相关Proto文件。

2. 获取处理proto文件的工具软件protoc

下载地址:https://github.com/google/protobuf/releases

3. 获取对应开发语言的protobuf插件

下载地址:https://github.com/google/protobuf/releases

开发者需要准备好对应语言的开发环境,例如C++插件protobuf-cpp。本指南会给出当前主流语言的开发举例(C++JGOPythonJAVA)。

4. 存放Proto文件和工具软件

建议将获取到的Proto文件和protoc工具存在开发代码的工程目录下,具体以实际使用的开发环境为准。

6.2  组网需求

6-1所示,设备作为gRPC客户端与采集器相连,采集器为gRPC服务器,接收数据的端口号为50051

通过配置gRPC Dial-out模式,使设备以5秒为周期向采集器推送接口模块的设备能力信息。

图6-1 gRPC Dial-out模式配置组网图

 

6.3  配置设备侧的Telemetry订阅

执行以下配置之前,请确保gRPC服务器与gRPC客户端之间路由可达。

# 开启gRPC功能。

<Device> system-view

[Device] grpc enable

# 创建传感器组test,并添加采样路径为ifmgr/devicecapabilities

[Device] telemetry

[Device-telemetry] sensor-group test

[Device-telemetry-sensor-group-test] sensor path ifmgr/devicecapabilities

[Device-telemetry-sensor-group-test] quit

# 创建目标组collector1,并配置采集器的IP地址为2.2.2.2、端口号为50051

[Device-telemetry] destination-group collector1

[Device-telemetry-destination-group-collector1] ipv4-address 2.2.2.2 port 50051

[Device-telemetry-destination-group-collector1] quit

# 创建订阅A,配置关联传感器组为test,数据采样和推送周期为5秒,关联目标组为collector1

[Device-telemetry] subscription A

[Device-telemetry-subscription-A] sensor-group test sample-interval 5

[Device-telemetry-subscription-A] destination-group collector1

[Device-telemetry-subscription-A] quit

6.4  gRPC Dial-out模式二次开发举例(C++

6.4.1  生成代码

开发代码之前,需要使用protoc工具将收集到的Proto文件转换成C++代码,并将生成的代码加入到开发的工程中。

本例中,开发二层普通Dial-out模式的代码使用以下Proto文件:

·     grpc_dialout.proto

本例中,开发三层Dial-out模式的代码使用以下Proto文件:

·     grpc_dialout_v3.proto

·     telemetry.proto

·     业务Proto文件,本例中为Ifmgr_v3.proto

本例中,开发二层gNMI Dial-out模式的代码使用以下Proto文件:

·     dialout.proto

·     gnmi.protognmi_ext.proto

使用protoc工具生成C++代码的示例如下:

$ protoc --plugin=./grpc_cpp_plugin  --grpc_out=.  --cpp_out=. *.proto

6.4.2  开发代码

1. 二层普通Dial-out模式

编码步骤如下:

(1)     继承并重载RPC服务Dialout

新建一个类DialoutTest并继承GRPCDialout::Service,代码示例如下:

class DialoutTest final : public GRPCDialout::Service { //重载自动生成的抽象类

Status Dialout(ServerContext* context, ServerReader< DialoutMsg>* reader, DialoutResponse* response) override; //实现Dialout RPC方法

};

(2)     DialoutTest服务注册为gRPC服务,并指定监听端口。

代码示例如下:

using grpc::Server;

using grpc::ServerBuilder;

std::string server_address("0.0.0.0:50051"); //指定要监听的地址和端口

DialoutTest dialout_test; //定义(1)中声明的对象

ServerBuilder builder;

builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());//添加监听

builder.RegisterService(&dialout_test); //注册服务

std::unique_ptr<Server> server(builder.BuildAndStart()); //启动服务

server->Wait();

(3)     实现Dialout方法,实现数据解析。

代码示例如下:

Status DialoutTest::Dialout(ServerContext* context, ServerReader< DialoutMsg>* reader, DialoutResponse* response)

{

        DialoutMsg msg;

 

        while( reader->Read(&msg))

        {

            const DeviceInfo &device_msg = msg.devicemsg();

            std::cout<< "Producer-Name: " << device_msg.producername() << std::endl;

            std::cout<< "Device-Name: " << device_msg.devicename() << std::endl;

            std::cout<< "Device-Model: " << device_msg.devicemodel() << std::endl;

            std::cout<<"Sensor-Path: " << msg.sensorpath()<<std::endl;

            std::cout<<"Json-Data: " << msg.jsondata()<<std::endl;

            std::cout<<std::endl;

        }

        response->set_response("test");

 

        return Status::OK;

}

(4)     通过Read方法获取到Proto文件生成的DialoutMsg对象后,可以调用对应的方法获取相应的字段值。

2. 三层Dial-out模式

编码步骤如下:

(1)     继承并重载RPC服务Dialout

新建一个类DialoutV3Test并继承DialoutV3::Service,代码示例如下:

class DialoutV3Testfinal : public DialoutV3::Service { //重载自动生成的抽象类

Status DialoutV3Test::DialoutV3(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::grpc_dialout_v3::DialoutV3Args, ::grpc_dialout_v3::DialoutV3Args>* stream)override; //实现Dialout V3 RPC方法

};

(2)     DialoutV3Test服务注册为gRPC服务,并指定监听端口。

代码示例如下:

    string server_address("0.0.0.0.101:50051");

    DialoutV3Test dialout_test;

    ServerBuilder builder;

    cout << "runing on " << server_address << endl;

    builder.AddListeningPort(server_address, InsecureServerCredentials());

    builder.RegisterService(&dialout_test);

    unique_ptr<Server> server(builder.BuildAndStart());

    server->Wait();

(3)     实现DialoutV3方法,实现数据解析。

代码示例如下:

Status DialoutV3Test::DialoutV3(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::grpc_dialout_v3::DialoutV3Args, ::grpc_dialout_v3::DialoutV3Args>* stream)

{

DialoutV3Args msg;

    Telemetry msgTele;

    TelemetryGPBTable msgTable;

    TelemetryRowGPB msgRow;

    string buffdata = "";

    int buffsize = 0;

    string content; 

    std::cout << "peer info : " << context->peer() << std::endl; 

    while(stream->Read(&msg))

    {  

        int row_size = 0;

        int64_t ReqId = msg.reqid();

        string data = msg.data();

        string Err = msg.errors();

        int32_t TotalSize = msg.totalsize();

        buffdata = buffdata + data;

        buffsize = buffsize + data.size();

        if(buffsize >= TotalSize)

        {

        std::cout << "ReqId : " << ReqId << std::endl;

        std::cout << "errors: " << Err << std::endl;

        std::cout << "totalSize: " << TotalSize << std::endl;

        msgTele.ParseFromString(buffdata);

        std::cout << "data_size:" << buffdata.size() << std::endl;

        std::cout << "producer_name: " << msgTele.producer_name() << std::endl;

        std::cout << "Node_Id_str: " << msgTele.node_id_str() << std::endl;

        std::cout << "ProductName: " << msgTele.product_name() << std::endl;

        std::cout << "Sub_Id_str: " << msgTele.subscription_id_str() << std::endl;

        std::cout << "Sensor_path: " << msgTele.sensor_path() << std::endl;

        std::cout << "Collection_Id: " << msgTele.collection_id() << std::endl;

        std::cout << "Collection_start_time: " << msgTele.collection_start_time() << std::endl;

        std::cout << "msg_timestamp: " << msgTele.msg_timestamp() << std::endl;

        std::cout << "Collection_end_time: " << msgTele.collection_end_time() << std::endl;

        std::cout << "Current_period: " << msgTele.current_period() << std::endl;

        std::cout << "except_desc: " << msgTele.except_desc() << std::endl;

        std::cout << "Encoding: " << msgTele.Encoding_Name(msgTele.encoding()) << std::endl;

        if(msgTele.encoding() == 1)

        {

      std::cout << "Start----------------------------------" << std::endl;

            string path = msgTele.sensor_path();

            ConverXpath2MessageName(path);//XPATH转换成messagename

            Message *pGpbMsg = createMessage(path);//创建message解析三层数据

        if (pGpbMsg == NULL)

      {

                       std::cout << "break------------------------ " << std::endl;

        return Status::OK;

      }

            msgTable = msgTele.data_gpb();

            row_size = msgTable.row_size();

      std::cout << msgTable.DebugString() << std::endl;

 

      std::cout << "row_size: " << row_size << std::endl;

            for (int i = 0; i < row_size; i++)

               {

               msgRow = msgTable.row(i);

               content = msgRow.content();

               pGpbMsg->ParseFromString(content);

               std::cout << pGpbMsg->DebugString() << std::endl;

               }

        }

        if(msgTele.encoding() == 0)

        {

            std::cout << "JSON-Data: " << msgTele.data_str() << std::endl;

        }

        std::cout << "------------------------------------------" << std::endl;

   buffdata = "";

         buffsize = 0;

        }

    }

    return Status::OK;

}

Message *createMessage(const std::string &type_name)

{

    Message* pMessage = NULL;

    const google::protobuf::Descriptor* descriptor =

    google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(type_name);

    if (descriptor)

    {

        const Message* prototype =

            google::protobuf::MessageFactory::generated_factory()->GetPrototype(descriptor);

        if (prototype)

        {

            pMessage = prototype->New();

        }

    }

    return pMessage;

}

void ConverXpath2MessageName(string &path)

{

        int pos = path.find("/", 0);

        if (0 < pos)

        {

          string module = path.substr(0, pos);

          string msgname = path.substr(0, pos);

         for (int i = 0; i < pos; i++)

         {

           if ('-' == msgname[i])

           {

             msgname[i] = '_';

           }

           msgname[i] = tolower(msgname[i]);

         }

         for (int i = 0; i < pos; i++)

         {

           if ('-' == module[i])

           {

             module[i] = '_';

           }

         }

          msgname.append("_v3.");

          msgname.append(module);

          path.replace(0, path.length(), msgname);

        }

}

(4)     通过Read方法获取到Proto文件生成的DialoutV3Args对象后,可以调用对应的方法获取相应的字段值。

3. 二层gNMI Dial-out模式

编码步骤如下:

(1)     继承并重载RPC服务gNMIDialOut

新建一个类gNMI_Dialout_Sever并继承gNMIDialOut::Service,代码示例如下:

class gNMI_Dialout_Sever final : public gNMIDialOut::Service{

     Status Publish(ServerContext* context, ServerReaderWriter< PublishResponse, SubscribeResponse>* stream) override;

};

(2)     gNMI_Dialout_Sever服务注册为gRPC服务,并指定监听端口。

代码示例如下:

using grpc::Server;

using grpc::ServerBuilder;

std::string server_address("0.0.0.0:50051");//指定要监听的地址和端口

        gNMI_Dialout_Sever dialout_test; //定义(1)中声明的对象

      ServerBuilder builder;

        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());//添加监听

        builder.RegisterService(&dialout_test); //注册服务

      std::unique_ptr<Server> server(builder.BuildAndStart());//启动服务

      std::cout << "Server listening on " << server_address << std::endl;

      server->Wait();

(3)     实现gNMI Dialout方法,实现数据解析。

代码示例如下:

Status gNMI_Dialout_Sever::Publish(ServerContext* context, ServerReaderWriter< PublishResponse, SubscribeResponse>* stream)

{

    long long timestamp;

    SubscribeResponse response;

    std::vector<PublishResponse> publishes;

    while(stream->Read(&response)){

        std::cout << std::endl << "SubscribeResponse:" << response.DebugString() << std::endl;

        if(response.response_case() == ::gnmi::SubscribeResponse::kUpdate)

        {

            auto& notification = response.update();

            timestamp = notification.timestamp();

            for(int i = 0; i < notification.update_size(); ++i)

            {

                auto& update = notification.update(i);

                if(!update.has_val())

                    continue;

                auto& val = update.val();

                try

                {

                    if(val.value_case() == ::gnmi::TypedValue::kJsonVal)

                    {

                        std::cout << json::JSONPrettifyWithRapid(val.json_val()) << std::endl;

                    }

                }

                catch(std::exception& e)

                {

                    std::cout << "Json Format Error!" << std::endl;

                }

            }

            PublishResponse publish;

            publish.set_timestamp(timestamp);

            auto prefix = publish.mutable_prefix();

            prefix->add_elem()->set_name("test_prefix");

            auto path_add = publish.add_path();

            path_add->add_elem()->set_name("test_path");

            publishes.push_back(publish);

        }

        return Status::OK;

}

(4)     通过Read方法获取到Proto文件生成的DialoutMsg对象后,可以调用对应的方法获取相应的字段值。

6.5  gRPC Dial-out模式二次开发举例(GO

6.5.1  生成代码

开发代码之前,需要使用protoc工具将收集到的Proto文件转换成C++代码,并将生成的代码加入到开发的工程中。

本例中,开发二层普通Dial-out模式的代码使用以下Proto文件:

·     grpc_dialout.proto

本例中,开发三层Dial-out模式的代码使用以下Proto文件:

·     grpc_dialout_v3.proto

·     telemetry.proto

·     业务Proto文件,本例中为Ifmgr_v3.proto

本例中,开发二层gNMI Dial-out模式的代码使用以下Proto文件:

·     dialout.proto

·     gnmi.protognmi_ext.proto

使用protoc工具生成GO代码的示例如下:

[root@ grpc]# cd protobuf

[root@ protobuf]# protoc --go_out=plugins=grpc:. grpc_dialout.proto

[root@ protobuf]# protoc --go_out=plugins=grpc:. telemetry.proto

[root@ protobuf]# protoc --go_out=plugins=grpc:. dialout.proto

[root@ protobuf]# protoc --go_out=plugins=grpc:. gnmi.proto

[root@ protobuf]# protoc --go_out=plugins=grpc:. gnmi_ext.proto

6.5.2  开发代码

1. 二层普通Dial-out模式

编码步骤如下:

(1)     编写业务实现方法的容器。

代码示例如下:

       type Server struct {

          grpc_dialout.GRPCDialoutServer1.

}

        //采集数据结构体

type TelemetryData struct {

  DeviceMsg  *grpc_dialout.DeviceInfo

  SensorPath string

  JsonData   string

}

(2)     DialoutTest服务注册为gRPC服务,并指定监听端口

代码示例如下:

lis, err := net.Listen("tcp", 50051)

  if err != nil {

      fmt.Printf("failed to listen: %v\n", err)

  }

      s := grpc.NewServer([]grpc.ServerOption{grpc.NumStreamWorkers(10)}...)

  grpc_dialout.RegisterGRPCDialoutServer(s, &Server{})

  if err := s.Serve(lis); err != nil {

      fmt.Printf("failed to serve: %v\n", err)

  }

(3)     实现Dialout方法,实现数据解析。

代码示例如下:

func (s *Server) Dialout(gs grpc_dialout.GRPCDialout_DialoutServer) error {

  for{

           recvData, err := gs.Recv()

           if err == io.EOF {

               return gs.SendAndClose(&grpc_dialout.DialoutResponse{

                   Response: &grpc_dialout.DialoutMsg{

                        DeviceMsg: &grpc_dialout.DeviceInfo{

                            DeviceName: "H3C",

                        },

                   },

               })

           }

           if err != nil {

               return err

           }

           var data TelemetryData

           data.DeviceMsg = recvData.GetDeviceMsg()

           data.SensorPath = recvData.GetSensorPath()

           data.JsonData = recvData.GetJsonData()

  }

  return nil

}

(4)     通过获取到Proto文件生成的DialoutMsg对象后,可以调用对应的方法获取相应的字段值。

2. 三层Dial-out模式

编码步骤如下:

(1)     编写业务实现方法的容器。

代码示例如下:

// 业务实现方法的容器

type Server struct {

  grpc_dialout_v3.GRPCDialoutV3Server

}

 

type TelemetryV3Data struct {

  ReqId  int64

  data   []byte

  errors   string

  totalSize int32

}

 

type telemetryGpb struct {

    Row              []*telemetry.TelemetryRowGPB

}

 

type devicegbp struct {

    Base                      *device_v3.Device_MsgBase

}

(2)     GRPCDialoutV3服务注册为gRPC服务,并指定监听端口。

代码示例如下:

func RunS() {

  lis, err := net.Listen("tcp", port)

  if err != nil {

      fmt.Printf("failed to listen: %v\n", err)

  }

  s := grpc.NewServer([]grpc.ServerOption{grpc.NumStreamWorkers(10)}...)

  grpc_dialout_v3.RegisterGRPCDialoutV3Server(s, &Server{})

  if err := s.Serve(lis); err != nil {

      fmt.Printf("failed to serve: %v\n", err)

  }

 

}

 

func main() {

  fmt.Println("start grpc server for h3c roce telemetry")

      fmt.Printf("binding port:%v\n", port)

  RunS()

 

}

(3)     实现GRPCDialoutV3方法,解析数据。

代码示例如下:

func (s *Server) DialoutV3(stream grpc_dialout_v3.GRPCDialoutV3_DialoutV3Server) error {

  for{

           recvData, err := stream.Recv()

           if err == io.EOF {

               return grpc.Errorf(codes.Aborted, "stream EOF received")

           }

           if err != nil {

               return grpc.Errorf(grpc.Code(err), "received error from client")

           }

           var dataRecv TelemetryV3Data

           dataRecv.ReqId = recvData.GetReqId()

           dataRecv.data = recvData.GetData()

           dataRecv.totalSize = recvData.GetTotalSize()

           dataRecv.errors = recvData.GetErrors()

           //解析Telemetry层数据

            printGpb(dataRecv.data)

  }

  return nil

}

(4)     解析Telmetry层数据

Ifmgr/devicecapabilities为例的代码示例如下:

func typeForName(name string) (reflect.Type, error) {

    pt := proto.MessageType(name)

    if pt == nil {

        return nil, fmt.Errorf("unknown type: %q", name)

    }

    st := pt.Elem()

    return st, nil

}

//根据不同业务,获取业务模块proto文件,进行解码

func getProto(messageType string, messageBytes []byte) proto.Message {

    pt, _:= typeForName(messageType)

    msg := reflect.New(pt).Interface().(proto.Message)

    proto.Unmarshal(messageBytes, msg)

    return msg

}

 

func printGpb(bytes []byte) {

  msg := new(telemetry.Telemetry)

//进行Telemetry层数据解码

  proto.Unmarshal(bytes, msg)

  if msg.Encoding == telemetry.Telemetry_Encoding_JSON {

     fmt.Println("Json")

     fmt.Println(msg.DataStr)

     

    }else {

     fmt.Println("GPB")

     var gpbRow telemetryGpb

     gpbRow.Row = msg.GetDataGpb().GetRow()

     rowcontet := gpbRow.Row[0].GetContent()

     str := strings.Split(msg.SensorPath,"/")

         messageName := strings.ToLower(str[0]) + "_v3."+str[0]

       fmt.Println(messageName)

       //业务层数据解码

       context := getProto(messageName, rowcontet)

        fmt.Println(context)

    }

}

3. 二层gNMI Dial-out模式

编码步骤如下:

(1)     编写业务实现方法的容器。

代码示例如下:

// server is used to implement

type Server struct {

  dataStore interface{}  //For storing the data received

}

(2)     GNMIDialout服务注册为gRPC服务,并指定监听端口。

代码示例如下:

func main() {

  lis, err := net.Listen("tcp", 50051)

  if err != nil {

       log.Fatalf("failed to listen: %v", err)

  }

  s := grpc.NewServer()

  gs.RegisterGNMIDialOutServer(s, &Server{})

  if err := s.Serve(lis); err != nil {

       log.Fatalf("failed to serve: %v", err)

  }

}

(3)     实现GNMIDialout方法,并进行数据解析。

代码示例如下:

// Publish implements

func (srv *Server) Publish(stream gs.GNMIDialOut_PublishServer) error {

  for {

      subscribeResponse, err := stream.Recv()

      if err != nil {

           if err == io.EOF {

               return grpc.Errorf(codes.Aborted, "stream EOF received")

           }

           return grpc.Errorf(grpc.Code(err), "received error from client")

      }else {

           fmt.Println("success:", subscribeResponse)

      }

  }

}

(4)     根据Ifmgr_v3.proto文件,可以调用对应的方法获取相应的字段值。

6.6  gRPC Dial-out模式二次开发举例(Python

6.6.1  生成代码

开发代码之前,需要使用protoc工具将收集到的Proto文件转换成C++代码,并将生成的代码加入到开发的工程中。

本例中,开发二层普通Dial-out模式的代码使用以下Proto文件:

·     grpc_dialout.proto

本例中,开发三层Dial-out模式的代码使用以下Proto文件:

·     grpc_dialout_v3.proto

·     telemetry.proto

·     业务Proto文件,本例中为Ifmgr_v3.proto

本例中,开发二层gNMI Dial-out模式的代码使用以下Proto文件:

·     dialout.proto

·     gnmi.protognmi_ext.proto

使用protoc工具生成Python代码的示例如下:

[root@ grpc]# cd protobuf

[root@ protobuf]# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. grpc_dialout.proto

[root@ protobuf]# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. grpc_dialout_v3.proto

[root@ protobuf]# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. telemetry.proto

[root@ protobuf]# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. Ifmgr_v3.proto

[root@ protobuf]# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. dialout.proto

[root@ protobuf]# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. gnmi.proto.proto

[root@ protobuf]# python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. gnmi_ext.proto.proto

6.6.2  开发代码

1. 二层普通Dail-out模式

编写步骤如下:

(1)     新建DialoutServicer类,实现数据接收和响应

代码示例如下:

class DialoutServicer(grpc_dialout_pb2_grpc.GRPCDialoutServicer):

    def Dialout(self, request_iterator, context):

        for i, req in enumerate(request_iterator):

            print("thread: %d, message index: %d" % (threading.get_ident(), i))

            print(req)

            print()

        print("a client is disconnected")

        return grpc_dialout_pb2.DialoutResponse(response="anything")

(2)     DialoutServicer服务注册为gRPC服务,并指定监听端口

代码示例如下:

def serve():

    addr = "[::]:50051"

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))

    grpc_dialout_pb2_grpc.add_GRPCDialoutServicer_to_server(DialoutServicer(), server)

    server.add_insecure_port(addr)

    server.start()

    print(f"grpc dialout server is running on {addr}")

    try:

        server.wait_for_termination()

    except KeyboardInterrupt:

        print("Interrupted, now quiting")

    return

(3)     通过获取到的Proto文件解析接收到的采样数据,获取对应的字段值。

2. 三层Dial-out模式

编码步骤如下:

(1)     新建DialoutV3Servicer类,实现数据接收和响应。

代码示例如下:

class DialoutV3Servicer(grpc_dialout_v3_pb2_grpc.gRPCDialoutV3Servicer):

    def DialoutV3(self, request_iterator, context):

        for i, req in enumerate(request_iterator):

            print("thread: %d, message index: %d" % (threading.get_ident(), i))

            self.print_gpb(req)//接收的数据需要根据proto文件解析

            print()

        print("a client is disconnected\n")

        return

(2)     DialoutServicer服务注册为gRPC服务,并指定监听端口。

代码示例如下:

def serve():

    addr = "[::]:50051"

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))

    grpc_dialout_v3_pb2_grpc.add_gRPCDialoutV3Servicer_to_server(DialoutV3Servicer(), server)

    server.add_insecure_port(addr)

    server.start()

    print(f"grpc dialout server is running on {addr}")

    try:

        server.wait_for_termination()

    except KeyboardInterrupt:

        print("Interrupted, now quiting")

    return

(3)     DialoutV3Servicer类中新增方法解析接收到数据

代码示例如下:

def find_msg_cls(self, sensor_path:str):

        module_name = sensor_path.split("/")[0]

        proto_name = f"{module_name}_v3"

        pool = descriptor_pool.Default()

        file_desc = pool.FindFileByName(f"{proto_name}.proto")//业务部proto文件,解析不同GPB编码

        module_desc = file_desc.message_types_by_name[module_name]

        msg_fact = message_factory.MessageFactory(pool=pool)

        return msg_fact.GetPrototype(module_desc)

 

 

    def print_gpb(self, msg:grpc_dialout_v3_pb2.DialoutV3Args):

        telemetry = telemetry_pb2.Telemetry.FromString(msg.data)

        sensor_path = telemetry.sensor_path

        msg.ClearField("data")

        s = str(msg)

        if telemetry.encoding == telemetry_pb2.Telemetry.Encoding.Encoding_JSON:

            s += "\n" + str(telemetry)

            print(s)

            return

       

        tmp = ""

        try:

            msg_cls = self.find_msg_cls(sensor_path)

            for row in telemetry.data_gpb.row:

                gpb = row.content

                msec = "%03d" % (row.timestamp % 1000)

                timestr = "({})".format(datetime.fromtimestamp(row.timestamp/1000).strftime(f"%Y-%m-%d %H:%M:%S.{msec}"))

                tmp += "\ntimestamp: " + str(row.timestamp) + timestr

                tmp += "\nkeys: " + str(row.keys)

                tmp += f"\ncontent({sensor_path}):\n" + str(msg_cls.FromString(gpb))

            telemetry.ClearField("data_gpb")

            s += "\n" + str(telemetry) + tmp

            print(s)

        except Exception as e:

            print(e)

        return

3. 二层gNMI Dial-out模式

编码步骤如下:

(1)     新建GnmiDialoutServicer类,实现数据接收和响应。

代码示例如下:

class GnmiDialoutServicer(dial_out_pb2_grpc.gNMIDialOutServicer):

    def Publish(self, request_iterator, context):

        for i, req in enumerate(request_iterator):

            print("thread: %d, message index: %d" % (threading.get_ident(), i))

            if req.HasField("update"):

                tm = req.update.timestamp

                msec = "%03d" % ((tm % 1000000000) // 1000000)

                print(datetime.fromtimestamp(tm/1000000000).strftime(f"%Y-%m-%d %H:%M:%S.{msec}"))

            print(req)

            print()

        print("a client is disconnected")

        return

(2)     GnmiDialoutServicer服务注册为gRPC服务,并指定监听端口

代码示例如下:

def serve():

    addr = "[::]:50051"

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))

    dial_out_pb2_grpc.add_gNMIDialOutServicer_to_server(GnmiDialoutServicer(), server)

    server.add_insecure_port(addr)

    server.start()

    print(f"grpc dialout server is running on {addr}")

    try:

        server.wait_for_termination()

    except KeyboardInterrupt:

        print("Interrupted, now quiting")

    return

6.7  gRPC Dial-out模式二次开发举例(JAVA

6.7.1  生成代码

开发代码之前,需要开发人员先安装好maven环境,并在创建maven工程后修改pom.xml,实现下载protoc工具的步骤如下:

<plugins>

            <plugin>

                <groupId>org.xolstice.maven.plugins</groupId>

                <artifactId>protobuf-maven-plugin</artifactId>

                <version>0.6.1</version>

                <configuration>

                    <protocArtifact>com.google.protobuf:protoc:3.6.1:exe:${os.detected.classifier}</protocArtifact>

                    <pluginId>grpc-java</pluginId>

                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.14.0:exe:${os.detected.classifier}</pluginArtifact>

                </configuration>

                <executions>

                    <execution>

                        <goals>

                            <goal>compile</goal>

                            <goal>compile-custom</goal>

                        </goals>

                    </execution>

                </executions>

            </plugin>

然后,开发人员在maven环境中运行mvn install即可将收集到的Proto文件生成对应的JAVA代码。

本例中,开发二层普通Dial-out模式的代码使用以下Proto文件:

·     grpc_dialout.proto

本例中,开发三层Dial-out模式的代码使用以下Proto文件:

·     grpc_dialout_v3.proto

·     telemetry.proto

·     业务Proto文件,本例中为Ifmgr_v3.proto

本例中,开发二层gNMI Dial-out模式的代码使用以下Proto文件:

·     dialout.proto

·     gnmi.protognmi_ext.proto

6.7.2  开发代码

1. 二层普通Dail-out模式

编码步骤如下:

(1)     编写DialoutServer类,并将服务注册到指定监听端口上。

代码示例如下:

public DialoutServer(int port) {

        this.port = port;

        ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);

        server = serverBuilder.addService(new DialoutService()).build();

    }

(2)     DialoutServer类中新建DialoutService类,并继承PCDialoutGrpc.GRPCDialoutImplBase完成数据的接受和解析。

代码示例如下:

private static class DialoutService extends GRPCDialoutGrpc.GRPCDialoutImplBase {

        DialoutService() {}

 

        /**

         * Gets a stream of DialoutMsg

         *

         * @param responseObserver an observer to receive the dialout response.

         * @return an observer to receive the requested dialout messages.

         */

        @Override

        public StreamObserver<GrpcDialout.DialoutMsg> dialout(final StreamObserver<GrpcDialout.DialoutResponse> responseObserver) {

            return new StreamObserver<GrpcDialout.DialoutMsg>() {

 

                @Override

                public void onNext(GrpcDialout.DialoutMsg msg) {

                   //数据解析

                    System.err.println("--- received a msg ---");

 

                    GrpcDialout.DeviceInfo deviceInfo = msg.getDeviceMsg();

                    System.out.printf("producerName: %s\n", deviceInfo.getProducerName());

                    System.out.printf("deviceName: %s\n", deviceInfo.getDeviceName());

                    System.out.printf("deviceModel: %s\n", deviceInfo.getDeviceModel());

                    if (deviceInfo.hasDeviceModel()) {

                        System.out.printf("deviceIpAddr: %s\n", deviceInfo.getDeviceIpAddr());

                    }

                    if (deviceInfo.hasEventType()) {

                        System.out.printf("eventType: %s\n", deviceInfo.getEventType());

                    }

 

                    System.out.printf("sensorPath: %s\n", msg.getSensorPath());

                    System.out.printf("jsonData len: %s\n", msg.getJsonData().length());

                    System.out.printf("jsonData: %s\n", msg.getJsonData());

 

                    if (msg.hasChunkMsg()) {

                        GrpcDialout.ChunkInfo chunk = msg.getChunkMsg();

                        System.out.printf("totalSize: %d\n", chunk.getTotalSize());

                        System.out.printf("totalFragments: %d\n", chunk.getTotalFragments());

                        System.out.printf("nodeId: %d\n", chunk.getNodeId());

                    }

                    System.out.println();

                }

 

                @Override

                public void onError(Throwable throwable) {

                    logger.log(Level.WARNING, "Dialout cancelled");

                }

 

                @Override

                public void onCompleted() {

                    System.out.println("a client disconnectd.");

                    responseObserver.onCompleted();

                }

            };

        }

    }

(3)     DialoutServer类中实现main函数,服务端启动监听。

代码示例如下:

        public void start() throws IOException {

        server.start();

        logger.info("Server started, listening on  " + port);

        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override

            public void run() {

                System.err.println("*** shuting down gRPC server since JVM is shutting down");

                try {

                    DialoutServer.this.stop();

                }catch (InterruptedException e) {

                    e.printStackTrace(System.err);

                }

                System.err.println("*** server shut down");

            }

        });

    }

 

    public void stop() throws InterruptedException {

        if (server != null) {

            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);

        }

    }

 

    public void blockUnitilShutdown() throws InterruptedException {

        if (server != null) {

            server.awaitTermination();

        }

    }

    //main函数

    public static void main (String[] args) throws Exception {

        //此函数是参数解析,可以自行实现

ArgParser argParser = new ArgParser();

        argParser.Parse(args);

        DialoutServer server = new DialoutServer(argParser.port);

        server.start();

        server.blockUnitilShutdown();

    }

2. 三层Dial-out模式

编码步骤如下:

(1)     编写gRPCgpb类,并将服务注册到指定监听端口上。

代码示例如下:

public gRPCgpb(int port) {

        this.port = port;

        ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);

        server = serverBuilder.addService(new Dialoutv3Service()).build();

    }

(2)     gRPCgpb类中新建Diaoutv3Service类,并继承gRPCDialoutV3Grpc.gRPCDialoutV3ImplBase

代码示例如下:

private static class Dialoutv3Service extends gRPCDialoutV3Grpc.gRPCDialoutV3ImplBase {

 

        Dialoutv3Service() {}

 

        @Override

        public StreamObserver<GrpcDialoutV3.DialoutV3Args> dialoutV3(final StreamObserver<GrpcDialoutV3.DialoutV3Args>  responseObserver) {

            return new StreamObserver<GrpcDialoutV3.DialoutV3Args>() {

                @Override

                public void onNext(GrpcDialoutV3.DialoutV3Args dialoutV3Args) {

                    System.err.println("--- received a msg ---");

                    System.out.printf("reqId: %d\n", dialoutV3Args.getReqId());

                    System.out.printf("errors: %s\n", dialoutV3Args.getErrors());

                    System.out.printf("reqId: %d\n", dialoutV3Args.getTotalSize());

                    paraseData(dialoutV3Args.getData());

                }

 

                @Override

                public void onError(Throwable throwable) {

                    logger.log(Level.WARNING, "Dialout cancelled");

                }

 

                @Override

                public void onCompleted() {

                    System.out.println("a client disconnectd.");

                    responseObserver.onCompleted();

                }

            };

        }

 

    }

}

(3)     在新建的Dialoutv3Service类中实现三层数据的解码。

Ifmgr/devicecapabilities为例,代码如下:

//根据业务Proto文件解析业务数据

private void  printObject(String sensorPath, TelemetryOuterClass.TelemetryRowGPB rowGPB){

            try {

                if (true ==sensorPath.equals("Ifmgr/Devicecapabilities")) {

                    Object object =IfmgrV3.Ifmgr.parseFrom(rowGPB.getContent());

                    System.out.println(object.toString());

                }

 

            }catch (InvalidProtocolBufferException e)

            {

                logger.log(Level.WARNING, "sensorPath:" + sensorPath + ",TelemetryRowGPB:" + rowGPB);

            }

 

        }

 

        private void copyTelemetryRowGPB(TelemetryOuterClass.TelemetryRowGPB rowGPB,String sensorPath){

            if (rowGPB != null) {

                System.out.printf("timestamp: %d\n", rowGPB.getTimestamp());

                System.out.printf("key: %s\n", rowGPB.getKeys().toString());

                printObject(sensorPath, rowGPB);

            }

        }

        //GPB数据解析

        public void paraseData (ByteString byteString) {

            try {

                TelemetryOuterClass.Telemetry telemetry TelemetryOuterClass.Telemetry.parseFrom(byteString);

                if (telemetry == null) {

                    return;

                }

                System.out.printf("producer_name: %s\n", telemetry.getProducerName());

                System.out.printf("node_id_str: %s\n", telemetry.getNodeIdStr());

                System.out.printf("product_name: %s\n", telemetry.getProductName());

                System.out.printf("sensorPath: %s\n", telemetry.getSensorPath());

                System.out.printf("collection_id: %d\n", telemetry.getCollectionId());

                System.out.printf("collection_start_time: %d\n", telemetry.getCollectionStartTime());

                System.out.printf("msg_timestamp: %d\n", telemetry.getMsgTimestamp());

                System.out.printf("collection_end_time: %d\n", telemetry.getCollectionEndTime());

                System.out.printf("current_period: %d\n", telemetry.getCurrentPeriod());

                System.out.printf("execpt_desc: %s\n", telemetry.getExceptDesc());

                System.out.printf("encoding: %s\n", telemetry.getEncoding().toString());

                if (telemetry.getEncodingValue() == 0) {

                    System.out.println(telemetry.getDataStr());

                }else {

                    TelemetryOuterClass.TelemetryGPBTable telemetryGPBTable = telemetry.getDataGpb();

 

                    if (telemetryGPBTable != null) {

                        int rowCount = telemetryGPBTable.getRowCount();

                        for (int i = 0; i < rowCount; i++) {

//Telemetryrow内容进行解码

                         copyTelemetryRowGPB(telemetryGPBTable.getRow(i),telemetry.getSensorPath());

                        }

                    }

                }

 

            }catch (Exception e) {

                System.err.printf("TelemetryRowGPB parased false");

            }

 

 

        }

(4)     gRPCgpb类中实现main函数,服务端启动监听。

代码示例如下:

public void start() throws  IOException {

        server.start();

        logger.info("Server started, listening on  " + port);

        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override

            public void run() {

                System.err.println("*** shuting down gRPC server since JVM is shutting down");

                try {

                    gRPCgpb.this.stop();

                }catch (InterruptedException e) {

                    e.printStackTrace(System.err);

                }

                System.err.println("*** server shut down");

            }

        });

    }

 

    public void stop() throws InterruptedException {

        if (server != null) {

            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);

        }

    }

 

    public void blockUnitilShutdown() throws InterruptedException {

        if (server != null) {

            server.awaitTermination();

        }

    }

    //main函数

public static void main (String[] args) throws Exception {

    //此函数是参数解析,可以自行实现

        ArgParser argParser = new ArgParser();

        argParser.Parse(args);

        gRPCgpb server = new gRPCgpb(argParser.port);

        server.start();

        server.blockUnitilShutdown();

    }

3. 二层gNMI Dial-out模式

编码步骤如下:

(1)     编写gnmiDialout类,并将服务注册到指定监听端口上。

代码示例如下:

    public gnmiDailout(int port) {

        this.port = port;

        ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);

        server = serverBuilder.addService(new gnmiDailoutService()).build();

    }

(2)     gnmiDialout中新建gnmiDialoutService类,并继承gNMIDialOutGrpc.gNMIDialOutImplBase,实现数据的解析。

代码示例如下:

private static class gnmiDailoutService extends gNMIDialOutGrpc.gNMIDialOutImplBase {

 

        gnmiDailoutService() {}

 

        @Override

        public StreamObserver<Gnmi.SubscribeResponse> publish(final StreamObserver<DialOut.PublishResponse> responseObserver) {

            return new StreamObserver<Gnmi.SubscribeResponse>() {

                @Override

                public void onNext(Gnmi.SubscribeResponse subscribeResponse) {

                     System.err.println("--- received a msg ---");

                     //解析数据

                     Object object = subscribeResponse.getUpdate();

                     System.out.println(object.toString());

                }

 

                @Override

                public void onError(Throwable throwable) {

                    logger.log(Level.WARNING, "Dialout cancelled");

                }

 

                @Override

                public void onCompleted() {

                    System.out.println("a client disconnectd.");

                    responseObserver.onCompleted();

                }

            };

        }

 

    }

(3)     gnmiDialout类中编写main函数,启动服务端监听。

代码示例如下:

    public void start() throws IOException {

        server.start();

        logger.info("Server started, listening on  " + port);

        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override

            public void run() {

                System.err.println("*** shuting down gRPC server since JVM is shutting down");

                try {

                    gnmiDailout.this.stop();

                }catch (InterruptedException e) {

                    e.printStackTrace(System.err);

                }

                System.err.println("*** server shut down");

            }

        });

    }

 

    public void stop() throws InterruptedException {

        if (server != null) {

            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);

        }

    }

 

    public void blockUnitilShutdown() throws InterruptedException {

        if (server != null) {

            server.awaitTermination();

        }

    }

 

    public static void main (String[] args) throws Exception {

        ArgParser argParser = new ArgParser();

        argParser.Parse(args);

        gnmiDailout server = new gnmiDailout(argParser.port);

        server.start();

        server.blockUnitilShutdown();

}


7 常见问题

Q:采样周期的准确性受那些因素影响?

A:正常情况下,设备会以用户配置的Telemetry采样周期对订阅的数据进行采样,但是由于现网环境复杂、业务较多,采样周期的准确性受以下因素影响:

·     受采样实例数目影响:部分节点的采样路径下包含的采样实例数目庞大,例如route/ipv4routes,当路由表项达到100k时,采样数据量较大,设备无法在一个较小的采样周期完成采集工作。

·     受采样数据源的采样周期影响:某些数据源有自己的最小采样周期,当它与用户配置的Telemetry采样周期不一致时,设备可能不会按照配置的采样周期上报数据。例如,采样路径device/base上的最小采样周期为5秒,若设备上配置的Telemetry采样周期为100毫秒,由于数据源无法达到配置的采样周期精度,gRPC模块会以数据源自身的最小采样周期上报数据。

·     CPU影响:当设备上的CPU繁忙时,Telemetry的采样工作可能无法正常进行,gRPC模块将不能在当前采样周期内完成采集工作。

 

新华三官网
联系我们