]> git.rkrishnan.org Git - pihpsdr.git/commitdiff
Re-designed memory allocation for network buffers: the system is no longer
authorc vw <dl1ycf@darc.de>
Tue, 14 Apr 2020 08:02:40 +0000 (10:02 +0200)
committerc vw <dl1ycf@darc.de>
Tue, 14 Apr 2020 08:02:40 +0000 (10:02 +0200)
flooded with malloc/free again and again and again on the same buffers.
This makes the program more stable on my Mac at least.

new_protocol.c

index 3aef1bd578ee3cf641f8e1d561367d6394c9ee70..30237a72b8a8c12f4ec3769f89a56d820969bf5d 100644 (file)
@@ -204,10 +204,54 @@ static socklen_t length=sizeof(addr);
 
 // Network buffers
 #define NET_BUFFER_SIZE 2048
-static unsigned char *iq_buffer[MAX_DDC];
-static unsigned char *command_response_buffer;
-static unsigned char *high_priority_buffer;
-static unsigned char *mic_line_buffer;
+
+
+//
+// DL1YCF: note that we allocate and free the buffers for the
+// network traffic at a very high rate, and I observed certain
+// problems with this (possibly due to deficiencies in the
+// runtime system when firing malloc and free from different threads
+// at very high rate).
+//
+// Therefore we now allocate a pool of network buffers *once*, make 
+// them a linked list, and do our own memory managemant for these
+//
+//
+// This is ONLY MEANT for allocating buffers from within the
+// thread reading the buffers. Buffers for sending have a fixed
+// static allocation (e.g. high_priority_buffer_to_radio), so the
+// only "malloc" left is in the new_protocol_restart function where
+// a temporary buffer for draining the input queue is used.
+//
+
+//
+// one buffer
+//
+
+struct mybuffer_ {
+   struct mybuffer_ *next;
+   int             free;
+   long            lowfence;
+   unsigned char   buffer[NET_BUFFER_SIZE];
+   long            highfence;
+} mybuffer_;
+
+typedef struct mybuffer_ mybuffer;
+
+//
+// number of buffers used (for statistics)
+//
+static int num_buf = 0; 
+
+//
+// head of buffer list
+//
+static mybuffer *buflist = NULL;
+
+static mybuffer *iq_buffer[MAX_DDC];
+static mybuffer *command_response_buffer;
+static mybuffer *high_priority_buffer;
+static mybuffer *mic_line_buffer;
 static int mic_bytes_read;
 
 static unsigned char general_buffer[60];
@@ -246,6 +290,40 @@ static void  process_command_response();
 static void  process_high_priority();
 static void  process_mic_data(int bytes);
 
