However, there are some things that muscled does not do, such as read or write from the hard disk, authenticate users, or implement application-specific logic. But if your client-server application needs to do such things, don't worry, the muscled architecture can be seamlessly extended to include the functionality you require. This document will show you how to do that.
This document assumes that you have some basic knowledge of C++, and how MUSCLE works (at least from a client program's perspective). If you don't, you may want to review the Beginner's Guide to MUSCLE programming, or the MUSCLE client programming BeNews article. You will want to have the MUSCLE header files (muscle/*/*.h) handy as you read along, as this document does not attempt to be comprehensive in its API descriptions.
Note that the server side of MUSCLE is 100% single-threaded. It uses select(), non-blocking I/O, and message queues to achieve a multithreaded effect, but is single-threaded for purposes of stability and portability. Trying to integrate multithreaded code into a MUSCLE server is probably not a good idea, and would require serialization of all MUSCLE method calls, at the very least. I suggest you stick with the single-thread model if you can, it's not as bad as you think ;^).
When the server receives a new TCP connection from a client, it uses the factory object passed in to its constructor to create a new AbstractReflectSession object. After the session object is created and initialized, this method is called on it. Here is where you can do any setup work that needs to be done for your new client, and return B_NO_ERROR if everything is okay. If something has gone horribly wrong, you can return B_ERROR, and the connection to the client will be closed, and this session object deleted. Be sure to call your superclass's AttachedToServer() method as the first thing in your implementation, otherwise your session won't get set up properly!
This method is called whenever your session's client computer has sent a PortableMessage to the MUSCLE server. You would generally implement this method to parse the PortableMessage and respond appropriately to your client's request. Be sure to call your superclass's implementation of this method for any messages that you can't parse yourself.
This method is called whenever another AbstractReflectSession has sent you a PortableMessage. It is via this method that clients can interact with each other, e.g. for client A to send a message through the MUSCLE server to client B, it would go something like this:
When implementing your own session logic, there are several different "pre-made" AbstractReflectSession subclasses to choose from. Depending on how much code you want to re-use, you can override AbstractReflectSession directly and handle everything yourself, or you can override one of the following, and take advantage of the logic already provided:
Called when the server starts up. You can do any initialization work here, and return B_NO_ERROR if you are ready to go. (B_ERROR will abort startup).
Called just before the server shuts down. You can do any tidying up here if you like. (Note that this is not currently called if the server gets shut down with a control-C...)
If you need actions to be performed at a specific time (as opposed to only doing thing in response to incoming PortableMessages), you can override these two methods. Note that this sort of behaviour is discouraged if you can avoid it, as it is easy to abuse. (Remember that polling is inefficient and evil!) GetPulseTime() can be overridden to specify when Pulse() should be called next. By default, Pulse() is never called.
// Factory class for my custom session type class MySessionFactory : public ReflectSessionFactory { public: virtual AbstractReflectSession * CreateSession() { AbstractReflectSession * ret = newnothrow MyReflectSessionSubclass(); if (ret == NULL) WARN_OUT_OF_MEMORY; return ret; } } int main(int argc, char ** argv) { ReflectServer myServer; // instantiate regular ReflectServer or your own subclass myServer.PutAcceptPort(new MySessionFactory, 2960); // Whenever we get a connection on port 2960, use my factory to create a new session object status_t ret = myServer.ServerProcessLoop(); // everything happens here! Will not return for a long time (if ever!) if (ret == B_NO_ERROR) printf("Server terminated normally.\n"); else printf("Server exited with an error condition!\n"); myServer.Cleanup(); return 0; }The ServerProcessLoop() method is where all the serving get done--it won't return until the server decides to quit--either due to a critical error, or because your code told it to shut down.
Note that if you are creating a new, custom server application, you almost certainly WON'T need to make your own AbstractMessageIOGateway subclass. MUSCLE comes with a very nice PortableMessageIOGateway class, which knows how to translate any PortableMessage into the standard MUSCLE byte-stream format. The PortableMessageIOGateway class is used by default, and in most cases, it is all you need. An additional benefit to sticking with the standard PortableMessageIOGateway class is that you will remain stream-compatible with other MUSCLE code that is out there--defining your own AbstractMessageIOGateway class means breaking protocol compatibility, which may cause headaches later on. Nonetheless, there are situations where a custom IO gateway is useful:
This method must return true if and only if you have bytes ready to send out over the TCP connection. It is used to determine whether or not the MUSCLE server should wake up and call DoOutput() when their is room to send more bytes.
This method is called when you have bytes to send (see above) and there is space in the TCP buffers to send it. You should send as many bytes as possible to your PortableDataIO object (get messages from the outgoing message queue, convert them to bytes, send the bytes, repeat), add the number of bytes you were able to send to (addWrittenBytes), and then return B_NO_ERROR. Returning B_ERROR is taken to mean that the stream has been broken, and will usually result in the termination of the session. This method should never block.
This method is called whenever incoming bytes are available on your PortableDataIO object's TCP stream. You should read as many bytes as possible, add the number of bytes you were able to read to (addReadBytes), and then convert as many bytes as possible into PortableMessage objects, and add them to the incoming message queue for eventual pickup by the session object. Any bytes that represent a partial message (i.e. you don't have the entire PortableMessage's worth yet) should be held for next time.
AbstractMessageIOGateway * MyReflectSession :: CreateGateway(int socket) { TCPSocketDataIO * io = newnothrow TCPSocketDataIO(socket, false); // false == non-blocking I/O if (io) { MyGateway * gw = newnothrow MyGateway(io); // Custom I/O gateway! if (gw) return gw; // success! Return early. else { io->ReleaseSocket(); // caller still owns the socket on error delete io; } } WARN_OUT_OF_MEMORY; return NULL; }
This method returns a reference to a PortableMessage that is shared by all sessions. Any session may access or alter this message, and other sessions can call GetCentralState() to see the changes. For example, StorageReflectSession uses this message to store the root of the node database. Please be gentle while altering the central state message, however, as other sessions may be relying on their data being there.
Returns a list of references to all currently attached sessions. Handy if you need "down and dirty" direct access to the other sessions. (Note that this type of direct access usually leads to dynamic casting, and the same functionality can often be implemented more cleanly by message passing)
Given a session ID, returns you a reference to that session's object... or B_ERROR if no such session exists.
Adds the given new session to the session list. (socket) should be the TCP socket used by the new session (the server will select() on this to determine when incoming data is available), or -1 if the new session has no TCP socket of its own (i.e. a 'fake' session)
Sort of a session's hara-kiri method. Call this on a session and that session will be marked for removal and deletion by the server at the next opportunity. The session isn't deleted immediately (as it may be in the middle of an operation), but will be ASAP.
Causes the session to be terminated, and (newSession) to be put in its place. (newSession) will be given the same TCP socket and IOGateway as the old session had. This method is a quick way to swap out the server-side logic of a session while maintaining its connection to the client.
void MyReflectSession :: SetupMyStuff(const PortableMessage & msg1, const PortableMessage & msg2, const PortableMessage & msg3) { // Add our three messages as nodes in our database PortableMessage * msg = GetMessageFromPool(PR_COMMAND_SETDATA); if (msg) { msg->AddMessage("appnodes/node1", msg1); msg->AddMessage("appnodes/node2", msg2); msg->AddMessage("appnodes/node3", msg3); PortableMessageRef msgRef(msg, GetMessagePool(), GetMessageRefPool()); StorageReflectSession::MessageReceivedFromGateway(msgRef); } else WARN_OUT_OF_MEMORY; }The above technique will work for any message type that is supported for clients (i.e. PR_COMMAND_*). The semantics will be the same as they would be for the client, if it were to send the same message.
If you need more control than the client messaging API can provide, however, you can call the public and protected members of the StorageReflectSession class directly. For example, you can create or set nodes in your subtree of the database by calling SetDataNode():
void MyReflectSession :: SetupMyStuff2(const PortableMessage & msg1, const PortableMessage & msg2, const PortableMessage & msg3) { PortableMessageRef msgRef1(new PortableMessage(msg1), GetMessagePool(), GetMessageRefPool()); PortableMessageRef msgRef2(new PortableMessage(msg2), GetMessagePool(), GetMessageRefPool()); PortableMessageRef msgRef3(new PortableMessage(msg3), GetMessagePool(), GetMessageRefPool()); SetDataNode("appnodes/node1", msgRef1, true, true); SetDataNode("appnodes/node2", msgRef2, true, true); SetDataNode("appnodes/node3", msgRef3, true, true); }This method gives you some additional control, allowing you to specify whether or not nodes should be created or overwritten (the third and fourth arguments of SetDataNode()).
Sometimes you'll want to do your own queries of the server-side database. One way to do this would be to create a PR_COMMAND_GETDATA message, pass it to StorageReflectSession::MessageReceivedFromGateway(), and then parse the resulting PR_RESULTS_DATAITEMS message that is given to your MessageReceivedFromNeighbor() method. But that method is somewhat inefficient, and a little bit error-prone (how are you to know which PR_REZULTS_DATAITEMS message corresponds with which PR_COMMAND_GETDATA message?). A better way to do it is by setting up your own query callback. In the following example, we will execute a query for all connected clients' session nodes ("*/*"), and our callback will be executed once for every node that matches the query string.
void MyReflectSession :: FindAllSessionNodes() { PortableQueueThe above code also demonstrates the use of the (userData) field to carry additional information into the callback. Whatever value you pass in as the last argument of DoTraversal() is passed back to your callback method, to do with as you wish. Here it is used to pass in a pointer to a PortableQueue to which the path names of the matching nodes are added.myList; // collect results here WildPathMatcher matcher; matcher.AddPathString("*/*"); matcher.DoTraversal((PathMatchCallback)MyCallback, this, *_globalRoot, &myList); // (myList) now contains path names of all session nodes... } int MyReflectSession :: MyCallback(DataNode & node, void * userData) { printf("MyCallback called for node: [%s]\n", node.GetNodePath()); printf("Message for this node is: "); const PortableMessage * nodeData = node.GetData()->GetItemPointer(); if (nodeData) nodeData->PrintToStream(); else printf(" \n"); PortableQueue * myList = (PortableQueue *) userData; // as passed in to DoTraversal() myList->AddTail(node.GetNodePath()); return node.GetDepth(); // continue traversal as usual }
The return value of your callback function is also important. It should specify the node-depth at which to continue the traversal. This value can be used to dynamically prune the search of the database tree, for efficiency. For a full traversal, you should always return node.GetDepth(). On the other hand, if you have found the data you wanted and wish to terminate the search immediately, you would return 0. Or, if you wanted the search to continue at the next session node, you could return 2 (which is the level of the session nodes in the tree). As a final example, if you want the search to continue, but not to recurse into the subtree below the current node, you would return node.GetDepth()-1.
A WildPathMatcher can do a query made up of several path strings at once. For example, if you wanted to do a query on all host nodes AND all session nodes, you could do this:
WildPathMatcher matcher; matcher.AddPathString("*"); // query on all host nodes (level 1) matcher.AddPathString("*/*"); // query on all session nodes (level 2) matcher.DoTraversal((PathMatchCallback)MyCustomCallback, this, *_globalRoot, NULL);A single traversal will not trigger the callback function for any given node more than once, even if that node is matched by more than one path string.
The third argument to DoTraversal() (*_globalRoot in the examples above) is the node to start the search at. For searches of the entire database, *_globalRoot is the correct value to place here; however you may wish to limit your search to only a subtree of the database tree. For example, if you wish to make your search relative to the current session's node only (and thus search only nodes that your own session created), you could put *_sessionDir.GetItemPointer() here instead. Note that using a different starting point does change the semantics of the path strings... e.g. in that case "*" would mean all children underneath the session node, rather than all children beneath the root node.
Leading slashes in the path strings are NOT handled by the WildPathMatcher--all WildPathMatcher path strings are taken to be relative paths, and are relative to the node passed in as the third argument to DoTraversal(). If you want to be able to handle leading slashes and give a default prefix to relative path strings, you may find the method WildPathMatcher::AdjustStringPrefix() to be useful.
status_t MyReflectServer :: ReadyToRun() { status_t ret = ReflectServer::ReadyToRun(); if (ret != B_NO_ERROR) return ret; FakeSession * fakeSession = newnothrow FakeSession(); if (fakeSession) { // Add with socket = -1, meaning there is no client for this session. if (AddNewSession(AbstractReflectSessionRef(fakeSession, NULL, NULL), -1) == B_NO_ERROR) { return B_NO_ERROR; // success! } } else WARN_OUT_OF_MEMORY; return B_ERROR; }Once created and added, this session can handle messages from the other sessions, create database nodes, and do just about anything any other session can do (except send messages to its client, as it doesn't have one).
LogTime(MUSCLE_LOG_CRITICALERROR, "The sky is falling! You have %i minutes to live!\n", minutesToLive);This call would cause a log entry like this to be generated:
[C 11/27 15:03:33] The sky is falling! You have 5 minutes to live!The letter in the second column indicates the log level, followed by the date and time, and finally the log message. Note that the carriage return should be included in your messages text. This is so that you can assemble a log message from several log calls if you wish. Log() is the same as LogTime(), except that the bracketed level/time/date header isn't printed.
You may, in some cases, wish to do other things with the log message besides printing it to stdout or to a log file. In that case, you can use AddLogCallback() to install your own LogCallback object. This object's Log() method will be called whenever a message is logged, and you can then handle it as you wish.
MUSCLE code doesn't use C++ exceptions. This is because I'm not comfortable with the way exceptions integrate into C++; it's far too easy to get a memory leak or other unexpected result when an exception is thrown. Instead, MUSCLE uses the classic C-style method of handling errors--error codes are returned, and manually propogated up to the code that can handle them. (yes, it's a bit tedious. Yes, I still like it better than C++ exceptions. No whining! ;^))
The standard C++ new operator throws a bad_alloc exception when it cannot allocate the requested amount of memory. To avoid this, MUSCLE code uses the new (nothrow) operator instead. To make things a little more flexible and easier to key in, MUSCLE #defines a synonym, "newnothrow", which means the same thing. The canonical MUSCLE style of dynamically allocating something is this:
Something * thing = newnothrow Something(); if (thing) { DoStuffWith(thing); [...] } else WARN_OUT_OF_MEMORY;The WARN_OUT_OF_MEMORY macro simply logs a message like this:
[C 11/27 15:03:33] ERROR--OUT OF MEMORY (MyCode.cpp:396)So later on, when you are trying to figure out why your code didn't get executed, you will know there was a problem allocating memory.
Another feature that MUSCLE provides (for any project that links in ReflectServer.cpp) is automatic memory-usage tracking and out-of-memory callbacks. The memory-usage-tracking allows you to place an upper limit on dynamic memory allocation, either to guarantee that your server won't overtax the machine it is running on, or just to test your out-of-memory handling code. It is easy to use; just pass in the maximum number of bytes that may be allocated at once into the ReflectServer constructor; and the new operator will fail when that amount is reached. (Note that malloc/free calls aren't tracked--I suggest using new/delete for all your allocations anyway).
Out-of-memory callbacks let you free up unecessary memory in the event of a memory shortage. For example, MUSCLE uses ObjectPools to reduce the number of memory allocations and deletions that must be done on a regular basis. As such, MUSCLE may have several hundred unused objects lying around, awaiting reuse. When the new operator detects a shortage, however, it calls an out-of-memory callback that knows how to drain these pools, thus freeing up some extra memory and allowing the server a little more breathing room. You can add your own out-of-memory callback handler by calling AddOutOfMemoryHandler() (defined in muscle/reflector/ReflectServer.h).
One other thing to note: If you want your code to handle out-of-memory conditions, you can't trust the GetItemPointer() or () operator of your PortableRef objects. This includes PortableMessageRefs, AbstractSessionRefs, etc. This is because the PortableRef object does a dynamic allocation internally, and if that allocation fails, it may show up later as a NULL result in GetItemPointer(). So for 100%-bulletproof code, you shouldn't do this:
void foo(PortableMessageRef & ref) { ref.GetItemPointer()->AddString("test", "test"); }but rather this:
void foo(PortableMessageRef & ref) { PortableMessage * msg = ref.GetItemPointer(); if (msg) msg->AddString("test", "test"); }(Note that there is no real need to call WARN_OUT_OF_MEMORY here, as an out-of-memory error would already have been logged by the PortableRef class)
-Jeremy