prefect-community
  • d

    Dinu Gherman

    09/23/2020, 11:47 AM
    Still pretty new to Prefect… I wonder if there is a way to start a fresh local server without docker and docker-compose?
    j
    2 replies · 2 participants
  • h

    Hassan Javeed

    09/23/2020, 11:54 AM
I'm seeing this when I run a flow on Prefect Cloud; we've seen this before as well, and sometimes it just goes away on its own. Does anyone have any idea?
    Unexpected error raised during flow run: 
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 239, in run
        parameters=parameters,
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/flow_runner.py", line 184, in initialize_run
        task = tasks[task_run.task_slug]
    KeyError: '7d9e37de-16ab-4664-a688-421da03bf1b2'
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 162, in handle_state_change
        new_state = self.call_runner_target_handlers(old_state, new_state)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/flow_runner.py", line 126, in call_runner_target_handlers
        prefect.context.update(flow_run_version=version + 1)
    TypeError: unsupported operand type(s) for +: 'NoneType' and 'int'
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/environments/execution/k8s/job.py", line 179, in run_flow
        runner_cls(flow=flow).run(executor=executor_cls)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 273, in run
        state = self.handle_state_change(state or Pending(), new_state)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 184, in handle_state_change
        raise ENDRUN(Failed(msg, result=exc))
    prefect.engine.signals.ENDRUN
    j
    b
    2 replies · 3 participants
  • l

    Lars Corneliussen

    09/23/2020, 12:41 PM
I'm trying to run Prefect from Docker, but can't get the token through. Somehow I'm getting:
    prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
    Anyone?
    j
    11 replies · 2 participants
  • a

    as

    09/23/2020, 12:51 PM
Hi, I was wondering whether Prefect always keeps its task result values in memory during the whole flow run, or whether they get removed from memory when writing to a LocalResult target is enabled. As an example, what happens in the following task?
jsonresult = task(
    get_json_fun,
    result=LocalResult(serializer=JSONSerializer()),
    target='/path/result.json',
    checkpoint=True,
)(p)
Is jsonresult still held in memory in addition to being written to result.json, or not? I'm worried that my machine will run out of memory as my flow gets bigger, with many task results floating around.
    j
    m
    16 replies · 3 participants
  • z

    Zach

    09/23/2020, 3:34 PM
New to GraphQL and trying to use the Prefect Interactive API; this is my query, but I am getting an error. Can anyone help? I have a feeling there is something super simple I am missing. Query:
    query {
      flow_run_state {
        flow_run {
          id,
          parameters,
          created,
          flow(where:{name:"my-flow-name"}) {
            name,
            id
          },
        }
      }
    }
    Error:
    {
      "graphQLErrors": [
        {
          "message": "Unknown argument \"where\" on field \"flow\" of type \"flow_run\".",
          "extensions": {
            "code": "GRAPHQL_VALIDATION_FAILED"
          }
        }
      ],
      "networkError": null,
      "message": "GraphQL error: Unknown argument \"where\" on field \"flow\" of type \"flow_run\"."
    }
    n
    14 replies · 2 participants
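One possible restructuring, as an untested sketch: in the Hasura-style schema behind Prefect's GraphQL API, the where argument belongs on a queryable root field like flow_run, not on a nested object field. Wrapped in Python via the Prefect Client (the flow name is reused from the question above):
from prefect import Client

client = Client()
# the `where` filter moves to the root field (flow_run)
result = client.graphql(
    """
    query {
      flow_run(where: {flow: {name: {_eq: "my-flow-name"}}}) {
        id
        parameters
        created
        flow {
          name
          id
        }
      }
    }
    """
)
print(result)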
  • o

    Oleg

    09/23/2020, 4:33 PM
Hi! I’m trying to run Prefect Server on a virtual CentOS machine with a custom .prefect/config.toml:
    [server]
    
    host = "ip"
    port = "8081"
    url = "https://${server.host}:${server.port}"
    
    [ui]
    
    host = "<http://ip>"
    port = "8081"
    host_port = "8081"
    endpoint = "${ui.host}:${ui.port}"
    And after checking prefect.config in python, I see my custom properties:
<Box: {'debug': False, 'home_dir': '/home/o.ilinsky/.prefect', 'backend': 'server', 'server': {'host': 'ip', 'port': 8081, 'host_port': 4200, 'endpoint': 'ip:8081', 'database': {'host': 'localhost', 'port': 5432, 'host_port': 5432, 'name': 'prefect_server', 'username': 'prefect', 'password': 'test-password', 'connection_url': 'postgresql://prefect:test-password@localhost:5432/prefect_server', 'volume_path': '/home/o.ilinsky/.prefect/pg_data'}, 'graphql': {'host': '0.0.0.0', 'port': 4201, 'host_port': 4201, 'debug': False, 'path': '/graphql/'}, 'hasura': {'host': 'localhost', 'port': 3000, 'host_port': 3000, 'admin_secret': '', 'claims_namespace': 'hasura-claims', 'graphql_url': 'http://localhost:3000/v1alpha1/graphql', 'ws_url': 'ws://localhost:3000/v1alpha1/graphql', 'execute_retry_seconds': 10}, 'ui': {'host': 'http://localhost', 'port': 8080, 'host_port': 8080, 'endpoint': 'http://localhost:8080', 'graphql_url': 'http://localhost:4200/graphql'}, 'telemetry': {'enabled': True}, 'url': 'https://ip:8081'}, 'cloud': {'api': 'ip:8081', 'endpoint': 'https://api.prefect.io', 'graphql': 'ip:8081/graphql', 'use_local_secrets': True, 'heartbeat_interval': 30.0, 'check_cancellation_interval': 15.0, 'diagnostics': False, 'logging_heartbeat': 5, 'queue_interval': 30.0, 'agent': {'name': 'agent', 'labels': [], 'level': 'INFO', 'auth_token': '', 'agent_address': '', 'resource_manager': {'loop_interval': 60}}}, 'logging': {'level': 'INFO', 'format': '[%(asctime)s] %(levelname)s - %(name)s | %(message)s', 'log_attributes': [], 'datefmt': '%Y-%m-%d %H:%M:%S', 'log_to_cloud': False, 'extra_loggers': []}, 'flows': {'eager_edge_validation': False, 'run_on_schedule': True, 'checkpointing': False, 'defaults': {'storage': {'add_default_labels': True, 'default_class': 'prefect.environments.storage.Local'}}}, 'tasks': {'defaults': {'max_retries': 0, 'retry_delay': None, 'timeout': None}}, 'engine': {'executor': {'default_class': 'prefect.engine.executors.LocalExecutor', 'dask': {'address': '', 'cluster_class': 'distributed.deploy.local.LocalCluster'}}, 'flow_runner': {'default_class': 'prefect.engine.flow_runner.FlowRunner'}, 'task_runner': {'default_class': 'prefect.engine.task_runner.TaskRunner'}}, 'ui': {'host': 'http://ip', 'port': 8081, 'host_port': 8081, 'endpoint': 'http://ip:8081', 'graphql_url': 'http://ip:4200'}}>
    But server starts with localhost:8080 option:
Visit http://localhost:8080 to get started
And nothing is served on port 8081. I also tried setting the env var
    export PREFECT__USER_CONFIG_PATH=/home/o.ilinsky/.prefect/config.toml
But that doesn’t work either. Do you know what I’m doing wrong? P.S. I’m new to Prefect, so it may be a stupid error :)
    n
    7 replies · 2 participants
  • a

    Alex

    09/23/2020, 4:43 PM
    @Marvin
    c
    1 reply · 2 participants
  • j

    Johnny

    09/23/2020, 5:04 PM
hello! Has anyone found an S3 task example using Prefect Cloud? It's a tad confusing how to authenticate + download a file. Do we use S3ResultHandler + a Cloud secret, and then pass this to an S3Download task?
    c
    m
    16 replies · 3 participants
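For reference, a minimal sketch of one way this is often wired up (untested; the bucket and key are placeholders, and it assumes an AWS_CREDENTIALS secret configured in Prefect Cloud, which the AWS tasks can pick up from context, so no result handler is needed just to download a file):
from prefect import Flow
from prefect.tasks.aws.s3 import S3Download

download = S3Download(bucket="my-bucket")  # placeholder bucket

with Flow("s3-download-example") as flow:
    # with the AWS_CREDENTIALS secret set, the task's boto3 client
    # can authenticate without any explicit handler plumbing
    data = download(key="path/to/file.csv")  # placeholder key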
  • d

    Dolor Oculus

    09/23/2020, 6:32 PM
    Hi, are there any examples of testing out the config functionality from unit tests? I tried the below but config.environment yields a BoxKeyError ("Config object has no attribute environment")
    def test_configuration_works_as_expected(monkeypatch, tmp_path):
        config_file = tmp_path / "config.toml"
        config_text = """
    environment = "prod"
    user = "${environments.${environment}.user}"
    
    [environments]
    
        [environments.dev]
            user = "test"
    
        [environments.prod]
            user = "admin"
    """
        with config_file.open("w") as cf:
            cf.write(config_text)
    
        monkeypatch.setenv("PREFECT__USER_CONFIG_PATH", str(tmp_path))
        assert config.environment == "prod"
        assert config.user == "admin"
    c
    9 replies · 2 participants
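One likely culprit: prefect.config is assembled once at import time, so setting PREFECT__USER_CONFIG_PATH inside the test is too late (and it should point at the file, not the directory). A sketch of a workaround, assuming prefect.configuration.load_configuration is available in your version:
from prefect import configuration

def test_configuration_works_as_expected(tmp_path):
    config_file = tmp_path / "config.toml"
    config_file.write_text(
        'environment = "prod"\n'
        'user = "${environments.${environment}.user}"\n\n'
        '[environments.dev]\n'
        'user = "test"\n\n'
        '[environments.prod]\n'
        'user = "admin"\n'
    )
    # load the file directly instead of relying on the import-time env var
    test_config = configuration.load_configuration(str(config_file))
    assert test_config.environment == "prod"
    assert test_config.user == "admin"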
  • a

    Arsenii

    09/24/2020, 6:14 AM
Has anyone run into weird behaviour with Task Concurrency? When the limit is 4, it will use all of the available slots (expected behaviour), but as soon as we set 6 or 8, it only allows 2 tasks to run at a time (not 100% of the potential slot capacity). The difference in total flow runtime (concurrency = 4 vs 8) also follows the pattern, of course.
    n
    d
    3 replies · 3 participants
  • p

    psimakis

    09/24/2020, 8:59 AM
    Hello everyone, I have three schedulers that trigger the same flow. Each scheduler is passing different
    parameters_default
arguments. The problem is that the third scheduler never triggers a flow run, and its
parameters_default
arguments are never passed to any flow run. Is this some kind of conflict between the second and the third scheduler (same time)? I'm pretty sure I have misunderstood how the scheduling works. Thanks in advance!
    d
    p
    11 replies · 3 participants
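For reference, per-clock parameters attach like this (a sketch with placeholder intervals and parameter names); one gotcha worth checking is clocks that fire at the exact same instant, since coinciding events can collide, so offsetting the third clock by a minute is a cheap experiment:
from datetime import timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

# placeholder intervals and parameter names
schedule = Schedule(
    clocks=[
        IntervalClock(timedelta(hours=1), parameter_defaults={"source": "a"}),
        IntervalClock(timedelta(hours=1), parameter_defaults={"source": "b"}),
        IntervalClock(timedelta(hours=1), parameter_defaults={"source": "c"}),
    ]
)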
  • l

    Lukas

    09/24/2020, 12:41 PM
    Hey everyone, I'd like to start a
    FlowRunTask
based on the result of a previous task. To be precise: let's say I have a task that needs to fetch data from an API but hits the API request limit. Now I want the task to return the data that it was able to fetch, but also some sort of flag that shows that the task couldn't fetch all the data because the API limit was hit, e.g. in a dict
    {"data": data, "all_data_fetched": False}
    . Now if
    all_data_fetched
is false, I'd like to start a new flow that continues fetching the data one hour later, since that's the time it takes until the API limit is reset. I want to create a new flow since we're running on Fargate and I don't want to let the task wait for one hour and pay all that time. And at the same time I want the rest of my flow to continue processing the data that the task was able to fetch from the API. I hope a) it's somewhat understandable what I want to achieve, b) it's possible, and c) there is someone out there who can give me a hint on how to implement it. My biggest issue is that I don't know how to access the result of a task within a flow to check if I need to trigger the
    FlowRunTask
    or not. AFAIK I cannot run
    FlowRunTask
    from within a task, right? TIA 🙂
    d
    9 replies · 2 participants
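A rough sketch of one possible shape (untested; flow and project names are placeholders, and it assumes FlowRunTask from prefect.tasks.prefect plus prefect's case for the conditional branch). The rest of the flow keeps processing whatever was fetched, while the follow-up run is only created when the flag says data is missing:
from prefect import Flow, task, case
from prefect.tasks.prefect import FlowRunTask

@task
def fetch_data():
    # placeholder: returns partial data plus a completeness flag
    return {"data": ["partial"], "all_data_fetched": False}

@task
def needs_follow_up(result):
    return not result["all_data_fetched"]

@task
def process(result):
    pass  # continue processing whatever data was fetched

# placeholder names; the follow-up flow would wait out the rate limit itself
start_follow_up = FlowRunTask(flow_name="continue-fetching", project_name="my-project")

with Flow("fetch-with-follow-up") as flow:
    result = fetch_data()
    process(result)
    with case(needs_follow_up(result), True):
        start_follow_up()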
  • k

    kiran

    09/24/2020, 3:16 PM
    Hi y’all. Can folks tell me whether it’s necessary to know/learn all the ins and outs of Airflow before using Prefect? I’m fairly new to DAGs — I’ve only ever dabbled with Airflow in short video tutorials. Ideally, I would skip it entirely and just use Prefect at my current job, if the learning curve isn’t too steep and/or expects that you’ll understand all the Airflow-related things before using it. Thanks!
    d
    6 replies · 2 participants
  • c

    Chris Vrooman

    09/24/2020, 3:40 PM
    Hi Everyone, In prefect cloud - I’m curious if there is a way to either restrict what input parameter values can be entered or see what options are available for a flow? I was just looking at your arithmetic example where you’re using a switch to set available math operations to pick from. I guess in prefect cloud, you would need to know ahead of time what options you can use? https://docs.prefect.io/core/advanced_tutorials/calculator.html
    j
    2 replies · 2 participants
  • b

    Ben Fogelson

    09/24/2020, 4:50 PM
    Is there any sort of built-in support for task arguments that can be either a
    Result
    instance or a raw value? The functionality I’m looking for is basically this:
    @task
    def foo(bar):
        if isinstance(bar, Result):
            bar = bar.read(bar.location).value
        return do_something(bar)
    Clearly the above isn’t hard to add, but it also seems like the kind of thing where (a) there might be hidden gotchas and (b) it seems common enough that
    prefect
    might have something built in
    d
    10 replies · 2 participants
  • j

    Jeremy Knickerbocker

    09/24/2020, 7:07 PM
    Hi Everyone, hoping I am overlooking something simple. My drive filled up in my test environment, due to the results folder caching everything. I am running Prefect Core Server (v0.13.5) and I currently have checkpointing disabled. I read the docs here https://docs.prefect.io/core/advanced_tutorials/using-results.html and I believe I have everything configured properly. How can I a) make sure checkpointing is really disabled and b) change the global location of the results directory?
    c
    m
    7 replies · 3 participants
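For (b), one option is a flow-level result with an explicit directory, which overrides the default ~/.prefect/results location; for (a), checkpointing can also be forced off per task. A sketch (the directory is a placeholder; note that server/Cloud runs tend to enable checkpointing in the flow's runtime environment, so any env var override has to be set where the flow actually executes):
from prefect import Flow, task
from prefect.engine.results import LocalResult

@task(checkpoint=False)
def no_checkpoint_task():
    return "never written to disk"

# placeholder directory for all checkpointed results in this flow
with Flow("my-flow", result=LocalResult(dir="/data/prefect-results")) as flow:
    no_checkpoint_task()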
  • t

    Thomas Borgen

    09/24/2020, 7:08 PM
    Hi, looking into Prefect now and I’m falling in love. I have a question regarding subscribing to Queues like GCP-PubSub, RabbitMQ, etc. Subscribing and reacting when something is put on the queue is very convenient for a lot of usecases. Does Prefect have some sort of subscriber functionality?
    :marvin: 1
    d
    4 replies · 2 participants
  • s

    sark

    09/25/2020, 4:57 AM
Hi guys, for Google Container Registry authentication: I got it to work with the Docker agent, which can pull images for flows using Docker storage, but the flow containers themselves seem to fail to pull images. How is the Docker agent picking up the credentials? From my experimentation it seems to look at
    ~/.docker/config.json
but does this mean that for flows I have to pack the credentials into the image, instead of being able to mount a volume when the container is run? I have looked at the API documentation and couldn't find any options for specifying volumes for flow containers.
    n
    10 replies · 2 participants
  • n

    Nejc Vesel

    09/25/2020, 7:18 AM
Hi, I have a question about combining parametrized flow runs into one flow. Assume this pseudocode:
with Flow('flow-a') as flowA:
    paramsFlowA = Parameter('flow-a-param', default=...)
    # does something here

with Flow('flow-b') as flowB:
    paramsFlowB = Parameter('flow-b-param', default=...)
    # does something here

with Flow('combined-flow') as combined_flow:
    flow_a = FlowRunTask(flow_name='flow-a', project_name='test')
    flow_b = FlowRunTask(flow_name='flow-b', project_name='test')

    result = flow_b(upstream_tasks=[flow_a])
    When I deploy the combined_flow to the server, I can't set the parameters for FlowA and FlowB. Is it possible to do so and how? Thanks!
    e
    4 replies · 2 participants
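FlowRunTask accepts a parameters argument, so one possible shape is the sketch below (untested; the parameter values are placeholders, reusing the names from the pseudocode above):
from prefect import Flow
from prefect.tasks.prefect import FlowRunTask

flow_a = FlowRunTask(flow_name='flow-a', project_name='test',
                     parameters={'flow-a-param': 'value-a'})
flow_b = FlowRunTask(flow_name='flow-b', project_name='test',
                     parameters={'flow-b-param': 'value-b'})

with Flow('combined-flow') as combined_flow:
    a = flow_a()
    result = flow_b(upstream_tasks=[a])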
  • s

    sark

    09/25/2020, 7:20 AM
Hi guys, with a Prefect function task I can return a result simply with
    return
    like this:
    @task
    def start_job():
       return job_id
I was wondering if it's possible to do the same for a container task created with
    CreateContainer
In my case, what I am trying to do is return the job id of the job started by that container, so that another task can use it to stop the job.
    j
    3 replies · 2 participants
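A container task can't return a Python value directly, but its output can be recovered from the container logs. A sketch (untested; the image name is a placeholder, and it assumes the container prints the job id so it can be parsed out):
from prefect import Flow, task
from prefect.tasks.docker import (
    CreateContainer,
    StartContainer,
    WaitOnContainer,
    GetContainerLogs,
)

create = CreateContainer(image_name="my-job-image")  # placeholder image
start = StartContainer()
wait = WaitOnContainer()
get_logs = GetContainerLogs()

@task
def parse_job_id(logs):
    # placeholder parsing: assumes the container prints the job id last
    return logs.strip().splitlines()[-1]

with Flow("container-job") as flow:
    container_id = create()
    started = start(container_id=container_id)
    status = wait(container_id=container_id, upstream_tasks=[started])
    logs = get_logs(container_id=container_id, upstream_tasks=[status])
    job_id = parse_job_id(logs)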
  • a

    ale

    09/25/2020, 7:29 AM
Hi all, it may seem a stupid question, but I can’t find a way to print/log the name and the value of a parameter inside a Flow. How is it supposed to be done? Any help and directions are much appreciated 🙂
    j
    2 replies · 2 participants
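A common pattern (sketched here with a placeholder parameter name) is to pass the Parameter into a task and log it with the logger from prefect.context:
import prefect
from prefect import Flow, Parameter, task

@task
def log_parameter(value):
    logger = prefect.context.get("logger")
    logger.info("my-param value: %s", value)

with Flow("log-param-example") as flow:
    p = Parameter("my-param", default=42)
    log_parameter(p)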
  • s

    sark

    09/25/2020, 9:06 AM
Hi guys, is it possible for a task to have both data and non-data dependencies?
    upstream_tasks=[upstream_task]
only seems to work when there are no data dependencies?
    e
    3 replies · 2 participants
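Mixing both generally works by passing data arguments and upstream_tasks in the same call; a minimal sketch with placeholder tasks:
from prefect import Flow, task

@task
def setup():
    pass

@task
def extract():
    return 1

@task
def load(x):
    print(x)

with Flow("mixed-dependencies") as flow:
    s = setup()
    e = extract()
    # data dependency on extract, plus a pure state dependency on setup
    load(e, upstream_tasks=[s])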
  • e

    Eric

    09/25/2020, 7:40 PM
    hi everyone, how can I set AWS_CREDENTIALS from the Secrets GUI?
  • e

    Eric

    09/25/2020, 7:40 PM
  • e

    Eric

    09/25/2020, 7:41 PM
    I don't see a way to set a key/value pair there
  • e

    Eric

    09/25/2020, 7:41 PM
    j
    n
    22 replies · 3 participants
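If the GUI form is limiting, secrets can also be set through the Client. A sketch (the key values are placeholders; AWS_CREDENTIALS is conventionally a JSON object whose ACCESS_KEY and SECRET_ACCESS_KEY keys the AWS tasks look for):
from prefect import Client

client = Client()
client.set_secret(
    name="AWS_CREDENTIALS",
    # placeholder values for the two keys the AWS tasks expect
    value={"ACCESS_KEY": "AKIA...", "SECRET_ACCESS_KEY": "..."},
)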
  • e

    Eric

    09/25/2020, 9:29 PM
    seeing some typos in the docs, parameter is databricks_conn_secret but examples say databricks_conn_string
    n
    3 replies · 2 participants
  • m

    Manuel Mourato

    09/25/2020, 9:30 PM
Hello all! I have kind of a generic question. Has anyone tried to use SQLAlchemy mappers to store Prefect Tasks in a Postgres database or similar? Is this possible with Prefect Task classes? Something similar to how Airflow creates its tables.
    n
    1 reply · 2 participants
  • r

    Rahat Zaman

    09/25/2020, 11:18 PM
    Hi there,
  • r

    Rahat Zaman

    09/25/2020, 11:20 PM
    I was searching for a paradigm where a task will create small results and another task will take that data and start instantly. So something like a flow (A -> B) where A, B are tasks and A will
    yield
    result for B. B will start when it gets first
    yield
    from A
    c
    s
    7 replies · 3 participants
c

Chris White

09/25/2020, 11:23 PM
Hi @Rahat Zaman! It sounds like you are describing Prefect data dependencies:
from prefect import task, Flow

@task
def task_A():
    return 3

@task
def task_B(a):
    return a + 1

with Flow("data") as flow:
    task_B(task_A)
the moment that A finishes, B will begin with sub-second latency. Is there something else you’re looking for?
👀 1
❤️ 1
r

Rahat Zaman

09/28/2020, 3:15 AM
Hi Chris, I’ve watched almost all of your videos on YouTube. I have another problem:
when running with the Dask executor, the dask-worker does not seem to recognize a module in its working directory.
Here is a minimal reproducible example.
from prefect import Flow
from prefect.engine.executors import DaskExecutor
from test_custom import HelloWorld

helloworld_task = HelloWorld()

with Flow("outside-task") as flow:
    val = helloworld_task()

executor = DaskExecutor(address="tcp://192.168.0.7:8786")
flow_state = flow.run(executor=executor)
And here is what's in test_custom.py:
from prefect import Task

class HelloWorld(Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def run(self):
        return "hello world"
I used
dask-scheduler
and
dask-worker <IP>
locally for debugging. All run from the same directory. And if the flow is run without passing the
executor
from that dir, it runs perfectly fine.
Oh, and sorry for not responding to your previous message @Chris White. No, I wasn't looking for a data dependency. I want to "task-ize" the code below in Prefect.
def task_A():
    for i in range(3):
        yield i

def task_B(a):
    for i in a:
        yield i + 1

# The flow
task_B(task_A())
And I want the flow to do something like this: 1. First run task_A. 2. Whenever task_A yields something, pass it to task_B. 3. Keep running task_A and task_B in parallel (once task_B has its first item from task_A).
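On the module-import error above, one workaround sometimes used (a sketch, reusing the scheduler address from the snippet) is to ship the file to the workers with distributed's upload_file, since the workers deserialize the task and need test_custom importable on their own path:
from distributed import Client

client = Client("tcp://192.168.0.7:8786")
# copy test_custom.py onto every connected worker so the
# deserialized task can import it
client.upload_file("test_custom.py")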
s

Skip Breidbach

09/29/2020, 4:21 PM
@Rahat Zaman I think you're asking about depth-first-execution? https://medium.com/the-prefect-blog/map-faster-mapping-improvements-in-prefect-0-12-0-7cacc3f14e16
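For reference, depth-first execution applies to chains of mapped tasks; a sketch of the idea (assuming a DaskExecutor, as in the post linked above):
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def numbers():
    return list(range(3))

@task
def add_one(x):
    return x + 1

@task
def times_two(x):
    return x * 2

with Flow("depth-first-example") as flow:
    ns = numbers()
    a = add_one.map(ns)
    b = times_two.map(a)

# with a DaskExecutor, each times_two child can start as soon as its
# corresponding add_one child finishes, instead of waiting for the
# whole add_one map to complete
flow.run(executor=DaskExecutor())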
r

Rahat Zaman

09/30/2020, 6:10 AM
Yes, thank you. I was exactly looking for this feature.
💥 1