WhakerPy 0.9

https://sourceforge.net/projects/whakerpy/

Module whakerpy.httpd

Class HTTPDHandlerUtils

Constructor

Instantiate the class: filter the request path for the getter methods and store the request headers.

Parameters
  • headers: (HTTPMessage|dict) the headers of the request
  • path: (str) the raw path received from the request
  • default_page: (str) optional parameter, the default page to use when the path does not name one
View Source
def __init__(self, headers: HTTPMessage | dict, path: str, default_page: str='index.html'):
    """Build the helper for one request: resolve its path and keep its headers.

    :param headers: (HTTPMessage|dict) The headers of the request
    :param path: (str) The raw path received with the request
    :param default_page: (str) Optional, page name used when the path does not name one
    :raises: TypeError: The headers are neither a dict nor an HTTPMessage

    """
    # The path is resolved first, exactly once; the getters only return the result.
    resolved = HTTPDHandlerUtils.filter_path(path, default_page)
    self.__path, self.__page_name = resolved
    self.__headers = {}
    if not isinstance(headers, (HTTPMessage, dict)):
        raise TypeError('The headers parameter has to be a dictionary or HTTPMessage class!')
    self.__headers = headers

Public functions

get_path

Get the path of the request, as filtered by the constructor.

Returns
  • (str) the path
View Source
def get_path(self) -> str:
    """Return the request path as it was filtered by the constructor.

    :return: (str) The filtered path

    """
    return self.__path

get_page_name

Get the name of the page, as extracted from the path by the constructor.

Returns
  • (str) the page name asked for by the request
View Source
def get_page_name(self) -> str:
    """Return the page name that the constructor extracted from the path.

    :return: (str) The page name asked for by the request

    """
    return self.__page_name

static_content

Return the content of a static file and update the corresponding status.

This method checks the existence of the file and its permissions before returning its content. If the file does not exist or is a directory, the error is logged and an error page with the appropriate HTTP status is returned.

Parameters
  • filepath: (str) The path of the file to return.
Returns
  • (tuple[bytes, HTTPDStatus]) A tuple containing the file content in bytes and the corresponding HTTP status.
View Source
def static_content(self, filepath: str) -> tuple[bytes, HTTPDStatus]:
    """Return the content of a static file and the corresponding status.

    The file must exist, be a regular file and pass the read-permission
    check. Each refusal is logged and answered with an HTML error page.
    A failure while reading the file is logged too and answered with a
    500 page carrying the error message.

    :param filepath: (str) The path of the file to return.
    :return: (tuple[bytes, HTTPDStatus]) The file content (or the encoded
             HTML error page) and the corresponding HTTP status.

    """
    if not os.path.exists(filepath):
        return self.__log_and_status(404, filepath, 'File not found')
    if not os.path.isfile(filepath):
        return self.__log_and_status(403, filepath, 'Folder access is not granted')

    try:
        p = UnixPermissions()
        checker = FileAccessChecker(filepath)
        if checker.read_allowed(who=f'{p.group}&{p.others}') is False:
            return self.__log_and_status(403, filepath, 'Attempted access to non-allowed file')
    except EnvironmentError as err:
        # Best effort: when the permission check itself cannot run, the file
        # is still served, but the reason is no longer silently discarded.
        logging.debug(f'Permission check skipped for {filepath}: {err}')

    try:
        content = self.__open_file_to_binary(filepath)
        return (content, HTTPDStatus(200))
    except Exception as e:
        # Keep a server-side trace of the failure behind the 500 answer.
        logging.error(f'Failed to read static file {filepath}: {e}')
        status = HTTPDStatus(500)
        return (status.to_html(encode=True, msg_error=str(e)), status)

process_post

Process the request body to return events and accept mime type.

Parameters
  • body: (BufferedReader) The body buffer of the request (rfile)
Returns
  • (dict, str) the body and accept mime type
View Source
def process_post(self, body: BufferedReader) -> tuple[dict, str]:
    """Turn the body of a POST request into events and pick the answer mime type.

    A request whose 'REQUEST_METHOD' header is present and is not POST is
    answered with no event and 'text/html'.

    :param body: (BufferedReader) The body buffer of the request (rfile)
    :return: (dict, str) The events and the accepted mime type

    """
    method = self.__headers.get('REQUEST_METHOD', 'POST')
    if method.upper() != 'POST':
        return ({}, 'text/html')

    events = self.__extract_body_content(body)
    accepted = self.__get_headers_value('Accept', 'text/html')
    # Any Accept value that mentions HTML is reduced to plain 'text/html'.
    return (events, 'text/html' if 'text/html' in accepted else accepted)

