ntp: rework receiving messages
Allocate buffers for received messages on heap instead of stack and prepare the code for receiving multiple messages at the same time.
This commit is contained in:
parent
82e76c39d9
commit
d18f9ca75a
1 changed files with 132 additions and 75 deletions
207
ntp_io.c
207
ntp_io.c
|
@ -30,6 +30,7 @@
|
|||
|
||||
#include "sysincl.h"
|
||||
|
||||
#include "array.h"
|
||||
#include "ntp_io.h"
|
||||
#include "ntp_core.h"
|
||||
#include "ntp_sources.h"
|
||||
|
@ -51,6 +52,25 @@ union sockaddr_in46 {
|
|||
struct sockaddr u;
|
||||
};
|
||||
|
||||
struct Message {
|
||||
union sockaddr_in46 name;
|
||||
struct iovec iov;
|
||||
NTP_Receive_Buffer buf;
|
||||
/* Aligned buffer for control messages */
|
||||
struct cmsghdr cmsgbuf[CMSGBUF_SIZE / sizeof (struct cmsghdr)];
|
||||
};
|
||||
|
||||
struct MessageHeader {
|
||||
struct msghdr msg_hdr;
|
||||
unsigned int msg_len;
|
||||
};
|
||||
|
||||
#define MAX_RECV_MESSAGES 1
|
||||
|
||||
/* Arrays of Message and MessageHeader */
|
||||
static ARR_Instance recv_messages;
|
||||
static ARR_Instance recv_headers;
|
||||
|
||||
/* The server/peer and client sockets for IPv4 and IPv6 */
|
||||
static int server_sock_fd4;
|
||||
static int client_sock_fd4;
|
||||
|
@ -288,6 +308,33 @@ close_socket(int sock_fd)
|
|||
}
|
||||
|
||||
/* ================================================== */
|
||||
|
||||
static void
|
||||
prepare_buffers(unsigned int n)
|
||||
{
|
||||
struct MessageHeader *hdr;
|
||||
struct Message *msg;
|
||||
unsigned int i;
|
||||
|
||||
for (i = 0; i < n; i++) {
|
||||
msg = ARR_GetElement(recv_messages, i);
|
||||
hdr = ARR_GetElement(recv_headers, i);
|
||||
|
||||
msg->iov.iov_base = &msg->buf;
|
||||
msg->iov.iov_len = sizeof (msg->buf);
|
||||
hdr->msg_hdr.msg_name = &msg->name;
|
||||
hdr->msg_hdr.msg_namelen = sizeof (msg->name);
|
||||
hdr->msg_hdr.msg_iov = &msg->iov;
|
||||
hdr->msg_hdr.msg_iovlen = 1;
|
||||
hdr->msg_hdr.msg_control = &msg->cmsgbuf;
|
||||
hdr->msg_hdr.msg_controllen = sizeof (msg->cmsgbuf);
|
||||
hdr->msg_hdr.msg_flags = 0;
|
||||
hdr->msg_len = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* ================================================== */
|
||||
|
||||
void
|
||||
NIO_Initialise(int family)
|
||||
{
|
||||
|
@ -296,6 +343,12 @@ NIO_Initialise(int family)
|
|||
assert(!initialised);
|
||||
initialised = 1;
|
||||
|
||||
recv_messages = ARR_CreateInstance(sizeof (struct Message));
|
||||
ARR_SetSize(recv_messages, MAX_RECV_MESSAGES);
|
||||
recv_headers = ARR_CreateInstance(sizeof (struct MessageHeader));
|
||||
ARR_SetSize(recv_headers, MAX_RECV_MESSAGES);
|
||||
prepare_buffers(MAX_RECV_MESSAGES);
|
||||
|
||||
server_port = CNF_GetNTPPort();
|
||||
client_port = CNF_GetAcquisitionPort();
|
||||
|
||||
|
@ -368,6 +421,8 @@ NIO_Finalise(void)
|
|||
close_socket(server_sock_fd6);
|
||||
server_sock_fd6 = client_sock_fd6 = INVALID_SOCK_FD;
|
||||
#endif
|
||||
ARR_DestroyInstance(recv_headers);
|
||||
ARR_DestroyInstance(recv_messages);
|
||||
initialised = 0;
|
||||
}
|
||||
|
||||
|
@ -483,103 +538,105 @@ NIO_IsServerSocket(int sock_fd)
|
|||
/* ================================================== */
|
||||
|
||||
static void
|
||||
read_from_socket(int sock_fd, int event, void *anything)
|
||||
process_receive(struct msghdr *hdr, int length, int sock_fd)
|
||||
{
|
||||
/* This should only be called when there is something
|
||||
to read, otherwise it will block. */
|
||||
|
||||
int status;
|
||||
NTP_Receive_Buffer message;
|
||||
union sockaddr_in46 where_from;
|
||||
unsigned int flags = 0;
|
||||
struct timeval now;
|
||||
double now_err;
|
||||
NTP_Remote_Address remote_addr;
|
||||
NTP_Local_Address local_addr;
|
||||
struct cmsghdr cmsgbuf[CMSGBUF_SIZE / sizeof (struct cmsghdr)];
|
||||
struct msghdr msg;
|
||||
struct iovec iov;
|
||||
struct cmsghdr *cmsg;
|
||||
|
||||
assert(initialised);
|
||||
struct timeval now;
|
||||
double now_err;
|
||||
|
||||
SCH_GetLastEventTime(&now, &now_err, NULL);
|
||||
|
||||
iov.iov_base = &message.ntp_pkt;
|
||||
iov.iov_len = sizeof(message);
|
||||
msg.msg_name = &where_from;
|
||||
msg.msg_namelen = sizeof(where_from);
|
||||
msg.msg_iov = &iov;
|
||||
msg.msg_iovlen = 1;
|
||||
msg.msg_control = (void *) cmsgbuf;
|
||||
msg.msg_controllen = sizeof(cmsgbuf);
|
||||
msg.msg_flags = 0;
|
||||
if (hdr->msg_namelen > sizeof (union sockaddr_in46)) {
|
||||
DEBUG_LOG(LOGF_NtpIO, "Truncated source address");
|
||||
return;
|
||||
}
|
||||
|
||||
status = recvmsg(sock_fd, &msg, flags);
|
||||
UTI_SockaddrToIPAndPort((struct sockaddr *)hdr->msg_name,
|
||||
&remote_addr.ip_addr, &remote_addr.port);
|
||||
|
||||
/* Don't bother checking if read failed or why if it did. More
|
||||
likely than not, it will be connection refused, resulting from a
|
||||
previous sendto() directing a datagram at a port that is not
|
||||
listening (which appears to generate an ICMP response, and on
|
||||
some architectures e.g. Linux this is translated into an error
|
||||
reponse on a subsequent recvfrom). */
|
||||
local_addr.ip_addr.family = IPADDR_UNSPEC;
|
||||
local_addr.sock_fd = sock_fd;
|
||||
|
||||
if (status > 0) {
|
||||
if (msg.msg_namelen > sizeof (where_from))
|
||||
LOG_FATAL(LOGF_NtpIO, "Truncated source address");
|
||||
|
||||
UTI_SockaddrToIPAndPort(&where_from.u, &remote_addr.ip_addr, &remote_addr.port);
|
||||
|
||||
local_addr.ip_addr.family = IPADDR_UNSPEC;
|
||||
local_addr.sock_fd = sock_fd;
|
||||
|
||||
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
|
||||
for (cmsg = CMSG_FIRSTHDR(hdr); cmsg; cmsg = CMSG_NXTHDR(hdr, cmsg)) {
|
||||
#ifdef HAVE_IN_PKTINFO
|
||||
if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
|
||||
struct in_pktinfo ipi;
|
||||
if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
|
||||
struct in_pktinfo ipi;
|
||||
|
||||
memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi));
|
||||
local_addr.ip_addr.addr.in4 = ntohl(ipi.ipi_spec_dst.s_addr);
|
||||
local_addr.ip_addr.family = IPADDR_INET4;
|
||||
}
|
||||
memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi));
|
||||
local_addr.ip_addr.addr.in4 = ntohl(ipi.ipi_spec_dst.s_addr);
|
||||
local_addr.ip_addr.family = IPADDR_INET4;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef HAVE_IN6_PKTINFO
|
||||
if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) {
|
||||
struct in6_pktinfo ipi;
|
||||
if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) {
|
||||
struct in6_pktinfo ipi;
|
||||
|
||||
memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi));
|
||||
memcpy(&local_addr.ip_addr.addr.in6, &ipi.ipi6_addr.s6_addr,
|
||||
sizeof (local_addr.ip_addr.addr.in6));
|
||||
local_addr.ip_addr.family = IPADDR_INET6;
|
||||
}
|
||||
memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi));
|
||||
memcpy(&local_addr.ip_addr.addr.in6, &ipi.ipi6_addr.s6_addr,
|
||||
sizeof (local_addr.ip_addr.addr.in6));
|
||||
local_addr.ip_addr.family = IPADDR_INET6;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef SO_TIMESTAMP
|
||||
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMP) {
|
||||
struct timeval tv;
|
||||
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMP) {
|
||||
struct timeval tv;
|
||||
|
||||
memcpy(&tv, CMSG_DATA(cmsg), sizeof(tv));
|
||||
LCL_CookTime(&tv, &now, &now_err);
|
||||
}
|
||||
memcpy(&tv, CMSG_DATA(cmsg), sizeof(tv));
|
||||
LCL_CookTime(&tv, &now, &now_err);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
DEBUG_LOG(LOGF_NtpIO, "Received %d bytes from %s:%d to %s fd %d",
|
||||
status, UTI_IPToString(&remote_addr.ip_addr), remote_addr.port,
|
||||
UTI_IPToString(&local_addr.ip_addr), local_addr.sock_fd);
|
||||
|
||||
if (status >= NTP_NORMAL_PACKET_LENGTH) {
|
||||
|
||||
NSR_ProcessReceive((NTP_Packet *) &message.ntp_pkt, &now, now_err,
|
||||
&remote_addr, &local_addr, status);
|
||||
|
||||
} else {
|
||||
|
||||
/* Just ignore the packet if it's not of a recognized length */
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
DEBUG_LOG(LOGF_NtpIO, "Received %d bytes from %s:%d to %s fd %d",
|
||||
length, UTI_IPToString(&remote_addr.ip_addr), remote_addr.port,
|
||||
UTI_IPToString(&local_addr.ip_addr), local_addr.sock_fd);
|
||||
|
||||
/* Just ignore the packet if it's not of a recognized length */
|
||||
if (length < NTP_NORMAL_PACKET_LENGTH || length > sizeof (NTP_Receive_Buffer))
|
||||
return;
|
||||
|
||||
NSR_ProcessReceive((NTP_Packet *)hdr->msg_iov[0].iov_base, &now, now_err,
|
||||
&remote_addr, &local_addr, length);
|
||||
}
|
||||
|
||||
/* ================================================== */
|
||||
|
||||
static void
|
||||
read_from_socket(int sock_fd, int event, void *anything)
|
||||
{
|
||||
/* This should only be called when there is something
|
||||
to read, otherwise it will block */
|
||||
|
||||
struct MessageHeader *hdr;
|
||||
unsigned int i, n;
|
||||
int status;
|
||||
|
||||
hdr = ARR_GetElements(recv_headers);
|
||||
n = ARR_GetSize(recv_headers);
|
||||
assert(n >= 1);
|
||||
|
||||
n = 1;
|
||||
status = recvmsg(sock_fd, &hdr[0].msg_hdr, 0);
|
||||
if (status >= 0)
|
||||
hdr[0].msg_len = status;
|
||||
|
||||
if (status < 0) {
|
||||
DEBUG_LOG(LOGF_NtpIO, "Could not receive from fd %d : %s", sock_fd,
|
||||
strerror(errno));
|
||||
return;
|
||||
}
|
||||
|
||||
for (i = 0; i < n; i++) {
|
||||
hdr = ARR_GetElement(recv_headers, i);
|
||||
process_receive(&hdr->msg_hdr, hdr->msg_len, sock_fd);
|
||||
}
|
||||
|
||||
/* Restore the buffers to their original state */
|
||||
prepare_buffers(n);
|
||||
}
|
||||
|
||||
/* ================================================== */
|
||||
|
|
Loading…
Reference in a new issue