Extracted the supplier from the xml::Parser/xml::Writer and made a xml::SocketReader/xml::SocketWriter for socket streams. This way we can make other suppliers for other sources like filestreams.

The xml::Socket has a output buffer and a Flush method to be able to create larger packets.
This commit is contained in:
Daniel Önnerby 2009-01-09 08:29:10 +00:00
parent 0b0971de56
commit 5de89cb4f4
15 changed files with 385 additions and 61 deletions

View File

@ -39,6 +39,7 @@
#include <core/Library/Remote.h>
#include <core/Query/Base.h>
#include <core/Preferences.h>
#include <core/xml/Socket.h>
#include <core/xml/Parser.h>
#include <core/xml/Writer.h>
#include <core/Crypt.h>
@ -168,7 +169,8 @@ void Library::Remote::ReadThread(){
try{
// Lets start recieving queries
xml::Parser parser(&this->socket);
xml::SocketReader xmlSocketReader(this->socket);
xml::Parser parser(&xmlSocketReader);
if( xml::ParserNode rootNode=parser.ChildNode("musik")){
// Start by waiting for the authentication node
@ -231,7 +233,8 @@ void Library::Remote::ReadThread(){
//////////////////////////////////////////
void Library::Remote::WriteThread(){
xml::Writer writer(&this->socket);
xml::SocketWriter xmlSocketWriter(this->socket);
xml::Writer writer(&xmlSocketWriter);
// Start by writing the musik-tag
xml::WriterNode rootNode(writer,"musik");
@ -243,6 +246,8 @@ void Library::Remote::WriteThread(){
authNode.Content() = musik::core::Crypt::Encrypt(this->password,this->sessionId);
}
writer.Flush();
while(!this->Exited()){
Query::Ptr query(this->GetNextQuery());
@ -300,7 +305,10 @@ void Library::Remote::WriteThread(){
}
}
writer.Flush();
}
writer.Flush();
}

View File

@ -40,6 +40,8 @@
#include <core/Query/TrackMetadata.h>
#include <core/Library/Base.h>
#include <core/LibraryTrack.h>
#include <core/xml/ParserNode.h>
#include <core/xml/WriterNode.h>
#include <boost/algorithm/string.hpp>
using namespace musik::core;

View File

@ -631,6 +631,14 @@
<Filter
Name="xml"
>
<File
RelativePath=".\xml\IReadSupplier.h"
>
</File>
<File
RelativePath=".\xml\IWriteSupplier.h"
>
</File>
<File
RelativePath=".\xml\Node.cpp"
>
@ -655,6 +663,18 @@
RelativePath=".\xml\ParserNode.h"
>
</File>
<File
RelativePath=".\xml\Reader.h"
>
</File>
<File
RelativePath=".\xml\Socket.cpp"
>
</File>
<File
RelativePath=".\xml\Socket.h"
>
</File>
<File
RelativePath=".\xml\Writer.cpp"
>

View File

@ -55,9 +55,9 @@
//#include <core/Track.h>
//#include <core/TrackMeta.h>
#include <core/xml/Node.h>
/*#include <core/xml/Node.h>
#include <core/xml/WriterNode.h>
#include <core/xml/ParserNode.h>
#include <core/xml/ParserNode.h>*/
//#include <core/Library/Base.h>
/*

View File

@ -41,6 +41,7 @@
#include <core/server/User.h>
#include <core/server/UserSession.h>
#include <core/xml/Socket.h>
#include <core/xml/Parser.h>
#include <core/xml/ParserNode.h>
#include <core/xml/Writer.h>
@ -94,7 +95,8 @@ bool Connection::Startup(){
void Connection::ReadThread(){
musik::core::xml::Parser xmlParser(&this->socket);
musik::core::xml::SocketReader xmlSocketReader(this->socket);
musik::core::xml::Parser xmlParser(&xmlSocketReader);
try{
@ -245,7 +247,8 @@ void Connection::ParseThread(){
void Connection::WriteThread(){
musik::core::xml::Writer xmlWriter(&this->socket);
musik::core::xml::SocketWriter xmlSocketWrite(this->socket);
musik::core::xml::Writer xmlWriter(&xmlSocketWrite);
try{
// Lets start with a <musik> node
@ -257,6 +260,7 @@ void Connection::WriteThread(){
musik::core::xml::WriterNode initNode(musikNode,"authentication");
initNode.Content() = this->salt;
}
xmlWriter.Flush();
// Wait for maximum 30 seconds for authentication
{
@ -301,6 +305,8 @@ void Connection::WriteThread(){
sendQuery->SendResults(queryNode,this);
}
xmlWriter.Flush();
// Remove the query from the queue
{
boost::mutex::scoped_lock lock(this->libraryMutex);

View File

@ -0,0 +1,57 @@
//////////////////////////////////////////////////////////////////////////////
//
// License Agreement:
//
// The following are Copyright © 2008, Daniel Önnerby
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// * Neither the name of the author nor the names of other contributors may
// be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////////////
#pragma once
#include <core/config.h>
//////////////////////////////////////////////////////////////////////////////
namespace musik{ namespace core{ namespace xml{
//////////////////////////////////////////////////////////////////////////////
class IReadSupplier{
public:
virtual bool Read(long recommendedBytes=1024) = 0;
virtual char* Buffer() = 0;
virtual long BufferSize() = 0;
virtual bool Exited() = 0;
};
//////////////////////////////////////////////////////////////////////////////
} } }

View File

@ -0,0 +1,55 @@
//////////////////////////////////////////////////////////////////////////////
//
// License Agreement:
//
// The following are Copyright © 2008, Daniel Önnerby
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// * Neither the name of the author nor the names of other contributors may
// be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////////////
#pragma once
#include <core/config.h>
//////////////////////////////////////////////////////////////////////////////
namespace musik{ namespace core{ namespace xml{
//////////////////////////////////////////////////////////////////////////////
class IWriteSupplier{
public:
virtual bool Write(const char* buffer,long bytes) = 0;
virtual void Flush() = 0;
virtual bool Exited() = 0;
};
//////////////////////////////////////////////////////////////////////////////
} } }

View File

@ -38,8 +38,6 @@
#include <core/xml/ParserNode.h>
#include <expat/expat.h>
#include <fstream>
using namespace musik::core::xml;
//////////////////////////////////////////////////////////////////////////////
@ -55,12 +53,11 @@ using namespace musik::core::xml;
///read from the socket when there is no buffer
///left in the parser.
//////////////////////////////////////////
Parser::Parser(boost::asio::ip::tcp::socket *socket)
Parser::Parser(IReadSupplier *supplier)
:level(0)
,supplier(supplier)
,xmlParser(NULL)
,socket(socket)
,xmlParserStatus(XML_Status::XML_STATUS_OK)
,readBufferLength(0)
,currentEventType(0)
,exit(false)
{
@ -150,25 +147,6 @@ void Parser::OnContentReal(const char *content,int length){
//////////////////////////////////////////////////////////////////////////////
void Parser::ReadFromSocket(){
boost::system::error_code error;
// This is a likely place where a thread will wait
this->readBufferLength = this->socket->read_some(boost::asio::buffer(this->readBuffer),error);
if(error){
// Connection closed or some other error occured
this->Exit();
}
// Log
// std::ofstream logFile("mc2_Parser.log",std::ios::app);
// logFile.write(this->readBuffer.c_array(),this->readBufferLength);
// logFile << std::endl;
}
//////////////////////////////////////////////////////////////////////////////
void Parser::ContinueParsing(){
this->xmlFound = false;
@ -178,8 +156,14 @@ void Parser::ContinueParsing(){
this->xmlParserStatus = XML_ResumeParser(this->xmlParser);
break;
case XML_Status::XML_STATUS_OK:
this->ReadFromSocket();
this->xmlParserStatus = XML_Parse(this->xmlParser,this->readBuffer.c_array(),(int)this->readBufferLength,0);
if(this->supplier->Exited()){
this->Exit();
return;
}
if(!this->supplier->Read()){
return;
}
this->xmlParserStatus = XML_Parse(this->xmlParser,this->supplier->Buffer(),(int)this->supplier->BufferSize(),0);
break;
case XML_Status::XML_STATUS_ERROR:
this->Exit();

View File

@ -40,9 +40,8 @@
#include <vector>
#include <set>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <core/xml/IReadSupplier.h>
#include <core/xml/Node.h>
#include <core/xml/ParserNode.h>
@ -89,10 +88,11 @@ namespace musik{ namespace core{ namespace xml{
//////////////////////////////////////////
class Parser : public ParserNode{
public:
Parser(boost::asio::ip::tcp::socket *socket);
Parser(IReadSupplier *supplier);
~Parser();
private:
IReadSupplier *supplier;
// XML Parser status
XML_Parser xmlParser;
int xmlParserStatus;
@ -101,12 +101,6 @@ class Parser : public ParserNode{
int level;
std::vector<Node::Ptr> currentNodeLevels;
// Socket stuff
boost::asio::ip::tcp::socket *socket;
boost::array<char, 4096> readBuffer;
size_t readBufferLength;
private:
static void OnElementStart(void *thisobject,const char *name, const char **atts);
@ -117,9 +111,6 @@ class Parser : public ParserNode{
void OnContentReal(const char *content,int length);
private:
void ReadFromSocket();
private:
friend class ParserNode;

117
src/core/xml/Socket.cpp Normal file
View File

@ -0,0 +1,117 @@
//////////////////////////////////////////////////////////////////////////////
//
// License Agreement:
//
// The following are Copyright © 2008, Daniel Önnerby
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// * Neither the name of the author nor the names of other contributors may
// be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////////////
#include "pch.hpp"
#include <core/xml/Socket.h>
using namespace musik::core::xml;
SocketReader::SocketReader(boost::asio::ip::tcp::socket &socket)
:socket(socket)
,exited(false)
,bufferSize(0)
{
}
SocketReader::~SocketReader(void){
}
bool SocketReader::Read(long recommendedBytes){
boost::system::error_code error;
this->bufferSize = this->socket.read_some(boost::asio::buffer(this->buffer),error);
if(!error){
return true;
}
this->exited = true;
return false;
}
char* SocketReader::Buffer(){
return this->buffer.c_array();
}
long SocketReader::BufferSize(){
return this->bufferSize;
}
bool SocketReader::Exited(){
return this->exited;
}
//////////////////////////////////////////////////////////////////////////////
SocketWriter::SocketWriter(boost::asio::ip::tcp::socket &socket)
:socket(socket)
,bufferSize(0)
,exited(false)
,maxBufferSize(1024)
{
}
SocketWriter::~SocketWriter(void){
}
bool SocketWriter::Write(const char* buffer,long bytes){
if(this->bufferSize+bytes>this->maxBufferSize){
this->Flush();
}
CopyMemory(this->buffer.c_array()+this->bufferSize, buffer, bytes);
this->bufferSize += bytes;
return !this->exited;
}
void SocketWriter::Flush(){
if(this->bufferSize && !this->Exited()){
boost::system::error_code error;
try{
boost::asio::write(this->socket,boost::asio::buffer(this->buffer,this->bufferSize));
}catch(...){
this->exited = true;
}
this->bufferSize = 0;
}
}
bool SocketWriter::Exited(){
return this->exited;
}

86
src/core/xml/Socket.h Normal file
View File

@ -0,0 +1,86 @@
//////////////////////////////////////////////////////////////////////////////
//
// License Agreement:
//
// The following are Copyright © 2008, Daniel Önnerby
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// * Neither the name of the author nor the names of other contributors may
// be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////////////
#pragma once
#include <core/config.h>
#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <core/xml/IWriteSupplier.h>
#include <core/xml/IReadSupplier.h>
//////////////////////////////////////////////////////////////////////////////
namespace musik{ namespace core{ namespace xml{
//////////////////////////////////////////////////////////////////////////////
class SocketReader : public IReadSupplier{
public:
SocketReader(boost::asio::ip::tcp::socket &socket);
~SocketReader(void);
virtual bool Read(long recommendedBytes=1024);
virtual char* Buffer();
virtual long BufferSize();
virtual bool Exited();
private:
boost::asio::ip::tcp::socket &socket;
bool exited;
long bufferSize;
boost::array<char, 4096> buffer;
};
//////////////////////////////////////////////////////////////////////////////
class SocketWriter : public IWriteSupplier{
public:
SocketWriter(boost::asio::ip::tcp::socket &socket);
~SocketWriter(void);
virtual bool Write(const char* buffer,long bytes);
virtual void Flush();
virtual bool Exited();
private:
boost::asio::ip::tcp::socket &socket;
bool exited;
long bufferSize;
boost::array<char, 4096> buffer;
long maxBufferSize;
};
//////////////////////////////////////////////////////////////////////////////
} } }

View File

@ -37,7 +37,6 @@
#include <core/xml/Writer.h>
#include <core/xml/WriterNode.h>
#include <boost/algorithm/string/replace.hpp>
#include <fstream>
using namespace musik::core::xml;
@ -54,8 +53,8 @@ using namespace musik::core::xml;
///read from the socket when there is no buffer
///left in the parser.
//////////////////////////////////////////
Writer::Writer(boost::asio::ip::tcp::socket *socket)
:socket(socket)
Writer::Writer(IWriteSupplier *supplier)
:supplier(supplier)
,exit(false)
{
this->writer = this;
@ -174,12 +173,8 @@ void Writer::Send(){
try{
// Time to send the buffer
if(!sendBuffer.empty() && this->socket->is_open()){
boost::asio::write(*(this->socket),boost::asio::buffer(sendBuffer));
// Log
//std::ofstream logFile("mc2_Writer.log",std::ios::app);
//logFile << sendBuffer << std::endl;
if(!sendBuffer.empty() && !this->supplier->Exited()){
this->supplier->Write(sendBuffer.c_str(),sendBuffer.size());
sendBuffer.clear();
}
}

View File

@ -37,9 +37,8 @@
#include <core/config.h>
#include <string>
#include <boost/asio.hpp>
//#include <boost/array.hpp>
#include <core/xml/IWriteSupplier.h>
#include <core/xml/Node.h>
#include <core/xml/WriterNode.h>
@ -51,16 +50,12 @@ namespace musik{ namespace core{ namespace xml{
class Writer : public WriterNode{
public:
Writer(boost::asio::ip::tcp::socket *socket);
Writer(IWriteSupplier *supplier);
~Writer();
bool Exited();
private:
// Socket stuff
boost::asio::ip::tcp::socket *socket;
// boost::array<char, 4096> readBuffer;
// size_t readBufferLength;
IWriteSupplier *supplier;
private:
friend class WriterNode;

View File

@ -106,3 +106,9 @@ Node::AttributeMap& WriterNode::Attributes(){
return this->node->attributes;
}
void WriterNode::Flush(){
if(this->writer){
this->writer->supplier->Flush();
}
}

View File

@ -57,6 +57,8 @@ class WriterNode {
std::string& Content();
Node::AttributeMap& Attributes();
void Flush();
protected:
friend class Writer;