0%

流、短读和短写(Streams, Short Reads and Short Writes)#

Boost.Asio中許多 I/0对象都是面向流的: - 没有消息边界。数据通过连续字节序列传输 - 读取或写入操做传输的数据可能少于请求的字节。这就叫做 短读(short read) or 短写(short write)。

提供面向流的I/O模型满足一个或多个如下要求: - SyncReadStream,使用read_some()的成员函数执行。 - AsyncReadStream,使用async_read_some()的成员函数执行。 - SyncWriteStream,使用write_some()的成员函数执行。 - AsyncWriteStream,使用async_write_some()的成员函数执行。

面向流的I/O对象有:ip::tcp::socket, ssl::stream<>, posix::stream_descriptor, windows::stream_handle 等

当短读(short read) or 短写(short write)出现时,程序必须重新启动操作,直到传输了所需的字数。Boost.Asio 提供自动执行此操作的常规函数:read()、async_read()、write()和async_write()。

Why EOF is an Error#

  • stream结束符可以导致 read, async_read, read_until or async_read_until函数终止。例如,由于 EOF,读取 N 个字节可能会提前完成。
  • EOF 错误可用于区分流的末尾和大小为 0 的成功读取。

Reactor式操作(Reactor-Style Operations)#

有时需要集成使用自定义I/0操作的第三方库。Boost.Asio 包括同步和异步操作,这些操作可用于等待socket准备好读取、准备写入或出现挂起的错误情况。下面是一个非阻塞读的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ip::tcp::socket socket(my_io_context);
...
socket.non_blocking(true);
...
socket.async_wait(ip::tcp::socket::wait_read, read_handler);
...
void read_handler(boost::system::error_code ec)
{
if (!ec)
{
std::vector<char> buf(socket.available());
socket.read_some(buffer(buf));
}
}
这些操作支持所有平台的socket,以及 POSIX 的面向流的描述符类。

基于行的操作(Line-Based Operations)#

许多常用的 internet 协议是基于行的,它们具有由字符序列“”分隔的元素,如 HTTP、SMTP 和 FTP。为了更容易地实现基于行的协议,Boost.Asio 提供了函数 read_until()和async_read_until()。 如下是在一个HTTP服务器使用 async_read_until() 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class http_connection
{
...

void start()
{
boost::asio::async_read_until(socket_, data_, "\r\n",
boost::bind(&http_connection::handle_request_line, this, _1));
}

void handle_request_line(boost::system::error_code ec)
{
if (!ec)
{
std::string method, uri, version;
char sp1, sp2, cr, lf;
std::istream is(&data_);
is.unsetf(std::ios_base::skipws);
is >> method >> sp1 >> uri >> sp2 >> version >> cr >> lf;
...
}
}

...

boost::asio::ip::tcp::socket socket_;
boost::asio::streambuf data_;
};
streambuf数据成员用作存储在搜索分隔符(delimiter)之前从套接字读取的数据的位置。分隔符后可能还有其他数据。剩余的数据应该留在streambuf中,以便后续的read_until()或async_read_until()处理。

分隔符可以指定为单个char、std::string或boost::regex。read_until()和async_read_until()函数还接受称为match condition的用户自定义函数对象的重载。例如,将数据读取到streambuf,直到遇到空白:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef boost::asio::buffers_iterator<
boost::asio::streambuf::const_buffers_type> iterator;

std::pair<iterator, bool> match_whitespace(iterator begin, iterator end)
{
iterator i = begin;
while (i != end)
if (std::isspace(*i++))
return std::make_pair(i, true);
return std::make_pair(i, false);
}
...
boost::asio::streambuf b;
boost::asio::read_until(s, b, match_whitespace);
要将数据读取到streambuf中,直到找到匹配的字符:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class match_char
{
public:
explicit match_char(char c) : c_(c) {}

template <typename Iterator>std::pair<Iterator, bool> operator()(
Iterator begin, Iterator end) const
{
Iterator i = begin;
while (i != end)
if (c_ == *i++)
return std::make_pair(i, true);
return std::make_pair(i, false);
}

private:
char c_;
};

namespace boost {
namespace asio {
template <> struct is_match_condition<match_char>: public boost::true_type {};
} } // namespace boost::asio
...
boost::asio::streambuf b;
boost::asio::read_until(s, b, match_char('a'));
对于函数和typedef的嵌套result_type类型的函数对象,is_match_condition<>类型特征会自动计算为 true。对于其他类型的特征必须显示特化(explicitly specialised)。

自定义内存分配(Custom Memory Allocation)#

许多异步操作需要分配一个对象去存储操作的状态。 另外,程序中包含了易于识别的异步操作链。 半双工协议(如HTTP服务器)对每个客户端连接维护一个有单向操作链(先接收后发送)。 全双工协议有两条并行的链。程序应该能够根据这些特性,为链中的所有异步操作重用这些内存。

