abhishek-ram/django-pyas2

How would you convert a message back to the original file?

Closed this issue · 7 comments

Today I tried the django-pyas2 setup with 2 partners for the first time. Everything works fine over the web GUI. For testing I sent a JPG image file. In the message folders "inbox" and "store" you can then find the image file, but it is not the same as the original file and the file size differs from the original. So how can you convert the message back by scripting it (not using the web GUI for downloading)?

And is there also a file watcher that detects new messages and starts scripts for automatic processing, like the one mentioned here?
https://django-pyas2.readthedocs.io/en/latest/detailed-guide/extending.html
The script from that page fails with an error pointing to:

    def join_all(self):
        for observer in self.observers:
            observer.join()

Is there documentation with more examples of how to create automated scripts for custom post-processing of received messages?

@comsyspro :

Please find "my current version" of the filewatcher.py file below, working with watchdog version 2.2.1 (watchdog==2.2.1). I will make an update to the documentation, but this should help in the meantime:

```python
import atexit
import logging
import os
import socket
import sys
import time  
from cachetools import TTLCache
from django.core.management import call_command
from django.core.management.base import BaseCommand, CommandError
from django.db import close_old_connections
from django.utils.translation import gettext as _
from pyas2 import settings
from pyas2.models import Organization, Partner
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserverVFS

logger = logging.getLogger("filewatcher")

# DAEMONPORT: binding this local port at startup acts as a single-instance lock
DAEMONPORT = 16388
# PAUSED: set while clean_out() sweeps the outbox folders so watchdog events are ignored meanwhile
PAUSED = False
# CACHE: remembers recently submitted file paths for 20 minutes to avoid duplicate submissions
CACHE = TTLCache(maxsize=2048, ttl=1200)


class FileWatchHandle(PatternMatchingEventHandler):
    """
    FileWatchHandler that ignores directories. No Patterns defined by default. Any file in the
    directory will be sent.
    """

    def __init__(self, tasks, dir_watch):
        super().__init__(ignore_directories=True)
        self.tasks = tasks
        self.dir_watch = dir_watch

    def handle_event(self, event):
        global PAUSED

        if PAUSED:
            return
        else:
            self.tasks.add(
                (
                    self.dir_watch["organization"],
                    self.dir_watch["partner"],
                    event.src_path,
                )
            )
            logger.info(f' "{event.src_path}" created. Adding to Task Queue.')

    def on_modified(self, event):
        self.handle_event(event)

    def on_created(self, event):
        self.handle_event(event)


class WatchdogObserversManager:
    """
    Creates and manages a list of watchdog observers as daemons. All daemons will have the same
    settings. By default, subdirectories are not searched.
    :param: force_vfs : if the underlying filesystem is a network share, OS events cannot be
                        used reliably. Polling to be done, which is expensive.
    """

    def __init__(self, is_daemon=True, force_vfs=False):
        self.observers = []
        self.is_daemon = is_daemon
        self.force_vfs = force_vfs

    def add_observer(self, tasks, dir_watch):
        if self.force_vfs:
            new_observer = PollingObserverVFS(stat=os.stat, listdir=os.scandir)
        else:
            new_observer = Observer()
        new_observer.daemon = self.is_daemon
        new_observer.schedule(
            FileWatchHandle(tasks, dir_watch), dir_watch["path"], recursive=False
        )
        new_observer.start()
        self.observers.append(new_observer)

    def stop_all(self):
        for observer in self.observers:
            observer.stop()

    def join_all(self):
        for observer in self.observers:
            observer.join()


class Command(BaseCommand):
    help = _(
        "Daemon process that watches the outbox of all as2 partners and "
        "triggers sendmessage when files become available"
    )

    @staticmethod
    def send_message(organization, partner, filepath):
        global CACHE
        max_attempts = 1
        attempt = 1

        # Skip files seen recently; the TTL cache guards against duplicate submissions
        # when both a watchdog event and the periodic clean-up pick up the same file.
        if filepath in CACHE:
            logger.info(f' "{filepath}" already in cache, skipping.')
            return
        CACHE[filepath] = None

        filesize_probe_counter = 1
        filesize_probe_max = 10
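        # Probe the file size for up to ~10 seconds until it exceeds 10 bytes, i.e. the
        # sender has started writing; otherwise give up and let the periodic clean-up retry.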

        while filesize_probe_counter <= filesize_probe_max:
            if os.path.getsize(filepath) > 10:
                # give os time to finish writing if not done already
                time.sleep(1)
                break

            if filesize_probe_counter >= filesize_probe_max:
                logger.info(
                    _(
                        f"Max attempts reached {filesize_probe_max}, giving up. "
                        f"Filesize stayed below 10 bytes for {filepath}. Leave it for bulk cleanup to handle."
                    )
                )
                del CACHE[filepath]
                return
            else:
                time.sleep(1)

            filesize_probe_counter += 1

        while attempt <= max_attempts:
            try:
                call_command(
                    "sendas2message", organization, partner, filepath, delete=True
                )
                if attempt > 1:
                    logger.info(_(f"Successfully retried on attempt {attempt}"))
                break

            # TODO: Retrying should only be considered when neither the retry of the AS2 server, nor the cleanup
            #       job would be picking up the file (as an AS2 message ID was already created and it might cause
            #       duplicate submission or wrong async responses). The cases where a retry should be done from here
            #       are currently not clear/known.
            except Exception as e:
                if attempt >= max_attempts:
                    logger.info(
                        _(
                            f"Max attempts reached {max_attempts}, giving up. "
                            f"Exception detail: {e}"
                        )
                    )
                    close_old_connections()
                else:
                    logger.info(
                        _(
                            f"Hit exception on attempt {attempt}/{max_attempts}. "
                            f"Retrying in 5 seconds. Exception detail: {e}"
                        )
                    )
                    # https://developpaper.com/django-database-connection-loss-problem/
                    close_old_connections()
                    time.sleep(5)
            attempt += 1

    def clean_out(self, dir_watch_data):
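        # Pause the watchdog event handlers while sweeping the directories ourselves,
        # so files submitted here are not queued a second time by a file event.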
        global PAUSED
        PAUSED = True

        for dir_watch in dir_watch_data:
            files = [
                f
                for f in os.listdir(dir_watch["path"])
                if os.path.isfile(os.path.join(dir_watch["path"], f))
            ]
            for file in files:
                logger.info(
                    f"Send as2 message '{file}' "
                    f"from '{dir_watch['organization']}' "
                    f"to '{dir_watch['partner']}'"
                )

                self.send_message(
                    dir_watch["organization"],
                    dir_watch["partner"],
                    os.path.join(dir_watch["path"], file),
                )

        PAUSED = False

    def handle(self, *args, **options):
        logger.info(_("Starting PYAS2 send Watchdog daemon."))
        engine_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            engine_socket.bind(("127.0.0.1", DAEMONPORT))
        except socket.error:
            engine_socket.close()
            raise CommandError(_("An instance of the send daemon is already running"))
        else:
            atexit.register(engine_socket.close)

        # (organization, partner, filepath) tuples queued by the watchdog handlers
        tasks = set()

        # initialize the list containing the outbox directories
        dir_watch_data = []

        # build the paths for partners and organization and attach them to dir_watch_data
        for partner in Partner.objects.all():
            for org in Organization.objects.all():

                outbox_folder = os.path.join(
                    settings.DATA_DIR,
                    "messages",
                    partner.as2_name,
                    "outbox",
                    org.as2_name,
                )
                if not os.path.isdir(outbox_folder):
                    os.makedirs(outbox_folder)

                dir_watch_data.append(
                    {
                        "path": outbox_folder,
                        "organization": org.as2_name,
                        "partner": partner.as2_name,
                    }
                )

        if not dir_watch_data:
            logger.error(_("No partners have been configured!"))
            sys.exit(0)

        logger.info(_("Process existing files in the directory."))

        # process any leftover files in the directories

        self.clean_out(dir_watch_data)

        """Add WatchDog Thread Here"""
        logger.info(_(f"PYAS2 send Watchdog daemon started."))
        watchdog_file_observers = WatchdogObserversManager(
            is_daemon=True, force_vfs=True
        )
        for dir_watch in dir_watch_data:
            watchdog_file_observers.add_observer(tasks, dir_watch)
        try:
            logger.info(_("Watchdog awaiting tasks..."))
            start_time = time.time()
            last_clean_time = time.time()
            while True:
                if tasks:
                    task = tasks.pop()
                    logger.info(
                        f"Send as2 message '{task[2]}' "
                        f"from '{task[0]}' "
                        f"to '{task[1]}'"
                    )

                    self.send_message(task[0], task[1], task[2])

                if (
                    time.time() - start_time > 86400
                ):  # 24 hours * 60 minutes * 60 seconds
                    logger.info("Time out - 24 hours are through")
                    raise KeyboardInterrupt

                time.sleep(2)

                if time.time() - last_clean_time > 600:  # every 10 minutes
                    logger.info("Clean up start.")
                    self.clean_out(dir_watch_data)
                    last_clean_time = time.time()
                    logger.info("Clean up done.")

        except (Exception, KeyboardInterrupt) as msg:
            logger.info(f'Error in running task: "{msg}".')
            logger.info("Stopping all running Watchdog threads...")
            watchdog_file_observers.stop_all()
            logger.info("All Watchdog threads stopped.")

        logger.info("Waiting for all Watchdog threads to finish...")
        watchdog_file_observers.join_all()
        logger.info("All Watchdog threads finished. Exiting...")
        sys.exit(0)

```
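To run it, save the file as a management command in one of your Django apps (e.g. `yourapp/management/commands/filewatcher.py`; which app you use is up to you) and start it with `python manage.py filewatcher`. Note that the main loop above exits cleanly after 24 hours, so run it under something that will restart it (cron, systemd, a container restart policy, etc.).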

Thank you, I'll try it; that should automate the send direction. But could you also say something about how to convert the received files back, or parse them back to the original file, by scripting it in a similar file watcher? At the moment I can only download the files in the web GUI, but I want to develop scripts that detect newly received messages, convert them back to the original files, and move them to other folders or post-process them for different use cases. For example, if an image file was received, I want to convert the image to grayscale and save it into a specific user folder, all done by raw scripting without the GUI.

You need to set a Command on Message Receipt in the partner settings. It can be any command callable from the command line. As per the documentation, you can simply pass the filename to this command and it will then do whatever your command does with that file: https://django-pyas2.readthedocs.io/en/latest/detailed-guide/partners.html#advanced-settings

A sample would be:

echo "$receiver got a message from $sender"

Taken from here: #81
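To stay with your grayscale example: the receipt command can simply call a small script of your own. The following is only a rough sketch, not something from the django-pyas2 docs: it assumes Pillow is installed (`pip install Pillow`), uses a made-up target folder `/data/processed`, and assumes the placeholder that expands to the full path of the stored file (check the advanced-settings page linked above for the exact variable name, e.g. `$fullfilename`). The partner's Command on Message Receipt would then be something like `python3 /opt/scripts/postprocess.py $fullfilename`.

```python
#!/usr/bin/env python3
"""Rough post-receive sketch: turn received images into grayscale copies.

Configured (hypothetically) as the partner's Command on Message Receipt, e.g.
    python3 /opt/scripts/postprocess.py $fullfilename
Requires Pillow (pip install Pillow); the target folder below is made up.
"""
import shutil
import sys
from pathlib import Path

from PIL import Image  # Pillow

TARGET_DIR = Path("/data/processed")  # hypothetical destination folder


def main(received_path: str) -> None:
    src = Path(received_path)
    TARGET_DIR.mkdir(parents=True, exist_ok=True)
    dest = TARGET_DIR / src.name

    if src.suffix.lower() in {".jpg", ".jpeg", ".png"}:
        # convert image payloads to grayscale before storing them
        Image.open(src).convert("L").save(dest)
    else:
        # copy any other payload unchanged
        shutil.copy2(src, dest)


if __name__ == "__main__":
    main(sys.argv[1])
```

From there you can branch on file type, move files into user-specific folders, trigger further jobs, and so on.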

OK, but I observed that, for example, when I sent an image "test.jpg" with a file size of 1000 kB, it is received as "test.somerandom.jpg" or "somestring.msg" with a different file size like 1070 kB, and this JPG file can't be opened. There is also a header file, and I want to know how I can rebuild the original file. At the moment I don't understand how, or in which format, the files are saved on the receiver side and what has to be done to parse them back or write them back as a binary file like the original.

Indeed strange. Did you try with django-pyas2 on both the sender and the receiver side? Can you share your settings for this behavior, in particular regarding signature/encryption? Did you try without signature/encryption?

What I found out now is that when I set the partner's content type to "binary", the received file is exactly the same as the original. So the question is: why should I use, for example, the content type "application/edi-consent"?


Is it secure enough/sufficient to use http://, or is it better to run on https://, or should you proxy-pass http:// in a production environment (for example with Nginx Proxy Manager)?

I also switched to HTTPS with this tutorial. Is this the right way?
https://timonweb.com/django/https-django-development-server-ssl-certificate/

How do I put pyas2 into production mode? This warning appears after following the HTTPS tutorial:
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.

I cannot comment on your first question yet, but I'm glad you found a working solution.

On production deployment there are differing views. I use both in production, HTTP as well as HTTPS, with Nginx or HAProxy as a reverse proxy applying HTTPS encryption. As the WSGI server I use Gunicorn. The internet is full of Django deployment guides, depending on infrastructure, hosting providers etc.; here is the "official" one: https://docs.djangoproject.com/en/4.2/howto/deployment/

However, the tutorial you are pointing to uses Django's own development web server, which you should definitely NOT use in production.