prefect-community
  • a

    asm

    07/27/2021, 2:46 PM
    anything going on with cloud.prefect.io? the interface seems to be hanging for me quite a lot when viewing individual flows
    n
    • 2
    • 10
  • a

    asm

    07/27/2021, 2:46 PM
    not sure if this is isolated to my account or if it is site-wide
  • m

    Madison Schott

    07/27/2021, 3:08 PM
    Hi team, I noticed this error when trying to deploy my flow to ECS. How would I go about fixing this? I already installed Docker on my cluster
    [27 July 2021 11:01am]: Unsupported Storage type: Docker
    k
    • 2
    • 52
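    If the goal is to run a Docker-stored flow on ECS, the usual 0.15.x pairing is Docker storage plus an ECSRun run config picked up by an ECS agent; a rough sketch (the registry URL, label, and role ARN are placeholders):

        from prefect import Flow
        from prefect.run_configs import ECSRun
        from prefect.storage import Docker

        with Flow(
            "ecs-example",
            storage=Docker(
                registry_url="<account>.dkr.ecr.<region>.amazonaws.com",
                image_name="my-flow",
            ),
            run_config=ECSRun(labels=["ecs"], task_role_arn="<task-role-arn>"),
        ) as flow:
            ...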
  • p

    Pedro Machado

    07/27/2021, 4:12 PM
    Hi again. How can I troubleshoot a schedule? I set up a daily clock/schedule via the Prefect Cloud UI but I don't see any scheduled flows and nothing ran as expected. The schedule is enabled.
    m
    k
    m
    • 4
    • 6
  • m

    Mehdi Nazari

    07/27/2021, 5:18 PM
    Hello Team, has anyone had a chance to test/investigate packaging a Flow with multiple .py files (or in a hierarchical package structure) to be executed with Docker Storage? I’m trying to avoid adding extra parameters to the Docker Storage when writing a flow; I’d rather have my Flow packaged up as a Python package and copied into the Docker image, which will later be used by Prefect to execute the Flow.
    with Flow(
        "Example-Flow",
        storage=Docker(
            image_name="example_flow_import",
            image_tag="dev",
            dockerfile="<path/to/dockerfile>",
            python_dependencies=["numpy", "pandas"],
            files={},  # Trying to avoid this
            env_vars={},  # Trying to avoid this
        ),
    ) as flow:
    k
    • 2
    • 9
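    One way to avoid files= and env_vars= entirely, assuming the flow code can be pip-installed as a package, is to install it inside the custom Dockerfile that the storage already points at; a sketch (paths and the base-image tag are placeholders):

        # <path/to/dockerfile>: build on the Prefect base image, then install the package
        FROM prefecthq/prefect:0.15.3-python3.8
        COPY my_flow_package/ /opt/my_flow_package/
        RUN pip install /opt/my_flow_package numpy pandas

    Docker storage then only needs image_name, image_tag, and dockerfile, and the flow can import from the installed package at runtime.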
  • t

    Trevor Kramer

    07/27/2021, 5:41 PM
    Hi everyone. I am getting this error when calling create_flow_run from one flow to trigger another. This used to work. Is this still supported?
    Unexpected error: TypeError('Object of type SUCCESS is not JSON serializable')
    Traceback (most recent call last):
      File "/root/.local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/root/.local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 911, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/root/.local/lib/python3.8/site-packages/prefect/engine/results/prefect_result.py", line 62, in write
        new.location = self.serializer.serialize(new.value).decode("utf-8")
      File "/root/.local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 110, in serialize
        return json.dumps(value).encode()
      File "/usr/local/lib/python3.8/json/__init__.py", line 231, in dumps
        return _default_encoder.encode(obj)
      File "/usr/local/lib/python3.8/json/encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "/usr/local/lib/python3.8/json/encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type SUCCESS is not JSON serializable
    k
    • 2
    • 9
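    The traceback suggests a task is returning a Prefect state/signal object, which PrefectResult's JSON serializer cannot encode. For reference, the documented flow-of-flows idiom looks roughly like this (flow and project names are placeholders):

        from prefect import Flow
        from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

        with Flow("parent") as flow:
            child_run_id = create_flow_run(flow_name="child", project_name="my-project")
            wait_for_flow_run(child_run_id)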
  • j

    Jelle Vegter

    07/27/2021, 7:14 PM
    Is it possible to loop through a dynamically created list and do a series of tasks on each item in the list? Like: for item in list: task1, task2, task3?
    k
    • 2
    • 2
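    Prefect's task mapping covers this pattern; a minimal sketch (task and flow names are placeholders):

        from prefect import Flow, task

        @task
        def get_items():
            return ["a", "b", "c"]

        @task
        def task1(item):
            return item.upper()

        @task
        def task2(item):
            return item + "!"

        with Flow("loop-example") as flow:
            items = get_items()        # dynamically created list
            step1 = task1.map(items)   # task1 runs once per item
            step2 = task2.map(step1)   # task2 runs once per task1 result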
  • c

    chicago-joe

    07/27/2021, 8:05 PM
    Hey All, really quick question. How can I set a CronClock schedule with a timezone? The docs weren't clear to me on this.
    👍 1
    k
    • 2
    • 2
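    In 0.15.x the clock takes its timezone from a timezone-aware start_date; a minimal sketch (the cron string and tz are placeholders):

        import pendulum
        from prefect.schedules import Schedule
        from prefect.schedules.clocks import CronClock

        schedule = Schedule(
            clocks=[
                CronClock(
                    "0 9 * * *",
                    start_date=pendulum.datetime(2021, 7, 1, tz="America/Chicago"),
                )
            ]
        )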
  • r

    Ross Timm

    07/27/2021, 8:11 PM
    Hello Everyone, does anybody know how to run a flow every 15 minutes, except when it is already running?
    k
    • 2
    • 21
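    A hedged sketch of one approach: make the first task ask the backend for other active runs and raise SKIP, which propagates downstream by default (the GraphQL shape is an assumption about the Cloud/Server schema; Cloud users can also look at flow-run concurrency limits instead):

        import prefect
        from prefect import task
        from prefect.client import Client
        from prefect.engine.signals import SKIP

        @task
        def skip_if_already_running():
            # Query the backend for other runs of this flow still in a Running state
            client = Client()
            result = client.graphql(
                """
                query($flow_id: uuid) {
                  flow_run(where: {flow_id: {_eq: $flow_id}, state: {_eq: "Running"}}) {
                    id
                  }
                }
                """,
                variables={"flow_id": prefect.context.flow_id},
            )
            others = [
                r["id"]
                for r in result["data"]["flow_run"]
                if r["id"] != prefect.context.flow_run_id
            ]
            if others:
                raise SKIP("Another run of this flow is already in progress")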
  • m

    Michael S

    07/27/2021, 8:35 PM
    Hello, how can I set my flow to run on the 2nd Tuesday of each month at 6am? The cron expression "0 6 8-14 * TUE" isn't working for me: it runs every day between the 8th and 14th of the month and on Tuesdays. I'm trying to update this in the UI under Settings->Schedules
    m
    n
    • 3
    • 6
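    The catch is that cron ORs the day-of-month and day-of-week fields, so "0 6 8-14 * TUE" fires on both. One workaround in 0.15.x is to keep only the day-of-month range in the cron string and restrict to Tuesdays with a schedule filter (a plain callable; weekday() == 1 is Tuesday):

        from prefect.schedules import Schedule
        from prefect.schedules.clocks import CronClock

        # Candidate times: 6am on the 8th-14th; the filter keeps only Tuesdays,
        # which is exactly the second Tuesday of the month
        schedule = Schedule(
            clocks=[CronClock("0 6 8-14 * *")],
            filters=[lambda dt: dt.weekday() == 1],
        )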
  • b

    Brad I

    07/27/2021, 8:37 PM
    Is there a way to run a dependent flow locally for testing without needing to run a Prefect server? Using the built-in task (https://docs.prefect.io/core/idioms/flow-to-flow.html) makes calls to the API, which we normally don't run in order to keep development lightweight. We add a main to our flows that looks something like:
    def main():
        state = flow.run(executor=LocalExecutor())
        parameters = {...}
    k
    • 2
    • 2
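    For pure-local testing, one option (assuming the child flow object is importable; the module names here are hypothetical) is to bypass the API-based task and call the child flow's run() directly:

        from prefect.executors import LocalExecutor

        from child_flow_module import flow as child_flow    # hypothetical import
        from parent_flow_module import flow as parent_flow  # hypothetical import

        def main():
            # Run the dependency first, then the parent, with no API involved
            child_state = child_flow.run(executor=LocalExecutor())
            if child_state.is_successful():
                parent_flow.run(executor=LocalExecutor())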
  • a

    Andre Muraro

    07/27/2021, 8:39 PM
    Is it possible to force a flow to finish in a Skipped state? I've tried using the terminal_state_handler:
    def terminal_state_handler(flow, state, reference_task_states):
        for task_state in reference_task_states:
            if task_state.is_skipped():
                return Skipped(state.message, state.result, state.context, state.cached_inputs)
        return state
    which kind of works; however, the UI still counts the duration as if the flow were still running...
    k
    j
    • 3
    • 8
  • a

    An Hoang

    07/27/2021, 10:03 PM
    Hi there, I would love to have a debugging tutorial for Prefect flows, or if one already exists, please point me to it 🙂 . I struggled through debugging a fairly simple flow and could see beginners being confused by the cryptic errors and giving up on trying out the awesome Prefect tool. Some things that could be added to https://docs.prefect.io/core/advanced_tutorials/local-debugging.html: 1. If the error contains something about `Not found UUID`, your local flow signature is different from the back-end registered flow signature:
    21:15:07 | ERROR   | Failed to retrieve task state with error: ClientError([{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}])
    Traceback (most recent call last):
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
        task_run_info = self.client.get_task_run_info(
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 1721, in get_task_run_info
        result = self.graphql(mutation)  # type: Any
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 564, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}]
    2. `Error: Task function was not provided required {parameter}`: some upstream task was skipped and thus its result was not passed down. 3. Setting up a non-data dependency: the difference between
       a. `task_A.map(param1=param1); task_A.set_upstream(task_B)`
       b. `task_A.map(param1=param1, upstream_tasks=[task_B])`
       c. `task_A_result = task_A.map(param1=param1); task_A_result.set_upstream(task_B)`
    The first is wrong: it creates another, unmapped instance of `task_A` and `task_B` in the flow. The second expects `task_B` to return an iterable and will error if it doesn't. The third is correct if `task_B` doesn't return an iterable. 4. In general, one of the best ways to debug tasks' dependencies in a flow (once you have unit-tested each task independently) is to run it locally with `flow_result = flow.run()` and then visualize with `flow.visualize(flow_state=flow_result)`.
    k
    • 2
    • 4
  • j

    Jimmy Le

    07/27/2021, 10:44 PM
    New slack webhook with the error message 🔥
    🔥 3
  • n

    Nick Coy

    07/28/2021, 1:25 AM
    Hello everyone, I have been running into an issue where a task downstream of a mapped task starts once the mapped task enters a Mapped state but does not actually run. Here is the flow:
    with Flow('mapped_flow') as flow:
        aws_list = aws_list_files(AWS_CREDENTIALS)
        file = aws_download.map(
            key=aws_list,
            credentials=unmapped(AWS_CREDENTIALS),
            bucket=unmapped('some_bucket'),
            upstream_tasks=[unmapped(aws_list)])
        format_df = format_data.map(file, aws_list, upstream_tasks=[unmapped(file)])
        move_to_cloud = move_to_cloud_storage.map(format_df, aws_list, unmapped(GCP_CREDENTIALS), upstream_tasks=[unmapped(format_df)])
        load_job = load_files_gcs_bq(GCP_CREDENTIALS, upstream_tasks=[move_to_cloud])
    The problem is that load_files_gcs_bq starts once all the mapped tasks enter a Mapped state, but before any of them have actually run. I have been reading the mapping docs but I feel like I am missing something. Any help would be greatly appreciated.
    k
    • 2
    • 13
  • m

    matta

    07/28/2021, 3:11 AM
    Hrm, `BigQueryLoadGoogleCloudStorage` seems to not be inferring my GCP credentials? Not sure how to pass it, either (I can grab it as a `PrefectSecret` fine, and can pass it directly to GCP functions)
    k
    • 2
    • 7
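    The GCP tasks accept a credentials dict at runtime, so one option is to feed the secret straight into the task call; a sketch (the secret name, dataset, table, and URI are placeholders):

        from prefect import Flow
        from prefect.tasks.gcp import BigQueryLoadGoogleCloudStorage
        from prefect.tasks.secrets import PrefectSecret

        load = BigQueryLoadGoogleCloudStorage(dataset_id="my_dataset", table="my_table")

        with Flow("bq-load") as flow:
            creds = PrefectSecret("GCP_CREDENTIALS")  # service-account info as a dict
            load(uri="gs://my-bucket/data.csv", credentials=creds)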
  • z

    Zhilong Li

    07/28/2021, 5:06 AM
    Hi all, I am trying to do a custom deployment on Kubernetes as a Prefect task. I see that Prefect comes with a `CreateNamespacedDeployment` task for standard k8s deployments, but I want to deploy a Seldon deployment whose CRD starts with
    apiVersion: machinelearning.seldon.io/v1
    kind: SeldonDeployment
    Does anyone have an idea how I can do this deployment as a Prefect task? Thanks so much!
    m
    • 2
    • 2
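    There is no built-in task for arbitrary CRDs, but a plain task wrapping the kubernetes client's CustomObjectsApi should work; a rough sketch (group/version/plural follow the Seldon CRD, and the manifest dict is a placeholder):

        from kubernetes import client, config
        from prefect import task

        @task
        def create_seldon_deployment(body: dict, namespace: str = "default"):
            # Inside a cluster; use config.load_kube_config() when running locally
            config.load_incluster_config()
            api = client.CustomObjectsApi()
            return api.create_namespaced_custom_object(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=namespace,
                plural="seldondeployments",
                body=body,
            )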
  • j

    jake lee

    07/28/2021, 9:34 AM
    Hi, I tried to run Prefect Server on my EC2 instance. I modified the security group to accept inbound traffic on port 8080, but I could not access the Prefect UI from my laptop. Do you have any idea where I need to look? FYI, the GraphQL URL on port 4200 is accessible, just not the UI on 8080.
    k
    • 2
    • 10
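    One common gotcha with a remote server (an assumption about this setup): the UI running in the browser talks to GraphQL directly, so Apollo's endpoint must point at the EC2 host rather than localhost, and port 4200 must be reachable from the laptop too. Roughly, in ~/.prefect/config.toml on the instance:

        # ~/.prefect/config.toml (host is a placeholder)
        [server.ui]
        apollo_url = "http://<ec2-public-dns>:4200/graphql"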
  • m

    montardon

    07/28/2021, 12:15 PM
    Hi, I'd like to run Prefect Server on premise. It looks like it depends on Docker, with the use of docker-compose. Unfortunately, I cannot have Docker running on site. Is Docker the only way to run Prefect Server?
    j
    d
    k
    • 4
    • 4
  • s

    Samuel Hinton

    07/28/2021, 1:33 PM
    Hey all, sometimes Prefect seems to just go crazy and things stop working. Is there a way to tell Prefect to "refresh, cancel all tasks that are running or pending"? I've tried restarting the services but it doesn't seem to change things. Screenshot of my run history below. Interesting that 52% of 561 flow runs failed, and that 52% of 561 = 12 failed flows. Essentially things seem fubar and I don't know how to make it go back to normal! Some of the runs have been going for hours too, despite every flow having a timeout of five minutes, but timeouts not being respected has been an issue I've known about in Prefect since we first implemented it.
    k
    s
    • 3
    • 16
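    A hedged sketch of bulk-cancelling whatever the backend still thinks is active (the GraphQL shape is an assumption about the server schema; adjust the state list as needed):

        from prefect.client import Client
        from prefect.engine.state import Cancelled

        client = Client()
        result = client.graphql(
            """
            query {
              flow_run(where: {state: {_in: ["Running", "Submitted", "Scheduled"]}}) {
                id
              }
            }
            """
        )
        # Force every lingering run into a terminal Cancelled state
        for run in result["data"]["flow_run"]:
            client.set_flow_run_state(run["id"], state=Cancelled("Cancelled during cleanup"))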
  • a

    An Hoang

    07/28/2021, 3:47 PM
    Let me know if I should trim more of the code to make it more of a minimal example. To avoid executing an expensive task when the outputs are already persisted on disk, I'm trying to decide whether I should 1) raise the `SKIP` signal inside the task or 2) use the conditional `case` task. I have provided the two scenarios in this gist https://gist.github.com/hoangthienan95/e0d8c3d73cb25f90f0d427c689ea80d8/revisions?diff=unified, along with the flow visualizations. In the gist, the initial version uses the `SKIP` signal, and the second version shows the modifications to make it use `case`. I have tested both flows; they work and both satisfy my requirements so far. Are there any caveats I haven't thought of, or best practices I should consider, to choose one over the other?
    k
    • 2
    • 7
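    For reference, minimal versions of the two idioms being compared (a Path-based existence check is assumed; names and paths are placeholders):

        from pathlib import Path
        from prefect import Flow, case, task
        from prefect.engine.signals import SKIP

        # Option 1: raise SKIP inside the task itself
        @task
        def expensive_task(out_path: str):
            if Path(out_path).exists():
                raise SKIP("Output already persisted on disk")
            ...  # do the expensive work

        # Option 2: gate the task behind a `case` branch
        @task
        def output_missing(out_path: str) -> bool:
            return not Path(out_path).exists()

        @task
        def expensive_task_gated(out_path: str):
            ...  # do the expensive work

        with Flow("skip-vs-case") as flow:
            missing = output_missing("/tmp/result.parquet")
            with case(missing, True):
                expensive_task_gated("/tmp/result.parquet")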
  • k

    Kurt Rhee

    07/28/2021, 4:32 PM
    Hello, I was wondering if anybody has ever set up Prefect to work with an on-prem server? Can you still schedule tasks?
    k
    a
    • 3
    • 6
  • h

    Hugo Kitano

    07/28/2021, 6:15 PM
    I remember reading a Prefect doc about how to set additional reference tasks (other than the default final one) but can’t find it now. How would I add a reference task to a flow?
    s
    k
    • 3
    • 8
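    The API in question is likely flow.set_reference_tasks; a minimal sketch (task names are placeholders):

        from prefect import Flow, task

        @task
        def extract():
            return 1

        @task
        def load(x):
            return x + 1

        with Flow("example") as flow:
            a = extract()
            b = load(a)

        # The flow's final state is now determined by `extract`
        # instead of the default terminal task
        flow.set_reference_tasks([a])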
  • t

    Tim Enders

    07/28/2021, 7:04 PM
    For Prefect 0.15.3, what is the correct extra to install to go with this error?
    ImportError: Using `prefect.tasks.gcp` requires Prefect to be installed with the "gcp" extra.
    `gcp` isn't listed and doesn't seem to fix the problem.
  • t

    Tim Enders

    07/28/2021, 7:04 PM
    Do I want `google`?
    k
    • 2
    • 4
  • h

    Harry Baker

    07/28/2021, 8:25 PM
    when running a flow that itself triggers other flows, is there a way to set it up such that, when you look at the visual timeline of the flow execution on the dashboard, you see each sub-flow's own tasks displayed as their own 'bubble', rather than one big bubble for the entire flow? I know I can get the URL of the sub-flow's execution from the logs, but I wanted a single view displaying each task called within my chain of flows
    k
    m
    y
    • 4
    • 13
  • h

    Harry Baker

    07/28/2021, 10:01 PM
    ok, not sure what I did (probably caused by my editing the directory structure), but for whatever reason the cloud dashboard is no longer able to execute my flows. I keep getting this error:
    Traceback (most recent call last):
      File "/home/ubuntu/anaconda3/envs/ca_covid_main/lib/python3.8/site-packages/prefect/agent/agent.py", line 384, in _deploy_flow_run
        deployment_info = self.deploy_flow(flow_run)
      File "/home/ubuntu/anaconda3/envs/ca_covid_main/lib/python3.8/site-packages/prefect/agent/local/agent.py", line 142, in deploy_flow
        env = self.populate_env_vars(flow_run, run_config=run_config)
      File "/home/ubuntu/anaconda3/envs/ca_covid_main/lib/python3.8/site-packages/prefect/agent/local/agent.py", line 199, in populate_env_vars
        else os.getcwd()
    FileNotFoundError: [Errno 2] No such file or directory
    k
    • 2
    • 4
  • h

    Harish

    07/28/2021, 11:41 PM
    Hi Prefect community, I'm new to Prefect. Does anyone know why the cancel/change-state options in the Prefect UI do not result in the state handler function being called? Am I missing something? For example, I could not see the logger info messages on clicking cancel or change state for flows/tasks after registering this flow.
    from prefect import Flow, task
    from prefect.tasks.shell import ShellTask
    import prefect
    
    def test_on_cancel(flow, old_state, new_state):
        logger = prefect.context.get("logger")
        logger.info(old_state)
        logger.info(new_state)
    
    @task(on_failure=test_on_cancel, state_handlers=[test_on_cancel])
    def plus_one(x):
        """A task that adds 1 to a number"""
        return x + 1
    
    @task(on_failure=test_on_cancel, state_handlers=[test_on_cancel])
    def build_command():
        return 'sleep 200'
    
    run_in_bash = ShellTask(name='run a command in bash', on_failure=test_on_cancel, state_handlers=[test_on_cancel])
    
    with Flow('Best Practices') as flow:
        two = plus_one(1)
        cmd = build_command()
        shell_result = run_in_bash(command=cmd)
        shell_result.set_upstream(two)
    k
    • 2
    • 10
  • d

    Danny Vilela

    07/28/2021, 11:43 PM
    Hi! I’m facing an issue where I can’t quite get the Prefect Slack app connected to our Slack instance, but we do have our own hand-rolled Slack notifier that sends messages into a channel. I currently have a `SendSlackNotificationTask` task at the end of my (linear and static) flow, but it fails to run if any of its parents failed. This means the notification won’t run, so I won’t know things failed, which is probably bad. My question: is there a way to make a task run regardless of its parent tasks’ states? I know there’s the `case` option; is that a better fit here? Can `case` work with a task’s state?
    a
    k
    • 3
    • 7
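    This is what task triggers are for; a minimal sketch using the built-in always_run trigger (the notifier body is a placeholder for the hand-rolled task described above):

        from prefect import Flow, task
        from prefect.triggers import always_run

        @task
        def might_fail():
            raise ValueError("boom")

        # always_run fires once upstream tasks are finished,
        # regardless of whether they succeeded or failed
        @task(trigger=always_run)
        def notify_slack():
            ...  # call the hand-rolled Slack notifier here

        with Flow("pipeline") as flow:
            upstream = might_fail()
            notify_slack(upstream_tasks=[upstream])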
  • d

    Danny Vilela

    07/29/2021, 1:44 AM
    Hi, sorry to bother again: does setting a task’s `timeout` work in conjunction with `max_retries`? Is a “timeout” the same as a “failure”?
    k
    • 2
    • 2

Kevin Kho

07/29/2021, 1:46 AM
No worries! I believe the timeout is a failure and will trigger a retry.
✅ 1
d

Danny Vilela

07/29/2021, 1:46 AM
🙏 Thank you @Kevin Kho!!
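    A minimal illustration of combining the two (the values are placeholders):

        from datetime import timedelta
        from prefect import task

        # A run that exceeds 60 seconds fails with a timeout,
        # and max_retries/retry_delay then govern the retries
        @task(timeout=60, max_retries=2, retry_delay=timedelta(minutes=1))
        def long_running():
            ...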