对于用户自定义的Handler对象的副本 h,若需要分配与该处理程序关联的内存,这将有一个使用get_associated_allocator的分配器(allocator)。例如

1
boost::asio::associated_allocator_t<Handler> a = boost::asio::get_associated_allocator(h);
默认情况下,处理程序使用标准分配器(按照::operator new()和::operator delete()实现)。可以通过指定嵌套类型allocator_type和成员函数get_allocator(),为特定处理程序类型自定义分配器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class my_handler
{
public:
// Custom implementation of Allocator type requirements.
typedef my_allocator allocator_type;

// Return a custom allocator implementation.
allocator_type get_allocator() const noexcept
{
return my_allocator();
}

void operator()() { ... }
};
在更复杂的情况下,associated_allocator模板可能直接偏特化(partially specialised):
1
2
3
4
5
6
7
8
9
10
11
12
namespace boost { namespace asio {
template <typename Allocator> struct associated_allocator<my_handler, Allocator>
{
// Custom implementation of Allocator type requirements.
typedef my_allocator type;
// Return a custom allocator implementation.
static type get(const my_handler&, const Allocator& a = Allocator()) noexcept
{
return my_allocator();
}
};
} } // namespace boost::asio
- 这保证了释放将在调用associated handler之前发生,这意味着内存已经准备好被重用于handler启动的任何新的异步操作。

  • 该实现保证,对于库中包含的异步操作,该实现不会对内存分配函数进行并发调用。如果需要从不同线程调用分配函数,将插入适当的内存屏障,以确保内存正确。

Threads and Boost.Asio#

线程安全(Thread Safety)#

并发(concurrent)在不同对象间是安全的,但对于单一对象,却是不安全的。然而 io_context等类型可以保证 单一对象的并发线程安全

线程池(Thread Pools)#

多个线程可以调用 io_context::run()来建立一个线程池,从中可以调用完成处理程序。这种方法也可以与post()一起使用,作为跨线程池执行任意计算任务的方法。

所有加入到io_context's pool的线程都是对等的,io_context可以任意方式分发工作。

内部线程(Internal Threads)#

为了实现异步,有一些调用者不可见的 内部线程,他们必须满足: - 不能被库函数使用者 直接调用 - 屏蔽所有signal

这种方法辅之以以下保证:

  • 异步completion handlers程序将仅从当前正在调用io_context::run()的线程调用。

因此,库用户需要 创建 和 管理所有收消息的线程

这种方法的原因包括:

  • 单线程调用io_context::run(),可以避免与同步相关的开发复杂性。例如,库用户可以实现单线程的可扩展服务器
  • 库用户需要在线程启动后以及执行其他应用程序代码之前在线程中执行初始化。
  • 库接口与线程创建和管理接口解耦合,并允许在线程不可用的平台上实现

Strands: 使用没有显式锁定的线程(Strands: Use Threads Without Explicit Locking)#

Strand定义为event handlers的严格顺序调用(即没有并发调用)。使用Strand允许在多线程中执行代码,而无需显式加锁(例如,使用互斥)。

Strands可implicit 或 explicit: - 由于io_context保证了event handlers仅从io_context::run()里面触发,因此在单线程中调用 io_context::run(),意味着所有的event handlers在implicit strand中执行。 - 当与连接相关联的单链异步操作(例如采用半双工协议,如HTTP),是不可能存在并发的handlers,这种情况就是implicit strand。 - explicit strand是 strand<> or io_context::strand 的实例。所有的event handler函数 需要使用 boost::asio::bind_executor()绑定到strand上 或 通过strand对象发布/调度(posted/dispatched)

对于组合异步操作,如async_read()async_read_until(),如果completion handler通过strand,则所有中间handlers也应该经历相同的strand。这是确保调用方和组合操作之间共享的任何对象的线程安全

为此,所有异步操作都使用get_associated_executor函数获取handler的关联执行器。例如:

1
boost::asio::associated_executor_t<Handler> a = boost::asio::get_associated_executor(h);
通过指定嵌套类型executor_type成员函数get_executor(),可以针对特定处理程序类型自定义执行器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class my_handler
{
public:
// Custom implementation of Executor type requirements.
typedef my_executor executor_type;

// Return a custom executor implementation.
executor_type get_executor() const noexcept
{
return my_executor();
}

void operator()() { ... }
};
在更复杂的情况下,associated_executor模板可能直接偏特化(partially specialised):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct my_handler
{
void operator()() { ... }
};

