root/oscpack/trunk/ip/posix/UdpSocket.cpp

Revision 39, 14.7 kB (checked in by ross, 3 years ago)

added code to implement UdpSocket::AsynchronousBreak() on posix systems

  • Property svn:eol-style set to native
Line 
1 /*
2         oscpack -- Open Sound Control packet manipulation library
3         http://www.audiomulch.com/~rossb/oscpack
4
5         Copyright (c) 2004-2005 Ross Bencina <rossb@audiomulch.com>
6
7         Permission is hereby granted, free of charge, to any person obtaining
8         a copy of this software and associated documentation files
9         (the "Software"), to deal in the Software without restriction,
10         including without limitation the rights to use, copy, modify, merge,
11         publish, distribute, sublicense, and/or sell copies of the Software,
12         and to permit persons to whom the Software is furnished to do so,
13         subject to the following conditions:
14
15         The above copyright notice and this permission notice shall be
16         included in all copies or substantial portions of the Software.
17
18         Any person wishing to distribute modifications to the Software is
19         requested to send the modifications to the original developer so that
20         they can be incorporated into the canonical version.
21
22         THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23         EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24         MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
25         IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
26         ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
27         CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28         WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 */
30 #include "ip/UdpSocket.h"
31
32 #include <vector>
33 #include <algorithm>
34 #include <stdexcept>
35 #include <assert.h>
36 #include <signal.h>
37 #include <math.h>
38 #include <errno.h>
39 #include <string.h> // for memset
40
41 #include <pthread.h>
42 #include <unistd.h>
43 #include <stdlib.h>
44 #include <stdio.h>
45 #include <netdb.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #include <sys/time.h>
49 #include <netinet/in.h> // for sockaddr_in
50
51 #include "ip/PacketListener.h"
52 #include "ip/TimerListener.h"
53
54
55 #if defined(__APPLE__) && !defined(_SOCKLEN_T)
56 // pre system 10.3 didn't have socklen_t
57 typedef ssize_t socklen_t;
58 #endif
59
60
61 static void SockaddrFromIpEndpointName( struct sockaddr_in& sockAddr, const IpEndpointName& endpoint )
62 {
63     memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
64     sockAddr.sin_family = AF_INET;
65
66         sockAddr.sin_addr.s_addr =
67                 (endpoint.address == IpEndpointName::ANY_ADDRESS)
68                 ? INADDR_ANY
69                 : htonl( endpoint.address );
70
71         sockAddr.sin_port =
72                 (endpoint.port == IpEndpointName::ANY_PORT)
73                 ? 0
74                 : htons( endpoint.port );
75 }
76
77
78 static IpEndpointName IpEndpointNameFromSockaddr( const struct sockaddr_in& sockAddr )
79 {
80         return IpEndpointName(
81                 (sockAddr.sin_addr.s_addr == INADDR_ANY)
82                         ? IpEndpointName::ANY_ADDRESS
83                         : ntohl( sockAddr.sin_addr.s_addr ),
84                 (sockAddr.sin_port == 0)
85                         ? IpEndpointName::ANY_PORT
86                         : ntohs( sockAddr.sin_port )
87                 );
88 }
89
90
91 class UdpSocket::Implementation{
92         bool isBound_;
93         bool isConnected_;
94
95         int socket_;
96         struct sockaddr_in connectedAddr_;
97         struct sockaddr_in sendToAddr_;
98
99 public:
100
101         Implementation()
102                 : isBound_( false )
103                 , isConnected_( false )
104                 , socket_( -1 )
105         {
106                 if( (socket_ = socket( AF_INET, SOCK_DGRAM, 0 )) == -1 ){
107             throw std::runtime_error("unable to create udp socket\n");
108         }
109
110                 memset( &sendToAddr_, 0, sizeof(sendToAddr_) );
111         sendToAddr_.sin_family = AF_INET;
112         }
113
114         ~Implementation()
115         {
116                 if (socket_ != -1) close(socket_);
117         }
118
119         IpEndpointName LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
120         {
121                 assert( isBound_ );
122
123                 // first connect the socket to the remote server
124         
125         struct sockaddr_in connectSockAddr;
126                 SockaddrFromIpEndpointName( connectSockAddr, remoteEndpoint );
127        
128         if (connect(socket_, (struct sockaddr *)&connectSockAddr, sizeof(connectSockAddr)) < 0) {
129             throw std::runtime_error("unable to connect udp socket\n");
130         }
131
132         // get the address
133
134         struct sockaddr_in sockAddr;
135         memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
136         socklen_t length = sizeof(sockAddr);
137         if (getsockname(socket_, (struct sockaddr *)&sockAddr, &length) < 0) {
138             throw std::runtime_error("unable to getsockname\n");
139         }
140        
141                 if( isConnected_ ){
142                         // reconnect to the connected address
143                         
144                         if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
145                                 throw std::runtime_error("unable to connect udp socket\n");
146                         }
147
148                 }else{
149                         // unconnect from the remote address
150                 
151                         struct sockaddr_in unconnectSockAddr;
152                         memset( (char *)&unconnectSockAddr, 0, sizeof(unconnectSockAddr ) );
153                         unconnectSockAddr.sin_family = AF_UNSPEC;
154                         // address fields are zero
155                         int connectResult = connect(socket_, (struct sockaddr *)&unconnectSockAddr, sizeof(unconnectSockAddr));
156                         if ( connectResult < 0 && errno != EAFNOSUPPORT ) {
157                                 throw std::runtime_error("unable to un-connect udp socket\n");
158                         }
159                 }
160
161                 return IpEndpointNameFromSockaddr( sockAddr );
162         }
163
164         void Connect( const IpEndpointName& remoteEndpoint )
165         {
166                 SockaddrFromIpEndpointName( connectedAddr_, remoteEndpoint );
167        
168         if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
169             throw std::runtime_error("unable to connect udp socket\n");
170         }
171
172                 isConnected_ = true;
173         }
174
175         void Send( const char *data, int size )
176         {
177                 assert( isConnected_ );
178
179         send( socket_, data, size, 0 );
180         }
181
182     void SendTo( const IpEndpointName& remoteEndpoint, const char *data, int size )
183         {
184                 sendToAddr_.sin_addr.s_addr = htonl( remoteEndpoint.address );
185         sendToAddr_.sin_port = htons( remoteEndpoint.port );
186
187         sendto( socket_, data, size, 0, (sockaddr*)&sendToAddr_, sizeof(sendToAddr_) );
188         }
189
190         void Bind( const IpEndpointName& localEndpoint )
191         {
192                 struct sockaddr_in bindSockAddr;
193                 SockaddrFromIpEndpointName( bindSockAddr, localEndpoint );
194
195         if (bind(socket_, (struct sockaddr *)&bindSockAddr, sizeof(bindSockAddr)) < 0) {
196             throw std::runtime_error("unable to bind udp socket\n");
197         }
198
199                 isBound_ = true;
200         }
201
202         bool IsBound() const { return isBound_; }
203
204     int ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, int size )
205         {
206                 assert( isBound_ );
207
208                 struct sockaddr_in fromAddr;
209         socklen_t fromAddrLen = sizeof(fromAddr);
210                  
211         int result = recvfrom(socket_, data, size, 0,
212                     (struct sockaddr *) &fromAddr, (socklen_t*)&fromAddrLen);
213                 if( result < 0 )
214                         return 0;
215
216                 remoteEndpoint.address = ntohl(fromAddr.sin_addr.s_addr);
217                 remoteEndpoint.port = ntohs(fromAddr.sin_port);
218
219                 return result;
220         }
221
222         int Socket() { return socket_; }
223 };
224
225 UdpSocket::UdpSocket()
226 {
227         impl_ = new Implementation();
228 }
229
230 UdpSocket::~UdpSocket()
231 {
232         delete impl_;
233 }
234
235 IpEndpointName UdpSocket::LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
236 {
237         return impl_->LocalEndpointFor( remoteEndpoint );
238 }
239
240 void UdpSocket::Connect( const IpEndpointName& remoteEndpoint )
241 {
242         impl_->Connect( remoteEndpoint );
243 }
244
245 void UdpSocket::Send( const char *data, int size )
246 {
247         impl_->Send( data, size );
248 }
249
250 void UdpSocket::SendTo( const IpEndpointName& remoteEndpoint, const char *data, int size )
251 {
252         impl_->SendTo( remoteEndpoint, data, size );
253 }
254
255 void UdpSocket::Bind( const IpEndpointName& localEndpoint )
256 {
257         impl_->Bind( localEndpoint );
258 }
259
260 bool UdpSocket::IsBound() const
261 {
262         return impl_->IsBound();
263 }
264
265 int UdpSocket::ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, int size )
266 {
267         return impl_->ReceiveFrom( remoteEndpoint, data, size );
268 }
269
270
271 struct AttachedTimerListener{
272         AttachedTimerListener( int id, int p, TimerListener *tl )
273                 : initialDelayMs( id )
274                 , periodMs( p )
275                 , listener( tl ) {}
276         int initialDelayMs;
277         int periodMs;
278         TimerListener *listener;
279 };
280
281
282 static bool CompareScheduledTimerCalls(
283                 const std::pair< double, AttachedTimerListener > & lhs, const std::pair< double, AttachedTimerListener > & rhs )
284 {
285         return lhs.first < rhs.first;
286 }
287
288
289 SocketReceiveMultiplexer *multiplexerInstanceToAbortWithSigInt_ = 0;
290
291 extern "C" /*static*/ void InterruptSignalHandler( int );
292 /*static*/ void InterruptSignalHandler( int )
293 {
294         multiplexerInstanceToAbortWithSigInt_->AsynchronousBreak();
295         signal( SIGINT, SIG_DFL );
296 }
297
298
299 class SocketReceiveMultiplexer::Implementation{
300         std::vector< std::pair< PacketListener*, UdpSocket* > > socketListeners_;
301         std::vector< AttachedTimerListener > timerListeners_;
302
303         volatile bool break_;
304         int breakPipe_[2]; // [0] is the reader descriptor and [1] the writer
305
306         double GetCurrentTimeMs() const
307         {
308                 struct timeval t;
309
310                 gettimeofday( &t, 0 );
311
312                 return ((double)t.tv_sec*1000.) + ((double)t.tv_usec / 1000.);
313         }
314
315 public:
316     Implementation()
317         {
318                 if( pipe(breakPipe_) != 0 )
319                         throw std::runtime_error( "creation of asynchronous break pipes failed\n" );
320         }
321
322     ~Implementation()
323         {
324                 close( breakPipe_[0] );
325                 close( breakPipe_[1] );
326         }
327
328     void AttachSocketListener( UdpSocket *socket, PacketListener *listener )
329         {
330                 assert( std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) ) == socketListeners_.end() );
331                 // we don't check that the same socket has been added multiple times, even though this is an error
332                 socketListeners_.push_back( std::make_pair( listener, socket ) );
333         }
334
335     void DetachSocketListener( UdpSocket *socket, PacketListener *listener )
336         {
337                 std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i =
338                                 std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) );
339                 assert( i != socketListeners_.end() );
340
341                 socketListeners_.erase( i );
342         }
343
344     void AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
345         {
346                 timerListeners_.push_back( AttachedTimerListener( periodMilliseconds, periodMilliseconds, listener ) );
347         }
348
349         void AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
350         {
351                 timerListeners_.push_back( AttachedTimerListener( initialDelayMilliseconds, periodMilliseconds, listener ) );
352         }
353
354     void DetachPeriodicTimerListener( TimerListener *listener )
355         {
356                 std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
357                 while( i != timerListeners_.end() ){
358                         if( i->listener == listener )
359                                 break;
360                         ++i;
361                 }
362
363                 assert( i != timerListeners_.end() );
364
365                 timerListeners_.erase( i );
366         }
367
368     void Run()
369         {
370                 break_ = false;
371
372                 // configure the master fd_set for select()
373
374                 fd_set masterfds, tempfds;
375                 FD_ZERO( &masterfds );
376                 FD_ZERO( &tempfds );
377                
378                 // in addition to listening to the inbound sockets we
379                 // also listen to the asynchronous break pipe, so that AsynchronousBreak()
380                 // can break us out of select() from another thread.
381                 FD_SET( breakPipe_[0], &masterfds );
382                 int fdmax = breakPipe_[0];             
383
384                 for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
385                                 i != socketListeners_.end(); ++i ){
386
387                         if( fdmax < i->second->impl_->Socket() )
388                                 fdmax = i->second->impl_->Socket();
389                         FD_SET( i->second->impl_->Socket(), &masterfds );
390                 }
391
392
393                 // configure the timer queue
394                 double currentTimeMs = GetCurrentTimeMs();
395
396                 // expiry time ms, listener
397                 std::vector< std::pair< double, AttachedTimerListener > > timerQueue_;
398                 for( std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
399                                 i != timerListeners_.end(); ++i )
400                         timerQueue_.push_back( std::make_pair( currentTimeMs + i->initialDelayMs, *i ) );
401                 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
402
403                 const int MAX_BUFFER_SIZE = 4098;
404                 char *data = new char[ MAX_BUFFER_SIZE ];
405                 IpEndpointName remoteEndpoint;
406
407                 struct timeval timeout;
408
409                 while( !break_ ){
410                         tempfds = masterfds;
411
412                         struct timeval *timeoutPtr = 0;
413                         if( !timerQueue_.empty() ){
414                                 double timeoutMs = timerQueue_.front().first - GetCurrentTimeMs();
415                                 if( timeoutMs < 0 )
416                                         timeoutMs = 0;
417                        
418                                 // 1000000 microseconds in a second
419                                 timeout.tv_sec = (long)(timeoutMs * .001);
420                                 timeout.tv_usec = (long)((timeoutMs - (timeout.tv_sec * 1000)) * 1000);
421                                 timeoutPtr = &timeout;
422                         }
423
424                         if( select( fdmax + 1, &tempfds, 0, 0, timeoutPtr ) < 0 && errno != EINTR ){
425                                 throw std::runtime_error("select failed\n");
426                         }
427
428                         if ( FD_ISSET( breakPipe_[0], &tempfds ) ){
429                                 // clear pending data from the asynchronous break pipe
430                                 char c;
431                                 read( breakPipe_[0], &c, 1 );
432                         }
433                        
434                         if( break_ )
435                                 break;
436
437                         for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
438                                         i != socketListeners_.end(); ++i ){
439
440                                 if( FD_ISSET( i->second->impl_->Socket(), &tempfds ) ){
441
442                                         int size = i->second->ReceiveFrom( remoteEndpoint, data, MAX_BUFFER_SIZE );
443                                         if( size > 0 ){
444                                                 i->first->ProcessPacket( data, size, remoteEndpoint );
445                                                 if( break_ )
446                                                         break;
447                                         }
448                                 }
449                         }
450
451                         // execute any expired timers
452                         currentTimeMs = GetCurrentTimeMs();
453                         bool resort = false;
454                         for( std::vector< std::pair< double, AttachedTimerListener > >::iterator i = timerQueue_.begin();
455                                         i != timerQueue_.end() && i->first <= currentTimeMs; ++i ){
456
457                                 i->second.listener->TimerExpired();
458                                 if( break_ )
459                                         break;
460
461                                 i->first += i->second.periodMs;
462                                 resort = true;
463                         }
464                         if( resort )
465                                 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
466                 }
467
468                 delete [] data;
469         }
470
471     void Break()
472         {
473                 break_ = true;
474         }
475
476     void AsynchronousBreak()
477         {
478                 break_ = true;
479
480                 // Send a termination message to the asynchronous break pipe, so select() will return
481                 write( breakPipe_[1], "!", 1 );
482         }
483 };
484
485
486
487 SocketReceiveMultiplexer::SocketReceiveMultiplexer()
488 {
489         impl_ = new Implementation();
490 }
491
492 SocketReceiveMultiplexer::~SocketReceiveMultiplexer()
493 {       
494         delete impl_;
495 }
496
497 void SocketReceiveMultiplexer::AttachSocketListener( UdpSocket *socket, PacketListener *listener )
498 {
499         impl_->AttachSocketListener( socket, listener );
500 }
501
502 void SocketReceiveMultiplexer::DetachSocketListener( UdpSocket *socket, PacketListener *listener )
503 {
504         impl_->DetachSocketListener( socket, listener );
505 }
506
507 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
508 {
509         impl_->AttachPeriodicTimerListener( periodMilliseconds, listener );
510 }
511
512 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
513 {
514         impl_->AttachPeriodicTimerListener( initialDelayMilliseconds, periodMilliseconds, listener );
515 }
516
517 void SocketReceiveMultiplexer::DetachPeriodicTimerListener( TimerListener *listener )
518 {
519         impl_->DetachPeriodicTimerListener( listener );
520 }
521
522 void SocketReceiveMultiplexer::Run()
523 {
524         impl_->Run();
525 }
526
527 void SocketReceiveMultiplexer::RunUntilSigInt()
528 {
529         assert( multiplexerInstanceToAbortWithSigInt_ == 0 ); /* at present we support only one multiplexer instance running until sig int */
530         multiplexerInstanceToAbortWithSigInt_ = this;
531         signal( SIGINT, InterruptSignalHandler );
532         impl_->Run();
533         signal( SIGINT, SIG_DFL );
534         multiplexerInstanceToAbortWithSigInt_ = 0;
535 }
536
537 void SocketReceiveMultiplexer::Break()
538 {
539         impl_->Break();
540 }
541
542 void SocketReceiveMultiplexer::AsynchronousBreak()
543 {
544         impl_->AsynchronousBreak();
545 }
546
Note: See TracBrowser for help on using the browser.