ntp: save response when waiting for HW TX timestamp

Rework handling of late HW TX timestamps. Instead of suspending reading
from client-only sockets that have HW TX timestamping enabled, save the
whole response if it is valid and a HW TX timestamp was received for the
source before. When the timestamp is received, or the configurable
timeout is reached, process the saved response again, but skip the
authentication test as the NTS code allows only one response per
request. Only one valid response per source can be saved. If a second
valid response is received while waiting for the timestamp, process both
responses immediately in the order they were received.

The main advantage of this approach is that it works on all sockets, i.e.
even in the symmetric mode and with NTP-over-PTP, and the kernel does
not need to buffer invalid responses.
This commit is contained in:
Miroslav Lichvar 2023-03-28 15:33:50 +02:00
parent 4a11399c2e
commit 0189dac7d8
6 changed files with 191 additions and 161 deletions

View file

@ -2618,16 +2618,13 @@ If hardware timestamping is used with a close NTP server, or the NIC or its
driver is slow in providing the transmit timestamp of NTP requests, a response
from the server can be received before the transmit timestamp of the request.
To avoid calculating the offset with a less accurate transmit timestamp,
*chronyd* suspends reading of NTP packets from the socket until the hardware
transmit timestamp is provided. There is no guarantee that the timestamp will
actually be provided (NICs typically have a limited rate of transmit
timestamping). This directive configures how long should *chronyd* wait
for the timestamp before resuming reading from the socket.
+
The suspension is activated only on sockets that are not expected to receive
requests, i.e. it does not work with the *peer* directive and also with the
*server* and *pool* directives if the ports specified by the *port* and
*acquisitionport* directives are equal.
*chronyd* can save the response for later processing and wait for the hardware
transmit timestamp. There is no guarantee that the timestamp will be provided
(NICs typically have a limited rate of transmit timestamping). This directive
configures how long should *chronyd* wait for the timestamp after receiving a
valid response from the server. If a second valid response is received from the
server while waiting for the timestamp, they will be both processed
immediately.
+
The default value is 0.001 seconds, which should be sufficient with most
hardware. If you frequently see kernel transmit timestamps in the
@ -2635,8 +2632,7 @@ _measurements.log_ file or <<chronyc.adoc#ntpdata,*ntpdata*>> report, and it is
not a server handling a high rate of requests in the interleaved mode on the
same interface (which would compete with timestamping of the server's own
requests), increasing the timeout to 0.01 or possibly even longer might help.
Note that setting a timeout longer than the NTP polling interval causes the
responses to be ignored when the timestamp is missing.
Note that the maximum timeout is limited by the NTP polling interval.
[[keyfile]]*keyfile* _file_::
This directive is used to specify the location of the file containing symmetric

View file

