diff --git a/pkg/cat/kkc.go b/pkg/cat/kkc.go
index 7d95f946..b981c7b1 100644
--- a/pkg/cat/kkc.go
+++ b/pkg/cat/kkc.go
@@ -685,7 +685,7 @@ func (kc *KTranslate) Run(ctx context.Context) error {
         }
         assureInput()
         kc.metrics.SnmpDeviceData = kt.NewSnmpMetricSet(kc.registry)
-        err := snmp.StartSNMPPolls(ctx, kc.inputChan, kc.metrics.SnmpDeviceData, kc.registry, kc.apic, kc.log, kc.config.SNMPInput, kc.resolver, kc.confMgr)
+        err := snmp.StartSNMPPolls(ctx, kc.inputChan, kc.metrics.SnmpDeviceData, kc.registry, kc.apic, kc.log, kc.config.SNMPInput, kc.resolver, kc.confMgr, kc.logTee)
         if err != nil {
             return err
         }
diff --git a/pkg/inputs/snmp/metrics/poll.go b/pkg/inputs/snmp/metrics/poll.go
index 1cc86fca..1baee16a 100644
--- a/pkg/inputs/snmp/metrics/poll.go
+++ b/pkg/inputs/snmp/metrics/poll.go
@@ -34,7 +34,7 @@ type Poller struct {
     pingSec int
 }
-func NewPoller(server *gosnmp.GoSNMP, gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, jchfChan chan []*kt.JCHF, metrics *kt.SnmpDeviceMetric, profile *mibs.Profile, log logger.ContextL) *Poller {
+func NewPoller(server *gosnmp.GoSNMP, gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, jchfChan chan []*kt.JCHF, metrics *kt.SnmpDeviceMetric, profile *mibs.Profile, log logger.ContextL, logchan chan string) *Poller {
     // Default poll rate is 5 min. This is what a lot of SNMP billing is on.
     counterTimeSec := 5 * 60
     if conf != nil && conf.PollTimeSec > 0 {
@@ -84,7 +84,7 @@ func NewPoller(server *gosnmp.GoSNMP, gconf *kt.SnmpGlobalConfig, conf *kt.SnmpD
     }
     // If we are extending the metrics for this device in any way, set it up now.
-    ext, err := extension.NewExtension(jchfChan, gconf, conf, metrics, log)
+    ext, err := extension.NewExtension(jchfChan, gconf, conf, metrics, log, logchan)
     if err != nil {
         log.Errorf("Cannot setup extension for %s -> %s: %v", err, conf.DeviceIP, conf.DeviceName)
     } else if ext != nil {
@@ -144,7 +144,7 @@ func NewPollerForPing(gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, jch
     return &poller
 }
-func NewPollerForExtention(gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, jchfChan chan []*kt.JCHF, metrics *kt.SnmpDeviceMetric, profile *mibs.Profile, log logger.ContextL) *Poller {
+func NewPollerForExtention(gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, jchfChan chan []*kt.JCHF, metrics *kt.SnmpDeviceMetric, profile *mibs.Profile, log logger.ContextL, logchan chan string) *Poller {
     // Default poll rate is 5 min. This is what a lot of SNMP billing is on.
     counterTimeSec := 5 * 60
     if conf != nil && conf.PollTimeSec > 0 {
@@ -174,7 +174,7 @@ func NewPollerForExtention(gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig
     }
     // If we are extending the metrics for this device in any way, set it up now.
-    ext, err := extension.NewExtension(jchfChan, gconf, conf, metrics, log)
+    ext, err := extension.NewExtension(jchfChan, gconf, conf, metrics, log, logchan)
     if err != nil {
         log.Errorf("Cannot setup extension for %s -> %s: %v", err, conf.DeviceIP, conf.DeviceName)
     } else if ext != nil {
diff --git a/pkg/inputs/snmp/pollOnce.go b/pkg/inputs/snmp/pollOnce.go
index 4ec830ae..8262bdd7 100644
--- a/pkg/inputs/snmp/pollOnce.go
+++ b/pkg/inputs/snmp/pollOnce.go
@@ -14,7 +14,7 @@ import (
     "github.com/kentik/ktranslate/pkg/kt"
 )
-func pollOnce(ctx context.Context, tdevice string, conf *kt.SnmpConfig, connectTimeout time.Duration, retries int, jchfChan chan []*kt.JCHF, metrics *kt.SnmpMetricSet, registry go_metrics.Registry, log logger.ContextL) error {
+func pollOnce(ctx context.Context, tdevice string, conf *kt.SnmpConfig, connectTimeout time.Duration, retries int, jchfChan chan []*kt.JCHF, metrics *kt.SnmpMetricSet, registry go_metrics.Registry, log logger.ContextL, logchan chan string) error {
     device := conf.Devices[tdevice]
     if device == nil {
         for _, dev := range conf.Devices {
@@ -50,7 +50,7 @@ func pollOnce(ctx context.Context, tdevice string, conf *kt.SnmpConfig, connectT
     nm := kt.NewSnmpDeviceMetric(registry, device.DeviceName)
     metadataPoller := metadata.NewPoller(metadataServer, conf.Global, device, jchfChan, nm, profile, log)
-    metricPoller := snmp_metrics.NewPoller(metricsServer, conf.Global, device, jchfChan, nm, profile, log)
+    metricPoller := snmp_metrics.NewPoller(metricsServer, conf.Global, device, jchfChan, nm, profile, log, logchan)
     metadataPoller.StartLoop(ctx)
     // Give a little time to get this done.
diff --git a/pkg/inputs/snmp/snmp.go b/pkg/inputs/snmp/snmp.go
index 2b9824e3..a44197cc 100644
--- a/pkg/inputs/snmp/snmp.go
+++ b/pkg/inputs/snmp/snmp.go
@@ -57,7 +57,7 @@ func init() {
     flag.BoolVar(&validateMib, "snmp_validate", false, "If true, validate mib profiles and exit.")
 }
-func StartSNMPPolls(ctx context.Context, jchfChan chan []*kt.JCHF, metrics *kt.SnmpMetricSet, registry go_metrics.Registry, apic *api.KentikApi, log logger.ContextL, cfg *ktranslate.SNMPInputConfig, resolv *resolv.Resolver, confMgr config.ConfigManager) error {
+func StartSNMPPolls(ctx context.Context, jchfChan chan []*kt.JCHF, metrics *kt.SnmpMetricSet, registry go_metrics.Registry, apic *api.KentikApi, log logger.ContextL, cfg *ktranslate.SNMPInputConfig, resolv *resolv.Resolver, confMgr config.ConfigManager, logchan chan string) error {
     snmpFile := cfg.SNMPFile
     // Do this once here just to see if we need to exit right away.
     conf, connectTimeout, retries, err := initSnmp(ctx, snmpFile, log)
@@ -92,12 +92,12 @@ func StartSNMPPolls(ctx context.Context, jchfChan chan []*kt.JCHF, metrics *kt.S
     // If we just want to poll one device and exit, do this here.
     if v := cfg.PollNowTarget; v != "" {
-        return pollOnce(ctx, v, conf, connectTimeout, retries, jchfChan, metrics, registry, log)
+        return pollOnce(ctx, v, conf, connectTimeout, retries, jchfChan, metrics, registry, log, logchan)
     }
     // Now, launch a metadata and metrics server for each configured or discovered device.
     if conf.Trap == nil || !conf.Trap.TrapOnly { // Unless we are turning off everything but snmp traps.
-        go wrapSnmpPolling(ctx, snmpFile, jchfChan, metrics, registry, apic, log, 0, cfg, confMgr)
+        go wrapSnmpPolling(ctx, snmpFile, jchfChan, metrics, registry, apic, log, 0, cfg, confMgr, logchan)
     }
     // Run a trap listener?
@@ -147,9 +147,9 @@ func initSnmp(ctx context.Context, snmpFile string, log logger.ContextL) (*kt.Sn
     return conf, connectTimeout, retries, nil
 }
-func wrapSnmpPolling(ctx context.Context, snmpFile string, jchfChan chan []*kt.JCHF, metrics *kt.SnmpMetricSet, registry go_metrics.Registry, apic *api.KentikApi, log logger.ContextL, restartCount int, cfg *ktranslate.SNMPInputConfig, confMgr config.ConfigManager) {
+func wrapSnmpPolling(ctx context.Context, snmpFile string, jchfChan chan []*kt.JCHF, metrics *kt.SnmpMetricSet, registry go_metrics.Registry, apic *api.KentikApi, log logger.ContextL, restartCount int, cfg *ktranslate.SNMPInputConfig, confMgr config.ConfigManager, logchan chan string) {
     ctxSnmp, cancel := context.WithCancel(ctx)
-    err := runSnmpPolling(ctxSnmp, snmpFile, jchfChan, metrics, registry, apic, log, restartCount, cfg)
+    err := runSnmpPolling(ctxSnmp, snmpFile, jchfChan, metrics, registry, apic, log, restartCount, cfg, logchan)
     if err != nil {
         log.Errorf("There was an error when polling for SNMP devices: %v.", err)
     }
@@ -173,10 +173,10 @@ func wrapSnmpPolling(ctx context.Context, snmpFile string, jchfChan chan []*kt.J
     // If we got this signal, redo the snmp system.
     cancel()
-    go wrapSnmpPolling(ctx, snmpFile, jchfChan, metrics, registry, apic, log, restartCount+1, cfg, confMgr) // Track how many times through here we've been.
+    go wrapSnmpPolling(ctx, snmpFile, jchfChan, metrics, registry, apic, log, restartCount+1, cfg, confMgr, logchan) // Track how many times through here we've been.
 }
-func runSnmpPolling(ctx context.Context, snmpFile string, jchfChan chan []*kt.JCHF, metrics *kt.SnmpMetricSet, registry go_metrics.Registry, apic *api.KentikApi, log logger.ContextL, restartCount int, cfg *ktranslate.SNMPInputConfig) error {
+func runSnmpPolling(ctx context.Context, snmpFile string, jchfChan chan []*kt.JCHF, metrics *kt.SnmpMetricSet, registry go_metrics.Registry, apic *api.KentikApi, log logger.ContextL, restartCount int, cfg *ktranslate.SNMPInputConfig, logchan chan string) error {
     // Parse again to make sure nothing's changed.
     conf, connectTimeout, retries, err := initSnmp(ctx, snmpFile, log)
     if err != nil || conf == nil || conf.Global == nil {
@@ -238,7 +238,7 @@ func runSnmpPolling(ctx context.Context, snmpFile string, jchfChan chan []*kt.JC
             return err
         }
-        err = launchSnmp(ctx, conf.Global, device, jchfChan, connectTimeout, retries, nm, profile, cl)
+        err = launchSnmp(ctx, conf.Global, device, jchfChan, connectTimeout, retries, nm, profile, cl, logchan)
         if err != nil {
             return err
         }
@@ -261,7 +261,7 @@ func launchSnmpTrap(ctx context.Context, conf *kt.SnmpConfig, jchfChan chan []*k
     return nil
 }
-func launchSnmp(ctx context.Context, conf *kt.SnmpGlobalConfig, device *kt.SnmpDeviceConfig, jchfChan chan []*kt.JCHF, connectTimeout time.Duration, retries int, metrics *kt.SnmpDeviceMetric, profile *mibs.Profile, log logger.ContextL) error {
+func launchSnmp(ctx context.Context, conf *kt.SnmpGlobalConfig, device *kt.SnmpDeviceConfig, jchfChan chan []*kt.JCHF, connectTimeout time.Duration, retries int, metrics *kt.SnmpDeviceMetric, profile *mibs.Profile, log logger.ContextL, logchan chan string) error {
     // Sometimes this device is pinging only. In this case, start the ping loop and return.
     if device.PingOnly {
         return launchPingOnly(ctx, conf, device, jchfChan, connectTimeout, retries, metrics, profile, log)
@@ -273,7 +273,7 @@ func launchSnmp(ctx context.Context, conf *kt.SnmpGlobalConfig, device *kt.SnmpD
     // Sometimes a device is only going to be running its extention.
     if device.Ext != nil && device.Ext.ExtOnly {
-        return launchExtOnly(ctx, conf, device, jchfChan, connectTimeout, retries, metrics, profile, log)
+        return launchExtOnly(ctx, conf, device, jchfChan, connectTimeout, retries, metrics, profile, log, logchan)
     }
     // We need two of these, to avoid concurrent access by the two pollers.
@@ -293,7 +293,7 @@ func launchSnmp(ctx context.Context, conf *kt.SnmpGlobalConfig, device *kt.SnmpD
     }
     metadataPoller := metadata.NewPoller(metadataServer, conf, device, jchfChan, metrics, profile, log)
-    metricPoller := snmp_metrics.NewPoller(metricsServer, conf, device, jchfChan, metrics, profile, log)
+    metricPoller := snmp_metrics.NewPoller(metricsServer, conf, device, jchfChan, metrics, profile, log, logchan)
     // We've now done everything we can do synchronously -- return to the client initialization
     // code, and do everything else in the background
@@ -469,8 +469,8 @@ func launchPingOnly(ctx context.Context, conf *kt.SnmpGlobalConfig, device *kt.S
 * Handle the case where we're only doing a extention loop of a device.
 */
-func launchExtOnly(ctx context.Context, conf *kt.SnmpGlobalConfig, device *kt.SnmpDeviceConfig, jchfChan chan []*kt.JCHF, connectTimeout time.Duration, retries int, metrics *kt.SnmpDeviceMetric, profile *mibs.Profile, log logger.ContextL) error {
-    metricPoller := snmp_metrics.NewPollerForExtention(conf, device, jchfChan, metrics, profile, log)
+func launchExtOnly(ctx context.Context, conf *kt.SnmpGlobalConfig, device *kt.SnmpDeviceConfig, jchfChan chan []*kt.JCHF, connectTimeout time.Duration, retries int, metrics *kt.SnmpDeviceMetric, profile *mibs.Profile, log logger.ContextL, logchan chan string) error {
+    metricPoller := snmp_metrics.NewPollerForExtention(conf, device, jchfChan, metrics, profile, log, logchan)
     // We've now done everything we can do synchronously -- return to the client initialization
     // code, and do everything else in the background
diff --git a/pkg/inputs/snmp/x/ext.go b/pkg/inputs/snmp/x/ext.go
index 7577fea1..033fe610 100644
--- a/pkg/inputs/snmp/x/ext.go
+++ b/pkg/inputs/snmp/x/ext.go
@@ -16,7 +16,7 @@ type Extension interface {
     GetName() string
 }
-func NewExtension(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, metrics *kt.SnmpDeviceMetric, log logger.ContextL) (Extension, error) {
+func NewExtension(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, metrics *kt.SnmpDeviceMetric, log logger.ContextL, logchan chan string) (Extension, error) {
     if conf.Ext == nil { // No extensions set.
         return nil, nil
     }
@@ -24,7 +24,7 @@ func NewExtension(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf *kt
     if conf.Ext.EAPIConfig != nil {
         return arista.NewEAPIClient(jchfChan, gconf, conf, metrics, log)
     } else if conf.Ext.MerakiConfig != nil {
-        return meraki.NewMerakiClient(jchfChan, gconf, conf, metrics, log)
+        return meraki.NewMerakiClient(jchfChan, gconf, conf, metrics, log, logchan)
     }
     return nil, nil
diff --git a/pkg/inputs/snmp/x/meraki/meraki.go b/pkg/inputs/snmp/x/meraki/meraki.go
index 3a839a95..8c70a7d5 100644
--- a/pkg/inputs/snmp/x/meraki/meraki.go
+++ b/pkg/inputs/snmp/x/meraki/meraki.go
@@ -35,6 +35,7 @@ type MerakiClient struct {
     timeout  time.Duration
     cache    *clientCache
     maxRetry int
+    logchan  chan string
 }
 type orgDesc struct {
@@ -69,7 +70,7 @@ var (
     }
 )
-func NewMerakiClient(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, metrics *kt.SnmpDeviceMetric, log logger.ContextL) (*MerakiClient, error) {
+func NewMerakiClient(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, metrics *kt.SnmpDeviceMetric, log logger.ContextL, logchan chan string) (*MerakiClient, error) {
     c := MerakiClient{
         log:      log,
         jchfChan: jchfChan,
@@ -81,6 +82,7 @@ func NewMerakiClient(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf
         timeout:  30 * time.Second,
         cache:    newClientCache(log),
         maxRetry: conf.Ext.MerakiConfig.MaxAPIRetry,
+        logchan:  logchan,
     }
     host := conf.Ext.MerakiConfig.Host
@@ -261,11 +263,19 @@ func (c *MerakiClient) Run(ctx context.Context, dur time.Duration) {
     doNetworkClients := c.conf.Ext.MerakiConfig.MonitorNetworkClients
     doVpnStatus := c.conf.Ext.MerakiConfig.MonitorVpnStatus
     doNetworkAttr := c.conf.Ext.MerakiConfig.Prefs["show_network_attr"]
-    if !doUplinks && !doDeviceClients && !doDeviceStatus && !doOrgChanges && !doNetworkClients && !doVpnStatus && !doNetworkAttr {
+    doNetworkApplianceSecurityEvents := c.conf.Ext.MerakiConfig.Prefs["log_network_security_events"]
+    doNetworkEvents := c.conf.Ext.MerakiConfig.Prefs["log_network_events"]
+    if !doUplinks && !doDeviceClients && !doDeviceStatus && !doOrgChanges && !doNetworkClients && !doVpnStatus && !doNetworkAttr && !doNetworkApplianceSecurityEvents && !doNetworkEvents {
         doUplinks = true
     }
-    c.log.Infof("Running Every %v with uplinks=%v, device_clients=%v, device_status=%v, orgs=%v, networks=%v, vpn_status=%v, network_attr=%v",
-        dur, doUplinks, doDeviceClients, doDeviceStatus, doOrgChanges, doNetworkClients, doVpnStatus, doNetworkAttr)
+    c.log.Infof("Running Every %v with uplinks=%v, device_clients=%v, device_status=%v, orgs=%v, networks=%v, vpn_status=%v, network_attr=%v, network_security_events=%v, network_events=%v",
+        dur, doUplinks, doDeviceClients, doDeviceStatus, doOrgChanges, doNetworkClients, doVpnStatus, doNetworkAttr, doNetworkApplianceSecurityEvents, doNetworkEvents)
+
+    // This gets all network events, I can't find a way to not start at the beginning of each week.
+    // Pulling it out here so its only on startup for now. Will run weekly, very slowly over the orgs and networks selected.
+    if doNetworkEvents {
+        go c.getNetworkEventsWrapper(ctx)
+    }
     for {
         select {
@@ -328,6 +338,12 @@ func (c *MerakiClient) Run(ctx context.Context, dur time.Duration) {
                 }
             }
+            if doNetworkApplianceSecurityEvents {
+                if err := c.getNetworkApplianceSecurityEvents(dur); err != nil {
+                    c.log.Infof("Meraki cannot get network security events: %v", err)
+                }
+            }
+
         case <-ctx.Done():
             c.log.Infof("Meraki Poll Done")
             return
@@ -335,6 +351,185 @@ func (c *MerakiClient) Run(ctx context.Context, dur time.Duration) {
     }
 }
+func (c *MerakiClient) getNetworkApplianceSecurityEvents(dur time.Duration) error {
+    startTime := time.Now().Add(-1 * dur)
+    startTimeStr := fmt.Sprintf("%v", startTime.Unix())
+    c.log.Infof("Starting network security events")
+
+    var getNetworkEvents func(nextToken string, network networkDesc, timeouts int) error
+    getNetworkEvents = func(nextToken string, network networkDesc, timeouts int) error {
+        params := appliance.NewGetNetworkApplianceSecurityEventsParamsWithTimeout(c.timeout)
+        params.SetNetworkID(network.ID)
+        params.SetT0(&startTimeStr)
+        if nextToken != "" {
+            params.SetStartingAfter(&nextToken)
+        }
+
+        prod, err := c.client.Appliance.GetNetworkApplianceSecurityEvents(params, c.auth)
+        if err != nil {
+            if strings.Contains(err.Error(), "(status 429)") && timeouts < c.maxRetry {
+                sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second
+                c.log.Warnf("Network Security Events: %s 429, sleeping %v", network.Name, sleepDur)
+                time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec.
+                timeouts++
+                return getNetworkEvents(nextToken, network, timeouts)
+            }
+            return err
+        }
+
+        results := prod.GetPayload()
+        for _, result := range results {
+            if emap, ok := result.(map[string]interface{}); ok {
+                emap["network"] = network.Name
+                emap["orgName"] = network.org.Name
+                emap["orgId"] = network.org.ID
+                emap["eventType"] = kt.KENTIK_EVENT_EXT
+                b, err := json.Marshal(emap)
+                if err != nil {
+                    return err
+                }
+                c.logchan <- string(b)
+            }
+        }
+
+        nextLink := getNextLink(prod.Link)
+        if nextLink != "" {
+            return getNetworkEvents(nextLink, network, timeouts)
+        } else {
+            return nil
+        }
+    }
+
+    for _, org := range c.orgs {
+        for _, network := range org.networks {
+            err := getNetworkEvents("", network, 0)
+            if err != nil {
+                if strings.Contains(err.Error(), "(status 400)") { // There are no valid logs to worry about here.
+                    continue
+                }
+                return err
+            }
+        }
+    }
+
+    c.log.Infof("Done with network security events")
+    return nil
+}
+
+func (c *MerakiClient) getNetworkEventsWrapper(ctx context.Context) {
+    c.log.Infof("Network Events Check Starting")
+    logCheck := time.NewTicker(1 * time.Hour)
+
+    nextPageEndAt := "" // Start from the very beginning. Looks like 1 week ago.
+
+    // Check once on startup.
+    err, np := c.getNetworkEvents(nextPageEndAt)
+    if err != nil {
+        c.log.Errorf("Cannot get network events: %v", err)
+    } else {
+        nextPageEndAt = np
+    }
+
+    for {
+        select {
+        case _ = <-logCheck.C:
+            err, np := c.getNetworkEvents(nextPageEndAt)
+            if err != nil {
+                c.log.Errorf("Cannot get network events: %v", err)
+            } else {
+                nextPageEndAt = np
+            }
+
+        case <-ctx.Done():
+            c.log.Infof("Network Events Check Done")
+            logCheck.Stop()
+            return
+        }
+    }
+}
+
+type eventWrapper struct {
+    networks.GetNetworkEventsOKBodyEventsItems0
+    Network   string `json:"network"`
+    OrgName   string `json:"orgName"`
+    OrgId     string `json:"orgId"`
+    EventType string `json:"eventType"`
+}
+
+func (c *MerakiClient) getNetworkEvents(lastPageEndAt string) (error, string) {
+    var getNetworkEvents func(nextToken string, network networkDesc, prodType string, timeouts int) (error, string)
+    getNetworkEvents = func(nextToken string, network networkDesc, prodType string, timeouts int) (error, string) {
+        params := networks.NewGetNetworkEventsParamsWithTimeout(c.timeout)
+        params.SetNetworkID(network.ID)
+        if prodType != "" {
+            params.SetProductType(&prodType)
+        }
+        if nextToken != "" {
+            params.SetStartingAfter(&nextToken)
+        }
+
+        prod, err := c.client.Networks.GetNetworkEvents(params, c.auth)
+        if err != nil {
+            if strings.Contains(err.Error(), "(status 429)") && timeouts < c.maxRetry {
+                sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second
+                c.log.Warnf("Network Events: %s 429, sleeping %v", network.Name, sleepDur)
+                time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec.
+                timeouts++
+                return getNetworkEvents(nextToken, network, prodType, timeouts)
+            }
+            if strings.Contains(err.Error(), "(status 400)") { // There are no valid logs to worry about here.
+                return nil, nextToken
+            }
+            return err, nextToken
+        }
+
+        results := prod.GetPayload()
+        for _, event := range results.Events {
+            ew := eventWrapper{*event, network.Name, network.org.Name, network.org.ID, kt.KENTIK_EVENT_EXT}
+            b, err := json.Marshal(ew)
+            if err != nil {
+                return err, nextToken
+            }
+            c.logchan <- string(b)
+        }
+
+        nextLink := getNextLink(prod.Link)
+        if nextLink != "" {
+            return getNetworkEvents(nextLink, network, prodType, timeouts)
+        } else {
+            return nil, results.PageEndAt
+        }
+    }
+
+    nextPageEndAt := lastPageEndAt
+    c.log.Infof("Starting network events download from %s.", lastPageEndAt)
+    for _, org := range c.orgs {
+        for _, network := range org.networks {
+            if len(c.conf.Ext.MerakiConfig.ProductTypes) > 0 {
+                for _, pt := range c.conf.Ext.MerakiConfig.ProductTypes {
+                    err, lp := getNetworkEvents(lastPageEndAt, network, pt, 0)
+                    if err != nil {
+                        return err, nextPageEndAt
+                    }
+                    nextPageEndAt = lp
+                }
+            } else {
+                err, lp := getNetworkEvents(lastPageEndAt, network, "", 0)
+                if err != nil {
+                    return err, nextPageEndAt
+                }
+                nextPageEndAt = lp
+            }
+
+            // after each network, sleep for a while so we don't hit rate limiting.
+            time.Sleep(30 * time.Second)
+        }
+    }
+
+    c.log.Infof("Done with network events download to %s", nextPageEndAt)
+    return nil, nextPageEndAt
+}
+
 type orgLog struct {
     TimeStamp time.Time `json:"ts"`
     AdminName string `json:"adminName"`
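The diff above threads a `logchan chan string` from `kc.logTee` in `kkc.go` down through the SNMP pollers into the Meraki extension, which writes each network or security event onto it as a JSON string. Those sends (`c.logchan <- string(b)`) block until something reads from the channel, so whatever owns the tee must keep draining it. Below is a minimal sketch of such a consumer; the function name, buffer size, and log-based sink are illustrative assumptions, not part of this change.

```go
package main

// Hypothetical consumer for the logchan/logTee side of this change. Only the
// channel type (chan string) and the JSON payloads come from the diff; the
// drain loop, buffer size, and sink are assumptions for illustration.

import (
	"context"
	"log"
	"time"
)

func drainLogTee(ctx context.Context, logTee chan string) {
	for {
		select {
		case msg := <-logTee:
			// Each msg is one JSON-encoded Meraki event produced by the extension.
			log.Println(msg)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logTee := make(chan string, 1024) // assumed buffer size
	go drainLogTee(ctx, logTee)

	// logTee would then be handed to snmp.StartSNMPPolls as its logchan argument.
	logTee <- `{"eventType":"example"}`
	time.Sleep(10 * time.Millisecond) // give the drain goroutine a chance to log it
}
```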
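The `eventWrapper` type relies on Go struct embedding: the fields of the generated `networks.GetNetworkEventsOKBodyEventsItems0` are promoted, so `json.Marshal` flattens them alongside the added `network`, `orgName`, `orgId`, and `eventType` keys. A self-contained sketch of that marshaling shape, using a stand-in event type (the real generated type and the `kt.KENTIK_EVENT_EXT` value are not reproduced here):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for the generated Meraki event body used in the diff.
type networkEvent struct {
	Description string `json:"description"`
	OccurredAt  string `json:"occurredAt"`
}

// Same shape as eventWrapper above: embed the event and add org/network
// context plus an eventType marker so downstream consumers can route it.
type eventWrapper struct {
	networkEvent
	Network   string `json:"network"`
	OrgName   string `json:"orgName"`
	OrgId     string `json:"orgId"`
	EventType string `json:"eventType"`
}

func main() {
	ew := eventWrapper{
		networkEvent: networkEvent{Description: "DHCP lease", OccurredAt: "2023-01-01T00:00:00Z"},
		Network:      "branch-1",
		OrgName:      "example-org",
		OrgId:        "123",
		EventType:    "KFlowLogEvent", // stand-in value; the real code uses kt.KENTIK_EVENT_EXT
	}
	b, _ := json.Marshal(ew)
	fmt.Println(string(b))
	// The embedded fields are flattened into the same JSON object as the
	// added keys, which is what lands on logchan for each event.
}
```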
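Both `getNetworkApplianceSecurityEvents` and `getNetworkEvents` share the same control flow: a recursive closure that retries the current page on a 429 (up to `maxRetry` times, sleeping `MAX_TIMEOUT_SEC` between attempts) and otherwise follows the next-page token until it is empty. A stripped-down sketch of that shape, with a hypothetical `fetchPage` standing in for the Meraki SDK calls:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errRateLimited = errors.New("(status 429)")

// fetchPage is a hypothetical stand-in: it returns one page of items plus the
// next-page token (empty when there are no more pages).
func fetchPage(token string) ([]string, string, error) {
	if token == "" {
		return []string{"a", "b"}, "page2", nil
	}
	return []string{"c"}, "", nil
}

// getAll mirrors the recursive retry-plus-pagination pattern from the diff.
func getAll(maxRetry int, sleep time.Duration) ([]string, error) {
	var out []string
	var walk func(token string, timeouts int) error
	walk = func(token string, timeouts int) error {
		items, next, err := fetchPage(token)
		if err != nil {
			if errors.Is(err, errRateLimited) && timeouts < maxRetry {
				time.Sleep(sleep) // back off, then retry the same page
				return walk(token, timeouts+1)
			}
			return err
		}
		out = append(out, items...)
		if next != "" {
			return walk(next, timeouts) // follow the next-page token
		}
		return nil
	}
	err := walk("", 0)
	return out, err
}

func main() {
	items, err := getAll(3, time.Millisecond)
	fmt.Println(items, err)
}
```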