WhakerPy 0.7

https://sourceforge.net/projects/whakerpy/

Module whakerpy.httpd

Class HTTPDHandlerUtils

Constructor

Instantiate the class: filter the request path (exposed through the getter methods) and store the request headers.

Parameters
  • headers: (HTTPMessage|dict) the headers of the request
  • path: (str) the raw path received from the request
  • default_page: (str) optional parameter, the default page used when the path does not specify one
View Source
def __init__(self, headers: HTTPMessage | dict, path: str, default_page: str='index.html'):
    """Build the helper from the headers and the raw path of a request.

    The path is normalised once with filter_path(); the two resulting
    values are the ones returned by get_path() and get_page_name().

    :param headers: (HTTPMessage|dict) the headers of the request
    :param path: (str) the raw path received with the request
    :param default_page: (str) optional, page name used when the path does not name one
    :raises TypeError: if headers is neither an HTTPMessage nor a dict

    """
    filtered = HTTPDHandlerUtils.filter_path(path, default_page)
    self.__path, self.__page_name = filtered
    # Empty placeholder: it is what remains stored if the type check fails.
    self.__headers = {}
    if not isinstance(headers, (HTTPMessage, dict)):
        raise TypeError('The headers parameter has to be a dictionary or HTTPMessage class !')
    self.__headers = headers

Public functions

get_path

Get the path of the request, as filtered by the constructor.

Returns
  • (str) the path
View Source
def get_path(self) -> str:
    """Return the request path as it was filtered by the constructor.

    :return: (str) the filtered path

    """
    filtered_path = self.__path
    return filtered_path

get_page_name

Get the name of the page, as extracted from the path by the constructor.

Returns
  • (str) the page name asked for by the request
View Source
def get_page_name(self) -> str:
    """Return the page name that the constructor extracted from the path.

    :return: (str) the page name asked for by the request

    """
    requested_page = self.__page_name
    return requested_page

static_content

Return the file content and update the corresponding status.

Parameters
  • filepath: (str) The path of the file to return
Returns
  • (tuple[bytes, HTTPDStatus]) The file content (or an HTML error page) and the status of the response
View Source
def static_content(self, filepath: str) -> tuple[bytes, HTTPDStatus]:
    """Read a static file and pair its content with an HTTP status.

    A missing path gives a 404 error page, a path that is not a regular
    file gives a 403 error page, and any exception raised while reading
    gives a 500 error page carrying the exception message.

    :param filepath: (str) The path of the file to return
    :return: (tuple[bytes, HTTPDStatus]) The file content (or the encoded
             error page) and the matching status

    """
    if not os.path.exists(filepath):
        failure = HTTPDStatus(404)
        message = f'File not found : {filepath}'
    elif not os.path.isfile(filepath):
        failure = HTTPDStatus(403)
        message = f'The path give access to a folder : {filepath}'
    else:
        try:
            return (self.__open_file_to_binary(filepath), HTTPDStatus(200))
        except Exception as e:
            # Any read failure is reported to the client as a server error.
            failure = HTTPDStatus(500)
            message = str(e)
    return (failure.to_html(encode=True, msg_error=message), failure)

process_post

Process the request body to return events and accept mime type.

Parameters
  • body: (BufferedReader) The body buffer of the request (rfile)
Returns
  • (tuple[dict, str]) the events extracted from the body and the accepted mime type
View Source
def process_post(self, body: BufferedReader) -> tuple[dict, str]:
    """Extract the events of a POST request and the mime type to answer with.

    When the headers carry a 'REQUEST_METHOD' entry other than POST, the
    body is not read and an empty dictionary is returned. Any 'Accept'
    value that contains 'text/html' is reduced to exactly 'text/html'.

    :param body: (BufferedReader) The body buffer of the request (rfile)
    :return: (tuple[dict, str]) the events and the accepted mime type

    """
    method = self.__headers.get('REQUEST_METHOD', 'POST')
    if method.upper() != 'POST':
        return ({}, 'text/html')
    events = self.__extract_body_content(body)
    accepted = self.__get_headers_value('Accept', 'text/html')
    return (events, 'text/html' if 'text/html' in accepted else accepted)

