From 174018fe5d39cf68f1e0cfc420cc6ceaf2448fe6 Mon Sep 17 00:00:00 2001 From: pa3gsb Date: Mon, 16 Apr 2018 09:14:35 +0200 Subject: [PATCH] rewrite; threading --- radioberry.c | 269 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 194 insertions(+), 75 deletions(-) diff --git a/radioberry.c b/radioberry.c index edc450b..950be5a 100644 --- a/radioberry.c +++ b/radioberry.c @@ -57,14 +57,15 @@ static int display_width; static int running; static int sampleSpeed =0; -unsigned char iqdata[6]; unsigned char tx_iqdata[6]; static GThread *radioberry_thread_id; -//static pthread_t radioberry_thread_id; -static void start_radioberry_thread(); static gpointer radioberry_thread(gpointer arg); -//static void *radioberry_thread(void* arg); + +static GThread *rx1_stream_thread_id; +static gpointer rx1_stream_thread(gpointer arg); +static GThread *rx2_stream_thread_id; +static gpointer rx2_stream_thread(gpointer arg); static void setSampleSpeed(int r); static void handleReceiveStream(int r); @@ -83,15 +84,38 @@ void spiWriter(); void rx1_spiReader(); void rx2_spiReader(); - int prev_drive_level; static int rx1_count =0; static int rx2_count =0; static int txcount =0; -sem_t mutex; +GMutex buf_rx1_mutex; +GMutex buf_rx2_mutex; +GMutex mutex_rxtx; +GMutex mutex_rxstream; + +#define MAX_RX_BUFFER (2048 * 6) + + +unsigned char buffer_rx1[MAX_RX_BUFFER]; +int fill_rx1 = 0; +int use_rx1 = 0; +sem_t empty; +sem_t full; + +void put_rx1(unsigned char[]); +void get_rx1(unsigned char[]); +unsigned char buffer_rx2[MAX_RX_BUFFER]; +int fill_rx2 = 0; +int use_rx2 = 0; +sem_t empty_rx2; +sem_t full_rx2; + +void put_rx2(unsigned char[]); +void get_rx2(unsigned char[]); + #ifdef PSK static int psk_samples=0; static int psk_divisor=6; @@ -127,8 +151,12 @@ float timedifference_msec(struct timeval t0, struct timeval t1) void radioberry_protocol_init(int rx,int pixels) { int i; + sem_init(&empty, 0, MAX_RX_BUFFER); + sem_init(&full, 0, 0); + sem_init(&empty_rx2, 0, MAX_RX_BUFFER); + sem_init(&full_rx2, 0, 0); + fprintf(stderr,"radioberry_protocol_init\n"); - sem_init(&mutex, 0, 1); //mutal exlusion display_width=pixels; fprintf(stderr,"radioberry_protocol: buffer size: =%d\n", buffer_size); @@ -167,7 +195,6 @@ void radioberry_protocol_init(int rx,int pixels) { } } - //start_radioberry_thread(); radioberry_thread_id = g_thread_new( "radioberry", radioberry_thread, NULL); if( ! radioberry_thread_id ) { @@ -175,20 +202,87 @@ void radioberry_protocol_init(int rx,int pixels) { exit( -1 ); } fprintf(stderr, "radioberry_thread: id=%p\n",radioberry_thread_id); + + rx1_stream_thread_id = g_thread_new( "rx-streaming", rx1_stream_thread, NULL); + if( ! rx1_stream_thread_id ) + { + fprintf(stderr,"g_thread_new failed on rx_stream_thread\n"); + exit( -1 ); + } + + rx2_stream_thread_id = g_thread_new( "rx2-streaming", rx2_stream_thread, NULL); + if( ! rx2_stream_thread_id ) + { + fprintf(stderr,"g_thread_new failed on rx2_stream_thread\n"); + exit( -1 ); + } +} + +static gpointer rx1_stream_thread(gpointer arg) { + + int left_sample; + int right_sample; + double left_sample_double; + double right_sample_double; + + unsigned char _iqdata[6]; + + fprintf(stderr, "radioberry_protocol: rx_stream_thread\n"); + + while(running) { + + get_rx1(_iqdata); + + left_sample = (int)((signed char) _iqdata[0]) << 16; + left_sample |= (int)((((unsigned char)_iqdata[1]) << 8)&0xFF00); + left_sample |= (int)((unsigned char)_iqdata[2]&0xFF); + right_sample = (int)((signed char) _iqdata[3]) << 16; + right_sample |= (int)((((unsigned char)_iqdata[4]) << 8)&0xFF00); + right_sample |= (int)((unsigned char)_iqdata[5]&0xFF); + + left_sample_double=(double)left_sample/8388607.0; // 24 bit sample 2^23-1 + right_sample_double=(double)right_sample/8388607.0; // 24 bit sample 2^23-1 + + g_mutex_lock(&mutex_rxstream); + // add the samples to the receiver.. + add_iq_samples(receiver[0], left_sample_double,right_sample_double); + g_mutex_unlock(&mutex_rxstream); + } +} + +static gpointer rx2_stream_thread(gpointer arg) { + + int left_sample; + int right_sample; + double left_sample_double; + double right_sample_double; + + unsigned char _iqdata[6]; + + fprintf(stderr, "radioberry_protocol: rx_stream_thread\n"); + + while(running) { + + get_rx2(_iqdata); + + left_sample = (int)((signed char) _iqdata[0]) << 16; + left_sample |= (int)((((unsigned char)_iqdata[1]) << 8)&0xFF00); + left_sample |= (int)((unsigned char)_iqdata[2]&0xFF); + right_sample = (int)((signed char) _iqdata[3]) << 16; + right_sample |= (int)((((unsigned char)_iqdata[4]) << 8)&0xFF00); + right_sample |= (int)((unsigned char)_iqdata[5]&0xFF); + + left_sample_double=(double)left_sample/8388607.0; // 24 bit sample 2^23-1 + right_sample_double=(double)right_sample/8388607.0; // 24 bit sample 2^23-1 + + g_mutex_lock(&mutex_rxstream); + // add the samples to the receiver.. + add_iq_samples(receiver[1], left_sample_double,right_sample_double); + g_mutex_unlock(&mutex_rxstream); + + } } -/* -static void start_radioberry_thread() { - int rc; - fprintf(stderr,"radioberry_protocol starting radioberry thread\n"); - rc=pthread_create(&radioberry_thread_id,NULL,radioberry_thread,NULL); - if(rc != 0) { - fprintf(stderr,"radioberry_protocol: pthread_create failed on radioberry_thread: rc=%d\n", rc); - exit(-1); - } -}*/ - -//static void *radioberry_thread(void* arg) { static gpointer radioberry_thread(gpointer arg) { fprintf(stderr, "radioberry_protocol: radioberry_thread\n"); @@ -207,39 +301,21 @@ static gpointer radioberry_thread(gpointer arg) { if (!isTransmitting()) { - sem_wait(&mutex); + g_mutex_lock(&mutex_rxtx); gpioWrite(21, 0); - //possible to use different sample rates per receiver... - //if the sample rate differs between the receivers.. the highest sample rate - //must be called an extra time. - if (receivers==1){ - rx1_spiReader(); - } else { - if (receiver[0]->sample_rate == receiver[1]->sample_rate) { - rx1_spiReader(); - rx2_spiReader(); - } else { - if (receiver[0]->sample_rate > receiver[1]->sample_rate) { - rx1_spiReader(); - rx1_spiReader(); - rx2_spiReader(); - } else { - rx1_spiReader(); - rx2_spiReader(); - rx2_spiReader(); - } - } - } - sem_post(&mutex); + rx1_spiReader(); + rx2_spiReader(); + + g_mutex_unlock(&mutex_rxtx); } } } void radioberry_protocol_iq_samples(int isample,int qsample) { - sem_wait(&mutex); + g_mutex_lock(&mutex_rxtx); int power=0; if(tune && !transmitter->tune_use_drive) { @@ -261,7 +337,7 @@ void radioberry_protocol_iq_samples(int isample,int qsample) { spiWriter(); - sem_post(&mutex); + g_mutex_unlock(&mutex_rxtx); } void *radioberry_protocol_process_local_mic(unsigned char *buffer,int le) { @@ -281,27 +357,6 @@ void *radioberry_protocol_process_local_mic(unsigned char *buffer,int le) { } } -static void handleReceiveStream(int r) { - int left_sample; - int right_sample; - double left_sample_double; - double right_sample_double; - - left_sample = (int)((signed char) iqdata[0]) << 16; - left_sample |= (int)((((unsigned char)iqdata[1]) << 8)&0xFF00); - left_sample |= (int)((unsigned char)iqdata[2]&0xFF); - right_sample = (int)((signed char) iqdata[3]) << 16; - right_sample |= (int)((((unsigned char)iqdata[4]) << 8)&0xFF00); - right_sample |= (int)((unsigned char)iqdata[5]&0xFF); - - left_sample_double=(double)left_sample/8388607.0; // 24 bit sample 2^23-1 - right_sample_double=(double)right_sample/8388607.0; // 24 bit sample 2^23-1 - - // add the samples to the receiver.. - add_iq_samples(receiver[r], left_sample_double,right_sample_double); - -} - void setSampleSpeed(int r) { switch(receiver[r]->sample_rate) { @@ -334,8 +389,10 @@ void radioberry_protocol_stop() { } void rx1_spiReader() { - // wait till rxFIFO buffer is filled with at least one element - while ( gpioRead(13) == 1) {}; + unsigned char iqdata[6]; + + // return till rxFIFO buffer is filled with at least one element + while ( gpioRead(13) == 1) {return;}; setSampleSpeed(0); @@ -359,8 +416,9 @@ void rx1_spiReader() { iqdata[5] = (rxFrequency & 0xFF); spiXfer(rx1_spi_handler, iqdata, iqdata, 6); + + put_rx1(iqdata); - //firmware: tdata(56'h00010203040506) -> 0-1-2-3-4-5-6 (element 0 contains 0; second element contains 1) rx1_count ++; if (rx1_count == 48000) { rx1_count = 0; @@ -369,14 +427,18 @@ void rx1_spiReader() { printf("Code rx1 mode spi executed in %f milliseconds.\n", elapsed); gettimeofday(&rx1_t0, 0); } + - handleReceiveStream(0); } void rx2_spiReader() { + + unsigned char iqdata[6]; + + if (receivers==1) return; - // wait till rx2FIFO buffer is filled with at least one element - while ( gpioRead(16) == 1) {}; + // return till rx2FIFO buffer is filled with at least one element + while ( gpioRead(16) == 1) {return;}; setSampleSpeed(1); @@ -400,8 +462,9 @@ void rx2_spiReader() { iqdata[5] = (rxFrequency & 0xFF); spiXfer(rx2_spi_handler, iqdata, iqdata, 6); + + put_rx2(iqdata); - //firmware: tdata(56'h00010203040506) -> 0-1-2-3-4-5-6 (element 0 contains 0; second element contains 1) rx2_count ++; if (rx2_count == 48000) { rx2_count = 0; @@ -411,10 +474,8 @@ void rx2_spiReader() { gettimeofday(&rx2_t0, 0); } - handleReceiveStream(1); } - void spiWriter() { gpioWrite(21, 1); @@ -447,3 +508,61 @@ void spiWriter() { gettimeofday(&t20, 0); } } + +void put_rx1(unsigned char* value) { + sem_wait(&empty); + + g_mutex_lock(&buf_rx1_mutex); + int i =0; + for (i; i< 6; i++){ + buffer_rx1[fill_rx1] = value[i]; + fill_rx1 = (fill_rx1 + 1) % MAX_RX_BUFFER; + } + g_mutex_unlock(&buf_rx1_mutex); + + sem_post(&full); +} + +void get_rx1(unsigned char buffer[]) { + sem_wait(&full); + + g_mutex_lock(&buf_rx1_mutex); + int i =0; + for (i; i< 6; i++){ + buffer[i] = buffer_rx1[use_rx1]; + use_rx1 = (use_rx1 + 1) % MAX_RX_BUFFER; + } + g_mutex_unlock(&buf_rx1_mutex); + + sem_post(&empty); +} + +void put_rx2(unsigned char* value) { + sem_wait(&empty_rx2); + + g_mutex_lock(&buf_rx2_mutex); + int i =0; + for (i; i< 6; i++){ + buffer_rx2[fill_rx2] = value[i]; + fill_rx2 = (fill_rx2 + 1) % MAX_RX_BUFFER; + } + g_mutex_unlock(&buf_rx2_mutex); + + sem_post(&full_rx2); +} + +void get_rx2(unsigned char buffer[]) { + sem_wait(&full_rx2); + + g_mutex_lock(&buf_rx2_mutex); + int i =0; + for (i; i< 6; i++){ + buffer[i] = buffer_rx2[use_rx2]; + use_rx2 = (use_rx2 + 1) % MAX_RX_BUFFER; + } + g_mutex_unlock(&buf_rx2_mutex); + + sem_post(&empty_rx2); +} + +// end of source -- 2.45.2