+//
+// obtain free buffer, if nothing is left allocate
+// 5 new ones. Note these buffer "live" as long as the
+// program lives. They are never free()d.
+//
+static mybuffer *free_buffer() {
+  int i;
+  mybuffer *bp=buflist;
+  while (bp) {
+    if (bp->free == 1) {
+      // found free buffer. Mark as used and return
+      bp->free=0;
+      return bp;
+    }
+    bp=bp->next;
+  }
+  //
+  // no free buffer found, allocate some extra ones
+  // and add to the head of the list
+  //
+  for (i=0; i<5; i++) {
+    bp = malloc(sizeof(mybuffer));
+    bp->free=1;
+    bp->next = buflist;
+    buflist=bp;
+    num_buf++;
+  }
+  fprintf(stderr,"NewProtocol: number of buffer increased to %d\n", num_buf);
+  // Mark the first buffer in list as used and return that one.
+  buflist->free=0;
+  return buflist;
+}
+
+
 #ifdef INCLUDED
 static void new_protocol_calc_buffers() {
   switch(sample_rate) {
@@ -1356,6 +1434,7 @@ void new_protocol_stop() {
 void new_protocol_restart() {
   fd_set fds;
   struct timeval tv;
+  mybuffer *bp;
   char *buffer;
   //
   // halt the protocol, wait 200 msec, and re-start it
@@ -1416,6 +1495,7 @@ static gpointer new_protocol_thread(gpointer data) {
     short sourceport;
     unsigned char *buffer;
     int bytesread;
+    mybuffer *mybuf;
 
 g_print("new_protocol_thread\n");
 
@@ -1428,7 +1508,8 @@ g_print("new_protocol_thread\n");
 
     while(running) {
 
-        buffer=malloc(NET_BUFFER_SIZE);
+        mybuf=free_buffer();
+        buffer=mybuf->buffer;
         bytesread=recvfrom(data_socket,buffer,NET_BUFFER_SIZE,0,(struct sockaddr*)&addr,&length);
 
         if (!running) {
@@ -1437,7 +1518,7 @@ g_print("new_protocol_thread\n");
          // we were doing "recvfrom". In this case, we do not want to "exit" but let the main
          // thread exit gracefully, including writing the props files.
          //
-         free(buffer);
+         mybuf->free=1;
          break;
        }
 
@@ -1469,7 +1550,7 @@ g_print("new_protocol_thread\n");
 #else
                 sem_wait(&iq_sem_ready[ddc]);
 #endif
-                iq_buffer[ddc]=buffer;
+                iq_buffer[ddc]=mybuf;
 #ifdef __APPLE__
                 sem_post(iq_sem_buffer[ddc]);
 #else
@@ -1483,7 +1564,7 @@ g_print("new_protocol_thread\n");
 #else
               sem_wait(&command_response_sem_ready);
 #endif
-              command_response_buffer=buffer;
+              command_response_buffer=mybuf;
 #ifdef __APPLE__
               sem_post(command_response_sem_buffer);
 #else
@@ -1497,7 +1578,7 @@ g_print("new_protocol_thread\n");
 #else
               sem_wait(&high_priority_sem_ready);
 #endif
-              high_priority_buffer=buffer;
+              high_priority_buffer=mybuf;
 #ifdef __APPLE__
               sem_post(high_priority_sem_buffer);
 #else
@@ -1511,7 +1592,7 @@ g_print("new_protocol_thread\n");
 #else
               sem_wait(&mic_line_sem_ready);
 #endif
-              mic_line_buffer=buffer;
+              mic_line_buffer=mybuf;
               mic_bytes_read=bytesread;
 #ifdef __APPLE__
               sem_post(mic_line_sem_buffer);
@@ -1521,7 +1602,7 @@ g_print("new_protocol_thread\n");
               break;
             default:
 g_print("new_protocol_thread: Unknown port %d\n",sourceport);
-              free(buffer);
+              mybuf->free=1;
               break;
         }
     }
@@ -1540,7 +1621,7 @@ static gpointer command_response_thread(gpointer data) {
     sem_wait(&command_response_sem_buffer);
 #endif
     process_command_response();
-    free(command_response_buffer);
+    command_response_buffer->free=1;
   }
 }
 
@@ -1555,7 +1636,7 @@ g_print("high_priority_thread\n");
     sem_wait(&high_priority_sem_buffer);
 #endif
     process_high_priority();
-    free(high_priority_buffer);
+    high_priority_buffer->free=1;
   }
 }
 
@@ -1574,7 +1655,7 @@ g_print("mic_line_thread\n");
 //  since this is our pace-maker
 //
     process_mic_data(mic_bytes_read);
-    free(mic_line_buffer);
+    mic_line_buffer->free=1;
   }
 }
 
@@ -1591,8 +1672,8 @@ static gpointer iq_thread(gpointer data) {
     sem_post(&iq_sem_ready[ddc]);
     sem_wait(&iq_sem_buffer[ddc]);
 #endif
-    buffer=iq_buffer[ddc];
-    if (buffer == NULL) continue;
+    if (iq_buffer[ddc] == NULL) continue;
+    buffer=iq_buffer[ddc]->buffer;
 //
 //  Perform sequence check HERE for all cases
 //
@@ -1621,7 +1702,7 @@ static gpointer iq_thread(gpointer data) {
          process_div_iq_data(buffer);
          break;
     }
-    free(buffer);
+    iq_buffer[ddc]->free=1;
   }
 }
 
