@@ -74,27 +74,34 @@ namespace osuCrypto {
74
74
std::string remoteName)
75
75
:
76
76
mIos(endpoint.getIOService()),
77
- mWork(new boost::asio::io_service::work(endpoint.getIOService().mIoService)),
77
+ mWork(endpoint.getIOService(), "Channel:" + endpoint.mBase->mName + "." + localName
78
+ + (endpoint.mBase ->mMode == SessionMode::Server ? " (server)" : " (client)" )),
78
79
mSession(endpoint.mBase ),
79
80
mRemoteName(remoteName),
80
81
mLocalName(localName),
81
82
mChannelRefCount(1 ),
82
83
mStrand(endpoint.getIOService().mIoService.get_executor())
83
84
{
85
+ std::lock_guard<std::mutex> lock (mIos .mWorkerMtx );
86
+ mIos .mChannels .insert (this );
84
87
}
85
88
86
89
ChannelBase::ChannelBase (IOService& ios, SocketInterface* sock)
87
90
:
88
91
mIos(ios),
89
- mWork(new boost::asio::io_service::work(ios. mIoService ) ),
92
+ mWork(ios, " Channel: SocketInterface. " + std::to_string((u64)sock) ),
90
93
mChannelRefCount(1 ),
91
94
mHandle(sock),
92
95
mStrand(ios.mIoService .get_executor())
93
96
{
97
+ std::lock_guard<std::mutex> lock (mIos .mWorkerMtx );
98
+ mIos .mChannels .insert (this );
94
99
}
95
100
96
101
ChannelBase::~ChannelBase ()
97
102
{
103
+ std::lock_guard<std::mutex> lock (mIos .mWorkerMtx );
104
+ mIos .mChannels .erase (mIos .mChannels .find (this ));
98
105
assert (mChannelRefCount ==0 );
99
106
}
100
107
@@ -338,7 +345,7 @@ namespace osuCrypto {
338
345
339
346
void StartSocketOp::connectCallback (const error_code& ec)
340
347
{
341
- IF_LOG (mChl ->mLog .push (" calling StartSocketOp::asyncConnectToServer(...) cb1 " ));
348
+ // IF_LOG(mChl->mLog.push("calling StartSocketOp::asyncConnectToServer(...) cb1 "));
342
349
343
350
boost::asio::dispatch (mStrand , [this , ec] {
344
351
// lout << "calling StartSocketOp::asyncConnectToServer(...) cb1 " << time() << std::endl;
@@ -366,11 +373,11 @@ namespace osuCrypto {
366
373
367
374
if (ec2)
368
375
{
376
+ IF_LOG (
369
377
auto msg = " async connect. Failed to set option ~ ec=" + ec2.message () + " \n "
370
378
+ " isOpen=" + std::to_string (sock.is_open ())
371
379
+ " stopped=" + std::to_string (canceled ());
372
-
373
- IF_LOG (mChl ->mLog .push (msg));
380
+ mChl ->mLog .push (msg));
374
381
375
382
// retry.
376
383
retryConnect (ec2);
@@ -451,11 +458,13 @@ namespace osuCrypto {
451
458
{
452
459
auto & sock = mSock ->mSock ;
453
460
454
- auto msg = " async connect. Failed to send ConnectionString ~ ec=" + ec.message () + " \n "
461
+ IF_LOG (
462
+ auto msg = " async connect. Failed to send ConnectionString ~ ec=" + ec.message () + " \n "
455
463
+ " isOpen=" + std::to_string (sock.is_open ())
456
464
+ " canceled=" + std::to_string (canceled ());
457
465
458
- IF_LOG (mChl ->mLog .push (msg));
466
+ mChl ->mLog .push (msg)
467
+ );
459
468
460
469
// Unknown issue where we connect but then the pipe is broken.
461
470
// Simply retrying seems to be a workaround.
@@ -474,19 +483,11 @@ namespace osuCrypto {
474
483
475
484
void osuCrypto::StartSocketOp::retryConnect (const error_code& ec)
476
485
{
477
-
478
-
479
486
error_code ec2;
480
487
mSock ->mSock .close (ec2);
481
488
482
- IF_LOG (if (ec2)
483
- mChl ->mLog .push (" error closing boost socket (3), ec = " + ec2.message ()));
484
-
485
489
auto count = static_cast <u64>(mBackoff );
486
-
487
490
mTimer .expires_from_now (boost::posix_time::millisec (count));
488
-
489
-
490
491
mBackoff = std::min (mBackoff * 1.2 , 1000.0 );
491
492
if (mBackoff >= 1000.0 )
492
493
{
@@ -527,16 +528,18 @@ namespace osuCrypto {
527
528
528
529
Channel& Channel::operator =(Channel&& move)
529
530
{
530
- if (mBase )
531
- --mBase ->mChannelRefCount ;
531
+ if (mBase && --mBase ->mChannelRefCount == 0 )
532
+ mBase ->close ();
533
+
532
534
mBase = std::move (move.mBase );
533
535
return *this ;
534
536
}
535
537
536
538
Channel& Channel::operator =(const Channel& copy)
537
539
{
538
- if (mBase )
539
- --mBase ->mChannelRefCount ;
540
+ if (mBase && --mBase ->mChannelRefCount == 0 )
541
+ mBase ->close ();
542
+
540
543
mBase = copy.mBase ;
541
544
if (mBase )
542
545
++mBase ->mChannelRefCount ;
@@ -625,9 +628,9 @@ namespace osuCrypto {
625
628
if (mBase ) mBase ->close ();
626
629
}
627
630
628
- void Channel::cancel ()
631
+ void Channel::cancel (bool close )
629
632
{
630
- if (mBase ) mBase ->cancel ();
633
+ if (mBase ) mBase ->cancel (close );
631
634
}
632
635
633
636
void Channel::asyncClose (std::function<void ()> completionHandle)
@@ -636,9 +639,9 @@ namespace osuCrypto {
636
639
else completionHandle ();
637
640
}
638
641
639
- void Channel::asyncCancel (std::function<void ()> completionHandle)
642
+ void Channel::asyncCancel (std::function<void ()> completionHandle, bool close )
640
643
{
641
- if (mBase ) mBase ->asyncCancel (std::move (completionHandle));
644
+ if (mBase ) mBase ->asyncCancel (std::move (completionHandle), close );
642
645
else completionHandle ();
643
646
}
644
647
@@ -668,7 +671,7 @@ namespace osuCrypto {
668
671
#else
669
672
char str= 0 ;
670
673
#endif
671
- mRecvQueue .push_back (std::move (op));
674
+ mRecvQueue .push_back (std::forward<SBO_ptr<details::RecvOperation>> (op));
672
675
auto lifetime = shared_from_this ();
673
676
674
677
// a strand is like a lock. Stuff posted (or dispatched) to a strand will be executed sequentially
@@ -680,16 +683,21 @@ namespace osuCrypto {
680
683
bool available = recvSocketAvailable ();
681
684
bool startRecving = hasItems && available;
682
685
683
- LOG_MSG (" recv queuing " +str+" : start = " + std::to_string (startRecving) + " = " + std::to_string (hasItems) + " && " + std::to_string (available));
684
686
// the queue must be guarded from concurrent access, so add the op within the strand
685
687
// queue up the operation.
686
688
if (startRecving)
687
689
{
690
+ assert (mStatus != Channel::Status::Closed);
691
+
688
692
// ok, so there isn't any recv operations currently underway. Lets kick off the first one. Subsequent recvs
689
693
// will be kicked off at the completion of this operation.
690
694
mRecvLoopLifetime = std::move (lifetime);
691
695
asyncPerformRecv ();
692
696
}
697
+ else
698
+ {
699
+ LOG_MSG (" recv defered " +str+" : " + std::to_string (hasItems) + " && " + std::to_string (available));
700
+ }
693
701
});
694
702
}
695
703
@@ -703,23 +711,25 @@ namespace osuCrypto {
703
711
char str = 0 ;
704
712
#endif
705
713
706
- mSendQueue .push_back (std::move (op));
714
+ mSendQueue .push_back (std::forward<SBO_ptr<details::SendOperation>> (op));
707
715
auto lifetime = shared_from_this ();
708
716
709
717
// a strand is like a lock. Stuff posted (or dispatched) to a strand will be executed sequentially
710
718
boost::asio::dispatch (mStrand , [this , str, lifetime = std::move (lifetime)]() mutable
711
719
{
712
720
auto hasItems = (mSendQueue .isEmpty () == false );
713
- auto avaliable = sendSocketAvailable ();
714
- auto startSending = hasItems && avaliable;
715
-
716
- LOG_MSG (" send queuing " +str+" : start = " + std::to_string (startSending) + " = " + std::to_string (hasItems)
717
- + " & " + std::to_string (avaliable));
721
+ auto available = sendSocketAvailable ();
722
+ auto startSending = hasItems && available;
718
723
719
724
if (startSending)
720
725
{
726
+ assert (mStatus != Channel::Status::Closed);
721
727
mSendLoopLifetime = std::move (lifetime);
722
728
asyncPerformSend ();
729
+ }
730
+ else
731
+ {
732
+ LOG_MSG (" send defered " +str+" : " + std::to_string (hasItems) + " && " + std::to_string (available));
723
733
}
724
734
});
725
735
}
@@ -830,17 +840,17 @@ namespace osuCrypto {
830
840
mIos .printError (s);
831
841
}
832
842
833
- void ChannelBase::cancel ()
843
+ void ChannelBase::cancel (bool close )
834
844
{
835
845
std::promise<void > prom;
836
846
asyncCancel ([&]() {
837
847
prom.set_value ();
838
- });
848
+ }, close );
839
849
840
850
prom.get_future ().get ();
841
851
}
842
852
843
- void ChannelBase::asyncCancel (std::function<void ()> completionHandle)
853
+ void ChannelBase::asyncCancel (std::function<void ()> completionHandle, bool close )
844
854
{
845
855
LOG_MSG (" cancel()" );
846
856
@@ -851,27 +861,34 @@ namespace osuCrypto {
851
861
std::atomic<u32> mCount ;
852
862
std::function<void ()> mFn ;
853
863
std::shared_ptr<ChannelBase> mLifetime ;
854
- CancelState (std::function<void ()>&& fn, std::shared_ptr<ChannelBase>&& p)
864
+ bool mClose ;
865
+ CancelState (std::function<void ()>&& fn, std::shared_ptr<ChannelBase>&& p, bool close)
855
866
: mCount (2 )
856
867
, mFn (std::move(fn))
857
- , mLifetime (std::move(p)){}
868
+ , mLifetime (std::move(p))
869
+ , mClose (close)
870
+ {}
858
871
};
859
872
860
- auto state = std::make_shared<CancelState>(std::move (completionHandle), shared_from_this ());
861
- // auto lifetime = shared_from_this();
862
- boost::asio::dispatch (mStrand , [this , state]() mutable {
873
+ auto state = std::make_shared<CancelState>(std::move (completionHandle), shared_from_this (), close );
874
+
875
+ boost::asio::post (mStrand , [this , state]() mutable {
876
+
863
877
mStatus = Channel::Status::Canceling;
878
+
864
879
auto ec = boost::system::errc::make_error_code (boost::system ::errc::operation_canceled);
865
880
mRecvCancelNew = true ;
866
881
mSendCancelNew = true ;
867
882
868
-
869
883
auto cb = [this ,state]() mutable {
870
884
if (--state->mCount == 0 )
871
885
{
872
886
LOG_MSG (" cancel callback." );
887
+
888
+ if (state->mClose )
889
+ mStatus = Channel::Status::Closed;
890
+
873
891
mHandle .reset (nullptr );
874
- mWork .reset (nullptr );
875
892
state->mFn ();
876
893
}
877
894
};
@@ -948,7 +965,7 @@ namespace osuCrypto {
948
965
mHandle ->close ();
949
966
}
950
967
mHandle .reset (nullptr );
951
- mWork .reset (nullptr );
968
+ mWork .reset ();
952
969
LOG_MSG (" Closed" );
953
970
954
971
auto c = std::move (ch);
0 commit comments