prefect-community

    Cab Maddux

    03/31/2020, 8:49 PM
    Hi! Quick question: I'm using the GCSResultHandler with a task, and I'd like to write the URI that the result handler stores as its safe value to a database. Is it possible, during a flow run, to access a ResultHandler in order to persist that safe value?

    itay livni

    04/01/2020, 2:49 AM
    Hi - I am looking for guidance on deploying a simple ETL (extract, transform, load) Flow with a schedule to run once a day at a specific time on the cloud (AWS). The data is under 1 MB. Should the ETL flow have a scheduler? Thanks in advance.
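Conceptually, a "once a day at a specific time" schedule just emits the same wall-clock time on successive days. The sketch below computes the next few run times in plain Python to make that concrete; it is an illustration only, not Prefect's scheduling code. The helper name next_daily_runs is hypothetical, and time zones and DST are deliberately ignored. In Prefect itself, a schedule object is passed to the flow through the schedule argument of Flow.

```python
from datetime import datetime, time, timedelta
from typing import List


def next_daily_runs(after: datetime, at: time, count: int) -> List[datetime]:
    """Return the next `count` datetimes at wall-clock time `at`, strictly after `after`.

    Hypothetical helper that mimics what a daily schedule would emit.
    Naive datetimes only: time zones and DST transitions are ignored.
    """
    candidate = datetime.combine(after.date(), at)
    if candidate <= after:
        # Today's slot has already passed (or is right now), so start tomorrow.
        candidate += timedelta(days=1)
    return [candidate + timedelta(days=i) for i in range(count)]


if __name__ == "__main__":
    # Starting at 10:30 on April 1, the next three 09:00 slots fall on April 2, 3 and 4.
    for run in next_daily_runs(datetime(2020, 4, 1, 10, 30), time(9, 0), 3):
        print(run.isoformat())
```

The same idea is usually written as a cron expression such as `0 9 * * *`; Prefect's `prefect.schedules` module provides schedule classes for this, so check its docs for the exact names in your version.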

    Kyle Foreman (Convoy)

    04/01/2020, 7:33 AM
    hi all, just wanted to share these updates from the covid19 data scraping project - please let me know your email address if you'd like to join our group!
    covid19_parsing_project_update

    Pierre CORBEL

    04/01/2020, 11:28 AM
    Hello there, 👋 It seems that not all Prefect videos are listed on YouTube (like the Server Start UI Flow Run one). In fact, there is only one public video on the channel (Prefect Cloud Demo). Would you mind sharing the other ones? I'm very interested in your videos / tutorials, and I think they could greatly benefit the community 😊

    John Ramirez

    04/01/2020, 12:04 PM
    Hey - I am getting this error when running workflows:
    Unexpected error: ClientError([{'message': "('Connection aborted.', OSError(0, 'Error'))", 'locations': [{'line': 2, 'column': 9}], 'path': ['secretValue'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'errors': [{'message': "('Connection aborted.', OSError(0, 'Error'))", 'locations': [], 'path': ['secretValue']}]}}}])
    Is this an error on the Prefect side?

    Daniel Sali

    04/01/2020, 12:09 PM
    Hi all, we are looking into Prefect for a data science project. A couple of things are not immediately clear (we have a history with Airflow): 1. Is there a concept of a dags/flows folder where Prefect Server looks for flow definitions? 2. Is it possible to authenticate against LDAP in the Core UI?

    Armin

    04/01/2020, 1:30 PM
    Dear Prefect devs, I've noticed that there's internal work in progress on supporting custom cache storage (https://github.com/PrefectHQ/prefect/issues/2104#issuecomment-594811242). Are there any early versions available for the public to try out?

    Henry Cohen

    04/01/2020, 2:56 PM
    Hi all, really pumped to test out the new server/UI. I was wondering if there is a way currently, or plans in the future, to add new pages/plugins to the UI like Airflow has, or if I would need to fork the repo and maintain my own version?

    Scott Zelenka

    04/01/2020, 4:23 PM
    Can someone help me figure out why the results of a Task won't load in Prefect Cloud? The page just hangs.

    Preston Marshall

    04/01/2020, 4:39 PM
    Any chance there is a Google Cloud Run or Cloud Build backend in the works? I know there's no direct analogue of Fargate on GCP, but both are pretty close. If Cloud Run didn't have the 15-minute timeout, it would be almost perfect for orchestration tasks (not any sort of processing, though). Cloud Build could work if not for its concurrency limitations.

    David N

    04/01/2020, 5:56 PM
    Is there a best practice for getting a flow to run continuously, so that it starts over again after the last task is completed?
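One generic pattern, independent of any Prefect feature, is a loop that starts the next run as soon as the previous one returns. A minimal sketch, assuming you wrap your flow's `flow.run()` in a zero-argument callable; `run_continuously` and its parameters are hypothetical names. Note that if the callable raises, the loop stops.

```python
import time
from typing import Callable, Optional


def run_continuously(
    run_once: Callable[[], object],
    max_runs: Optional[int] = None,
    pause_seconds: float = 0.0,
) -> int:
    """Call `run_once` back to back, starting again as soon as the previous call returns.

    max_runs=None loops forever; a finite value is handy for testing.
    Returns the number of completed runs.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        run_once()  # e.g. lambda: flow.run()
        runs += 1
        if pause_seconds > 0:
            time.sleep(pause_seconds)  # optional breather between runs
    return runs


if __name__ == "__main__":
    run_continuously(lambda: print("run finished"), max_runs=3)
```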

    Brian Bergeron

    04/01/2020, 6:14 PM
    A live Q&A with @Jeremiah (our CEO) and @Chris White (our CTO) is going on right now on Hangouts until 3pm! https://meet.google.com/ige-wygj-dcb

    John Ramirez

    04/01/2020, 6:58 PM
    hey - I am getting this error when running a mapping task
    Failed to set task state with error: KeyError('endpoint_resolver')
    Traceback (most recent call last):
      File "/opt/conda/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 113, in call_runner_target_handlers
        cloud_state = prepare_state_for_cloud(new_state)
      File "/opt/conda/lib/python3.7/site-packages/prefect/engine/cloud/utilities.py", line 21, in prepare_state_for_cloud
        res.store_safe_value()
      File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result.py", line 92, in store_safe_value
        value = self.result_handler.write(self.value)
      File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result_handlers/s3_result_handler.py", line 103, in write
        self.client.upload_fileobj(stream, Bucket=self.bucket, Key=uri)
      File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result_handlers/s3_result_handler.py", line 67, in client
        self.initialize_client()
      File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result_handlers/s3_result_handler.py", line 60, in initialize_client
        aws_secret_access_key=aws_secret_access_key,
      File "/opt/conda/lib/python3.7/site-packages/boto3/__init__.py", line 91, in client
        return _get_default_session().client(*args, **kwargs)
      File "/opt/conda/lib/python3.7/site-packages/boto3/session.py", line 263, in client
        aws_session_token=aws_session_token, config=config)
      File "/opt/conda/lib/python3.7/site-packages/botocore/session.py", line 824, in create_client
        endpoint_resolver = self._get_internal_component('endpoint_resolver')
      File "/opt/conda/lib/python3.7/site-packages/botocore/session.py", line 697, in _get_internal_component
        return self._internal_components.get_component(name)
      File "/opt/conda/lib/python3.7/site-packages/botocore/session.py", line 923, in get_component
        del self._deferred[name]
    KeyError: 'endpoint_resolver'
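Reading the innermost frames: `boto3.client()` goes through `_get_default_session()`, a single module-level session shared by every thread, and the failure happens while botocore is building the client. boto3's documentation states that Session objects are not thread-safe and suggests creating a separate session per thread, so concurrent client creation from mapped tasks on a shared session is a plausible cause, though nothing in this thread confirms it. A minimal sketch of a per-thread cache, assuming you control where the client is created; `PerThread` is a hypothetical helper, and in real use the factory might be `lambda: boto3.session.Session().client("s3")`.

```python
import threading
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class PerThread(Generic[T]):
    """Lazily build one object per thread from `factory` and reuse it within that thread."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._local = threading.local()

    def get(self) -> T:
        # threading.local gives each thread its own attribute namespace,
        # so every thread calls the factory at most once.
        if not hasattr(self._local, "value"):
            self._local.value = self._factory()
        return self._local.value


if __name__ == "__main__":
    # Stand-in factory; with boto3 this could be lambda: boto3.session.Session().client("s3")
    clients = PerThread(object)
    print(clients.get() is clients.get())  # same thread, same object
```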

    Dylan

    04/01/2020, 6:59 PM
    Thanks to everyone who came out for our Q&A! Let us know if you had a good time and what you’d like to see next time: https://forms.gle/A72FSTW29vGd7K4KA

    David Haines

    04/01/2020, 7:23 PM
    Can parameters be used with the imperative API? I only see references in the docs to their use with the functional API.

    Scott Zelenka

    04/01/2020, 7:28 PM
    How do I get a mapped task to execute in parallel when orchestrated in Prefect Cloud? I can use the LocalDaskExecutor locally via flow.run(executor=LocalDaskExecutor()) to get it to spawn multiple threads that execute in parallel, but when I register the same flow to Cloud, it seems to execute the mapped tasks in sequence.
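To see the sequential-versus-threaded difference on its own, outside Prefect, here is a stdlib sketch that maps a slow, I/O-bound function over items with `concurrent.futures.ThreadPoolExecutor`. It mirrors the multi-threaded behaviour described above for LocalDaskExecutor only as an analogy, not its implementation, and it does not cover how the executor is configured for a Cloud run. `fetch` is a hypothetical stand-in for the mapped task.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def fetch(item: int) -> int:
    """Hypothetical mapped task: simulate 0.2 s of I/O, then return a result."""
    time.sleep(0.2)
    return item * 10


def map_sequential(fn: Callable[[T], R], items: Iterable[T]) -> List[R]:
    # One call after another: total time is roughly the sum of all calls.
    return [fn(item) for item in items]


def map_parallel(fn: Callable[[T], R], items: Iterable[T], workers: int = 8) -> List[R]:
    # Executor.map runs calls concurrently on a thread pool but yields results in input order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fn, items))


if __name__ == "__main__":
    items = list(range(8))
    for name, mapper in [("sequential", map_sequential), ("parallel", map_parallel)]:
        start = time.perf_counter()
        results = mapper(fetch, items)
        print(f"{name}: {results} in {time.perf_counter() - start:.2f}s")
```

Threads help here because the work is mostly waiting; CPU-bound mapped work would need processes or a distributed executor instead.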

    George Coyne

    04/01/2020, 7:51 PM
    I am running into an error when trying to register a Docker Flow, and I don't recognize these errors. Could you let me know if there is anything I am missing here?

    George Coyne

    04/01/2020, 7:51 PM
    Traceback (most recent call last):
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/prefect/client/client.py", line 591, in register
        prefect.serialization.flow.FlowSchema().load(serialized_flow)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/prefect/utilities/serialization.py", line 141, in load
        return super().load(data, **kwargs)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/schema.py", line 723, in load
        data, many=many, partial=partial, unknown=unknown, postprocess=True
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/schema.py", line 861, in _do_load
        unknown=unknown,
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/schema.py", line 669, in _deserialize
        index=index,
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/schema.py", line 493, in _call_and_store
        value = getter_func(data)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/schema.py", line 662, in <lambda>
        val, field_name, data, **d_kwargs
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/fields.py", line 342, in deserialize
        output = self._deserialize(value, attr, data, **kwargs)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/fields.py", line 597, in _deserialize
        return self._load(value, data, partial=partial)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/fields.py", line 580, in _load
        valid_data = self.schema.load(value, unknown=self.unknown, partial=partial)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow_oneofschema/one_of_schema.py", line 122, in load
        data, partial=partial, unknown=unknown
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow_oneofschema/one_of_schema.py", line 176, in _load
        return schema.load(data, many=False, partial=partial, unknown=unknown)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/prefect/utilities/serialization.py", line 141, in load
        return super().load(data, **kwargs)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/schema.py", line 723, in load
        data, many=many, partial=partial, unknown=unknown, postprocess=True
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/schema.py", line 861, in _do_load
        unknown=unknown,
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/schema.py", line 669, in _deserialize
        index=index,
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/schema.py", line 493, in _call_and_store
        value = getter_func(data)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/schema.py", line 662, in <lambda>
        val, field_name, data, **d_kwargs
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/fields.py", line 342, in deserialize
        output = self._deserialize(value, attr, data, **kwargs)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/fields.py", line 597, in _deserialize
        return self._load(value, data, partial=partial)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow/fields.py", line 580, in _load
        valid_data = self.schema.load(value, unknown=self.unknown, partial=partial)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow_oneofschema/one_of_schema.py", line 131, in load
        result = self._load(item, partial=partial)
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/marshmallow_oneofschema/one_of_schema.py", line 148, in _load
        raise ValidationError({"_schema": "Invalid data type: %s" % data})
    TypeError: not all arguments converted during string formatting
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "knack_facts.py", line 708, in <module>
        flow.register("PROJECT_FLOW")
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/prefect/core/flow.py", line 1372, in register
        no_url=no_url,
      File "/Users/gcoyne/miniconda3/envs/FLOW_NAME/lib/python3.7/site-packages/prefect/client/client.py", line 595, in register
        repr(exc)
    ValueError: Flow could not be deserialized successfully. Error was: TypeError('not all arguments converted during string formatting')

    Louis Guitton

    04/01/2020, 8:25 PM
    hi everyone, I'm trying to visualise the spacy_nlp.py DAG from the examples in the UI. I can see the DAG for etl.py just fine, so I'm wondering what I'm doing wrong. Code to reproduce:
    git clone --depth 1 https://github.com/PrefectHQ/prefect.git
    cd prefect/examples
    sed -i '' 's/flow.run/flow.register/g' etl.py
    sed -i '' 's/flow.run/flow.register/g' spacy_nlp.py
    prefect server start
    python etl.py
    python spacy_nlp.py

    Manuel Aristarán

    04/01/2020, 8:50 PM
    Hi! Are there any examples of tasks that run in a Docker container, and capture its stdout?
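A generic sketch, outside any Prefect task library: call the Docker CLI with `subprocess.run` and capture what the container writes to stdout. It assumes the `docker` CLI is on PATH; `run_and_capture` and `docker_run_stdout` are hypothetical helpers. Decorating one of them with `@task` would make the captured text the task's result. Because the helper accepts any command, it is easy to try without Docker.

```python
import subprocess
import sys
from typing import List


def run_and_capture(cmd: List[str], timeout: float = 600.0) -> str:
    """Run `cmd`, return its stdout as text, and raise CalledProcessError on a non-zero exit."""
    completed = subprocess.run(
        cmd,
        capture_output=True,  # collect stdout and stderr instead of inheriting them
        text=True,            # decode bytes to str
        timeout=timeout,
        check=True,           # raise if the process exits with a non-zero code
    )
    return completed.stdout


def docker_run_stdout(image: str, *args: str) -> str:
    # --rm removes the container once it exits, so nothing is left behind.
    return run_and_capture(["docker", "run", "--rm", image, *args])


if __name__ == "__main__":
    # Docker-free check; with Docker installed: docker_run_stdout("alpine", "echo", "hi")
    print(run_and_capture([sys.executable, "-c", "print('hello from a subprocess')"]))
```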

    Brett Naul

    04/02/2020, 12:03 AM
    q re: triggering flow runs: we're mostly re-using cloud from cli.run to start runs, since it handles a lot of the boilerplate of checking status / fetching logs for us. But it doesn't really seem like it was designed for programmatic use: in particular, there's no meaningful return value or exit code, so it can't be used to see whether a flow run succeeded or failed, or anything else about it. Would changing that function to return more info (say, the flow run ID and state) make sense?
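The proposal could look roughly like this: the function hands back the run ID and final state, and a CLI wrapper turns that into a process exit code. `FlowRunOutcome` and its fields are hypothetical, not part of Prefect's CLI; the state names simply mirror Prefect's `Success` and `Failed` state classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FlowRunOutcome:
    """Hypothetical return value for a programmatic 'start and watch a run' helper."""

    flow_run_id: str
    state: str  # final state name, e.g. "Success" or "Failed"

    @property
    def succeeded(self) -> bool:
        return self.state == "Success"

    @property
    def exit_code(self) -> int:
        # Shell convention: 0 means success, anything else signals failure.
        return 0 if self.succeeded else 1


if __name__ == "__main__":
    outcome = FlowRunOutcome(flow_run_id="example-id", state="Failed")  # placeholder values
    print(outcome.exit_code)  # prints 1
```

A CLI entry point would then end with `sys.exit(outcome.exit_code)`, so shell scripts and CI jobs can branch on the result.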

    Jeremy Yeo

    04/02/2020, 7:03 AM
    Hi all... I'm currently evaluating Prefect by trying to run a simple flow locally with the UI, and I'm wondering how you go about registering a flow, and why it seems to take so long to complete a simple flow. Here are the steps I took:
    1. Install with pip install prefect.
    2. Run prefect backend server.
    3. In a new terminal, run prefect server start (this makes the UI accessible locally, with no flows yet).
    4. In another new terminal, start an agent to run flows: prefect agent start.
    5. Create a simple new flow in `demo-flow.py`:
    import prefect
    from prefect import task, Flow
    
    @task
    def hello_task():
        logger = prefect.context.get("logger")
        logger.info("Hello, Cloud!")
    
    @task
    def another_task():
        logger = prefect.context.get("logger")
        logger.info("Our second flow!")
    
    
    flow = Flow("hello-flow", tasks=[hello_task])
    flow.register()
    flow_2 = Flow("second-flow", tasks=[another_task])
    flow_2.register()
    6. Register the new flows: python demo-flow.py.
    7. Navigate to the UI (localhost:8080) and start a manual run of the "hello-flow" flow.
    8. Output is visible in the log in the UI.
    --- Is this how one should "register" a flow (step 6), and why would a run of such a simple flow take 3 minutes to complete? Thanks :)

    Milos Tomic

    04/02/2020, 8:24 AM
    Hey everyone, I was researching ETL workflow tools and Prefect came out as the best solution. As I'm a newbie here, I have a few simple questions. I want to build a workflow that extracts a really big MongoDB collection (or several), transforms it, and then loads it into something like Redshift, S3, or Kinesis.
    1. How do you suggest loading a really big collection into a workflow? Chunking or something similar?
    2. Does Prefect have some support for multithreading or multiprocessing?
    3. Are there any resources on working with big data, rather than toy 1+2 flows?
    That's all for now! Thanks
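For question 1, a common approach is to stream the collection through a cursor and process it in fixed-size batches, so only one batch is in memory at a time. A stdlib sketch of the batching step; with pymongo, `records` would be a cursor such as `collection.find()` (an assumption about your setup), and each batch could be transformed and loaded before the next one is read. `chunked` is a hypothetical helper name.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` records, pulling lazily from `records`."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(records)
    while True:
        # islice consumes at most `size` items from the shared iterator.
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    for batch in chunked(range(7), 3):
        print(batch)
```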

    Scott Zelenka

    04/02/2020, 1:48 PM
    Looking for some help debugging a failed Flow in Prefect Cloud. I have a Flow that maps over ~1500 items, then combines the results to dump into a DB. On each of those items I have a retry set, and a result handler pushing the result object to S3. The location where the Flow was executed had a network issue where it couldn't write to S3 for a brief period, which appears to coincide with when the mapped Task was in the middle of executing. The main landing page for the Flow highlights the most recent error messages, which we can click to see more details. In this case I was investigating the inquisitive-wildcat Flow Run, and the downstream Tasks all indicate they had a failure. However, when I navigate to that Flow Run and look at the mapped Task, I cannot locate the exact mapped instance where these failures happened. It seems the only way to jump to that specific error message is to navigate from the main Flow page?
    1. Shouldn't the failed mapped Task show up as "FAILED" in the UI?
    2. Is the Task considered "FAILED" if there was a problem in the Result Handler?
    On the immediate next Task that consumes the output of the mapped Task, it seems Prefect sent a None object, which then caused an exception and finally failed the Flow Run.
    3. Why is Prefect sending None to a downstream Task of a mapped Task output that had a failure?

    Manuel Mourato

    04/02/2020, 2:36 PM
    Hello everyone, apologies if this is a basic question. I understand that in Prefect we can cache Task results for a given duration as long as the flow is executing, and we can also checkpoint these results to persistent storage if needed. Let's say I am executing 3 sequential tasks in a Prefect flow locally, and the machine fails for some reason. At the time of failure, the first 2 tasks had executed successfully, and I had checkpointed their results to persistent storage before the failure. In this case, when I restart my local machine and rerun the Prefect flow, will it make use of the checkpointed results, or will it re-execute the entire flow?
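Whether Prefect reuses checkpointed results on a rerun depends on how results and caching are configured, which this sketch does not settle. It only illustrates the underlying idea in plain Python: before running a step, look for a persisted result and load it instead of recomputing. The `checkpointed` helper, the JSON-on-disk format, and the directory layout are all assumptions for illustration.

```python
import json
from pathlib import Path
from typing import Any, Callable


def checkpointed(name: str, directory: Path, compute: Callable[[], Any]) -> Any:
    """Return the stored result for step `name` if present; otherwise compute and store it.

    Results are written as JSON, so `compute` must return JSON-serializable data.
    """
    path = directory / f"{name}.json"
    if path.exists():
        # A previous run (e.g. before a crash) already finished this step.
        return json.loads(path.read_text())
    result = compute()
    directory.mkdir(parents=True, exist_ok=True)
    # Write to a temp file, then rename, so a crash mid-write leaves no half-written checkpoint.
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(result))
    tmp.replace(path)
    return result


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as d:
        store = Path(d)
        print(checkpointed("extract", store, lambda: [1, 2, 3]))  # computed and saved
        print(checkpointed("extract", store, lambda: [9, 9, 9]))  # loaded: [1, 2, 3]
```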

    Manuel Mourato

    04/02/2020, 2:43 PM
    Also, does Prefect Core have any support for storing flow and task metadata in a Postgres DB? For example, storing each flow run's and task run's ID and state? Thank you.
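If you want your own record of run IDs and states, one option is to append a row on every state change. A sketch using the stdlib `sqlite3` module as a stand-in for Postgres (the SQL ports easily, though a driver such as psycopg2 uses `%s` placeholders instead of `?`). The table layout and the `record_state` / `latest_state` helpers are hypothetical; in Prefect you might call something like `record_state` from a state handler, but check the handler signature for your version.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

SCHEMA = """
CREATE TABLE IF NOT EXISTS run_states (
    run_id      TEXT NOT NULL,
    kind        TEXT NOT NULL CHECK (kind IN ('flow', 'task')),
    name        TEXT NOT NULL,
    state       TEXT NOT NULL,
    recorded_at TEXT NOT NULL
)
"""


def record_state(conn: sqlite3.Connection, run_id: str, kind: str, name: str, state: str) -> None:
    """Append one state transition; a run's history is every row with its run_id."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO run_states (run_id, kind, name, state, recorded_at) VALUES (?, ?, ?, ?, ?)",
            (run_id, kind, name, state, datetime.now(timezone.utc).isoformat()),
        )


def latest_state(conn: sqlite3.Connection, run_id: str) -> Optional[str]:
    """Return the most recently recorded state for `run_id`, or None if it was never seen."""
    row = conn.execute(
        "SELECT state FROM run_states WHERE run_id = ? ORDER BY rowid DESC LIMIT 1",
        (run_id,),
    ).fetchone()
    return row[0] if row else None


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    record_state(conn, "flow-run-1", "flow", "etl", "Running")
    record_state(conn, "flow-run-1", "flow", "etl", "Success")
    print(latest_state(conn, "flow-run-1"))  # Success
```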

    John Ramirez

    04/02/2020, 2:43 PM
    Hey everyone - I’m getting this error in Prefect release 0.10.0:
    Unexpected error: AttributeError("'S3ResultHandler' object has no attribute '_client'")
    I thought this was fixed in the 0.10.0 release.

    Kostas Chalikias

    04/02/2020, 3:34 PM
    Hello team, I am trying to understand why none of our logs are making it to the Cloud interface. We are using the PREFECT__LOGGING__EXTRA_LOGGERS setting, and I am currently running the agent locally in PyCharm to debug. I can see the loggers being configured with the StreamHandler and CloudHandler, and when I log from my code, the lines printed locally appear to have the right format applied. However, I don't see Prefect's logs or my own in the Cloud interface. Any ideas?

    Chris Hart

    04/02/2020, 7:02 PM
    hai! I'm trying to use Parameters for the first time for a model training DAG, and it seems to turn each one into a Task. Is that normal, or did I do something wrong? Code is:
    with Flow("trainer", schedule=None) as flow:

        rand = Parameter("rand", default=params["rand"])
        limit = Parameter("limit", default=params["limit"])
        class_type = Parameter("class_type", default=params["class_type"])

        # tasks
        training_data = fetch_training_data(rand, limit, class_type)
        test_data = fetch_test_data(rand, limit, class_type)
        trained_model = train_model(training_data)
        test_model(trained_model, test_data)

    flow_state = flow.run(executor=DaskExecutor(), parameters=params)

    Chris Hart

    04/02/2020, 8:08 PM
    hmm, I'm using the prefect server dashboard with the local backend selected and my flow registered. The flow appears as expected, but I cannot see any runs happening, nor trigger one (they always show as "late" and never execute).

Chris White

04/02/2020, 8:09 PM
Do you have an agent running?

Chris Hart

04/02/2020, 8:10 PM
ah nope that must be it then.. searching docs, thx

Chris White

04/02/2020, 8:10 PM
👍 👍 you can run prefect agent start and that will probably unblock you!

Chris Hart

04/02/2020, 8:14 PM
awesome, yep all good now!