namespace boost { namespace asio {

template <class Executor>
struct associated_executor<my_handler, Executor>
{
// Custom implementation of Executor type requirements.
typedef my_executor type;

// Return a custom executor implementation.
static type get(const my_handler&,
const Executor& = Executor()) noexcept
{
return my_executor();
}
};

} } // namespace boost::asio

boost::asio::bind_executor()函数用于绑定特殊executor对象(如一个strand)到completion handler。例如:

1
2
3
4
5
6
my_socket.async_read_some(my_buffer,
boost::asio::bind_executor(my_strand,
[](error_code ec, size_t length) // 这是个lambda函数
{
// ...
}));

Buffers#

I/O 涉及数据在内存的连续区域(称为缓冲区)之间传输,这些缓冲区可以简单地表示为由指针和字节大小组成的元组。为了开发高效的网络应用程序,Boost.Asio 包括对分散收集(scatter-gather)操作的支持:

  • 散点读取(scatter-read)将数据接收到多个缓冲区中。
  • 收集写入(gather-write)传输多个缓冲区

除了将缓冲区指定为指针和大小(以字节为单位)之外,Boost.Asio 还区分了可修改内存(称为mutable)和非可修改内存(其中后者从存储中创建为 const 限定变量)。因此,这两种类型可以定义如下:

1
2
typedef std::pair<void*, std::size_t> mutable_buffer;
typedef std::pair<const void*, std::size_t> const_buffer;
mutable_buffer 可以转换为const_buffer,反之不行。

但是,Boost.Asio 没有使用上述定义,而是定义两个类:mutable_buffer和const_buffer。这个目的是提供连续内存的不透明表示形式,其中:

  • 转换行为与std::pair一样:mutable_buffer可单向转换为const_buffer,反之不行。
  • 有缓冲区溢出保护。给定一个缓冲区实例,用户只能创建另一个表示相同内存范围或其子范围的缓冲区。为了提供进一步的安全性,库还包括用于从 POD 元素的数组、boost::array或std::vector或std::string自动确定缓冲区大小的机制。
  • 使用data()成员函数显式访问底层内存。通常应用程序不需要这样做,但库实现需要将原始内存传递给底层操作系统函数

最后,通过将缓冲区对象放入容器中,可以将多个缓冲区传递给scatter-gather操作(如read()或 write())。MutableBufferSequence和ConstBufferSequence概念已经定义好,这样就可以使用诸如std::vector、std::list、std::array或boost::array之类的容器。

与Iostreams集成的Streambuf#

boost::asio::basic_streambuf 类继承自 std::basic_streambuf,可以将 输入和输出序列(sequence) 关联到一个或多个 字符串数组对象,这些元素存储任意值。这些字符数组对象是 streambuf 对象的内部对象,但提供了对数组元素的直接访问,以允许它们用于 I/O 操作,例如套接字的发送或接收操作

缓冲序列的字节遍历#

buffers_iterator<>模板类,允许遍历buffer序列(即满足MutableBufferSequence或ConstBufferSequence要求的类型),就好像它们是连续的字节序列一样。还提供了名为buffers_begin()和buffers_end()的help函数。

例如,从socket读取一行,并且转换为string:

1
2
3
4
5
6
7
boost::asio::streambuf sb;
...
std::size_t n = boost::asio::read_until(sock, sb, '\n');
boost::asio::streambuf::const_buffers_type bufs = sb.data();
std::string line(
boost::asio::buffers_begin(bufs),
boost::asio::buffers_begin(bufs) + n);

缓冲区调试#

某些标准库的实现(如Microsoft Visual C++ 8.0 和更高版本一起搭载的库)提供了一个称为迭代器调试的特性。这意味着在运行时检查迭代器的有效性。如果程序尝试使用已失效的迭代器,则会触发断言。例如:

1
2
3
4
std::vector<int> v(1)
std::vector<int>::iterator i = v.begin();
v.clear(); // invalidates iterators
*i = 0; // assertion!
Boost.Asio 利用此功能添加缓冲区调试。请考虑以下代码:
1
2
3
4
5
void dont_do_this()
{
std::string msg = "Hello, world!";
boost::asio::async_write(sock, boost::asio::buffer(msg), my_handler);
}
调用异步读或写时,需要确保操作的缓冲区调用completion handler之前是有效的。在上面的示例中,缓冲区是std::string变量msg。这个变量在栈上(局部变量),因此在异步操作完成之前它就超出了函数的范围(scope)。如果幸运的话,应用程序会crash,但随机失败的可能性更大。

启用缓冲区调试时,Boost.Asio 将迭代器存储到字符串中,直到异步操作完成,然后取消引用以检查其有效性。

