• Vlad Koshelev

    1 year ago
    Hi, everyone. I'm trying to build an ETL flow for a star-schema DB with reference and fact tables. They are loaded from CSV files daily. The facts must be loaded after the references, so the facts ETL flow depends on the references already being loaded for the day. The problem is that the files arrive in an unpredictable order, e.g. facts.csv first, references.csv next. Moreover, references.csv may be absent on some days; in that case the facts flow should wait for the references for some time. The cases are:
    1. references.csv uploaded first, facts.csv uploaded last: load references -> load facts
    2. facts.csv first, references.csv last: wait for references -> load references -> load facts
    3. facts.csv and no references.csv: wait for references with timeout -> load facts
    Another problem is that I need to forbid parallel execution of facts loading when several fact files are uploaded (facts data come in sliding time ranges without primary keys, so before loading them into the DB I need to delete the previous data within the time range). I guess I need some shared state that tells the facts flow whether the references are already loaded and whether another facts task is running. That state is already present in the Prefect DB tasks. Is there an API to get Prefect tasks from the DB, and which criteria can I use (e.g. custom, dynamically set tags) to filter them? Something like this:
    references_task = StartFlowRun(flow_name='references-etl', project_name='etl', wait=True)
    facts_task = StartFlowRun(flow_name='facts-etl', project_name='etl', wait=True)
    
    with Flow('main-etl') as main_flow:
        files = get_files()
        references_task.map(files)
        facts_task.map(files)
    
    
    @task
    def get_group(file):
        # get files "group" name from the file (20200101-facts.csv -> group=20200101)
    
    @task
    def check_references_done(group, file):
        # get references tasks from the Prefect DB for the group and check they are done
        # or check if wait timeout reached (e.g. get time of the file creation and check if now - created_at > timeout)
    
    @task
    def check_no_another_facts_running(group, file):
        # check that no "do_etl" tasks from the Prefect DB are in a "running" state
    
    @task
    def do_etl(group, file):
        ...
    
    with Flow('facts-etl') as facts_flow:
        file = Parameter('file')
        group = get_group(file)
        refs_done = check_references_done(group, file)
        no_parallel = check_no_another_facts_running(group, file)
        do_etl(group, file, upstream_tasks=[refs_done, no_parallel])
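    A sketch of the filename-based grouping, plus the kind of GraphQL query Prefect Server exposes for inspecting run state (the `flow_run` table with its `state` field is part of Server's Hasura-generated API; the flow name and the `_ilike` filter on run names are assumptions for illustration):

```python
import re

def get_group(filename):
    """Extract the "group" prefix from a data file name,
    e.g. "20200101-facts.csv" -> "20200101"."""
    match = re.match(r"(\d{8})-(?:facts|references)\.csv$", filename)
    if match is None:
        raise ValueError(f"unexpected file name: {filename}")
    return match.group(1)

# Hypothetical query against Prefect Server's GraphQL API: runs of the
# references flow matching the group, already in a Success state.
# It would be executed with prefect.Client().graphql(query, variables=...).
REFERENCES_DONE_QUERY = """
query($name_like: String!) {
  flow_run(where: {
    flow: {name: {_eq: "references-etl"}},
    name: {_ilike: $name_like},
    state: {_eq: "Success"}
  }) {
    id
    state
  }
}
"""
```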
    2 replies
• Lukas N.

    1 year ago
    Hi Prefect community 👋, I've got a problem with passing environment variables from Kubernetes agent to DaskKubernetesEnvironment. Anyone able to help? More in thread
    2 replies
• Will Milner

    1 year ago
    Is there any extra config needed to have stdout from a ShellTask show up in the server logs? When I declare a task like this, I see the output fine:
    @task(log_stdout=True)
    def sample_print_task():
        print("hello")
    When I declare a ShellTask like this:
    task_shell = ShellTask(return_all=True, log_stdout=True, log_stderr=True, stream_output=True)
    with Flow("test") as flow:
        print_test = task_shell(command="echo hi", task_args={"name": "hi"})
    I don't see anything printed after I register and run the flow. I have log_to_cloud set to True on the agent I am running.
    8 replies
• Vitaly Shulgin

    1 year ago
    Hi Prefect community, how can I configure an agent to work with a specific tenant? There is this error in the agent log; is it possible to set the tenant via an env var?
    [2020-12-16 14:48:37,015] ERROR - agent | 400 Client Error: Bad Request for url: <http://prefect-apollo:4200/>
    
    The following error messages were provided by the GraphQL server:
    
        INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at
            "input.tenant_id"; Expected non-nullable type UUID! not to be null.
    
    The GraphQL query was:
    
        mutation($input: get_runs_in_queue_input!) {
                get_runs_in_queue(input: $input) {
                    flow_run_ids
            }
        }
    
    The passed variables were:
    
        {"input": {"before": "2020-12-16T14:48:36.911186+00:00", "labels": ["talynuc", "any"], "tenant_id": null}}
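    For reference, `tenant_id: null` typically means the Server instance has no default tenant yet. With Prefect Server (not Cloud) this is usually fixed once with the CLI rather than through an agent env var (command spelling assumes Prefect 0.13+; the tenant name/slug here are just examples):

```shell
# Point the CLI at Prefect Server and create a default tenant once;
# agents pick it up automatically afterwards.
prefect backend server
prefect server create-tenant --name default --slug default
```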
    6 replies
• dh

    1 year ago
    Hello, Prefect community. Does Prefect have an idiomatic way to specify (a) “please run this task within this Docker image” and (b) “please execute this task on a container using x amount of CPU/memory + GPU”? I am curious about task-level specification, not flow-level.
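    For CPU/GPU placement, Prefect 1.x documents a `dask-resource:` task-tag convention for flows running on a `DaskExecutor` whose workers are started with matching `--resources` annotations. A minimal sketch of that convention, with the tag parsing written out as a plain function (the tag format is Prefect's; the helper itself is illustrative):

```python
def dask_resources_from_tags(tags):
    """Translate Prefect task tags of the form "dask-resource:KEY=N"
    into a Dask resources dict, e.g. {"GPU": 1.0}.

    This mirrors what Prefect's DaskExecutor does with such tags;
    unrelated tags are ignored."""
    prefix = "dask-resource:"
    resources = {}
    for tag in tags:
        if tag.startswith(prefix):
            key, _, amount = tag[len(prefix):].partition("=")
            resources[key] = float(amount)
    return resources

# In a flow this would look like:
#   @task(tags=["dask-resource:GPU=1"])
#   def train(): ...
```

    Task-level Docker images were not supported in Prefect 1.x; the image is a flow-level (storage/run-config) concern.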
    15 replies
• Marc Lipoff

    1 year ago
    Does Prefect create a repository on AWS ECR if it does not exist? I am getting this error:
    The repository with name 'staging-prefect-insurance_export' does not exist in the registry with id '524279393077'
    I'm running something like this:
    Docker(registry_url='<account_id>.dkr.ecr.us-west-2.amazonaws.com',
                              image_name="dev-prefect-<flow-name>",
                              python_dependencies=python_dependencies,
                              files={f: os.path.join('/modules/', os.path.basename(f))
                                                    for f in get_files_in_directory(current_module_directory)},
                              env_vars = {"PYTHONPATH": "$PYTHONPATH:modules/:modules/utils/"})
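    Prefect's Docker storage pushes to the registry but does not create the ECR repository for you. A hedged helper (the function name is illustrative) that creates the repository first via the boto3 ECR client if it is missing:

```python
def ensure_ecr_repository(ecr_client, name):
    """Create an ECR repository if it does not already exist.

    `ecr_client` is expected to behave like boto3.client("ecr"); it is
    passed in as a parameter so the helper stays easy to test."""
    try:
        ecr_client.describe_repositories(repositoryNames=[name])
    except ecr_client.exceptions.RepositoryNotFoundException:
        ecr_client.create_repository(repositoryName=name)

# e.g. before registering the flow:
#   ensure_ecr_repository(boto3.client("ecr"), "staging-prefect-insurance_export")
```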
    2 replies
• Christian

    1 year ago
    Hi. 👋 I want to access prefect.context.parameters from within a task state_handler, but the value I get is always None. The idea is to send an email (EmailTask) on a task failure, and I want to parameterise the email address that will be alerted. Is there another way to pass a variable into a state_handler? Am I going in the wrong direction? Thanks
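    One common workaround is to bind the address into the handler with `functools.partial` instead of reading it from context: Prefect accepts any callable with the `(task, old_state, new_state)` signature. A sketch (the `send` hook is a stand-in for the real EmailTask/smtplib call; the address is an example):

```python
from functools import partial

def email_on_failure(task, old_state, new_state, email_to, send=print):
    """Prefect-style state handler with the alert address pre-bound.

    `send` is a placeholder for the real notification call
    (EmailTask.run, smtplib, ...)."""
    if new_state.is_failed():
        send(f"Task {task.name} failed; alerting {email_to}")
    return new_state

# bind the address once; the result still matches Prefect's
# (task, old_state, new_state) handler signature:
alert_handler = partial(email_on_failure, email_to="alerts@example.com")
# @task(state_handlers=[alert_handler])
```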
    5 replies
• Lech Głowiak

    1 year ago
    Hi! 👋 I admit my use case isn't what Prefect was designed for, but here I am. I need a manual trigger to let the flow continue: these are the first runs of this flow in production, and we want to check some state manually before going further. The problem is that after the pause a new container is created, so all state (both variables and temporary files stored in the previous container) is lost. Is there some way to get around this obstacle?
    9 replies
• Bob Primusanto

    1 year ago
    Hello, I'm looking for a way to get the flow_id. I want to record it as a column in my table, to trace back to a particular task if I encounter an issue. Any ideas?
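    At runtime Prefect populates `prefect.context` with identifiers such as `flow_id`, `flow_run_id` and `task_run_id`, so inside a task you can call `prefect.context.get("flow_run_id")`. A tiny helper sketch, written against a plain mapping so the lookup itself is visible (in real use you would pass `prefect.context`):

```python
def run_identifiers(context):
    """Collect traceability columns from a Prefect-style context mapping.

    Inside a running task, pass `prefect.context`, which carries flow_id,
    flow_run_id and task_run_id among other keys."""
    keys = ("flow_id", "flow_run_id", "task_run_id")
    return {key: context.get(key) for key in keys}
```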
    2 replies
• Thomas Reynaud

    1 year ago
    Hi! First, I'd like to thank you for building such a great product!! We are currently experimenting with Prefect, and we were wondering about the best practice for running a Docker agent (or any agent, actually) on a remote server. Should you just start it in a screen session, detach, and leave it as a background process? Do you have any recommendations? It seems very dirty when running multiple agents on the same machine.
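    A common pattern is to run each agent under a process supervisor (systemd, supervisord) rather than a screen session; one unit per agent keeps multiple agents on the same machine tidy and restartable. A minimal systemd unit sketch (paths, user and label are placeholders; the CLI spelling is `prefect agent docker start` on recent 1.x releases, `prefect agent start docker` on older ones):

```ini
# /etc/systemd/system/prefect-docker-agent.service
[Unit]
Description=Prefect Docker agent
After=network.target docker.service

[Service]
User=prefect
ExecStart=/usr/local/bin/prefect agent docker start --label my-label
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
```

    Then `systemctl enable --now prefect-docker-agent`, with a separate unit file per agent.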
    2 replies