get_mime_type

Return the mime type of given file name or path.

Parameters
  • filename: (str) The name or path of the file
Returns
  • (str) The mime type of the file or 'unknown' if we can't find the type
View Source
@staticmethod
def get_mime_type(filename: str) -> str:
    """Guess the mime type of a file from its name or path.

    :param filename: (str) The name or path of the file
    :return: (str) The guessed mime type, or 'unknown' when none can be guessed

    """
    guessed = mimetypes.guess_type(filename)[0]
    return 'unknown' if guessed is None else guessed

filter_path

Parse the path to return the correct filename and page name.

Parameters
  • path: (str) The path obtained from the request or environ
  • default_path: (str) The default path to add if the path ends with '/'
Returns
  • (tuple[str, str]) the requested filename and the requested page name
View Source
@staticmethod
def filter_path(path: str, default_path: str='index.html') -> tuple[str, str]:
    """Parse the path to return the correct filename and page name.

    The query string is removed, the remaining path is percent-decoded, and
    the default page is used whenever the path does not name a file with an
    extension. The default page is appended to the filename only when the
    path ends with '/'.

    :param path: (str) The path obtained from the request or environ
    :param default_path: (str) The default page to use if the path names no page
    :return: (tuple[str, str]) The requested filename and the requested page name

    """
    # Cut the query string BEFORE decoding: an encoded question mark (%3F)
    # belongs to the path and must not be mistaken for the query separator.
    filepath = unquote(path.partition('?')[0])
    if not filepath:
        return (f'/{default_path}', default_path)

    page_name = os.path.basename(filepath)
    extension = os.path.splitext(filepath)[1]
    if not page_name or not extension:
        # No file with an extension was named: fall back to the default page.
        page_name = default_path
        if filepath.endswith('/'):
            filepath += default_path
    return (filepath, page_name)

has_to_return_data

Determine the type of the server return: True for data.

Determine if the server should return data (e.g., JSON, image, video, etc.) instead of an HTML page based on the 'Accept' header's MIME type.

Parameters
  • accept_type: (str) The MIME type of the 'Accept' header request
Returns
  • (bool) True if the server should return data, False if HTML content is expected
View Source
@staticmethod
def has_to_return_data(accept_type: str) -> bool:
    """Tell whether the answer must be data rather than an HTML page.

    The decision relies only on how the given 'Accept' mime type starts:
    JSON, image, video, audio and ogg types mean data.

    :param accept_type: (str) The MIME type of the 'Accept' header request
    :return: (bool) True if the server should return data, False if HTML content is expected

    """
    data_prefixes = ('application/json', 'image/', 'video/', 'audio/', 'application/ogg')
    return accept_type.startswith(data_prefixes)

bakery

Process received events and bake the given page.

Parameters
  • pages: (dict) A dictionary with key=page_name and value=ResponseRecipe
  • page_name: (str) The current page name
  • headers: (dict) The headers of the http request
  • events: (dict) The events extracted from the request (only for a POST request; send an empty dict for GET)
  • has_to_return_data: (bool) False by default. Tells whether to return data instead of the HTML page
Returns
  • (tuple[bytes, HTTPDStatus]) The content to answer to the client and the status of the response
View Source
@staticmethod
def bakery(pages: dict, page_name: str, headers: dict, events: dict, has_to_return_data: bool=False) -> tuple[bytes, HTTPDStatus]:
    """Process the received events and bake the requested page.

    :param pages: (dict) A dictionary with key=page_name and value=ResponseRecipe
    :param page_name: (str) The current page name
    :param headers: (dict) The headers of the http request
    :param events: (dict) The events extracted from the request (empty dict for a GET request)
    :param has_to_return_data: (bool) False by default. True to answer the data of the response instead of its HTML page
    :raises: TypeError: The status of the response is neither an int nor an object with a 'code'
    :return: (tuple[bytes, HTTPDStatus]) The content to answer to the client and the status of the response

    """
    recipe = pages.get(page_name)
    if recipe is None:
        not_found = HTTPDStatus(404)
        return (not_found.to_html(encode=True, msg_error=f'Page not found: {page_name}'), not_found)

    # The page is always baked, even when only the data is sent back.
    payload = bytes(recipe.bake(events, headers=headers), 'utf-8')
    if has_to_return_data is True:
        payload = recipe.get_data()
        if not isinstance(payload, (bytes, bytearray)):
            payload = bytes(payload, 'utf-8')
        recipe.reset_data()

    status = recipe.status
    if isinstance(status, int):
        return (payload, HTTPDStatus(status))
    if not hasattr(status, 'code'):
        raise TypeError(f'The status has to be an instance of HTTPDStatus or int.Got {status} instead.')
    return (payload, status)