@@ -1788,13 +1869,14 @@ static void process_ps_iq_data(unsigned char *buffer) {
 
 static void process_command_response() {
     long sequence;
-    sequence=((command_response_buffer[0]&0xFF)<<24)+((command_response_buffer[1]&0xFF)<<16)+((command_response_buffer[2]&0xFF)<<8)+(command_response_buffer[3]&0xFF);
+    unsigned char *buffer = command_response_buffer->buffer;
+    sequence=((buffer[0]&0xFF)<<24)+((buffer[1]&0xFF)<<16)+((buffer[2]&0xFF)<<8)+(buffer[3]&0xFF);
     if (sequence != response_sequence) {
        g_print("CommRes SeqErr: expected=%ld seen=%ld\n", response_sequence, sequence);
        response_sequence=sequence;
     }
     response_sequence++;
-    response=command_response_buffer[4]&0xFF;
+    response=buffer[4]&0xFF;
     g_print("CommandResponse with seq=%ld and command=%d\n",sequence,response);
 #ifdef __APPLE__
     sem_post(response_sem);
@@ -1803,13 +1885,14 @@ static void process_command_response() {
 #endif
 }
 
-static void process_high_priority(unsigned char *buffer) {
+static void process_high_priority() {
     long sequence;
     int previous_ptt;
     int previous_dot;
     int previous_dash;
+    unsigned char *buffer=high_priority_buffer->buffer;
 
-    sequence=((high_priority_buffer[0]&0xFF)<<24)+((high_priority_buffer[1]&0xFF)<<16)+((high_priority_buffer[2]&0xFF)<<8)+(high_priority_buffer[3]&0xFF);
+    sequence=((buffer[0]&0xFF)<<24)+((buffer[1]&0xFF)<<16)+((buffer[2]&0xFF)<<8)+(buffer[3]&0xFF);
     if (sequence != highprio_rcvd_sequence) {
        g_print("HighPrio SeqErr Expected=%ld Seen=%ld\n", highprio_rcvd_sequence, sequence);
        highprio_rcvd_sequence=sequence;
@@ -1820,15 +1903,15 @@ static void process_high_priority(unsigned char *buffer) {
     previous_dot=dot;
     previous_dash=dash;
 
-    local_ptt=high_priority_buffer[4]&0x01;
-    dot=(high_priority_buffer[4]>>1)&0x01;
-    dash=(high_priority_buffer[4]>>2)&0x01;
-    pll_locked=(high_priority_buffer[4]>>4)&0x01;
-    adc_overload=high_priority_buffer[5]&0x01;
-    exciter_power=((high_priority_buffer[6]&0xFF)<<8)|(high_priority_buffer[7]&0xFF);
-    alex_forward_power=((high_priority_buffer[14]&0xFF)<<8)|(high_priority_buffer[15]&0xFF);
-    alex_reverse_power=((high_priority_buffer[22]&0xFF)<<8)|(high_priority_buffer[23]&0xFF);
-    supply_volts=((high_priority_buffer[49]&0xFF)<<8)|(high_priority_buffer[50]&0xFF);
+    local_ptt=buffer[4]&0x01;
+    dot=(buffer[4]>>1)&0x01;
+    dash=(buffer[4]>>2)&0x01;
+    pll_locked=(buffer[4]>>4)&0x01;
+    adc_overload=buffer[5]&0x01;
+    exciter_power=((buffer[6]&0xFF)<<8)|(buffer[7]&0xFF);
+    alex_forward_power=((buffer[14]&0xFF)<<8)|(buffer[15]&0xFF);
+    alex_reverse_power=((buffer[22]&0xFF)<<8)|(buffer[23]&0xFF);
+    supply_volts=((buffer[49]&0xFF)<<8)|(buffer[50]&0xFF);
 
     if (cw_keyer_internal) {
       // Stops CAT cw transmission if paddle hit in "internal" CW
@@ -1854,8 +1937,9 @@ static void process_mic_data(int bytes) {
   int i;
   short sample;
   float fsample;
+  unsigned char *buffer=mic_line_buffer->buffer;
 
-  sequence=((mic_line_buffer[0]&0xFF)<<24)+((mic_line_buffer[1]&0xFF)<<16)+((mic_line_buffer[2]&0xFF)<<8)+(mic_line_buffer[3]&0xFF);
+  sequence=((buffer[0]&0xFF)<<24)+((buffer[1]&0xFF)<<16)+((buffer[2]&0xFF)<<8)+(buffer[3]&0xFF);
   if (sequence != micsamples_sequence) {
     g_print("MicSample SeqErr Expected=%ld Seen=%ld\n", micsamples_sequence, sequence);
     micsamples_sequence=sequence;
@@ -1863,8 +1947,8 @@ static void process_mic_data(int bytes) {
   micsamples_sequence++;
   b=4;
   for(i=0;i<MIC_SAMPLES;i++) {
-    sample=(short)(mic_line_buffer[b++]<<8);
-    sample |= (short) (mic_line_buffer[b++]&0xFF);
+    sample=(short)(buffer[b++]<<8);
+    sample |= (short) (buffer[b++]&0xFF);
     fsample = transmitter->local_microphone ? audio_get_next_mic_sample() : (float) sample * 0.00003051;
     add_mic_sample(transmitter,fsample);
   }