get_mime_type

Return the mime type of given file name or path.

Parameters
  • filename: (str) The name or path of the file
Returns
  • (str) The mime type of the file or 'unknown' if we can't find the type
View Source
@staticmethod
def get_mime_type(filename: str) -> str:
    """Return the mime type of given file name or path.

        :param filename: (str) The name or path of the file
        :return: (str) The mime type of the file or 'unknown' if we can't find the type

        """
    mime_type, _ = mimetypes.guess_type(filename)
    if mime_type is None:
        return 'unknown'
    else:
        return mime_type

filter_path

Parse the path to return the correct filename and page name.

Parameters
  • path: (str) The path obtained from the request or environ
  • default_path: (str) The default path to add if the path ends with '/'
Returns
  • (tuple[str, str]) the requested filename and the requested page name
View Source
@staticmethod
def filter_path(path: str, default_path: str='index.html') -> tuple[str, str]:
    """Parse the path to return the correct filename and page name.

        :param path: (str) The path obtain from the request or environ
        :param default_path: (str) The default path to add if the path ends with '/'
        :return: (tuple[str, str]) the requested filename and the requested page name

        """
    path = unquote(path)
    if '?' in path:
        path = path[:path.index('?')]
    if len(path) == 0:
        return (f'/{default_path}', default_path)
    filepath = path
    page_name = os.path.basename(path)
    _, extension = os.path.splitext(path)
    if len(page_name) == 0 or len(extension) == 0:
        page_name = default_path
        if filepath.endswith('/'):
            filepath += default_path
    return (filepath, page_name)

has_to_return_data

Tell whether the server has to respond with data or with an HTML page.

Parameters
  • accept_type: (str) The mime type of the 'Accept' header request
Returns
  • (bool) True if we have to return data, False if we have to return html content
View Source
@staticmethod
def has_to_return_data(accept_type: str) -> bool:
    """Boolean expression to know if the server has to respond data or a HTML page.

        :param accept_type: (str) The mime type of the 'Accept' header request
        :return: (bool) True if we have to return data, False if we have to return html content

        """
    return accept_type == 'application/json' or accept_type.startswith('image/') or accept_type.startswith('video/')

bakery

Process received events and bake the given page.

Parameters
  • pages: (dict) A dictionary with key=page_name and value=ResponseRecipe
  • page_name: (str) The current page name
  • headers: (dict) The headers of the http request
  • events: (dict) The events extracted from the request (only for a POST request; send an empty dict for GET)
  • has_to_return_data: (bool) False by default; tells whether we have to return the HTML page or data
Returns
  • (tuple[bytes, HTTPDStatus]) The content to answer to the client and the status of the response
View Source
@staticmethod
def bakery(pages: dict, page_name: str, headers: dict, events: dict, has_to_return_data: bool=False) -> tuple[bytes, HTTPDStatus]:
    """Process the received events and bake the requested page.

    :param pages: (dict) A dictionary with key=page_name and value=ResponseRecipe
    :param page_name: (str) The current page name
    :param headers: (dict) The headers of the http request
    :param events: (dict) The events extracted from the request (empty dict for GET)
    :param has_to_return_data: (bool) False by default. When True, the data of the
           response is returned instead of the baked html page.
    :raises TypeError: if the response status is neither an int nor an object with a 'code' attribute
    :return: (tuple[bytes, HTTPDStatus]) The content to answer to the client and the status of the response

    """
    recipe = pages.get(page_name)
    if recipe is None:
        not_found = HTTPDStatus(404)
        return (not_found.to_html(encode=True, msg_error=f'Page not found : {page_name}'), not_found)

    # The page is baked in every case, even when only its data is sent back.
    baked = recipe.bake(events, headers=headers)
    content = bytes(baked, 'utf-8')
    if has_to_return_data:
        content = recipe.get_data()
        if not isinstance(content, (bytes, bytearray)):
            content = bytes(content, 'utf-8')
        recipe.reset_data()

    status = recipe.status
    if isinstance(status, int):
        return (content, HTTPDStatus(status))
    if not hasattr(status, 'code'):
        raise TypeError(f'The status has to be an instance of HTTPDStatus (or int). Got: {status}')
    return (content, status)