Protected functions

__log_and_status

Log the error message and return the corresponding HTTP status.

This method logs the provided message along with the file path and returns an HTML error message with the appropriate HTTP status.

Parameters
  • code: (int) The HTTP status code to return.
  • filepath: (str) The path of the file related to the error.
  • msg: (str) The message to log regarding the error.
Returns
  • (tuple[bytes, HTTPDStatus]) A tuple containing the encoded HTML error message and the corresponding HTTP status.
View Source
def __log_and_status(self, code: int, filepath: str, msg: str) -> tuple[bytes, HTTPDStatus]:
    """Log an error about a file and build the matching error answer.

    The log entry holds the full file path, while the message placed in the
    HTML answer only holds the base name of the file.

    :param code: (int) The HTTP status code to return.
    :param filepath: (str) The path of the file related to the error.
    :param msg: (str) The message to log regarding the error.
    :return: (tuple[bytes, HTTPDStatus]) The encoded HTML error message
             and the corresponding HTTP status.

    """
    http_status = HTTPDStatus(code)
    logging.error(f'{msg}: {filepath}')
    client_msg = f'{msg}: {os.path.basename(filepath)}'
    return (http_status.to_html(encode=True, msg_error=client_msg), http_status)

__get_headers_value

Get headers value for a given key, try different keys format depending on server (httpd or wsgi).

Parameters
  • key: (str) the header key
  • default_value: (object) optional parameter, value returned if the header doesn't contain the key
Returns
  • (object) the value in the header or the default value
View Source
def __get_headers_value(self, key: str, default_value: object=None) -> object:
    """Get the headers value for a given key, whatever the server (httpd or wsgi).

    The key is searched under three spellings, in this order:

    1. as given, e.g. 'Content-Type' (httpd headers);
    2. upper-cased with underscores, e.g. 'CONTENT_TYPE' (WSGI environ
       entries that carry no prefix);
    3. the same with the 'HTTP_' prefix, e.g. 'HTTP_ACCEPT', which is how a
       WSGI environ stores the other request headers.

    :param key: (str) The header key
    :param default_value: (object) Optional, value returned if the headers don't contain the key
    :return: (object) The value in the headers or the default value

    """
    wsgi_key = key.upper().replace('-', '_')
    for candidate in (key, wsgi_key, f'HTTP_{wsgi_key}'):
        value = self.__headers.get(candidate)
        if value is not None:
            return value
    return default_value

__open_file_to_binary

Open and read the given file and transform the content to bytes value.

Parameters
  • filepath: (str) The path of the file to read
Returns
  • (bytes) the file content in bytes format
View Source
def __open_file_to_binary(self, filepath: str) -> bytes:
    """Read the given file and return its content as bytes.

    The type of the file is the 'Content-Type' of the headers when there
    is one, else it is guessed from the file name. Text, javascript and
    JSON files must be valid UTF-8; any other file is returned as it is.

    :param filepath: (str) The path of the file to read
    :raises: UnicodeDecodeError: A text file is not UTF-8 encoded
    :raises: OSError: The file can't be opened or read
    :return: (bytes) The file content in bytes format

    """
    file_type = self.__get_headers_value('Content-Type')
    if file_type is None:
        file_type = HTTPDHandlerUtils.get_mime_type(filepath)

    # One binary read inside a context manager: the handle is always
    # released and nothing is re-assembled line by line.
    with open(filepath, 'rb') as fp:
        content = fp.read()

    is_text = file_type is not None and (
        file_type.startswith('text/')
        or file_type in ('application/javascript', 'application/json')
    )
    if is_text:
        # Decoding only validates the encoding: valid UTF-8 bytes are
        # returned unchanged, invalid ones raise like the former text read.
        content.decode('utf-8')
    return content

__extract_body_content

Read and parse the body content of a POST request.

Parameters
  • content: (Binary object) the body of the POST request
Returns
  • (dict) the dictionary that contains the events to process, or an empty dictionary if there is an error.
