diff --git a/Modules/OpenIGTLink/mitkIGTLDevice.cpp b/Modules/OpenIGTLink/mitkIGTLDevice.cpp index e81f789030..71a0d4092c 100644 --- a/Modules/OpenIGTLink/mitkIGTLDevice.cpp +++ b/Modules/OpenIGTLink/mitkIGTLDevice.cpp @@ -1,445 +1,467 @@ /*=================================================================== The Medical Imaging Interaction Toolkit (MITK) Copyright (c) German Cancer Research Center, Division of Medical and Biological Informatics. All rights reserved. This software is distributed WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See LICENSE.txt or http://www.mitk.org for details. ===================================================================*/ #include "mitkIGTLDevice.h" //#include "mitkIGTTimeStamp.h" #include #include #include #include #include #include //remove later #include #include static const int SOCKET_SEND_RECEIVE_TIMEOUT_MSEC = 100; typedef itk::MutexLockHolder MutexLockHolder; mitk::IGTLDevice::IGTLDevice() : // m_Data(mitk::DeviceDataUnspecified), m_State(mitk::IGTLDevice::Setup), m_StopCommunication(false), m_PortNumber(-1), m_Name("Unspecified Device"), m_MultiThreader(NULL), m_ThreadID(0) { m_StopCommunicationMutex = itk::FastMutexLock::New(); m_StateMutex = itk::FastMutexLock::New(); // m_LatestMessageMutex = itk::FastMutexLock::New(); m_CommunicationFinishedMutex = itk::FastMutexLock::New(); // execution rights are owned by the application thread at the beginning m_CommunicationFinishedMutex->Lock(); m_MultiThreader = itk::MultiThreader::New(); // m_Data = mitk::DeviceDataUnspecified; // m_LatestMessage = igtl::MessageBase::New(); m_MessageFactory = mitk::IGTLMessageFactory::New(); m_SendQueue = mitk::IGTLMessageQueue::New(); m_ReceiveQueue = mitk::IGTLMessageQueue::New(); m_CommandQueue = mitk::IGTLMessageQueue::New(); } mitk::IGTLDevice::~IGTLDevice() { /* stop communication and disconnect from igtl device */ if (GetState() == Running) { this->StopCommunication(); } if (GetState() == Ready) { this->CloseConnection(); } /* cleanup tracking thread */ if ((m_ThreadID != 0) && (m_MultiThreader.IsNotNull())) { m_MultiThreader->TerminateThread(m_ThreadID); } m_MultiThreader = NULL; } mitk::IGTLDevice::IGTLDeviceState mitk::IGTLDevice::GetState() const { MutexLockHolder lock(*m_StateMutex); return m_State; } void mitk::IGTLDevice::SetState( IGTLDeviceState state ) { itkDebugMacro("setting m_State to " << state); MutexLockHolder lock(*m_StateMutex); // lock and unlock the mutex if (m_State == state) { return; } m_State = state; this->Modified(); } bool mitk::IGTLDevice::TestConnection() { return true; } unsigned int mitk::IGTLDevice::ReceivePrivate(igtl::Socket* socket) { // Create a message buffer to receive header igtl::MessageHeader::Pointer headerMsg; headerMsg = igtl::MessageHeader::New(); // Initialize receive buffer headerMsg->InitPack(); // Receive generic header from the socket int r = socket->Receive(headerMsg->GetPackPointer(), headerMsg->GetPackSize(), 1); if (r == 0) //connection error { // an error was received, therefor the communication with this socket // must be stopped return IGTL_STATUS_NOT_PRESENT; } else if (r == -1 ) //timeout { // a timeout was received, this is no error state, thus, do nothing return IGTL_STATUS_TIME_OUT; } else if (r == headerMsg->GetPackSize()) { // Deserialize the header and check the CRC int crcCheck = headerMsg->Unpack(1); if (crcCheck & igtl::MessageHeader::UNPACK_HEADER) { // Allocate a time stamp igtl::TimeStamp::Pointer ts; ts = igtl::TimeStamp::New(); // Get time stamp igtlUint32 sec; igtlUint32 nanosec; headerMsg->GetTimeStamp(ts); ts->GetTimeStamp(&sec, &nanosec); // std::cerr << "Time stamp: " // << sec << "." // << nanosec << std::endl; // std::cerr << "Dev type and name: " << headerMsg->GetDeviceType() << " " // << headerMsg->GetDeviceName() << std::endl; // headerMsg->Print(std::cout); //check the type of the received message //if it is a GET_, STP_ or RTS_ command push it into the command queue //otherwise continue reading the whole message from the socket const char* curDevType = headerMsg->GetDeviceType(); if ( std::strstr( curDevType, "GET_" ) != NULL || std::strstr( curDevType, "STP_" ) != NULL || std::strstr( curDevType, "RTS_" ) != NULL) { this->m_CommandQueue->PushMessage(headerMsg); this->InvokeEvent(CommandReceivedEvent()); return IGTL_STATUS_OK; } //Create a message according to the header message igtl::MessageBase::Pointer curMessage; curMessage = m_MessageFactory->CreateInstance(headerMsg); //check if the curMessage is created properly, if not the message type is //not supported and the message has to be skipped if ( curMessage.IsNull() ) { socket->Skip(headerMsg->GetBodySizeToRead(), 0); MITK_ERROR("IGTLDevice") << "The received type is not supported. Please " "add it to the message factory."; return IGTL_STATUS_NOT_FOUND; } //insert the header to the message and allocate the pack curMessage->SetMessageHeader(headerMsg); curMessage->AllocatePack(); // Receive transform data from the socket int receiveCheck = 0; receiveCheck = socket->Receive(curMessage->GetPackBodyPointer(), curMessage->GetPackBodySize()); if ( receiveCheck > 0 ) { int c = curMessage->Unpack(1); if ( !(c & igtl::MessageHeader::UNPACK_BODY) ) { // mitkThrow() << "crc error"; return IGTL_STATUS_CHECKSUM_ERROR; } //check the type of the received message //if it is a command push it into the command queue //otherwise into the normal receive queue //STP_ commands are handled here because they implemented additional //member variables that are not stored in the header message if ( std::strstr( curDevType, "STT_" ) != NULL ) { this->m_CommandQueue->PushMessage(curMessage); this->InvokeEvent(CommandReceivedEvent()); } else { this->m_ReceiveQueue->PushMessage(curMessage); this->InvokeEvent(MessageReceivedEvent()); } return IGTL_STATUS_OK; } else { MITK_ERROR("IGTLDevice") << "Received a valid header but could not " << "read the whole message."; return IGTL_STATUS_UNKNOWN_ERROR; } } else { //CRC check failed return IGTL_STATUS_CHECKSUM_ERROR; } } else { //Message size information and actual data size don't match. //this state is not suppossed to be reached, return unknown error return IGTL_STATUS_UNKNOWN_ERROR; } } void mitk::IGTLDevice::SendMessage(const mitk::IGTLMessage* msg) { this->SendMessage(msg->GetMessage()); } void mitk::IGTLDevice::SendMessage(igtl::MessageBase::Pointer msg) { //add the message to the queue m_SendQueue->PushMessage(msg); } unsigned int mitk::IGTLDevice::SendMessagePrivate(igtl::MessageBase::Pointer msg, igtl::Socket::Pointer socket) { //check the input message if ( msg.IsNull() ) { MITK_ERROR("IGTLDevice") << "Could not send message because message is not " "valid. Please check."; return false; } // add the name of this device to the message msg->SetDeviceName(this->GetName().c_str()); // Pack (serialize) and send msg->Pack(); int sendSuccess = socket->Send(msg->GetPackPointer(), msg->GetPackSize()); if (sendSuccess) return IGTL_STATUS_OK; else return IGTL_STATUS_UNKNOWN_ERROR; } void mitk::IGTLDevice::RunCommunication() { if (this->GetState() != Running) return; // keep lock until end of scope MutexLockHolder communicationFinishedLockHolder(*m_CommunicationFinishedMutex); // Because m_StopCommunication is used by two threads, access has to be guarded // by a mutex. To minimize thread locking, a local copy is used here bool localStopCommunication; // update the local copy of m_StopCommunication this->m_StopCommunicationMutex->Lock(); localStopCommunication = this->m_StopCommunication; this->m_StopCommunicationMutex->Unlock(); while ((this->GetState() == Running) && (localStopCommunication == false)) { // Check if other igtl devices want to connect with this one. This method // is overwritten for igtl servers but is doing nothing in case of a igtl // client this->Connect(); // Check if there is something to receive and store it in the message queue this->Receive(); // Check if there is something to send this->Send(); /* Update the local copy of m_StopCommunication */ this->m_StopCommunicationMutex->Lock(); localStopCommunication = m_StopCommunication; this->m_StopCommunicationMutex->Unlock(); // time to relax itksys::SystemTools::Delay(1); } // StopCommunication was called, thus the mode should be changed back to Ready now // that the tracking loop has ended. this->SetState(Ready); MITK_DEBUG("IGTLDevice::RunCommunication") << "Reached end of communication."; // returning from this function (and ThreadStartCommunication()) // this will end the thread return; } bool mitk::IGTLDevice::StartCommunication() { if (this->GetState() != Ready) return false; // go to mode Running this->SetState(Running); // set a timeout for the sending and receiving this->m_Socket->SetTimeout(SOCKET_SEND_RECEIVE_TIMEOUT_MSEC); // update the local copy of m_StopCommunication this->m_StopCommunicationMutex->Lock(); this->m_StopCommunication = false; this->m_StopCommunicationMutex->Unlock(); // transfer the execution rights to tracking thread m_CommunicationFinishedMutex->Unlock(); // start a new thread that executes the communication m_ThreadID = m_MultiThreader->SpawnThread(this->ThreadStartCommunication, this); // mitk::IGTTimeStamp::GetInstance()->Start(this); return true; } bool mitk::IGTLDevice::StopCommunication() { if (this->GetState() == Running) // Only if the object is in the correct state { // m_StopCommunication is used by two threads, so we have to ensure correct // thread handling m_StopCommunicationMutex->Lock(); m_StopCommunication = true; m_StopCommunicationMutex->Unlock(); // we have to wait here that the other thread recognizes the STOP-command // and executes it m_CommunicationFinishedMutex->Lock(); // mitk::IGTTimeStamp::GetInstance()->Stop(this); // notify realtime clock // StopCommunication was called, thus the mode should be changed back // to Ready now that the tracking loop has ended. this->SetState(Ready); } return true; } bool mitk::IGTLDevice::CloseConnection() { if (this->GetState() == Setup) { return true; } else if (this->GetState() == Running) { this->StopCommunication(); } m_Socket->CloseSocket(); /* return to setup mode */ this->SetState(Setup); // m_SerialCommunication = NULL; return true; } +bool mitk::IGTLDevice::SendRTSMessage(const char* type) +{ + //construct the device type for the return message, it starts with RTS_ and + //continues with the requested type + std::string returnType("RTS_"); + returnType.append(type); + //create a return message + igtl::MessageBase::Pointer rtsMsg = + this->m_MessageFactory->CreateInstance(returnType); + //if retMsg is NULL there is no return message defined and thus it is not + //necessary to send one back + if ( rtsMsg.IsNotNull() ) + { + this->SendMessage(rtsMsg); + return true; + } + else + { + return false; + } +} + igtl::MessageBase::Pointer mitk::IGTLDevice::GetNextMessage() { //copy the next message into the given msg igtl::MessageBase::Pointer msg = this->m_ReceiveQueue->PullMessage(); return msg; } igtl::MessageBase::Pointer mitk::IGTLDevice::GetNextCommand() { //copy the next command into the given msg igtl::MessageBase::Pointer msg = this->m_CommandQueue->PullMessage(); return msg; } //std::string mitk::IGTLDevice::GetNextMessageInformation() //{ // return this->m_ReceiveQueue->GetNextMsgInformationString(); //} //std::string mitk::IGTLDevice::GetNextMessageDeviceType() //{ // return this->m_ReceiveQueue->GetNextMsgDeviceType(); //} //std::string mitk::IGTLDevice::GetNextCommandInformation() //{ // return this->m_CommandQueue->GetNextMsgInformationString(); //} //std::string mitk::IGTLDevice::GetNextCommandDeviceType() //{ // return this->m_CommandQueue->GetNextMsgDeviceType(); //} ITK_THREAD_RETURN_TYPE mitk::IGTLDevice::ThreadStartCommunication(void* pInfoStruct) { /* extract this pointer from Thread Info structure */ struct itk::MultiThreader::ThreadInfoStruct * pInfo = (struct itk::MultiThreader::ThreadInfoStruct*)pInfoStruct; if (pInfo == NULL) { return ITK_THREAD_RETURN_VALUE; } if (pInfo->UserData == NULL) { return ITK_THREAD_RETURN_VALUE; } IGTLDevice *igtlDevice = (IGTLDevice*)pInfo->UserData; if (igtlDevice != NULL) { igtlDevice->RunCommunication(); } igtlDevice->m_ThreadID = 0; // erase thread id because thread will end. return ITK_THREAD_RETURN_VALUE; } diff --git a/Modules/OpenIGTLink/mitkIGTLDevice.h b/Modules/OpenIGTLink/mitkIGTLDevice.h index c94b0fa147..ab5803308d 100644 --- a/Modules/OpenIGTLink/mitkIGTLDevice.h +++ b/Modules/OpenIGTLink/mitkIGTLDevice.h @@ -1,353 +1,358 @@ /*=================================================================== The Medical Imaging Interaction Toolkit (MITK) Copyright (c) German Cancer Research Center, Division of Medical and Biological Informatics. All rights reserved. This software is distributed WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See LICENSE.txt or http://www.mitk.org for details. ===================================================================*/ #ifndef MITKIGTLDEVICE_H #define MITKIGTLDEVICE_H #include "mitkCommon.h" //itk #include "itkObject.h" #include "itkFastMutexLock.h" #include "itkMultiThreader.h" //igtl #include "igtlSocket.h" #include "igtlMessageBase.h" #include "igtlTransformMessage.h" //mitkIGTL #include "MitkOpenIGTLinkExports.h" #include "mitkIGTLMessageFactory.h" #include "mitkIGTLMessageQueue.h" #include "mitkIGTLMessage.h" namespace mitk { /**Documentation * \brief Interface for all Open IGT Link Devices * * Defines the methods that are common for all devices using Open IGT Link. * */ class MITK_OPENIGTLINK_EXPORT IGTLDevice : public itk::Object { public: mitkClassMacro(IGTLDevice, itk::Object) /** * Type for state variable. The IGTLDevice is always in one of these states */ enum IGTLDeviceState {Setup, Ready, Running}; /** * \brief Opens a connection to the device * * This may only be called if there is currently no connection to the * device. If OpenConnection() is successful, the object will change from * Setup state to Ready state */ virtual bool OpenConnection() = 0; /** * \brief Closes the connection to the device * * This may only be called if there is currently a connection to the * device, but device is not running (e.g. object is in Ready state) */ virtual bool CloseConnection(); /** * \brief Stops the communication between the two devices * * This may only be called if there the device is in Running state */ virtual bool StopCommunication(); bool StartCommunication(); void RunCommunication(); /** * \brief Sends a message via the open IGT Link. * * This may only be called after the connection to the device has been * established with a call to OpenConnection(). Note that the message * is not send directly. This method just adds it to the send queue. */ void SendMessage(igtl::MessageBase::Pointer msg); /** * \brief Sends a message via the open IGT Link. * * Convenience function to work with mitk::IGTLMessage directly. */ void SendMessage(const IGTLMessage* msg); /** * \brief return current object state (Setup, Ready or Running) */ IGTLDeviceState GetState() const; /** * \brief Returns the oldest command message * \param msg A smartpointer to the message base where the oldest command * shall be copied into * \retval true The latest message is stored in msg * \retval false The latest message could not been copied, do not use this * data */ igtl::MessageBase::Pointer GetNextCommand(); /** * \brief Returns the oldest received message * \param msg A smartpointer to the message base where the latest message * shall be copied into * \retval true The latest message is stored in msg * \retval false The latest message could not been copied, do not use this * data */ igtl::MessageBase::Pointer GetNextMessage(); // /** // * \brief Returns information about the oldest message in the receive queue // */ // std::string GetNextMessageInformation(); // /** // * \brief Returns device type about the oldest message in the receive queue // */ // std::string GetNextMessageDeviceType(); // /** // * \brief Returns information about the oldest message in the command queue // */ // std::string GetNextCommandInformation(); // /** // * \brief Returns device type about the oldest message in the command queue // */ // std::string GetNextCommandDeviceType(); /** * \brief return device data */ // igtl::MessageBase::Pointer GetData() const; /** * \brief set device data */ // void SetData(IGTLDeviceData data); /** * \brief Sets the port number of the device */ itkSetMacro(PortNumber,int); /** * \brief Returns the port number of the device */ itkGetMacro(PortNumber,int); /** * \brief Sets the ip/hostname of the device */ itkSetMacro(Hostname,std::string); /** * \brief Returns the ip/hostname of the device */ itkGetMacro(Hostname,std::string); /** * \brief Returns the name of this device */ itkGetConstMacro(Name, std::string); /** * \brief Sets the name of this device */ itkSetMacro(Name, std::string); /** * \brief Returns a const reference to the receive queue */ itkGetConstMacro(ReceiveQueue, mitk::IGTLMessageQueue::Pointer); /** * \brief Returns a const reference to the command queue */ itkGetConstMacro(CommandQueue, mitk::IGTLMessageQueue::Pointer); /** * \brief Returns a const reference to the send queue */ itkGetConstMacro(SendQueue, mitk::IGTLMessageQueue::Pointer); /** * \brief Returns the message factory */ itkGetMacro(MessageFactory, mitk::IGTLMessageFactory::Pointer); /** * \brief static start method for the tracking thread. */ static ITK_THREAD_RETURN_TYPE ThreadStartCommunication(void* data); /** * \brief TestConnection() tries to connect to a IGTL server on the current * ip and port * * \todo check this description * * TestConnection() tries to connect to a IGTL server on the current * ip and port and returns which device it has found. * \return It returns the type of the device that answers. Throws an exception * if no device is available on that ip/port. * @throw mitk::IGTHardwareException Throws an exception if there are errors * while connecting to the device. */ virtual bool TestConnection(); + /** + * \brief Send RTS message of given type + */ + bool SendRTSMessage(const char* type); + protected: /** * \brief Sends a message via the open IGT Link. * * This may only be called after the connection to the device has been * established with a call to OpenConnection() * * \retval IGTL_STATUS_OK the message was sent * \retval IGTL_STATUS_UNKONWN_ERROR the message was not sent because an * unknown error occurred */ unsigned int SendMessagePrivate(igtl::MessageBase::Pointer msg, igtl::Socket::Pointer socket); /** * \brief Call this method to receive a message. * * The message will be saved in the receive queue. */ virtual void Receive() = 0; /** * \brief Call this method to receive a message from the given client. * * The message will be saved in the receive queue. * * \retval IGTL_STATUS_OK a message or a command was received * \retval IGTL_STATUS_NOT_PRESENT the socket is not connected anymore * \retval IGTL_STATUS_TIME_OUT the socket timed out * \retval IGTL_STATUS_CHECKSUM_ERROR the checksum of the received msg was * incorrect * \retval IGTL_STATUS_UNKNOWN_ERROR an unknown error occurred */ unsigned int ReceivePrivate(igtl::Socket* client); /** * \brief Call this method to send a message. The message will be read from * the queue */ virtual void Send() = 0; /** * \brief Call this method to check for other devices that want to connect * to this one. * * In case of a client this method is doing nothing. In case of a server it * is checking for other devices and if there is one it establishes a * connection. */ virtual void Connect() = 0; /** * \brief Stops the communication with the given socket * */ virtual void StopCommunicationWithSocket(igtl::Socket* socket) = 0; /** * \brief change object state */ void SetState(IGTLDeviceState state); IGTLDevice(); virtual ~IGTLDevice(); // IGTLDeviceData m_Data; ///< current device Data IGTLDeviceState m_State; ///< current object state (Setup, Ready or Running) bool m_StopCommunication; ///< signal stop to thread /** mutex to control access to m_StopThread */ itk::FastMutexLock::Pointer m_StopCommunicationMutex; /** mutex to manage control flow of StopTracking() */ itk::FastMutexLock::Pointer m_CommunicationFinishedMutex; /** mutex to control access to m_State */ itk::FastMutexLock::Pointer m_StateMutex; /** mutex to control access to m_Socket */ // itk::FastMutexLock::Pointer m_SocketMutex; /** mutex to control access to m_ReceiveQueue */ // itk::FastMutexLock::Pointer m_ReceiveQueueMutex; /** mutex to control access to m_SendQueue */ // itk::FastMutexLock::Pointer m_SendQueueMutex; /** the hostname or ip of the device */ std::string m_Hostname; /** the port number of the device */ int m_PortNumber; /** the socket used to communicate with other IGTL devices */ igtl::Socket::Pointer m_Socket; /** the latest received message */ // igtl::MessageBase::Pointer m_LatestMessage; /** The message receive queue */ mitk::IGTLMessageQueue::Pointer m_ReceiveQueue; /** The message send queue */ mitk::IGTLMessageQueue::Pointer m_SendQueue; /** A queue that stores just command messages received by this device */ mitk::IGTLMessageQueue::Pointer m_CommandQueue; /** A message factory that provides the New() method for all msg types */ mitk::IGTLMessageFactory::Pointer m_MessageFactory; /** the name of this device */ std::string m_Name; private: /** creates worker thread that continuously polls interface for new messages */ itk::MultiThreader::Pointer m_MultiThreader; int m_ThreadID; ///< ID of polling thread }; /**Documentation * @brief connect to this Event to get noticed when a message was received * */ itkEventMacro( MessageReceivedEvent , itk::AnyEvent ); /**Documentation * @brief connect to this Event to get noticed when a command was received * */ itkEventMacro( CommandReceivedEvent , itk::AnyEvent ); /**Documentation * @brief connect to this Event to get noticed when another igtl device * connects with this device. * */ itkEventMacro( NewClientConnectionEvent , itk::AnyEvent ); } // namespace mitk #endif /* MITKIGTLDEVICE_H */ diff --git a/Modules/OpenIGTLink/mitkIGTLMessageProvider.cpp b/Modules/OpenIGTLink/mitkIGTLMessageProvider.cpp index 87dd688a15..92bd588d45 100644 --- a/Modules/OpenIGTLink/mitkIGTLMessageProvider.cpp +++ b/Modules/OpenIGTLink/mitkIGTLMessageProvider.cpp @@ -1,353 +1,341 @@ /*=================================================================== The Medical Imaging Interaction Toolkit (MITK) Copyright (c) German Cancer Research Center, Division of Medical and Biological Informatics. All rights reserved. This software is distributed WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See LICENSE.txt or http://www.mitk.org for details. ===================================================================*/ #include "mitkIGTLMessageProvider.h" #include "mitkIGTLDevice.h" #include "mitkIGTLMessage.h" #include "mitkIGTLMessageFactory.h" #include "mitkCallbackFromGUIThread.h" //Microservices #include "usServiceReference.h" #include "usModuleContext.h" #include "usServiceEvent.h" #include "mitkServiceInterface.h" #include "usGetModuleContext.h" //igt (remove this later) #include "igtlBindMessage.h" #include "igtlQuaternionTrackingDataMessage.h" #include "igtlTrackingDataMessage.h" #ifndef WIN32 #include #endif mitk::IGTLMessageProvider::IGTLMessageProvider() : mitk::IGTLDeviceSource() { this->SetName("IGTLMessageProvider"); m_MultiThreader = itk::MultiThreader::New(); m_StreamingTimeMutex = itk::FastMutexLock::New(); m_StopStreamingThreadMutex = itk::FastMutexLock::New(); m_ThreadId = 0; } mitk::IGTLMessageProvider::~IGTLMessageProvider() { // terminate worker thread on destruction this->m_StopStreamingThreadMutex->Lock(); this->m_StopStreamingThread = true; this->m_StopStreamingThreadMutex->Unlock(); if ( m_ThreadId >= 0) { this->m_MultiThreader->TerminateThread(m_ThreadId); } } void mitk::IGTLMessageProvider::GenerateData() { if (this->m_IGTLDevice.IsNull()) return; for (unsigned int index = 0; index < this->GetNumberOfIndexedInputs(); index++) { const IGTLMessage* msg = this->GetInput(index); assert(msg); if ( !msg->IsDataValid() ) { continue; } igtl::MessageBase::Pointer igtlMsg = msg->GetMessage(); if ( igtlMsg.IsNotNull() ) { //send the message this->m_IGTLDevice->SendMessage(igtlMsg); } } } void mitk::IGTLMessageProvider::CreateOutputs() { //if outputs are set then delete them if (this->GetNumberOfOutputs() > 0) { for (int numOP = this->GetNumberOfOutputs() - 1; numOP >= 0; numOP--) this->RemoveOutput(numOP); this->Modified(); } //fill the outputs if a valid OpenIGTLink device is set if (m_IGTLDevice.IsNull()) return; this->SetNumberOfIndexedOutputs(1); if (this->GetOutput(0) == NULL) { DataObjectPointer newOutput = this->MakeOutput(0); this->SetNthOutput(0, newOutput); this->Modified(); } } //void mitk::IGTLMessageProvider::UpdateOutputInformation() //{ // this->Modified(); // make sure that we need to be updated // Superclass::UpdateOutputInformation(); //} void mitk::IGTLMessageProvider::OnIncomingMessage() { } std::string RemoveRequestPrefixes(std::string requestType) { return requestType.substr(4); } void mitk::IGTLMessageProvider::OnIncomingCommand() { //get the next command igtl::MessageBase::Pointer curCommand = this->m_IGTLDevice->GetNextCommand(); //extract the type const char * requestType = curCommand->GetDeviceType(); //check the type std::string reqType(requestType); bool isGetMsg = !reqType.find("GET_"); bool isSTTMsg = !reqType.find("STT_"); bool isSTPMsg = !reqType.find("STP_"); bool isRTSMsg = !reqType.find("RTS_"); //get the type from the request type (remove STT_, STP_, GET_, RTS_) std::string type = RemoveRequestPrefixes(requestType); //check all microservices if there is a fitting source for the requested type mitk::IGTLMessageSource::Pointer source = this->GetFittingSource(type.c_str()); - //if there is no fitting source return an RTS message, if there is a RTS - //type defined in the message factory + //if there is no fitting source return a RTS message, if there is a RTS + //type defined in the message factory send it if ( source.IsNull() ) { - //construct the device type for the return message, it starts with RTS_ and - //continues with the requested type - std::string returnType("RTS_"); - returnType.append(type); - //get the message factory. In the final version the factory shall be loaded - //as microservice - mitk::IGTLMessageFactory::Pointer msgFactory = - this->GetIGTLDevice()->GetMessageFactory(); - //create a return message - igtl::MessageBase::Pointer rtsMsg = msgFactory->CreateInstance(returnType); - //if retMsg is NULL there is no return message defined and thus it is not - //necessary to send one back - if ( rtsMsg.IsNotNull() ) + if ( !this->GetIGTLDevice()->SendRTSMessage(type.c_str()) ) { - this->GetIGTLDevice()->SendMessage(rtsMsg); + //sending RTS message failed, probably because the type is not in the + //message factory + MITK_WARN("IGTLMessageProvider") << "Tried to send a RTS message but did " + "not succeed. Check if this type ( " + << type << " ) was added to the message " + "factory. "; } } else { if ( isGetMsg ) //if it is a single value push it into sending queue { //first it is necessary to update the source. This needs additional time //but is necessary. But are we really allowed to call this here? In which //thread are we? Is the source thread safe? source->Update(); mitk::IGTLMessage::Pointer sourceOutput = source->GetOutput(); if (sourceOutput.IsNotNull() && sourceOutput->IsDataValid()) { igtl::MessageBase::Pointer sourceMsg = sourceOutput->GetMessage(); if ( source.IsNotNull() ) { this->GetIGTLDevice()->SendMessage(sourceMsg); } } } else if ( isSTTMsg ) { - //if it is a stream establish a connection between the provider and the - //source - this->ConnectTo(source); + //so far the provider allows the streaming of a single source only + //if the streaming thread is already running return a RTS message + if ( this->m_ThreadId == 0 ) + { + //if it is a stream establish a connection between the provider and the + //source + this->ConnectTo(source); - //read the requested frames per second - int fps = 10; + //read the requested frames per second + int fps = 10; - //read the fps from the command - igtl::MessageBase* curCommandPt = curCommand.GetPointer(); - if ( std::strcmp( curCommand->GetDeviceType(), "STT_BIND" ) == 0 ) - { - fps = ((igtl::StartBindMessage*)curCommandPt)->GetResolution(); - } - else if ( std::strcmp( curCommand->GetDeviceType(), "STT_QTDATA" ) == 0 ) - { - fps = ((igtl::StartQuaternionTrackingDataMessage*)curCommandPt)->GetResolution(); + //read the fps from the command + igtl::MessageBase* curCommandPt = curCommand.GetPointer(); + if ( std::strcmp( curCommand->GetDeviceType(), "STT_BIND" ) == 0 ) + { + fps = ((igtl::StartBindMessage*)curCommandPt)->GetResolution(); + } + else if ( std::strcmp( curCommand->GetDeviceType(), "STT_QTDATA" ) == 0 ) + { + fps = ((igtl::StartQuaternionTrackingDataMessage*)curCommandPt)->GetResolution(); + } + else if ( std::strcmp( curCommand->GetDeviceType(), "STT_TDATA" ) == 0 ) + { + fps = ((igtl::StartTrackingDataMessage*)curCommandPt)->GetResolution(); + } + + // calculate the streaming time + this->m_StreamingTimeMutex->Lock(); + this->m_StreamingTime = 1.0 / (double) fps * 1000.0; + this->m_StreamingTimeMutex->Unlock(); + + // Create a command object. The function will be called later from the + // main thread + this->m_StreamingCommand = ProviderCommand::New(); + m_StreamingCommand->SetCallbackFunction(this, + &mitk::IGTLMessageProvider::Update); + + // For streaming we need a continues time signal, since there is no timer + // available we start a thread that generates a timing signal + // This signal is invoked from the other thread the update of the pipeline + // has to be executed from the main thread. Thus, we use the + // callbackfromGUIThread class to pass the execution to the main thread + this->m_ThreadId = m_MultiThreader->SpawnThread(this->TimerThread, this); } - else if ( std::strcmp( curCommand->GetDeviceType(), "STT_TDATA" ) == 0 ) + else { - fps = ((igtl::StartTrackingDataMessage*)curCommandPt)->GetResolution(); + MITK_WARN("IGTLMessageProvider") << "This provider just supports the " + "streaming of one source."; } - - // calculate the streaming time - this->m_StreamingTimeMutex->Lock(); - this->m_StreamingTime = 1.0 / (double) fps * 1000.0; - this->m_StreamingTimeMutex->Unlock(); - - // Create a command object. The function will be called later from the - // main thread - this->m_StreamingCommand = ProviderCommand::New(); - m_StreamingCommand->SetCallbackFunction(this, - &mitk::IGTLMessageProvider::Update); - - // For streaming we need a continues time signal, since there is no timer - // available we start a thread that generates a timing signal - // This signal is invoked from the other thread the update of the pipeline - // has to be executed from the main thread. Thus, we use the - // callbackfromGUIThread class to pass the execution to the main thread - this->m_ThreadId = m_MultiThreader->SpawnThread(this->TimerThread, this); } else if ( isSTPMsg ) { + this->DisconnectFrom(source); + this->m_StopStreamingThreadMutex->Lock(); this->m_StopStreamingThread = false; this->m_StopStreamingThreadMutex->Unlock(); } else { //do nothing } } } mitk::IGTLMessageSource::Pointer mitk::IGTLMessageProvider::GetFittingSource(const char* requestedType) { //get the context us::ModuleContext* context = us::GetModuleContext(); //define the interface name std::string interface = mitk::IGTLMessageSource::US_INTERFACE_NAME; //specify a filter that defines the requested type std::string filter = "(" + mitk::IGTLMessageSource::US_PROPKEY_DEVICETYPE + "=" + requestedType + ")"; //find the fitting service std::vector serviceReferences = context->GetServiceReferences(interface, filter); //check if a service reference was found. It is also possible that several //services were found. This is not checked here, just the first one is taken. if ( serviceReferences.size() ) { mitk::IGTLMessageSource::Pointer curSource = context->GetService(serviceReferences.front()); if ( curSource.IsNotNull() ) return curSource; } //no service reference was found or found service reference has no valid source return NULL; } void mitk::IGTLMessageProvider::Send(const mitk::IGTLMessage* msg) { if (msg != NULL) this->m_IGTLDevice->SendMessage(msg); } void mitk::IGTLMessageProvider::ConnectTo( mitk::IGTLMessageSource* UpstreamFilter ) { for (DataObjectPointerArraySizeType i = 0; i < UpstreamFilter->GetNumberOfOutputs(); i++) { this->SetInput(i, UpstreamFilter->GetOutput(i)); } } +void +mitk::IGTLMessageProvider::DisconnectFrom( mitk::IGTLMessageSource* UpstreamFilter ) +{ + for (DataObjectPointerArraySizeType i = 0; + i < UpstreamFilter->GetNumberOfOutputs(); i++) + { + this->RemoveInput(UpstreamFilter->GetOutput(i)); + } +} + ITK_THREAD_RETURN_TYPE mitk::IGTLMessageProvider::TimerThread(void* pInfoStruct) { // extract this pointer from thread info structure struct itk::MultiThreader::ThreadInfoStruct * pInfo = (struct itk::MultiThreader::ThreadInfoStruct*)pInfoStruct; mitk::IGTLMessageProvider* thisObject = static_cast(pInfo->UserData); itk::SimpleMutexLock mutex; mutex.Lock(); thisObject->m_StreamingTimeMutex->Lock(); unsigned int waitingTime = thisObject->m_StreamingTime; thisObject->m_StreamingTimeMutex->Unlock(); while (true) { thisObject->m_StopStreamingThreadMutex->Lock(); bool stopThread = thisObject->m_StopStreamingThread; thisObject->m_StopStreamingThreadMutex->Unlock(); if (stopThread) { break; } //wait for the time given //I know it is not the nicest solution but we just need an approximate time //sleeps for 20 ms #if defined (WIN32) || defined (_WIN32) Sleep(waitingTime); #else usleep(waitingTime * 1000); #endif // Ask to execute that command from the GUI thread mitk::CallbackFromGUIThread::GetInstance()->CallThisFromGUIThread( thisObject->m_StreamingCommand); - -// thisObject->m_WorkerBarrier->Wait(&mutex); - -// if (thisObject->m_StopThread) { break; } - -// thisObject->m_ImageMutex->Lock(); -// cv::Mat image = thisObject->m_InputImage.clone(); -// int inputImageId = thisObject->m_InputImageId; -// thisObject->m_ImageMutex->Unlock(); - -// cv::Mat mask = thisObject->GetMaskFromPointSets(); - -// cv::Mat result; -// if (thisObject->m_UseOnlyRegionAroundModelPoints) -// { -// result = cv::Mat(mask.rows, mask.cols, mask.type(), 0.0); -// thisObject->m_BoundingBox = thisObject->GetBoundingRectFromMask(mask); -// thisObject->RunSegmentation(image(thisObject->m_BoundingBox), mask(thisObject->m_BoundingBox)).copyTo(result(thisObject->m_BoundingBox)); -// } -// else -// { -// result = thisObject->RunSegmentation(image, mask); -// } - -// // save result to member attribute -// thisObject->m_ResultMutex->Lock(); -// thisObject->m_ResultMask = result; -// thisObject->m_ResultImageId = inputImageId; -// thisObject->m_ResultMutex->Unlock(); } + thisObject->m_ThreadId = 0; + mutex.Unlock(); return ITK_THREAD_RETURN_VALUE; } diff --git a/Modules/OpenIGTLink/mitkIGTLMessageProvider.h b/Modules/OpenIGTLink/mitkIGTLMessageProvider.h index b7a6717da9..1fb3c49bb6 100644 --- a/Modules/OpenIGTLink/mitkIGTLMessageProvider.h +++ b/Modules/OpenIGTLink/mitkIGTLMessageProvider.h @@ -1,138 +1,147 @@ /*=================================================================== The Medical Imaging Interaction Toolkit (MITK) Copyright (c) German Cancer Research Center, Division of Medical and Biological Informatics. All rights reserved. This software is distributed WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See LICENSE.txt or http://www.mitk.org for details. ===================================================================*/ #ifndef IGTLMESSAGEPROVIDER_H_HEADER_INCLUDED_ #define IGTLMESSAGEPROVIDER_H_HEADER_INCLUDED_ #include "mitkIGTLDevice.h" #include "mitkIGTLDeviceSource.h" //itk #include "itkCommand.h" namespace mitk { /**Documentation * \brief Provides information/objects from a MITK-Pipeline to other OpenIGTLink * devices * * This class is intended as the drain of the pipeline. Other OpenIGTLink * devices connect with the device hold by this provider. The other device asks * for a special data. The provider checks if there are other IGTLMessageSources * available that provide this data type. If yes, they connect with this source * and send the message to the requesting device. * * */ class MITK_OPENIGTLINK_EXPORT IGTLMessageProvider : public IGTLDeviceSource { public: mitkClassMacro(IGTLMessageProvider, IGTLDeviceSource); itkFactorylessNewMacro(Self) itkCloneMacro(Self) typedef itk::SimpleMemberCommand ProviderCommand; /** * \brief sends the msg to the requesting client * * Note: so far it broadcasts the message to all registered clients */ void Send(const IGTLMessage* msg); protected: IGTLMessageProvider(); virtual ~IGTLMessageProvider(); /** * \brief filter execute method * * queries the OpenIGTLink device for new messages and updates its output * igtl::MessageBase objects with it. * \warning Will raise a std::out_of_range exception, if tools were added to * the OpenIGTLink device after it was set as input for this filter */ virtual void GenerateData(); /** * \brief Create the necessary outputs for the m_IGTLDevice * * This Method is called internally whenever outputs need to be reset. Old * Outputs are deleted when called. **/ void CreateOutputs(); /** * \brief This method is called when the IGTL device hold by this class * receives a new message **/ virtual void OnIncomingMessage(); /** * \brief This method is called when the IGTL device hold by this class * receives a new command **/ virtual void OnIncomingCommand(); /** *\brief Connects the input of this filter to the outputs of the given * IGTLMessageSource * * This method does not support smartpointer. use FilterX.GetPointer() to * retrieve a dumbpointer. */ void ConnectTo( mitk::IGTLMessageSource* UpstreamFilter ); + /** + *\brief Disconnects this filter from the outputs of the given + * IGTLMessageSource + * + * This method does not support smartpointer. use FilterX.GetPointer() to + * retrieve a dumbpointer. + */ + void DisconnectFrom( mitk::IGTLMessageSource* UpstreamFilter ); + /** * \brief Looks for microservices that provide messages with the requested * type. **/ mitk::IGTLMessageSource::Pointer GetFittingSource(const char* requestedType); private: /** * \brief a command that has to be executed in the main thread */ ProviderCommand::Pointer m_StreamingCommand; /** * \brief Timer thread for generating a continuous time signal for the stream * * Everyt time the time is passed a time signal is invoked. * * \param pInfoStruct pointer to the mitkIGTLMessageProvider object * \return */ static ITK_THREAD_RETURN_TYPE TimerThread(void* pInfoStruct); int m_ThreadId; /** \brief timer thread will terminate after the next wakeup if set to true */ bool m_StopStreamingThread; itk::SmartPointer m_MultiThreader; /** \brief the time used for streaming */ unsigned int m_StreamingTime; /** \brief mutex for guarding m_Time */ itk::SmartPointer m_StreamingTimeMutex; /** \brief mutex for guarding m_StopStreamingThread */ itk::SmartPointer m_StopStreamingThreadMutex; }; } // namespace mitk #endif /* MITKIGTLMESSAGEPROVIDER_H_HEADER_INCLUDED_ */