Introduction

FAQ

Why not asyncio ?

asyncio is a great library, and Ohne I/O can be used in combination with asyncio, but it can also be used in combination with raw sockets and/or any network library.

Ohne I/O does not intend to compete in any way with asyncio, it even intends to be used in combination with asyncio. (but it is not limited to asyncio)

The power of ohneio resides in the fact that it does not produce any I/O. It only buffers byte streams, and allows developers to write simple stream parser like they would write a coroutine.

This is how you would use an ohneio protocol:

parser = protocol()
parser.send(bytes_to_send)
received_bytes = parser.read()

Because ohneio can be considered as connection without I/O, this documentation will referred to them with the variable conn.

Writing an ohneio protocol parser is better than writing an asyncio protocol library, in the sense that your protocol could be then used by anybody with any library.

Ohne I/O relies on the concept of sans-io, which is the concept of writing generic protocol parser, easily testable.

How does it work internally?

ohneio buffers data coming in or out. Thanks to generator functions, it pauses protocol parsers execution until new data is fed in or read from the parser.

Why not use manual buffers?

ohneio kinda borrows from parser combinators, it allows you to write simple functions without passing objects around. And combine these functions without managing any state.

ohneio also provides some primitives to wait for a certain amount of data, or wait for more data.

Writing simple protocol parser, could very quickly lead to either spaghetti code, badly abstracted code. ohneio tries to avoid this.

Getting started

To get started we’ll write a simple line echo server. This protocol echoes each line sent to the server. A line is defined by a sequence of bytes terminated by the bytes 0x0A.

Writing a protocol

import ohneio


NEW_LINE = b'\n'


def wait_for(s):
    """Wait for a certain string to be available in the buffer."""
    while True:
        data = yield from ohneio.peek()
        pos = data.find(s)
        if pos > 0:
            return pos
        yield from ohneio.wait()


def read_upto(s):
    """Read data up to the specified string in the buffer.

    If this string is not available in the buffer, this waits for the string to be available.
    """
    pos = yield from wait_for(s)
    data = yield from ohneio.read(pos + len(s))
    return data


@ohneio.protocol
def echo():
    while True:
        line = yield from read_upto(NEW_LINE)
        yield from ohneio.write(line)

Testing the protocol

Because the protocol doesn’t produce any I/O, you can directly simulates in which order, and which segments of data will be read/buffered. And what should be send back.

import pytest


@pytest.fixture
def conn():
    return echo()


@pytest.mark.parametrize('input_, expected_output', [
    (b"Hello World", b""),
    (b"Hello World\n", b"Hello World\n"),
    (b"Hello World\nAnd", b"Hello World\n"),
    (b"Hello World\nAnd the universe\n", b"Hello World\nAnd the universe\n"),
])
def test_only_echo_complete_lines(conn, input_, expected_output):
    conn.send(input_)
    assert conn.read() == expected_output


def test_buffers_segments(conn):
    conn.send(b"Hello ")
    assert conn.read() == b''
    conn.send(b"World\n")
    assert conn.read() == b"Hello World\n"

Using a protocol

Now that you wrote your echo protocol, you need to make it use the network, you can use any network library you want:

Raw sockets

This example shows how to use Ohne I/O in combination with raw sockets and select(). This creates one instance of the echo() protocol per connection.

It then feeds data comming in the connection to the Ohne I/O protocol, then it feeds the output of the Ohne I/O protocol to the socket.

import contextlib
import select
import socket


BUFFER_SIZE = 1024


@contextlib.contextmanager
def create_server(host, port):
    for res in socket.getaddrinfo(host, port):
        af, socktype, proto, canonname, sa = res
        try:
            s = socket.socket(af, socktype, proto)
        except socket.error:
            continue
        try:
            s.bind(sa)
            s.listen(1)
            yield s
            return
        except socket.error:
            continue
        finally:
            s.close()
    raise RuntimeError("Couldn't create a connection")


def echo_server(host, port):
    connections = {}  # open connections: fileno -> (socket, protocol)
    with create_server(host, port) as server:
        while True:
            rlist, _, _ = select.select([server.fileno()] + list(connections.keys()), [], [])

            for fileno in rlist:
                if fileno == server.fileno():  # New connection
                    conn, _ = server.accept()
                    connections[conn.fileno()] = (conn, echo())
                else:  # Data comming in
                    sock, proto = connections[fileno]
                    data = sock.recv(BUFFER_SIZE)
                    if not data:  # Socket closed
                        del connections[fileno]
                        continue
                    proto.send(data)
                    data = proto.read()
                    if data:
                        sock.send(data)


if __name__ == '__main__':
    import sys
    echo_server(host=sys.argv[1], port=sys.argv[2])

gevent

This example shows how to use Ohne I/O in combination with gevent library:

import gevent
from gevent.server import StreamServer


BUFFER_SIZE = 1024


def echo_server(host, port):
    server = StreamServer((host, port), handle)
    try:
        server.serve_forever()
    finally:
        server.close()


def handle(socket, address):
    conn = echo()
    try:
        while True:
            data = socket.recv(BUFFER_SIZE)
            if not data:
                break
            conn.send(data)
            data = conn.read()
            if data:
                socket.send(data)
            gevent.sleep(0)  # Prevent one green thread from taking over
    finally:
        socket.close()


if __name__ == '__main__':
    import sys
    echo_server(host=sys.argv[1], port=int(sys.argv[2]))

asyncio

import asyncio


class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        self.ohneio = echo()

    def data_received(self, data):
        self.ohneio.send(data)
        output = self.ohneio.read()
        if output:
            self.transport.write(output)



if __name__ == '__main__':
    import sys
    loop = asyncio.get_event_loop()
    coro = loop.create_server(EchoProtocol, host=sys.argv[1], port=int(sys.argv[2]))
    server = loop.run_until_complete(coro)
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()