Protected functions

__get_headers_value

Get headers value for a given key, try different keys format depending on server (httpd or wsgi).

Parameters
  • key: (str) the header key
  • default_value: (object) optional parameter, value returned if the header doesn't contain the key
Returns
  • (object) the value in the header or the default value
View Source
def __get_headers_value(self, key: str, default_value: object=None) -> object:
    """Look a header up under its given name, then under its upper-case form.

    The second attempt uses the key in upper case with '-' replaced by
    '_' (e.g. 'Content-Type' becomes 'CONTENT_TYPE'), so that both the
    httpd and the wsgi key formats are tried.

    :param key: (str) the header key
    :param default_value: (object) optional, value returned if no form of the key is found
    :return: (object) the value in the header or the default value

    """
    found = self.__headers.get(key)
    if found is not None:
        return found
    found = self.__headers.get(key.upper().replace('-', '_'))
    return default_value if found is None else found

__open_file_to_binary

Open and read the given file and transform the content to bytes value.

Parameters
  • filepath: (str) The path of the file to read
Returns
  • (bytes) the file content in bytes format
View Source
def __open_file_to_binary(self, filepath: str) -> bytes:
    """Open and read the given file and return its content as bytes.

    The file type is the 'Content-Type' header when there is one,
    otherwise the mime type guessed from the file name. Text-like types
    ('text/*', 'application/javascript', 'application/json') are decoded
    as UTF-8 and encoded back, so a file that is not valid UTF-8 raises
    an error; every other type is read as raw bytes.

    :param filepath: (str) The path of the file to read
    :raises OSError: if the file can't be opened or read
    :raises UnicodeDecodeError: if a text-like file is not valid UTF-8
    :return: (bytes) the file content in bytes format

    """
    file_type = self.__get_headers_value('Content-Type')
    if file_type is None:
        file_type = HTTPDHandlerUtils.get_mime_type(filepath)

    is_text = file_type is not None and (
        file_type.startswith('text/')
        or file_type in ('application/javascript', 'application/json')
    )
    if is_text:
        with codecs.open(filepath, 'r', 'utf-8') as fp:
            # One read + one encode instead of a per-line bytes concatenation.
            return fp.read().encode('utf-8')

    # The context manager guarantees the handle is closed, even on error.
    with open(filepath, 'rb') as fp:
        return fp.read()

__extract_body_content

Read and parse the body content of a POST request.

Parameters
  • content: (Binary object) the body of the POST request
Returns
  • (dict) the dictionary that contains the events to process, or an empty dictionary if there is an error.
