prefect-community
  • s

    Sean Talia

    02/18/2021, 9:22 PM
    has anyone used the AWS
    BatchSubmit
    task before? I'm just starting to play around with it and am curious as to exactly what actions i'm going to need to include in my iam policy allow list for the host on which the task will execute (besides obviously the
    batch:SubmitJob
    action)
    a
    • 2
    • 3
  • m

    matta

    02/18/2021, 10:43 PM
    Is there a way to control the formatting for code blocks in Artifacts? Right now the code displays as a kinda small rectangular box, and I'd maybe like different color theming.
    m
    • 2
    • 3
  • a

    Ajith Kumara Beragala Acharige Lal

    02/19/2021, 1:31 AM
    Hi Prefect experts, I am trying to deploy a workflow on a prefect-cloud setup on our Kubernetes cluster. When I run
    prefect auth create-token -n my-runner-token -s RUNNER
    i get this error
    c
    • 2
    • 17
  • m

    matta

    02/19/2021, 4:31 AM
    So, I've been developing on a JupyterHub instance in Kubernetes. Sometimes I use
    DaskKubernetesEnvironment
    for flows in there, and it launches even when I don't specify an image (which is awesome!). Just out of curiosity, what exactly am I launching? Is it the same image as the container of the JupyterHub instance?
  • h

    Harshal Rane

    02/19/2021, 6:21 AM
    Hi Prefect team, is prefect.io server compatible with Ubuntu 16.04? If there are any documents related to it, please share.
    n
    • 2
    • 1
  • m

    Milly gupta

    02/19/2021, 3:06 PM
    Hi All, I'm a little confused about how the Prefect agent knows about the flow code. I am using Prefect Cloud and running a local Prefect agent. I have a simple Python workflow script which does flow.register(project_name="Prefect Demo"), which registers the flow with Cloud, and I'm assuming Cloud only has the metadata of the Flow. How does the Prefect agent get to know about the workflow code?
    n
    • 2
    • 43
  • v

    Varun Joshi

    02/19/2021, 3:27 PM
    If I printed something in my flow, why can't I see it in my logs?
    m
    • 2
    • 1
  • a

    Ajith Kumara Beragala Acharige Lal

    02/19/2021, 5:29 PM
    Hi Prefect Experts, is there a way to map the cluster_ip (of
    service/prefect-server-ui
    ) to a hostname (e.g. prefect.mycompany.com) when installing Prefect Server via Helm, so that internal users can access prefect-server via a hostname instead of an IP? https://github.com/PrefectHQ/server/blob/master/helm/prefect-server/values.yaml
    n
    • 2
    • 2
  • t

    Tony

    02/19/2021, 6:59 PM
    I tried to do some searching for
    flow not found
    when using Docker storage, but having mixed results. I’m working on a CICD script for any arbitrary repo to loop through a directory of flow files, build container storage (can be many containers or a single one, i’m fine with either), and then register them. It starts by copying and building my repo into a container, then hopefully builds that container with all the prefect goodies, and sets the
    run_config
    :
    image_name = f"{git_repo}/prefect-{Path(flow_file).stem.lower()}"
    image_tag = f"githash-{git_commit}"
    # built by cicd agent
    base_image = f"{REGISTRY_URL}/{git_repo}/flows:{image_tag}"
    # second image built
    base_image_plus_prefect = f"{REGISTRY_URL}/{image_name}:{image_tag}"

    flow.storage = Docker(
        registry_url=REGISTRY_URL,
        base_image=base_image,
        image_name=image_name,
        image_tag=image_tag,
        env_vars=fetch_env_vars(os.environ["GIT_BRANCH"]),
        # here be the trouble
        # same path as my Dockerfile???
        path=f"/repo/{flow_file}",
        stored_as_script=True,
    )
    flow.storage.build()

    flow.run_config = DockerRun(
        image=base_image_plus_prefect,
        labels=fetch_agent_labels(os.environ["GIT_BRANCH"]),
    )
    ... cicd stuff ...
    flow.register()
    I’m getting:
    ValueError('Flow is not contained in this Storage')
    when running this currently. First option I see is specifying
    Docker(…, files={…})
    yet some repos might have dozens or hundreds of extra files that they need to include, any chance
    files
    takes in wildcard pathing? Second option I see is [multi-flow storage], but then I run into CICD problems:
    File "/opt/prefect/healthcheck.py", line 151, in <module>
        flows = cloudpickle_deserialization_check(flow_file_paths)
      File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
        flows.append(cloudpickle.loads(flow_bytes))
    ModuleNotFoundError: No module named 'flows.<my flow name>'
    Anyone have some better ideas?
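On the wildcard question above: as far as the Docker storage documentation describes it, `files` is a plain dict mapping absolute local paths to destination paths in the image, so one possible workaround is to build that dict with a glob before constructing the storage. A minimal sketch using only the standard library; the helper name `collect_files`, the `/repo` destination, and the default pattern are assumptions for illustration, not part of Prefect:

```python
from pathlib import Path, PurePosixPath
from typing import Dict


def collect_files(local_root: str, image_root: str = "/repo", pattern: str = "**/*") -> Dict[str, str]:
    """Map every file under local_root that matches pattern to a path under image_root."""
    root = Path(local_root).resolve()
    mapping: Dict[str, str] = {}
    for path in sorted(root.glob(pattern)):
        if path.is_file():
            relative = path.relative_to(root)
            # Keys are absolute host paths, values are POSIX paths inside the image.
            mapping[str(path)] = str(PurePosixPath(image_root, *relative.parts))
    return mapping


# Hypothetical usage (not verified against any particular Prefect version):
#   flow.storage = Docker(..., files=collect_files("."), ...)
```

Whether this fixes the "Flow is not contained in this Storage" error is a separate question; the sketch only addresses getting many files into the image without listing them by hand.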
    m
    • 2
    • 13
  • o

    Omar Sultan

    02/20/2021, 12:49 PM
    Hello Everyone, I have a question about Flow Runs: is it possible for a flow to have multiple runs at the same time, each with different parameters?
    c
    • 2
    • 2
  • o

    Omar Sultan

    02/20/2021, 12:49 PM
    or will the server only allow me to have 1 active run,
  • j

    Jack Sundberg

    02/20/2021, 6:09 PM
    Hello again, I am soon to submit a large number of flow runs to prefect cloud (on the free "dev" tier) and wanted to make sure I don't overstep & put too much load on your servers. In summary, how many flow runs would your team need a preemptive heads-up for? Is there a limit to the free tier where I begin abusing it? In my case, I will be submitting these flow runs without scheduled run dates (so Agents are directed to run them all immediately and in the order they were submitted). This will be on the order of 10,000 flow runs at a time, but in the future this may increase by a factor of 100. I do not need long-term storage of the flow-run metadata either -- so if it is easier/cheaper for your team to delete my flow-run metadata every week or month, I can have a scheduled "clean-up" workflow to do this. Also yes, flow-run concurrency would be nice in this scenario but I am unfortunately not in a position to leave the free tier. Alongside this, is there a limit to prefect Agents and how often they are disconnecting/reconnecting? I expect to have 20-30 agents that will be terminated and restarted every week. This is because the Agents will be running on third-party HPC clusters within Slurm jobs with time limits. -Jack
    :cool-llama: 2
    j
    • 2
    • 2
  • j

    Jack Sundberg

    02/21/2021, 1:38 AM
    I'm back already haha - sorry. The python API for creating a flow run seems like a hassle to me so I may be missing something. Using the client, I can script flow runs via:
    client.create_flow_run(flow_id)
    But this requires me to store or know the flow_id, which I'd rather not do between separate scripts. Is there a reason this type of submission isn't supported?:
    client.create_flow_run(project_name, flow_name)
    The CLI supports this so I expected the python API for client to as well. If it's alright, I'd like to open a feature-request on github. I think the fix will be easy. Something like:
    def create_flow_run(flow_id=None, project_name=None, flow_name=None):
    	
    	if not flow_id and not (project_name and flow_name):
    		raise ValueError("Either a flow_id or a project_name+flow_name must be provided")
    	
    	# then add if/else to handle proper graphql mutation
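The proposal above can be fleshed out a little without touching the real client. The sketch below only covers the argument check and the lookup step; the GraphQL field names (`flow`, `project`, `archived`, `version`) are assumptions about the backend schema, and both function names are hypothetical, so treat it as an illustration of the idea rather than the client's actual behaviour:

```python
import json
from typing import Optional, Tuple


def flow_id_lookup_query(project_name: str, flow_name: str) -> str:
    """Build a GraphQL query for the newest non-archived flow with this name.

    Field names are assumed, not confirmed; check them against the API schema.
    """
    return (
        "query { flow(where: {"
        f"name: {{_eq: {json.dumps(flow_name)}}}, "
        f"project: {{name: {{_eq: {json.dumps(project_name)}}}}}, "
        "archived: {_eq: false}}, "
        "order_by: {version: desc}, limit: 1) { id } }"
    )


def resolve_flow_run_target(
    flow_id: Optional[str] = None,
    project_name: Optional[str] = None,
    flow_name: Optional[str] = None,
) -> Tuple[str, str]:
    """Return ("id", flow_id) or ("query", graphql_string) depending on the inputs."""
    if flow_id:
        return ("id", flow_id)
    if project_name and flow_name:
        return ("query", flow_id_lookup_query(project_name, flow_name))
    raise ValueError("Either a flow_id or a project_name+flow_name must be provided")
```

With something like this, a caller would run the returned query first to obtain an id and then pass that id to the existing `create_flow_run(flow_id)` call.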
    a
    • 2
    • 3
  • v

    Varun Joshi

    02/21/2021, 2:30 PM
    Hey Prefect experts, I have a couple of questions: 1. I have entered a schedule (
    schedule = IntervalSchedule(start_date=datetime.datetime.now() + datetime.timedelta(seconds=1), interval=datetime.timedelta(minutes=1),)
    ) as described on the website. Why don't I see the flow running every minute? 2. If I make any changes to my flow code, will just running it again reflect the changes in the UI?
    n
    • 2
    • 23
  • a

    Anthony LE LUYER

    02/22/2021, 12:35 PM
    Hello, I'm using Prefect in an AI project, and we will soon move to OpenShift for a client. Is it possible to deploy Prefect on OpenShift?
    d
    • 2
    • 10
  • i

    Igor Dykhta

    02/22/2021, 2:06 PM
    Hi everyone, Was trying to communicate with API at https://api.prefect.io, managed to fetch flows and create a flow_run. But after some time the API started to return 403 Forbidden. I created new tokens, but still same 403 error. Is there some API rate limiting, or shadow ban? I don't think that I did more than 50 requests in few hours.
    d
    • 2
    • 11
  • i

    Igor Dykhta

    02/22/2021, 2:40 PM
    Access denied (403) Current session has been terminated. For further information, do not hesitate to contact us.
  • a

    Andy

    02/22/2021, 2:48 PM
    Is there a way to run a specific registered flow from Python instead of clicking the run button in the Prefect UI? e.g. is there a way to do something like
    flow = load('{project_name}.{flow}')
    and
    flow.run()
    which would then show up in the Prefect UI?
    s
    d
    • 3
    • 3
  • r

    Robert Bastian

    02/22/2021, 3:07 PM
    @support We are getting a 404 when clicking email invites. Is there an account management issue currently?
    n
    • 2
    • 3
  • v

    vish

    02/22/2021, 3:17 PM
    Hello everyone! I have been building an ETL pipeline using Prefect, Pandas and Pyarrow(Parquet). I would like to make use of the
    PandasSerializer
    and
    GCSResult
    to convert the output of a task from a pandas DataFrame to a partitioned Parquet dataset, before uploading to GCS. It worked well when serializing to a single parquet file (without partitions). However, when using partitions, the Serializer fails. From my initial look into the source code, the Results class expects a binary stream from the serializer. However, pandas.to_parquet(*, partition_cols=["col1"]) does not return a binary output, as it results in multiple parquet files being created. Example code
    pandas_serializer = PandasSerializer(
        file_type="parquet",
        serialize_kwargs=dict(
            engine="pyarrow", partition_cols=["date"]
        ),
    )
    
    gcs_result = GCSResult(
        bucket="bucket-name", serializer=pandas_serializer
    )
    
    class Extract(Task):
        def __init__(self, **kwargs):
    
            super().__init__(**kwargs, result=gcs_result, target="target_file_location")
    
        def run(self, df: pd.DataFrame) -> pd.DataFrame:
            # Some processing steps
            return df
    From my assessment, it looks like this pattern (i.e. a serializer writing to multiple files) is not supported right now? If that is the case, what are your thoughts on the "prefect-way" to achieve the above?
    d
    • 2
    • 3
  • t

    Thanneermalai Lakshmanan

    02/22/2021, 3:39 PM
    Hi Everyone, I am trying to build a pipeline with intermediate steps that need user input. Is it possible to build such a pipeline? Any document or blog link that explains how to go about it would be helpful.
    n
    m
    • 3
    • 8
  • g

    Gelinger Media

    02/22/2021, 4:14 PM
    hi all! we are really new to this type of technology like Prefect. Our aim is to build a comparison service for a niche, and we have dozens of csv product feeds which we have to read, import to a database, and normalize during importing. My understanding is that we can use Prefect to make this part of the process more organised. Do you think Prefect is the right tool for such a task?
    d
    • 2
    • 1
  • b

    bral

    02/22/2021, 4:23 PM
    hi folks! i have a pipeline like: task1 -> task2 (mapped) -> task3. If some of the child tasks of task2 finish with state Failed, the downstream task is not triggered. How can I force task3 to start if task2 failed?
    a
    d
    v
    • 4
    • 6
  • i

    itay livni

    02/22/2021, 5:13 PM
    Hi - I am trying to debug a flow with a case statement inside apply_map.
    • Although no errors are being raised, the case statement seems to be running twice with only one mapped task to complete.
    • The other issue I am seeing is
    SKIP signal raised: SKIP('Provided value "True" did not match "False"')
    .
    • In prefect cloud the
    case
    statement continues to run indefinitely even though the flow has terminated.
    Logs:
    [2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
    [2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Mapped'
    [2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)[0]': Starting task run...
    [2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)[0]': Finished task run for task with final state: 'Success'
    [2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)': Starting task run...
    [2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)': Finished task run for task with final state: 'Mapped'
    [2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)[0]': Starting task run...
    [2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "True" did not match "False"')
    [2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)[0]': Finished task run for task with final state: 'Skipped'
    Any thoughts on what is going on here. Thanks
    n
    d
    • 3
    • 19
  • d

    Dave

    02/22/2021, 6:51 PM
    Hello there! I have a flow that schedules other flows based on a third party API. So I thought StartFlowRun had the functionality I needed but I came up a little short. My problem is I want to execute the StartFlowRun task but I have a list of (flow_name,parameters), So I am not quite sure if I can accomplish what I want with task.map(...). I've attached a little snippet in the thread to give an idea on what my problem is ⬇️
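One way to approach the question above, sketched under assumptions: if `StartFlowRun` accepts `flow_name` and `parameters` at call time (worth confirming for the installed version), a list of `(flow_name, parameters)` pairs can be split into two parallel lists and each list mapped over. The helper name `split_flow_requests` is hypothetical and the splitting itself is plain Python:

```python
from typing import Any, Dict, List, Sequence, Tuple


def split_flow_requests(
    requests: Sequence[Tuple[str, Dict[str, Any]]]
) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Turn [(flow_name, parameters), ...] into (flow_names, parameter_dicts)."""
    names = [name for name, _ in requests]
    params = [dict(parameters) for _, parameters in requests]
    return names, params


# Hypothetical usage inside a flow, with the split done by upstream tasks:
#   start_flow = StartFlowRun(project_name="my-project")
#   start_flow.map(flow_name=names, parameters=params)
```

Because mapping pairs up elements by position, the two lists must stay the same length and in the same order.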
    k
    • 2
    • 6
  • i

    itay livni

    02/22/2021, 9:12 PM
    Hello again - I am running into issues running
    StartFlowRun
    in a
    task
    . Where the flow inside the task only runs once. In the example below I would expect it to run three times. Using
    state = adder.run(parameters=dict(num_lst=num_lst))
    # Get results
    param_tsk = adder.get_tasks("add_num")
    num_lst = state.result[param_tsk[0]]._result.value
    Works as expected. Any thoughts on how to run a Flow in a loop with Cloud? Thanks!
    k
    • 2
    • 5
  • s

    Sean Talia

    02/22/2021, 9:53 PM
    I'm reading through the source code of the ECSAgent and see that in the agent's deploy_flow method, if the task ARN isn't supplied as part of the flow's run config, the agent is going to call the boto
    register_task_definition()
    method. Does this mean that even if the contents of the task definition haven't actually changed (e.g. the flow is never re-registered, or it's registered but with an identical task definition), the Agent is still going to register a new task definition with AWS on each and every flow run?
    k
    • 2
    • 4
  • l

    ludwig

    02/22/2021, 10:34 PM
    Hi, I have a flow that fails at some random point, when the next task starts even though the previous task is not finished. Prefect even acknowledges that by writing “Finished task run for task with final state: ‘Running’” in the log. Do you have any idea what could be the reason? I am running prefect 0.13.18 on a kubernetes cluster.
    f
    • 2
    • 1
  • f

    Fina Silva-Santisteban

    02/22/2021, 10:52 PM
    Hi Prefect Community, I’m using Docker storage and I’m running the Prefect ECS Agent on an AWS Fargate cluster successfully. Whenever I register a flow I need to authenticate my docker client. At the moment I’m doing that by using the AWS CLI and running
    aws ecr get-login-password --region regionname | docker login --username AWS --password-stdin xxxxxx.dkr.ecr.us-east-2.amazonaws.com
    . I’ve tried using boto3 to do a docker login right before the
    register()
    call but it seems like the auth token expires as soon as I run register:
    denied: Your authorization token has expired. Reauthenticate and try again.
    Is it possible to have Prefect do the docker login? Or do I always have to use the AWS CLI?
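For anyone attempting the boto3 route described above, one detail that is easy to miss: the `authorizationToken` returned by ECR's `get_authorization_token` is a base64-encoded `username:password` string, not the password itself. The sketch below shows only that decoding step; the function name is hypothetical, and whether this is the cause of the "token has expired" error here, or whether a programmatic login is picked up by the Docker client Prefect uses, is not confirmed in this thread:

```python
import base64
from typing import Tuple


def decode_ecr_token(authorization_token: str) -> Tuple[str, str]:
    """Split a base64-encoded ECR authorization token into (username, password)."""
    decoded = base64.b64decode(authorization_token).decode("utf-8")
    # Split on the first colon only, in case the password contains one.
    username, password = decoded.split(":", 1)
    return username, password


# With boto3, the token is found at:
#   response = boto3.client("ecr").get_authorization_token()
#   token = response["authorizationData"][0]["authorizationToken"]
#   username, password = decode_ecr_token(token)
```

The decoded username is normally `AWS`, matching the `--username AWS` flag in the CLI command above.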
    m
    • 2
    • 13
  • a

    Ajith Kumara Beragala Acharige Lal

    02/23/2021, 1:07 AM
    Hi Prefect Experts, am getting
    InterruptedError: denied: access forbidden
    error at the step prefect.Docker | Pushing image to the registry... Can anyone help me with how or where to specify the registry username and password, in the code or anywhere else it matters?
    f
    c
    • 3
    • 9
a

Ajith Kumara Beragala Acharige Lal

02/23/2021, 1:07 AM
Hi Prefect Experts, am getting
InterruptedError: denied: access forbidden
error at the step prefect.Docker | Pushing image to the registry... Can anyone help me with how or where to specify the registry username and password, in the code or anywhere else it matters?
import prefect
from prefect import task, Flow
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun

@task(log_stdout=True)
def log_my_stdout(print_message):
    print("I will log this ",print_message)


with Flow("KubeTestFlow") as f:
    log_my_stdout("Example!")

f.storage = Docker( registry_url="hakko.sekai.dev:5050/sekai-backend/", image_name="k8s-job-flow", image_tag="0.1.0" )

f.run_config = KubernetesRun(labels=["DEV"],job_template_path="/home/ajith/prefect/job_spec.yaml")
out = f.register(project_name="kubeTest")
this is my code
Complete Stacktrace
Successfully tagged hakko.sekai.dev:5050/sekai-backend/k8s-job-flow:0.1.0
[2021-02-23 06:35:32+0530] INFO - prefect.Docker | Pushing image to the registry...
Traceback (most recent call last):
  File "kubeTutorial.py", line 20, in <module>
    out = f.register(project_name="kubeTest")
  File "/usr/local/lib/python3.8/dist-packages/prefect/core/flow.py", line 1668, in register
    registered_flow = client.register(
  File "/usr/local/lib/python3.8/dist-packages/prefect/client/client.py", line 783, in register
    serialized_flow = flow.serialize(build=build)  # type: Any
  File "/usr/local/lib/python3.8/dist-packages/prefect/core/flow.py", line 1450, in serialize
    storage = self.storage.build()  # type: Optional[Storage]
  File "/usr/local/lib/python3.8/dist-packages/prefect/storage/docker.py", line 303, in build
    self._build_image(push=push)
  File "/usr/local/lib/python3.8/dist-packages/prefect/storage/docker.py", line 378, in _build_image
    self.push_image(full_name, self.image_tag)
  File "/usr/local/lib/python3.8/dist-packages/prefect/storage/docker.py", line 586, in push_image
    raise InterruptedError(line.get("error"))
InterruptedError: denied: access forbidden
f

Fina Silva-Santisteban

02/23/2021, 1:10 AM
If you’re using an ECR registry you need to specify your AWS credentials. You can do that in a few different places, e.g. in the Run config or as arguments when you start the prefect agent. I hope this helps!
a

Ajith Kumara Beragala Acharige Lal

02/23/2021, 1:16 AM
Thank you @Fina Silva-Santisteban, our k8s cluster is on DigitalOcean and the Docker registry is on GitLab. I'll try that out. Is there any Prefect document related to this? Thank you!
f

Fina Silva-Santisteban

02/23/2021, 1:29 AM
@Ajith Kumara Beragala Acharige Lal ah sorry, now I see your code snippets! I'm using the ECS run config and that one has a param
run_task_kwargs
which I use for setting things like cluster name and launchtype, but for that to work I must have set the permissions correctly on aws. I save env variables in my Docker storage using the
env_vars
param and then use prefect secrets to make them available. Even though it’s not Kubernetes or gitlab registry setup, I hope this helps!
a

Ajith Kumara Beragala Acharige Lal

02/23/2021, 2:00 AM
Thank you @Fina Silva-Santisteban, this helps a lot!
c

Chris White

02/23/2021, 4:08 AM
Since this happens on push it sounds like a potential issue with your local setup — prefect directly uses your local docker daemon, so ensuring that you can run
docker push
commands from the command line might help you identify where the issue is
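To make that check concrete, the commands to try by hand can be derived from the same values passed to `Docker(...)` in the snippet above. This is a small sketch with a hypothetical helper name; it assumes the registry host is the part of `registry_url` before the first slash and only builds the command lines, it does not run them:

```python
from typing import List


def manual_push_commands(registry_url: str, image_name: str, image_tag: str) -> List[List[str]]:
    """Build the docker login/push commands matching a Docker storage configuration."""
    registry = registry_url.rstrip("/")
    # docker login expects the registry host (and port), not the full repository path.
    host = registry.split("/", 1)[0]
    full_name = f"{registry}/{image_name}:{image_tag}"
    return [["docker", "login", host], ["docker", "push", full_name]]


for command in manual_push_commands(
    "hakko.sekai.dev:5050/sekai-backend/", "k8s-job-flow", "0.1.0"
):
    print(" ".join(command))
```

If the printed `docker push` fails from a terminal with the same "access forbidden" message, the problem is with the local Docker credentials rather than with Prefect.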