Skip to main content

Taskflow update #7

 Continuing the node writing topic. So last time i showed how easy it is to define the interface of a node, now let's take a look at a whole node.

 Продолжая тему написания нод. Прошлый раз я продемонстрировал простой АПИ для создания интерфейса ноды, теперь же давайте посмотрим на написание ноды целиком

It will be a simple node that just creates a file with given name and and writes one of task's attributes there.

Для примера сделаем ноду, которая банально записывает значение атрибута таска в файл, путь которого берется из параметра ноды

first we just need to subclass the BaseNode class

Начнём с наследования класса BaseNode

class LogSomeAttribute(BaseNode):
    def __init__(self, name):
        super().__init__(name)


Now we initialize interface, as we might remember from previous post 

Инициализируем интерфейс, как в прошлом посте

    ui = self.get_ui()
    with ui.initializing_interface_lock():
        ui.add_parameter('where to write', 'Log File Path', NodeParameterType.STRING, '')

Here we create one single parameter where to write (we can have any unicode strings as parameter names, not just ascii letters). The display label will be "Log File Path", default value - empty

В этом примере мы создали один единственный параметра where to write (и да, в качестве имени параметра может быть любая юникод строка, а не только аски буквоцифры). Отображаемым именем будет "Log File Path", значение по умолчанию - пустое

Next we will implement some abstract classmethods that will identify our new node type

Далее реализуем некоторые методы класса, идентифицирующие новый тип нод.

    @classmethod
    def label(cls) -> str:
        return 'log an attribute'

    @classmethod
    def tags(cls) -> Iterable[str]:
        return 'tutorial', 'log', 'attribute'

    @classmethod
    def type_name(cls) -> str:
        return 'tutorial::logattr'

The names of methods are pretty self-explanatory: label is how this new node type will be called in display, tags - just a set of words to help finding the node type, and type_name - the internal unique name of the node type

Имена этих методов говорят сами за себя: label - отображаемое имя типа ноды, tags - набор ключевых слов для поиска, type_name - внутреннее имя типа ноды

And we are done with  all the boiler plate code, only actual node logic implementation remaining.

На этом мы закончили со вспомогательным кодом, время реализовывать собственно логику.

First, let's briefly remember how task processing happens in a node.

 Для начала вспомним, как происходит обработка таска.

  1. task arrives at node and waits to be processed
  2. scheduler executes node's processing logic in a separate thread, still on scheduler machine.
  3. on step 2 is when a node through it's logic can decide to generate an invocation job - something to be executed on one of worker machines. If such invocation job was generated - scheduler will assign it to some free worker that meets job's requirements
  4. worker executes invocation job and returns job metadata and maybe even some processed files back to the scheduler
  5. scheduler initializes post-processing logic in a separate thread. here a node has a chance to analyze invocation job's results and change some attributes on the task, or even create another invocation job and go back to step 3.
  6. after that is done - the task is considered processed by a node, and scheduler will move it to the next node in the graph
    1. таск приходит в ноду и ожидает обработки
    2. шедулер выполняет процессинг логику ноды в отдельном потоке, всё еще внутри процесса шедулера
    3. на стадии 2 логика процессинга конкретного типа ноды может решить создать работу(invocation) для выполнения на рабочих в сети.
    4. рабочие выполняют созданную работу и возвращают метадату результата (и потенциально дополнительные файлы) шедулеру
    5. шедулер выполняет пост-процессинг логику конкретного типа ноды, по аналогии с шагом 2. На этом этапе результаты работы могут быть проанализированы, и таск может быть изменён соответственно. Даже может быть создана новая работа, в таком случае далее возвращаемся на шаг 3
    6. после всего - таск считается обработанным нодой, и шедулер направит его далее по графу

For this particular example we just want to write a file, no invocation jobs, no post-processings, so we are interested only in steps 1 and 2. Sure we could create an invocation job that would do our log writing, but it doesn't worth it. Our task is not CPU intensive at all, not even IO intensive, so we can leave it to the scheduler itself.

