Harshal Rane
02/19/2021, 6:21 AM
Milly gupta
02/19/2021, 3:06 PM
Varun Joshi
02/19/2021, 3:27 PM
Ajith Kumara Beragala Acharige Lal
02/19/2021, 5:29 PM
Is there a way to expose the UI service (service/prefect-server-ui) with a prefect-web-host-name (eg: prefect.mycompany.com) when installing Prefect Server via Helm, so that internal users can access prefect-server via a hostname instead of an IP? https://github.com/PrefectHQ/server/blob/master/helm/prefect-server/values.yaml
Tony
02/19/2021, 6:59 PM
I'm trying to work around flow not found when using Docker storage, but having mixed results.
I'm working on a CICD script for any arbitrary repo that loops through a directory of flow files, builds container storage (can be many containers or a single one, I'm fine with either), and then registers the flows. It starts by copying and building my repo into a container, then hopefully builds that container with all the Prefect goodies, and sets the run_config:
image_name = f"{git_repo}/prefect-{Path(flow_file).stem.lower()}"
image_tag = f"githash-{git_commit}"

# built by cicd agent
base_image = f"{REGISTRY_URL}/{git_repo}/flows:{image_tag}"
# second image built
base_image_plus_prefect = f"{REGISTRY_URL}/{image_name}:{image_tag}"

flow.storage = Docker(
    registry_url=REGISTRY_URL,
    base_image=base_image,
    image_name=image_name,
    image_tag=image_tag,
    env_vars=fetch_env_vars(os.environ["GIT_BRANCH"]),
    # here be the trouble
    # same path as my Dockerfile???
    path=f"/repo/{flow_file}",
    stored_as_script=True,
)
flow.storage.build()

flow.run_config = DockerRun(
    image=base_image_plus_prefect,
    labels=fetch_agent_labels(os.environ["GIT_BRANCH"]),
)

... cicd stuff ...

flow.register()
I'm currently getting ValueError('Flow is not contained in this Storage') when running this.
The first option I see is specifying Docker(…, files={…}), yet some repos might have dozens or hundreds of extra files they need to include. Any chance files takes wildcard paths?
Second option I see is [multi-flow storage], but then I run into CICD problems:
File "/opt/prefect/healthcheck.py", line 151, in <module>
flows = cloudpickle_deserialization_check(flow_file_paths)
File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
flows.append(cloudpickle.loads(flow_bytes))
ModuleNotFoundError: No module named 'flows.<my flow name>'
Anyone have some better ideas?
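For the wildcard question: as far as I know Docker storage doesn't expand globs itself, but since files is just a mapping of local paths to absolute paths inside the image, one workaround sketch (the repo layout below is a placeholder) is to expand the glob yourself before handing it over:

from pathlib import Path
from prefect.storage import Docker

repo_root = Path("repo")  # hypothetical checkout of the repo on the CICD agent
extra_files = {
    str(p.resolve()): f"/repo/{p.relative_to(repo_root)}"
    for p in repo_root.rglob("*")  # expand the wildcard ourselves
    if p.is_file()
}

flow.storage = Docker(
    registry_url=REGISTRY_URL,
    files=extra_files,
    path=f"/repo/{flow_file}",
    stored_as_script=True,
)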
Omar Sultan
02/20/2021, 12:49 PM
Omar Sultan
02/20/2021, 12:49 PM
Jack Sundberg
02/20/2021, 6:09 PM
Jack Sundberg
02/21/2021, 1:38 AM
Right now I kick off runs with client.create_flow_run(flow_id).
But this requires me to store or know the flow_id, which I'd rather not do between separate scripts. Is there a reason this type of submission isn't supported?:
client.create_flow_run(project_name, flow_name)
The CLI supports this, so I expected the Python client API to as well. If it's alright, I'd like to open a feature request on GitHub. I think the fix will be easy. Something like:
def create_flow_run(flow_id=None, project_name=None, flow_name=None):
    if not flow_id and not (project_name and flow_name):
        raise ValueError("Either a flow_id or a project_name+flow_name must be provided")
    # then add if/else to handle the proper graphql mutation
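In the meantime, a workaround sketch (the query shape is an assumption about the usual Hasura-style schema Server/Cloud expose, not a documented helper) is to resolve the flow_id from the project and flow name with client.graphql and then call create_flow_run:

from prefect import Client

client = Client()

# assumed Hasura-style query: latest version of "my-flow" in "my-project"
result = client.graphql(
    """
    query {
      flow(
        where: {name: {_eq: "my-flow"}, project: {name: {_eq: "my-project"}}}
        order_by: {version: desc}
        limit: 1
      ) {
        id
      }
    }
    """
)
flow_id = result.data.flow[0].id
client.create_flow_run(flow_id=flow_id)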
Varun Joshi
02/21/2021, 2:30 PM
1. I've set up a schedule:
schedule = IntervalSchedule(
    start_date=datetime.datetime.now() + datetime.timedelta(seconds=1),
    interval=datetime.timedelta(minutes=1),
)
as described on the website. Why don't I see the flow running every minute, then?
2. If I make any changes to my flow code, will just running it again reflect the changes in the UI?
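On the first point, a minimal sketch of how the schedule is usually wired up (assuming Cloud/Server: the flow also has to be registered and an agent running, otherwise nothing picks up the scheduled runs):

import datetime
from prefect import task, Flow
from prefect.schedules import IntervalSchedule

schedule = IntervalSchedule(
    start_date=datetime.datetime.now() + datetime.timedelta(seconds=1),
    interval=datetime.timedelta(minutes=1),
)

@task
def say_hello():
    print("hello")

# the schedule is attached to the flow itself, then the flow is registered
with Flow("scheduled-flow", schedule=schedule) as flow:
    say_hello()

flow.register(project_name="test_project")  # an agent must be running to execute the scheduled runs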
Anthony LE LUYER
02/22/2021, 12:35 PM
Igor Dykhta
02/22/2021, 2:06 PM
Igor Dykhta
02/22/2021, 2:40 PM
Access denied (403) Current session has been terminated. For further information, do not hesitate to contact us.
Andy
02/22/2021, 2:48 PM
Is there something like flow = load('{project_name}.{flow}') and flow.run(), which would then show up in the Prefect UI?
Robert Bastian
02/22/2021, 3:07 PM
vish
02/22/2021, 3:17 PM
I'm using PandasSerializer and GCSResult to convert the output of a task from a pandas DataFrame to a partitioned Parquet dataset before uploading to GCS. It worked well when serializing to a single Parquet file (without partitions).
However, when using partitions, the serializer fails. My initial look into the source code revealed that the Result class expects a binary stream from the serializer, but pandas.to_parquet(*, partition_cols=["col1"]) does not return binary output, since it creates multiple Parquet files.
Example code
pandas_serializer = PandasSerializer(
    file_type="parquet",
    serialize_kwargs=dict(engine="pyarrow", partition_cols=["date"]),
)

gcs_result = GCSResult(
    bucket="bucket-name", serializer=pandas_serializer
)

class Extract(Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs, result=gcs_result, target="target_file_location")

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        # Some processing steps
        return df
From my assessment, it looks like this pattern (i.e. a serializer that produces multiple files) is not supported right now? If that is the case, what are your thoughts on the "prefect-way" to achieve the above?
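One workaround sketch (my assumption about an approach, not an established Prefect pattern): skip the Result serializer for this output and write the partitioned dataset to GCS directly inside the task via a gs:// path (requires gcsfs), returning the location rather than the DataFrame:

import pandas as pd
from prefect import task

@task
def extract(df: pd.DataFrame) -> str:
    # write the partitioned Parquet dataset straight to GCS (needs gcsfs installed)
    target = "gs://bucket-name/datasets/extract"  # placeholder location
    df.to_parquet(target, engine="pyarrow", partition_cols=["date"])
    return target  # downstream tasks get the path instead of the DataFrame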
Thanneermalai Lakshmanan
02/22/2021, 3:39 PM
Gelinger Media
02/22/2021, 4:14 PM
bral
02/22/2021, 4:23 PM
itay livni
02/22/2021, 5:13 PM
SKIP signal raised: SKIP('Provided value "True" did not match "False"').
• In Prefect Cloud the case statement continues to run indefinitely even though the flow has terminated.
## Logs
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Mapped'
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)[0]': Starting task run...
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)[0]': Finished task run for task with final state: 'Success'
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)': Starting task run...
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)': Finished task run for task with final state: 'Mapped'
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)[0]': Starting task run...
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "True" did not match "False"')
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)[0]': Finished task run for task with final state: 'Skipped'
Any thoughts on what is going on here? Thanks
Dave
02/22/2021, 6:51 PM
itay livni
02/22/2021, 9:12 PM
I'm seeing an issue using StartFlowRun in a task, where the flow inside the task only runs once. In the example below I would expect it to run three times.
Using
state = adder.run(parameters=dict(num_lst=num_lst))
# Get results
param_tsk = adder.get_tasks("add_num")
num_lst = state.result[param_tsk[0]]._result.value
works as expected. Any thoughts on how to run a Flow in a loop with Cloud? Thanks!
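With Cloud, one sketch of a loop-like pattern (assuming a registered flow named "adder" in project "my-project"; both names are placeholders) is to map StartFlowRun over a list of parameter dicts so each iteration becomes its own flow run:

from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

start_adder = StartFlowRun(project_name="my-project", flow_name="adder", wait=True)

with Flow("adder-loop") as flow:
    # one child flow run per parameter set
    runs = start_adder.map(
        parameters=[{"num_lst": [1, 2]}, {"num_lst": [3, 4]}, {"num_lst": [5, 6]}]
    )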
Sean Talia
02/22/2021, 9:53 PM
I see the ECS agent calls the register_task_definition() method. Does this mean that even if the contents of the task definition haven't actually changed (e.g. the flow is never re-registered, or it's registered but with an identical task definition), the agent is still going to register a new task definition with AWS on each and every flow run?
ludwig
02/22/2021, 10:34 PM
Fina Silva-Santisteban
02/22/2021, 10:52 PM
Right now I log in to ECR with the AWS CLI before registering:
aws ecr get-login-password --region regionname | docker login --username AWS --password-stdin xxxxxx.dkr.ecr.us-east-2.amazonaws.com
I've tried using boto3 to do a docker login right before the register() call, but it seems like the auth token expires as soon as I run register:
denied: Your authorization token has expired. Reauthenticate and try again.
Is it possible to have Prefect do the docker login? Or do I always have to use the AWS CLI?
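A sketch of doing that login from Python right before register(), using boto3 to fetch a fresh ECR token and the docker SDK to log the daemon in (region and project name are placeholders):

import base64
import boto3
import docker

def ecr_login(region="us-east-2"):
    # fetch a temporary ECR token and log the local docker daemon in with it
    ecr = boto3.client("ecr", region_name=region)
    auth = ecr.get_authorization_token()["authorizationData"][0]
    username, password = base64.b64decode(auth["authorizationToken"]).decode().split(":")
    docker.from_env().login(username=username, password=password, registry=auth["proxyEndpoint"])

ecr_login()
flow.register(project_name="my-project")  # assumes the flow uses Docker storage pointed at ECR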
Ajith Kumara Beragala Acharige Lal
02/23/2021, 1:07 AM
I'm getting an InterruptedError: denied: access forbidden error when Prefect reaches prefect.Docker | Pushing image to the registry.... Can anyone help me with how/where to specify the registry username and password in the code, or wherever it matters?
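As far as I know, Docker storage pushes with whatever credentials the local Docker daemon already holds rather than taking a username/password itself, so a sketch along the same lines as the ECR snippet above (registry and environment variable names are placeholders) is to log in programmatically before registering:

import os
import docker

# log the local docker daemon in to the private registry before Prefect builds and pushes
docker.from_env().login(
    username=os.environ["REGISTRY_USER"],
    password=os.environ["REGISTRY_PASSWORD"],
    registry="registry.mycompany.com",
)

flow.register(project_name="my-project")  # the push during registration reuses these credentials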
Carl
02/23/2021, 2:41 AM
Manik Singh
02/23/2021, 3:14 AM
André Bonatto
02/23/2021, 4:36 AM
Varun Joshi
02/23/2021, 7:28 AM
#!/usr/bin/env python
# coding: utf-8

import pyodbc
from prefect.storage import GCS
import json
import time
import os
import datetime
from google.cloud import pubsub_v1
import pymysql
from prefect import task, Flow, Parameter


@task(log_stdout=True)
def extract_metadata(source_system, source_database_id):
    # provides a list of metadata to loop through and extract delta
    return metadata


def delta_push(metadata):
    # This function extracts data for every metadata detail provided and pushes it further
    ...


@task
def delta_push_wrapper(metadata):
    # Looping through every metadata row and calling the push function
    for metadata_row in metadata:
        delta_push(metadata_row)


with Flow("data_flow") as flow:
    flow.storage = GCS(bucket="bucketname")
    parameter1 = Parameter("parameter1", default="default")
    parameter2 = Parameter("parameter2", default=1)
    metadata = extract_metadata(parameter1, parameter2)
    delta_push_wrapper(metadata)

flow.register(project_name="test_project")
I'm getting an error at the flow.register(project_name="test_project") line: TypeError: Cannot serialize socket object. Any help will be much appreciated 🙂
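A sketch of one likely culprit (an assumption, since the connection code isn't shown): a pyodbc/pymysql connection or Pub/Sub client created at module level gets pickled along with the flow when GCS storage serializes it, and sockets can't be pickled. Creating those clients inside the task avoids it; the host, credentials, and query below are placeholders:

import pymysql
from prefect import task

@task(log_stdout=True)
def extract_metadata(source_system, source_database_id):
    # open the connection inside the task so it is never pickled with the flow
    connection = pymysql.connect(host="db-host", user="user", password="***", database="metadata_db")
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT * FROM metadata")  # hypothetical query
            return cursor.fetchall()
    finally:
        connection.close()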