    Ajay

    1 year ago
    Hi team, I am pretty new to Prefect and I am stuck trying to write an ETL. Is there a way to pass runtime variables to state handlers? What I want is something like this:
    def task_state_transition_handler(flow_info: Task, old_state: State, new_state: State):
        if new_state.is_queued():
            acquire_lock(path=new_state.context['flow_name'] + "/" +
                              new_state.context['today'] + "/",
                         data=new_state.context['flow_run_id'])
        elif new_state.is_finished():
            release_lock(path=new_state.context['flow_name'] + "/" +
                              new_state.context['today'] + "/",
                         data=new_state.context['flow_run_id'])
    
        return
    But the context dict is empty. FYI: acquire_lock creates a _lock file.

    josh

    1 year ago
    @Ajay try grabbing context like this inside the handler:
    def your_handler(...):
        from prefect import context
        print(context['flow_name'])
        ...

    Ajay

    1 year ago
    Hi @josh, thanks for the help, it is working now. I also want to understand: can I set a value in the context at runtime and access it in the state handlers?

    josh

    1 year ago
    Yeah, you should be able to.

    Ajay

    1 year ago
    So setting it like this:
    with Flow("ETL", result=s3_result, state_handlers=[flow_state_transition_handler]) as flow:
        prefect.context.event_name = keyName
    and using it in the handler:
    def flow_state_transition_handler(flow_info: Flow, old_state: State, new_state: State):
        if new_state.is_successful():
            create_file(context["event_name"] + "/" +
                        context['today'] + "/_success", get_s3_bucket(),
                        bucket_name, "")
    gives me:
    KeyError: 'event_name'
    @josh

    josh

    1 year ago
    Oh nah, that wouldn't do it. The context would need to be populated when you call something like this:
    with Flow("ETL") as flow:
        ...
    
    with prefect.context(event_name="name"):
        flow.run()
    And when running with a backend (server/cloud) you can provide context at runtime.

    Ajay

    1 year ago
    ohhhh noice..:hattip: thanks a lot
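
For reference, the two suggestions in this thread (import `context` inside the handler, and populate it around `flow.run()`) can be combined roughly as below. This is only a sketch, assuming the Prefect 1.x API discussed above. `marker_path` is a hypothetical helper, not part of Prefect, and the `print` call stands in for the `create_file` call from the question.

```python
from typing import Any, Mapping


def marker_path(ctx: Mapping[str, Any], suffix: str = "") -> str:
    """Build '<event_name>/<today>/<suffix>' from a context-like mapping.

    Hypothetical helper: it only needs `in` and `[]`, so a plain dict
    works for testing and prefect.context should work inside a handler.
    """
    missing = [key for key in ("event_name", "today") if key not in ctx]
    if missing:
        # Name the missing keys instead of raising a bare KeyError.
        raise KeyError(f"context is missing {missing}; was it set around flow.run()?")
    return f"{ctx['event_name']}/{ctx['today']}/{suffix}"


def flow_state_transition_handler(flow, old_state, new_state):
    # Import inside the handler, as suggested above, so the context is
    # read when the handler runs rather than when the module is loaded.
    from prefect import context

    if new_state.is_successful():
        # Stand-in for create_file(...) from the original question.
        print("would write marker:", marker_path(context, "_success"))
    return new_state


# Assumed usage for a local run (not executed here):
#
#     with Flow("ETL", state_handlers=[flow_state_transition_handler]) as flow:
#         ...
#
#     with prefect.context(event_name="name"):
#         flow.run()
```

Keeping the path logic in a plain function means it can be checked with an ordinary dict, without a Prefect install or a flow run.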