prefect-community
  • d

    Dylan

    06/11/2020, 5:41 PM
    Hey Everyone! I’d like to ask for your help to make Prefect Cloud and Prefect Server even better. Would you take 4 minutes to fill out this feedback survey? https://forms.gle/GArq27SyxBd8v4MCA Thank you!
    ❤️ 1
    :marvin: 2
  • n

    Nazeer Hussain

    06/11/2020, 6:04 PM
    Hello all, I am trying to run Prefect on Docker and I am getting the following error: docker: Error response from daemon: OCI runtime create failed: container_linux.go:349: starting container process caused "exec: \"-p\": executable file not found in $PATH": unknown. Can someone help, please?
    👀 1
    d
    • 2
    • 92
  • n

    Nazeer Hussain

    06/11/2020, 6:06 PM
    The command is: docker run -dit prefecthq/prefect -p 8080:8080 and we are using https://hub.docker.com/r/prefecthq/prefect
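    A hedged aside on the error above: in docker run, options placed after the image name are passed to the container as its command, which is why the daemon tries to exec "-p". The flags must come before the image name, e.g.:
    docker run -dit -p 8080:8080 prefecthq/prefect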
  • c

    Chris Vrooman

    06/11/2020, 7:03 PM
    Has anyone had success with running Prefect Docker agents within another Docker image (Docker-in-Docker)? I've been able to do this just fine using an older Docker daemon (v1.13 on CentOS), launching the container with the --privileged flag. But I can't seem to get this to work on a newer daemon version (1.40).
    [2020-06-10 23:02:59,073] INFO - agent | Successfully pulled image <ecr_image_name>...
    [2020-06-10 23:02:59,180] ERROR - agent | Logging platform error for flow run ee3bb337-ad78-466a-a774-6e67f467aedc
    [2020-06-10 23:02:59,500] ERROR - agent | Error while deploying flow: APIError(HTTPError('400 Client Error: Bad Request for url: http+docker://localhost/v1.40/containers/create'))
    👀 1
    l
    • 2
    • 11
  • j

    jorwoods

    06/11/2020, 9:49 PM
    Thank you very much to Laura for your help earlier! One more question regarding checkpoints and caching. Adding the target kwarg to the task caches the result as expected. I will try to contribute back to the documentation soon, now that I have a better understanding of that vs. a Result's location. Here is a modified version of the toy example I used before; this time I am trying to get it to forcibly ignore the cache and recompute. It is still considering the tasks' cache as valid, despite a short cache_for and a validator of never_use. I know that cache_for and cache_validator in Task are deprecated. I tried specifying the validator on the LocalResult, as you will see in the example.
    from prefect import Flow, task, unmapped, Parameter
    from prefect.engine.results import LocalResult
    from prefect.engine.executors import DaskExecutor
    from prefect.engine.cache_validators import all_parameters, never_use
    import datetime
    
    delta=datetime.timedelta(seconds=5)
    
    lr = LocalResult(validators=[never_use])
    
    @task(log_stdout=True, checkpoint=True, 
          target='{flow_name}-{task_name}-{parameters[x]}-{parameters[y]}.pkl',
          cache_validator=never_use, cache_for=delta)
    def add(x, y):
        print(f'add ran with {x} {y}')
        try:
            return sum(x) + y
        except TypeError:
            return x + y
    
    with Flow('iterated map', result=lr) as flow:
        y = unmapped(Parameter('y', default=8))
        x = Parameter('x', default=[1,2,3])
        mapped_result = add.map(x, y=y)
        out = add(mapped_result, y)
    
    if __name__ == "__main__":
        flow.run(executor=DaskExecutor(),)
    👀 1
    l
    • 2
    • 3
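    A hedged note on the behavior above: in Prefect 0.x, setting target makes the existence of the target file itself act as the cache, so cache_for / cache_validator are effectively bypassed. One way to force a recompute, assuming the target template from the example, is to delete the target files before the run:
    import glob
    import os
    
    # with `target` set, a task is skipped whenever its target file exists,
    # so removing the pickles forces a recompute on the next flow run
    for path in glob.glob("iterated map-add-*.pkl"):
        os.remove(path)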
  • h

    Howard Cornwell

    06/12/2020, 8:26 AM
    Is there a way to get failed child tasks to re-run without re-running each child task individually? For example:
    import prefect
    from prefect import Flow, task
    from prefect.engine.results import PrefectResult
    
    from random import random
    
    
    @task
    def make_data():
        return list(range(5))
    
    
    @task
    def randomly_raise(x):
        x = random()
        if x <= 0.5:
            raise Exception()
    
    
    with Flow("test retries", result=PrefectResult()) as flow:
        data = make_data()
        randomly_raise.map(data)
    When the above runs, it creates 5 mapped tasks, each of which randomly fails with 50% chance. When there are failed runs, if I click re-run on the failed tasks, I can get them to individually re-run and succeed. But re-running the entire flow never attempts the mapped tasks again. Here's what's logged:
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise': task is mapped, but run will proceed so children are generated."
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise': Handling state change from Mapped to Mapped"
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[0]': Starting task run..."
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise[0]': task is already finished."
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[0]': finished task run for task with final state: 'Success'"
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[1]': Starting task run..."
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise[1]': task is already finished."
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[1]': finished task run for task with final state: 'Success'"
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[2]': Starting task run..."
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise[2]': task is already finished."
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[2]': finished task run for task with final state: 'Failed'"
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[3]': Starting task run..."
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise[3]': task is already finished."
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[3]': finished task run for task with final state: 'Failed'"
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[4]': Starting task run..."
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise[4]': task is already finished."
    June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[4]': finished task run for task with final state: 'Failed'"
    Is there a way to re-run a whole group of mapped tasks?
    j
    • 2
    • 8
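    A hedged aside, not from the thread: per-task retry settings are one way to have failed children retry automatically, since each mapped child honors the parent task's max_retries / retry_delay independently. A minimal sketch:
    import datetime
    
    from prefect import task
    
    # each mapped child retries on its own schedule when it raises
    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=10))
    def randomly_raise(x):
        ...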
  • n

    Nazeer Hussain

    06/12/2020, 9:09 AM
    How can I change the GraphQL IP address, please?
  • n

    Nazeer Hussain

    06/12/2020, 9:10 AM
    🤨 1
  • n

    Nazeer Hussain

    06/12/2020, 9:11 AM
    I created the config.toml file at /home/ec2-user/.prefect/config.toml
  • n

    Nazeer Hussain

    06/12/2020, 9:11 AM
    but it's not picking up the values
  • n

    Nazeer Hussain

    06/12/2020, 9:11 AM
    Am I missing something?
    j
    n
    s
    • 4
    • 53
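    A hedged sketch for the question above, assuming Prefect Server 0.x-era config keys: the GraphQL endpoint the UI queries can typically be overridden in ~/.prefect/config.toml instead of defaulting to localhost, e.g.:
    # config.toml -- graphql_url is assumed from 0.x-era docs;
    # substitute the host's public IP or domain name
    [server]
    
      [server.ui]
      graphql_url = "http://<public-ip>:4200/graphql"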
  • n

    Nazeer Hussain

    06/12/2020, 2:51 PM
    I am still not able to make the API status green; it's not changing from localhost to the IP address.
  • n

    Nazeer Hussain

    06/12/2020, 2:51 PM
    Let me try with a domain name.
  • b

    Ben Davison

    06/12/2020, 4:19 PM
    Looking to see how anyone else has implemented this: we have Prefect up and running just fine in our Kubernetes cluster, in a VPC that can be accessed internally or externally (if we allow the service to be). The problem I have is that our CI pipeline is a hosted service. My question is: how can I register the flow without exposing the Apollo endpoint to everyone who uses the CI service? I can whitelist the IP range of the CI provider, but then anyone who uses that provider could in theory access Apollo. I can set auth on the ingress controller, but then how do I get python my_flow.py with the register method to respect that?
    👀 1
    j
    n
    • 3
    • 4
  • a

    Aaron Y

    06/12/2020, 5:57 PM
    Has anyone worked with preprocessing large videos in a data pipeline?
    j
    a
    • 3
    • 6
  • d

    Dan DiPasquo

    06/12/2020, 6:39 PM
    Do I understand correctly that the new GCSResult will automatically retrieve a service account key stored in a Prefect Cloud Secret called GCP_CREDENTIALS and use that for authentication? I have created the Secret called GCP_CREDENTIALS and stored in it the contents of an SA key that I verified has access to the specified GCS bucket, but in the flow I am getting
    Unexpected error: Forbidden("POST <https://storage.googleapis.com/upload/storage/v1/b>...
    I don't see logging about having retrieved GCP_CREDENTIALS, as I might if I were explicitly using a Secret task, so it's a bit opaque.
    j
    j
    c
    • 4
    • 18
  • d

    Dan DiPasquo

    06/12/2020, 9:31 PM
    OK, I found it mentioned here now, related to GCS storage ... let's see if this is it. Before, we were explicitly pulling the credential using a client.Secret().
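    A hedged sketch related to the above: one way to confirm the Secret itself resolves, before relying on GCSResult picking it up implicitly, is to pull it explicitly with the 0.x Secret API:
    from prefect.client import Secret
    
    # pulls the Cloud secret directly; a failure here points at the secret
    # itself rather than at GCSResult's authentication
    creds = Secret("GCP_CREDENTIALS").get()
    print(type(creds))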
  • d

    Digo

    06/15/2020, 7:13 AM
    Hi everyone, how do I deploy Prefect on AWS ECS?
    j
    s
    +4
    • 7
    • 15
  • t

    Tomas Thornquist

    06/15/2020, 7:23 AM
    We're having major issues with imagePullSecrets occasionally not being added to the k8s jobs, causing the jobs to fail. Since this only happens occasionally, it's very difficult to debug.
    j
    • 2
    • 3
  • e

    emre

    06/15/2020, 10:13 AM
    Hello everyone, judging from the source code, FargateTaskEnvironment seems to only support Docker as the underlying flow storage. Is that the case, and if so, are there any plans to support S3 storage as well?
    j
    • 2
    • 2
  • h

    Howard Cornwell

    06/15/2020, 10:37 AM
    Hey, is there a theoretical limit to the number of child tasks created from a mapped task? The server gets really clogged up if I create >1000 child tasks and cancel the run. Each task fails in the server output with something like the following, which is expected, but it takes about 5 minutes for 1000 child tasks to cancel.
    graphql_1    | ERROR:    State update failed for task run ID 191a019d-4e0a-4692-a05a-823659c3f535: provided a running state but associated flow run 635f983d-8082-49c3-9dcd-af694e400ced is not in a running state.
    graphql_1    | 
    graphql_1    | GraphQL request:6:7
    graphql_1    | 5 |       status
    graphql_1    | 6 |       id
    graphql_1    |   |       ^
    graphql_1    | 7 |     }
    graphql_1    | Traceback (most recent call last):
    graphql_1    |   File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 668, in complete_value_catching_error
    graphql_1    |     return_type, field_nodes, info, path, result
    graphql_1    |   File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 733, in complete_value
    graphql_1    |     raise result
    graphql_1    |   File "/prefect-server/src/prefect_server/graphql/states.py", line 73, in set_state
    graphql_1    |     task_run_id=state_input["task_run_id"], state=state,
    graphql_1    |   File "/prefect-server/src/prefect_server/api/states.py", line 91, in set_task_run_state
    graphql_1    |     f"State update failed for task run ID {task_run_id}: provided "
    graphql_1    | ValueError: State update failed for task run ID 191a019d-4e0a-4692-a05a-823659c3f535: provided a running state but associated flow run 635f983d-8082-49c3-9dcd-af694e400ced is not in a running state.
    graphql_1    | 
    graphql_1    | The above exception was the direct cause of the following exception:
    graphql_1    | 
    graphql_1    | Traceback (most recent call last):
    graphql_1    |   File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 668, in complete_value_catching_error
    graphql_1    |     return_type, field_nodes, info, path, result
    graphql_1    |   File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 733, in complete_value
    graphql_1    |     raise result
    graphql_1    |   File "/prefect-server/src/prefect_server/graphql/states.py", line 73, in set_state
    graphql_1    |     task_run_id=state_input["task_run_id"], state=state,
    graphql_1    |   File "/prefect-server/src/prefect_server/api/states.py", line 91, in set_task_run_state
    graphql_1    |     f"State update failed for task run ID {task_run_id}: provided "
    graphql_1    | graphql.error.graphql_error.GraphQLError: State update failed for task run ID 191a019d-4e0a-4692-a05a-823659c3f535: provided a running state but associated flow run 635f983d-8082-49c3-9dcd-af694e400ced is not in a running state.
    graphql_1    | 
    graphql_1    | GraphQL request:6:7
    graphql_1    | 5 |       status
    graphql_1    | 6 |       id
    graphql_1    |   |       ^
    graphql_1    | 7 |     }
    j
    • 2
    • 2
  • d

    Darragh

    06/15/2020, 10:59 AM
    Hey guys, looking for some advice on the best way to do the following. We're emulating sub-flows until they're available in Prefect, and we have a bunch of partitions that we map over, using the Prefect client to trigger a run of A.N. Other flow with that data partition as input. Depending on our input data set size, this results in up to 100 sub-flows. Using the standard map functionality we can trigger the flows and then return the flow IDs to the parent flow. We then want to check the status of those flows, so we know when the entire data set has been processed. We have a task defined that takes a sub_flow_id and queries the client for flow state until it is completed. The problem is that the second map, to check the state of each sub-flow, seems to be a synchronous call: if I have 10, 20, whatever sub-flows, the map task to check their state only executes sequentially. So the question is: is that expected [sequential/blocking run] or am I doing something wrong? Sample code attached:
    @task(trigger=triggers.all_successful, log_stdout=True)
    def run_collector(partition, bucket_name, collector_name):
        logger = prefect.context.get("logger")
        logger.info("Run CollectorSubFlows")
        FlowRun = FlowRunTask()
        sub_flow_id = FlowRun.run(
            flow_name=collector_name,
            parameters={
                "bucket_name": bucket_name,
                "input_location": partition["input"],
                "output_location": partition["output"],
            },
        )
        logger.info("Started Collector Flow ID: %s", sub_flow_id)
        return sub_flow_id
    
    @task(trigger=triggers.all_successful, log_stdout=True)
    def get_state(flow_run_id):
        logger = prefect.context.get("logger")
        logger.info("Checking state for subFlow: %s", flow_run_id)
        from prefect.client import Client
        import time
        client = Client()
        flow_in_progress = True
        flow_status = ""
        while flow_in_progress:
            query = '{flow_run(where: {id: {_eq: "' + flow_run_id + '"}}, limit: 1) {id current_state {message state}}}'
            flow_run = client.graphql(query).data.flow_run
            if flow_run[0].current_state.state == "Success" or flow_run[0].current_state.state == "Failed":
                flow_status = flow_run[0].current_state.state
                flow_in_progress = False
            else:
                time.sleep(2.4)
        return flow_status
    
    def main():
        with Flow("Partition & Collect") as flow:
            partitions = partition_input_data(bucket_name, input_location, output_location, partition_count)
            sub_flow_ids = run_collector.map(partitions, unmapped(bucket_name), unmapped(collector_flow))
            final_states = get_state.map(sub_flow_ids)
    j
    • 2
    • 5
  • i

    itay livni

    06/15/2020, 2:08 PM
    Hi - I am trying to deploy a flow that runs with a LocalExecutor using a FargateAgent. The error I am getting is:
    Error while deploying flow: InvalidParameterException("An error occurred (InvalidParameterException) when calling the RunTask operation: Network Configuration must be provided when networkMode 'awsvpc' is specified.")
    The FargateAgent networkMode is configured like this:
    networkMode={
            "awsvpcConfiguration": {
                "assignPublicIp": "ENABLED",
                "subnets": ["subnet-****"],
                "securityGroups": ["sg-****"]
                }
        },
    Any ideas? Thanks.
    j
    • 2
    • 20
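    A hedged sketch for the error above, assuming the FargateAgent passes its kwargs through to boto3's ECS calls: networkMode is a task-definition string, while the awsvpcConfiguration block belongs under the separate networkConfiguration run-task parameter, mirroring boto3's run_task signature:
    from prefect.agent.fargate import FargateAgent
    
    # networkMode is just the string "awsvpc"; the subnets and security
    # groups go under networkConfiguration instead
    agent = FargateAgent(
        networkMode="awsvpc",
        networkConfiguration={
            "awsvpcConfiguration": {
                "assignPublicIp": "ENABLED",
                "subnets": ["subnet-****"],
                "securityGroups": ["sg-****"],
            }
        },
    )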
  • j

    jeff sadler

    06/15/2020, 3:17 PM
    Hi - I'm a data scientist for the US Geological Survey. I run expensive deep learning training tasks in pipelines. I've been using Snakemake for a while now, but Prefect has caught my attention as a potentially more cloud-friendly alternative. I have a few questions about Prefect, but maybe my biggest is about dependency tracking. If an upstream dependency changes in a Prefect flow, will a downstream cached task run? (I especially need the caching/persisting of task results because of how expensive the training steps are in my pipelines.) For example, Snakemake reruns downstream tasks if the timestamp on an upstream file is more recent than the downstream file (Snakemake tracks everything as files on disk).
    i
    c
    • 3
    • 8
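    A hedged sketch of one file-based pattern for the question above, reusing the target templating shown earlier in the channel: make the cache key depend on the identity of the upstream data, so a changed input writes to (and reads from) a new target file instead of reusing a stale one. data_version here is a hypothetical parameter:
    from prefect import task
    
    # a new data_version produces a new target path, so the expensive task
    # recomputes instead of hitting the old cached file
    @task(checkpoint=True, target="train-{parameters[data_version]}.pkl")
    def train(data, data_version):
        ...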
  • r

    Rafal

    06/15/2020, 4:07 PM
    raise AuthorizationError("No agent API token provided.")
    prefect.utilities.exceptions.AuthorizationError: No agent API token provided.
    - I've got this error even though the server is up and running. Any ideas?
    c
    d
    m
    • 4
    • 6
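    A hedged aside: this error typically appears when the client is still pointed at Prefect Cloud rather than a local server; in 0.x the backend is switched with:
    prefect backend server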
  • b

    Ben Davison

    06/15/2020, 4:34 PM
    Is there any way, when you register a flow, to not create a new version if nothing has changed?
    r
    k
    • 3
    • 5
  • r

    Rafal

    06/15/2020, 4:46 PM
    Unexpected error: ImageNotFound(HTTPError('404 Client Error: Not Found for url: http+docker://localhost/v1.40/containers/create'))
    - this error occurs when I am trying to run my local Docker image. I tried changing docker_server_url, but nothing worked. I also get the error
    Unexpected error: DockerException('Invalid bind address format: /var/run/docker.sock')
    Any ideas?
    k
    • 2
    • 7
  • j

    Jackson Maxfield Brown

    06/15/2020, 6:07 PM
    Hey everyone, looking for ideas on how to solve a Prefect/Dask interaction issue I am having. In general form, I have a flow that does the following:
    ids = [some_iterable_of_data_ids_to_retrieve]
    with Flow() as flow:
        big_mixed_result = get_data.map(ids)
        filtered_result = filter_data(big_mixed_result)
        processed = process_data.map(filtered_result)
    I run the flow using a distributed.LocalCluster / DaskExecutor, and when it hits the process_data task I get: "UserWarning: Large object ... detected in task graph... consider using Client.scatter". Prefect/Dask tries to continue on but fails and restarts the flow after workers hit their memory limit. I guess what is confusing to me is that I am doing a map operation on the task, so I wouldn't expect any large object to be transferred between workers. I would have assumed that the map call only sends each small iteration, but I guess that's not the case? One idea I was considering was, instead of using a List[Dict], using a dask.bag to split the memory across workers, maybe? I don't really know; any and all ideas welcome.
    k
    j
    • 3
    • 4
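    A hedged sketch of the dask.bag idea floated above: do the fan-out inside a single task, so the data is partitioned across workers by Dask instead of being embedded in the task graph. process_one is a hypothetical per-item function:
    import dask.bag as db
    
    from prefect import task
    
    def process_one(item):
        ...  # hypothetical per-item processing
    
    # partitioning the list keeps any one payload small and lets Dask move
    # partitions between workers rather than one large graph-embedded object
    @task
    def process_all(filtered_result):
        bag = db.from_sequence(filtered_result, npartitions=16)
        return bag.map(process_one).compute()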
  • a

    Adam Kelleher

    06/15/2020, 9:13 PM
    Hello Prefect community. I'm a data engineer evaluating Prefect, coming from Mara data integration, and I'm curious: is there a Prefect-idiomatic way to represent a collection of flows as a single flow? For instance, I have a series of tasks related to replicating some data; these tasks cover all the ingress/egress and should be run as a flow every hour. However, I would like to bundle the replication task flow with an auditing task flow and run it daily, without redefining the replication task flow.
    k
    • 2
    • 11
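    A hedged sketch for the bundling question above, assuming both inner flows are already registered with the backend: the 0.x FlowRunTask can trigger registered flows by name from a wrapper flow, so the replication flow is reused without being redefined. The flow names are assumptions:
    from prefect import Flow
    from prefect.tasks.prefect import FlowRunTask
    
    # each task kicks off a run of an already-registered flow
    run_replication = FlowRunTask(flow_name="replication")
    run_audit = FlowRunTask(flow_name="audit")
    
    with Flow("daily-bundle") as flow:
        replication = run_replication()
        # start the audit flow only after the replication run is created
        audit = run_audit(upstream_tasks=[replication])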
d

Dan Kerrigan

06/15/2020, 9:43 PM
Is there a way to get the result returned by a flow triggered using client.create_flow_run?
Using the following code, the state.result is always None:
import time

from prefect import Client, task, Flow

client = Client()

@task
def dummy_task():
    return 42

with Flow("meaning of life") as flow:
    life_meaning = dummy_task()

flow_id = flow.register()

flow_run_id = client.create_flow_run(flow_id)

state = client.get_flow_run_info(flow_run_id).state
while not state.is_finished():
    print(f"{state.message} Sleeping 5.")
    time.sleep(5)
    state = client.get_flow_run_info(flow_run_id).state
print(f"Flow finished. {state.message}")

assert state.is_successful()

print(f"Result is None? {state.result is None}")
Result check: OK
Flow: <http://localhost:8080/flow/015ce980-762a-48cf-a864-1f9c5bafb2a8>
Flow run scheduled. Sleeping 5.
Flow run scheduled. Sleeping 5.
Flow finished. All reference tasks succeeded.
Result is None? True
k

Kyle Moon-Wright

06/15/2020, 10:25 PM
Hello @Dan Kerrigan! Just to clarify: a flow run will not return a result, but a task run will. So to grab any results from the flow run, we'd have to grab the results from the desired task_run of your flow. In this case, we can do something like this:
from prefect import Client
c = Client()

# Getting task runs
flow_run = c.get_flow_run_info("YOUR_FLOW_RUN_ID")
task_runs = flow_run.task_runs

# Grabbing the location of a task's result
# (in this case the first task in the list)
location = task_runs[0].state._result.location

# Grabbing the result of a specific task (based on location)
task_runs[0].state._result.read(location)
I hope that makes sense, let me know if we can clarify this to suit your provided flow.
d

Dan Kerrigan

06/16/2020, 12:36 PM
It does, thank you very much!
🎉 1