View Source
def __extract_body_content(self, content) -> dict:
    """Read and parse the body content of a POST request.

    'Content-Length' bytes are read from the body and decoded as UTF-8
    when possible (otherwise they are kept as bytes). The result then
    depends on the 'Content-Type' header:

    - no content type or no content: an empty dictionary;
    - 'application/json': the decoded JSON value, or an empty dictionary
      if the body is not valid JSON;
    - 'multipart/form-data; boundary=...': a dictionary with a single
      'upload_file' entry holding 'filename', 'mime_type' and 'file_content';
    - anything else: the body parsed as a URL-encoded query string.

    :param content: (Binary object) the body of the POST request
    :return: (dict) the dictionary that contains the events to process,
             or an empty dictionary if there is an error.

    """
    content_type = self.__get_headers_value('Content-Type')
    content_length = self.__get_headers_value('Content-Length', '0')
    try:
        content_length = int(content_length)
    except (TypeError, ValueError):
        content_length = 0
    if content_length < 0:
        # read() with a negative size reads until EOF; never let a bogus
        # header trigger that.
        content_length = 0

    data = content.read(content_length)
    try:
        data = data.decode('utf-8')
    except UnicodeError:
        # Not UTF-8 text (e.g. an uploaded binary file): keep the raw bytes.
        pass

    if content_type is None or content_length == 0:
        data = dict()
    elif 'application/json' in content_type:
        try:
            data = json.loads(data)
        except ValueError:
            # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
            # raised when the body is bytes that can't be decoded.
            logging.error(f"Can't decode JSON posted data : {data}")
            data = dict()
    elif 'multipart/form-data; boundary=' in content_type:
        if isinstance(data, str):
            filename, mime_type, file_content = HTTPDHandlerUtils.__extract_form_data_file(content_type, data)
        else:
            filename, mime_type, file_content = HTTPDHandlerUtils.__extract_binary_form_data_file(content_type, data)
        data = {'upload_file': {'filename': filename, 'mime_type': mime_type, 'file_content': file_content}}
    else:
        data = dict(parse_qsl(data, keep_blank_values=True, strict_parsing=False))

    # Only log the file name of an upload, never its (possibly huge) content.
    upload = data.get('upload_file') if isinstance(data, dict) else None
    if isinstance(upload, dict) and 'filename' in upload:
        logging.debug(f"POST -- data: upload_file[{upload['filename']}]")
    else:
        logging.debug('POST -- data: {}'.format(data))
    return data

__extract_form_data_file

Extract the body of a "formdata request" to upload a file. Use this function with utf-8 files.

Parameters
  • content_type: (str) The content type in the header of the request
  • data: (str) the body of the request, decoded to a string
Returns
  • (tuple[str, str, str]) the data extracted: filename, file mime type and file content
View Source
@staticmethod
def __extract_form_data_file(content_type: str, data: str) -> tuple[str, str, str]:
    """Extract the uploaded file from the body of a "formdata request".

    Use this function with utf-8 files, i.e. when the body is a string.

    :param content_type: (str) The content type in the header of the request
    :param data: (str) the body of the request, decoded to a string
    :raises ValueError: if the filename, the part content type, a line break
            or the boundary is not found in the body
    :return: (tuple[str, str, str]) the data extracted : filename, file mime type and file content

    """
    name, name_end = HTTPDHandlerUtils.__extract_form_data_filename(data)
    remainder = data[name_end:]
    mime, mime_end = HTTPDHandlerUtils.__extract_form_data_mimetype(remainder)
    # Continue right after the line break that ends the 'Content-Type' line.
    remainder = remainder[mime_end + 1:]
    closing = HTTPDHandlerUtils.__extract_form_data_boundary(content_type)

    # The content starts after the next line break.
    body_start = remainder.index('\n') + 1
    # NOTE(review): this offset is relative to body_start but is used below as
    # an absolute index, so the last body_start characters before the boundary
    # are left out. Presumably this is what drops the line break preceding the
    # boundary -- confirm before changing the arithmetic.
    boundary_offset = remainder[body_start:].index(closing)
    file_text = remainder[body_start:boundary_offset]
    return (name, mime, file_text.replace('\r', ''))

__extract_binary_form_data_file

Extract the body of a "formdata request" to upload a file. Use this function with binary files.

Parameters
  • content_type: (str) The content type in the header of the request
  • data: (bytes) the body of the request in bytes format
Returns
  • (tuple[str, str, bytes]) the data extracted: filename, file mime type and file content
