#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <sys/time.h>
#include <sys/select.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <assert.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <signal.h>
#define CAP_MAX 20
#define PORT 15000
#define MAXBUFLEN 100000
#define PKSIZE 1472
#define MAX_N 200 // max num of probing samples
#define HEADER 28
unsigned long long CPU_HZ = 0;
long K = 10;
struct shared {
double cap_send[CAP_MAX];
double cap_recv1[CAP_MAX];
double cap_recv2[CAP_MAX];
int cap_k[CAP_MAX];
int cap_last_k[CAP_MAX];
double cap_C;
double cap_RTT_SUM;
int cap_index;
int n;
long mode;
int cur_mode;
double G;
};
__inline__ unsigned long long rdtsc(){
unsigned long long int x;
__asm__ volatile (".byte 0x0f, 0x31" : "=A" (x));
return x;
}
struct sockaddr_in addr_f;struct sockaddr_in addr_b;
int shm_init(struct shared *info) {
int i;
info->cap_RTT_SUM = 1000000000;
for (i=0;i<CAP_MAX;i++){
info->cap_send[i] = -1;
info->cap_recv1[i] = -1;
info->cap_recv2[i] = -1;
}
info->n = 0;
return 0;
}int A_send(int send_socket, struct shared *info, int m) {
int *DATA;
int numbytes;
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 1000 * 10; while (info->cur_mode == m){
DATA = (int *)malloc(PKSIZE);
memset(DATA,'1',PKSIZE);
DATA[0] = m;
if ((numbytes=sendto(send_socket, DATA, PKSIZE, 0, (struct sockaddr *)&addr_f, sizeof(struct sockaddr))) == -1) {
perror("sendto");
free(DATA);
exit(1);
}
free(DATA);
nanosleep(&ts,NULL); }
return 0;
}int A_recv(int send_socket, int recv_socket, struct shared *info, int m) {
char buf[MAXBUFLEN];
int addr_len;
int numbytes;
int *DATA;
int i, t, n, p;
addr_len = sizeof(struct sockaddr);
n = 0;
while (1) {
if ((numbytes=recvfrom(recv_socket, buf, MAXBUFLEN-1, 0, (struct sockaddr *)&addr_b, &addr_len)) == -1) {
perror("recvfrom");
exit(1);
}
if (numbytes != PKSIZE){
continue;
}
DATA = buf;
t = DATA[0];
p = DATA[1];
if (t==m-1) {
n++;
n %= CAP_MAX;
if (info->cur_mode < m) info->cur_mode = m;
DATA = (int *)malloc(PKSIZE);
memset(DATA,'1',PKSIZE);
for (i=0;i<=K;i++){
DATA[0] = m;
DATA[1] = p;
DATA[2] = K;
DATA[3] = i;
if ((numbytes=sendto(send_socket, DATA, PKSIZE, 0, (struct sockaddr *)&addr_f, sizeof(struct sockaddr))) == -1) {
perror("sendto");
free(DATA);
exit(1);
}
}
free(DATA);
} else if (t==m+1) {
info->cur_mode = m+2;
break;
} else { }
if (n>500) {
printf("n>500!!\n");
info->cur_mode = m+2;
break;
}
}
return 0;
}int B_send(int send_socket, struct shared *info, int m) {
int i;
int *DATA;
int numbytes;
struct timespec ts;
ts.tv_sec = 0;
i = 0;
while(info->n < MAX_N){
ts.tv_nsec = 1000 * info->G; DATA = (int *)malloc(PKSIZE);
memset(DATA,'1',PKSIZE);
DATA[0] = m;
DATA[1] = i;
info->cap_send[i%CAP_MAX] = (double)rdtsc() / CPU_HZ;
info->cap_recv1[i%CAP_MAX] = -1;
info->cap_recv2[i%CAP_MAX] = -1;
if ((numbytes=sendto(send_socket, DATA, PKSIZE, 0, (struct sockaddr *)&addr_f, sizeof(struct sockaddr))) == -1) {
perror("sendto");
free(DATA);
exit(1);
}
free(DATA);
i++;
i %= CAP_MAX;
nanosleep(&ts,NULL);
}
return 0;
}int B_recv(int send_socket, int recv_socket, struct shared *info, int m) {
char buf[MAXBUFLEN];
int addr_len;
int numbytes;
int *DATA;
int i;
double disp, rtt1, rtt2, rttsum, C;
double t1,t2;
info->cap_C = 0;
info->cap_RTT_SUM = 10000000;
addr_len = sizeof(struct sockaddr);
t1 = (double)rdtsc()/CPU_HZ;
while (1) {
if ((numbytes=recvfrom(recv_socket, buf, MAXBUFLEN-1, 0, (struct sockaddr *)&addr_b, &addr_len)) == -1) {
perror("recvfrom");
exit(1);
}
if (numbytes != PKSIZE) continue;
DATA = buf;
if (DATA[0]!=m) { continue; }
i = DATA[1] % CAP_MAX;
if (DATA[3]==0){
info->cap_k[i] = DATA[2];
info->cap_last_k[i] = 0;
info->cap_recv1[i] = (double)rdtsc() / CPU_HZ;
} else if (DATA[3]==info->cap_last_k[i]+1){
info->cap_last_k[i] = DATA[3];
if (DATA[3] == info->cap_k[i]){
info->cap_recv2[i] = (double)rdtsc() / CPU_HZ; rtt1 = info->cap_recv1[i] - info->cap_send[i];
rtt2 = info->cap_recv2[i] - info->cap_send[i];
disp = rtt2 - rtt1;
rttsum = rtt1 + rtt2;
if (disp>0 && disp<100){
info->n = info->n + 1;
C = (PKSIZE + HEADER) * 8 * K / disp / (double) 1000000;
if (rttsum < info->cap_RTT_SUM) {
info->G = disp * 1000 * 2 / 0.05; info->cap_RTT_SUM = rttsum;
info->cap_C = C;
}
printf("%d C= %3.6lf cap_C= %3.6lf disp= %3.3lf\n",info->n,C,info->cap_C,disp);
}
}
if (info->n >= MAX_N) {
info->cur_mode = m+2;
DATA = (int *)malloc(PKSIZE);
memset(DATA,'1',PKSIZE);
DATA[0] = m+1;
if ((numbytes=sendto(send_socket, DATA, PKSIZE, 0, (struct sockaddr *)&addr_f, sizeof(struct sockaddr))) == -1) {
perror("sendto");
free(DATA);
exit(1);
}
free(DATA);
break;
}
}
}
t2 = (double)rdtsc()/CPU_HZ;
printf("\n Time used: %3.9lf sec\n\n", t2 - t1);
return 0;
}int main(int argc, char *argv[]) {
char *addr = NULL;
int port = -1;
int pid_recv;
int send_socket, recv_socket;
struct hostent *he;
char *shmptr;
int shmid;
struct shmid_ds buf;
struct shared *info;
long mode;
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 1000 * 50;
{
FILE *Proc;
char fbuf[300];
Proc = fopen("/proc/cpuinfo","r");
while(fgets(fbuf,255,Proc)){
if (strstr(fbuf,"cpu MHz")){
fbuf[strlen(fbuf)-1]='\0';
CPU_HZ = atof(fbuf+11)*1000000;
break;
}
}
fclose(Proc); }
if (argc != 4) {
fprintf(stderr,"usage: %s Mode IP_address K\n", argv[0]);
fprintf(stderr,"\tMode 1 : sender \n");
fprintf(stderr,"\tMode 2 : receiver \n");
exit(1);
}
mode = atol(argv[1]);
K = atol(argv[3]);
if ((he=gethostbyname(argv[2])) == NULL) { perror("gethostbyname");
exit(1);
}
if ((send_socket = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
perror("socket");
exit(1);
}
if ((recv_socket = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
perror("socket");
exit(1);
}
addr_f.sin_family = AF_INET; addr_f.sin_port = htons(PORT); addr_f.sin_addr = *((struct in_addr *)he->h_addr);
memset(&(addr_f.sin_zero), '\0', 8);
addr_b.sin_family = AF_INET; addr_b.sin_port = htons(PORT); addr_b.sin_addr.s_addr = INADDR_ANY;
memset(&(addr_b.sin_zero), '\0', 8); if (bind(recv_socket, (struct sockaddr *) &addr_b, sizeof(addr_b)) != 0) {
perror("bind");
abort();
}
shmid = shmget (IPC_PRIVATE, sizeof(struct shared), SHM_R | SHM_W) ;
if (shmid == -1) {
perror("shmget");
exit(1);
}
shmptr = shmat(shmid,0,0);
if (shmptr == NULL) {
perror("shmat");
exit(1);
}
info = (struct shared *) shmptr;
shmctl(shmid, IPC_RMID, &buf);
info->mode = mode;
pid_recv = fork ();
info->cur_mode = 1;
info->G = 40; info->n = 0;
if (info->mode==1) { if (pid_recv==0){
A_send(send_socket, info, 1);
} else {
A_recv(send_socket, recv_socket, info, 3);
}
while (info->cur_mode < 5){
nanosleep(&ts,NULL); } if (pid_recv==0){
B_send(send_socket, info, 5);
} else {
B_recv(send_socket, recv_socket, info, 6);
kill(pid_recv, SIGKILL);
}
} else if (info->mode==2) { if (pid_recv==0){
int addr_len;
int numbytes;
char sbuf[MAXBUFLEN];
int *DATA;
int t;
addr_len = sizeof(struct sockaddr);
while (1) {
if ((numbytes=recvfrom(recv_socket, sbuf, MAXBUFLEN-1, 0, (struct sockaddr *)&addr_b, &addr_len)) == -1) {
perror("recvfrom");
exit(1);
}
if (numbytes != PKSIZE){
printf("this is B\b");
continue;
}
DATA = sbuf;
t = DATA[0];
printf("t=%d %d %d %d\n",DATA[0],DATA[1],DATA[2],DATA[3]);
if (t==1) {
info->cur_mode = 3;
break;
}
}
B_send(send_socket, info, 2);
} else {
B_recv(send_socket, recv_socket, info, 3);
} if (pid_recv==0){ } else {
A_recv(send_socket, recv_socket, info, 6);
kill(pid_recv, SIGKILL);
}
while (info->cur_mode < 8){
nanosleep(&ts,NULL); }
} else { if (pid_recv == 0) {
fprintf(stderr,"Mode (mode=%ld) is incorrect!\n", info->mode);
} else {
kill(pid_recv, SIGKILL);
}
}
shmdt(shmptr);
exit(0);
}