prefect-community
  • r

    Robert Bastian

    02/24/2021, 11:11 PM
    Hi! I’m wondering if anybody with Databricks Task experience can help me with this. I would like to capture the results of a Spark job to inform next steps in my flow. I can successfully submit Databricks jobs, but I didn’t see any way to get the results. Looking further, I see a DatabricksHook that has a get_run_page_url() method that would get me close to what I need, but any attempt to use the hook gives me a “NoneType is not iterable” exception. Here is my code:
    @task
    def get_url(run_id, hook):
        logger = prefect.context.get("logger")
        logger.info("RUN ID: %s", run_id)
        url = hook.get_run_page_url(run_id=run_id)
        return url
    
    SubmitRun = DatabricksSubmitRun()
    
    with Flow("test_databricks", storage=STORAGE, run_config=RUN_CONFIG) as flow:
        conn = PrefectSecret('DATABRICKS_CONNECTION_STRING')
        json = get_job_config()
        run_id = SubmitRun(json=json, databricks_conn_secret=conn)
        hook = SubmitRun.get_hook()
        url = get_url(run_id, hook)
    Here is the exception:
    ERROR:prefect.TaskRunner:Unexpected error: TypeError("argument of type 'NoneType' is not iterable")
    Traceback (most recent call last):
      File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/Users/rbastian/enverus/RAI/prefect/flows/test-databricks.py", line 32, in get_url
        url = hook.get_run_page_url(run_id=run_id)
      File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/tasks/databricks/databricks_hook.py", line 248, in get_run_page_url
        response = self._do_api_call(GET_RUN_ENDPOINT, json)
      File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/tasks/databricks/databricks_hook.py", line 148, in _do_api_call
        if "token" in self.databricks_conn:
    Thanks in advance!
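    A likely cause: SubmitRun.get_hook() runs at flow-definition time, before the connection secret is resolved, so the hook's databricks_conn is still None when _do_api_call checks it. A hedged sketch of constructing the hook inside the task instead, assuming DatabricksHook takes the connection mapping as a databricks_conn argument (verify the signature in your Prefect version):
    import prefect
    from prefect import task
    from prefect.tasks.databricks.databricks_hook import DatabricksHook
    
    @task
    def get_url(run_id, conn):
        logger = prefect.context.get("logger")
        logger.info("RUN ID: %s", run_id)
        # Build the hook at run time, once the secret value actually exists.
        hook = DatabricksHook(databricks_conn=conn)  # assumed kwarg; check your version
        return hook.get_run_page_url(run_id=run_id)
    In the flow, pass the PrefectSecret result directly: url = get_url(run_id, conn).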

    Dhiraj Golhar

    02/25/2021, 6:05 AM
    Hi prefect community! Is there any way to lock a flow while it's running? My use case: when a specific flow is in a Running state, that same flow should not run again in parallel. When someone triggers the flow again, it should start running only after the first run completes. Is there any simple way to achieve this?
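    One common workaround (a hedged sketch, not a built-in feature): make the first task ask the backend whether another run of the same flow is already Running and fail fast; polling in a loop instead of raising would queue the run rather than reject it. Field names follow the Prefect Server GraphQL schema as commonly used; verify against your version:
    import prefect
    from prefect import task
    from prefect.client import Client
    from prefect.engine.signals import FAIL
    
    @task
    def ensure_not_already_running():
        # Ask the backend for Running runs of this flow.
        query = """
            query($flow_id: uuid) {
                flow_run(where: {flow_id: {_eq: $flow_id}, state: {_eq: "Running"}}) { id }
            }
        """
        result = Client().graphql(query, variables={"flow_id": prefect.context.flow_id})
        runs = result["data"]["flow_run"]
        # This run is itself Running, so more than one means an overlap.
        if len(runs) > 1:
            raise FAIL("Another run of this flow is already in progress.")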

    Joël Luijmes

    02/25/2021, 8:33 AM
    We are using Kubernetes a lot in our flows (creating services, pods, and jobs), and sometimes this yields the following error. It usually happens when we issue a burst of Kubernetes operations.
    {
      "kind": "Status",
      "apiVersion": "v1",
      "metadata": {},
      "status": "Failure",
      "message": "Operation cannot be fulfilled on resourcequotas \"gke-resource-quotas\": the object has been modified; please apply your changes to the latest version and try again",
      "reason": "Conflict",
      "details": {
        "name": "gke-resource-quotas",
        "kind": "resourcequotas"
      },
      "code": 409
    }
    Googling didn’t yield much, but it seems like an internal Kubernetes issue. To fix this, I think there are three ways: 1. Use the Prefect retry mechanism. 2. Modify the Prefect tasks to retry on this error (willing to contribute myself). 3. Modify my own code to retry. My question: what would be the best approach here? With 1) it still might fail, because on retry the same burst exists when creating Kubernetes objects (or can I add a random delay?); also, in a resource manager I may create multiple resources -> retry does not exist there (AFAIK), and if it does, how would I track which resources already exist? With 2) I don’t know if this is the right methodology; I can imagine that retrying in the task library is an anti-pattern. With 3) there are no downsides, except that I have to implement this everywhere.
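    For option 1, a hedged sketch of Prefect's task-level retries plus manual jitter, so retries from a burst don't collide again:
    import random
    import time
    from datetime import timedelta
    from prefect import task
    
    # Prefect-level retries space the attempts out; the random sleep adds
    # jitter so a burst of parallel creations doesn't retry in lockstep.
    @task(max_retries=5, retry_delay=timedelta(seconds=10))
    def create_k8s_object(api_call, *args, **kwargs):
        time.sleep(random.uniform(0, 5))
        return api_call(*args, **kwargs)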

    Michael Hadorn

    02/25/2021, 9:40 AM
    Is there a way to prioritize a set of parallel tasks with the LocalDaskExecutor (basically longest-running first)? The order of creation doesn't seem to have an effect.

    Carl

    02/25/2021, 10:15 AM
    Thanks for all the help so far. Got another question! 😄 Hopefully I can explain this okay … Using the feature_engineering.py example as a template. In the flow, there are a number of functions (not tasks) and a class is instantiated too. The last call is a task, but it’s not being attached after the class instantiation. I’m expecting the following three to run in order. Not sure that upstream_task will fix this, because the load(clean) task needs to run after the DataFrame() bit.
    clean = impute.map(data_cols, replacement_dict=unmapped({np.nan: 0})) # task
    clean = DataFrame(clean, column_names) # function, not a task
    load(clean) # task
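    Since DataFrame(clean, column_names) executes at flow-build time rather than as a task, nothing in the graph orders load() after it. A hedged sketch of wrapping the conversion in a task (to_dataframe is a made-up name, and the pandas call is an assumption about what DataFrame() does) so the ordering falls out naturally:
    import pandas as pd
    from prefect import task
    
    @task
    def to_dataframe(clean, column_names):
        # As a task this becomes a node in the graph, so load() waits on it.
        return pd.DataFrame(clean, columns=column_names)
    
    # clean = impute.map(data_cols, replacement_dict=unmapped({np.nan: 0}))
    # df = to_dataframe(clean, column_names)
    # load(df)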

    Adam

    02/25/2021, 12:07 PM
    Hi friends, we’re currently switching over from Docker storage to GCS storage. I noticed that every time we call register with an idempotency_key set, the flow is uploaded to storage despite not having changed. Is this intended? Command used in the comments.

    Nikhil Akki

    02/25/2021, 1:46 PM
    Hello, we are evaluating Prefect Core to replace our workflow management. As part of our security protocol, all container images are scanned, which includes all five Docker containers of Prefect Core (server, Apollo, UI, Hasura/GraphQL & Postgres:11). All of these containers seem to have vulnerabilities, most of them medium to low. However, the critical & high ones need to be resolved before we can use them for prod; based on our observation these packages are system libs (like the Linux kernel, shadow, glibc etc., which Prefect doesn’t directly use). Is it possible to update the base image to, say, Ubuntu 20.04 (in our tests between Debian and Ubuntu, the latter seems to have just about 16 findings [1 medium, rest all low])? Any other recommended approach we could follow to overcome this issue is greatly appreciated. 🙂 PS - scans were made using GCP’s vulnerability scanning service in GCR

    Andor Tóth

    02/25/2021, 2:41 PM
    Any tips on how to pass datetime to context?

    Andor Tóth

    02/25/2021, 2:43 PM
    the task decorator has a task_run_name, which results in an error if date becomes a string. Like this:
    @task(task_run_name='{name}-{date:%FT%T}')
    def say_hello(name):
        logger = prefect.context.get("logger")
        logger.info("Hello, %s!", name)
        logger.info('Flow run name: %s', prefect.context.flow_run_name)
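    For reference, a hedged sketch of supplying date as a real datetime via prefect.context for a local run, so the {date:%FT%T} spec has something it can format (whether user-supplied context keys take effect here depends on your version):
    import pendulum
    import prefect
    from prefect import task, Flow
    
    @task(task_run_name='{name}-{date:%FT%T}')
    def say_hello(name):
        prefect.context.get("logger").info("Hello, %s!", name)
    
    with Flow("hello") as flow:
        say_hello("world")
    
    # Supply date as a real datetime, not a string, so %FT%T can format it:
    with prefect.context(date=pendulum.now("UTC")):
        flow.run()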

    jorwoods

    02/25/2021, 3:37 PM
    Hi everyone. I have a question about flows running flows, or event driven flows. I have a flow that will run and poll for some external state in a database. Once that conditional is met, I want to run a set of other flows. I looked at https://docs.prefect.io/core/idioms/flow-to-flow.html but it seems that my two options are either: 1. Add an entry in the upstream flow to call the downstream flows 2. Add a waiter in the downstream flows that could tie up a flow slot or an agent. Are there any other options for how I can tell a flow what its upstream dependencies are without creating a blocking waiter?
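    A third option that avoids a blocking waiter: a small, frequently scheduled "sensor" flow that checks the condition and fires the downstream flows with StartFlowRun(wait=False). A hedged sketch; condition_met's body is a placeholder:
    from prefect import task, Flow, case
    from prefect.tasks.prefect import StartFlowRun
    
    @task
    def condition_met():
        return True  # placeholder: poll the external database here
    
    # wait=False only creates the downstream run, so no slot stays occupied.
    start_downstream = StartFlowRun(flow_name="downstream-flow",
                                    project_name="my-project", wait=False)
    
    with Flow("sensor") as flow:
        cond = condition_met()
        with case(cond, True):
            start_downstream()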

    Pedro Machado

    02/25/2021, 4:56 PM
    Hi there. A couple of questions about "git repo" storage: 1. Has anyone looked at Azure Repos? Any idea of the level of effort to implement a new storage class for Azure Repos? 2. I don't believe the storage classes are designed to work with multiple files (a file that has the flow and other "utility" modules). Am I correct? Is it possible to use the existing classes to implement this pattern or do we have to necessarily create a python package? I have a client who is considering Kubernetes orchestration but doesn't like the idea of baking the flow code in the docker image. They'd like for the flow code (and additional utility modules) to be pulled from a repo every time a flow needs to run.

    Andor Tóth

    02/25/2021, 5:12 PM
    How can I access LocalRun or LocalDaskRun run_config env variables during task execution? I have read through the relevant pages on docs.prefect.io, searched the examples, even browsed the source, but still no clue.
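    For what it's worth, env values on a run config become ordinary process environment variables in the flow run, so os.environ is the natural way to read them inside a task (and run_config only applies to agent-launched runs, not plain flow.run()). A minimal sketch:
    import os
    from prefect import task, Flow
    from prefect.run_configs import LocalRun
    
    @task
    def read_setting():
        # Values from LocalRun(env=...) arrive as ordinary environment variables.
        return os.environ.get("MY_SETTING", "unset")
    
    with Flow("env-demo") as flow:
        read_setting()
    
    flow.run_config = LocalRun(env={"MY_SETTING": "hello"})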

    Andor Tóth

    02/25/2021, 5:30 PM
    maybe I have misunderstood something and LocalRun is not meant to be used to pass environment variables to tasks

    Andor Tóth

    02/25/2021, 5:33 PM
    though this suggests otherwise

    Peyton Runyan

    02/25/2021, 5:47 PM
    I'm getting an error with the following flow:
    ValueError: Could not infer an active Flow context.
    Details in thread
    with Flow("Batch update airtable flow") as f:
    
        proj, stry = get_selected_bases()
        cmdf = get_communities_2017()
    
        scrubbed_proj, scrubbed_stry = scrub_dataframes(proj, stry, cmdf)
    
        debug_task(scrubbed_proj, scrubbed_stry)

    Kieran

    02/25/2021, 6:13 PM
    I am getting a health check error when registering a flow with Docker storage:
    Beginning health checks...
    System Version check: OK
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 151, in <module>
        flows = cloudpickle_deserialization_check(flow_file_paths)
      File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
        flows.append(cloudpickle.loads(flow_bytes))
    ModuleNotFoundError: No module named 'flows.tasks'
    Registering the same flow locally passes my serializability check:
    from prefect.utilities.debug import is_serializable
    
    if is_serializable(flow): ...
    My PYTHONPATH is the root of the directory and when SSH'ing into the container I am able to import the "missing" module... Any pointers?

    Matheus Calvelli

    02/25/2021, 7:22 PM
    Hi community, quick question: I have a parametrized flow (with Prefect's Parameters) and I would like to schedule the same flow to run, let's say, at 14:00 each day. But the same flow should run multiple times with different parameters. As far as I can tell, there is no way to do this through the UI, only on the flow itself, unless I re-register the flow under different names. Is this correct? I was unable to find any guidelines regarding this in the docs. Thanks in advance.
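    For the record, Prefect 1.x schedules let each clock carry its own parameter_defaults, so one registered flow can produce several runs with different parameters; a hedged sketch (verify in your version that same-instant clocks with different defaults all fire):
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock
    
    # Two 14:00 daily clocks, each carrying its own parameter set.
    flow.schedule = Schedule(clocks=[
        CronClock("0 14 * * *", parameter_defaults={"region": "us"}),
        CronClock("0 14 * * *", parameter_defaults={"region": "eu"}),
    ])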

    Dave

    02/25/2021, 7:31 PM
    Hi guys, I have been wondering about k8s and job executions, since it's widely known that Kubernetes cron jobs can create zero or two executions of a schedule. My concern is of course that if something out of the ordinary happens in our cluster, our job either doesn't run or runs twice instead of the single run we would like to have. What guarantees are there for a flow when using the k8s agent that a job runs only once? Another concern: if we use CreateNamespacedJob from the task library, wouldn't that have the same problems as k8s cron jobs?

    Peyton Runyan

    02/25/2021, 7:34 PM
    I imagine this is a fairly simple thing to figure out, but I'm not seeing it in the docs. If I have a task that takes 2 parameters, but also needs to wait until the completion of a task that doesn't return anything, how do I signal that dependency?
    with Flow("example") as f:
        x, y = func_1()
        no_return_func()
        res = final_func(x, y)
    Using the example above, how do I signal for final_func to wait on no_return_func? Do I just add a third argument to the function and have no_return_func return True? Or is there a cleaner way?
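    For reference, the functional API accepts an upstream_tasks keyword on any task call, which creates a state-only dependency without passing any data; a minimal sketch:
    with Flow("example") as f:
        x, y = func_1()
        barrier = no_return_func()
        # final_func receives no data from barrier; it just waits for it.
        res = final_func(x, y, upstream_tasks=[barrier])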

    Alex Welch

    02/25/2021, 10:03 PM
    hello, I was wondering if prefect had a way to interact with an AWS SNS queue. We are trying to detect the presence of a file to kick off a job. One solution is to have the flow check every 30 min or so and then delete the processed file, but I was wondering if there was a more elegant solution out there

    Sean Talia

    02/25/2021, 10:26 PM
    has anyone worked with using a generic web cloud hook before? I'd like to configure the JSON payload that gets sent to the provided URL but it doesn't seem like there's a way to do that just by looking at the cloud interface?

    Maria

    02/25/2021, 11:59 PM
    Hi! I am wondering if anyone is able to use ShellTask on Windows? I'm running a script from the tutorial and getting an error:
    from prefect import Flow
    from prefect.tasks.shell import ShellTask
    
    task = ShellTask(helper_script="cd ~")
    
    with Flow("My Flow") as f:
        contents = task(command='ls')
            
    out = f.run()
    [2021-02-26 10:32:28+1100] INFO - prefect.FlowRunner | Beginning Flow run for 'My Flow'
    [2021-02-26 10:32:28+1100] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
    [2021-02-26 10:32:28+1100] ERROR - prefect.TaskRunner | Unexpected error: FileNotFoundError(2, 'The system cannot find the file specified', None, 2, None)
    Traceback (most recent call last):
    .....
    ....
    python\python38-32\lib\subprocess.py", line 1307, in _execute_child
        hp, ht, pid, tid = _winapi.CreateProcess(executable, args,
    FileNotFoundError: [WinError 2] The system cannot find the file specified
    I do have bash installed; specifying the path doesn't help either:
    task = ShellTask(shell="C:\Windows\System32\bash.exe", helper_script="cd ~")

    matta

    02/26/2021, 1:24 AM
    Trying to run a Flow of Flows but I'm getting this:
    Unexpected error: ClientError('400 Client Error: Bad Request for url: <http://prefect-apollo:4200/graphql/graphql>\n\nThe following error messages were provided by the GraphQL server:\n\n GRAPHQL_VALIDATION_FAILED: Cannot query field "user" on type "Query".\n\nThe GraphQL query was:\n\n query {\n user {\n default_membership {\n tenant {\n slug\n }\n }\n }\n }\n\nThe passed variables were:\n\n null\n',)

    Josh

    02/26/2021, 2:18 AM
    Does anyone know how to use case with mypy? When I try to add a case to the flow, mypy throws Module not callable
    with Flow("flow") as flow:
        with case(task_result, True):
            other_task()

    Alex Welch

    02/26/2021, 4:30 AM
    I swear I have seen this somewhere in the docs, but for the life of me I can’t find it now. I am looking to use prefect to spin up a docker container, clone a github repo, and then run the flow defined there. I see the ability to pull the container from our registry but I cannot find how to pull the github repo

    Kamil Okáč

    02/26/2021, 12:34 PM
    What's the correct way to store & pass environment information (like API endpoint URLs or database IPs) to flows when using Prefect Cloud? Approaches I considered (but none of them feels just right): • have multiple projects (one per environment), using parameters with different defaults for each environment • using agent environment variables • using secrets • storing it in the Docker image used for flow runs

    Laura Vaida

    02/26/2021, 4:36 PM
    hi folks, how do I set the Docker storage configuration right? I have this example:
    Docker(
        env_vars={
            # append modules directory to PYTHONPATH
            "PYTHONPATH": "C://Users/laura.vaida.000/anaconda3/envs/prefect/Lib/site-packages"
        },
        stored_as_script=True
    )

    Brian Mesick

    02/26/2021, 5:29 PM
    Hi all, here’s a weird case we’re running into today. A flow running on K8s that hasn’t changed in a few weeks has started failing consistently with:
    Container 'flow' state: terminated
    		Exit Code:: 2
    		Reason: Error
    The job shows up submitted, then ~30 seconds later we get 4 of those errors. Other nearly identical flows are working. The pods don’t hang around long enough to get logs (if they even start). Has anyone else run into this?

    Alex Welch

    02/26/2021, 7:30 PM
    When using GitHub storage I have two issues. The first is that it exposes the secret in the KeyError. The second is that it shows me the correct key and yet states that it was not found. I have the secrets defined in the Prefect UI.
    Failed to load and execute Flow's environment: KeyError('The secret <secret> was not found.  Please ensure that it was set correctly in your tenant: https://docs.prefect.io/orchestration/concepts/secrets.html')
    I followed the docs to establish the secret and am using the same variable name (GH_TOKEN) in both the UI and the code.
    GH_TOKEN = Secret("GH_TOKEN").get()
    
    STORAGE = GitHub(
        repo="<my_repo_name>",
        path=f"flows/my_flow.py",
        access_token_secret=GH_TOKEN
    )
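    Worth noting: access_token_secret takes the name of a Prefect secret, which is resolved at run time; calling Secret("GH_TOKEN").get() at registration time bakes the value (or the KeyError) into the stored flow. A hedged sketch of the intended usage:
    from prefect.storage import GitHub
    
    STORAGE = GitHub(
        repo="<my_repo_name>",           # placeholder kept from the question
        path="flows/my_flow.py",
        access_token_secret="GH_TOKEN",  # the secret *name*; resolved at run time
    )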

    S K

    02/26/2021, 8:02 PM
    Need help with this (Prefect Server as the backend). On exception I am using "raise SystemExit(0)", but the flow does not quit immediately; it takes 15 minutes or so for the flow to fail. What is the issue?
    @task()
    def mainprocess(a):
        global df_single_record
        try:
            for indx in df_data_extracted.index + 1:
                df_single_record = df_data_extracted.iloc[indx - 1:indx]
                converttojson(df_single_record)
                postjsontoqapi(df_json_data)
        except Exception as e:
            logger = get_logger()
            logger.error(str(get_pst_time())
                         + '==========ERROR IN mainprocess() WHILE CONVERTING TO JSON/POSTING TO Q-API: '
                          + str(e))
            raise SystemExit(0)

Michael Adkins

02/26/2021, 8:12 PM
Hi @S K this is likely not captured by our standard exception handling because SystemExit is a reserved exception type (https://docs.python.org/3/library/exceptions.html#SystemExit).
I'd recommend using https://docs.prefect.io/api/latest/engine/signals.html#fail instead

S K

02/26/2021, 8:28 PM
@Michael Adkins Thanks for the info. Can you please show me a piece of code for how to fail the flow using prefect.engine.signals.FAIL?

Michael Adkins

02/26/2021, 8:29 PM
Ah sorry, you just import it and raise it like an exception (i.e. raise FAIL()): https://docs.prefect.io/core/concepts/execution.html#state-signals
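Applied to the task from the original question, a minimal sketch:
from prefect import task
from prefect.engine.signals import FAIL

@task()
def mainprocess(a):
    try:
        for indx in df_data_extracted.index + 1:
            df_single_record = df_data_extracted.iloc[indx - 1:indx]
            converttojson(df_single_record)
            postjsontoqapi(df_json_data)
    except Exception as e:
        # Raising FAIL marks this task run Failed right away; downstream
        # tasks then fail as well under the default trigger.
        raise FAIL(f"Error in mainprocess(): {e}") from e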

S K

02/26/2021, 9:57 PM
@Michael Adkins After following the above, I am able to fail the task successfully, but the subsequent tasks still execute, which I don't want. I want to fail the complete flow if any task fails; how do I achieve this?

Michael Adkins

02/26/2021, 9:58 PM
I think you're looking for https://docs.prefect.io/core/concepts/flows.html#key-tasks
By default, if a task fails its downstream tasks should also fail. Did you set a custom trigger (https://docs.prefect.io/core/getting_started/next-steps.html#triggers) / are your tasks running in parallel?

S K

02/26/2021, 10:01 PM
No, I did not set any custom triggers; the tasks are not running in parallel, they run sequentially
with Flow('flow_name', storage=Local()) as flow:
    check_if_flow_is_running()
    getdata = readssmandextractdata()
    vmainprocess = mainprocess(getdata)
    updatecontroltable(vmainprocess)

flow.run()
@Michael Adkins This is how I am executing the tasks
checkflow = check_if_flow_is_running()
getdata = readssmandextractdata(checkflow)
mainlogic = mainprocess(getdata)
updatecontroltable(mainlogic)
@Michael Adkins thanks much, I'm able to stop the flow by doing the above...

Michael Adkins

02/26/2021, 10:32 PM
Great!