prefect-community

    Pedro Machado

    05/16/2020, 2:34 AM
Hi everyone. What is the recommended pattern to implement a group of tasks that must be retried together? Let's say I have a task that checks the status of a cluster (for example EMR) and starts it if the cluster is not running. There are some downstream tasks that run on this cluster. Is there a pattern that would ensure that before a given downstream task runs (including the case when the task needs to be retried), it will first run the task that checks/starts the cluster?
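    (A minimal sketch of one possible pattern here, not an official Prefect recipe: re-run the cluster check inside each downstream task so that every retry repeats it. The ensure_cluster_running helper and the step argument are hypothetical placeholders.)
    from datetime import timedelta
    from prefect import task, Flow

    def ensure_cluster_running():
        # hypothetical helper: check the EMR cluster's status and start it if stopped
        ...

    @task(max_retries=3, retry_delay=timedelta(minutes=2))
    def run_on_cluster(step):
        # the check runs inside the task body, so every retry re-checks the cluster
        ensure_cluster_running()
        # ... submit `step` to the cluster ...

    with Flow("emr-jobs") as flow:
        run_on_cluster("step-1")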

    Nicolas Michel

    05/16/2020, 6:26 PM
With Prefect Core installed on premises, do you also have an endpoint available for making GraphQL requests?
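    (For reference, a hedged sketch: with prefect server start, the Apollo service exposes a GraphQL endpoint, by default at http://localhost:4200/graphql, which can be queried like any GraphQL API.)
    import requests

    # query the Apollo GraphQL endpoint started by `prefect server start`
    resp = requests.post(
        "http://localhost:4200/graphql",
        json={"query": "{ flow { id name } }"},
    )
    print(resp.json())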

    Matthias Busch

    05/16/2020, 7:18 PM
Hi guys. I've just come over from @Laura Lorenz (she/her)'s PyCon sprint stream. Looking forward to learning both how to use and how to contribute to Prefect! :)
    👋 3
    🎉 3

    Nate Atkins

    05/16/2020, 8:02 PM
    I'd like to be able to pass a trigger to a task when I add the task to the flow. I have a trigger I'm working on that takes additional parameters based on its associated task. I can replace the default trigger directly, but this adds a bit of clutter when constructing the flow. Existing:
    with Flow("Existing") as flow:
        file_b = file_builder("b.data")
        file_b.trigger = upstream_dependency(src_fpaths=["a.data"])
    Proposed:
    with Flow("Proposed") as flow:
    file_b = file_builder("b.data", trigger=upstream_dependency(src_fpaths=["a.data"]))
    Example Trigger:
    @curry
    def upstream_dependency(
        upstream_states: Dict["core.Edge", "state.State"],
        src_fpaths: List[Path] = None,
    ) -> bool:
        if not all(s.is_successful() for s in _get_all_states_as_set(upstream_states)):
            raise signals.TRIGGERFAIL(
                'Trigger was "all_successful" but some of the upstream tasks failed.'
            )
        # Everything succeeded or was skipped. Check whether any file dependencies require the task to run.
        run_required = file_dependency_skipper(src_fpaths)
        if run_required:
            return True
        else:
            raise signals.SKIP("All dependencies are up to date.")
    I hacked this together with the following change in prefect.core.task:
    def bind(
        self,
        *args: Any,
        mapped: bool = False,
        upstream_tasks: Iterable[Any] = None,
        trigger: Callable[[Dict["core.Edge", "state.State"]], bool] = None,
        flow: "Flow" = None,
        **kwargs: Any
    ) -> "Task":
        # ...existing body unchanged, plus:
        self.trigger = trigger
    If I build the task by inheritance instead of the @task decorator, I can pass the trigger. I'm not sure we want to dump all of the task-construction parameters into the signature of bind, as its list is pretty short now; upstream_tasks is the most similar parameter that is already in the signature of bind.
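    (A short sketch of the inheritance route mentioned above: the Task constructor already accepts trigger, so a subclass can take it at construction time. FileBuilder is a hypothetical stand-in for the file_builder task.)
    from prefect import Task

    class FileBuilder(Task):
        # hypothetical subclass standing in for file_builder above
        def run(self, fpath: str):
            ...

    # Task.__init__ already accepts trigger=, so this route works without changing bind
    file_builder = FileBuilder(trigger=upstream_dependency(src_fpaths=["a.data"]))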

    itay livni

    05/16/2020, 9:19 PM
    Hi - I am working with S3Result and receiving botocore.errorfactory.NoSuchKey: An error occurred (NoSuchKey) when calling the GetObject operation: The specified key does not exist. Upon further research, that error can mean almost anything, including a permission error (I tried different buckets with different settings). The credentials are stored as AWS_CREDENTIALS in Prefect Cloud, with config.toml set to use cloud secrets:
    [cloud]
    use_local_secrets = false
    Switching back to the result_handler argument with an S3Result subclass did work, but combining result_handler with target does not. Is there something different in the way that credentials are handled between result and result_handler? The new Prefect is really nice 🙂
    😄 2
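    (For context, a minimal sketch of the new-style Result configuration being discussed, with a hypothetical bucket name; if Cloud secrets are enabled, S3Result should pick up the AWS_CREDENTIALS secret.)
    from prefect import task
    from prefect.engine.results import S3Result

    # hypothetical bucket; S3Result is the new-style counterpart of S3ResultHandler
    result = S3Result(bucket="my-prefect-results")

    @task(result=result, target="{task_name}.pkl", checkpoint=True)
    def make_data():
        return [1, 2, 3]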

    Brad

    05/17/2020, 12:45 AM
    Hey guys - possible bug with log_stdout; it looks like maybe some references were changed from result to value here: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py#L938

    Nate Atkins

    05/17/2020, 1:22 AM
    Ohhhhh so close. I was working with the new target parameter on tasks. This, in conjunction with the addition of the upstream edges on the trigger signature, almost got me the ability to rebuild the current task's target if the upstream was updated. I did a little trickery: if the current task's target needs to be rebuilt, I delete the target cache file and then raise a RETRY signal. When the task retries, it can't find the cache file and runs the task. The only problem I have is: if the upstream task didn't run and update, and the current task doesn't need to run, what do I raise/return from the trigger to get the task to use the cached results? True: the task will run. False: the flow will fail. SUCCESS: no cached results to pass on to the next task.
    workflow.py
    👀 2
    👍 2

    Nate Atkins

    05/17/2020, 5:52 PM
    I vote for changing the color of the cached state to purple #800080. Now that I'm using a bunch of cached stuff with the new target parameter, I want to know if things were run or used the cached results. I find the success and cached green colors hard to distinguish.
    :upvote: 6

    Matthias

    05/18/2020, 9:28 AM
    Hi! I just upgraded from 0.10.7 to 0.11.1; everything worked fine before. Now, when I register a Flow and run it from the UI, I get the following error. The flow seems to run. Everything is running in Docker, on the latest image.
    ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ClientError([{'message': "{'_result': {'type': ['Unsupported value: LocalResult']}}", 'locations': [{'line': 4, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 123, in call_runner_target_handlers
        cache_for=self.task.cache_for,
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1104, in set_task_run_state
        version=version,
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 226, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'message': "{'_result': {'type': ['Unsupported value: LocalResult']}}", 'locations': [{'line': 4, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]

    Florian K. (He/Him)

    05/18/2020, 4:21 PM
    Quick question regarding Prefect Server and Docker: I tried to get the UI running on a Windows Server 2019 environment but ran into the following error:
    prefect server start
    Pulling postgres  ... pulling from library/postgres
    Pulling hasura    ... pulling from hasura/graphql-engine
    Pulling graphql   ... pulling from prefecthq/server
    Pulling scheduler ... pulling from prefecthq/server
    Pulling apollo    ... pulling from prefecthq/apollo
    Pulling ui        ... pulling from prefecthq/ui
    
    ERROR: for postgres  no matching manifest for windows/amd64 10.0.17763 in the manifest list entries
    ERROR: for ui  image operating system "linux" cannot be used on this platform
    ERROR: for scheduler  image operating system "linux" cannot be used on this platform
    ERROR: for graphql  image operating system "linux" cannot be used on this platform
    ERROR: for apollo  image operating system "linux" cannot be used on this platform
    ERROR: for hasura  image operating system "linux" cannot be used on this platform
    ERROR: image operating system "linux" cannot be used on this platform
    Exception caught; killing services (press ctrl-C to force)
    Has anyone else had similar issues before? Thanks! P.S.: the Docker installation looks good; at least the Hello World image works flawlessly.
    docker-compose --version
    docker-compose version 1.25.5, build 8a1c60f6
    (RodeoEnv) PS C:\windows\system32> docker --version
    Docker version 19.03.5, build 2ee0c57608
    (RodeoEnv) PS C:\windows\system32>

    Will Milner

    05/18/2020, 4:38 PM
    What's the proper way to use the secrets argument in the Prefect Docker storage object? I tried doing
    flow.storage = Docker(..., secrets=["SECRET_NAME"])
    but when I try running the flow, it fails to start and I get this error message from Apollo:
    {
      "message": "Cannot query field \"secret_value\" on type \"Query\".",
      "locations": [
        {
          "line": 2,
          "column": 9
        }
      ],
      "extensions": {
        "code": "GRAPHQL_VALIDATION_FAILED"
      }
    }
    I want to avoid sending my secret values as environment variables to my Docker images, so I was hoping the secrets arg would solve this for me.

    Zach

    05/18/2020, 7:44 PM
    Is there a way to have a flow automatically restart from the beginning of the flow if one of the tasks fails, instead of restarting from the failed task?

    Matthias Busch

    05/18/2020, 8:12 PM
    Hi guys, does https://docs.prefect.io/orchestration/ui/flow.html work for you? It seems I'm not able to load the site

    Noah Nethery

    05/18/2020, 8:14 PM
    Hello Prefect users, I'm using a Kubernetes agent that is up and running on our cluster with our runner token, no problem, and it is connected to our Prefect server cloud backend with a healthy status code. However, flows that I have registered are not being scheduled (i.e. they show up as late). Here is my flow code. Am I missing something here?
    from prefect import task, Flow, Parameter
    from prefect import Client

    @task(log_stdout=True)
    def say_hello(name):
        print("Hello, {}!".format(name))
        return True

    @task
    def print_completed(status):
        if status:
            print('Success')

    with Flow("Simple-Flow") as flow:
        name = Parameter('name')
        task1 = say_hello(name)
        print_completed(task1)

    client = Client()
    flow.register(project_name="Hello K8s")

    Nicolas Michel

    05/19/2020, 7:44 AM
    Can you implement a "manual" task in the middle of a workflow with Prefect? The expected behavior would be something like: the workflow stops at the manual step; once the work is done, somebody marks the task as done from the web interface, and the workflow continues its job.
    👍 1
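    (One built-in way to sketch this in Prefect 0.x: a task with the manual_only trigger pauses until someone resumes it from the UI or API.)
    from prefect import task, Flow
    from prefect.triggers import manual_only

    @task
    def prepare():
        return "data"

    # this task enters a Paused state and waits for a human to resume it
    @task(trigger=manual_only)
    def manual_approval(payload):
        return payload

    with Flow("manual-step") as flow:
        manual_approval(prepare())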

    Azuma

    05/19/2020, 2:15 PM
    Hello everyone! I am new to Prefect. Please tell me: 1) is running "docker-compose up" in prefect/server/docker equivalent to running the "prefect server start" command? 2) Where is the configuration file stored at startup through the "prefect server start" command? (For example, if I need to set an endpoint different from localhost.)

    Alex Cano

    05/19/2020, 3:33 PM
    Flows can’t have a schedule in server (same in cloud?) if they have required parameters, right?

    Azuma

    05/19/2020, 4:07 PM
    Hello everyone! Please tell me: if the server is started as follows: 1) prefect backend server 2) prefect server start, which config.toml should I edit if I need to change the endpoint in it? 1) ~/.prefect/config.toml 2) prefect/src/prefect/config.toml 3) prefect/server/src/prefect_server/config.toml? I quickly tried all 3 options, but without success =(
    👀 1
    👍 1

    Barry Roszak

    05/19/2020, 4:15 PM
    Is it possible to run mapped tasks in parallel using the Docker agent?
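    (Hedged sketch: in Prefect 0.x, mapped tasks only run in parallel under a Dask-based executor, whichever agent is used; for a quick local test one can be passed directly to flow.run.)
    from prefect.engine.executors import LocalDaskExecutor

    # mapped children run concurrently under a Dask executor; the default
    # synchronous executor runs them one at a time
    state = flow.run(executor=LocalDaskExecutor(scheduler="threads"))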

    Questionnaire

    05/19/2020, 4:26 PM
    Hey folks, I'm a beginner here... Can someone share the step-by-step process for third-party authentication, like AWS? I read the docs but it's not clear to me.
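    (A hedged sketch of the usual local setup: Prefect's AWS integrations look for an AWS_CREDENTIALS secret, and for local runs a secret can be injected through an environment variable named PREFECT__CONTEXT__SECRETS__<NAME>. The key names follow the Prefect 0.x docs.)
    import json
    import os

    # set the secret before prefect is first imported so it lands in the config
    os.environ["PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS"] = json.dumps(
        {"ACCESS_KEY": "...", "SECRET_ACCESS_KEY": "..."}
    )

    from prefect.client import Secret

    creds = Secret("AWS_CREDENTIALS").get()  # JSON values are parsed into a dict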

    Zach

    05/19/2020, 4:45 PM
    I am trying to register one of my flows and I am getting this error that I don't understand:
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 135, in <module>
        flows = cloudpickle_deserialization_check(flow_file_path)
      File "/opt/prefect/healthcheck.py", line 40, in cloudpickle_deserialization_check
        flows.append(cloudpickle.load(f))
    ModuleNotFoundError: No module named 'prefect.engine.results.prefect_result'

    Zach

    05/19/2020, 4:47 PM
    That prefect_result module does exist in the prefect package code that I have installed. So why is it saying it doesn't? I can't see how my code would cause this to break, since all the errors seem to be coming from the prefect package code.

    Zach

    05/19/2020, 4:49 PM
    I thought v0.11.0 was supposed to be non-breaking, but it seems to have broken several things for us.

    Marwan Sarieddine

    05/19/2020, 5:29 PM
    Hi everyone - new to Prefect. I got the server running on Prefect Cloud, along with a Kubernetes agent that is polling from the server. I registered a flow that shows up on the server, and I am trying to run it but am getting an error. I am following the [Docker Sidecar on Kubernetes deployment recipe](https://docs.prefect.io/orchestration/recipes/k8s_docker_sidecar.html); I did not make any code changes and used the flow script as-is. I am running prefect version 0.10.7 and am getting the below error in the logs:
    19 May 2020,01:03:00 	agent	INFO	Submitted for execution: Job prefect-job-d7e49926
    19 May 2020,01:03:26 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'Run a Prefect Flow in Docker'
    19 May 2020,01:03:27 	prefect.CloudFlowRunner	INFO	Starting flow run.
    19 May 2020,01:03:27 	prefect.CloudTaskRunner	INFO	Task 'PullImage': Starting task run...
    19 May 2020,01:03:27 	prefect.CloudTaskRunner	ERROR	Unexpected error: DockerException("Error while fetching server API version: HTTPConnectionPool(host='localhost', port=2375): Max retries exceeded with url: /version (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f8768ed7c10>: Failed to establish a new connection: [Errno 111] Connection refused'))")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
        (self._dns_host, self.port), self.timeout, **extra_kw
      File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
        raise err
      File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
        sock.connect(sa)
    ConnectionRefusedError: [Errno 111] Connection refused
    It seems that localhost:2375 (the Docker daemon address) is not reachable for some reason - has anyone faced this issue before?

    Sanjay Patel

    05/19/2020, 7:12 PM
    Hello! Is there a way to use **kwargs with the Prefect map function, as per the below?
    changes = generator_task.map(
                    sim, 
                    unmapped(x), 
                    unmapped(y),
                    unmapped(z),
                    **kwargs #need this unmapped
                )
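    (A hedged sketch of one way to do it: Task.map forwards keyword arguments, but each value still needs its own unmapped() wrapper, so the kwargs dict can be wrapped value by value.)
    from prefect import unmapped

    changes = generator_task.map(
        sim,
        unmapped(x),
        unmapped(y),
        unmapped(z),
        **{key: unmapped(val) for key, val in kwargs.items()},
    )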

    hakki cankaya

    05/19/2020, 7:43 PM
    Is there any URL or example/tutorial that explains how resilient the Prefect scheduler is?

    hakki cankaya

    05/19/2020, 7:53 PM
    Thanks @Zachary Hughes. Say the host executor fails mid-execution, or there is a server power outage. How can it survive and be resilient?

    Dan DiPasquo

    05/19/2020, 8:03 PM
    I have flows that are seemingly being restarted in the middle of a long-running task, and I'm trying to track down why. An abbreviated log is below: at 19:39 the run is explicitly killed and rescheduled by Cloud, but prior to that, at 19:22, there's a seemingly spontaneous and unexplained second "Beginning Flow run" --- any suggestions on how to narrow down the cause of this?
    TIMESTAMP                         LEVEL    MESSAGE
    2020-05-19T18:58:29.732839+00:00  INFO     Submitted for execution: Job prefect-job-35b1423b
    2020-05-19T19:04:28.107917+00:00  INFO     Beginning Flow run for 'compute_**_flow'
    2020-05-19T19:04:28.240831+00:00  INFO     Starting flow run.
    2020-05-19T19:04:28.241095+00:00  DEBUG    Flow 'compute_**_flow': Handling state change from Scheduled to Running
    2020-05-19T19:05:12.954233+00:00  INFO     Task 'compute_**_task': Starting task run...
    2020-05-19T19:05:12.95458+00:00   DEBUG    Task 'compute_**_task': Handling state change from Pending to Running
    2020-05-19T19:05:13.210047+00:00  DEBUG    Task 'compute_**_task': Calling task.run() method...
    2020-05-19T19:22:07.863766+00:00  INFO     Beginning Flow run for 'compute_**_flow'
    2020-05-19T19:22:08.577243+00:00  INFO     Task 'compute_**_task': Starting task run...
    2020-05-19T19:22:08.578027+00:00  DEBUG    Task 'compute_**_task': task is already running.
    2020-05-19T19:22:08.59477+00:00   INFO     Task 'compute_**_task': finished task run for task with final state: 'Running'
    2020-05-19T19:24:34.702197+00:00  ERROR    Marked "Failed" by a Zombie Killer process.
    2020-05-19T19:39:33.646426+00:00  INFO     Rescheduled by a Lazarus process. This is attempt 1.
    2020-05-19T19:39:56.331103+00:00  INFO     Submitted for execution: Job prefect-job-21a433ec
    2020-05-19T19:42:33.754737+00:00  INFO     Beginning Flow run for 'compute_**_flow'
    2020-05-19T19:42:33.869824+00:00  INFO     Starting flow run.
    ...

    Marwan Sarieddine

    05/19/2020, 11:12 PM
    Hi again everyone, I am trying to replicate the [Dask Cluster on Kubernetes deployment recipe](https://docs.prefect.io/orchestration/recipes/k8s_dask.html) and I'm facing the following error:
    AttributeError: 'FunctionTask' object has no attribute 'result'
    Wondering if you have encountered this before - one thing to note is that if the flow is empty, it runs successfully … I am using prefect version 0.10.7.
    19 May 2020,07:027 	agent	INFO	Submitted for execution: Job prefect-job-d3fc6dc5
    19 May 2020,07:02:12 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'Static Dask Cluster'
    19 May 2020,07:02:12 	prefect.CloudFlowRunner	INFO	Starting flow run.
    19 May 2020,07:02:12 	prefect.CloudFlowRunner	DEBUG	Flow 'Static Dask Cluster': Handling state change from Scheduled to Running
    19 May 2020,07:02:12 	prefect.CloudFlowRunner	ERROR	Unexpected error: AttributeError("'FunctionTask' object has no attribute 'result'")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 465, in get_flow_run_state
        for t in final_tasks
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 196, in wait
        return self.client.gather(futures)
      File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1931, in gather
        asynchronous=asynchronous,
      File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 780, in sync
        self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
      File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 347, in sync
        raise exc.with_traceback(tb)
      File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 331, in f
        result[0] = yield future
      File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
        value = future.result()
      File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1790, in _gather
        raise exception.with_traceback(traceback)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 569, in run_task
        default_handler = task.result_handler or self.flow.result_handler
    AttributeError: 'FunctionTask' object has no attribute 'result'
    19 May 2020,07:02:12 	prefect.CloudFlowRunner	DEBUG	Flow 'Static Dask Cluster': Handling state change from Running to Failed

    Jacques Jamieson

    05/20/2020, 3:49 AM
    Can Prefect be used for building a processing pipeline that pulls data from various REST APIs and then performs analysis on that data? Can it also support this in a multi-tenant architecture?

nicholas

05/20/2020, 12:18 PM
Hi @Jacques Jamieson! Prefect can definitely be used for the pipeline you've described; in fact, the task dependency structure makes it uniquely suited to that sort of work. Can you expand on the multi-tenant architecture question a bit?

Jacques Jamieson

05/20/2020, 10:11 PM
With regard to multi-tenancy, I have somewhere in the space of 100-1000 clients that need to have data pulled from the same REST API, using each individual client's API credentials, and the same analyses performed. I've tested Prefect and was able to get a single-client flow working as a prototype and get a feel for how the pipeline operates.

nicholas

05/20/2020, 10:26 PM
Ahh, ok, thank you. So if I understand correctly, you want to be able to create a flow that processes a set of data from some APIs but you want to use different credentials to access the APIs each time the flow is run. If that's the case, you could build references to the credentials as flow parameters. You could then use those references to get Secrets that would hold different client credentials. Something like this:
from prefect import task, Flow, Parameter
from prefect.client import Secret

@task
def get_credentials(client):
  return Secret(f"{client}_API_TOKEN").get()

with Flow("Data Processing Flow") as flow:
  client_ref = Parameter("client", required=True)
  credentials = get_credentials(client_ref)

  # do something with the credentials downstream
👍 1
Or you could even use that Secret().get() method in the relevant API tasks.
And then when you kick off each run, you would just pass the client reference as a runtime parameter 🙂
👍 1

Jacques Jamieson

05/21/2020, 3:36 AM
Ahh right, yeah - so the idea is that every 15 or so minutes, depending on what each tenant has configured, Prefect would fetch all the clients for the current execution time period along with their credentials, then for each client kick off the task that pulls the data from the APIs and performs the analyses.
The part that's not clear is: given the list of credentials, how should that list be looped over in Prefect? Is a standard Python loop good to go?
with Flow("Data Processing Flow") as flow:
  client_ref = Parameter("client", required=True)
  credentials = get_credentials(client_ref)
  # do something with the credentials downstream
  for credential in credentials:
      # call_api(credential)

nicholas

05/21/2020, 3:45 AM
Even better, @Jacques Jamieson, you can use Prefect's native mapping:
with Flow("Data Processing Flow") as flow:
  client_ref = Parameter("client", required=True)
  credentials = get_credentials(client_ref)
  # do something with the credentials downstream
  call_api.map(credential)
For something like you described, you could use iterated mapping really nicely:
with Flow("Data Processing Flow") as flow:
  # get list of clients
  clients = get_clients()
  
  # get credentials for each client
  credentials = get_credentials.map(clients)

  # call api for each set of client credentials
  call_api.map(credentials)

Jacques Jamieson

05/21/2020, 5:34 AM
Aaahh, this is perfect. Thanks so much for getting back to me! I'll give it a go and see how I get along.