ntp: allocate source records dynamically

This removes the limit on maximum number of added NTP sources.
This commit is contained in:
Miroslav Lichvar 2014-09-24 14:49:51 +02:00
parent d92583ed33
commit 9e71932c2e

View file

@ -31,6 +31,7 @@
#include "sysincl.h"
#include "array.h"
#include "ntp_sources.h"
#include "ntp_core.h"
#include "util.h"
@ -50,18 +51,12 @@ typedef struct {
NCR_Instance data; /* Data for the protocol engine for this source */
} SourceRecord;
#define N_RECORDS 256
/* Fixed size table, because we use a hard coded hash algorithm. It
is rather unlikely we would have anything approaching this number
of sources. */
static SourceRecord records[N_RECORDS];
/* Hash table of SourceRecord, the size should be a power of two */
static ARR_Instance records;
/* Number of sources in the hash table */
static int n_sources;
/* The largest number of sources we want to have stored in the hash table */
#define MAX_SOURCES 64
/* Flag indicating new sources will be started automatically when added */
static int auto_start_sources = 0;
@ -88,6 +83,7 @@ static NSR_SourceResolvingEndHandler resolving_end_handler = NULL;
/* Forward prototypes */
static void resolve_sources(void *arg);
static void rehash_records(void);
static void
slew_sources(struct timeval *raw,
@ -104,16 +100,23 @@ static int initialised = 0;
/* ================================================== */
static SourceRecord *
get_record(unsigned index)
{
return (SourceRecord *)ARR_GetElement(records, index);
}
/* ================================================== */
void
NSR_Initialise(void)
{
int i;
for (i=0; i<N_RECORDS; i++) {
records[i].remote_addr = NULL;
}
n_sources = 0;
initialised = 1;
records = ARR_CreateInstance(sizeof (SourceRecord));
rehash_records();
LCL_AddParameterChangeHandler(slew_sources, NULL);
}
@ -122,16 +125,20 @@ NSR_Initialise(void)
void
NSR_Finalise(void)
{
int i;
SourceRecord *record;
struct UnresolvedSource *us;
unsigned int i;
for (i = 0; i < N_RECORDS; i++) {
if (!records[i].remote_addr)
for (i = 0; i < ARR_GetSize(records); i++) {
record = get_record(i);
if (!record->remote_addr)
continue;
records[i].remote_addr = NULL;
NCR_DestroyInstance(records[i].data);
record->remote_addr = NULL;
NCR_DestroyInstance(record->data);
}
ARR_DestroyInstance(records);
while (unresolved_sources) {
us = unresolved_sources;
unresolved_sources = us->next;
@ -159,12 +166,13 @@ NSR_Finalise(void)
static void
find_slot(NTP_Remote_Address *remote_addr, int *slot, int *found)
{
SourceRecord *record;
uint32_t hash;
unsigned int i, size;
unsigned short port;
uint8_t *ip6;
size = N_RECORDS;
size = ARR_GetSize(records);
switch (remote_addr->ip_addr.family) {
case IPADDR_INET6:
@ -187,13 +195,14 @@ find_slot(NTP_Remote_Address *remote_addr, int *slot, int *found)
for (i = 0; i < size / 2; i++) {
/* Use quadratic probing */
*slot = (hash + (i + i * i) / 2) % size;
record = get_record(*slot);
if (!records[*slot].remote_addr)
if (!record->remote_addr)
break;
if (!UTI_CompareIPs(&records[*slot].remote_addr->ip_addr,
if (!UTI_CompareIPs(&record->remote_addr->ip_addr,
&remote_addr->ip_addr, NULL)) {
*found = records[*slot].remote_addr->port == port ? 2 : 1;
*found = record->remote_addr->port == port ? 2 : 1;
return;
}
}
@ -203,10 +212,58 @@ find_slot(NTP_Remote_Address *remote_addr, int *slot, int *found)
/* ================================================== */
/* Check if hash table of given size is sufficient to contain sources */
static int
check_hashtable_size(unsigned int sources, unsigned int size)
{
return sources * 2 + 1 < size;
}
/* ================================================== */
static void
rehash_records(void)
{
SourceRecord *record, *temp_records;
unsigned int i, old_size, new_size;
int slot, found;
old_size = ARR_GetSize(records);
temp_records = MallocArray(SourceRecord, old_size);
memcpy(temp_records, ARR_GetElements(records), old_size * sizeof (SourceRecord));
/* The size of the hash table is always a power of two */
for (new_size = 4; !check_hashtable_size(n_sources, new_size); new_size *= 2)
;
ARR_SetSize(records, new_size);
for (i = 0; i < new_size; i++)
get_record(i)->remote_addr = NULL;
for (i = 0; i < old_size; i++) {
if (!temp_records[i].remote_addr)
continue;
find_slot(temp_records[i].remote_addr, &slot, &found);
assert(!found);
record = get_record(slot);
record->remote_addr = temp_records[i].remote_addr;
record->data = temp_records[i].data;
}
Free(temp_records);
}
/* ================================================== */
/* Procedure to add a new source */
NSR_Status
NSR_AddSource(NTP_Remote_Address *remote_addr, NTP_Source_Type type, SourceParameters *params)
{
SourceRecord *record;
int slot, found;
assert(initialised);
@ -216,17 +273,25 @@ NSR_AddSource(NTP_Remote_Address *remote_addr, NTP_Source_Type type, SourceParam
if (found) {
return NSR_AlreadyInUse;
} else {
if (n_sources == MAX_SOURCES) {
return NSR_TooManySources;
} else if (remote_addr->ip_addr.family != IPADDR_INET4 &&
if (remote_addr->ip_addr.family != IPADDR_INET4 &&
remote_addr->ip_addr.family != IPADDR_INET6) {
return NSR_InvalidAF;
} else {
n_sources++;
records[slot].data = NCR_GetInstance(remote_addr, type, params); /* Will need params passing through */
records[slot].remote_addr = NCR_GetRemoteAddress(records[slot].data);
if (!check_hashtable_size(n_sources, ARR_GetSize(records))) {
rehash_records();
find_slot(remote_addr, &slot, &found);
}
assert(!found);
record = get_record(slot);
record->data = NCR_GetInstance(remote_addr, type, params);
record->remote_addr = NCR_GetRemoteAddress(record->data);
if (auto_start_sources)
NCR_StartInstance(records[slot].data);
NCR_StartInstance(record->data);
return NSR_Success;
}
}
@ -376,12 +441,12 @@ NSR_ResolveSources(void)
void NSR_StartSources(void)
{
int i;
unsigned int i;
for (i = 0; i < N_RECORDS; i++) {
if (!records[i].remote_addr)
for (i = 0; i < ARR_GetSize(records); i++) {
if (!get_record(i)->remote_addr)
continue;
NCR_StartInstance(records[i].data);
NCR_StartInstance(get_record(i)->data);
}
}
@ -401,8 +466,7 @@ void NSR_AutoStartSources(void)
NSR_Status
NSR_RemoveSource(NTP_Remote_Address *remote_addr)
{
int i, slot, found;
SourceRecord temp_records[N_RECORDS];
int slot, found;
assert(initialised);
@ -412,28 +476,13 @@ NSR_RemoveSource(NTP_Remote_Address *remote_addr)
}
n_sources--;
records[slot].remote_addr = NULL;
NCR_DestroyInstance(records[slot].data);
get_record(slot)->remote_addr = NULL;
NCR_DestroyInstance(get_record(slot)->data);
/* Rehash the table to make sure there are no broken probe sequences.
This is costly, but it's not expected to happen frequently. */
memcpy(temp_records, records, sizeof (records));
for (i = 0; i < N_RECORDS; i++) {
records[i].remote_addr = NULL;
}
for (i = 0; i < N_RECORDS; i++) {
if (!temp_records[i].remote_addr)
continue;
find_slot(temp_records[i].remote_addr, &slot, &found);
assert(!found);
records[slot].remote_addr = temp_records[i].remote_addr;
records[slot].data = temp_records[i].data;
}
rehash_records();
return NSR_Success;
}
@ -443,13 +492,15 @@ NSR_RemoveSource(NTP_Remote_Address *remote_addr)
void
NSR_RemoveAllSources(void)
{
int i;
SourceRecord *record;
unsigned int i;
for (i = 0; i < N_RECORDS; i++) {
if (!records[i].remote_addr)
for (i = 0; i < ARR_GetSize(records); i++) {
record = get_record(i);
if (!record->remote_addr)
continue;
NCR_DestroyInstance(records[i].data);
records[i].remote_addr = NULL;
NCR_DestroyInstance(record->data);
record->remote_addr = NULL;
}
}
@ -466,7 +517,7 @@ NSR_ProcessReceive(NTP_Packet *message, struct timeval *now, double now_err, NTP
find_slot(remote_addr, &slot, &found);
if (found == 2) { /* Must match IP address AND port number */
NCR_ProcessKnown(message, now, now_err, records[slot].data,
NCR_ProcessKnown(message, now, now_err, get_record(slot)->data,
local_addr->sock_fd, length);
} else {
NCR_ProcessUnknown(message, now, now_err, remote_addr, local_addr, length);
@ -483,14 +534,16 @@ slew_sources(struct timeval *raw,
LCL_ChangeType change_type,
void *anything)
{
int i;
SourceRecord *record;
unsigned int i;
for (i=0; i<N_RECORDS; i++) {
if (records[i].remote_addr) {
for (i = 0; i < ARR_GetSize(records); i++) {
record = get_record(i);
if (record->remote_addr) {
if (change_type == LCL_ChangeUnknownStep) {
NCR_ResetInstance(records[i].data);
NCR_ResetInstance(record->data);
} else {
NCR_SlewTimes(records[i].data, cooked, dfreq, doffset);
NCR_SlewTimes(record->data, cooked, dfreq, doffset);
}
}
}
@ -501,18 +554,20 @@ slew_sources(struct timeval *raw,
int
NSR_TakeSourcesOnline(IPAddr *mask, IPAddr *address)
{
int i;
SourceRecord *record;
unsigned int i;
int any;
NSR_ResolveSources();
any = 0;
for (i=0; i<N_RECORDS; i++) {
if (records[i].remote_addr) {
for (i = 0; i < ARR_GetSize(records); i++) {
record = get_record(i);
if (record->remote_addr) {
if (address->family == IPADDR_UNSPEC ||
!UTI_CompareIPs(&records[i].remote_addr->ip_addr, address, mask)) {
!UTI_CompareIPs(&record->remote_addr->ip_addr, address, mask)) {
any = 1;
NCR_TakeSourceOnline(records[i].data);
NCR_TakeSourceOnline(record->data);
}
}
}
@ -534,27 +589,29 @@ NSR_TakeSourcesOnline(IPAddr *mask, IPAddr *address)
int
NSR_TakeSourcesOffline(IPAddr *mask, IPAddr *address)
{
int i, any, syncpeer;
SourceRecord *record, *syncpeer;
unsigned int i, any;
any = 0;
syncpeer = -1;
for (i=0; i<N_RECORDS; i++) {
if (records[i].remote_addr) {
syncpeer = NULL;
for (i = 0; i < ARR_GetSize(records); i++) {
record = get_record(i);
if (record->remote_addr) {
if (address->family == IPADDR_UNSPEC ||
!UTI_CompareIPs(&records[i].remote_addr->ip_addr, address, mask)) {
!UTI_CompareIPs(&record->remote_addr->ip_addr, address, mask)) {
any = 1;
if (NCR_IsSyncPeer(records[i].data)) {
syncpeer = i;
if (NCR_IsSyncPeer(record->data)) {
syncpeer = record;
continue;
}
NCR_TakeSourceOffline(records[i].data);
NCR_TakeSourceOffline(record->data);
}
}
}
/* Take sync peer offline as last to avoid reference switching */
if (syncpeer >= 0) {
NCR_TakeSourceOffline(records[syncpeer].data);
NCR_TakeSourceOffline(syncpeer->data);
}
if (address->family == IPADDR_UNSPEC) {
@ -583,7 +640,7 @@ NSR_ModifyMinpoll(IPAddr *address, int new_minpoll)
if (found == 0) {
return 0;
} else {
NCR_ModifyMinpoll(records[slot].data, new_minpoll);
NCR_ModifyMinpoll(get_record(slot)->data, new_minpoll);
return 1;
}
}
@ -602,7 +659,7 @@ NSR_ModifyMaxpoll(IPAddr *address, int new_maxpoll)
if (found == 0) {
return 0;
} else {
NCR_ModifyMaxpoll(records[slot].data, new_maxpoll);
NCR_ModifyMaxpoll(get_record(slot)->data, new_maxpoll);
return 1;
}
}
@ -621,7 +678,7 @@ NSR_ModifyMaxdelay(IPAddr *address, double new_max_delay)
if (found == 0) {
return 0;
} else {
NCR_ModifyMaxdelay(records[slot].data, new_max_delay);
NCR_ModifyMaxdelay(get_record(slot)->data, new_max_delay);
return 1;
}
}
@ -640,7 +697,7 @@ NSR_ModifyMaxdelayratio(IPAddr *address, double new_max_delay_ratio)
if (found == 0) {
return 0;
} else {
NCR_ModifyMaxdelayratio(records[slot].data, new_max_delay_ratio);
NCR_ModifyMaxdelayratio(get_record(slot)->data, new_max_delay_ratio);
return 1;
}
}
@ -659,7 +716,7 @@ NSR_ModifyMaxdelaydevratio(IPAddr *address, double new_max_delay_dev_ratio)
if (found == 0) {
return 0;
} else {
NCR_ModifyMaxdelaydevratio(records[slot].data, new_max_delay_dev_ratio);
NCR_ModifyMaxdelaydevratio(get_record(slot)->data, new_max_delay_dev_ratio);
return 1;
}
}
@ -678,7 +735,7 @@ NSR_ModifyMinstratum(IPAddr *address, int new_min_stratum)
if (found == 0) {
return 0;
} else {
NCR_ModifyMinstratum(records[slot].data, new_min_stratum);
NCR_ModifyMinstratum(get_record(slot)->data, new_min_stratum);
return 1;
}
}
@ -697,7 +754,7 @@ NSR_ModifyPolltarget(IPAddr *address, int new_poll_target)
if (found == 0) {
return 0;
} else {
NCR_ModifyPolltarget(records[slot].data, new_poll_target);
NCR_ModifyPolltarget(get_record(slot)->data, new_poll_target);
return 1;
}
}
@ -708,16 +765,18 @@ int
NSR_InitiateSampleBurst(int n_good_samples, int n_total_samples,
IPAddr *mask, IPAddr *address)
{
int i;
SourceRecord *record;
unsigned int i;
int any;
any = 0;
for (i=0; i<N_RECORDS; i++) {
if (records[i].remote_addr) {
for (i = 0; i < ARR_GetSize(records); i++) {
record = get_record(i);
if (record->remote_addr) {
if (address->family == IPADDR_UNSPEC ||
!UTI_CompareIPs(&records[i].remote_addr->ip_addr, address, mask)) {
!UTI_CompareIPs(&record->remote_addr->ip_addr, address, mask)) {
any = 1;
NCR_InitiateSampleBurst(records[i].data, n_good_samples, n_total_samples);
NCR_InitiateSampleBurst(record->data, n_good_samples, n_total_samples);
}
}
}
@ -740,7 +799,7 @@ NSR_ReportSource(RPT_SourceReport *report, struct timeval *now)
rem_addr.port = 0;
find_slot(&rem_addr, &slot, &found);
if (found) {
NCR_ReportSource(records[slot].data, report, now);
NCR_ReportSource(get_record(slot)->data, report, now);
} else {
report->poll = 0;
report->latest_meas_ago = 0;
@ -752,7 +811,8 @@ NSR_ReportSource(RPT_SourceReport *report, struct timeval *now)
void
NSR_GetActivityReport(RPT_ActivityReport *report)
{
int i;
SourceRecord *record;
unsigned int i;
struct UnresolvedSource *us;
report->online = 0;
@ -760,9 +820,10 @@ NSR_GetActivityReport(RPT_ActivityReport *report)
report->burst_online = 0;
report->burst_offline = 0;
for (i=0; i<N_RECORDS; i++) {
if (records[i].remote_addr) {
NCR_IncrementActivityCounters(records[i].data, &report->online, &report->offline,
for (i = 0; i < ARR_GetSize(records); i++) {
record = get_record(i);
if (record->remote_addr) {
NCR_IncrementActivityCounters(record->data, &report->online, &report->offline,
&report->burst_online, &report->burst_offline);
}
}