ntp: add filter option

Add an option to use the median filter to reduce noise in measurements
before they are accumulated to sourcestats, similarly to reference
clocks. The option specifies how many samples are reduced to a single
sample.

The filter is intended to be used with very short polling intervals in
local networks where it is acceptable to generate a lot of NTP traffic.
This commit is contained in:
Miroslav Lichvar 2018-08-06 18:12:12 +02:00
parent 99e3045df4
commit 189aafde9d
5 changed files with 81 additions and 24 deletions

View file

@ -800,6 +800,7 @@ handle_add_source(NTP_Source_Type type, CMD_Request *rx_message, CMD_Reply *tx_m
params.max_sources = ntohl(rx_message->data.ntp_source.max_sources); params.max_sources = ntohl(rx_message->data.ntp_source.max_sources);
params.min_samples = ntohl(rx_message->data.ntp_source.min_samples); params.min_samples = ntohl(rx_message->data.ntp_source.min_samples);
params.max_samples = ntohl(rx_message->data.ntp_source.max_samples); params.max_samples = ntohl(rx_message->data.ntp_source.max_samples);
params.filter_length = 0;
params.authkey = ntohl(rx_message->data.ntp_source.authkey); params.authkey = ntohl(rx_message->data.ntp_source.authkey);
params.max_delay = UTI_FloatNetworkToHost(rx_message->data.ntp_source.max_delay); params.max_delay = UTI_FloatNetworkToHost(rx_message->data.ntp_source.max_delay);
params.max_delay_ratio = params.max_delay_ratio =

View file

@ -59,6 +59,7 @@ CPS_ParseNTPSourceAdd(char *line, CPS_NTP_Source *src)
src->params.max_sources = SRC_DEFAULT_MAXSOURCES; src->params.max_sources = SRC_DEFAULT_MAXSOURCES;
src->params.min_samples = SRC_DEFAULT_MINSAMPLES; src->params.min_samples = SRC_DEFAULT_MINSAMPLES;
src->params.max_samples = SRC_DEFAULT_MAXSAMPLES; src->params.max_samples = SRC_DEFAULT_MAXSAMPLES;
src->params.filter_length = 0;
src->params.interleaved = 0; src->params.interleaved = 0;
src->params.sel_options = 0; src->params.sel_options = 0;
src->params.authkey = INACTIVE_AUTHKEY; src->params.authkey = INACTIVE_AUTHKEY;
@ -106,6 +107,9 @@ CPS_ParseNTPSourceAdd(char *line, CPS_NTP_Source *src)
} else if (!strcasecmp(cmd, "asymmetry")) { } else if (!strcasecmp(cmd, "asymmetry")) {
if (sscanf(line, "%lf%n", &src->params.asymmetry, &n) != 1) if (sscanf(line, "%lf%n", &src->params.asymmetry, &n) != 1)
return 0; return 0;
} else if (!strcasecmp(cmd, "filter")) {
if (sscanf(line, "%d%n", &src->params.filter_length, &n) != 1)
return 0;
} else if (!strcasecmp(cmd, "maxdelay")) { } else if (!strcasecmp(cmd, "maxdelay")) {
if (sscanf(line, "%lf%n", &src->params.max_delay, &n) != 1) if (sscanf(line, "%lf%n", &src->params.max_delay, &n) != 1)
return 0; return 0;

View file

@ -161,6 +161,11 @@ Set the minimum number of samples kept for this source. This overrides the
*maxsamples* _samples_::: *maxsamples* _samples_:::
Set the maximum number of samples kept for this source. This overrides the Set the maximum number of samples kept for this source. This overrides the
<<maxsamples,*maxsamples*>> directive. <<maxsamples,*maxsamples*>> directive.
*filter* _samples_:::
This option enables a median filter to reduce noise in NTP measurements. The
filter will reduce the specified number of samples to a single sample. It is
intended to be used with very short polling intervals in local networks where
it is acceptable to generate a lot of NTP traffic.
*offline*::: *offline*:::
If the server will not be reachable when *chronyd* is started, the *offline* If the server will not be reachable when *chronyd* is started, the *offline*
option can be specified. *chronyd* will not try to poll the server until it is option can be specified. *chronyd* will not try to poll the server until it is

View file

@ -37,6 +37,7 @@
#include "sched.h" #include "sched.h"
#include "reference.h" #include "reference.h"
#include "local.h" #include "local.h"
#include "samplefilt.h"
#include "smooth.h" #include "smooth.h"
#include "sources.h" #include "sources.h"
#include "util.h" #include "util.h"
@ -195,6 +196,9 @@ struct NCR_Instance_Record {
SRC_Instance source; SRC_Instance source;
/* Optional median filter for NTP measurements */
SPF_Instance filter;
int burst_good_samples_to_go; int burst_good_samples_to_go;
int burst_total_samples_to_go; int burst_total_samples_to_go;
@ -603,6 +607,12 @@ NCR_GetInstance(NTP_Remote_Address *remote_addr, NTP_Source_Type type, SourcePar
params->min_samples, params->max_samples, params->min_samples, params->max_samples,
params->min_delay, params->asymmetry); params->min_delay, params->asymmetry);
if (params->filter_length >= 1)
result->filter = SPF_CreateInstance(params->filter_length, params->filter_length,
NTP_MAX_DISPERSION, 0.0);
else
result->filter = NULL;
result->rx_timeout_id = 0; result->rx_timeout_id = 0;
result->tx_timeout_id = 0; result->tx_timeout_id = 0;
result->tx_suspended = 1; result->tx_suspended = 1;
@ -637,6 +647,9 @@ NCR_DestroyInstance(NCR_Instance instance)
if (instance->mode == MODE_ACTIVE) if (instance->mode == MODE_ACTIVE)
NIO_CloseServerSocket(instance->local_addr.sock_fd); NIO_CloseServerSocket(instance->local_addr.sock_fd);
if (instance->filter)
SPF_DestroyInstance(instance->filter);
/* This will destroy the source instance inside the /* This will destroy the source instance inside the
structure, which will cause reselection if this was the structure, which will cause reselection if this was the
synchronising source etc. */ synchronising source etc. */
@ -682,6 +695,9 @@ NCR_ResetInstance(NCR_Instance instance)
instance->updated_init_timestamps = 0; instance->updated_init_timestamps = 0;
UTI_ZeroNtp64(&instance->init_remote_ntp_tx); UTI_ZeroNtp64(&instance->init_remote_ntp_tx);
zero_local_timestamp(&instance->init_local_rx); zero_local_timestamp(&instance->init_local_rx);
if (instance->filter)
SPF_DropSamples(instance->filter);
} }
/* ================================================== */ /* ================================================== */
@ -765,6 +781,9 @@ get_poll_adj(NCR_Instance inst, double error_in_estimate, double peer_distance)
int samples; int samples;
if (error_in_estimate > peer_distance) { if (error_in_estimate > peer_distance) {
/* If the prediction is not even within +/- the peer distance of the peer,
we are clearly not tracking the peer at all well, so we back off the
sampling rate depending on just how bad the situation is */
poll_adj = -log(error_in_estimate / peer_distance) / log(2.0); poll_adj = -log(error_in_estimate / peer_distance) / log(2.0);
} else { } else {
samples = SST_Samples(SRC_GetSourcestats(inst->source)); samples = SST_Samples(SRC_GetSourcestats(inst->source));
@ -1451,6 +1470,52 @@ check_delay_dev_ratio(NCR_Instance inst, SST_Stats stats,
/* ================================================== */ /* ================================================== */
static void
process_sample(NCR_Instance inst, NTP_Sample *sample)
{
double estimated_offset, error_in_estimate, filtered_sample_ago;
NTP_Sample filtered_sample;
int filtered_samples;
/* Accumulate the sample to the median filter if it is enabled. When the
filter produces a result, check if it is not too old, i.e. the filter did
not miss too many samples due to missing responses or failing tests. */
if (inst->filter) {
SPF_AccumulateSample(inst->filter, sample);
filtered_samples = SPF_GetNumberOfSamples(inst->filter);
if (!SPF_GetFilteredSample(inst->filter, &filtered_sample))
return;
filtered_sample_ago = UTI_DiffTimespecsToDouble(&sample->time, &filtered_sample.time);
if (filtered_sample_ago > SOURCE_REACH_BITS / 2 * filtered_samples *
UTI_Log2ToDouble(inst->local_poll)) {
DEBUG_LOG("filtered sample dropped ago=%f poll=%d", filtered_sample_ago,
inst->local_poll);
return;
}
sample = &filtered_sample;
}
/* Get the estimated offset predicted from previous samples. The
convention here is that positive means local clock FAST of
reference, i.e. backwards to the way that 'offset' is defined. */
estimated_offset = SST_PredictOffset(SRC_GetSourcestats(inst->source), &sample->time);
error_in_estimate = fabs(-sample->offset - estimated_offset);
SRC_AccumulateSample(inst->source, sample);
SRC_SelectSource(inst->source);
adjust_poll(inst, get_poll_adj(inst, error_in_estimate,
sample->peer_dispersion + 0.5 * sample->peer_delay));
}
/* ================================================== */
static int static int
receive_packet(NCR_Instance inst, NTP_Local_Address *local_addr, receive_packet(NCR_Instance inst, NTP_Local_Address *local_addr,
NTP_Local_Timestamp *rx_ts, NTP_Packet *message, int length) NTP_Local_Timestamp *rx_ts, NTP_Packet *message, int length)
@ -1478,15 +1543,6 @@ receive_packet(NCR_Instance inst, NTP_Local_Address *local_addr,
/* Kiss-o'-Death codes */ /* Kiss-o'-Death codes */
int kod_rate; int kod_rate;
/* The estimated offset predicted from previous samples. The
convention here is that positive means local clock FAST of
reference, i.e. backwards to the way that 'offset' is defined. */
double estimated_offset;
/* The absolute difference between the offset estimate and
measurement in seconds */
double error_in_estimate;
NTP_Local_Timestamp local_receive, local_transmit; NTP_Local_Timestamp local_receive, local_transmit;
double remote_interval, local_interval, response_time; double remote_interval, local_interval, response_time;
double delay_time, precision; double delay_time, precision;
@ -1778,21 +1834,8 @@ receive_packet(NCR_Instance inst, NTP_Local_Address *local_addr,
SRC_UpdateReachability(inst->source, synced_packet); SRC_UpdateReachability(inst->source, synced_packet);
if (good_packet) { if (good_packet) {
/* Do this before we accumulate a new sample into the stats registers, obviously */ /* Adjust the polling interval, accumulate the sample, etc. */
estimated_offset = SST_PredictOffset(stats, &sample.time); process_sample(inst, &sample);
SRC_AccumulateSample(inst->source, &sample);
SRC_SelectSource(inst->source);
/* Now examine the registers. First though, if the prediction is
not even within +/- the peer distance of the peer, we are clearly
not tracking the peer at all well, so we back off the sampling
rate depending on just how bad the situation is. */
error_in_estimate = fabs(-sample.offset - estimated_offset);
/* Now update the polling interval */
adjust_poll(inst, get_poll_adj(inst, error_in_estimate,
sample.peer_dispersion + 0.5 * sample.peer_delay));
/* If we're in burst mode, check whether the burst is completed and /* If we're in burst mode, check whether the burst is completed and
revert to the previous mode */ revert to the previous mode */
@ -2265,6 +2308,9 @@ NCR_SlewTimes(NCR_Instance inst, struct timespec *when, double dfreq, double dof
if (!UTI_IsZeroTimespec(&inst->init_local_rx.ts)) if (!UTI_IsZeroTimespec(&inst->init_local_rx.ts))
UTI_AdjustTimespec(&inst->init_local_rx.ts, when, &inst->init_local_rx.ts, &delta, dfreq, UTI_AdjustTimespec(&inst->init_local_rx.ts, when, &inst->init_local_rx.ts, &delta, dfreq,
doffset); doffset);
if (inst->filter)
SPF_SlewSamples(inst->filter, when, dfreq, doffset);
} }
/* ================================================== */ /* ================================================== */

View file

@ -49,6 +49,7 @@ typedef struct {
int max_sources; int max_sources;
int min_samples; int min_samples;
int max_samples; int max_samples;
int filter_length;
int interleaved; int interleaved;
int sel_options; int sel_options;
uint32_t authkey; uint32_t authkey;