В данном примере мы всего лишь хотим писать в файл, так что мы не будем создавать работы для рабочих, и нам не нужен пост-процессинг, так что для нас актуальны только шаги 1 и 2. Да, мы могли бы выполнить запись в файл на рабочих, но так как наша задача совсем не загружает цпу, и даже не загружает диск - проще и быстрее оставиь выполнение шедулеру.

All processing logic is implemented in process_task virtual method. It expects ProcessingContext as argument, and should produce ProcessingResult as result.

Вся логика реализуется в одном виртуальном методе process_task. Он ожидает ProcessingContex на вход и должен вернуть переменную типа ProcessingResult.

ProcessingContext here provides us with, well, context in which to evaluate node parameters if those have expressions.

ProcessingContext даёт нам возможность вычислять значения параметров, вычисляя возможные экспрешны в них в контексте выполняемого таска.

So the implementation of processing is just the following:

Так что вся реализация сводится к:

def process_task(self, context: ProcessingContext) -> ProcessingResult:
    with open(context.param_value('where to write', 'w') as f:
        f.write(str(context.task_attribute('some_attr')))
    return ProcessingResult()

And that is all, we will open file with path given by "where to write" parameter, and write there the value of "some_attr" attribute

 В общем вот и всё, мы открываем файл по пути, взятому из параметра "where to write", и записывает туда значение атрибута "some_attr"

Now we completely did not care here about possible errors and exceptions, which is not good practice, though scheduler will catch any exception raised from here and will set the processed task into "error" state, saving cought exception and stacktrace.

Вы могли заметить, что мы совершенно игнорируем обработку ошибок, что является очень плохой практикой. Но шедулер ловит все исключения в обработке таска, так что если что-то случится - таск будет переключен в состояние "error", и информация об ошибке будет сохранена в нём.

Now having attribute name "some_attr" hardcoded into the node is not the best idea. What if we want to save some other attribute?

Если подумать, мы сохраняем атрибут с захардкоженным именем "some_attr", что несколько чересчур специффично. Что если мы захотим сохранить значение другого атрибута?

We can actually can go even more general: instead of writing some fixed attribute - we will write the value of another node parameter, but that parameter will have an expression that evaluates some parameter from the task being processed.

Сделаем один дополнительный шаг в сторону дженерализации: вместо атрибута - мы будем записывать значение другого параметра ноды, но в качестве дефолтного значения в этом атрибуте мы зададим выражение, возвращающее атрибут таска.

Then first we need to add another parameter to the interface:

Таким образом интерфейс нашего типа ноды изменится так:

class LogSomeAttribute(BaseNode):
    def __init__(self, name):
        super().__init__(name)
        ui = self.get_ui()
    	with ui.initializing_interface_lock():
        	ui.add_parameter('where to write'), 'Log File Path', NodeParameterType.STRING, '')
            ui.add_parameter('what to write', 'Saved Data', NodeParameterType.STRING, "`task['some_attr']`")


Note that as the default value for string parameter "what to write" we set ‘task['some_attr']‘. Just like in Houdini, here string node parameters are being expanded. so everything within ‘ ‘ quotes will be treated as an inline expression. This particular expression just takes the value of attribute "some_attr" of the task currently being processed.

Заметьте, в качестве значения по умолчанию атрибута "what to write" мы задали ‘task['some_attr']‘. Так же, как и в гудини, значения строковых параметров нод экспандятся, что значит, что все выражения в этих кавычках `` вычислаются как питон код, результат вычисления будет подставлен вместо всего выражения в ``

And then the processing function will look like this:

Итак, метод process_task будет выглядеть следующим образом

def process_task(self, context: ProcessingContext) -> ProcessingResult:
    with open(context.param_value('where to write'), 'w') as f:
        f.write(str(context.param_value('what to write')))
    return ProcessingResult()

And in the viewer you will see this node like this:

Теперь во вьюере наша нода будет выглядеть так:


But  the idea is that an average user should not need to write it's own nodes. There should be enough standard ones, there is also a python node that allows one to easily write scripts to be executed on scheduler and on workers

Но задумка в том, что среднему пользователю не должно быть необходимости писать свои ноды - для почти всего должно быть достаточно встроенных нод. Так же в наличии общая питон нода, позволяющая легко писать скрипты для выполнения как на шедулере, так и на машинах рабочих.