View Source
def __extract_body_content(self, content) -> dict:
    """Read and parse the body content of a POST request.

    The body is read according to the 'Content-Length' header, decoded as
    UTF-8 when possible, then parsed according to the 'Content-Type':
    JSON, multipart form data (one uploaded file) or url-encoded pairs.

    :param content: (Binary object) The body of the POST request
    :return: (dict) The dictionary that contains the events to process,
                    or an empty dictionary if there is an error.

    """
    content_type = self.__get_headers_value('Content-Type')
    try:
        # A negative length must never reach read(): read(-1) means
        # "read until the end of the stream".
        content_length = max(0, int(self.__get_headers_value('Content-Length', '0')))
    except (TypeError, ValueError):
        content_length = 0

    # The body is consumed even if it is discarded right after.
    data = content.read(content_length)
    try:
        data = data.decode('utf-8')
    except UnicodeError:
        logging.debug('Not an utf-8 content.')

    if content_type is None or content_length == 0:
        return {}

    if 'application/json' in content_type:
        try:
            data = json.loads(data)
        except ValueError:
            # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
            # raised for bytes that are not valid JSON text.
            logging.error(f"Can't decode JSON posted data : {data}")
            return {}
    elif 'multipart/form-data; boundary=' in content_type:
        try:
            if isinstance(data, bytes):
                parts = HTTPDHandlerUtils.__extract_binary_form_data_file(content_type, data)
            else:
                parts = HTTPDHandlerUtils.__extract_form_data_file(content_type, data)
        except (ValueError, TypeError) as err:
            # A marker (filename, mime type or boundary) is missing in the body.
            logging.error(f"Can't parse multipart posted data: {err}")
            return {}
        filename, mime_type, file_content = parts
        data = {'upload_file': {'filename': filename, 'mime_type': mime_type, 'file_content': file_content}}
    else:
        data = dict(parse_qsl(data, keep_blank_values=True, strict_parsing=False))

    # The posted JSON is not necessarily a dictionary: only inspect mappings.
    upload = data.get('upload_file') if isinstance(data, dict) else None
    if isinstance(upload, dict):
        logging.debug(f" -- upload_file[{upload.get('filename')}]")
    return data

__extract_form_data_file

Extract the body of a "formdata request" to upload a file.

Use this function with an utf-8 file content.

Parameters
  • content_type: (str) The content type in the header of the request
  • data: (str) the body of the request, already decoded to a string
Returns
  • (tuple[str, str, str]) the data extracted: filename, file mime type and file content
View Source
@staticmethod
def __extract_form_data_file(content_type: str, data: str) -> tuple[str, str, str]:
    """Extract the uploaded file from the body of a "formdata request".

    Use this function with an utf-8 file content.

    :param content_type: (str) The content type in the header of the request
    :param data: (str) The body of the request, already decoded to a string
    :raises: ValueError: The filename, the mime type, a line break or the boundary is missing
    :return: (tuple[str, str, str]) The data extracted: filename, file mime type and file content

    """
    name, name_end = HTTPDHandlerUtils.__extract_form_data_filename(data)
    remainder = data[name_end:]
    mime, mime_end = HTTPDHandlerUtils.__extract_form_data_mimetype(remainder)
    # Skip the line break that ends the 'Content-Type' line.
    remainder = remainder[mime_end + 1:]
    closing = HTTPDHandlerUtils.__extract_form_data_boundary(content_type)

    # The content starts after the next line break.
    body_start = remainder.index('\n') + 1
    # NOTE(review): this offset is counted from body_start but is used below
    # as an absolute end, so the slice stops body_start characters before the
    # boundary. Kept as is: the result depends on it.
    body_end = remainder.index(closing, body_start) - body_start
    file_content = remainder[body_start:body_end].replace('\r', '')
    return (name, mime, file_content)

__extract_binary_form_data_file

Extract the body of a "formdata request" to upload a file.

Use this function with a binary file content.

Parameters
  • content_type: (str) The content type in the header of the request
  • data: (bytes) the body of the request in bytes format
Returns
  • (tuple[str, str, bytes]) the data extracted: filename, file mime type and file content
