    Vitaly Shulgin

    1 year ago
    Hello Team, I'm trying to run a test flow. The problem is that no task except the first one is called. Any ideas?
    nicholas

    1 year ago
    Hi @Vitaly Shulgin - would you mind modifying your original message to remove the log output and place it in this thread? It blocks visibility of others' messages.
    (as a rule, please try to thread your messages when there are large chunks of code, stack traces, or pictures!)
    Vitaly Shulgin

    1 year ago
    sure
    log entries
    [2020-12-18 12:07:48+0300] INFO - prefect.FlowRunner | Beginning Flow run for 'testing-etl-visitors'
    [2020-12-18 12:07:48+0300] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
    [2020-12-18 12:07:48+0300] DEBUG - prefect.FlowRunner | Flow 'testing-etl-visitors': Handling state change from Scheduled to Running
    [2020-12-18 12:07:48+0300] INFO - prefect.TaskRunner | Task 'context_init': Starting task run...
    [2020-12-18 12:07:48+0300] DEBUG - prefect.TaskRunner | Task 'context_init': Handling state change from Pending to Running
    [2020-12-18 12:07:48+0300] DEBUG - prefect.TaskRunner | Task 'context_init': Calling task.run() method...
    [2020-12-18 12:07:50+0300] DEBUG - prefect.context_init | Initializing context ...
    [2020-12-18 12:07:50+0300] DEBUG - prefect.context_init | Initializing injector ...
    [2020-12-18 12:07:50+0300] DEBUG - prefect.context_init | Initialized injector successfully
    [2020-12-18 12:07:50+0300] DEBUG - prefect.context_init | Succeeded with auth for firebase <<https://onedebot.firebaseio.com/visitors.json>>
    [2020-12-18 12:07:50+0300] DEBUG - prefect.context_init | Initialized context successfully
    [2020-12-18 12:07:50+0300] DEBUG - prefect.TaskRunner | Task 'context_init': Handling state change from Running to Success
    [2020-12-18 12:07:50+0300] INFO - prefect.TaskRunner | Task 'context_init': Finished task run for task with final state: 'Success'
    [2020-12-18 12:07:50+0300] INFO - prefect.TaskRunner | Task 'visitors_extract': Starting task run...
    [2020-12-18 12:07:50+0300] DEBUG - prefect.TaskRunner | Task 'visitors_extract': Handling state change from Pending to Running
    [2020-12-18 12:07:50+0300] DEBUG - prefect.TaskRunner | Task 'visitors_extract': Calling task.run() method...
    [2020-12-18 12:07:50+0300] DEBUG - prefect.TaskRunner | Task 'visitors_extract': Handling state change from Running to Success
    [2020-12-18 12:07:50+0300] INFO - prefect.TaskRunner | Task 'visitors_extract': Finished task run for task with final state: 'Success'
    [2020-12-18 12:07:51+0300] INFO - prefect.TaskRunner | Task 'visitors_transform': Starting task run...
    [2020-12-18 12:07:51+0300] DEBUG - prefect.TaskRunner | Task 'visitors_transform': Handling state change from Pending to Running
    [2020-12-18 12:07:51+0300] DEBUG - prefect.TaskRunner | Task 'visitors_transform': Calling task.run() method...
    [2020-12-18 12:07:51+0300] DEBUG - prefect.TaskRunner | Task 'visitors_transform': Handling state change from Running to Success
    [2020-12-18 12:07:51+0300] INFO - prefect.TaskRunner | Task 'visitors_transform': Finished task run for task with final state: 'Success'
    [2020-12-18 12:07:51+0300] INFO - prefect.TaskRunner | Task 'filter_visitor_by_user_agent': Starting task run...
    [2020-12-18 12:07:51+0300] DEBUG - prefect.TaskRunner | Task 'filter_visitor_by_user_agent': Handling state change from Pending to Failed
    [2020-12-18 12:07:51+0300] INFO - prefect.TaskRunner | Task 'filter_visitor_by_user_agent': Finished task run for task with final state: 'Failed'
    [2020-12-18 12:07:51+0300] INFO - prefect.TaskRunner | Task 'filter_final': Starting task run...
    [2020-12-18 12:07:51+0300] DEBUG - prefect.TaskRunner | Task 'filter_final': Handling state change from Pending to Failed
    [2020-12-18 12:07:51+0300] INFO - prefect.TaskRunner | Task 'filter_final': Finished task run for task with final state: 'Failed'
    [2020-12-18 12:07:51+0300] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    [2020-12-18 12:07:51+0300] DEBUG - prefect.FlowRunner | Flow 'testing-etl-visitors': Handling state change from Running to Failed
    flow
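    import logging

    import prefect
    from prefect import Flow

    # context_init, visitors_extract, visitors_transform, filter_visitor_by_user_agent
    # and filter_final are tasks defined elsewhere (not shown in this thread).
    with Flow("testing-etl-visitors") as flow:
        logger = prefect.context.get("logger")
        logger.setLevel(logging.DEBUG)
    
        init_succeeded      = context_init()
        json_data           = visitors_extract(init_succeeded)
        all_visitors        = visitors_transform(json_data)
        filtered_visitors   = filter_visitor_by_user_agent.map(all_visitors)
        final_visitors      = filter_final.map(filtered_visitors)
    
    result = flow.run()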
    nicholas

    1 year ago
    Thanks @Vitaly Shulgin!
    To your question: do you have some tasks you can post as well?
    My guess is there's something up with the construction of your tasks
    Vitaly Shulgin

    1 year ago
    @nicholas yes, you're right, I figured it out: there is a `yield`, which prevents the task from being executed as expected
    nicholas

    1 year ago
    That makes sense! Glad you got it figured out.
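For anyone finding this thread later, here is a minimal sketch of the Python behaviour behind the `yield` issue mentioned above. It uses plain Python only, with no Prefect involved. The function names `extract_with_yield` and `extract_with_return` are hypothetical stand-ins, not the tasks from the original flow, which were never posted. A function that contains `yield` is a generator function: calling it returns a generator object and does not run the body until something iterates over it. That would be consistent with the tasks in the log reporting 'Success' instantly without emitting any of their own log lines, although this is an assumption.

```python
import inspect

calls = []  # records which function bodies actually executed


def extract_with_yield():
    # Hypothetical stand-in for a task body; `yield` makes this a generator function.
    calls.append("extract_with_yield")
    yield {"visitor": 1}


def extract_with_return():
    # Same work, but returns a concrete list instead of yielding.
    calls.append("extract_with_return")
    return [{"visitor": 1}]


lazy = extract_with_yield()    # only creates a generator; the body has not run yet
eager = extract_with_return()  # the body runs immediately

is_generator = inspect.isgenerator(lazy)
calls_before_iteration = list(calls)

materialized = list(lazy)      # iterating is what finally runs the generator body
calls_after_iteration = list(calls)

print(is_generator, calls_before_iteration, calls_after_iteration)
```

If a task needs to hand a collection to a downstream mapped task, returning a list (as in `extract_with_return`) rather than yielding items is likely the safer choice.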