Divide Framework 0.1
A free and open-source 3D Framework under heavy development
Loading...
Searching...
No Matches
Client.cpp
Go to the documentation of this file.
1
2
3#ifndef OPCODE_ENUM
4#define OPCODE_ENUM OPcodes
5#endif
6
7#include "Headers/Client.h"
8#include "Headers/ASIO.h"
11
12#include <boost/asio/write.hpp>
13#include <boost/asio/read.hpp>
14#include <boost/asio/read_until.hpp>
15
16namespace Divide
17{
/// Builds an idle client: nothing is connected, read or written here.
/// The socket and both timers are created on the executor of `io_context`.
/// @param asioPointer  stored as-is in `_asioPointer`; not dereferenced by the constructor
/// @param io_context   supplies the executor used by `_socket`, `_deadline` and `_heartbeatTimer`
/// @param debugOutput  stored in `_debugOutput`, which gates the diagnostic logging in the connect and write handlers
18 Client::Client( ASIO* asioPointer, boost::asio::io_context& io_context, const bool debugOutput )
19 : _debugOutput( debugOutput )
20 , _socket( io_context.get_executor() )
21 , _deadline( io_context.get_executor() )
22 , _heartbeatTimer( io_context.get_executor() )
23 , _asioPointer( asioPointer )
24 {
25 }
26
28 {
29 _packetQueue.push_back( p );
30 _heartbeatTimer.expires_at( boost::posix_time::neg_infin );
31
32 return true;
33 }
34
36 {
38 }
39
40 void Client::start( const boost::asio::ip::tcp::resolver::iterator endpoint_iter )
41 {
42 start_connect( MOV( endpoint_iter ) );
43 _deadline.async_wait( [&]( const boost::system::error_code )
44 {
46 } );
47 }
48
50 {
51 _stopped = true;
52 _socket.close();
53 _deadline.cancel();
54 _heartbeatTimer.cancel();
55 }
56
58 {
59 // Set a deadline for the read operation.
60 _deadline.expires_from_now( boost::posix_time::seconds( 30 ) );
61 _header = 0;
62 _inputBuffer.consume( _inputBuffer.size() );
63 // Start an asynchronous operation to read the fixed-size packet length header into _header.
64 async_read(
65 _socket, boost::asio::buffer( &_header, sizeof _header ),
66 [&]( const boost::system::error_code ec, const std::size_t N )
67 {
68 handle_read_body( ec, N );
69 } );
70 }
71
72 void Client::handle_read_body( const boost::system::error_code& ec,
73 [[maybe_unused]] size_t bytes_transferred )
74 {
75 if ( _stopped )
76 {
77 return;
78 }
79
80 if ( !ec )
81 {
82 _deadline.expires_from_now( boost::posix_time::seconds( 30 ) );
83 async_read(
84 _socket, _inputBuffer.prepare( _header ),
85 [&]( const boost::system::error_code code, const std::size_t N )
86 {
87 handle_read_packet( code, N );
88 } );
89 }
90 else
91 {
92 stop();
93 }
94 }
95
96 void Client::handle_read_packet( const boost::system::error_code& ec,
97 [[maybe_unused]] size_t bytes_transferred )
98 {
99
100 if ( _stopped )
101 {
102 return;
103 }
104 if ( !ec )
105 {
106 _inputBuffer.commit( _header );
107
108 WorldPacket packet;
109 if (!packet.loadFromBuffer(_inputBuffer))
110 {
111 ASIO::LOG_PRINT( Util::StringFormat( LOCALE_STR( "ASIO_EXCEPTION" ), "WorldPacket::loadFromBuffer" ).c_str(), true );
112 }
113
114 if ( packet.opcode() == OPCodes::SMSG_SEND_FILE )
115 {
116 async_read_until(
117 _socket, _requestBuf, "\n\n",
118 [&]( const boost::system::error_code code, const std::size_t N )
119 {
120 handle_read_file( code, N );
121 } );
122 }
123 else
124 {
125 receivePacket( packet );
126 start_read();
127 }
128 }
129 else
130 {
131 stop();
132 }
133 }
134
135 void Client::handle_read_file( [[maybe_unused]] const boost::system::error_code& ec, const size_t bytes_transferred )
136 {
137
138 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_READ_FILE"), __FUNCTION__, bytes_transferred, _requestBuf.in_avail(), _requestBuf.size(), _requestBuf.max_size()).c_str() );
139
140 std::istream request_stream( &_requestBuf );
141 string file_path;
142 request_stream >> file_path;
143 request_stream >> _fileSize;
144 request_stream.read( _buf.data(), 2 ); // eat the "\n\n"
145
146 {
147 stringstream ss;
148 ss << request_stream.tellg();
149 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_FILE_INFO"), __FUNCTION__, file_path.c_str(), _fileSize, ss.str().c_str()).c_str() );
150 }
151
152 const size_t pos = file_path.find_last_of( '\\' );
153 if ( pos != string::npos ) file_path = file_path.substr( pos + 1 );
154 _outputFile.open( file_path.c_str(), std::ios_base::binary );
155 if ( !_outputFile )
156 {
157 ASIO::LOG_PRINT( Util::StringFormat( LOCALE_STR( "ASIO_FAIL_OPEN_FILE" ), file_path.c_str() ).c_str(), true );
158 return;
159 }
160 // write extra bytes to file
161 do
162 {
163 request_stream.read( _buf.data(), (std::streamsize)_buf.size() );
164 ASIO::LOG_PRINT( Util::StringFormat( LOCALE_STR( "ASIO_WRITE_BYTES" ), __FUNCTION__, request_stream.gcount() ).c_str() );
165
166 _outputFile.write( _buf.data(), request_stream.gcount() );
167 }
168 while ( request_stream.gcount() > 0 );
169
170 async_read( _socket, boost::asio::buffer( _buf.data(), _buf.size() ),
171 [&]( const boost::system::error_code code, const std::size_t N )
172 {
173 handle_read_file_content( code, N );
174 } );
175 }
176
177 void Client::handle_read_file_content( const boost::system::error_code& err, std::size_t bytes_transferred )
178 {
179 if ( bytes_transferred > 0 )
180 {
181 _outputFile.write( _buf.data(), (std::streamsize)bytes_transferred );
182 stringstream ss;
183 ss << _outputFile.tellp();
184 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_READ_BYTES"), __FUNCTION__ , ss.str().c_str() ).c_str() );
185 if ( _outputFile.tellp() >= (std::streamsize)_fileSize )
186 {
187 return;
188 }
189 }
190 if ( err ) stop();
191 start_read();
192 }
193
195 {
196 if ( _stopped )
197 {
198 return;
199 }
200
201 if ( _packetQueue.empty() )
202 {
204 heart << I8_ZERO;
205 _packetQueue.push_back( heart );
206 }
207
208 boost::asio::streambuf buf;
209 if (!_packetQueue.front().saveToBuffer(buf))
210 {
211 ASIO::LOG_PRINT( Util::StringFormat( LOCALE_STR( "ASIO_EXCEPTION" ), "WorldPacket::saveToBuffer" ).c_str(), true );
212 }
213
214 size_t header = buf.size();
216 buffers.push_back( boost::asio::buffer( &header, sizeof header ) );
217 buffers.push_back( buf.data() );
218 async_write( _socket, buffers, [&]( const boost::system::error_code ec, const size_t )
219 {
220 handle_write( ec );
221 } );
222 }
223
224 void Client::handle_write( const boost::system::error_code& ec )
225 {
226 if ( _stopped )
227 {
228 return;
229 }
230
231 if ( !ec )
232 {
233 _packetQueue.pop_front();
234 _heartbeatTimer.expires_from_now( boost::posix_time::seconds( 2 ) );
235 _heartbeatTimer.async_wait( [&]( const boost::system::error_code )
236 {
237 start_write();
238 } );
239 }
240 else
241 {
242 if ( _debugOutput )
243 {
244 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_PACKET_ERROR"), ec.message().c_str()).c_str(), true );
245 stop();
246 }
247 }
248 }
249
251 {
252 if ( _stopped )
253 {
254 return;
255 }
256 // Check whether the deadline has passed. We compare the deadline against
257 // the current time since a new asynchronous operation may have moved the
258 // deadline before this actor had a chance to run.
259 if ( _deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now() )
260 {
261 // The deadline has passed. The socket is closed so that any outstanding
262 // asynchronous operations are cancelled.
263 _socket.close();
264
265 // There is no longer an active deadline. The expiry is set to positive
266 // infinity so that the actor takes no action until a new deadline is
267 // set.
268 _deadline.expires_at( boost::posix_time::pos_infin );
269 }
270
271 // Put the actor back to sleep.
272 _deadline.async_wait( [&]( const boost::system::error_code )
273 {
275 } );
276 }
277
278 void Client::start_connect( boost::asio::ip::tcp::resolver::iterator endpoint_iter )
279 {
280 if ( endpoint_iter != boost::asio::ip::tcp::resolver::iterator() )
281 {
282 if ( _debugOutput )
283 {
284 std::stringstream ss;
285 ss << endpoint_iter->endpoint();
286 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_CONNECTING_TO_IP"), ss.str().c_str()).c_str() );
287 }
288 // Set a deadline for the connect operation.
289 _deadline.expires_from_now( boost::posix_time::seconds( 60 ) );
290
291 // Start the asynchronous connect operation.
292 _socket.async_connect(
293 endpoint_iter->endpoint(),
294 [&, endpoint_iter]( const boost::system::error_code ec )
295 {
296 handle_connect( ec, endpoint_iter );
297 } );
298 }
299 else
300 {
301 // There are no more endpoints to try. Shut down the client.
302 stop();
303 }
304 }
305
306 void Client::handle_connect( const boost::system::error_code& ec, boost::asio::ip::tcp::resolver::iterator endpoint_iter )
307 {
308 if ( _stopped )
309 {
310 return;
311 }
312 // The async_connect() function automatically opens the socket at the start
313 // of the asynchronous operation. If the socket is closed at this time then
314 // the timeout handler must have run first.
315 if ( !_socket.is_open() )
316 {
317 if ( _debugOutput )
318 {
319 ASIO::LOG_PRINT( LOCALE_STR("ASIO_CONNECT_TIME_OUT"), true );
320 }
321 // Try the next available endpoint.
322 start_connect( ++endpoint_iter );
323 }
324
325 // Check if the connect operation failed before the deadline expired.
326 else if ( ec )
327 {
328 if ( _debugOutput )
329 {
330 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_EXCEPTION"),ec.message().c_str()).c_str(), true );
331 }
332 // We need to close the socket used in the previous connection attempt
333 // before starting a new one.
334 _socket.close();
335
336 // Try the next available endpoint.
337 start_connect( ++endpoint_iter );
338 }
339
340 // Otherwise we have successfully established a connection.
341 else
342 {
343 if ( _debugOutput )
344 {
345 std::stringstream ss;
346 ss << endpoint_iter->endpoint();
347 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_CONNECTED_TO_IP"), ss.str().c_str()).c_str() );
348 }
349 // Start the input actor.
350 start_read();
351
352 // Start the heartbeat actor.
353 start_write();
354 }
355 }
356
357}; // namespace Divide
#define LOCALE_STR(X)
Definition: Localization.h:91
#define MOV(...)
static void LOG_PRINT(const char *msg, bool error=false)
Definition: ASIO.cpp:123
virtual void handlePacket(WorldPacket &p)=0
void receivePacket(WorldPacket &p) const
Definition: Client.cpp:35
size_t _header
Definition: Client.h:102
void handle_read_packet(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: Client.cpp:96
ASIO * _asioPointer
Definition: Client.h:113
void handle_read_file_content(const boost::system::error_code &err, std::size_t bytes_transferred)
Definition: Client.cpp:177
eastl::deque< WorldPacket > _packetQueue
Definition: Client.h:106
void start_read()
Definition: Client.cpp:57
std::ofstream _outputFile
Definition: Client.h:109
deadline_timer _heartbeatTimer
Definition: Client.h:105
boost::asio::streambuf _inputBuffer
Definition: Client.h:103
bool _stopped
Definition: Client.h:100
void start_write()
Definition: Client.cpp:194
void handle_read_file(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: Client.cpp:135
tcp_socket _socket
Definition: Client.h:101
void start(boost::asio::ip::tcp::resolver::iterator endpoint_iter)
Definition: Client.cpp:40
void check_deadline()
Definition: Client.cpp:250
bool _debugOutput
Definition: Client.h:100
bool sendPacket(const WorldPacket &p)
Definition: Client.cpp:27
void stop()
Definition: Client.cpp:49
Client(ASIO *asioPointer, boost::asio::io_context &service, bool debugOutput)
Definition: Client.cpp:18
deadline_timer _deadline
Definition: Client.h:104
std::array< char, 1024 > _buf
Definition: Client.h:112
void handle_read_body(const boost::system::error_code &ec, size_t bytes_transferred)
Definition: Client.cpp:72
void handle_connect(const boost::system::error_code &ec, boost::asio::ip::tcp::resolver::iterator endpoint_iter)
Definition: Client.cpp:306
boost::asio::streambuf _requestBuf
Definition: Client.h:110
void start_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iter)
Definition: Client.cpp:278
size_t _fileSize
Definition: Client.h:111
void handle_write(const boost::system::error_code &ec)
Definition: Client.cpp:224
static const ValueType SMSG_SEND_FILE
Definition: OPCodesTpl.h:19
static const ValueType MSG_HEARTBEAT
Definition: OPCodesTpl.h:18
bool loadFromBuffer(boost::asio::streambuf &buf)
Definition: WorldPacket.cpp:27
Str StringFormat(const char *fmt, Args &&...args)
Handle console commands that start with a forward slash.
Definition: AIProcessor.cpp:7
std::basic_stringstream< char, std::char_traits< char >, dvd_allocator< char > > stringstream
Definition: STLString.h:43
constexpr I8 I8_ZERO
eastl::vector< Type > vector
Definition: Vector.h:42