prefect-community
  • n

    Nicolas van de Walle

    07/08/2020, 10:13 AM
    Hi all, I was wondering how to make the parent flow wait for the FlowRunTask child flow's output. What I mean here is that I don't want the FlowRunTask to only trigger the second flow; I want to be able to wait for it to end and use its output in the parent flow. Thanks in advance!
    j
    c
    4 replies · 3 participants
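
One way to get this behavior (a sketch, not necessarily what the replies settled on): trigger the child flow with FlowRunTask, then poll the backend for the child run's state from a second task. This assumes your Prefect version's FlowRunTask returns the new flow run's id (older releases may not) and uses Client.get_flow_run_info from the 0.12.x client; the flow and project names are placeholders:

    import time

    from prefect import Flow, task
    from prefect.client import Client
    from prefect.tasks.prefect import FlowRunTask

    # "child-flow" / "my-project" are placeholder names
    start_child = FlowRunTask(flow_name="child-flow", project_name="my-project")

    @task
    def wait_for_flow_run(flow_run_id, poll_seconds=10):
        """Poll the backend until the child flow run reaches a finished state."""
        client = Client()
        while True:
            state = client.get_flow_run_info(flow_run_id).state
            if state.is_finished():
                return state
            time.sleep(poll_seconds)

    with Flow("parent-flow") as flow:
        child_run_id = start_child()
        final_state = wait_for_flow_run(child_run_id)

Note that the state only says how the child run ended; to use the child's actual output, the child flow generally has to persist it as a Result (e.g. to S3) that the parent then reads.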
  • s

    SImon Hawe

    07/08/2020, 11:14 AM
    Hey guys. I have a question regarding prefect 0.12.2 and the slack_notifier. We are running on Prefect Core without Prefect Cloud. By default, the slack notifier tries to connect to the Prefect Cloud server to get an API token in slack_message_formatter in notifications.py: it looks like if there is a prefect.context.flow_run_id and prefect.config.backend is set to cloud, it tries to connect to Cloud. I have tried both setting flow_run_id to None and setting backend to "cloud", but that only had an effect when the flow itself tried to connect to Slack; it did not have an effect on individual tasks. Is there any proper way of configuring this, other than a local prefect/backend.toml?
    j
    8 replies · 2 participants
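
For anyone reading along: Prefect config keys can be overridden with PREFECT__SECTION__KEY environment variables, so the backend can be switched without a backend.toml. A minimal sketch, assuming the 0.12.x config loader (which reads the environment when prefect is first imported):

    import os

    # Point Prefect Core at the local server backend instead of Cloud.
    # Must be set before `import prefect` anywhere in the process.
    os.environ["PREFECT__BACKEND"] = "server"

    import prefect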
  • d

    Darragh

    07/08/2020, 12:13 PM
    Hey folks! We’ve been trying to wrap our heads around getting a Fargate Flow to call/start another Docker container that will also run on Fargate, but if we use the normal Prefect DockerTasks we hit the always-hilarious-but-not-really Docker-in-Docker problem. It’s just occurred to me we may have been doing this completely backwards. Am I right in thinking that if I want the following:
    • Container A contains the Prefect Flow
    • Container B contains some business-logic code that knows nothing about the Prefect Flow
    • a Fargate Task that allows mounting of a volume between A and B
    then I should be able to create both of these in the containerDefinitions of the FargateTaskEnvironment? It would mean we’re properly able to get the side-by-side/peer-container approach working, rather than the nested/Docker-in-Docker approach that the DockerTasks would give us… And, far more importantly, are there examples of how to do the above??? 😄
    j
    6 replies · 2 participants
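
For reference, the side-by-side approach described here amounts to passing two entries in containerDefinitions plus a shared task-level volume. A rough sketch, assuming FargateTaskEnvironment forwards these boto3-style kwargs to RegisterTaskDefinition as-is (image names and paths are placeholders):

    from prefect.environments import FargateTaskEnvironment

    env = FargateTaskEnvironment(
        family="my-flow",
        cpu="512",
        memory="1024",
        # a Fargate task-level ephemeral volume shared by both containers
        volumes=[{"name": "shared-data", "host": {}}],
        containerDefinitions=[
            {
                # the container Prefect runs the flow in
                "name": "flow",
                "image": "my-registry/flow-image:latest",
                "essential": True,
                "mountPoints": [
                    {"sourceVolume": "shared-data", "containerPath": "/shared"}
                ],
            },
            {
                # business-logic peer container that knows nothing about Prefect
                "name": "worker",
                "image": "my-registry/business-logic:latest",
                "essential": False,
                "mountPoints": [
                    {"sourceVolume": "shared-data", "containerPath": "/shared"}
                ],
            },
        ],
    )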
  • k

    Kevin Weiler

    07/08/2020, 3:02 PM
    hi there - I’m using prefect server and a Docker agent and encountering an issue I haven’t seen before - I submit my job and then go to the UI and click run. It seems to pull the image down from storage ok, but then it just hangs and all of my jobs are in a submitted state - not sure where to debug from here
    j
    16 replies · 2 participants
  • k

    Kevin Weiler

    07/08/2020, 3:25 PM
    hi again - any idea why I might be getting this error message on flow run?
    Unexpected error: AttributeError("'FunctionTask' object has no attribute 'result_handler'")
    Traceback (most recent call last):
      File "/opt/miniconda3/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/opt/miniconda3/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 447, in get_flow_run_state
        executor=executor,
      File "/opt/miniconda3/lib/python3.7/site-packages/prefect/engine/executors/local.py", line 24, in submit
        return fn(*args, **kwargs)
      File "/opt/miniconda3/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 569, in run_task
        default_handler = task.result_handler or self.flow.result_handler
    AttributeError: 'FunctionTask' object has no attribute 'result_handler'
    j
    c
    +1
    7 replies · 4 participants
  • i

    itay livni

    07/08/2020, 3:45 PM
    https://prefect-community.slack.com/archives/CL09KU1K7/p1594222953368900?thread_ts=1581979700.373800&cid=CL09KU1K7
  • a

    Ankit

    07/09/2020, 7:52 AM
    Hi all, I want to run Prefect in a Docker container and then spin up prefect ui+server as well. My question is: prefect server requires docker and docker-compose, as mentioned in the docs, so I can't run it inside the Docker container for Prefect, right? How would I go about doing this then? Any help would be appreciated. TIA
    n
    g
    20 replies · 3 participants
  • f

    Florian L

    07/09/2020, 9:09 AM
    Hello! First of all, congrats on your awesome project; I've been using it professionally for an ETL for a couple of months and it works perfectly. But I'm starting to have storage problems with results, which are piling up. Is there a method, or even a best practice defined by Prefect, for that? Right now I'm manually cleaning results that are getting old, and I could do it via a cron, so it's technically not a huge problem. But if Prefect has a solution for that I would rather use it.
    👀 1
    l
    1 reply · 2 participants
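
As far as I know there is no built-in retention policy for persisted results in this version, so a small cleanup script on a cron is a reasonable stopgap. A sketch for locally stored results (the directory and age threshold are placeholders):

    import time
    from pathlib import Path

    MAX_AGE_SECONDS = 7 * 24 * 3600  # keep one week of results
    RESULTS_DIR = Path("~/.prefect/results").expanduser()

    def clean_old_results():
        """Delete result files older than MAX_AGE_SECONDS."""
        cutoff = time.time() - MAX_AGE_SECONDS
        for path in RESULTS_DIR.glob("**/*"):
            if path.is_file() and path.stat().st_mtime < cutoff:
                path.unlink()

    if __name__ == "__main__":
        clean_old_results()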
  • s

    Sandeep Aggarwal

    07/09/2020, 9:22 AM
    Hi all, is there a way to pass on the current context to the next flow run triggered using a FlowRunTask task? Basically, I am sending some context parameters while creating a flow run using the Python client's create_flow_run method. The flow upon completion triggers other downstream flows. Now, I want the context that I originally sent to be available to the downstream flows, but I haven't had any success so far.
    👀 1
    l
    m
    +1
    5 replies · 4 participants
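
Context isn't automatically forwarded to flow runs created by FlowRunTask; one workaround is to surface the values as Parameters on the downstream flow and pass them explicitly. A sketch (flow, project, and parameter names are made up, and it assumes the downstream flow declares a matching Parameter):

    import prefect
    from prefect import Flow, task
    from prefect.tasks.prefect import FlowRunTask

    @task
    def get_tenant_id():
        # read the value out of this run's context at runtime
        return prefect.context.get("tenant_id")

    run_downstream = FlowRunTask(flow_name="downstream", project_name="my-project")

    with Flow("upstream") as flow:
        tenant_id = get_tenant_id()
        # hand the value over as an ordinary Parameter of the child flow
        run_downstream(parameters={"tenant_id": tenant_id})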
  • b

    bruno.corucho

    07/09/2020, 10:35 AM
    Hello again, we've been able to successfully run our Dask workers in our Kubernetes EKS cluster (by installing a Kubernetes agent), and although our Prefect Cloud logs show that the flow executed successfully, we still get a log entry at the very end stating:
    response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"services is forbidden: User \"system:serviceaccount:default:default\" cannot list resource \"services\" in API group \"\" in the namespace \"default\"","reason":"Forbidden","details":{"kind":"services"},"code":403}
    PS: We can see "successful" response bodies, but should we ignore this last entry / is it intended? Thanks in advance.
    👀 1
    l
    11 replies · 2 participants
  • d

    Darragh

    07/09/2020, 12:27 PM
    Afternoon all, having a problem getting a Flow to run on FargateTaskEnvironment. I have a self-hosted Prefect 0.12.2 instance running on AWS and am trying to run a flow against Fargate, but I'm getting the following error, which seems to relate only to Cloud-hosted versions. Any ideas? From fargate_task.py:287:
    container_overrides = [
        {
            "name": "flow-container",
            "environment": [
                {
                    "name": "PREFECT__CLOUD__AUTH_TOKEN",
                    "value": config.cloud.agent.auth_token
                    or config.cloud.auth_token,
                },
                {"name": "PREFECT__CONTEXT__FLOW_RUN_ID", "value": flow_run_id},
                {"name": "PREFECT__CONTEXT__IMAGE", "value": get_flow_image(flow)},
            ],
        }
    ]
    i
    j
    43 replies · 3 participants
  • a

    Amit Singh

    07/09/2020, 2:26 PM
    @here Is there a way to remove/stop a schedule attached to a flow at runtime? I want to ensure the flow stops only before the next scheduled iteration; just killing the script might stop the current execution abruptly.
    with Flow('Dummy Cron Flow', schedule=CronSchedule("*/1 * * * *")) as cron_flow:
    👀 1
    d
    14 replies · 2 participants
  • m

    Michael Ludwig

    07/09/2020, 2:32 PM
    We are having issues getting up and running with FargateTaskEnvironment. I think we did everything according to the docs, but when things are passed through to AWS and it tries to run the flow, we get the following error (on Prefect Cloud):
    An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Invalid setting for container 'flow'. At least one of 'memory' or 'memoryReservation' must be specified.
    This is our environment:
    flow.environment = FargateTaskEnvironment(
                executor=DaskExecutor(),
                labels=[f"reco-{self._config.env}"],
                launch_type="FARGATE",
                region="eu-west-1",
                taskRoleArn="arn:aws:iam::XXXXXXXX:role/prefect-task-role-manual",
                executionRoleArn="arn:aws:iam::XXXXXX:role/prefect-execution-role-manual",
                family="tst_flow",
                cluster="reco-dev-tasks-ecscluster-ECSCluster-13542I9PRAV48",
                networkConfiguration={
                    "awsvpcConfiguration": {
                        "assignPublicIp": "ENABLED",
                        "subnets": [
                            "subnet-05fd20a54e43646xx",
                            "subnet-04e8a87c3f718dcxx",
                            "subnet-0545ba61d9f1b5exx",
                        ],
                        "securityGroups": ["sg-005d5df0d9d6594xx"],
                    }
                },
                cpu="512",
                memory="3072",
                containerDefinitions={
                        "name": "flow-container",
                        "image": "image",
                        "command": [],
                        "environment": [],
                        "essential": True,
                        "memoryReservation": "3072",
                        "logConfiguration": {
                            "logDriver": "awslogs",
                            "options": {
                                "awslogs-group": "/ecs/prefect-runs-manual",
                                "awslogs-region": "eu-west-1",
                                "awslogs-stream-prefix": "prefect-flow-runs",
                                "awslogs-create-group": "true",
                            },
                        },
                    }
            )
    We also use Docker-based storage. When putting this config on the FargateAgent, it can correctly run flows, e.g. with a LocalEnvironment, but then the whole config is in the agent and the same for all flows. The issue is somehow in how the parameters are passed forward to AWS. Our code is very close to the example in the docs: https://docs.prefect.io/orchestration/execution/fargate_task_environment.html#examples We tried it with memory and memoryReservation in the container definitions, but got the same error in both cases. We also tried passing containerDefinitions as an array. Any help would be greatly appreciated. Thanks 🙂
    👀 1
    d
    j
    21 replies · 3 participants
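
Two hedged guesses about the error above: it complains about a container named 'flow' while the definition names it 'flow-container', which suggests the custom containerDefinitions never made it into the registered task definition; and ECS expects memory values as integers. Something along these lines may behave differently:

    flow.environment = FargateTaskEnvironment(
        # ... same kwargs as in the snippet above ...
        containerDefinitions=[
            {
                "name": "flow",             # match the name in the error message
                "image": "image",
                "essential": True,
                "memoryReservation": 3072,  # ECS wants an integer, not "3072"
            }
        ],
    )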
  • d

    Dnyaneshwar

    07/09/2020, 2:52 PM
    Hello, I have a flow which looks like below:
    @task
    def get_list_1():
    	
    	l = list()
    	l.append((t1, t2))
    	l.append((t3, t4))
    	
    	return l
    
    @task
    def function_1(arg1, arg2):
    	
    	# do something
    	
    	return arg2, False/True
    
    # Other two tasks also have the similar structure.
    	
    with Flow('test_flow') as flow:
    	l_1 = get_list_1()
    	res_1 = function_1.map(l_1)
    	
    	l_2 = get_list_2()
    	get_list_2.set_upstream(function_1)
    	
    	res_2 = function_2.map(l_2)
    I want to run this flow with a DaskExecutor where the Dask cluster has been created with the YarnCluster API from dask. The number of workers assigned to the scheduler will be limited; however, the lists l_1 and l_2 might run into hundreds of tuples. To avoid any worker being killed because of memory or other issues, I want to pass slices of l_1 (or l_2) to the functions instead of the full list. How many slices will depend on the length of the output lists. Can anyone help on how to handle this? Or maybe there is a better solution than passing slices to map. Thanks.
    👀 1
    d
    21 replies · 2 participants
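
One way to bound per-worker memory in a case like this is to map over fixed-size slices rather than individual tuples: one task cuts the list into chunks, and each mapped task processes a whole chunk. A self-contained sketch (the pair-generating and per-pair logic are stand-ins for the tasks in the question):

    from prefect import Flow, task

    @task
    def get_pairs():
        # stand-in for get_list_1 above
        return [(i, i + 1) for i in range(500)]

    @task
    def chunk(items, size=20):
        """Split a list into slices of at most `size` elements."""
        return [items[i : i + size] for i in range(0, len(items), size)]

    @task
    def process_chunk(pairs):
        # each mapped task handles one slice, capping its memory footprint
        return [(b, a < b) for a, b in pairs]

    with Flow("chunked-map") as flow:
        pairs = get_pairs()
        chunks = chunk(pairs, size=20)
        results = process_chunk.map(chunks)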
  • n

    Nicolas van de Walle

    07/09/2020, 2:55 PM
    Hi, I am trying to run flows using a Kubernetes agent and I just set it up as detailed here: https://docs.prefect.io/orchestration/agents/kubernetes.html#installation
    prefect agent install kubernetes -t <MY_TOKEN> -n prefect --label prefect-namespace --rbac | kubectl apply -n prefect -f -
    I used Prefect Cloud for that. The flow gets scheduled fine, but nothing gets executed by the agent (it appears in the Agents tab in the Cloud UI with its prefect-namespace label, but it does not see the flows that need to be run). To be honest, I am quite new to Kubernetes and do not really understand how to use RBAC. Do I need to do anything else?
    👀 1
    l
    13 replies · 2 participants
  • k

    Kevin Weiler

    07/09/2020, 3:40 PM
    hi there - is there a way to set log_stdout at a more global level, say, the flow level - or even in the config.toml?
    👀 1
    d
    2 replies · 2 participants
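
I'm not aware of a flow-level switch for this in 0.12, but since log_stdout is just a Task kwarg, one workaround is a module-level wrapper that bakes the default in:

    from functools import partial

    import prefect

    # drop-in replacement for @prefect.task with log_stdout defaulted on
    task = partial(prefect.task, log_stdout=True)

    @task
    def noisy():
        print("this line ends up in the Prefect logs")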
  • m

    Mark McDonald

    07/09/2020, 8:35 PM
    hi - in Cloud, I've noticed that when marking a flow run as "failed" or "cancelled" (both with and without the children option marked), the flow's task that is currently running continues to run. This is apparent in Fargate/ECS and in the live Cloud logs. Only after these Prefect tasks complete will the flow stop executing. For the most part this isn't a problem, but we have some tasks that can run for hours; if I mark the flow as failed during the execution of one of these tasks, it's not great because they'll continue using resources. My question is: what's the appropriate way to kill a flow run and immediately stop the running tasks? Do I have to do this from ECS? Ideally, marking a flow as "failed" would stop the running task in ECS.
    👀 1
    l
    2 replies · 2 participants
  • b

    Bernard Greyling

    07/10/2020, 8:36 AM
    Top of the mornin to ya
  • b

    Bernard Greyling

    07/10/2020, 8:47 AM
    We are busy trying out different deployment strategies at the moment. Initially we went the Dask executor route, using Prefect to create workflows and K8s/DaskGateway to provide the cluster interface. It worked well for some tasks, but we ran into cluster memory issues with more complex flows. We still need to sort that approach out, but we can't afford to waste more time going that route. To resolve the Dask worker memory issues, we are considering running a Prefect k8s agent (using either S3 or Docker storage). That way runs are self-contained and it's easier to manage memory for them. I have two questions:
    1. In the documentation/GitHub, the issue of limited agent resources (cpu: 100m & memory: 128Mi) is mentioned but not explained. What is the reasoning behind this limit?
    2. We've successfully set up/authenticated a k8s runner and scheduled both S3 and Docker runs. After a custom image pull error on k8s, the Prefect k8s agent seems to be stuck in a feedback loop, announcing that it can see flows:
    Found 2 flow run(s) to submit for execution.
    But it's not executing them. Note - I did manually terminate the k8s job via kubectl; not sure if this messed up the Prefect Cloud state. EDIT - Before this feedback-loop state, we managed to run both S3 and Docker runs with the vanilla examples
    j
    n
    10 replies · 3 participants
  • r

    Rafal

    07/10/2020, 11:21 AM
    Traceback (most recent call last):
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
        (self._dns_host, self.port), self.timeout, **extra_kw
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
        raise err
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
        sock.connect(sa)
    ConnectionRefusedError: [Errno 111] Connection refused
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connectionpool.py", line 677, in urlopen
        chunked=chunked,
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connectionpool.py", line 392, in _make_request
        conn.request(method, url, **httplib_request_kw)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/http/client.py", line 1252, in request
        self._send_request(method, url, body, headers, encode_chunked)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/http/client.py", line 1298, in _send_request
        self.endheaders(body, encode_chunked=encode_chunked)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/http/client.py", line 1247, in endheaders
        self._send_output(message_body, encode_chunked=encode_chunked)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/http/client.py", line 1026, in _send_output
        self.send(msg)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/http/client.py", line 966, in send
        self.connect()
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connection.py", line 187, in connect
        conn = self._new_conn()
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connection.py", line 172, in _new_conn
        self, "Failed to establish a new connection: %s" % e
    urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f5c6b7cf6d0>: Failed to establish a new connection: [Errno 111] Connection refused
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
        timeout=timeout
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/connectionpool.py", line 725, in urlopen
        method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/urllib3/util/retry.py", line 439, in increment
        raise MaxRetryError(_pool, url, error or ResponseError(cause))
    urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f5c6b7cf6d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/home/users/chojnar1/anaconda3/envs/prefect/bin/prefect", line 10, in <module>
        sys.exit(cli())
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/prefect/cli/auth.py", line 77, in login
        query={"query": {"user": {"default_membership": "tenant_id"}}}
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 218, in graphql
        token=token,
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 178, in post
        token=token,
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 329, in _request
        session=session, method=method, url=url, params=params, headers=headers
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 251, in _send_request
    response = session.post(url, headers=headers, json=params, timeout=30)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/requests/sessions.py", line 578, in post
        return self.request('POST', url, data=data, json=json, **kwargs)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/requests/sessions.py", line 530, in request
        resp = self.send(prep, **send_kwargs)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/requests/sessions.py", line 643, in send
        r = adapter.send(request, **kwargs)
      File "/home/users/chojnar1/anaconda3/envs/prefect/lib/python3.7/site-packages/requests/adapters.py", line 516, in send
        raise ConnectionError(e, request=request)
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f5c6b7cf6d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    j
    11 replies · 2 participants
  • r

    Rafal

    07/10/2020, 11:21 AM
    Hello, while trying to register with Prefect Cloud, I get the error above.
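
The traceback shows the CLI posting to localhost:4200 (the local server's GraphQL endpoint), which suggests the backend is still set to server. A plausible fix, using the 0.12-era CLI commands:

    prefect backend cloud
    prefect auth login -t <YOUR_TOKEN>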
  • h

    Hannah Amundson

    07/10/2020, 12:40 PM
    Good morning! I have a flow that uses a self-configured version of Spark to write to S3. When I run this code in a Kubernetes pod on my own, without Prefect code, it works fine. However, when I run this code using KubernetesJobEnvironment and Prefect tasks/flows, it constantly says the heartbeat cannot be detected once it gets to the task that writes from one folder in S3 to another. Does anyone have any ideas on how to troubleshoot this? The problem appears to be with Prefect or some interaction between Prefect and Spark/S3.
    n
    6 replies · 2 participants
  • a

    Adam Roderick

    07/10/2020, 12:49 PM
    Any idea why my environment variables are missing during a pipeline execution? I'm using Docker storage, and locally I can spin up a container and confirm that the env vars are indeed there
    n
    j
    12 replies · 3 participants
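
One thing worth ruling out in cases like this: env vars present in a locally built image can still be absent at flow-run time if a different image or entrypoint environment is used. Docker storage also accepts an env_vars kwarg that bakes variables into the image Prefect builds; a sketch (registry and values are placeholders):

    from prefect.environments.storage import Docker

    storage = Docker(
        registry_url="my-registry.example.com",
        image_name="my-flow",
        env_vars={"MY_SETTING": "value"},  # written into the generated Dockerfile
    )
    # then: flow.storage = storage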
  • d

    Dnyaneshwar

    07/10/2020, 1:38 PM
    Hello, I am running a DaskExecutor with a YarnCluster. This gives me a KilledWorker error, and I am not able to log more data, as the debug=True option doesn't add more information either. However, when I try the same tasks on a DaskExecutor with address=None, I do not get any error. What am I missing?
    n
    j
    11 replies · 3 participants
  • b

    bruno.corucho

    07/10/2020, 1:51 PM
    Don't mind me, adding another question... 😣 Topic: Cloud Secrets. Context: my flow depends on some Secrets I added to the cloud. I have a custom Dockerfile that tries to set things up, with the following code:
    COPY configuration /opt/strdata-prefect/configuration
    ENV PREFECT__USER_CONFIG_PATH /opt/strdata-prefect/configuration/config.toml
    Config.toml:
    backend = "cloud"
    
    [cloud]
    use_local_secrets = false
    
        [cloud.agent]
        name = "strdata-agent"
    
        # Setting it to `DEBUG` for verbose logging
        level = "DEBUG"
    
    [logging]
    # The logging level: NOTSET, DEBUG, INFO, WARNING, ERROR, or CRITICAL
    level = "DEBUG"
    
    # Send logs to Prefect Cloud
    log_to_cloud = true
    
    # Extra loggers for Prefect log configuration
    extra_loggers = "[]"
    Error Logs:
    ValueError: Local Secret "REDSHIFT_PASSWORD" was not found.
    Thanks again, Prefect Team!
    n
    l
    40 replies · 3 participants
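
For reference, the error means the run fell back to local secrets. With backend = "cloud" and use_local_secrets = false actually in effect at runtime, the usual read pattern queries Cloud instead (secret name taken from the logs above):

    from prefect.client import Secret

    # With a Cloud backend this fetches the secret from Prefect Cloud at
    # runtime; with local secrets it reads context.secrets and raises the
    # 'Local Secret ... was not found.' error seen above.
    password = Secret("REDSHIFT_PASSWORD").get()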
  • m

    Marwan Sarieddine

    07/10/2020, 3:36 PM
    Question - in a simple local flow run, no parallelism involved, is there a way to make sure the flow run will always execute in the same order? (This helps for an ML reproducibility use case I am thinking of…)
    n
    8 replies · 2 participants
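
For what it's worth, the LocalExecutor runs tasks one at a time, and explicit edges pin a total order even between tasks with no data dependency. A sketch:

    from prefect import Flow, task
    from prefect.engine.executors import LocalExecutor

    @task
    def step_a():
        return "a"

    @task
    def step_b():
        return "b"

    with Flow("ordered") as flow:
        a = step_a()
        b = step_b()
        b.set_upstream(a)  # force b to run after a every time

    state = flow.run(executor=LocalExecutor())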
  • p

    Pedro Machado

    07/10/2020, 5:48 PM
    Hi everyone. Is my understanding correct that when dealing with multiple Python files (e.g. one file has a flow that imports objects from another Python file) the choices are: 1. use Docker storage, passing the files dictionary; 2. use a custom Docker image with everything needed to run the flow; 3. package the additional Python files and pip install them on the machine(s) that will execute the flow? Am I missing other approaches? I don't think it's possible to package the flow + extra files when using the other types of storage, correct? Are there plans to support this?
    n
    g
    7 replies · 3 participants
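
For option 1 in the list above, the files dictionary copies extra modules into the image Prefect builds, and setting PYTHONPATH makes them importable; this mirrors the pattern in the Docker storage docs (paths here are placeholders):

    from prefect.environments.storage import Docker

    storage = Docker(
        files={
            # local path -> path inside the image
            "/src/my_helpers.py": "/modules/my_helpers.py",
        },
        env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},
    )
    # then: flow.storage = storage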
  • c

    Chris White

    07/10/2020, 8:02 PM
    Hey everyone! @Laura Lorenz (she/her) and @Tyler Wanner are live, talking about Prefect Server deployments (a common discussion point here); come stop by!

    https://www.youtube.com/watch?v=yjORjWHyKhg&feature=youtu.be

    :marvin: 1
    🎉 1
    😍 2
  • a

    Adam Roderick

    07/10/2020, 9:33 PM
    Sorry I missed the live stream about deployments today! I'll listen to it. I'm hitting a new error during the health check.
    ModuleNotFoundError: No module named 'prefect.core.parameter'
    does that look familiar to anyone?
    c
    4 replies · 2 participants
  • j

    Jackson Maxfield Brown

    07/10/2020, 9:36 PM
    Hey I have been having some trouble with Results in Prefect 0.12.3 and would love input as to where I may be going wrong. For example I can't even get this working:
    from distributed import LocalCluster
    from prefect import Flow, task
    from prefect.engine.executors import DaskExecutor
    from prefect.engine.results import LocalResult
    
    @task(result=LocalResult(dir="single-results/"), target="hello.bytes")
    def single_task():
        return list(range(100))
    
    
    def pipeline(debug: bool = False):
        with Flow("example-local-results") as flow:
            items = single_task()
    
        cluster = LocalCluster()
        state = flow.run(executor=DaskExecutor(cluster.scheduler_address))
    I have also tried adding checkpoint=True, but same deal. Nothing shows up.
    c
    8 replies · 2 participants
c

Chris White

07/10/2020, 9:40 PM
Hey Jackson - if you’re running this locally you additionally need to set the environment variable PREFECT__FLOWS__CHECKPOINTING=true to “turn on” the feature (it is enabled by default against a backend but toggleable for local runs for testing purposes)
:upvote: 1
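
For local testing, the same toggle can also be flipped in-process, as long as it happens before prefect is imported (the config is read at import time):

    import os

    # enable result checkpointing for local runs
    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

    import prefect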
j

Jackson Maxfield Brown

07/10/2020, 9:43 PM
Ahhhhh! I don't see that anywhere in the docs for this: https://docs.prefect.io/core/concepts/results.html#persisting-user-created-results I also think it may be a bit out of date, because I think checkpointing=True as a parameter turned into checkpoint=True. I will make a PR to update those once I get all this working
:upvote: 1
Now that it's working, a related question: one of the reasons I was interested in this was to potentially push a dataset to S3 with a nice manifest (single CSV / Parquet or similar) as the result of a map operation. I now see that stored results of a map are all individual. Is the go-to recommendation in this situation to basically have a map -> gather_and_form_dataset? Or is there some secret kwarg to let me store all results of a map together as one?
c

Chris White

07/10/2020, 10:01 PM
That’s a good question; I hadn’t considered that before but my first hunch is that this would work:
@task(checkpoint=False)
def mapped_task(x):
    ...

@task(checkpoint=True, result=MyResult())
def gather_and_form_dataset(results):
    ...
j

Jackson Maxfield Brown

07/10/2020, 10:02 PM
Yep! I expect I will be making a custom result handler as well to do this all nicely but hoping it will be real neat
Cool thanks for all the help
c

Chris White

07/10/2020, 10:07 PM
for sure!