View Source
@staticmethod
def __extract_binary_form_data_file(content_type: str, data: bytes) -> tuple:
    """Extract the uploaded file from the body of a "formdata request".

    Use this function with a binary file content.

    The body is scanned forward, byte by byte, to collect the part headers
    as text and to locate the first byte of the file content. It is then
    scanned backward to locate the closing boundary. The filename and the
    mime type are read from the collected headers.

    :param content_type: (str) The content type in the header of the request
    :param data: (bytes) The body of the request in bytes format
    :raises: ValueError: The filename or the mime type is missing in the collected headers
    :raises: TypeError: The beginning or the end of the file content was not located
    :return: (tuple[str, str, bytes]) The data extracted: filename, file mime type and file content

    """
    file_content_begin = None
    content_type_pass = False
    # ASCII text collected from the start of the body (the part headers)
    prefix = ''
    for i in range(len(data)):
        if data[i] <= 127:
            prefix += chr(data[i])
        else:
            # First non-ASCII byte: the content is considered to begin here
            file_content_begin = i
            break
        if content_type_pass is True:
            # Once 'Content-Type' was seen, a blank line after it ends the
            # headers: the content begins at the next byte
            index = prefix.index('Content-Type')
            if '\n\n' in prefix[index:] or '\r\n\r\n' in prefix[index:]:
                file_content_begin = i + 1
                break
        if content_type_pass is False and 'Content-Type' in prefix:
            content_type_pass = True
    # The body is scanned from its end, so the boundary is searched reversed
    reversed_boundary = HTTPDHandlerUtils.__extract_form_data_boundary(content_type)[::-1]
    file_content_end = None
    # Characters collected from the end of the body, in reverse order
    postfix = ''
    # NOTE(review): if file_content_begin is still None here, range() raises TypeError
    for i in range(len(data) - 1, file_content_begin, -1):
        if reversed_boundary not in postfix:
            postfix += chr(data[i])
        else:
            # The whole boundary was collected at the previous step:
            # i is the index right before the first character of the boundary
            file_content_end = i
            break
    # NOTE(review): if the boundary was not found, file_content_end is None and
    # this addition raises TypeError. The slice ends right before the boundary,
    # so a line break preceding it appears to stay in the content - to confirm.
    content = data[file_content_begin:file_content_end + 1]
    filename = HTTPDHandlerUtils.__extract_form_data_filename(prefix)[0]
    mimetype = HTTPDHandlerUtils.__extract_form_data_mimetype(prefix)[0]
    return (filename, mimetype, content)

__extract_form_data_filename

Extract the filename from the form data uploaded file.

Parameters
  • text: (str) the body or a part received from the request.
Returns
  • (tuple[str, int]) the filename and the index where the filename value ends.
View Source
@staticmethod
def __extract_form_data_filename(text: str) -> tuple[str, int]:
    """Extract the filename of the uploaded file from the form data.

    :param text: (str) The body or a part received from the request.
    :raises: ValueError: No quoted 'filename' value in the text
    :return: (tuple[str, int]) The filename and the index where the filename value ends.

    """
    marker = 'filename="'
    begin = text.index(marker) + len(marker)
    # The value ends at the next double quote.
    end = text.index('"', begin)
    return (text[begin:end], end)

__extract_form_data_mimetype

Extract the mimetype from the form data uploaded file.

Parameters
  • text: (str) the body or a part received from the request.
Returns
  • (tuple[str, int]) the mimetype and the index where the mimetype value ends.
View Source
@staticmethod
def __extract_form_data_mimetype(text: str) -> tuple[str, int]:
    """Extract the mimetype of the uploaded file from the form data.

    :param text: (str) The body or a part received from the request.
    :raises: ValueError: No 'Content-Type: ' line ended by a line break in the text
    :return: (tuple[str, int]) The mimetype and the index where the mimetype value ends.

    """
    marker = 'Content-Type: '
    begin = text.index(marker) + len(marker)
    # The value runs to the end of the line; carriage returns are dropped.
    end = text.index('\n', begin)
    return (text[begin:end].replace('\r', ''), end)

__extract_form_data_boundary

Extract the boundary from the form data content type; it delimits the uploaded file content.

Parameters
  • content_type: (str) the content type in the header of the received request.
Returns
  • (str) the boundary.
View Source
@staticmethod
def __extract_form_data_boundary(content_type: str) -> str:
    """Build the closing boundary of the uploaded file from the content type.

    Everything that follows 'boundary=' in the content type is taken as
    the boundary value, then wrapped between two pairs of dashes.

    :param content_type: (str) The content type in the header of the received request.
    :raises: ValueError: No 'boundary=' in the content type
    :return: (str) The boundary.

    """
    marker = 'boundary='
    offset = content_type.index(marker) + len(marker)
    return f'--{content_type[offset:]}--'