The Media Kit: BSubscriber

Derived from: public BObject

Declared in: <media/Subscriber.h>


Overview

BSubscriber objects receive and process buffers of media-specific data. These buffers are allocated and sent (to the BSubscriber) by a media server; for example, buffers of audio data are sent by the Audio Server. Each server can control more than one buffer stream (the Audio Server has a sound-in stream and a sound-out stream). A BSubscriber can receive buffers from only one stream.

More than one BSubscriber can "subscribe" to the same stream. The same-stream BSubscribers stand shoulder-to-shoulder and pass buffers down the stream, in the style of a bucket brigade. When a BSubscriber receives a buffer it does something to it--typically, it examines, adds to, or filters the data it finds there--and then passes it to the next BSubscriber (or, more accurately, lets the server pass it to the next BSubscriber).
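The bucket-brigade arrangement can be pictured with a small standalone sketch. It doesn't use the Media Kit: the StreamHook type and the run_brigade() function are hypothetical names invented for illustration, and plain standard C++ stands in for the server.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a subscriber's per-buffer function.
typedef void (*StreamHook)(char *buffer, long count);

// Hand one buffer to each hook in stream order, front to back,
// the way a server passes a buffer from subscriber to subscriber.
void run_brigade(const std::vector<StreamHook> &stream,
                 char *buffer, long count)
{
    assert(buffer != 0 || count == 0);
    for (std::size_t i = 0; i < stream.size(); ++i)
        stream[i](buffer, count);
}

// Two example hooks: one adds to the data, one filters it.
void add_one(char *buffer, long count)
{
    for (long i = 0; i < count; ++i)
        buffer[i] = static_cast<char>(buffer[i] + 1);
}

void halve(char *buffer, long count)
{
    for (long i = 0; i < count; ++i)
        buffer[i] = static_cast<char>(buffer[i] / 2);
}
```

Each hook sees the data as the previous hook left it, which is why a subscriber's position in the stream matters.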

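The media servers take care of managing the data buffers in their streams--they allocate new buffers, pass them between BSubscribers, clear existing buffers for re-use, and so on. A BSubscriber's primary tasks, in order, are to identify a server, subscribe to one of the server's streams, enter the stream, process the buffers that it receives, and exit the stream. Each task is described in the sections that follow.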

The BSubscribers that subscribe to the same stream needn't belong to the same application. This means that your BSubscriber may be examining, adding to, or filtering data that was generated in another application.

Most buffer streams need to "flow" quickly and uninterruptedly (this is especially true of the Audio Server's streams). The processing that a single BSubscriber performs when it receives a buffer from the server should be as brief and efficient as possible.


Identifying a Server

BSubscriber is an abstract class--you never construct instances of BSubscriber directly. Instead, you construct instances of one of its derived classes. Each BSubscriber-derived class provided by the Media Kit corresponds to a particular media server. Identifying a server, therefore, is implied by the act of choosing a BSubscriber-derived class with which you instantiate an object.

Currently, the only BSubscriber-derived class that's supplied by the Media Kit is BAudioSubscriber. Instances of this class receive buffers from, obviously enough, the Audio Server.


Subscribing

The first thing you do with your BSubscriber object, once you've constructed it, is to ask its server's permission to be sent buffers of data. This is performed through the Subscribe() function. Subscription doesn't cause buffers to actually be sent, but it does get the BSubscriber into the ballpark. The act by which a BSubscriber receives buffers (the EnterStream() function) depends on a successful subscription.

As part of a BSubscriber's subscription, it must tell the server which stream it wants to enter, which other BSubscribers it's willing to share the buffer stream with, and whether it's willing to wait for "undesirable" brethren to get out of the stream before it gets in. The object's opinions on these topics are registered through arguments to the Subscribe() function:

long Subscribe(long stream, subscriber_id clique, bool willWait)

The arguments are discussed in the following sections.

The Stream

A server can shepherd more than one stream. For example, the Audio Server controls access to two streams: The sound-out stream terminates at the DAC, the sound-in stream begins at the ADC. You identify the stream you want by using one of the stream constants defined by the server. The Audio Server defines the constants B_DAC_STREAM for sound-out and B_ADC_STREAM for sound-in.

A BSubscriber may only subscribe to one stream at a time.

The Clique

Note: The clique concept is being reconsidered. You can still set a clique value, but the mechanism may be removed in a subsequent release, or moved into a different level of the Kit's software. For now, it's recommended that you always set the clique to B_SHARED_SUBSCRIBER_ID.

A BSubscriber's clique (passed as the clique argument to Subscribe()) identifies the cabal of BSubscribers that the calling object is willing to share the server's buffer stream with. The value of clique acts as a "key" to the stream: To gain access to the stream, you have to have the proper key.

Here's how it works: The first BSubscriber that calls Subscribe() passes some value as the clique argument. This value becomes the key to the buffer stream; any other BSubscriber that wants to subscribe to that stream must pass the same clique value (unless you want to be an "invisible" subscriber, as described in the next section). The actual value that's used to represent the clique is irrelevant; matching is the only concern. A given clique is enforced until all subscribed objects have unsubscribed (through the Unsubscribe() function) at which point the next object that subscribes will establish a new clique value.

Note: The clique argument is type cast as a subscriber_id. Such values are tokens that uniquely identify BSubscriber objects among all extant BSubscribers of the same class (across all applications). That the clique is represented as a subscriber_id is primarily a convenience: Just as the actual clique value has no significance, neither does its type imply any special properties about the clique.

Choosing a Clique Value

With regard to cliques, there are four types of BSubscribers: Those that want utterly exclusive access to the buffer stream, those that are willing to share access with certain (but not all) other BSubscribers, those that will share with any other BSubscriber, and those that want to crash the party. The clique value that you choose depends on which of these characterizations describes your BSubscriber:

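Exclusive access: pass a value that no other BSubscriber is likely to pass, such as the object's own ID:

      /* FirstSub is assumed to be a valid BSubscriber
       * object (currently, it must be an instance of
       * BAudioSubscriber).
       */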
      subscriber_id firstID = FirstSub->ID();
      FirstSub->Subscribe(..., firstID, ...);

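Sharing with certain BSubscribers: every member of the group passes the same value, here the first subscriber's ID:

      /* The first subscriber establishes the clique. */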
      subscriber_id firstID = FirstSub->ID();
      FirstSub->Subscribe(..., firstID, ...);
      ...
      
      /* Notice that the second subscriber passes the 
       * first subscriber's ID value as the clique argument.
       */
      SecondSub->Subscribe(..., firstID, ...);

To share the stream with BSubscribers in other applications, the first subscriber's application would have to broadcast the first subscriber's ID value (through a BMessage, for example).

Sharing with any BSubscriber: pass B_SHARED_SUBSCRIBER_ID as the clique:

      FirstSub->Subscribe(..., B_SHARED_SUBSCRIBER_ID, ...);

Note, however, that the B_SHARED_SUBSCRIBER_ID clique doesn't guarantee that every BSubscriber will be allowed in the stream. If an unsharing BSubscriber has already set the clique to some other value, a BSubscriber that passes B_SHARED_SUBSCRIBER_ID will be turned down. Conversely, if the clique is set to B_SHARED_SUBSCRIBER_ID and a BSubscriber comes along that tries to subscribe with a less generous clique value, its subscription will be denied.
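The key-matching rule can be modeled in a few lines. This is a standalone sketch, not Media Kit code: CliqueGate, clique_key, and kSharedKey are hypothetical names, the numeric value given to kSharedKey is an arbitrary assumption, and "invisible" subscribers aren't modeled.

```cpp
#include <cassert>

// Hypothetical stand-ins; the real subscriber_id type and the value of
// B_SHARED_SUBSCRIBER_ID are defined by the Media Kit headers.
typedef long clique_key;
const clique_key kSharedKey = -2;   // assumed value, illustration only

// Models the admission rule for one stream: the first subscriber sets
// the key, later subscribers must match it, and the key is replaced
// by whoever subscribes after the last member has left.
struct CliqueGate {
    long       members;
    clique_key key;

    CliqueGate() : members(0), key(0) {}

    // Returns true if a subscriber passing `clique` is admitted.
    bool subscribe(clique_key clique)
    {
        if (members == 0)
            key = clique;            // first in: establishes the clique
        else if (clique != key)
            return false;            // wrong key: turned down
        ++members;
        return true;
    }

    void unsubscribe()
    {
        assert(members > 0);
        --members;                   // at zero, the next subscriber re-keys
    }
};
```

Because admission is a plain equality test, the shared value gets no special treatment: it's turned down when some other key is in force, and it turns others down when it's the key in force.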

Waiting for Access

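If a BSubscriber is denied access to a server because it didn't pass the clique test, it can either give up immediately, or wait for the current clique members to unsubscribe. This is expressed in Subscribe()'s final argument, the boolean willWait: If willWait is FALSE, a denied Subscribe() returns immediately with an error; if it's TRUE, Subscribe() waits until the object is admitted.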


Entering the Stream

Having successfully subscribed to a server's stream, the BSubscriber's next task is to enter the stream. By this, the object will begin receiving buffers of data. You do this through the EnterStream() function:

virtual long EnterStream(subscriber_id neighbor,
   bool before,
   void *arg,
   enter_stream_hook enterHook,
   exit_stream_hook exitHook,
   bool background)

The function's operations and arguments are described in the following sections.

Positioning your BSubscriber

The first two EnterStream() arguments position the BSubscriber with respect to the other BSubscriber objects that are already in the stream (if any):

EnterStream(subscriber_id neighbor, bool before, ...)

The neighbor argument identifies the BSubscriber (by its ID number, as returned by the ID() function) that you want the entering BSubscriber to stand next to; before places the entering object before (TRUE) or after (FALSE) the neighbor. The neighbor needn't belong to the same application as the entering object, but it must already have entered the stream.

If you want to place the BSubscriber at one or the other end of the stream (or to add the first BSubscriber to the stream), you pass NULL as the neighbor. A before value of TRUE thus places the BSubscriber at the "front" of the stream (the object will be the first to receive each buffer that flows through the stream), and a value of FALSE places it at the "back" (it's the last to receive buffers before they're realized or recycled).

A BSubscriber's position in the stream can't be locked. If, for example, you place your BSubscriber to stand at the back of the stream, some other BSubscriber--from some other application, possibly--can come along later and also claim the back. Your object will be bumped forward (towards the front of the stream) in deference to the newcomer.
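The positioning rules can be sketched with an ordered list. The code below is standalone and doesn't touch the Media Kit; sub_id, kNoNeighbor, and enter() are hypothetical names, with kNoNeighbor playing the role of a NULL neighbor.

```cpp
#include <algorithm>
#include <cassert>
#include <list>

typedef long sub_id;              // hypothetical stand-in for subscriber_id
const sub_id kNoNeighbor = 0;     // plays the role of passing NULL

// Insert `entering` into `stream` (ordered front to back).
// Returns false if a neighbor was named but isn't in the stream.
bool enter(std::list<sub_id> &stream, sub_id entering,
           sub_id neighbor, bool before)
{
    assert(entering != kNoNeighbor);
    if (neighbor == kNoNeighbor) {
        if (before)
            stream.push_front(entering);   // front of the stream
        else
            stream.push_back(entering);    // back of the stream
        return true;
    }
    std::list<sub_id>::iterator pos =
        std::find(stream.begin(), stream.end(), neighbor);
    if (pos == stream.end())
        return false;
    if (!before)
        ++pos;                    // insert() places the new element before pos
    stream.insert(pos, entering);
    return true;
}
```

Nothing in the list remembers who asked for the back first, so a later arrival that also asks for the back simply lands behind the earlier one.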

Receiving and Processing Buffers

After your BSubscriber has entered the buffer stream, it will begin receiving buffers of data. The third, fourth, and last arguments to EnterStream() pertain to the means by which your object receives these buffers:

EnterStream(..., void *arg, enter_stream_hook entryHook, ..., bool background)

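The arguments, taken out of order, are:

  • entryHook is the function that's invoked for each buffer that arrives.

  • arg is a pointer that's passed, untouched, to the entry hook.

  • background says whether the entry hook runs in a background thread. If it's TRUE, EnterStream() returns immediately; if it's FALSE, EnterStream() doesn't return until the object has left the stream.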

Of initial interest, here, is the "entry hook" that you must supply: This is a global C function or static C++ member function that's invoked once for each buffer that the BSubscriber receives. The protocol for the function (which is typedef'd as enter_stream_hook) is:

bool stream_function(void *arg, char *buffer, long count)

  • arg is the same as the arg argument that you passed to EnterStream().

  • buffer is a pointer to the buffer that has just arrived.

  • count is the number of bytes of data in the buffer.

    You have to implement the entry hook yourself; the Media Kit doesn't supply any entry hook candidates. From within your implementation of the function, you're expected to process the data in buffer as fits your intentions. As mentioned earlier, your processing should be designed with efficiency in mind. The only rule by which you should abide is this:

    Don't Clear the Buffer

    If you're generating data, you should add it into the data that you find in the buffer. Thank-you.

    When you're done with your processing, you simply return from the entry hook. You don't have to do anything to send the buffer to the next BSubscriber in the stream; the Media Kit takes care of that for you. The value that the stream function returns is important: If it returns TRUE, the BSubscriber continues receiving buffers; if it returns FALSE, the object is removed from the stream.
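As an illustration of the "don't clear the buffer" rule, here is a minimal sketch of a function with the entry hook's shape that mixes a value into the existing samples rather than overwriting them. It's standalone C++ that compiles without the Media Kit. ToneState, clamp16(), and mix_tone() are hypothetical names, and the 16-bit sample format is an assumption.

```cpp
#include <cassert>
#include <climits>

// Hypothetical per-hook state, passed to the hook through `arg`.
struct ToneState {
    short level;          // constant value mixed into every sample
    long  buffers_left;   // how many more buffers to process
};

// Clamp a wider sum back into the 16-bit sample range.
static short clamp16(long v)
{
    if (v > SHRT_MAX) return SHRT_MAX;
    if (v < SHRT_MIN) return SHRT_MIN;
    return static_cast<short>(v);
}

// Same shape as the entry hook protocol: (arg, buffer, count).
// Assumes the buffer holds 16-bit samples.
bool mix_tone(void *arg, char *buffer, long count)
{
    assert(arg != 0 && buffer != 0 && count >= 0);
    ToneState *state = static_cast<ToneState *>(arg);
    short *samples = reinterpret_cast<short *>(buffer);
    long n = count / 2;               // bytes -> 16-bit samples

    for (long i = 0; i < n; ++i)      // add to what's there; never overwrite
        samples[i] = clamp16(static_cast<long>(samples[i]) + state->level);

    // true: keep receiving buffers; false: leave the stream.
    return --state->buffers_left > 0;
}
```

The hook returns false once its buffer budget is used up, which is the first of the two ways to leave a stream.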

    Exiting the Stream

    There are two ways to remove a BSubscriber from a stream. The first was mentioned above: Return FALSE from the stream function. The second method is to call ExitStream() directly. The ExitStream() function is particularly useful if you're running the stream function in the background and you want to pull the trigger from another thread.

    Whichever method is used, the BSubscriber's "exit hook" is invoked upon exiting the stream. This is an optional call-back function, similar to the stream function in its application, that you supply as the fifth argument to EnterStream():

    EnterStream(..., exit_stream_hook exitHook, ... )

    The protocol for the completion function is:

    long completion_function(void *arg, long error)

  • The arg value is, again, taken from the EnterStream() call.

  • error is a code that explains why the BSubscriber is exiting the stream.

    Normally, error is B_NO_ERROR. This means that the BSubscriber is exiting naturally: Either because the stream function returned FALSE or because ExitStream() was called. If error is B_TIMED_OUT, then the BSubscriber is exiting because of a delay in receiving the next buffer. (You set the time-out limit through BSubscriber's SetTimeout() function, specifying the limit in microseconds; by default the object will wait forever.) Any other error code will have been generated by a lower-level entity and can be lumped into the general category of "something went wrong."

    The completion function is executed in the same thread as the stream function. If this isn't a background thread, the value returned by the completion function is then returned by EnterStream(). If you are using a background thread, the return value is lost.

    You can perform whatever clean-up is necessary in your implementation of the completion function. The only thing that you mustn't do in the completion function is delete the BSubscriber itself.
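The relationship between the two hooks in the foreground case can be sketched as a loop. This is a standalone model, not the Media Kit's implementation: run_stream(), the hook typedefs, kNoError, and the Counter example are hypothetical, and time-outs and other errors aren't modeled.

```cpp
#include <cassert>

// Hypothetical stand-ins for the hook typedefs and error codes.
typedef bool (*entry_hook)(void *arg, char *buffer, long count);
typedef long (*exit_hook)(void *arg, long error);
const long kNoError = 0;      // plays the role of B_NO_ERROR

// Feed up to `buffers` buffers of `count` bytes to the entry hook,
// stopping early if it returns false; then call the exit hook (if any)
// and return its result, as a foreground EnterStream() is described to do.
long run_stream(entry_hook entry, exit_hook done, void *arg,
                char *buffer, long count, long buffers)
{
    assert(entry != 0);
    for (long i = 0; i < buffers; ++i)
        if (!entry(arg, buffer, count))
            break;
    return done ? done(arg, kNoError) : kNoError;
}

// Example hooks: count buffers, stop after three, report the count.
struct Counter { long seen; long exit_error; };

bool count_three(void *arg, char *, long)
{
    Counter *c = static_cast<Counter *>(arg);
    return ++c->seen < 3;
}

long report(void *arg, long error)
{
    Counter *c = static_cast<Counter *>(arg);
    c->exit_error = error;
    return c->seen;
}
```

Both hooks receive the same arg pointer, so the exit hook can report on or clean up the state that the entry hook built.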


    Processing Data in a Member Function

    Typically, the stream function is implemented as a "dummy" static member function of some class. In this case, EnterStream()'s arg argument is a pointer to an instance of that class. In the implementation of the static function, the "real" stream function is invoked on the arg pointer that the function receives. The class that implements the functions needn't derive from BSubscriber.

    For example, the (fictitious) SoundDuller class defines a static function called _dull_sound() and a non-static function called DullSound(). Both of these functions are private. In addition, the class defines public Start() and Stop() functions that run the show, and some private variables--including a BAudioSubscriber object--that it requires to perform:

       class SoundDuller : public BObject
       {
          public:
             void Start(void);
             void Stop(void);
       
          private:
             static bool _dull_sound(void *arg, 
                         char *buf, 
                         long count);
             bool DullSound(char *buf, long count);
       
             BAudioSubscriber a_sub;
             short previous;
       };

    The implementation of _dull_sound() casts the arg pointer and then invokes DullSound():

       bool SoundDuller::_dull_sound(void *arg, char *buf, long count)
       {
          return (((SoundDuller *)arg)->DullSound(buf,count));
       }

    DullSound() performs the actual stream data processing. The function shown here implements a simple low-pass filter (the "HelloWorld" of signal processing). The function assumes that the stream data is one channel of 16-bit sound:

       bool SoundDuller::DullSound(char *buf, long count)
       {
          long short_count = count/2;
          short *s_buf = (short *)buf;
          
          /* Add the previous output sample into each sample. */
          while (short_count-- > 0) {
             *s_buf += previous;
             previous = *s_buf++;
          }
          /* Keep receiving buffers. */
          return TRUE;
       }

    The Start() function initializes the BAudioSubscriber and the previous variable, and then calls EnterStream():

       void SoundDuller::Start(void)
       {
          if (a_sub.Subscribe(B_DAC_STREAM, B_SHARED_SUBSCRIBER_ID,
                         FALSE) < B_NO_ERROR)
             return;
          previous = 0;
       
          /* Enter at the stream's tail; run in the background. */
          a_sub.EnterStream(NULL, FALSE,
                         this, _dull_sound, NULL, TRUE);
       }

    Stop() removes the subscriber from the stream by calling ExitStream(). The function's argument says whether we want to wait until the object is really out of the stream; it's always a good idea to re-synchronize if the subscriber is running in the background:

       void SoundDuller::Stop(void)
       {
          a_sub.ExitStream(TRUE);
          a_sub.Unsubscribe();
       }

    Sound details used in this example, such as the meaning of the B_DAC_STREAM constant, are explained in the BAudioSubscriber class. For another example of a stream function implementation, see the BSoundFile class.
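The static-trampoline pattern can be tried outside the Media Kit. The sketch below is a hypothetical Smoother class in standard C++; it isn't part of the Kit. It also swaps in a different filter: where DullSound() above adds the previous output sample into each sample, this variant averages each sample with the previous input sample, so the result can't grow beyond the 16-bit range. One channel of 16-bit sound is assumed, as in the example above.

```cpp
#include <cassert>

class Smoother {
public:
    Smoother() : previous(0) {}

    // Static trampoline with the entry hook's shape; arg is the object.
    static bool hook(void *arg, char *buf, long count)
    {
        assert(arg != 0);
        return static_cast<Smoother *>(arg)->smooth(buf, count);
    }

private:
    // Replace each sample with the mean of itself and the previous
    // input sample. Assumes one channel of 16-bit sound.
    bool smooth(char *buf, long count)
    {
        short *s = reinterpret_cast<short *>(buf);
        for (long n = count / 2; n > 0; --n, ++s) {
            short current = *s;
            *s = static_cast<short>((current + previous) / 2);
            previous = current;
        }
        return true;    // stay in the stream
    }

    short previous;     // carried across buffers
};
```

Keeping previous as a member lets the filter run smoothly across buffer boundaries: the last sample of one buffer feeds the first sample of the next.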


    Constructor and Destructor


    BSubscriber()

          BSubscriber(const char *name = NULL)

    Creates and returns a new BSubscriber object. The object can be given a name; the name needn't be unique.

    After creating a BSubscriber, you typically do the following (in this order):

  • Subscribe the object to a buffer stream by calling Subscribe().

  • Allow the object to begin receiving buffers by calling EnterStream().

    The construction of a BSubscriber never fails. This function doesn't set the object's Error() value.

    See also: Subscribe(), EnterStream()


    ~BSubscriber()

          virtual ~BSubscriber(void)

    Destroys the BSubscriber. You should never delete a BSubscriber from within an implementation of the object's stream function or completion function.

    It isn't necessary to tell the object to exit the buffer stream or to unsubscribe it before deleting. These actions will happen automatically.


    Member Functions


    Clique()

          subscriber_id Clique(void)

    Returns the clique (a subscriber_id value) that this BSubscriber used in its most recent attempt to subscribe. The attempt need not have been successful, nor is there any guarantee that the object hasn't since unsubscribed. If the object hasn't attempted to subscribe, this returns B_NO_SUBSCRIBER_ID.

    See also: Subscribe()


    EnterStream()

          virtual long EnterStream(subscriber_id neighbor,
             bool before,
             void *arg,
             enter_stream_hook streamFunction,
             exit_stream_hook completionFunction,
             bool background)

    Causes the BSubscriber to begin receiving buffers of data from its stream. The object must have successfully subscribed (through a call to Subscribe()) for this function to succeed.

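    The arguments to this function (and the function in general) are the topic of most of the overview to this class; look there for the whole story. Briefly, the arguments are:

  • neighbor and before position the BSubscriber with respect to the other objects in the stream.

  • arg is a pointer that's passed to the stream function and the completion function.

  • streamFunction is invoked once for each buffer that the object receives.

  • completionFunction is invoked when the object exits the stream.

  • background says whether the stream function runs in a background thread.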

    If the designated neighbor isn't in the buffer stream, EnterStream() returns B_SUBSCRIBER_NOT_FOUND. If the BSubscriber is already in the stream, B_BAD_SUBSCRIBER is returned.

    If background is TRUE, EnterStream() immediately returns B_NO_ERROR; if it's FALSE, EnterStream() returns the value returned by the completion function. If a completion function isn't supplied, EnterStream() returns a value that indicates the success of the communication with the server; unless something's gone wrong, it should return B_NO_ERROR. In all cases, the Error() value is set to the value returned here.

    See also: ExitStream()


    Error()

          long Error(void)

    Returns an error code that reflects the success of the function that was most recently invoked upon this object. The error codes that a particular function uses are listed in that function's description.


    ExitStream()

          virtual long ExitStream(bool andWait = FALSE)

    Causes the BSubscriber to leave the buffer stream after it completes the processing of its current buffer. If andWait is TRUE, the function doesn't return until the object has completed processing this final buffer and has actually left the stream. If a completion function was supplied in the EnterStream() invocation, it will run to completion before ExitStream() returns. If andWait is FALSE (the default), ExitStream() returns immediately.

    If the object isn't in the stream, B_SUBSCRIBER_NOT_FOUND is returned. Otherwise the function returns B_NO_ERROR.

    Note: In release 1.1d7, ExitStream() doesn't return a reliable value--but it does set the error code properly.

    See also: EnterStream()


    ID()

          subscriber_id ID(void)

    Returns the subscriber_id value that uniquely identifies this BSubscriber. A subscriber ID is issued when the object subscribes to a stream; it's withdrawn when the object unsubscribes. ID values are used, primarily, to position a BSubscriber with respect to some other BSubscriber within a buffer stream.

    If the BSubscriber isn't currently subscribed to a stream, B_NO_SUBSCRIBER_ID is returned.


    IsInStream()

          bool IsInStream(void)

    Returns TRUE if the object is currently in a stream; otherwise it returns FALSE.


    Name()

          const char *Name(void)

    Returns a pointer to the name of the BSubscriber. The name is set through an argument to the BSubscriber constructor.


    SetTimeout(), Timeout()

          void SetTimeout(double microseconds)
          double Timeout(void)

    These functions set and return the amount of time, measured in microseconds, that a BSubscriber that has entered the buffer stream is willing to wait from the time that it finishes processing one buffer till the time that it gets the next. If the time limit expires before the next buffer arrives, the BSubscriber exits the stream and the completion function is called with its error argument set to B_TIMED_OUT.

    A time limit of 0 (the default) means no time limit--the BSubscriber will wait forever for its next buffer.

    See also: EnterStream()
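The time-out rule, including the special meaning of 0, reduces to a small predicate. The function below is a hypothetical standalone helper for illustration, not something the Media Kit provides.

```cpp
#include <cassert>

// Decide whether a subscriber that has waited `waited_us` microseconds
// for its next buffer should give up, given its time limit.
// A limit of 0 means "no limit", matching the documented default.
bool has_timed_out(double limit_us, double waited_us)
{
    assert(limit_us >= 0.0 && waited_us >= 0.0);
    if (limit_us == 0.0)
        return false;               // wait forever
    return waited_us >= limit_us;
}
```

The wait is measured from the end of one buffer's processing to the arrival of the next, so the time spent inside the stream function doesn't count against the limit.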


    StreamParameters()

          long StreamParameters(long *bufferSize,
             long *bufferCount,
             bool *isRunning,
             long *subscriberCount,
             subscriber_id *clique)

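    Returns information about the stream to which the BSubscriber is currently subscribed:

  • bufferSize is the size, in bytes, of each buffer in the stream.

  • bufferCount is the number of buffers that the stream uses.

  • isRunning is TRUE if buffers are currently flowing through the stream.

  • subscriberCount is the number of subscribers that the stream currently has.

  • clique is the clique value that's currently in force.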

    You can set the buffer size and buffer count parameters (and so fine-tune the latency of the stream) through the SetStreamBuffers() function. isRunning can be toggled through calls to StartStreaming() and StopStreaming(). The other two parameters (subscriberCount and clique) vary as subscribers come and go.

    You must have successfully subscribed to the stream to call this function. If you haven't, B_BAD_SUBSCRIBER is returned. Otherwise, the function returns B_NO_ERROR.


    SetStreamBuffers()

          long SetStreamBuffers(long bufferSize, long bufferCount)

    Sets the size (in bytes) and number of buffers that are used to transport data through the stream. Although it's up to the server to provide reasonable default values, you can fine-tune the performance of the stream by fiddling with this function.

    You must have successfully subscribed to the stream to call this function. If you haven't, B_RESOURCE_UNAVAILABLE is returned. Otherwise, the function returns B_NO_ERROR.

    The Audio Server initializes its streams to use eight buffers (per stream), where each buffer is a single page (4096 bytes). Currently, there's no way to automatically restore these default values after you've mangled one of the audio streams.
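To see how buffer size and count translate into time, a rough calculation helps. The helper functions below are hypothetical and standalone. The sound format used in the note that follows (44,100 frames per second, two channels, 16-bit samples) is an assumption made for the arithmetic; this document doesn't state the Audio Server's format.

```cpp
#include <cassert>

// Microseconds of sound held by one buffer of `buffer_size` bytes.
double buffer_duration_us(long buffer_size, double frames_per_second,
                          long channels, long bytes_per_sample)
{
    assert(frames_per_second > 0 && channels > 0 && bytes_per_sample > 0);
    double frames = static_cast<double>(buffer_size)
                    / (channels * bytes_per_sample);
    return frames / frames_per_second * 1e6;
}

// Total for a stream of `buffer_count` such buffers.
double stream_duration_us(long buffer_size, long buffer_count,
                          double frames_per_second,
                          long channels, long bytes_per_sample)
{
    return buffer_count * buffer_duration_us(buffer_size, frames_per_second,
                                             channels, bytes_per_sample);
}
```

Under the assumed format, one 4096-byte buffer holds 1024 frames, or about 23.2 milliseconds of sound, and the default eight buffers hold about 186 milliseconds. Smaller or fewer buffers shorten that span.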


    StartStreaming(), StopStreaming()

          long StartStreaming(void)
          long StopStreaming(void)

    Starts and stops the passing of buffers through the stream to which the BSubscriber is subscribed. By default, the stream begins running when the first BSubscriber enters it, and it stops when the final remaining BSubscriber exits. You should only need to call StartStreaming() or StopStreaming() if you want to interrupt this automation.

    You must have successfully subscribed to the stream to call this function. If you haven't, B_RESOURCE_UNAVAILABLE is returned. Otherwise, the function returns B_NO_ERROR.


    Subscribe()

          virtual long Subscribe(long stream, subscriber_id clique, bool willWait)

    Asks for admission into the server's list of BSubscribers to which it (the server) will send buffers of data. Subscribing doesn't cause the BSubscriber to begin receiving buffers, it simply gives the object the right to do so. (To receive buffers, you must invoke EnterStream() on a BSubscriber that has successfully subscribed.)

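    The arguments are described fully in the overview to this class. Briefly, they are:

  • stream identifies the buffer stream that the object wants to subscribe to.

  • clique is the key that determines which other BSubscribers the object is willing to share the stream with.

  • willWait says whether the object waits for admission (TRUE) or gives up immediately (FALSE) if the subscription is denied.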

    A successful subscription returns B_NO_ERROR. If the subscription is denied (because stream doesn't identify a valid stream, or the clique value isn't acceptable) and the BSubscriber isn't waiting, Subscribe() returns B_RESOURCE_UNAVAILABLE. The Error() value is set to the value returned directly here.

    Note: The timeout value that you can set through the SetTimeout() function doesn't apply to subscription (it only applies to the inter-buffer lacuna). A BSubscriber that's willing to wait for admission might be waiting a long time.

    See also: Unsubscribe()


    Timeout() see SetTimeout()


    Unsubscribe()

          virtual long Unsubscribe(void)

    Revokes the BSubscriber's access to its media server and sets its subscriber ID to B_NO_SUBSCRIBER_ID. If the object is currently in a stream, it automatically exits the stream and the object's completion function is called.

    When you delete a BSubscriber, it's automatically unsubscribed.

    If the object isn't currently subscribed, the function returns B_BAD_SUBSCRIBER. Otherwise, it returns B_NO_ERROR.

    See also: Subscribe()
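The ordering rules that tie Subscribe(), EnterStream(), ExitStream(), and Unsubscribe() together can be summarized as a two-flag state machine. The Lifecycle class and its Result codes below are hypothetical and standalone. The sketch models only the ordering constraints described in this class, always admits a subscription, and leaves the server, cliques, and buffers out.

```cpp
#include <cassert>

// Hypothetical result codes; the Media Kit defines the real ones.
enum Result { kOk, kNotSubscribed, kAlreadyInStream, kNotInStream };

class Lifecycle {
public:
    Lifecycle() : subscribed(false), in_stream(false) {}

    // Subscription always succeeds in this sketch.
    Result subscribe() { subscribed = true; return kOk; }

    // Entering requires a subscription and can't be done twice.
    Result enter_stream()
    {
        if (!subscribed) return kNotSubscribed;
        if (in_stream)   return kAlreadyInStream;
        in_stream = true;
        return kOk;
    }

    Result exit_stream()
    {
        if (!in_stream) return kNotInStream;
        in_stream = false;
        return kOk;
    }

    // Unsubscribing while in the stream exits the stream first.
    Result unsubscribe()
    {
        if (!subscribed) return kNotSubscribed;
        if (in_stream) exit_stream();
        subscribed = false;
        assert(!in_stream);
        return kOk;
    }

    bool is_in_stream() const { return in_stream; }

private:
    bool subscribed;
    bool in_stream;
};
```

The same automatic clean-up applies on deletion: an object that's still in a stream is taken out of it and unsubscribed.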




    The Be Book, HTML Edition, for Developer Release 8 of the Be Operating System.

    Copyright © 1996 Be, Inc. All rights reserved.

    Be, the Be logo, BeBox, BeOS, BeWare, and GeekPort are trademarks of Be, Inc.

    Last modified September 6, 1996.