View Source
@staticmethod
def __extract_binary_form_data_file(content_type: str, data: bytes) -> tuple[str, str, bytes]:
    """Extract the uploaded file from the body of a "formdata request".

    Use this function with binary files, i.e. when the body could not be
    decoded to a string. The part headers are collected byte by byte from
    the start of the body, the closing boundary is searched backwards from
    its end, and what lies in between is returned as the file content.

    :param content_type: (str) The content type in the header of the request
    :param data: (bytes) the body of the request in bytes format
    :raises TypeError: if the start of the content or the boundary is not found
            (the corresponding index is still None when it is used)
    :raises ValueError: if the filename or the part content type is not found
            in the collected headers
    :return: (tuple[str, str, bytes]) the data extracted : filename, file mime type and file content

    """
    file_content_begin = None
    content_type_pass = False
    prefix = ''
    # Collect the leading bytes as characters until the file content starts:
    # either at the first byte above 127, or right after the first blank line
    # that follows 'Content-Type'.
    for i in range(len(data)):
        if data[i] <= 127:
            prefix += chr(data[i])
        else:
            file_content_begin = i
            break
        if content_type_pass is True:
            index = prefix.index('Content-Type')
            if '\n\n' in prefix[index:] or '\r\n\r\n' in prefix[index:]:
                file_content_begin = i + 1
                break
        # The flag is raised at the end of the iteration, so the blank-line
        # search only starts with the byte after 'Content-Type' was completed.
        if content_type_pass is False and 'Content-Type' in prefix:
            content_type_pass = True
    # The body is scanned from its end, so the boundary is compared reversed.
    reversed_boundary = HTTPDHandlerUtils.__extract_form_data_boundary(content_type)[::-1]
    file_content_end = None
    postfix = ''
    # Walk backwards, stopping before file_content_begin, until the collected
    # characters contain the boundary; 'i' is then the index just before it.
    for i in range(len(data) - 1, file_content_begin, -1):
        if reversed_boundary not in postfix:
            postfix += chr(data[i])
        else:
            file_content_end = i
            break
    # NOTE(review): the slice keeps every byte located before the boundary,
    # presumably including the line break that precedes it -- confirm whether
    # callers expect that.
    content = data[file_content_begin:file_content_end + 1]
    filename = HTTPDHandlerUtils.__extract_form_data_filename(prefix)[0]
    mimetype = HTTPDHandlerUtils.__extract_form_data_mimetype(prefix)[0]
    return (filename, mimetype, content)

__extract_form_data_filename

Extract the filename from the form data uploaded file.

Parameters
  • text: (str) the body or a part received from the request.
Returns
  • (tuple[str, int]) the filename and the index where the filename value ends.
View Source
@staticmethod
def __extract_form_data_filename(text: str) -> tuple[str, int]:
    """Extract the filename from the form data uploaded file.

        :param text: (str) the body or a part received from the request.
        :return: (tuple[str, str, str]) the filename and the index where the filename value ends.

        """
    start_index_filename = text.index('filename="') + len('filename="')
    end_index_filename = start_index_filename + text[start_index_filename:].index('"')
    return (text[start_index_filename:end_index_filename], end_index_filename)

__extract_form_data_mimetype

Extract the mimetype from the form data uploaded file.

Parameters
  • text: (str) the body or a part received from the request.
Returns
  • (tuple[str, int]) the mimetype and the index where the mimetype value ends.
View Source
@staticmethod
def __extract_form_data_mimetype(text: str) -> tuple[str, int]:
    """Extract the mimetype from the form data uploaded file.

        :param text: (str) the body or a part received from the request.
        :return: (tuple[str, str, str]) the mimetype and the index where the mimetype value ends.

        """
    start_index_type = text.index('Content-Type: ') + len('Content-Type: ')
    end_index_type = start_index_type + text[start_index_type:].index('\n')
    mimetype = text[start_index_type:end_index_type]
    mimetype = mimetype.replace('\r', '')
    return (mimetype, end_index_type)

__extract_form_data_boundary

Extract, from the form data content type, the boundary which delimits the uploaded file content.

Parameters
  • content_type: (str) the content type in the header of the received request.
Returns
  • (str) the boundary.
View Source
@staticmethod
def __extract_form_data_boundary(content_type: str) -> str:
    """Extract the boundary from the form data content type which delimited the uploaded file content.

        :param content_type: (str) the content type in the header of the received request.
        :return: (tuple[str, str, str]) the boundary.

        """
    start_boundary = content_type.index('boundary=') + len('boundary=')
    boundary = '--' + content_type[start_boundary:] + '--'
    return boundary