@ -64,6 +64,17 @@ typedef enum {
MD_BURST_WAS_ONLINE, /* Burst sampling, return to online afterwards */
} OperatingMode;
/* Structure holding a response and other data waiting to be processed when
a late HW transmit timestamp of the request is available, or a timeout is
reached */
struct SavedResponse {
NTP_Local_Address local_addr;
NTP_Local_Timestamp rx_ts;
NTP_Packet message;
NTP_PacketInfo info;
SCH_TimeoutID timeout_id;
};
/* ================================================== */
/* Structure used for holding a single peer/server's
protocol machine */
@ -204,6 +215,12 @@ struct NCR_Instance_Record {
SPF_Instance filter;
int filter_count;
/* Flag indicating HW transmit timestamps are expected */
int had_hw_tx_timestamp;
/* Response waiting for a HW transmit timestamp of the request */
struct SavedResponse *saved_response;
int burst_good_samples_to_go;
int burst_total_samples_to_go;
@ -328,6 +345,11 @@ static double get_transmit_delay(NCR_Instance inst, int on_tx);
static double get_separation(int poll);
static int parse_packet(NTP_Packet *packet, int length, NTP_PacketInfo *info);
static void process_sample(NCR_Instance inst, NTP_Sample *sample);
static int has_saved_response(NCR_Instance inst);
static void process_saved_response(NCR_Instance inst);
static int process_response(NCR_Instance inst, int saved, NTP_Local_Address *local_addr,
NTP_Local_Timestamp *rx_ts, NTP_Packet *message,
NTP_PacketInfo *info);
static void set_connectivity(NCR_Instance inst, SRC_Connectivity connectivity);
/* ================================================== */
@ -526,6 +548,11 @@ close_client_socket(NCR_Instance inst)
SCH_RemoveTimeout(inst->rx_timeout_id);
inst->rx_timeout_id = 0;
if (has_saved_response(inst)) {
SCH_RemoveTimeout(inst->saved_response->timeout_id);
inst->saved_response->timeout_id = 0;
}
}
/* ================================================== */
@ -668,6 +695,8 @@ NCR_CreateInstance(NTP_Remote_Address *remote_addr, NTP_Source_Type type,
else
result->filter = NULL;
result->saved_response = NULL;
result->rx_timeout_id = 0;
result->tx_timeout_id = 0;
result->tx_suspended = 1;
@ -704,6 +733,9 @@ NCR_DestroyInstance(NCR_Instance instance)
if (instance->filter)
SPF_DestroyInstance(instance->filter);
if (instance->saved_response)
Free(instance->saved_response);
NAU_DestroyInstance(instance->auth);
/* This will destroy the source instance inside the
@ -762,6 +794,8 @@ NCR_ResetInstance(NCR_Instance instance)
if (instance->filter)
SPF_DropSamples(instance->filter);
instance->filter_count = 0;
instance->had_hw_tx_timestamp = 0;
}
/* ================================================== */
@ -1282,6 +1316,15 @@ transmit_timeout(void *arg)
inst->tx_timeout_id = 0;
if (has_saved_response(inst)) {
process_saved_response(inst);
/* Wait for the new transmission timeout (if the response was still
valid and it did not cause switch to offline) */
if (inst->tx_timeout_id != 0)
return;
}
switch (inst->opmode) {
case MD_BURST_WAS_ONLINE:
/* With online burst switch to online before last packet */
@ -1732,7 +1775,69 @@ process_sample(NCR_Instance inst, NTP_Sample *sample)
/* ================================================== */
static int
process_response(NCR_Instance inst, NTP_Local_Address *local_addr,
has_saved_response(NCR_Instance inst)
{
return inst->saved_response && inst->saved_response->timeout_id > 0;
}
/* ================================================== */
static void
process_saved_response(NCR_Instance inst)
{
SCH_RemoveTimeout(inst->saved_response->timeout_id);
inst->saved_response->timeout_id = 0;
DEBUG_LOG("Processing saved response from %s", UTI_IPToString(&inst->remote_addr.ip_addr));
process_response(inst, 1, &inst->saved_response->local_addr, &inst->saved_response->rx_ts,
&inst->saved_response->message, &inst->saved_response->info);
}
/* ================================================== */
static void
saved_response_timeout(void *arg)
{
NCR_Instance inst = arg;
inst->saved_response->timeout_id = 0;
process_saved_response(inst);
}
/* ================================================== */
static int
save_response(NCR_Instance inst, NTP_Local_Address *local_addr,
NTP_Local_Timestamp *rx_ts, NTP_Packet *message, NTP_PacketInfo *info)
{
double timeout = CNF_GetHwTsTimeout();
if (timeout <= 0.0)
return 0;
/* If another message is already saved, process both immediately */
if (has_saved_response(inst)) {
process_saved_response(inst);
return 0;
}
if (!inst->saved_response)
inst->saved_response = MallocNew(struct SavedResponse);
inst->saved_response->local_addr = *local_addr;
inst->saved_response->rx_ts = *rx_ts;
inst->saved_response->message = *message;
inst->saved_response->info = *info;
inst->saved_response->timeout_id = SCH_AddTimeoutByDelay(timeout, saved_response_timeout,
inst);
DEBUG_LOG("Saved valid response for later processing");
return 1;
}
/* ================================================== */
static int
process_response(NCR_Instance inst, int saved, NTP_Local_Address *local_addr,
NTP_Local_Timestamp *rx_ts, NTP_Packet *message, NTP_PacketInfo *info)
{
NTP_Sample sample;
@ -1826,8 +1931,10 @@ process_response(NCR_Instance inst, NTP_Local_Address *local_addr,
/* Test 4 would check for denied access. It would always pass as this
function is called only for known sources. */
/* Test 5 checks for authentication failure */
test5 = NAU_CheckResponseAuth(inst->auth, message, info);
/* Test 5 checks for authentication failure. If it is a saved message,
which had to pass all these tests before, avoid authenticating it for
the second time (that is not allowed in the NTS code). */
test5 = saved || NAU_CheckResponseAuth(inst->auth, message, info);
/* Test 6 checks for unsynchronised server */
test6 = pkt_leap != LEAP_Unsynchronised &&
@ -1843,6 +1950,20 @@ process_response(NCR_Instance inst, NTP_Local_Address *local_addr,
valid_packet = test1 && test2 && test3 && test5;
synced_packet = valid_packet && test6 && test7;
/* If the server is very close and/or the NIC hardware/driver is slow, it
is possible that a response from the server is received before the HW
transmit timestamp of the request. To avoid getting a less accurate
offset or failing one of the later tests, save the response and wait for
the transmit timestamp or timeout. Allow this only for the first valid
response to the request, when at least one good response has already been
accepted to avoid incorrectly confirming a tentative source. */
if (valid_packet && synced_packet && !saved && !inst->valid_rx &&
inst->had_hw_tx_timestamp && inst->local_tx.source != NTP_TS_HARDWARE &&
inst->report.total_good_count > 0) {
if (save_response(inst, local_addr, rx_ts, message, info))
return 1;
}
/* Check for Kiss-o'-Death codes */
kod_rate = 0;
if (test1 && test2 && test5 && pkt_leap == LEAP_Unsynchronised &&
@ -2188,6 +2309,7 @@ process_response(NCR_Instance inst, NTP_Local_Address *local_addr,
}
/* Get rid of old timeout and start a new one */
if (!saved)
assert(inst->tx_timeout_id);
restart_timeout(inst, delay_time);
}
@ -2377,7 +2499,7 @@ NCR_ProcessRxKnown(NCR_Instance inst, NTP_Local_Address *local_addr,
return 0;
}
return process_response(inst, local_addr, rx_ts, message, &info);
return process_response(inst, 0, local_addr, rx_ts, message, &info);
} else if (proc_as_unknown) {
NCR_ProcessRxUnknown(&inst->remote_addr, local_addr, rx_ts, message, length);
/* It's not a reply to our request, don't return success */
@ -2560,6 +2682,13 @@ NCR_ProcessTxKnown(NCR_Instance inst, NTP_Local_Address *local_addr,
update_tx_timestamp(&inst->local_tx, tx_ts, &inst->local_ntp_rx, &inst->local_ntp_tx,
message);
if (tx_ts->source == NTP_TS_HARDWARE) {
inst->had_hw_tx_timestamp = 1;
if (has_saved_response(inst))
process_saved_response(inst);
}
}
/* ================================================== */
@ -2616,6 +2745,10 @@ NCR_SlewTimes(NCR_Instance inst, struct timespec *when, double dfreq, double dof
if (inst->filter)
SPF_SlewSamples(inst->filter, when, dfreq, doffset);
if (has_saved_response(inst))
UTI_AdjustTimespec(&inst->saved_response->rx_ts.ts, when, &inst->saved_response->rx_ts.ts,
&delta, dfreq, doffset);
}
/* ================================================== */

View file

@ -169,9 +169,6 @@ close_socket(int sock_fd)
if (sock_fd == INVALID_SOCK_FD)
return;
#ifdef HAVE_LINUX_TIMESTAMPING
NIO_Linux_NotifySocketClosing(sock_fd);
#endif
SCH_RemoveFileHandler(sock_fd);
SCK_CloseSocket(sock_fd);
}

View file

@ -88,21 +88,6 @@ static int ts_tx_flags;
/* Flag indicating the socket options can't be changed in control messages */
static int permanent_ts_options;
/* When sending client requests to a close and fast server, it is possible that
a response will be received before the HW transmit timestamp of the request
itself. To avoid processing of the response without the HW timestamp, we
suspend reading of packets from the receive queue until a HW transmit
timestamp is received from the error queue or a timeout reached. */
struct HwTsSocket {
int sock_fd;
int suspended;
SCH_TimeoutID timeout_id;
};
/* Array of (HwTsSocket *) indexed by the file descriptor */
static ARR_Instance hw_ts_socks;
/* Unbound socket keeping the kernel RX timestamping permanently enabled
in order to avoid a race condition between receiving a server response
and the kernel actually starting to timestamp received packets after
@ -428,7 +413,6 @@ NIO_Linux_Initialise(void)
/* Kernels before 4.7 ignore timestamping flags set in control messages */
permanent_ts_options = !SYS_Linux_CheckKernelVersion(4, 7);
hw_ts_socks = ARR_CreateInstance(sizeof (struct HwTsSocket *));
dummy_rxts_socket = INVALID_SOCK_FD;
}
@ -440,10 +424,6 @@ NIO_Linux_Finalise(void)
struct Interface *iface;
unsigned int i;
for (i = 0; i < ARR_GetSize(hw_ts_socks); i++)
Free(*(struct HwTsSocket **)ARR_GetElement(hw_ts_socks, i));
ARR_DestroyInstance(hw_ts_socks);
if (dummy_rxts_socket != INVALID_SOCK_FD)
SCK_CloseSocket(dummy_rxts_socket);
@ -492,89 +472,6 @@ NIO_Linux_SetTimestampSocketOptions(int sock_fd, int client_only, int *events)
/* ================================================== */
static struct HwTsSocket *
get_hw_ts_socket(int sock_fd, int new)
{
struct HwTsSocket *s, **sp;
if (sock_fd < 0)
return NULL;
while (sock_fd >= ARR_GetSize(hw_ts_socks)) {
if (!new)
return NULL;
s = NULL;
ARR_AppendElement(hw_ts_socks, &s);
}
sp = ARR_GetElement(hw_ts_socks, sock_fd);
if (!*sp && new) {
*sp = s = MallocNew(struct HwTsSocket);
s->sock_fd = sock_fd;
s->suspended = 0;
s->timeout_id = 0;
}
return *sp;
}
/* ================================================== */
static void
resume_socket(int sock_fd)
{
struct HwTsSocket *ts_sock = get_hw_ts_socket(sock_fd, 0);
if (!ts_sock)
return;
if (ts_sock->suspended) {
SCH_SetFileHandlerEvent(ts_sock->sock_fd, SCH_FILE_INPUT, 1);
DEBUG_LOG("Resumed RX processing %s timeout fd=%d",
ts_sock->timeout_id ? "before" : "on", ts_sock->sock_fd);
}
ts_sock->suspended = 0;
SCH_RemoveTimeout(ts_sock->timeout_id);
ts_sock->timeout_id = 0;
}
/* ================================================== */
static void
resume_timeout(void *arg)
{
struct HwTsSocket *ts_sock = arg;
ts_sock->timeout_id = 0;
resume_socket(ts_sock->sock_fd);
}
/* ================================================== */
static void
suspend_socket(int sock_fd)
{
struct HwTsSocket *ts_sock = get_hw_ts_socket(sock_fd, 1);
double timeout = CNF_GetHwTsTimeout();
if (!ts_sock || timeout <= 0.0)
return;
/* Remove previous timeout if there is one */
SCH_RemoveTimeout(ts_sock->timeout_id);
ts_sock->suspended = 1;
ts_sock->timeout_id = SCH_AddTimeoutByDelay(timeout, resume_timeout, ts_sock);
SCH_SetFileHandlerEvent(ts_sock->sock_fd, SCH_FILE_INPUT, 0);
DEBUG_LOG("Suspended RX processing fd=%d", ts_sock->sock_fd);
}
/* ================================================== */
static struct Interface *
get_interface(int if_index)
{
@ -835,11 +732,6 @@ NIO_Linux_ProcessMessage(SCK_Message *message, NTP_Local_Address *local_addr,
} else {
DEBUG_LOG("HW clock not found for interface %d", ts_if_index);
}
/* If a HW transmit timestamp was received, resume processing
of non-error messages on this socket */
if (is_tx)
resume_socket(local_addr->sock_fd);
}
if (local_ts->source == NTP_TS_DAEMON && !UTI_IsZeroTimespec(&message->timestamp.kernel) &&
@ -902,23 +794,9 @@ NIO_Linux_RequestTxTimestamp(SCK_Message *message, int sock_fd)
if (!ts_flags)
return;
/* If a HW transmit timestamp is requested on a client-only socket,
suspend reading from it to avoid processing a response before the
HW timestamp of the request is received */
if (ts_tx_flags & SOF_TIMESTAMPING_TX_HARDWARE && !NIO_IsServerSocket(sock_fd))
suspend_socket(sock_fd);
/* Check if TX timestamping is disabled on this socket */
if (permanent_ts_options || !NIO_IsServerSocket(sock_fd))
return;
message->timestamp.tx_flags = ts_tx_flags;
}
/* ================================================== */
void
NIO_Linux_NotifySocketClosing(int sock_fd)
{
resume_socket(sock_fd);
}

View file

@ -40,6 +40,4 @@ extern int NIO_Linux_ProcessMessage(SCK_Message *message, NTP_Local_Address *loc
extern void NIO_Linux_RequestTxTimestamp(SCK_Message *message, int sock_fd);
extern void NIO_Linux_NotifySocketClosing(int sock_fd);
#endif

View file

@ -80,7 +80,7 @@ get_random_key_id(void)
}
static void
send_request(NCR_Instance inst)
send_request(NCR_Instance inst, int late_hwts)
{
NTP_Local_Address local_addr;
NTP_Local_Timestamp local_ts;
@ -104,6 +104,16 @@ send_request(NCR_Instance inst)
NCR_ProcessTxKnown(inst, &local_addr, &local_ts, &req_buffer, req_length);
}
if (late_hwts) {
inst->had_hw_tx_timestamp = 1;
inst->report.total_good_count++;
} else {
if (random() % 2)
inst->had_hw_tx_timestamp = 0;
else
inst->report.total_good_count = 0;
}
}
static void
@ -267,7 +277,8 @@ send_response(int interleaved, int authenticated, int allow_update, int valid_ts
}
static void
proc_response(NCR_Instance inst, int good, int valid, int updated_sync, int updated_init)
proc_response(NCR_Instance inst, int good, int valid, int updated_sync,
int updated_init, int save)
{
NTP_Local_Address local_addr;
NTP_Local_Timestamp local_ts;
@ -292,6 +303,19 @@ proc_response(NCR_Instance inst, int good, int valid, int updated_sync, int upda
ret = NCR_ProcessRxKnown(inst, &local_addr, &local_ts, res, res_length);
if (save) {
TEST_CHECK(ret);
TEST_CHECK(inst->saved_response);
TEST_CHECK(inst->saved_response->timeout_id != 0);
TEST_CHECK(has_saved_response(inst));
if (random() % 2)
saved_response_timeout(inst);
else
transmit_timeout(inst);
TEST_CHECK(inst->saved_response->timeout_id == 0);
TEST_CHECK(!has_saved_response(inst));
}
if (good > 0)
TEST_CHECK(ret);
else if (!good)
@ -327,7 +351,7 @@ process_replay(NCR_Instance inst, NTP_Packet *packet_queue,
do {
res_buffer = packet_queue[random() % queue_length];
} while (!UTI_CompareNtp64(&res_buffer.transmit_ts, &inst->remote_ntp_tx));
proc_response(inst, 0, 0, 0, updated_init);
proc_response(inst, 0, 0, 0, updated_init, 0);
advance_time(1e-6);
}
@ -392,7 +416,7 @@ test_unit(void)
"local",
"keyfile ntp_core.keys"
};
int i, j, k, interleaved, authenticated, valid, updated, has_updated;
int i, j, k, interleaved, authenticated, valid, updated, has_updated, late_hwts;
CPS_NTP_Source source;
NTP_Remote_Address remote_addr;
NCR_Instance inst1, inst2;
@ -439,6 +463,8 @@ test_unit(void)
for (j = 0; j < 50; j++) {
DEBUG_LOG("client/peer test iteration %d/%d", i, j);
late_hwts = random() % 2;
authenticated = random() % 2;
interleaved = random() % 2 && (inst1->mode != MODE_CLIENT ||
inst1->tx_count < MAX_CLIENT_INTERLEAVED_TX);
authenticated = random() % 2;
@ -454,35 +480,35 @@ test_unit(void)
(int)source.params.authkey, source.params.version,
interleaved, authenticated, valid, updated, has_updated);
send_request(inst1);
send_request(inst1, late_hwts);
send_response(interleaved, authenticated, 1, 0, 1);
DEBUG_LOG("response 1");
proc_response(inst1, 0, 0, 0, updated);
proc_response(inst1, 0, 0, 0, updated, 0);
if (source.params.authkey) {
send_response(interleaved, authenticated, 1, 1, 0);
DEBUG_LOG("response 2");
proc_response(inst1, 0, 0, 0, 0);
proc_response(inst1, 0, 0, 0, 0, 0);
}
send_response(interleaved, authenticated, 1, 1, 1);
DEBUG_LOG("response 3");
proc_response(inst1, -1, valid, valid, updated);
proc_response(inst1, -1, valid, valid, updated, valid && late_hwts);
DEBUG_LOG("response 4");
proc_response(inst1, 0, 0, 0, 0);
proc_response(inst1, 0, 0, 0, 0, 0);
advance_time(-1.0);
send_response(interleaved, authenticated, 1, 1, 1);
DEBUG_LOG("response 5");
proc_response(inst1, 0, 0, 0, updated && valid);
proc_response(inst1, 0, 0, 0, updated && valid, 0);
advance_time(1.0);
send_response(interleaved, authenticated, 1, 1, 1);
DEBUG_LOG("response 6");
proc_response(inst1, 0, 0, valid && updated, updated);
proc_response(inst1, 0, 0, valid && updated, updated, 0);
}
NCR_DestroyInstance(inst1);
@ -494,12 +520,12 @@ test_unit(void)
for (j = 0; j < 20; j++) {
DEBUG_LOG("server test iteration %d/%d", i, j);
send_request(inst1);
send_request(inst1, 0);
process_request(&remote_addr);
proc_response(inst1,
!source.params.interleaved || source.params.version != 4 ||
inst1->mode == MODE_ACTIVE || j != 2,
1, 1, 0);
1, 1, 0, 0);
advance_time(1 << inst1->local_poll);
}
@ -515,7 +541,9 @@ test_unit(void)
for (j = 0; j < 20; j++) {
DEBUG_LOG("peer replay test iteration %d/%d", i, j);
send_request(inst1);
late_hwts = random() % 2;
send_request(inst1, late_hwts);
res_buffer = req_buffer;
assert(!res_length || res_length == req_length);
res_length = req_length;
@ -523,7 +551,7 @@ test_unit(void)
TEST_CHECK(inst1->valid_timestamps == (j > 0));
DEBUG_LOG("response 1->2");
proc_response(inst2, j > source.params.interleaved, j > 0, j > 0, 1);
proc_response(inst2, j > source.params.interleaved, j > 0, j > 0, 1, 0);
packet_queue[(j * 2) % PACKET_QUEUE_LENGTH] = res_buffer;
@ -536,14 +564,14 @@ test_unit(void)
advance_time(1 << (source.params.minpoll - 1));
send_request(inst2);
send_request(inst2, 0);
res_buffer = req_buffer;
assert(res_length == req_length);
TEST_CHECK(inst2->valid_timestamps == (j > 0));
DEBUG_LOG("response 2->1");
proc_response(inst1, 1, 1, 1, 1);
proc_response(inst1, 1, 1, 1, 1, late_hwts);
packet_queue[(j * 2 + 1) % PACKET_QUEUE_LENGTH] = res_buffer;