此功能对 Microsoft Visual Studio 8.0 或更高版本可用,而 GCC 需要定义了**_GLIBCXX_DEBUG时才可用。此检查会带来性能开销,因此仅在调试生成中启用缓冲区调试。对于其他编译器,可以通过定义 BOOST_ASIO_ENABLE_BUFFER_DEBUGGING来启用它。也可以通过定义 BOOST_ASIO_DISABLE_BUFFER_DEBUGGING来显式禁用**。

introduction

C++11的新特性--可变模版参数(variadic templates)是C++11新增的最强大的特性之一,它对参数进行了高度泛化,表示0到任意个数、任意类型的参数.

Read more »

如何生成密钥?#

1
# ssh-keygen -t rsa -C ‘xxxxx@company.com’ -f ~/.ssh/id_rsa_gitlab

-f : 指定生成密钥的位置 和 命名(可选) -t :指定密钥生成的算法(可选) -C : 指定账号(可选)

多个ssh密钥,如何指定#

windows下ssh密钥文件在 Users.ssh, 系统默认会取 id_rsa 作为密钥文件。

不同ip or 网址,如果需要指定使用 哪个密钥文件,只需要修改 **Users.ssh*, 如:

1
2
3
4
5
6
7
8
9
10
Host 192.168.xx.xx
HostName 192.168.xx.xx
User leo
Port 4000
IdentityFile "C:\Users\liuwen03\.ssh\id_rsa_liuwen03"

Host github.com
HostName github.com
User liuwen
IdentityFile "C:\Users\liuwen03\.ssh\id_rsa_github"

错误排查#

1
2
3
4
5
6
7
8
# mongo
MongoDB shell version: 3.0.12
connecting to: test
2021-04-06T10:58:44.862+0800 W NETWORK Failed to connect to 127.0.0.1:27017, reason: errno:111 Connection refused
2021-04-06T10:58:44.864+0800 E QUERY Error: couldn't connect to server 127.0.0.1:27017 (127.0.0.1), connection attempt failed
at connect (src/mongo/shell/mongo.js:179:14)
at (connect):1:6 at src/mongo/shell/mongo.js:179
exception: connect failed

这个错误是由于mongo shell不能与mongod server通信,可能的原因有: - mongod server没启动 - ip和端口不正确

查看进程是否启动

1
ps aux|grep mongo
果然没有启动,尝试启动
1
2
3
4
5
6
7
8
9
# mongod --dbpath /var/lib/mongodb/ --repair
2021-04-06T10:53:01.376+0800 I CONTROL [initandlisten] MongoDB starting : pid=12221 port=27017 dbpath=/var/lib/mongodb/ 64-bit host=g37-dev-acfd361f
2021-04-06T10:53:01.376+0800 I CONTROL [initandlisten] db version v3.0.12
2021-04-06T10:53:01.376+0800 I CONTROL [initandlisten] git version: 33934938e0e95d534cebbaff656cde916b9c3573
2021-04-06T10:53:01.376+0800 I CONTROL [initandlisten] build info: Linux build14.ny.cbi.10gen.cc 2.6.32-431.3.1.el6.x86_64 #1 SMP Fri Jan 3 21:39:27 UTC 2014 x86_64 BOOST_LIB_VERSION=1_49
2021-04-06T10:53:01.376+0800 I CONTROL [initandlisten] allocator: tcmalloc
2021-04-06T10:53:01.376+0800 I CONTROL [initandlisten] options: { repair: true, storage: { dbPath: "/var/lib/mongodb/" } }
2021-04-06T10:53:01.400+0800 I STORAGE [initandlisten] exception in initAndListen: 98 Unable to create/open lock file: /var/lib/mongodb/mongod.lock errno:13 Permission denied Is a mongod instance already running?, terminating
2021-04-06T10:53:01.401+0800 I CONTROL [initandlisten] dbexit: rc: 100
这是因为权限不够,用 stat 查看
1
2
3
4
5
6
7
8
9
# stat /var/lib/mongodb
File: ‘/var/lib/mongodb’
Size: 60 Blocks: 0 IO Block: 4096 directory
Device: fd00h/64768d Inode: 1235426 Links: 4
Access: (0755/drwxr-xr-x) Uid: ( 505/ mongodb) Gid: ( 505/ mongodb)
Access: 2021-04-06 10:49:04.428179272 +0800
Modify: 2021-04-06 10:48:40.036107174 +0800
Change: 2021-04-06 10:48:40.036107174 +0800
Birth: -
仅mongodb用户可读可写

solution 1:修改路径#

1
2
mongod --dbpath /home/liuwen03/mongo/ --repair
mongod --dbpath /home/liuwen03/mongo/ --journal

solution 2:修改文件权限#

sudo chmod ... # need to do

reference#

  1. stackoverflow mongo
  2. Unable to create/open lock file: /data/mongod.lock errno:13 Permission denied